View Javadoc

1   /*
2    * Jour - java profiler and monitoring library
3    *
4    * Copyright (C) 2004 Jour team
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Library General Public
8    * License as published by the Free Software Foundation; either
9    * version 2 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Library General Public License for more details.
15   *
16   * You should have received a copy of the GNU Library General Public
17   * License along with this library; if not, write to the
18   * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19   * Boston, MA  02111-1307, USA.
20   */
21  package net.sf.jour.rt.agent;
22  
23  import java.io.File;
24  import java.io.*;
25  import java.io.IOException;
26  import java.io.EOFException;
27  import java.util.*;
28  
29  import java.io.FileInputStream;
30  import java.io.ObjectInputStream;
31  
32  
33  import org.apache.log4j.Logger;
34  
35  import net.sf.jour.util.queue.Queue;
36  import net.sf.jour.util.FileUtil;
37  import net.sf.jour.rt.RtProperties;
38  
39  /***
40   * TODO Add docs
41   * Created on 05.12.2004
42   *
43   * Contributing Author(s):
44   *
45   *   Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
46   *   Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
47   *
48   * @author vlads
49   * @version $Revision: 1.9 $ ($Author: vlads $) $Date: 2004/12/16 00:08:42 $
50   */
51  public class BulkEventFileReader implements Runnable {
52  
53  	protected final Logger log = Logger.getLogger(BulkEventFileReader.class);
54  
55  	private static EventQueueLogger eventLogger = (EventQueueLogger) EventQueueLogger.getInstance();
56  
57  	private static Hashtable loaded;
58  	private static boolean resetFlag;
59  	
60  	/* Reading and sorting file list may be too expensive on Windows */
61  	private File[] cashfileList;
62  	private String cashDirectory;
63  
64  	private BulkEventFileReader() {
65  		log.debug("new BulkEventFileReader()");
66  	}
67  
68  	public static void start() {
69  		new Thread(new BulkEventFileReader()).start();
70  	}
71  
72  	public static void reset() {
73  		loaded = null;
74  		resetFlag = true;
75  	}
76  
77  	private synchronized Hashtable getTable() {
78  		if (loaded == null) {
79  			loaded = new Hashtable();
80  		}
81  		return loaded;
82  	}
83  
84      private Queue process(File file) {
85          try {
86              getTable().put(file.getCanonicalPath(), new Boolean(true));
87          } catch (IOException e) {
88              log.error("file error " + file.getName(), e);
89              return null;
90          }
91  
92  		try {
93  			log.debug("Reading file:"  + file.getName());
94              FileInputStream fis  = new FileInputStream(file);
95  			ObjectInputStream ois = new ObjectInputStream(fis);
96  
97  			Queue eventQueue = new Queue();
98  
99              int cnt = 0;
100             ThreadNumber tn = null;
101             SystemInfoEvent si = null;
102             while (true) {
103                 Object q;
104                 try {
105                     q = ois.readObject();
106                 } catch (EOFException e1) {
107                     break;
108                 }
109                 if (q instanceof ThreadNumber) {
110                     tn = (ThreadNumber)q;
111                 }else if (q instanceof SystemInfoEvent) {
112                     si = (SystemInfoEvent)q;
113                 } else if (q instanceof Event) {
114                     eventQueue.enqueue(q);
115                 } else if (q instanceof Queue) {
116                 	cnt ++;
117 					log.debug("Reading Queue" + cnt);
118                     eventQueue.enqueueAll((Queue) q);
119                 } else {
120                 	log.warn("Unknow object:" + q.getClass().getName());
121                 	break;
122                 }
123             }
124 
125 			fis.close();
126 
127 			log.debug("Loaded queue:" + eventQueue.size());
128 
129 			for (Iterator i = eventQueue.iterator(); i.hasNext(); ) {
130 			    Object obj = i.next();
131 			    if (obj instanceof ThreadNumber) {
132 			        tn = (ThreadNumber)obj;
133 			    } else if (obj instanceof JVMInfoEvent) {
134 			        ((JVMInfoEvent)obj).setSystemInfo(si);
135 			    } else if (obj instanceof SystemInfoEvent) {
136 	                si = (SystemInfoEvent)obj;
137 	            } else if (obj instanceof ProfilerEvent) {
138 	                ProfilerEvent e = (ProfilerEvent)obj;
139 	    			String threadName = ThreadNumber.getUniqueThreadName(tn, si);
140 			    	e.setThreadName(threadName);
141 	            }
142 			}
143 
144 			// reset been called
145 			if (loaded == null) {
146 				return null;
147 			}
148 			return eventQueue;
149 
150         } catch (Exception e) {
151 			log.error("file read error " + file.getName(), e);
152 			return null;
153         }
154     }
155 
156 	private boolean isLoaded(File file) {
157 		try {
158             return getTable().containsKey(file.getCanonicalPath());
159         } catch (IOException e) {
160             log.error("file error " + file.getName(), e);
161             return true;
162         }
163 	}
164 	
165 	public class BinFilenameFilter implements FilenameFilter {
166 	    public boolean accept(File dir, String name)  {
167 	        if ((name.endsWith(".bin")) ||
168 					(name.endsWith(".wrk"))) {
169 				return true;
170 			} else {
171 			    return false;
172 			}
173 	    }
174 	}
175 	
176 	private File[] append(File[] a1, File[] a2) {
177 	    if (a2.length == 0) {
178 	        return a1;
179 	    }
180 	    if (a1.length == 0) {
181 	        return a2;
182 	    }
183 	    File[] flist = new File[a1.length + a2.length];
184 	    int i = 0;
185 	    for (int k =0 ; k < a1.length; i++, k ++) {
186 	        flist[i] = a1[k];
187 	    }
188 	    for (int k = 0 ; k < a2.length; i++, k ++) {
189 	        flist[i] = a2[k];
190 	    }
191 	    return flist;
192 	}
193 	
194 	private File[] getSubFolderFiles(File dir) {
195 	    
196 	    File[] flist = new File[0];
197 	    File[] dirList = dir.listFiles();
198 		
199 	    for (int i = 0; i < dirList.length; i++) {
200 			File subDir = dirList[i];
201 			if (!subDir.isDirectory()) {
202 			    continue;
203 			}
204 			File[] subDirFilelist = subDir.listFiles(new BinFilenameFilter());
205 			flist = append(flist, subDirFilelist);
206 		}
207 		return flist;
208 	}
209 	
210 	private Queue read() {
211 		File dir = RtProperties.getInstance().getFolder("BulkEventFileReader.folder", false);
212 		log.debug("Reading folder:"  + dir);
213 		if (dir == null) {
214 			return null;
215 		}
216 		File[] flist;
217 		
218 		if ((cashDirectory != null) && (cashfileList != null) && (cashDirectory.equals(dir.getAbsolutePath()))) {
219             flist = cashfileList;
220         } else {
221             cashDirectory = dir.getAbsolutePath();
222             flist = dir.listFiles(new BinFilenameFilter());
223             if (flist.length == 0) {
224                 flist = getSubFolderFiles(dir);
225             }
226             flist = FileUtil.sortFileListByDate(flist);
227             cashfileList = flist;
228         }
229 
230 		for (int i = 0; i < flist.length; i++) {
231 			File f1 = flist[i];
232 			if ((f1.getName().endsWith(".bin")) ||
233 				(f1.getName().endsWith(".wrk"))) {
234 				if (isLoaded(f1)) {
235 					continue;
236 				}
237 				return process(f1);
238 			}
239 		}
240 
241 		return null;
242 	}
243 
244 	public void run() {
245 	    long idleCount = 0;
246 		while (true) {
247 			try {
248 				Queue eventQueue = read();
249 				if (eventQueue != null) {
250 					eventLogger.enqueueAll(eventQueue);
251 					idleCount = 0;
252 				} else {
253 				    idleCount ++;
254 				    if (idleCount > 10) {
255 				        Thread.sleep(5000); // 5 sec
256 				        if (resetFlag) {
257 				            cashfileList = null;
258 				            resetFlag = false; 
259 				        } else if ((idleCount % 12) == 0){
260 				            // Reset list every min.
261 				            cashfileList = null;
262 				        }
263 				    } else {
264 				        Thread.sleep(200);
265 				    }
266 				}
267 			} catch (Exception e) {
268 				log.error("Error", e);
269 			}
270 		}
271 	}
272 
273 }