View Javadoc

1   /*
2    * Jour - java profiler and monitoring library
3    *
4    * Copyright (C) 2004 Jour team
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Library General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Library General Public License for more details.
15   *
16   * You should have received a copy of the GNU Library General Public
17   * License along with this library; if not, write to the
18   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19   * Boston, MA  02111-1307, USA.
20   */
21  package net.sf.jour.rt.agent;
22  
23  import java.util.Enumeration;
24  import java.util.Hashtable;
25  import java.util.LinkedList;
26  
27  import org.apache.log4j.Logger;
28  
29  import net.sf.jour.rt.RtProperties;
30  import net.sf.jour.util.ShutdownListener;
31  import net.sf.jour.util.ShutdownHook;
32  import net.sf.jour.util.queue.Queue;
33  
34  /***
35   * TODO Add docs
36   * Created on 02.12.2004
37   *
38   * Contributing Author(s):
39   *
40   *   Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Initial implementation)
41   *   Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Initial implementation)
42   *
43   * @author vlads
44   * @version $Revision: 1.8 $ ($Author: vlads $) $Date: 2004/12/12 02:00:01 $
45   */
46  public class AsyncEventWriter implements BulkEventLogger {
47  
48      protected static final Logger log = Logger.getLogger(AsyncEventWriter.class);
49  
50  	public static final int DEFAULT_DISPATCHER_THREADS = 100;
51  	public static final int DEFAULT_DISPATCHER_BUFFER = 100;
52  
53      /***
54       * There could be more than one thread in each Dispatcher
55       */
56      Hashtable dispatchers = new Hashtable(50);
57      int dispatcherCnt;
58  
59      Dispatcher lastUsed = null;
60  	int maxThreads;
61  
62  	public AsyncEventWriter() {
63  		log.info("new AsyncEventWriter()");
64  		maxThreads = RtProperties.getInstance().getProperty("AsyncEventLogger.threads", -1);
65  	}
66  
67  	public boolean isThreadable() {
68  		return (maxThreads != 0);
69  	}
70  
71  	public void recive(Thread key, Queue eventQueue) {
72  		if (isThreadable()) {
73  			dispatch(key, eventQueue);
74  		} else {
75  			if (lastUsed == null) {
76  				lastUsed = new Dispatcher("JourD");
77  				dispatchers.put(key, lastUsed);
78  			}
79  			lastUsed.write(key, eventQueue);
80  		}
81  	}
82  
83      /* (non-Javadoc)
84       * @see net.sf.jour.rt.agent.BulkEventLogger#recive(java.lang.Thread, net.sf.jour.util.queue.Queue)
85       */
86      public synchronized void dispatch(Thread key, Queue eventQueue) {
87  		log.debug("dispatch");
88          Dispatcher d = (Dispatcher)dispatchers.get(key);
89          if (d == null) {
90  			int per_thread = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.per_thread", DEFAULT_DISPATCHER_THREADS);
91  			log.debug("per_thread " + per_thread);
92              if ((lastUsed == null) || (lastUsed.size() >= per_thread)) {
93                  dispatcherCnt ++;
94                  lastUsed = new Dispatcher("JourD-" + dispatcherCnt);
95                  lastUsed.start();
96              }
97              d = lastUsed;
98  			dispatchers.put(key, d);
99  			d.attach(key);
100         }
101         d.recive(key, eventQueue);
102     }
103 
104 	/***
105 	 * Close the Queue for this thread.
106 	 */
107     public void close(Thread key) {
108 		log.debug("close Thread");
109         Dispatcher d;
110 
111         if (isThreadable()) {
112             d = (Dispatcher) dispatchers.get(key);
113         } else {
114             d = lastUsed;
115         }
116 
117         if (d != null) {
118             d.close(key);
119         }
120 
121         if (d.size() == 0) {
122             if (isThreadable()) {
123                 dispatchers.remove(key);
124             } else {
125                 lastUsed = null;
126             }
127 			d.close();
128             if (isThreadable()) {
129                 try {
130                     d.join();
131                 } catch (InterruptedException ex) {
132                     log.error("Got an InterruptedException while waiting for the dispatcher to finish.", ex);
133                 }
134             }
135         }
136     }
137 
138 	/***
139 	 * Close All and be ready to exit.
140 	 */
141     public void close() {
142 		log.debug("close, dispatchers " + dispatchers.size());
143         synchronized (dispatchers) {
144             for (Enumeration e = dispatchers.keys(); e.hasMoreElements();) {
145                 Thread key = (Thread) e.nextElement();
146                 Dispatcher d = (Dispatcher)dispatchers.get(key);
147                 d.close();
148                 if (isThreadable()) {
149                     try {
150                         d.join();
151                     } catch (InterruptedException ex) {
152                         log.error("Got an InterruptedException while waiting for the dispatcher to finish.", ex);
153                     }
154                 } else {
155                 	d.closeWriters();
156                 }
157             }
158             dispatchers.clear();
159             lastUsed = null;
160         }
161 		log.debug("closed");
162     }
163 
164 	public boolean isClose() {
165 		return (dispatchers.size() == 0);
166 	}
167 
168     class QueueItem {
169 
170         Thread key;
171 
172         Queue eventQueue;
173 
174         QueueItem(Thread key, Queue eventQueue) {
175             this.key = key;
176             this.eventQueue = eventQueue;
177         }
178     }
179 
180     class Dispatcher extends Thread implements ShutdownListener {
181 
182         boolean interrupted = false;
183 
184         Hashtable writers = new Hashtable(10);
185 		Hashtable threadQueueCount = new Hashtable(10);
186 		Hashtable toClose = new Hashtable(10);
187 
188         private LinkedList buffer = new LinkedList();
189         private int buffer_max;
190 
191         Dispatcher(String name) {
192         	super(name);
193             log.info("new Dispatcher()" + name);
194             //It is the user's responsibility to close appenders before exiting.
195             this.setDaemon(true);
196 			this.buffer_max = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.buffer_max", DEFAULT_DISPATCHER_BUFFER);
197 			ShutdownHook.addListener(this);
198         }
199 
200 		void attach(Thread key) {
201 			getReciver(key);
202 		}
203 
204         int size() {
205             return writers.size();
206         }
207 
208         void recive(Thread key, Queue eventQueue) {
209             log.debug("recive");
210             QueueItem qi = new QueueItem(key, eventQueue);
211 			int sz;
212 			int thisQueueCount;
213             synchronized (buffer) {
214                 sz = buffer.size();
215                 buffer.addLast(qi);
216 				thisQueueCount = queueCount(key, +1);
217                 if (sz == 0) {
218                 	// wakeup
219                     buffer.notify();
220                 }
221             }
222 
223             // TODO review this.
224             // Only Thread that actual have items Queue would sleep.
225 			if ((sz > this.buffer_max) && (thisQueueCount > 10)) {
226 				int sleeps = 0;
227 				do {
228 					try {
229                         Thread.sleep(100);
230                     } catch (InterruptedException e) {
231 						break;
232                     }
233 					sleeps ++;
234 					if (sleeps > 10) {
235 						log.warn("Bufer too small Queue:" + thisQueueCount);
236 						break;
237 					}
238 				} while (buffer.size() > this.buffer_max);
239 			}
240         }
241 
242         void close() {
243             log.debug("close, buffer " + buffer.size());
244             synchronized (buffer) {
245                 interrupted = true;
246                 // We have a waiting dispacther.
247                 // In that case, we need to give it a death kiss.
248 				if (buffer.size() == 0) {
249 					buffer.notify();
250 				}
251             }
252 			log.debug("closed");
253         }
254 
255         /***
256           * Close the Queue for this thread.
257           */
258         void close(Thread key) {
259 			log.debug("close Thread");
260             toClose.put(key, key);
261             int cnt = queueCount(key, 0);
262 			BulkEventLogger w = (BulkEventLogger)writers.get(key);
263             while ((cnt > 0) || (!w.isClose())) {
264             	synchronized (buffer) {
265                 	buffer.notify();
266             	}
267 				cnt = queueCount(key, 0);
268             }
269         }
270 
271 		synchronized void closeWriters() {
272 			log.debug("closeWriters " + writers.size());
273 			for (Enumeration e = writers.keys(); e.hasMoreElements();) {
274 				Object key = e.nextElement();
275 				BulkEventLogger w = (BulkEventLogger) writers.get(key);
276 				if (w != null) {
277 					w.close();
278 				}
279 			}
280 		}
281 
282         void write(Thread key, Queue eventQueue) {
283 			BulkEventLogger reciver = getReciver(key);
284 			reciver.recive(key, eventQueue);
285         }
286 
287 		BulkEventLogger getReciver(Thread key) {
288             BulkEventLogger reciver = (BulkEventLogger) writers.get(key);
289             if (reciver == null) {
290                 String dest = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.destination", "BulkEventFileWriter");
291                 if (dest.equals("BulkEventLog4jWriter")) {
292                     reciver = new BulkEventLog4jWriter();
293                 } else if (dest.equals("BulkEventJMSWriter")) {
294                     reciver = new BulkEventJMSWriter();
295                 } else {
296                     reciver = new BulkEventFileWriter();
297                 }
298                 writers.put(key, reciver);
299                 log.info("dispatcher.destination selected " + dest);
300             }
301             return reciver;
302         }
303 
304         public int queueCount(Thread key, int inc) {
305             Integer i = (Integer) threadQueueCount.get(key);
306             if (i == null) {
307                 i = new Integer(inc);
308                 if (inc != 0) {
309                     threadQueueCount.put(key, i);
310                 }
311             } else {
312 				if (inc != 0) {
313 					i = new Integer( i.intValue() + inc);
314 					threadQueueCount.put(key, i);
315 				}
316             }
317             return i.intValue();
318         }
319 
320         public void shutdown() {
321             log.debug("Dispatcher " + this.getName() + " shutdown");
322             this.interrupted = true;
323         }
324 
325         public void run() {
326             try {
327                 log.info("run");
328                 while (true) {
329                     QueueItem qi = null;
330                     BulkEventLogger reciver = null;
331 					int cnt = -1;
332                     synchronized (buffer) {
333                         if (buffer.size() == 0) {
334                             // Exit loop if interrupted but only if the the buffer is empty.
335                       	  	if (interrupted) {
336                       	  	    log.info("Exiting.");
337                       	  	    break;
338                       	  	}
339 
340                             try {
341 								log.debug("wait");
342                                 buffer.wait();
343                             } catch (InterruptedException e) {
344                                 log.error("The dispathcer should not be interrupted.");
345                                 break;
346                             }
347                         }
348 
349 						if (buffer.size() != 0) {
350                         	qi = (QueueItem)buffer.removeFirst();
351 							cnt = queueCount(qi.key, -1);
352 							reciver = getReciver(qi.key);
353 						}
354                     }
355                     log.debug("continue");
356                     if ((reciver != null) && (qi != null)) {
357                         reciver.recive(qi.key, qi.eventQueue);
358                         if (cnt == 0) {
359 							if (toClose.remove(qi.key) != null) {
360 								writers.remove(qi.key);
361 								reciver.close();
362 							}
363                         }
364                     }
365                 }
366                 closeWriters();
367             } catch (RuntimeException e) {
368 				log.error("The dispathcer terminated", e);
369                 throw e;
370             } finally {
371                 ShutdownHook.removeListener(this);
372             }
373         }
374     }
375 
376 }