View Javadoc

1   /*
2    * Jour - java profiler and monitoring library
3    *
4    * Copyright (C) 2004 Jour team
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Library General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Library General Public License for more details.
15   *
16   * You should have received a copy of the GNU Library General Public
17   * License along with this library; if not, write to the
18   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19   * Boston, MA  02111-1307, USA.
20   */
21  package net.sf.jour.rt.agent;
22  
23  import java.util.Hashtable;
24  import java.util.Enumeration;
25  
26  import org.apache.log4j.Logger;
27  
28  import net.sf.jour.util.ShutdownHook;
29  import net.sf.jour.util.ShutdownListener;
30  import net.sf.jour.util.queue.*;
31  import net.sf.jour.rt.RtProperties;
32  
33  /***
34   * Store events in the multiple Queues per Thread.
35   * Queues are responsible for storing events.
36   *
37   * Created on 02.12.2004
38   *
39   * Contributing Author(s):
40   *
41   *   Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
42   *   Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
43   *
44   * @author vlads
45   * @version $Revision: 1.7 $ ($Author: vlads $) $Date: 2004/12/16 06:28:17 $
46   */
47  
48  public class AsyncEventLogger implements EventLogger, ShutdownListener {
49  
50  	protected static final Logger log = Logger.getLogger(AsyncEventLogger.class);
51  
52      /***   DOCUMENT ME!   */
53      private static final AsyncEventLogger instance = new AsyncEventLogger();
54  
55      // The synchronized keyword is not used in this class. This may seem
56      // dangerous, especially since the class will be used by
57      // multiple-threads. In particular, all threads share the same
58      // hashtable (the "ht" variable). This is OK since java hashtables
59      // are thread safe.
60  
61      static Hashtable ht = new Hashtable(50);
62  
63      /*** The default buffer size is set to 128 events. */
64      public static final int DEFAULT_BUFFER_SIZE = 128;
65      
66      private int bufferSize;
67  
68      private BulkEventLogger bulkEventLogger;
69      
70      public static final boolean profileitself = false;
71      
72      private static long jvmInfoEventNextTime;
73      private long jvmInfoEventFrequency;
74      
75      /***
76       * Creates a new EventMTQueueLogger object.
77       */
78      public AsyncEventLogger() {
79  		log.info("new AsyncEventLogger()");
80  		bufferSize = RtProperties.getInstance().getProperty("AsyncEventLogger.queue.size", DEFAULT_BUFFER_SIZE);
81  		jvmInfoEventFrequency  = RtProperties.getInstance().getProperty("AsyncEventLogger.JVMInfoEvents.frequency", 0);
82  		ShutdownHook.addListener(this);
83  		this.bulkEventLogger = new AsyncEventWriter();
84      }
85  
86      private Queue getEventQueue(Thread key) {
87          Queue eventQueue = (Queue) ht.get(key);
88  
89          if (eventQueue == null) {
90              eventQueue = new Queue();
91              ht.put(key, eventQueue);
92          }
93          return eventQueue;
94      }
95      /***
96       * DOCUMENT ME!
97       *
98       * @param event DOCUMENT ME!
99       */
100     public void logEvent(Event event) {
101         //if (true) return;
102         Thread key = Thread.currentThread();
103         Queue eventQueue = getEventQueue(key);
104 
105         eventQueue.enqueue(event);
106 
107         if (eventQueue.size() > bufferSize) {
108 			ht.remove(key);
109 			
110 			if (profileitself) {
111 			    eventQueue.enqueue(new ProfilerEvent(ProfilerEvent.START, null, 
112 			        	new MapedEventID(MapedEventID.ID_DISPATCH_QUEUE)));
113 			}
114 			
115 			if (jvmInfoEventFrequency != 0) {
116 			    long now = System.currentTimeMillis();
117 			    if (now > jvmInfoEventNextTime) {
118 			        jvmInfoEventNextTime = now + jvmInfoEventFrequency;
119 			        eventQueue.enqueue(new JVMInfoEvent());
120 			    }
121 			}
122             
123 			dispatchQueue(key, eventQueue);
124             
125             if (profileitself) {
126                 Queue newEventQueue = getEventQueue(key);
127                 newEventQueue.enqueue(new ProfilerEvent(ProfilerEvent.END, null, 
128 			        	new MapedEventID(MapedEventID.ID_DISPATCH_QUEUE)));
129 
130             }
131         }
132     }
133 
134     /***
135      * Remove the Queue from this thread.
136      */
137     private void dispatchQueue(Thread key, Queue eventQueue) {
138         this.bulkEventLogger.recive(key, eventQueue);
139     }
140 
141 	public void close() {
142 		close(Thread.currentThread());
143 	}
144 	/***
145 	 * Close the Queue for this thread.
146 	 */
147 	public void close(Thread key) {
148 		Queue eventQueue = (Queue) ht.get(key);
149 		if (eventQueue != null) {
150 			log.debug("close");
151 			ht.remove(key);
152 			dispatchQueue(key, eventQueue);
153 			this.bulkEventLogger.close(key);
154 		}
155 	}
156 	
157 	public void shutdown() {
158 	    log.debug("AsyncEventLogger shutdown");
159 	    closeAll();
160 	}
161 	
162     public void closeAll() {
163 		log.debug("closeAll, threads " + ht.size());
164         synchronized (ht) {
165             for (Enumeration e = ht.keys(); e.hasMoreElements();) {
166                 Thread key = (Thread) e.nextElement();
167                 Queue eventQueue = (Queue) ht.get(key);
168 				dispatchQueue(key, eventQueue);
169             }
170             ht.clear();
171 
172             this.bulkEventLogger.close();
173         }
174 		log.debug("closed all");
175     }
176 }