1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package net.sf.jour.rt.agent;
22
23 import java.io.File;
24 import java.io.FileOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectOutputStream;
27 import java.io.OutputStream;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.Date;
31
32 import net.sf.jour.rt.RtProperties;
33 import net.sf.jour.util.queue.Queue;
34
35 import org.apache.log4j.Logger;
36
37 /***
38 * This creates new file for each theread.
39 * Should be used by One thread only.
40 *
41 * Created on 02.12.2004
42 *
43 * Contributing Author(s):
44 *
45 * Misha Lifschitz <mishalifschitz at users.sourceforge.net> (Inital implementation)
46 * Vlad Skarzhevskyy <vlads at users.sourceforge.net> (Inital implementation)
47 *
48 * @author vlads
49 * @version $Revision: 1.8 $ ($Author: vlads $) $Date: 2004/12/15 08:00:35 $
50 */
51 public class BulkEventFileWriter implements BulkEventLogger {
52
53 protected final Logger log = Logger.getLogger(BulkEventFileWriter.class);
54
55 static String runID;
56
57 private String fileName;
58 private String threadID;
59 private OutputStream outputStream;
60 private ObjectOutputStream oos;
61 private int fileCount;
62
63 private long eventCount;
64 private long firstRecivedTime;
65 private long lastRecivedTime;
66
67 public static final long DEFAULT_EVENTS_MAX = 1000;
68
69 private long maxEventsInFile;
70 private long fileCreatedTime;
71
72 public static final String WRK_SUFIX = ".wrk";
73
74 public BulkEventFileWriter() {
75 log.info("new BulkEventFileWriter()");
76 }
77
78
79
80
81 public void recive(Thread key, Queue eventQueue) {
82 log.debug("recive");
83 if ((!eventQueue.isEmpty()) && (this.firstRecivedTime == 0)) {
84 Object event = eventQueue.getFirst();
85 if (event instanceof ProfilerEvent) {
86 this.firstRecivedTime = ((ProfilerEvent)event).getTimestampLong();
87 }
88 }
89
90 ThreadNumber thread = ThreadNumber.getThreadNumberEvent(key);
91 SystemInfoEvent systemInfo = SystemInfoEvent.instance();
92 if (eventCount == 0) {
93
94 eventQueue.enqueueFirst(new JVMInfoEvent());
95 eventQueue.enqueueFirst(thread);
96 eventQueue.enqueueFirst(systemInfo);
97 }
98 write(eventQueue, thread, systemInfo);
99 }
100
101 ThreadNumber thread;
102 SystemInfoEvent systemInfo;
103
104 private void reset() {
105 this.thread = null;
106 this.systemInfo = null;
107 this.firstRecivedTime = 0;
108 }
109
110 public void write(Queue eventQueue, ThreadNumber thread, SystemInfoEvent systemInfo) {
111 this.lastRecivedTime = System.currentTimeMillis();
112 if (this.thread == null) {
113 this.thread = thread;
114 }
115 if (this.systemInfo == null) {
116 this.systemInfo = systemInfo;
117 }
118 if (this.outputStream == null) {
119 try {
120
121 if (this.firstRecivedTime == 0) {
122 this.firstRecivedTime = System.currentTimeMillis();
123 }
124 this.fileName = createFileName();
125 this.outputStream = new FileOutputStream(this.fileName + WRK_SUFIX);
126 this.oos = new ObjectOutputStream(this.outputStream);
127 this.eventCount = 0;
128 this.fileCreatedTime = System.currentTimeMillis();
129 this.maxEventsInFile = RtProperties.getInstance().getProperty("BulkEventFileWriter.events_max", DEFAULT_EVENTS_MAX);
130
131
132 } catch (IOException e) {
133 log.error("Error creating file", e);
134 this.outputStream = null;
135 return;
136 }
137 }
138
139 if (!eventQueue.isEmpty()) {
140 Object event = eventQueue.getLast();
141 if (event instanceof ProfilerEvent) {
142 this.lastRecivedTime = ((ProfilerEvent)event).getTimestampLong();
143 }
144 }
145
146
147
148
149 try {
150 oos.writeObject(eventQueue);
151 this.eventCount += eventQueue.size();
152
153 if (eventCount >= maxEventsInFile) {
154
155 close();
156 }
157
158 } catch (IOException e) {
159 log.error("Error saving Queue", e);
160 close();
161 }
162
163 int maxLifetime = RtProperties.getInstance().getIntProperty("BulkEventFileWriter.max_lifetime_minutes");
164 if ((maxLifetime > 0)
165 && ((System.currentTimeMillis() - this.fileCreatedTime) >= (maxLifetime * 60000))) {
166 close();
167 }
168 }
169
170
171 /***
172 * Close the Queue for this thread.
173 */
174 public void close(Thread key) {
175 close();
176 }
177
178 public boolean isClose() {
179 return (outputStream == null);
180 }
181
182 public void close() {
183 if (outputStream != null) {
184 log.debug("close, eventCount " + eventCount);
185 try {
186 oos.close();
187 outputStream.close();
188 outputStream = null;
189
190 File file = new File(fileName + WRK_SUFIX);
191 String newFilename = getFileName();
192 file.renameTo(new File(newFilename));
193
194 log.info("File closed:" + newFilename);
195
196 } catch (IOException e) {
197 log.error("Error closing stream", e);
198 }
199 eventCount = 0;
200 }
201 }
202
203
204 private String getFileID() {
205 SimpleDateFormat dfmt = new SimpleDateFormat("HH_mm_ss");
206 DecimalFormat cfmt = new DecimalFormat("0000");
207 return dfmt.format(new Date(this.firstRecivedTime))
208 + "-" + dfmt.format(new Date(this.lastRecivedTime))
209 + "_f" + cfmt.format(fileCount);
210 }
211
212 private String getThread() {
213 DecimalFormat cfmt = new DecimalFormat("00000000");
214 DecimalFormat cfmt2 = new DecimalFormat("00");
215 int n = this.thread.getLocalNumber();
216 return "t" + cfmt2.format(n);
217
218 }
219
220
221 private String createFileName() {
222 fileCount ++;
223 this.threadID = getThread();
224 return getFileName();
225 }
226
227 private String getFileName() {
228
229 String name = "jour_" + this.systemInfo.getRunID() + "_" + getFileID() + "_" + this.threadID + ".bin";
230
231 File file;
232
233 File dir = RtProperties.getInstance().getFolder("BulkEventFileWriter.dest_folder", true);
234 if (dir != null) {
235 file = new File(dir, name);
236 } else {
237 file = new File(name);
238 }
239 log.info("File created:" + name);
240 return file.getAbsolutePath();
241 }
242 }