View Javadoc
1   package org.apache.turbine.services.schedule;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.commons.lang3.concurrent.BasicThreadFactory;
28  import org.apache.logging.log4j.LogManager;
29  import org.apache.logging.log4j.Logger;
30  import org.apache.turbine.services.InitializationException;
31  import org.apache.turbine.services.TurbineBaseService;
32  import org.apache.turbine.util.TurbineException;
33  
34  /**
35   * Service for a cron like scheduler.
36   *
37   * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
38   * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
39   * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
40   */
41  public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
42  {
43      /** Logging */
44      protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
45  
46      /** The queue */
47      protected JobQueue<JobEntry> scheduleQueue = null;
48  
49      /** Current status of the scheduler */
50      private AtomicBoolean enabled = new AtomicBoolean(false);
51  
52      /** The housekeeping thread. */
53      protected Thread houseKeepingThread;
54  
55      /** The thread pool used to process jobs. */
56      protected ExecutorService threadPool;
57  
58      /**
59       * Initializes the SchedulerService.
60       *
61       * @throws InitializationException
62       *             Something went wrong in the init stage
63       */
64      @Override
65      public void init() throws InitializationException
66      {
67          try
68          {
69              setEnabled(getConfiguration().getBoolean("enabled", true));
70              scheduleQueue = new JobQueue<>();
71              threadPool = Executors.newCachedThreadPool(
72                      new BasicThreadFactory.Builder()
73                          .namingPattern("Turbine-ScheduledJob-")
74                          .daemon(true)
75                          .build());
76  
77              @SuppressWarnings("unchecked") // Why is this cast necessary?
78              List<JobEntry> jobs = (List<JobEntry>)loadJobs();
79              scheduleQueue.batchLoad(jobs);
80              restart();
81  
82              setInit(true);
83          }
84          catch (Exception e)
85          {
86              throw new InitializationException("Could not initialize the scheduler service", e);
87          }
88      }
89  
90      /**
91       * Load all jobs from configuration storage
92       *
93       * @return the list of pre-configured jobs
94       * @throws TurbineException if jobs could not be loaded
95       */
96      protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
97  
98      /**
99       * Shutdowns the service.
100      *
101      * This methods interrupts the housekeeping thread.
102      */
103     @Override
104     public void shutdown()
105     {
106         if (getThread() != null)
107         {
108             getThread().interrupt();
109         }
110 
111         threadPool.shutdownNow();
112     }
113 
114     /**
115      * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
116      */
117     @Override
118     public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
119 
120     /**
121      * Get a specific Job from Storage.
122      *
123      * @param oid
124      *            The int id for the job.
125      * @return A JobEntry.
126      * @throws TurbineException
127      *                job could not be retrieved.
128      */
129     @Override
130     public abstract JobEntry getJob(int oid) throws TurbineException;
131 
132     /**
133      * Add a new job to the queue.
134      *
135      * @param je
136      *            A JobEntry with the job to add.
137      * @throws TurbineException
138      *             job could not be added
139      */
140     @Override
141     public void addJob(JobEntry je) throws TurbineException
142     {
143         updateJob(je);
144     }
145 
146     /**
147      * Remove a job from the queue.
148      *
149      * @param je
150      *            A JobEntry with the job to remove.
151      * @throws TurbineException
152      *                job could not be removed
153      */
154     @Override
155     public abstract void removeJob(JobEntry je) throws TurbineException;
156 
157     /**
158      * Add or update a job.
159      *
160      * @param je
161      *            A JobEntry with the job to modify
162      * @throws TurbineException
163      *             job could not be updated
164      */
165     @Override
166     public abstract void updateJob(JobEntry je) throws TurbineException;
167 
168     /**
169      * List jobs in the queue. This is used by the scheduler UI.
170      *
171      * @return A List of jobs.
172      */
173     @Override
174     public List<JobEntry> listJobs()
175     {
176         return scheduleQueue.list();
177     }
178 
179     /**
180      * Sets the enabled status of the scheduler
181      *
182      * @param enabled true to enable the scheduler
183      *
184      */
185     protected void setEnabled(boolean enabled)
186     {
187         this.enabled.set(enabled);
188     }
189 
190     /**
191      * Determines if the scheduler service is currently enabled.
192      *
193      * @return Status of the scheduler service.
194      */
195     @Override
196     public boolean isEnabled()
197     {
198         return enabled.get();
199     }
200 
201     /**
202      * Starts or restarts the scheduler if not already running.
203      */
204     @Override
205     public synchronized void startScheduler()
206     {
207         setEnabled(true);
208         restart();
209     }
210 
211     /**
212      * Stops the scheduler if it is currently running.
213      */
214     @Override
215     public synchronized void stopScheduler()
216     {
217         log.info("Stopping job scheduler");
218         Thread thread = getThread();
219         if (thread != null)
220         {
221             thread.interrupt();
222         }
223         setEnabled(false);
224     }
225 
226     /**
227      * Return the thread being used to process commands, or null if there is no
228      * such thread. You can use this to invoke any special methods on the
229      * thread, for example, to interrupt it.
230      *
231      * @return A Thread.
232      */
233     public synchronized Thread getThread()
234     {
235         return houseKeepingThread;
236     }
237 
238     /**
239      * Set thread to null to indicate termination.
240      */
241     protected synchronized void clearThread()
242     {
243         houseKeepingThread = null;
244     }
245 
246     /**
247      * Start (or restart) a thread to process commands, or wake up an existing
248      * thread if one is already running. This method can be invoked if the
249      * background thread crashed due to an unrecoverable exception in an
250      * executed command.
251      */
252     public synchronized void restart()
253     {
254         if (enabled.get())
255         {
256             log.info("Starting job scheduler");
257             if (houseKeepingThread == null)
258             {
259                 // Create the the housekeeping thread of the scheduler. It will
260                 // wait for the time when the next task needs to be started,
261                 // and then launch a worker thread to execute the task.
262                 houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME);
263                 // Indicate that this is a system thread. JVM will quit only
264                 // when there are no more enabled user threads. Settings threads
265                 // spawned internally by Turbine as daemons allows commandline
266                 // applications using Turbine to terminate in an orderly manner.
267                 houseKeepingThread.setDaemon(true);
268                 houseKeepingThread.start();
269             }
270             else
271             {
272                 notify();
273             }
274         }
275     }
276 
277     /**
278      * Return the next Job to execute, or null if thread is interrupted.
279      *
280      * @return A JobEntry.
281      * @throws TurbineException
282      *                a generic exception.
283      */
284     protected synchronized JobEntry nextJob() throws TurbineException
285     {
286         try
287         {
288             while (!Thread.interrupted())
289             {
290                 // Grab the next job off the queue.
291                 //JobEntry je = scheduleQueue.getNext();
292                 JobEntry je = scheduleQueue.getFirst();
293 
294                 if (je == null)
295                 {
296                     // Queue must be empty. Wait on it.
297                     wait();
298                 }
299                 else
300                 {
301                     long now = System.currentTimeMillis();
302                     long when = je.getNextRuntime();
303 
304                     if (when > now)
305                     {
306                         // Wait till next runtime.
307                         wait(when - now);
308                     }
309                     else
310                     {
311                         // Update the next runtime for the job.
312                         scheduleQueue.updateQueue(je);
313                         // Return the job to run it.
314                         return je;
315                     }
316                 }
317             }
318         }
319         catch (InterruptedException ex)
320         {
321             // ignore
322         }
323 
324         // On interrupt.
325         return null;
326     }
327 
328     /**
329      * Create the the housekeeping thread of the scheduler. It will
330      * wait for the time when the next task needs to be started,
331      * and then launch a worker thread to execute the task.
332      */
333     private void houseKeeping()
334     {
335         String taskName = null;
336         try
337         {
338             while (enabled.get())
339             {
340                 JobEntry je = nextJob();
341                 if (je != null)
342                 {
343                     taskName = je.getTask();
344 
345                     // Get a thread to run the job.
346                     threadPool.execute(new WorkerThread(je));
347                 }
348                 else
349                 {
350                     break;
351                 }
352             }
353         }
354         catch (Exception e)
355         {
356             log.error("Error running a Scheduled Job: {}", taskName, e);
357             setEnabled(false);
358         }
359         finally
360         {
361             clearThread();
362         }
363     }
364 }