1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package net.sf.jour.rt.agent;
22
23 import net.sf.jour.rt.RtProperties;
24 import net.sf.jour.util.queue.Queue;
25
26 import javax.jms.*;
27 import javax.naming.*;
28
29 import org.apache.log4j.Logger;
30
31 /***
32 * TODO Add docs
33 *
34 * Created on 11.12.2004
35 * Contributing Author(s):
36 *
37 * Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
38 * Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
39 *
40 * @author vlads
41 * @version $Revision: 1.2 $ ($Author: vlads $) $Date: 2004/12/15 08:00:36 $
42 */
43 public class BulkEventJMSWriter implements BulkEventLogger {
44
45 protected final Logger log = Logger.getLogger(BulkEventJMSWriter.class);
46
47 String topicConnectionFactoryBindingName;
48
49 String topicBindingName;
50
51 TopicConnection topicConnection;
52
53 TopicSession topicSession;
54
55 TopicPublisher topicPublisher;
56
57
58 public void recive(Thread key, Queue eventQueue) {
59 if (!checkConnection()) {
60 return;
61 }
62
63 try {
64 ThreadNumber thread = ThreadNumber.getThreadNumberEvent(key);
65 SystemInfoEvent systemInfo = SystemInfoEvent.instance();
66
67 eventQueue.enqueueFirst(new JVMInfoEvent());
68 eventQueue.enqueueFirst(thread);
69 eventQueue.enqueueFirst(systemInfo);
70
71 ObjectMessage msg = topicSession.createObjectMessage();
72 msg.setObject(eventQueue);
73 topicPublisher.publish(msg);
74
75 } catch (Exception e) {
76 log.error("Could not publish message in BulkEventJMSWriter", e);
77 closeConnection();
78 }
79 }
80
81 boolean checkConnection() {
82 String fail = null;
83
84 if (topicPublisher == null) {
85 activateConnection();
86 }
87
88 if (this.topicConnection == null) {
89 fail = "No TopicConnection";
90 } else if (this.topicSession == null) {
91 fail = "No TopicSession";
92 } else if (this.topicPublisher == null) {
93 fail = "No TopicPublisher";
94 }
95
96 if (fail != null) {
97 log.error(fail + " for BulkEventJMSWriter");
98 return false;
99 } else {
100 return true;
101 }
102 }
103
104 void getProperties() {
105 this.topicConnectionFactoryBindingName = RtProperties.getInstance().getProperty("BulkEventJMSWriter.TopicConnectionFactoryBindingName", "net.sf.jour.ConnectionFactory");
106 this.topicBindingName = RtProperties.getInstance().getProperty("BulkEventJMSWriter.TopicBindingName", "net.sf.jour.JourJMSQueue");
107 }
108
109 void activateConnection() {
110 try {
111 getProperties();
112 Context jndi = new InitialContext();
113
114 TopicConnectionFactory topicConnectionFactory;
115 topicConnectionFactory = (TopicConnectionFactory) lookup(jndi, this.topicConnectionFactoryBindingName, TopicConnectionFactory.class);
116 topicConnection = topicConnectionFactory.createTopicConnection();
117
118 log.debug("Creating TopicSession, non-transactional, in AUTO_ACKNOWLEDGE mode.");
119 topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
120 Topic topic = (Topic) lookup(jndi, topicBindingName, Topic.class);
121
122 log.debug("Creating TopicPublisher.");
123 topicPublisher = topicSession.createPublisher(topic);
124
125 log.debug("Starting TopicConnection.");
126 topicConnection.start();
127
128 jndi.close();
129 } catch (Exception e) {
130 log.error("Error while activating connection", e);
131 closeConnection();
132 }
133 }
134
135 protected Object lookup(Context jndi, String name, Class target) throws NamingException {
136 try {
137 log.debug("Looking up " + target.getName() + " [" + name + "]");
138 Object obj = jndi.lookup(name);
139 if (log.isDebugEnabled()) {
140 log.debug("Found :" + obj.getClass().getName());
141 }
142 if (!target.isAssignableFrom(obj.getClass())) {
143 log.warn("result is not assignable From " + target.getName());
144 }
145 return obj;
146 } catch (NameNotFoundException e) {
147 log.error("Could not find name [" + name + "].");
148 throw e;
149 }
150 }
151
152 void closeConnection() {
153 try {
154 if (topicSession != null) {
155 topicSession.close();
156 }
157 if (topicConnection != null) {
158 topicConnection.close();
159 }
160 } catch (Exception e) {
161 log.error("Error while closing JMS", e);
162 }
163
164 topicPublisher = null;
165 topicSession = null;
166 topicConnection = null;
167
168 }
169
170 /***
171 * Close the Queue for this thread.
172 */
173 public void close(Thread key) {
174 close();
175 }
176
177 /***
178 * Close All and be ready to exit.
179 */
180 public void close() {
181
182 }
183
184 public boolean isClose() {
185 return true;
186 }
187 }