View Javadoc

1   package org.apache.turbine.services.schedule;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Iterator;
23  import java.util.List;
24  
25  import javax.servlet.ServletConfig;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  
30  import org.apache.torque.TorqueException;
31  import org.apache.torque.util.Criteria;
32  
33  import org.apache.turbine.services.InitializationException;
34  import org.apache.turbine.services.TurbineBaseService;
35  import org.apache.turbine.util.TurbineException;
36  
37  /***
38   * Service for a cron like scheduler.
39   *
40   * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
41   * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
42   * @version $Id: TurbineSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
43   */
44  public class TurbineSchedulerService
45          extends TurbineBaseService
46          implements ScheduleService
47  {
48      /*** Logging */
49      private static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
50  
51      /*** The queue */
52      protected JobQueue scheduleQueue = null;
53  
54      /*** Current status of the scheduler */
55      private boolean enabled = false;
56  
57      /*** The main loop for starting jobs. */
58      protected MainLoop mainLoop;
59  
60      /*** The thread used to process commands.  */
61      protected Thread thread;
62  
63      /***
64       * Creates a new instance.
65       */
66      public TurbineSchedulerService()
67      {
68          mainLoop = null;
69          thread = null;
70      }
71  
72      /***
73       * Initializes the SchedulerService.
74       *
75       * @throws InitializationException Something went wrong in the init
76       *         stage
77       */
78      public void init()
79              throws InitializationException
80      {
81          try
82          {
83              setEnabled(getConfiguration().getBoolean("enabled", true));
84              scheduleQueue = new JobQueue();
85              mainLoop = new MainLoop();
86  
87              // Load all from cold storage.
88              List jobs = JobEntryPeer.doSelect(new Criteria());
89  
90              if (jobs != null && jobs.size() > 0)
91              {
92                  Iterator it = jobs.iterator();
93                  while (it.hasNext())
94                  {
95                      ((JobEntry) it.next()).calcRunTime();
96                  }
97                  scheduleQueue.batchLoad(jobs);
98  
99                  restart();
100             }
101 
102             setInit(true);
103         }
104         catch (Exception e)
105         {
106             String errorMessage = "Could not initialize the scheduler service";
107             log.error(errorMessage, e);
108             throw new InitializationException(errorMessage, e);
109         }
110     }
111 
112     /***
113      * Called the first time the Service is used.<br>
114      *
115      * Load all the jobs from cold storage.  Add jobs to the queue
116      * (sorted in ascending order by runtime) and start the scheduler
117      * thread.
118      *
119      * @param config A ServletConfig.
120      * @deprecated use init() instead.
121      */
122     public void init(ServletConfig config) throws InitializationException
123     {
124         init();
125     }
126 
127     /***
128      * Shutdowns the service.
129      *
130      * This methods interrupts the housekeeping thread.
131      */
132     public void shutdown()
133     {
134         if (getThread() != null)
135         {
136             getThread().interrupt();
137         }
138     }
139 
140     /***
141      * Get a specific Job from Storage.
142      *
143      * @param oid The int id for the job.
144      * @return A JobEntry.
145      * @exception TurbineException job could not be retreived.
146      */
147     public JobEntry getJob(int oid)
148             throws TurbineException
149     {
150         try
151         {
152             JobEntry je = JobEntryPeer.retrieveByPK(oid);
153             return scheduleQueue.getJob(je);
154         }
155         catch (TorqueException e)
156         {
157             String errorMessage = "Error retrieving job from persistent storage.";
158             log.error(errorMessage, e);
159             throw new TurbineException(errorMessage, e);
160         }
161     }
162 
163     /***
164      * Add a new job to the queue.
165      *
166      * @param je A JobEntry with the job to add.
167      * @throws TurbineException job could not be added
168      */
169     public void addJob(JobEntry je)
170             throws TurbineException
171     {
172         updateJob(je);
173     }
174 
175     /***
176      * Remove a job from the queue.
177      *
178      * @param je A JobEntry with the job to remove.
179      * @exception TurbineException job could not be removed
180      */
181     public void removeJob(JobEntry je)
182             throws TurbineException
183     {
184         try
185         {
186             // First remove from DB.
187             Criteria c = new Criteria().add(JobEntryPeer.JOB_ID, je.getPrimaryKey());
188             JobEntryPeer.doDelete(c);
189 
190             // Remove from the queue.
191             scheduleQueue.remove(je);
192 
193             // restart the scheduler
194             restart();
195         }
196         catch (Exception e)
197         {
198             String errorMessage = "Problem removing Scheduled Job: " + je.getTask();
199             log.error(errorMessage, e);
200             throw new TurbineException(errorMessage, e);
201         }
202     }
203 
204     /***
205      * Add or update a job.
206      *
207      * @param je A JobEntry with the job to modify
208      * @throws TurbineException job could not be updated
209      */
210     public void updateJob(JobEntry je)
211             throws TurbineException
212     {
213         try
214         {
215             je.calcRunTime();
216 
217             // Update the queue.
218             if (je.isNew())
219             {
220                 scheduleQueue.add(je);
221             }
222             else
223             {
224                 scheduleQueue.modify(je);
225             }
226 
227             je.save();
228 
229             restart();
230         }
231         catch (Exception e)
232         {
233             String errorMessage = "Problem updating Scheduled Job: " + je.getTask();
234             log.error(errorMessage, e);
235             throw new TurbineException(errorMessage, e);
236         }
237     }
238 
239     /***
240      * List jobs in the queue.  This is used by the scheduler UI.
241      *
242      * @return A List of jobs.
243      */
244     public List listJobs()
245     {
246         return scheduleQueue.list();
247     }
248 
249     /***
250      * Sets the enabled status of the scheduler
251      *
252      * @param enabled
253      *
254      */
255     protected void setEnabled(boolean enabled)
256     {
257         this.enabled = enabled;
258     }
259 
260     /***
261      * Determines if the scheduler service is currently enabled.
262      *
263      * @return Status of the scheduler service.
264      */
265     public boolean isEnabled()
266     {
267         return enabled;
268     }
269 
270     /***
271      * Starts or restarts the scheduler if not already running.
272      */
273     public synchronized void startScheduler()
274     {
275         setEnabled(true);
276         restart();
277     }
278 
279     /***
280      * Stops the scheduler if it is currently running.
281      */
282     public synchronized void stopScheduler()
283     {
284         log.info("Stopping job scheduler");
285         Thread thread = getThread();
286         if (thread != null)
287         {
288             thread.interrupt();
289         }
290         enabled = false;
291     }
292 
293     /***
294      * Return the thread being used to process commands, or null if
295      * there is no such thread.  You can use this to invoke any
296      * special methods on the thread, for example, to interrupt it.
297      *
298      * @return A Thread.
299      */
300     public synchronized Thread getThread()
301     {
302         return thread;
303     }
304 
305     /***
306      * Set thread to null to indicate termination.
307      */
308     private synchronized void clearThread()
309     {
310         thread = null;
311     }
312 
313     /***
314      * Start (or restart) a thread to process commands, or wake up an
315      * existing thread if one is already running.  This method can be
316      * invoked if the background thread crashed due to an
317      * unrecoverable exception in an executed command.
318      */
319     public synchronized void restart()
320     {
321         if (enabled)
322         {
323             log.info("Starting job scheduler");
324             if (thread == null)
325             {
326                 // Create the the housekeeping thread of the scheduler. It will wait
327                 // for the time when the next task needs to be started, and then
328                 // launch a worker thread to execute the task.
329                 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
330                 // Indicate that this is a system thread. JVM will quit only when there
331                 // are no more enabled user threads. Settings threads spawned internally
332                 // by Turbine as daemons allows commandline applications using Turbine
333                 // to terminate in an orderly manner.
334                 thread.setDaemon(true);
335                 thread.start();
336             }
337             else
338             {
339                 notify();
340             }
341         }
342     }
343 
344     /***
345      *  Return the next Job to execute, or null if thread is
346      *  interrupted.
347      *
348      * @return A JobEntry.
349      * @exception TurbineException a generic exception.
350      */
351     private synchronized JobEntry nextJob()
352             throws TurbineException
353     {
354         try
355         {
356             while (!Thread.interrupted())
357             {
358                 // Grab the next job off the queue.
359                 JobEntry je = scheduleQueue.getNext();
360 
361                 if (je == null)
362                 {
363                     // Queue must be empty. Wait on it.
364                     wait();
365                 }
366                 else
367                 {
368                     long now = System.currentTimeMillis();
369                     long when = je.getNextRuntime();
370 
371                     if (when > now)
372                     {
373                         // Wait till next runtime.
374                         wait(when - now);
375                     }
376                     else
377                     {
378                         // Update the next runtime for the job.
379                         scheduleQueue.updateQueue(je);
380                         // Return the job to run it.
381                         return je;
382                     }
383                 }
384             }
385         }
386         catch (InterruptedException ex)
387         {
388         }
389 
390         // On interrupt.
391         return null;
392     }
393 
394     /***
395      * Inner class.  This is isolated in its own Runnable class just
396      * so that the main class need not implement Runnable, which would
397      * allow others to directly invoke run, which is not supported.
398      */
399     protected class MainLoop
400             implements Runnable
401     {
402         /***
403          * Method to run the class.
404          */
405         public void run()
406         {
407             String taskName = null;
408             try
409             {
410                 while (enabled)
411                 {
412                     JobEntry je = nextJob();
413                     if (je != null)
414                     {
415                         taskName = je.getTask();
416 
417                         // Start the thread to run the job.
418                         Runnable wt = new WorkerThread(je);
419                         Thread helper = new Thread(wt);
420                         helper.start();
421                     }
422                     else
423                     {
424                         break;
425                     }
426                 }
427             }
428             catch (Exception e)
429             {
430                 log.error("Error running a Scheduled Job: " + taskName, e);
431                 enabled = false;
432             }
433             finally
434             {
435                 clearThread();
436             }
437         }
438     }
439 }