1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package net.sf.jour.rt.agent;
22
23 import java.util.Enumeration;
24 import java.util.Hashtable;
25 import java.util.LinkedList;
26
27 import org.apache.log4j.Logger;
28
29 import net.sf.jour.rt.RtProperties;
30 import net.sf.jour.util.ShutdownListener;
31 import net.sf.jour.util.ShutdownHook;
32 import net.sf.jour.util.queue.Queue;
33
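/***
 * Asynchronous BulkEventLogger: hands each thread's event queue to a Dispatcher,
 * which writes it through the configured destination writer. When the
 * "AsyncEventLogger.threads" property is 0 the queue is written directly on the
 * calling thread instead.
 * Created on 02.12.2004
 *
 * Contributing Author(s):
 *
 * Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Initial implementation)
 * Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Initial implementation)
 *
 * @author vlads
 * @version $Revision: 1.8 $ ($Author: vlads $) $Date: 2004/12/12 02:00:01 $
 */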
46 public class AsyncEventWriter implements BulkEventLogger {
47
/** Shared log4j logger, also used by the nested Dispatcher. */
protected static final Logger log = Logger.getLogger(AsyncEventWriter.class);

/**
 * Default for the "AsyncEventLogger.dispatcher.per_thread" property read in
 * dispatch(): a new dispatcher is created once the current one's size()
 * reaches that value.
 */
public static final int DEFAULT_DISPATCHER_THREADS = 100;
/** Default for the "AsyncEventLogger.dispatcher.buffer_max" property read by each Dispatcher. */
public static final int DEFAULT_DISPATCHER_BUFFER = 100;

/**
 * Maps a source Thread (the key) to the Dispatcher serving it.
 * There could be more than one thread in each Dispatcher.
 */
Hashtable dispatchers = new Hashtable(50);
/** Number of dispatchers created by dispatch(); used to build the "JourD-n" thread names. */
int dispatcherCnt;

/** Most recently created dispatcher; dispatch() attaches new keys to it while it has room. */
Dispatcher lastUsed = null;
/** Value of the "AsyncEventLogger.threads" property (default -1); 0 makes isThreadable() return false. */
int maxThreads;
61
62 public AsyncEventWriter() {
63 log.info("new AsyncEventWriter()");
64 maxThreads = RtProperties.getInstance().getProperty("AsyncEventLogger.threads", -1);
65 }
66
67 public boolean isThreadable() {
68 return (maxThreads != 0);
69 }
70
71 public void recive(Thread key, Queue eventQueue) {
72 if (isThreadable()) {
73 dispatch(key, eventQueue);
74 } else {
75 if (lastUsed == null) {
76 lastUsed = new Dispatcher("JourD");
77 dispatchers.put(key, lastUsed);
78 }
79 lastUsed.write(key, eventQueue);
80 }
81 }
82
83
84
85
86 public synchronized void dispatch(Thread key, Queue eventQueue) {
87 log.debug("dispatch");
88 Dispatcher d = (Dispatcher)dispatchers.get(key);
89 if (d == null) {
90 int per_thread = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.per_thread", DEFAULT_DISPATCHER_THREADS);
91 log.debug("per_thread " + per_thread);
92 if ((lastUsed == null) || (lastUsed.size() >= per_thread)) {
93 dispatcherCnt ++;
94 lastUsed = new Dispatcher("JourD-" + dispatcherCnt);
95 lastUsed.start();
96 }
97 d = lastUsed;
98 dispatchers.put(key, d);
99 d.attach(key);
100 }
101 d.recive(key, eventQueue);
102 }
103
104 /***
105 * Close the Queue for this thread.
106 */
107 public void close(Thread key) {
108 log.debug("close Thread");
109 Dispatcher d;
110
111 if (isThreadable()) {
112 d = (Dispatcher) dispatchers.get(key);
113 } else {
114 d = lastUsed;
115 }
116
117 if (d != null) {
118 d.close(key);
119 }
120
121 if (d.size() == 0) {
122 if (isThreadable()) {
123 dispatchers.remove(key);
124 } else {
125 lastUsed = null;
126 }
127 d.close();
128 if (isThreadable()) {
129 try {
130 d.join();
131 } catch (InterruptedException ex) {
132 log.error("Got an InterruptedException while waiting for the dispatcher to finish.", ex);
133 }
134 }
135 }
136 }
137
138 /***
139 * Close All and be ready to exit.
140 */
141 public void close() {
142 log.debug("close, dispatchers " + dispatchers.size());
143 synchronized (dispatchers) {
144 for (Enumeration e = dispatchers.keys(); e.hasMoreElements();) {
145 Thread key = (Thread) e.nextElement();
146 Dispatcher d = (Dispatcher)dispatchers.get(key);
147 d.close();
148 if (isThreadable()) {
149 try {
150 d.join();
151 } catch (InterruptedException ex) {
152 log.error("Got an InterruptedException while waiting for the dispatcher to finish.", ex);
153 }
154 } else {
155 d.closeWriters();
156 }
157 }
158 dispatchers.clear();
159 lastUsed = null;
160 }
161 log.debug("closed");
162 }
163
164 public boolean isClose() {
165 return (dispatchers.size() == 0);
166 }
167
168 class QueueItem {
169
170 Thread key;
171
172 Queue eventQueue;
173
174 QueueItem(Thread key, Queue eventQueue) {
175 this.key = key;
176 this.eventQueue = eventQueue;
177 }
178 }
179
180 class Dispatcher extends Thread implements ShutdownListener {
181
/** Stop flag set by close() and shutdown(); run() exits when it is set and the buffer is empty. */
boolean interrupted = false;

/** Thread key -> BulkEventLogger created by getReciver() for that thread. */
Hashtable writers = new Hashtable(10);
/** Thread key -> Integer maintained by queueCount(): +1 per item buffered, -1 per item taken by run(). */
Hashtable threadQueueCount = new Hashtable(10);
/** Keys registered by close(Thread); run() closes their writer once their count drops to 0. */
Hashtable toClose = new Hashtable(10);

/** FIFO of QueueItem awaiting run(); also the monitor used for synchronization and wait/notify. */
private LinkedList buffer = new LinkedList();
/** Value of "AsyncEventLogger.dispatcher.buffer_max"; recive() may pause the caller above this size. */
private int buffer_max;
190
191 Dispatcher(String name) {
192 super(name);
193 log.info("new Dispatcher()" + name);
194
195 this.setDaemon(true);
196 this.buffer_max = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.buffer_max", DEFAULT_DISPATCHER_BUFFER);
197 ShutdownHook.addListener(this);
198 }
199
200 void attach(Thread key) {
201 getReciver(key);
202 }
203
204 int size() {
205 return writers.size();
206 }
207
208 void recive(Thread key, Queue eventQueue) {
209 log.debug("recive");
210 QueueItem qi = new QueueItem(key, eventQueue);
211 int sz;
212 int thisQueueCount;
213 synchronized (buffer) {
214 sz = buffer.size();
215 buffer.addLast(qi);
216 thisQueueCount = queueCount(key, +1);
217 if (sz == 0) {
218
219 buffer.notify();
220 }
221 }
222
223
224
225 if ((sz > this.buffer_max) && (thisQueueCount > 10)) {
226 int sleeps = 0;
227 do {
228 try {
229 Thread.sleep(100);
230 } catch (InterruptedException e) {
231 break;
232 }
233 sleeps ++;
234 if (sleeps > 10) {
235 log.warn("Bufer too small Queue:" + thisQueueCount);
236 break;
237 }
238 } while (buffer.size() > this.buffer_max);
239 }
240 }
241
242 void close() {
243 log.debug("close, buffer " + buffer.size());
244 synchronized (buffer) {
245 interrupted = true;
246
247
248 if (buffer.size() == 0) {
249 buffer.notify();
250 }
251 }
252 log.debug("closed");
253 }
254
255 /***
256 * Close the Queue for this thread.
257 */
258 void close(Thread key) {
259 log.debug("close Thread");
260 toClose.put(key, key);
261 int cnt = queueCount(key, 0);
262 BulkEventLogger w = (BulkEventLogger)writers.get(key);
263 while ((cnt > 0) || (!w.isClose())) {
264 synchronized (buffer) {
265 buffer.notify();
266 }
267 cnt = queueCount(key, 0);
268 }
269 }
270
271 synchronized void closeWriters() {
272 log.debug("closeWriters " + writers.size());
273 for (Enumeration e = writers.keys(); e.hasMoreElements();) {
274 Object key = e.nextElement();
275 BulkEventLogger w = (BulkEventLogger) writers.get(key);
276 if (w != null) {
277 w.close();
278 }
279 }
280 }
281
282 void write(Thread key, Queue eventQueue) {
283 BulkEventLogger reciver = getReciver(key);
284 reciver.recive(key, eventQueue);
285 }
286
287 BulkEventLogger getReciver(Thread key) {
288 BulkEventLogger reciver = (BulkEventLogger) writers.get(key);
289 if (reciver == null) {
290 String dest = RtProperties.getInstance().getProperty("AsyncEventLogger.dispatcher.destination", "BulkEventFileWriter");
291 if (dest.equals("BulkEventLog4jWriter")) {
292 reciver = new BulkEventLog4jWriter();
293 } else if (dest.equals("BulkEventJMSWriter")) {
294 reciver = new BulkEventJMSWriter();
295 } else {
296 reciver = new BulkEventFileWriter();
297 }
298 writers.put(key, reciver);
299 log.info("dispatcher.destination selected " + dest);
300 }
301 return reciver;
302 }
303
304 public int queueCount(Thread key, int inc) {
305 Integer i = (Integer) threadQueueCount.get(key);
306 if (i == null) {
307 i = new Integer(inc);
308 if (inc != 0) {
309 threadQueueCount.put(key, i);
310 }
311 } else {
312 if (inc != 0) {
313 i = new Integer( i.intValue() + inc);
314 threadQueueCount.put(key, i);
315 }
316 }
317 return i.intValue();
318 }
319
320 public void shutdown() {
321 log.debug("Dispatcher " + this.getName() + " shutdown");
322 this.interrupted = true;
323 }
324
/**
 * Dispatcher thread body: takes QueueItems from the buffer one at a time and
 * passes each to the writer of its thread. The loop ends when the stop flag
 * is set and the buffer is empty, or when the thread is interrupted while
 * waiting; all writers are then closed. On any exit the dispatcher is
 * removed from the ShutdownHook.
 */
public void run() {
    try {
        log.info("run");
        while (true) {
            QueueItem qi = null;
            BulkEventLogger reciver = null;
            int cnt = -1;
            synchronized (buffer) {
                if (buffer.size() == 0) {

                    // Only stop once everything buffered has been written.
                    if (interrupted) {
                        log.info("Exiting.");
                        break;
                    }

                    try {
                        log.debug("wait");
                        // Woken by recive(), close() or close(Thread).
                        buffer.wait();
                    } catch (InterruptedException e) {
                        log.error("The dispathcer should not be interrupted.");
                        break;
                    }
                }

                // The buffer may still be empty after a wake-up; in that case
                // nothing is taken and the loop starts over.
                if (buffer.size() != 0) {
                    qi = (QueueItem)buffer.removeFirst();
                    // cnt is the number of items still buffered for this thread.
                    cnt = queueCount(qi.key, -1);
                    reciver = getReciver(qi.key);
                }
            }
            log.debug("continue");
            // The actual write happens outside the buffer lock.
            if ((reciver != null) && (qi != null)) {
                reciver.recive(qi.key, qi.eventQueue);
                if (cnt == 0) {
                    // Last buffered item for this thread: honour a pending close(Thread).
                    if (toClose.remove(qi.key) != null) {
                        writers.remove(qi.key);
                        reciver.close();
                    }
                }
            }
        }
        closeWriters();
    } catch (RuntimeException e) {
        // Writers are not closed on this path; the exception is rethrown.
        log.error("The dispathcer terminated", e);
        throw e;
    } finally {
        ShutdownHook.removeListener(this);
    }
}
374 }
375
376 }