1 package org.apache.turbine.services.schedule;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
28 import org.apache.logging.log4j.LogManager;
29 import org.apache.logging.log4j.Logger;
30 import org.apache.turbine.services.InitializationException;
31 import org.apache.turbine.services.TurbineBaseService;
32 import org.apache.turbine.util.TurbineException;
33
34
35
36
37
38
39
40
41 public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
42 {
43
44 protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
45
46
47 protected JobQueue<JobEntry> scheduleQueue = null;
48
49
50 private AtomicBoolean enabled = new AtomicBoolean(false);
51
52
53 protected Thread houseKeepingThread;
54
55
56 protected ExecutorService threadPool;
57
58
59
60
61
62
63
64 @Override
65 public void init() throws InitializationException
66 {
67 try
68 {
69 setEnabled(getConfiguration().getBoolean("enabled", true));
70 scheduleQueue = new JobQueue<>();
71 threadPool = Executors.newCachedThreadPool(
72 new BasicThreadFactory.Builder()
73 .namingPattern("Turbine-ScheduledJob-")
74 .daemon(true)
75 .build());
76
77 @SuppressWarnings("unchecked")
78 List<JobEntry> jobs = (List<JobEntry>)loadJobs();
79 scheduleQueue.batchLoad(jobs);
80 restart();
81
82 setInit(true);
83 }
84 catch (Exception e)
85 {
86 throw new InitializationException("Could not initialize the scheduler service", e);
87 }
88 }
89
90
91
92
93
94
95
96 protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
97
98
99
100
101
102
103 @Override
104 public void shutdown()
105 {
106 if (getThread() != null)
107 {
108 getThread().interrupt();
109 }
110
111 threadPool.shutdownNow();
112 }
113
114
115
116
117 @Override
118 public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
119
120
121
122
123
124
125
126
127
128
129 @Override
130 public abstract JobEntry getJob(int oid) throws TurbineException;
131
132
133
134
135
136
137
138
139
140 @Override
141 public void addJob(JobEntry je) throws TurbineException
142 {
143 updateJob(je);
144 }
145
146
147
148
149
150
151
152
153
154 @Override
155 public abstract void removeJob(JobEntry je) throws TurbineException;
156
157
158
159
160
161
162
163
164
165 @Override
166 public abstract void updateJob(JobEntry je) throws TurbineException;
167
168
169
170
171
172
173 @Override
174 public List<JobEntry> listJobs()
175 {
176 return scheduleQueue.list();
177 }
178
179
180
181
182
183
184
185 protected void setEnabled(boolean enabled)
186 {
187 this.enabled.set(enabled);
188 }
189
190
191
192
193
194
195 @Override
196 public boolean isEnabled()
197 {
198 return enabled.get();
199 }
200
201
202
203
204 @Override
205 public synchronized void startScheduler()
206 {
207 setEnabled(true);
208 restart();
209 }
210
211
212
213
214 @Override
215 public synchronized void stopScheduler()
216 {
217 log.info("Stopping job scheduler");
218 Thread thread = getThread();
219 if (thread != null)
220 {
221 thread.interrupt();
222 }
223 setEnabled(false);
224 }
225
226
227
228
229
230
231
232
233 public synchronized Thread getThread()
234 {
235 return houseKeepingThread;
236 }
237
238
239
240
241 protected synchronized void clearThread()
242 {
243 houseKeepingThread = null;
244 }
245
246
247
248
249
250
251
252 public synchronized void restart()
253 {
254 if (enabled.get())
255 {
256 log.info("Starting job scheduler");
257 if (houseKeepingThread == null)
258 {
259
260
261
262 houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME);
263
264
265
266
267 houseKeepingThread.setDaemon(true);
268 houseKeepingThread.start();
269 }
270 else
271 {
272 notify();
273 }
274 }
275 }
276
277
278
279
280
281
282
283
284 protected synchronized JobEntry nextJob() throws TurbineException
285 {
286 try
287 {
288 while (!Thread.interrupted())
289 {
290
291
292 JobEntry je = scheduleQueue.getFirst();
293
294 if (je == null)
295 {
296
297 wait();
298 }
299 else
300 {
301 long now = System.currentTimeMillis();
302 long when = je.getNextRuntime();
303
304 if (when > now)
305 {
306
307 wait(when - now);
308 }
309 else
310 {
311
312 scheduleQueue.updateQueue(je);
313
314 return je;
315 }
316 }
317 }
318 }
319 catch (InterruptedException ex)
320 {
321
322 }
323
324
325 return null;
326 }
327
328
329
330
331
332
333 private void houseKeeping()
334 {
335 String taskName = null;
336 try
337 {
338 while (enabled.get())
339 {
340 JobEntry je = nextJob();
341 if (je != null)
342 {
343 taskName = je.getTask();
344
345
346 threadPool.execute(new WorkerThread(je));
347 }
348 else
349 {
350 break;
351 }
352 }
353 }
354 catch (Exception e)
355 {
356 log.error("Error running a Scheduled Job: {}", taskName, e);
357 setEnabled(false);
358 }
359 finally
360 {
361 clearThread();
362 }
363 }
364 }