001package org.apache.turbine.services.schedule;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.List;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.commons.lang3.concurrent.BasicThreadFactory;
028import org.apache.logging.log4j.LogManager;
029import org.apache.logging.log4j.Logger;
030import org.apache.turbine.services.InitializationException;
031import org.apache.turbine.services.TurbineBaseService;
032import org.apache.turbine.util.TurbineException;
033
034/**
035 * Service for a cron like scheduler.
036 *
037 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
038 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
039 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
040 */
041public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
042{
043    /** Logging */
044    protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
045
046    /** The queue */
047    protected JobQueue<JobEntry> scheduleQueue = null;
048
049    /** Current status of the scheduler */
050    private AtomicBoolean enabled = new AtomicBoolean(false);
051
052    /** The housekeeping thread. */
053    protected Thread houseKeepingThread;
054
055    /** The thread pool used to process jobs. */
056    protected ExecutorService threadPool;
057
058    /**
059     * Initializes the SchedulerService.
060     *
061     * @throws InitializationException
062     *             Something went wrong in the init stage
063     */
064    @Override
065    public void init() throws InitializationException
066    {
067        try
068        {
069            setEnabled(getConfiguration().getBoolean("enabled", true));
070            scheduleQueue = new JobQueue<>();
071            threadPool = Executors.newCachedThreadPool(
072                    new BasicThreadFactory.Builder()
073                        .namingPattern("Turbine-ScheduledJob-")
074                        .daemon(true)
075                        .build());
076
077            @SuppressWarnings("unchecked") // Why is this cast necessary?
078            List<JobEntry> jobs = (List<JobEntry>)loadJobs();
079            scheduleQueue.batchLoad(jobs);
080            restart();
081
082            setInit(true);
083        }
084        catch (Exception e)
085        {
086            throw new InitializationException("Could not initialize the scheduler service", e);
087        }
088    }
089
090    /**
091     * Load all jobs from configuration storage
092     *
093     * @return the list of pre-configured jobs
094     * @throws TurbineException if jobs could not be loaded
095     */
096    protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
097
098    /**
099     * Shutdowns the service.
100     *
101     * This methods interrupts the housekeeping thread.
102     */
103    @Override
104    public void shutdown()
105    {
106        if (getThread() != null)
107        {
108            getThread().interrupt();
109        }
110
111        threadPool.shutdownNow();
112    }
113
114    /**
115     * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
116     */
117    @Override
118    public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
119
120    /**
121     * Get a specific Job from Storage.
122     *
123     * @param oid
124     *            The int id for the job.
125     * @return A JobEntry.
126     * @throws TurbineException
127     *                job could not be retrieved.
128     */
129    @Override
130    public abstract JobEntry getJob(int oid) throws TurbineException;
131
132    /**
133     * Add a new job to the queue.
134     *
135     * @param je
136     *            A JobEntry with the job to add.
137     * @throws TurbineException
138     *             job could not be added
139     */
140    @Override
141    public void addJob(JobEntry je) throws TurbineException
142    {
143        updateJob(je);
144    }
145
146    /**
147     * Remove a job from the queue.
148     *
149     * @param je
150     *            A JobEntry with the job to remove.
151     * @throws TurbineException
152     *                job could not be removed
153     */
154    @Override
155    public abstract void removeJob(JobEntry je) throws TurbineException;
156
157    /**
158     * Add or update a job.
159     *
160     * @param je
161     *            A JobEntry with the job to modify
162     * @throws TurbineException
163     *             job could not be updated
164     */
165    @Override
166    public abstract void updateJob(JobEntry je) throws TurbineException;
167
168    /**
169     * List jobs in the queue. This is used by the scheduler UI.
170     *
171     * @return A List of jobs.
172     */
173    @Override
174    public List<JobEntry> listJobs()
175    {
176        return scheduleQueue.list();
177    }
178
179    /**
180     * Sets the enabled status of the scheduler
181     *
182     * @param enabled true to enable the scheduler
183     *
184     */
185    protected void setEnabled(boolean enabled)
186    {
187        this.enabled.set(enabled);
188    }
189
190    /**
191     * Determines if the scheduler service is currently enabled.
192     *
193     * @return Status of the scheduler service.
194     */
195    @Override
196    public boolean isEnabled()
197    {
198        return enabled.get();
199    }
200
201    /**
202     * Starts or restarts the scheduler if not already running.
203     */
204    @Override
205    public synchronized void startScheduler()
206    {
207        setEnabled(true);
208        restart();
209    }
210
211    /**
212     * Stops the scheduler if it is currently running.
213     */
214    @Override
215    public synchronized void stopScheduler()
216    {
217        log.info("Stopping job scheduler");
218        Thread thread = getThread();
219        if (thread != null)
220        {
221            thread.interrupt();
222        }
223        setEnabled(false);
224    }
225
226    /**
227     * Return the thread being used to process commands, or null if there is no
228     * such thread. You can use this to invoke any special methods on the
229     * thread, for example, to interrupt it.
230     *
231     * @return A Thread.
232     */
233    public synchronized Thread getThread()
234    {
235        return houseKeepingThread;
236    }
237
238    /**
239     * Set thread to null to indicate termination.
240     */
241    protected synchronized void clearThread()
242    {
243        houseKeepingThread = null;
244    }
245
246    /**
247     * Start (or restart) a thread to process commands, or wake up an existing
248     * thread if one is already running. This method can be invoked if the
249     * background thread crashed due to an unrecoverable exception in an
250     * executed command.
251     */
252    public synchronized void restart()
253    {
254        if (enabled.get())
255        {
256            log.info("Starting job scheduler");
257            if (houseKeepingThread == null)
258            {
259                // Create the the housekeeping thread of the scheduler. It will
260                // wait for the time when the next task needs to be started,
261                // and then launch a worker thread to execute the task.
262                houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME);
263                // Indicate that this is a system thread. JVM will quit only
264                // when there are no more enabled user threads. Settings threads
265                // spawned internally by Turbine as daemons allows commandline
266                // applications using Turbine to terminate in an orderly manner.
267                houseKeepingThread.setDaemon(true);
268                houseKeepingThread.start();
269            }
270            else
271            {
272                notify();
273            }
274        }
275    }
276
277    /**
278     * Return the next Job to execute, or null if thread is interrupted.
279     *
280     * @return A JobEntry.
281     * @throws TurbineException
282     *                a generic exception.
283     */
284    protected synchronized JobEntry nextJob() throws TurbineException
285    {
286        try
287        {
288            while (!Thread.interrupted())
289            {
290                // Grab the next job off the queue.
291                //JobEntry je = scheduleQueue.getNext();
292                JobEntry je = scheduleQueue.getFirst();
293
294                if (je == null)
295                {
296                    // Queue must be empty. Wait on it.
297                    wait();
298                }
299                else
300                {
301                    long now = System.currentTimeMillis();
302                    long when = je.getNextRuntime();
303
304                    if (when > now)
305                    {
306                        // Wait till next runtime.
307                        wait(when - now);
308                    }
309                    else
310                    {
311                        // Update the next runtime for the job.
312                        scheduleQueue.updateQueue(je);
313                        // Return the job to run it.
314                        return je;
315                    }
316                }
317            }
318        }
319        catch (InterruptedException ex)
320        {
321            // ignore
322        }
323
324        // On interrupt.
325        return null;
326    }
327
328    /**
329     * Create the the housekeeping thread of the scheduler. It will
330     * wait for the time when the next task needs to be started,
331     * and then launch a worker thread to execute the task.
332     */
333    private void houseKeeping()
334    {
335        String taskName = null;
336        try
337        {
338            while (enabled.get())
339            {
340                JobEntry je = nextJob();
341                if (je != null)
342                {
343                    taskName = je.getTask();
344
345                    // Get a thread to run the job.
346                    threadPool.execute(new WorkerThread(je));
347                }
348                else
349                {
350                    break;
351                }
352            }
353        }
354        catch (Exception e)
355        {
356            log.error("Error running a Scheduled Job: {}", taskName, e);
357            setEnabled(false);
358        }
359        finally
360        {
361            clearThread();
362        }
363    }
364}