1 package org.apache.turbine.services.schedule;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
28 import org.apache.logging.log4j.LogManager;
29 import org.apache.logging.log4j.Logger;
30 import org.apache.turbine.services.InitializationException;
31 import org.apache.turbine.services.TurbineBaseService;
32 import org.apache.turbine.util.TurbineException;
33
34 /**
35 * Service for a cron like scheduler.
36 *
37 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
38 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
39 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
40 */
41 public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
42 {
43 /** Logging */
44 protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
45
46 /** The queue */
47 protected JobQueue<JobEntry> scheduleQueue = null;
48
49 /** Current status of the scheduler */
50 private AtomicBoolean enabled = new AtomicBoolean(false);
51
52 /** The housekeeping thread. */
53 protected Thread houseKeepingThread;
54
55 /** The thread pool used to process jobs. */
56 protected ExecutorService threadPool;
57
58 /**
59 * Initializes the SchedulerService.
60 *
61 * @throws InitializationException
62 * Something went wrong in the init stage
63 */
64 @Override
65 public void init() throws InitializationException
66 {
67 try
68 {
69 setEnabled(getConfiguration().getBoolean("enabled", true));
70 scheduleQueue = new JobQueue<>();
71 threadPool = Executors.newCachedThreadPool(
72 new BasicThreadFactory.Builder()
73 .namingPattern("Turbine-ScheduledJob-")
74 .daemon(true)
75 .build());
76
77 @SuppressWarnings("unchecked") // Why is this cast necessary?
78 List<JobEntry> jobs = (List<JobEntry>)loadJobs();
79 scheduleQueue.batchLoad(jobs);
80 restart();
81
82 setInit(true);
83 }
84 catch (Exception e)
85 {
86 throw new InitializationException("Could not initialize the scheduler service", e);
87 }
88 }
89
90 /**
91 * Load all jobs from configuration storage
92 *
93 * @return the list of pre-configured jobs
94 * @throws TurbineException if jobs could not be loaded
95 */
96 protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
97
98 /**
99 * Shutdowns the service.
100 *
101 * This methods interrupts the housekeeping thread.
102 */
103 @Override
104 public void shutdown()
105 {
106 if (getThread() != null)
107 {
108 getThread().interrupt();
109 }
110
111 threadPool.shutdownNow();
112 }
113
114 /**
115 * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
116 */
117 @Override
118 public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
119
120 /**
121 * Get a specific Job from Storage.
122 *
123 * @param oid
124 * The int id for the job.
125 * @return A JobEntry.
126 * @throws TurbineException
127 * job could not be retrieved.
128 */
129 @Override
130 public abstract JobEntry getJob(int oid) throws TurbineException;
131
132 /**
133 * Add a new job to the queue.
134 *
135 * @param je
136 * A JobEntry with the job to add.
137 * @throws TurbineException
138 * job could not be added
139 */
140 @Override
141 public void addJob(JobEntry je) throws TurbineException
142 {
143 updateJob(je);
144 }
145
146 /**
147 * Remove a job from the queue.
148 *
149 * @param je
150 * A JobEntry with the job to remove.
151 * @throws TurbineException
152 * job could not be removed
153 */
154 @Override
155 public abstract void removeJob(JobEntry je) throws TurbineException;
156
157 /**
158 * Add or update a job.
159 *
160 * @param je
161 * A JobEntry with the job to modify
162 * @throws TurbineException
163 * job could not be updated
164 */
165 @Override
166 public abstract void updateJob(JobEntry je) throws TurbineException;
167
168 /**
169 * List jobs in the queue. This is used by the scheduler UI.
170 *
171 * @return A List of jobs.
172 */
173 @Override
174 public List<JobEntry> listJobs()
175 {
176 return scheduleQueue.list();
177 }
178
179 /**
180 * Sets the enabled status of the scheduler
181 *
182 * @param enabled true to enable the scheduler
183 *
184 */
185 protected void setEnabled(boolean enabled)
186 {
187 this.enabled.set(enabled);
188 }
189
190 /**
191 * Determines if the scheduler service is currently enabled.
192 *
193 * @return Status of the scheduler service.
194 */
195 @Override
196 public boolean isEnabled()
197 {
198 return enabled.get();
199 }
200
201 /**
202 * Starts or restarts the scheduler if not already running.
203 */
204 @Override
205 public synchronized void startScheduler()
206 {
207 setEnabled(true);
208 restart();
209 }
210
211 /**
212 * Stops the scheduler if it is currently running.
213 */
214 @Override
215 public synchronized void stopScheduler()
216 {
217 log.info("Stopping job scheduler");
218 Thread thread = getThread();
219 if (thread != null)
220 {
221 thread.interrupt();
222 }
223 setEnabled(false);
224 }
225
226 /**
227 * Return the thread being used to process commands, or null if there is no
228 * such thread. You can use this to invoke any special methods on the
229 * thread, for example, to interrupt it.
230 *
231 * @return A Thread.
232 */
233 public synchronized Thread getThread()
234 {
235 return houseKeepingThread;
236 }
237
238 /**
239 * Set thread to null to indicate termination.
240 */
241 protected synchronized void clearThread()
242 {
243 houseKeepingThread = null;
244 }
245
246 /**
247 * Start (or restart) a thread to process commands, or wake up an existing
248 * thread if one is already running. This method can be invoked if the
249 * background thread crashed due to an unrecoverable exception in an
250 * executed command.
251 */
252 public synchronized void restart()
253 {
254 if (enabled.get())
255 {
256 log.info("Starting job scheduler");
257 if (houseKeepingThread == null)
258 {
259 // Create the the housekeeping thread of the scheduler. It will
260 // wait for the time when the next task needs to be started,
261 // and then launch a worker thread to execute the task.
262 houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME);
263 // Indicate that this is a system thread. JVM will quit only
264 // when there are no more enabled user threads. Settings threads
265 // spawned internally by Turbine as daemons allows commandline
266 // applications using Turbine to terminate in an orderly manner.
267 houseKeepingThread.setDaemon(true);
268 houseKeepingThread.start();
269 }
270 else
271 {
272 notify();
273 }
274 }
275 }
276
277 /**
278 * Return the next Job to execute, or null if thread is interrupted.
279 *
280 * @return A JobEntry.
281 * @throws TurbineException
282 * a generic exception.
283 */
284 protected synchronized JobEntry nextJob() throws TurbineException
285 {
286 try
287 {
288 while (!Thread.interrupted())
289 {
290 // Grab the next job off the queue.
291 //JobEntry je = scheduleQueue.getNext();
292 JobEntry je = scheduleQueue.getFirst();
293
294 if (je == null)
295 {
296 // Queue must be empty. Wait on it.
297 wait();
298 }
299 else
300 {
301 long now = System.currentTimeMillis();
302 long when = je.getNextRuntime();
303
304 if (when > now)
305 {
306 // Wait till next runtime.
307 wait(when - now);
308 }
309 else
310 {
311 // Update the next runtime for the job.
312 scheduleQueue.updateQueue(je);
313 // Return the job to run it.
314 return je;
315 }
316 }
317 }
318 }
319 catch (InterruptedException ex)
320 {
321 // ignore
322 }
323
324 // On interrupt.
325 return null;
326 }
327
328 /**
329 * Create the the housekeeping thread of the scheduler. It will
330 * wait for the time when the next task needs to be started,
331 * and then launch a worker thread to execute the task.
332 */
333 private void houseKeeping()
334 {
335 String taskName = null;
336 try
337 {
338 while (enabled.get())
339 {
340 JobEntry je = nextJob();
341 if (je != null)
342 {
343 taskName = je.getTask();
344
345 // Get a thread to run the job.
346 threadPool.execute(new WorkerThread(je));
347 }
348 else
349 {
350 break;
351 }
352 }
353 }
354 catch (Exception e)
355 {
356 log.error("Error running a Scheduled Job: {}", taskName, e);
357 setEnabled(false);
358 }
359 finally
360 {
361 clearThread();
362 }
363 }
364 }