1 package org.apache.turbine.services.schedule;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.turbine.services.InitializationException;
27 import org.apache.turbine.services.TurbineBaseService;
28 import org.apache.turbine.util.TurbineException;
29
30
31
32
33
34
35
36
37 public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
38 {
39
40 protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
41
42
43 protected JobQueue<JobEntry> scheduleQueue = null;
44
45
46 protected boolean enabled = false;
47
48
49 protected MainLoop mainLoop;
50
51
52 protected Thread thread;
53
54
55
56
57 public AbstractSchedulerService()
58 {
59 mainLoop = null;
60 thread = null;
61 }
62
63
64
65
66
67
68
69 @Override
70 public void init() throws InitializationException
71 {
72 try
73 {
74 setEnabled(getConfiguration().getBoolean("enabled", true));
75 scheduleQueue = new JobQueue<JobEntry>();
76 mainLoop = new MainLoop();
77
78 @SuppressWarnings("unchecked")
79 List<JobEntry> jobs = (List<JobEntry>)loadJobs();
80 scheduleQueue.batchLoad(jobs);
81 restart();
82
83 setInit(true);
84 }
85 catch (Exception e)
86 {
87 throw new InitializationException("Could not initialize the scheduler service", e);
88 }
89 }
90
91
92
93
94
95
96
97 protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
98
99
100
101
102
103
104 @Override
105 public void shutdown()
106 {
107 if (getThread() != null)
108 {
109 getThread().interrupt();
110 }
111 }
112
113
114
115
116 @Override
117 public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
118
119
120
121
122
123
124
125
126
127
128 @Override
129 public abstract JobEntry getJob(int oid) throws TurbineException;
130
131
132
133
134
135
136
137
138
139 @Override
140 public void addJob(JobEntry je) throws TurbineException
141 {
142 updateJob(je);
143 }
144
145
146
147
148
149
150
151
152
153 @Override
154 public abstract void removeJob(JobEntry je) throws TurbineException;
155
156
157
158
159
160
161
162
163
164 @Override
165 public abstract void updateJob(JobEntry je) throws TurbineException;
166
167
168
169
170
171
172 @Override
173 public List<JobEntry> listJobs()
174 {
175 return scheduleQueue.list();
176 }
177
178
179
180
181
182
183
184 protected void setEnabled(boolean enabled)
185 {
186 this.enabled = enabled;
187 }
188
189
190
191
192
193
194 @Override
195 public boolean isEnabled()
196 {
197 return enabled;
198 }
199
200
201
202
203 @Override
204 public synchronized void startScheduler()
205 {
206 setEnabled(true);
207 restart();
208 }
209
210
211
212
213 @Override
214 public synchronized void stopScheduler()
215 {
216 log.info("Stopping job scheduler");
217 Thread thread = getThread();
218 if (thread != null)
219 {
220 thread.interrupt();
221 }
222 enabled = false;
223 }
224
225
226
227
228
229
230
231
232 public synchronized Thread getThread()
233 {
234 return thread;
235 }
236
237
238
239
240 protected synchronized void clearThread()
241 {
242 thread = null;
243 }
244
245
246
247
248
249
250
251 public synchronized void restart()
252 {
253 if (enabled)
254 {
255 log.info("Starting job scheduler");
256 if (thread == null)
257 {
258
259
260
261 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
262
263
264
265
266 thread.setDaemon(true);
267 thread.start();
268 }
269 else
270 {
271 notify();
272 }
273 }
274 }
275
276
277
278
279
280
281
282
283 protected synchronized JobEntry nextJob() throws TurbineException
284 {
285 try
286 {
287 while (!Thread.interrupted())
288 {
289
290 JobEntry je = scheduleQueue.getNext();
291
292 if (je == null)
293 {
294
295 wait();
296 }
297 else
298 {
299 long now = System.currentTimeMillis();
300 long when = je.getNextRuntime();
301
302 if (when > now)
303 {
304
305 wait(when - now);
306 }
307 else
308 {
309
310 scheduleQueue.updateQueue(je);
311
312 return je;
313 }
314 }
315 }
316 }
317 catch (InterruptedException ex)
318 {
319
320 }
321
322
323 return null;
324 }
325
326
327
328
329
330
331 protected class MainLoop implements Runnable
332 {
333
334
335
336 @Override
337 public void run()
338 {
339 String taskName = null;
340 try
341 {
342 while (enabled)
343 {
344 JobEntry je = nextJob();
345 if (je != null)
346 {
347 taskName = je.getTask();
348
349
350 Runnable wt = new WorkerThread(je);
351 Thread helper = new Thread(wt);
352 helper.start();
353 }
354 else
355 {
356 break;
357 }
358 }
359 }
360 catch (Exception e)
361 {
362 log.error("Error running a Scheduled Job: " + taskName, e);
363 enabled = false;
364 }
365 finally
366 {
367 clearThread();
368 }
369 }
370 }
371 }