View Javadoc

1   /*
2    * Jour - java profiler and monitoring library
3    *
4    * Copyright (C) 2004 Jour team
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Library General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Library General Public License for more details.
15   *
16   * You should have received a copy of the GNU Library General Public
17   * License along with this library; if not, write to the
18   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19   * Boston, MA  02111-1307, USA.
20   */
21  package net.sf.jour.rt.agent;
22  
23  import net.sf.jour.rt.RtProperties;
24  import net.sf.jour.util.queue.Queue;
25  
26  import javax.jms.*;
27  import javax.naming.*;
28  
29  import org.apache.log4j.Logger;
30  
31  /***
32   * TODO Add docs
33   *
34   * Created on 11.12.2004
35   * Contributing Author(s):
36   *
37   *   Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
38   *   Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
39   *
40   * @author vlads
41   * @version $Revision: 1.2 $ ($Author: vlads $)  $Date: 2004/12/15 08:00:36 $
42   */
43  public class BulkEventJMSWriter implements BulkEventLogger {
44  
45      protected final Logger log = Logger.getLogger(BulkEventJMSWriter.class);
46  
47      String topicConnectionFactoryBindingName;
48  
49      String topicBindingName;
50  
51      TopicConnection topicConnection;
52  
53      TopicSession topicSession;
54  
55      TopicPublisher topicPublisher;
56  
57  
58      public void recive(Thread key, Queue eventQueue) {
59          if (!checkConnection()) {
60              return;
61          }
62  
63          try {
64              ThreadNumber thread = ThreadNumber.getThreadNumberEvent(key);
65              SystemInfoEvent systemInfo = SystemInfoEvent.instance();
66              // TODO move to common code.
67              eventQueue.enqueueFirst(new JVMInfoEvent());
68              eventQueue.enqueueFirst(thread);
69            	eventQueue.enqueueFirst(systemInfo);
70  
71              ObjectMessage msg = topicSession.createObjectMessage();
72              msg.setObject(eventQueue);
73              topicPublisher.publish(msg);
74  
75          } catch (Exception e) {
76              log.error("Could not publish message in BulkEventJMSWriter", e);
77              closeConnection();
78          }
79      }
80  
81      boolean checkConnection() {
82          String fail = null;
83  
84          if (topicPublisher == null) {
85              activateConnection();
86          }
87  
88          if (this.topicConnection == null) {
89              fail = "No TopicConnection";
90          } else if (this.topicSession == null) {
91              fail = "No TopicSession";
92          } else if (this.topicPublisher == null) {
93              fail = "No TopicPublisher";
94          }
95  
96          if (fail != null) {
97              log.error(fail + " for BulkEventJMSWriter");
98              return false;
99          } else {
100             return true;
101         }
102     }
103 
104     void getProperties() {
105         this.topicConnectionFactoryBindingName = RtProperties.getInstance().getProperty("BulkEventJMSWriter.TopicConnectionFactoryBindingName", "net.sf.jour.ConnectionFactory");
106         this.topicBindingName = RtProperties.getInstance().getProperty("BulkEventJMSWriter.TopicBindingName", "net.sf.jour.JourJMSQueue");
107     }
108 
109     void activateConnection() {
110         try {
111             getProperties();
112             Context jndi = new InitialContext();
113 
114             TopicConnectionFactory topicConnectionFactory;
115             topicConnectionFactory = (TopicConnectionFactory) lookup(jndi, this.topicConnectionFactoryBindingName, TopicConnectionFactory.class);
116             topicConnection = topicConnectionFactory.createTopicConnection();
117 
118             log.debug("Creating TopicSession, non-transactional, in AUTO_ACKNOWLEDGE mode.");
119             topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
120             Topic topic = (Topic) lookup(jndi, topicBindingName, Topic.class);
121 
122             log.debug("Creating TopicPublisher.");
123             topicPublisher = topicSession.createPublisher(topic);
124 
125             log.debug("Starting TopicConnection.");
126             topicConnection.start();
127 
128             jndi.close();
129         } catch (Exception e) {
130             log.error("Error while activating connection", e);
131             closeConnection();
132         }
133     }
134 
135     protected Object lookup(Context jndi, String name, Class target) throws NamingException {
136         try {
137             log.debug("Looking up " + target.getName() + " [" + name + "]");
138             Object obj = jndi.lookup(name);
139             if (log.isDebugEnabled()) {
140                 log.debug("Found :" + obj.getClass().getName());
141             }
142             if (!target.isAssignableFrom(obj.getClass())) {
143                 log.warn("result is not assignable From " + target.getName());
144             }
145             return obj;
146         } catch (NameNotFoundException e) {
147             log.error("Could not find name [" + name + "].");
148             throw e;
149         }
150     }
151 
152     void closeConnection() {
153         try {
154             if (topicSession != null) {
155                 topicSession.close();
156             }
157             if (topicConnection != null) {
158                 topicConnection.close();
159             }
160         } catch (Exception e) {
161             log.error("Error while closing JMS", e);
162         }
163         // Help garbage collection
164         topicPublisher = null;
165         topicSession = null;
166         topicConnection = null;
167 
168     }
169 
170 	/***
171 	 * Close the Queue for this thread.
172 	 */
173     public void close(Thread key) {
174         close();
175     }
176 
177 	/***
178 	 * Close All and be ready to exit.
179 	 */
180     public void close() {
181 
182 	}
183 
184     public boolean isClose() {
185 	    return true;
186 	}
187 }