1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package net.sf.jour.rt.agent;
22
23 import java.util.*;
24 import javax.naming.Context;
25 import javax.naming.InitialContext;
26 import javax.naming.NameNotFoundException;
27 import javax.naming.NamingException;
28
29 import net.sf.jour.rt.RtProperties;
30 import net.sf.jour.util.ShutdownHook;
31 import net.sf.jour.util.ShutdownListener;
32 import net.sf.jour.util.queue.Queue;
33
34 import org.apache.log4j.Logger;
35 import javax.jms.*;
36
37
38
/**
 * JMS topic subscriber that receives bulk event queues delivered as
 * ObjectMessages, assigns a thread name to each ProfilerEvent found in the
 * queue and forwards the whole queue to the EventQueueLogger.
 *
 * Created on 11.12.2004
 * Contributing Author(s):
 *
 * Misha Lifschitz &lt;mishalifschitz at users.sourceforge.net&gt; (Initial implementation)
 * Vlad Skarzhevskyy &lt;vlads at users.sourceforge.net&gt; (Initial implementation)
 *
 * @author vlads
 * @version $Revision: 1.1 $ ($Author: vlads $) $Date: 2004/12/12 02:00:01 $
 */
51 public class BulkEventJMSConsumer implements javax.jms.MessageListener, ShutdownListener {
52
53 protected final Logger log = Logger.getLogger(BulkEventJMSConsumer.class);
54
55 private static EventQueueLogger eventLogger = (EventQueueLogger) EventQueueLogger.getInstance();
56
57 String topicConnectionFactoryBindingName;
58
59 String topicBindingName;
60
61 TopicConnection topicConnection;
62
63 public static void main(String[] args) {
64 }
65
66
67 public BulkEventJMSConsumer() {
68
69 try {
70 getProperties();
71 Context jndi = new InitialContext();
72
73 TopicConnectionFactory topicConnectionFactory;
74 topicConnectionFactory = (TopicConnectionFactory) lookup(jndi, this.topicConnectionFactoryBindingName, TopicConnectionFactory.class);
75 topicConnection = topicConnectionFactory.createTopicConnection();
76 topicConnection.start();
77
78 log.debug("Creating TopicSession, non-transactional, in AUTO_ACKNOWLEDGE mode.");
79 TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
80 Topic topic = (Topic) lookup(jndi, topicBindingName, Topic.class);
81
82 log.debug("Creating TopicPublisher.");
83 TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
84
85 log.info("Staring Listener for " + topicBindingName);
86
87 topicSubscriber.setMessageListener(this);
88
89 jndi.close();
90 } catch (Exception e) {
91 log.error("Error while activating connection", e);
92 }
93
94 ShutdownHook.addListener(this);
95 }
96
97 public static BulkEventJMSConsumer start() {
98 return new BulkEventJMSConsumer();
99 }
100
101 void getProperties() {
102 this.topicConnectionFactoryBindingName = RtProperties.getInstance().getProperty("BulkEventJMSConsumer.TopicConnectionFactoryBindingName", "net.sf.jour.ConnectionFactory");
103 this.topicBindingName = RtProperties.getInstance().getProperty("BulkEventJMSConsumer.TopicBindingName", "net.sf.jour.JourJMSQueue");
104
105 }
106
107 public void onMessage(javax.jms.Message message) {
108 try {
109 if (message instanceof ObjectMessage) {
110
111 ObjectMessage objectMessage = (ObjectMessage) message;
112
113 Queue eventQueue = (Queue)objectMessage.getObject();
114
115 process(eventQueue);
116
117 } else {
118 log.warn("Received message is of type " + message.getJMSType() + ", was expecting ObjectMessage.");
119 }
120 } catch (Error jmse) {
121 log.error("Error thrown while processing incoming message.", jmse);
122 } catch (JMSException jmse) {
123 log.error("Exception thrown while processing incoming message.", jmse);
124 }
125 }
126
127 private void process(Queue eventQueue) {
128 ThreadNumber tn = null;
129 SystemInfoEvent si = null;
130
131 log.debug("Loaded queue:" + eventQueue.size());
132
133 for (Iterator i = eventQueue.iterator(); i.hasNext(); ) {
134 Object obj = i.next();
135 if (obj instanceof ThreadNumber) {
136 tn = (ThreadNumber)obj;
137 } else if (obj instanceof SystemInfoEvent) {
138 si = (SystemInfoEvent)obj;
139 } else if (obj instanceof ProfilerEvent) {
140 ProfilerEvent e = (ProfilerEvent)obj;
141 String threadName = ThreadNumber.getUniqueThreadName(tn, si);
142 e.setThreadName(threadName);
143 }
144 }
145 eventLogger.enqueueAll(eventQueue);
146 }
147
148
149
150
151 public void shutdown() {
152 if (topicConnection != null) {
153 try {
154 topicConnection.close();
155 } catch (JMSException e) {
156 log.warn("Error", e);
157 }
158 }
159 }
160
161
162 protected Object lookup(Context jndi, String name, Class target) throws NamingException {
163 try {
164 log.debug("Looking up " + target.getName() + " [" + name + "]");
165 Object obj = jndi.lookup(name);
166 if (log.isDebugEnabled()) {
167 log.debug("Found :" + obj.getClass().getName());
168 }
169 if (!target.isAssignableFrom(obj.getClass())) {
170 log.warn("result is not assignable From " + target.getName());
171 }
172 return obj;
173 } catch (NameNotFoundException e) {
174 log.error("Could not find name [" + name + "].");
175 throw e;
176 }
177 }
178 }