1 package org.apache.turbine.services.schedule;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Iterator;
23 import java.util.List;
24
25 import javax.servlet.ServletConfig;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29
30 import org.apache.torque.TorqueException;
31 import org.apache.torque.util.Criteria;
32
33 import org.apache.turbine.services.InitializationException;
34 import org.apache.turbine.services.TurbineBaseService;
35 import org.apache.turbine.util.TurbineException;
36
37 /***
38 * Service for a cron like scheduler.
39 *
40 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
41 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
42 * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
43 */
44 public class TurbineSchedulerService
45 extends TurbineBaseService
46 implements ScheduleService
47 {
48 /*** Logging */
49 private static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
50
51 /*** The queue */
52 protected JobQueue scheduleQueue = null;
53
54 /*** Current status of the scheduler */
55 private boolean enabled = false;
56
57 /*** The main loop for starting jobs. */
58 protected MainLoop mainLoop;
59
60 /*** The thread used to process commands. */
61 protected Thread thread;
62
63 /***
64 * Creates a new instance.
65 */
66 public TurbineSchedulerService()
67 {
68 mainLoop = null;
69 thread = null;
70 }
71
72 /***
73 * Initializes the SchedulerService.
74 *
75 * @throws InitializationException Something went wrong in the init
76 * stage
77 */
78 public void init()
79 throws InitializationException
80 {
81 try
82 {
83 setEnabled(getConfiguration().getBoolean("enabled", true));
84 scheduleQueue = new JobQueue();
85 mainLoop = new MainLoop();
86
87
88 List jobs = JobEntryPeer.doSelect(new Criteria());
89
90 if (jobs != null && jobs.size() > 0)
91 {
92 Iterator it = jobs.iterator();
93 while (it.hasNext())
94 {
95 ((JobEntry) it.next()).calcRunTime();
96 }
97 scheduleQueue.batchLoad(jobs);
98
99 restart();
100 }
101
102 setInit(true);
103 }
104 catch (Exception e)
105 {
106 String errorMessage = "Could not initialize the scheduler service";
107 log.error(errorMessage, e);
108 throw new InitializationException(errorMessage, e);
109 }
110 }
111
112 /***
113 * Called the first time the Service is used.<br>
114 *
115 * Load all the jobs from cold storage. Add jobs to the queue
116 * (sorted in ascending order by runtime) and start the scheduler
117 * thread.
118 *
119 * @param config A ServletConfig.
120 * @deprecated use init() instead.
121 */
122 public void init(ServletConfig config) throws InitializationException
123 {
124 init();
125 }
126
127 /***
128 * Shutdowns the service.
129 *
130 * This methods interrupts the housekeeping thread.
131 */
132 public void shutdown()
133 {
134 if (getThread() != null)
135 {
136 getThread().interrupt();
137 }
138 }
139
140 /***
141 * Get a specific Job from Storage.
142 *
143 * @param oid The int id for the job.
144 * @return A JobEntry.
145 * @exception TurbineException job could not be retreived.
146 */
147 public JobEntry getJob(int oid)
148 throws TurbineException
149 {
150 try
151 {
152 JobEntry je = JobEntryPeer.retrieveByPK(oid);
153 return scheduleQueue.getJob(je);
154 }
155 catch (TorqueException e)
156 {
157 String errorMessage = "Error retrieving job from persistent storage.";
158 log.error(errorMessage, e);
159 throw new TurbineException(errorMessage, e);
160 }
161 }
162
163 /***
164 * Add a new job to the queue.
165 *
166 * @param je A JobEntry with the job to add.
167 * @throws TurbineException job could not be added
168 */
169 public void addJob(JobEntry je)
170 throws TurbineException
171 {
172 updateJob(je);
173 }
174
175 /***
176 * Remove a job from the queue.
177 *
178 * @param je A JobEntry with the job to remove.
179 * @exception TurbineException job could not be removed
180 */
181 public void removeJob(JobEntry je)
182 throws TurbineException
183 {
184 try
185 {
186
187 Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
188 JobEntryPeer.doDelete(c);
189
190
191 scheduleQueue.remove(je);
192
193
194 restart();
195 }
196 catch (Exception e)
197 {
198 String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
199 log.error(errorMessage, e);
200 throw new TurbineException(errorMessage, e);
201 }
202 }
203
204 /***
205 * Add or update a job.
206 *
207 * @param je A JobEntry with the job to modify
208 * @throws TurbineException job could not be updated
209 */
210 public void updateJob(JobEntry je)
211 throws TurbineException
212 {
213 try
214 {
215 je.calcRunTime();
216
217
218 if (je.isNew())
219 {
220 scheduleQueue.add(je);
221 }
222 else
223 {
224 scheduleQueue.modify(je);
225 }
226
227 je.save();
228
229 restart();
230 }
231 catch (Exception e)
232 {
233 String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
234 log.error(errorMessage, e);
235 throw new TurbineException(errorMessage, e);
236 }
237 }
238
239 /***
240 * List jobs in the queue. This is used by the scheduler UI.
241 *
242 * @return A List of jobs.
243 */
244 public List listJobs()
245 {
246 return scheduleQueue.list();
247 }
248
249 /***
250 * Sets the enabled status of the scheduler
251 *
252 * @param enabled
253 *
254 */
255 protected void setEnabled(boolean enabled)
256 {
257 this.enabled = enabled;
258 }
259
260 /***
261 * Determines if the scheduler service is currently enabled.
262 *
263 * @return Status of the scheduler service.
264 */
265 public boolean isEnabled()
266 {
267 return enabled;
268 }
269
270 /***
271 * Starts or restarts the scheduler if not already running.
272 */
273 public synchronized void startScheduler()
274 {
275 setEnabled(true);
276 restart();
277 }
278
279 /***
280 * Stops the scheduler if it is currently running.
281 */
282 public synchronized void stopScheduler()
283 {
284 log.info("Stopping job scheduler");
285 Thread thread = getThread();
286 if (thread != null)
287 {
288 thread.interrupt();
289 }
290 enabled = false;
291 }
292
293 /***
294 * Return the thread being used to process commands, or null if
295 * there is no such thread. You can use this to invoke any
296 * special methods on the thread, for example, to interrupt it.
297 *
298 * @return A Thread.
299 */
300 public synchronized Thread getThread()
301 {
302 return thread;
303 }
304
305 /***
306 * Set thread to null to indicate termination.
307 */
308 private synchronized void clearThread()
309 {
310 thread = null;
311 }
312
313 /***
314 * Start (or restart) a thread to process commands, or wake up an
315 * existing thread if one is already running. This method can be
316 * invoked if the background thread crashed due to an
317 * unrecoverable exception in an executed command.
318 */
319 public synchronized void restart()
320 {
321 if (enabled)
322 {
323 log.info("Starting job scheduler");
324 if (thread == null)
325 {
326
327
328
329 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
330
331
332
333
334 thread.setDaemon(true);
335 thread.start();
336 }
337 else
338 {
339 notify();
340 }
341 }
342 }
343
344 /***
345 * Return the next Job to execute, or null if thread is
346 * interrupted.
347 *
348 * @return A JobEntry.
349 * @exception TurbineException a generic exception.
350 */
351 private synchronized JobEntry nextJob()
352 throws TurbineException
353 {
354 try
355 {
356 while (!Thread.interrupted())
357 {
358
359 JobEntry je = scheduleQueue.getNext();
360
361 if (je == null)
362 {
363
364 wait();
365 }
366 else
367 {
368 long now = System.currentTimeMillis();
369 long when = je.getNextRuntime();
370
371 if (when > now)
372 {
373
374 wait(when - now);
375 }
376 else
377 {
378
379 scheduleQueue.updateQueue(je);
380
381 return je;
382 }
383 }
384 }
385 }
386 catch (InterruptedException ex)
387 {
388 }
389
390
391 return null;
392 }
393
394 /***
395 * Inner class. This is isolated in its own Runnable class just
396 * so that the main class need not implement Runnable, which would
397 * allow others to directly invoke run, which is not supported.
398 */
399 protected class MainLoop
400 implements Runnable
401 {
402 /***
403 * Method to run the class.
404 */
405 public void run()
406 {
407 String taskName = null;
408 try
409 {
410 while (enabled)
411 {
412 JobEntry je = nextJob();
413 if (je != null)
414 {
415 taskName = je.getTask();
416
417
418 Runnable wt = new WorkerThread(je);
419 Thread helper = new Thread(wt);
420 helper.start();
421 }
422 else
423 {
424 break;
425 }
426 }
427 }
428 catch (Exception e)
429 {
430 log.error("Error running a Scheduled Job: " + taskName, e);
431 enabled = false;
432 }
433 finally
434 {
435 clearThread();
436 }
437 }
438 }
439 }