View Javadoc

1   /*
2    * Jour - java profiler and monitoring library
3    *
4    * Copyright (C) 2004 Jour team
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Library General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Library General Public License for more details.
15   *
16   * You should have received a copy of the GNU Library General Public
17   * License along with this library; if not, write to the
18   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19   * Boston, MA  02111-1307, USA.
20   */
21  package net.sf.jour.rt.agent;
22  
23  import java.util.*;
24  import javax.naming.Context;
25  import javax.naming.InitialContext;
26  import javax.naming.NameNotFoundException;
27  import javax.naming.NamingException;
28  
29  import net.sf.jour.rt.RtProperties;
30  import net.sf.jour.util.ShutdownHook;
31  import net.sf.jour.util.ShutdownListener;
32  import net.sf.jour.util.queue.Queue;
33  
34  import org.apache.log4j.Logger;
35  import javax.jms.*;
36  
37  //import javax.naming.*;
38  
39  /***
40   * TODO Add docs
41   *
42   * Created on 11.12.2004
43   * Contributing Author(s):
44   *
45   *   Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
46   *   Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
47   *
48   * @author vlads
49   * @version $Revision: 1.1 $ ($Author: vlads $)  $Date: 2004/12/12 02:00:01 $
50   */
51  public class BulkEventJMSConsumer implements javax.jms.MessageListener, ShutdownListener {
52  
53  	protected final Logger log = Logger.getLogger(BulkEventJMSConsumer.class);
54  
55  	private static EventQueueLogger eventLogger = (EventQueueLogger) EventQueueLogger.getInstance();
56  	
57      String topicConnectionFactoryBindingName;
58      
59      String topicBindingName;
60      
61  	TopicConnection topicConnection;
62  	
63      public static void main(String[] args) {
64      }
65      
66  	
67  	public BulkEventJMSConsumer() {
68  	    
69          try {
70              getProperties();
71              Context jndi = new InitialContext();
72              
73              TopicConnectionFactory topicConnectionFactory;
74              topicConnectionFactory = (TopicConnectionFactory) lookup(jndi, this.topicConnectionFactoryBindingName, TopicConnectionFactory.class);
75              topicConnection = topicConnectionFactory.createTopicConnection();
76              topicConnection.start();
77              
78              log.debug("Creating TopicSession, non-transactional, in AUTO_ACKNOWLEDGE mode.");
79              TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
80              Topic topic = (Topic) lookup(jndi, topicBindingName, Topic.class);
81              
82              log.debug("Creating TopicPublisher.");
83              TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
84  
85              log.info("Staring Listener for " + topicBindingName);
86  
87              topicSubscriber.setMessageListener(this);
88              
89              jndi.close();
90          } catch (Exception e) {
91              log.error("Error while activating connection", e);
92          }
93  	    
94  	    ShutdownHook.addListener(this);
95  	}
96  
97  	public static BulkEventJMSConsumer start() {
98  		return new BulkEventJMSConsumer();
99  	}
100 
101 	void getProperties() {
102 	    this.topicConnectionFactoryBindingName = RtProperties.getInstance().getProperty("BulkEventJMSConsumer.TopicConnectionFactoryBindingName", "net.sf.jour.ConnectionFactory");
103         this.topicBindingName = RtProperties.getInstance().getProperty("BulkEventJMSConsumer.TopicBindingName", "net.sf.jour.JourJMSQueue");
104 	    
105 	}
106 	
107     public void onMessage(javax.jms.Message message) {
108         try {
109             if (message instanceof ObjectMessage) {
110                 
111                 ObjectMessage objectMessage = (ObjectMessage) message;
112 
113                 Queue eventQueue = (Queue)objectMessage.getObject();
114                 
115                 process(eventQueue);
116                 
117             } else {
118                 log.warn("Received message is of type " + message.getJMSType() + ", was expecting ObjectMessage.");
119             }
120         } catch (Error jmse) {
121             log.error("Error thrown while processing incoming message.", jmse);
122         } catch (JMSException jmse) {
123             log.error("Exception thrown while processing incoming message.", jmse);
124         }
125     }
126     
127     private void process(Queue eventQueue) {
128         ThreadNumber tn = null;
129         SystemInfoEvent si = null;
130         
131         log.debug("Loaded queue:" + eventQueue.size());
132 
133 		for (Iterator i = eventQueue.iterator(); i.hasNext(); ) {
134 		    Object obj = i.next();
135 		    if (obj instanceof ThreadNumber) {
136 		        tn = (ThreadNumber)obj;
137             } else if (obj instanceof SystemInfoEvent) {
138                 si = (SystemInfoEvent)obj;
139             } else if (obj instanceof ProfilerEvent) {
140                 ProfilerEvent e = (ProfilerEvent)obj;
141     			String threadName = ThreadNumber.getUniqueThreadName(tn, si);
142 		    	e.setThreadName(threadName);
143             }
144 		}
145 		eventLogger.enqueueAll(eventQueue);
146     }
147     
148     /*
149      * Called when the Java virtual machine shuts down .
150      */
151     public void shutdown() {
152         if (topicConnection != null) {
153             try {
154                 topicConnection.close();
155             } catch (JMSException e) {
156                 log.warn("Error", e);
157             }
158         }
159     }
160 
161     
162     protected Object lookup(Context jndi, String name, Class target) throws NamingException {
163         try {
164             log.debug("Looking up " + target.getName() + " [" + name + "]");
165             Object obj = jndi.lookup(name);
166             if (log.isDebugEnabled()) {
167                 log.debug("Found :" + obj.getClass().getName());
168             }
169             if (!target.isAssignableFrom(obj.getClass())) {
170                 log.warn("result is not assignable From " + target.getName());
171             }
172             return obj;
173         } catch (NameNotFoundException e) {
174             log.error("Could not find name [" + name + "].");
175             throw e;
176         }
177     }
178 }