View Javadoc

1   package org.apache.turbine.services.schedule;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.turbine.services.InitializationException;
27  import org.apache.turbine.services.TurbineBaseService;
28  import org.apache.turbine.util.TurbineException;
29  
30  /**
31   * Service for a cron like scheduler.
32   *
33   * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
34   * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
35   * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
36   */
37  public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
38  {
39      /** Logging */
40      protected static Log log = LogFactory.getLog(ScheduleService.LOGGER_NAME);
41  
42      /** The queue */
43      protected JobQueue<JobEntry> scheduleQueue = null;
44  
45      /** Current status of the scheduler */
46      protected boolean enabled = false;
47  
48      /** The main loop for starting jobs. */
49      protected MainLoop mainLoop;
50  
51      /** The thread used to process commands. */
52      protected Thread thread;
53  
54      /**
55       * Creates a new instance.
56       */
57      public AbstractSchedulerService()
58      {
59          mainLoop = null;
60          thread = null;
61      }
62  
63      /**
64       * Initializes the SchedulerService.
65       *
66       * @throws InitializationException
67       *             Something went wrong in the init stage
68       */
69      @Override
70      public void init() throws InitializationException
71      {
72          try
73          {
74              setEnabled(getConfiguration().getBoolean("enabled", true));
75              scheduleQueue = new JobQueue<JobEntry>();
76              mainLoop = new MainLoop();
77  
78              @SuppressWarnings("unchecked") // Why is this cast necessary?
79              List<JobEntry> jobs = (List<JobEntry>)loadJobs();
80              scheduleQueue.batchLoad(jobs);
81              restart();
82  
83              setInit(true);
84          }
85          catch (Exception e)
86          {
87              throw new InitializationException("Could not initialize the scheduler service", e);
88          }
89      }
90  
91      /**
92       * Load all jobs from configuration storage
93       *
94       * @return the list of pre-configured jobs
95       * @throws TurbineException
96       */
97      protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
98  
99      /**
100      * Shutdowns the service.
101      *
102      * This methods interrupts the housekeeping thread.
103      */
104     @Override
105     public void shutdown()
106     {
107         if (getThread() != null)
108         {
109             getThread().interrupt();
110         }
111     }
112 
113     /**
114      * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
115      */
116     @Override
117     public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
118 
119     /**
120      * Get a specific Job from Storage.
121      *
122      * @param oid
123      *            The int id for the job.
124      * @return A JobEntry.
125      * @exception TurbineException
126      *                job could not be retrieved.
127      */
128     @Override
129     public abstract JobEntry getJob(int oid) throws TurbineException;
130 
131     /**
132      * Add a new job to the queue.
133      *
134      * @param je
135      *            A JobEntry with the job to add.
136      * @throws TurbineException
137      *             job could not be added
138      */
139     @Override
140     public void addJob(JobEntry je) throws TurbineException
141     {
142         updateJob(je);
143     }
144 
145     /**
146      * Remove a job from the queue.
147      *
148      * @param je
149      *            A JobEntry with the job to remove.
150      * @exception TurbineException
151      *                job could not be removed
152      */
153     @Override
154     public abstract void removeJob(JobEntry je) throws TurbineException;
155 
156     /**
157      * Add or update a job.
158      *
159      * @param je
160      *            A JobEntry with the job to modify
161      * @throws TurbineException
162      *             job could not be updated
163      */
164     @Override
165     public abstract void updateJob(JobEntry je) throws TurbineException;
166 
167     /**
168      * List jobs in the queue. This is used by the scheduler UI.
169      *
170      * @return A List of jobs.
171      */
172     @Override
173     public List<JobEntry> listJobs()
174     {
175         return scheduleQueue.list();
176     }
177 
178     /**
179      * Sets the enabled status of the scheduler
180      *
181      * @param enabled
182      *
183      */
184     protected void setEnabled(boolean enabled)
185     {
186         this.enabled = enabled;
187     }
188 
189     /**
190      * Determines if the scheduler service is currently enabled.
191      *
192      * @return Status of the scheduler service.
193      */
194     @Override
195     public boolean isEnabled()
196     {
197         return enabled;
198     }
199 
200     /**
201      * Starts or restarts the scheduler if not already running.
202      */
203     @Override
204     public synchronized void startScheduler()
205     {
206         setEnabled(true);
207         restart();
208     }
209 
210     /**
211      * Stops the scheduler if it is currently running.
212      */
213     @Override
214     public synchronized void stopScheduler()
215     {
216         log.info("Stopping job scheduler");
217         Thread thread = getThread();
218         if (thread != null)
219         {
220             thread.interrupt();
221         }
222         enabled = false;
223     }
224 
225     /**
226      * Return the thread being used to process commands, or null if there is no
227      * such thread. You can use this to invoke any special methods on the
228      * thread, for example, to interrupt it.
229      *
230      * @return A Thread.
231      */
232     public synchronized Thread getThread()
233     {
234         return thread;
235     }
236 
237     /**
238      * Set thread to null to indicate termination.
239      */
240     protected synchronized void clearThread()
241     {
242         thread = null;
243     }
244 
245     /**
246      * Start (or restart) a thread to process commands, or wake up an existing
247      * thread if one is already running. This method can be invoked if the
248      * background thread crashed due to an unrecoverable exception in an
249      * executed command.
250      */
251     public synchronized void restart()
252     {
253         if (enabled)
254         {
255             log.info("Starting job scheduler");
256             if (thread == null)
257             {
258                 // Create the the housekeeping thread of the scheduler. It will
259                 // wait for the time when the next task needs to be started,
260                 // and then launch a worker thread to execute the task.
261                 thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
262                 // Indicate that this is a system thread. JVM will quit only
263                 // when there are no more enabled user threads. Settings threads
264                 // spawned internally by Turbine as daemons allows commandline
265                 // applications using Turbine to terminate in an orderly manner.
266                 thread.setDaemon(true);
267                 thread.start();
268             }
269             else
270             {
271                 notify();
272             }
273         }
274     }
275 
276     /**
277      * Return the next Job to execute, or null if thread is interrupted.
278      *
279      * @return A JobEntry.
280      * @exception TurbineException
281      *                a generic exception.
282      */
283     protected synchronized JobEntry nextJob() throws TurbineException
284     {
285         try
286         {
287             while (!Thread.interrupted())
288             {
289                 // Grab the next job off the queue.
290                 JobEntry je = scheduleQueue.getNext();
291 
292                 if (je == null)
293                 {
294                     // Queue must be empty. Wait on it.
295                     wait();
296                 }
297                 else
298                 {
299                     long now = System.currentTimeMillis();
300                     long when = je.getNextRuntime();
301 
302                     if (when > now)
303                     {
304                         // Wait till next runtime.
305                         wait(when - now);
306                     }
307                     else
308                     {
309                         // Update the next runtime for the job.
310                         scheduleQueue.updateQueue(je);
311                         // Return the job to run it.
312                         return je;
313                     }
314                 }
315             }
316         }
317         catch (InterruptedException ex)
318         {
319             // ignore
320         }
321 
322         // On interrupt.
323         return null;
324     }
325 
326     /**
327      * Inner class. This is isolated in its own Runnable class just so that the
328      * main class need not implement Runnable, which would allow others to
329      * directly invoke run, which is not supported.
330      */
331     protected class MainLoop implements Runnable
332     {
333         /**
334          * Method to run the class.
335          */
336         @Override
337         public void run()
338         {
339             String taskName = null;
340             try
341             {
342                 while (enabled)
343                 {
344                     JobEntry je = nextJob();
345                     if (je != null)
346                     {
347                         taskName = je.getTask();
348 
349                         // Start the thread to run the job.
350                         Runnable wt = new WorkerThread(je);
351                         Thread helper = new Thread(wt);
352                         helper.start();
353                     }
354                     else
355                     {
356                         break;
357                     }
358                 }
359             }
360             catch (Exception e)
361             {
362                 log.error("Error running a Scheduled Job: " + taskName, e);
363                 enabled = false;
364             }
365             finally
366             {
367                 clearThread();
368             }
369         }
370     }
371 }