001package org.apache.turbine.services.schedule;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.util.List;
023
024import org.apache.logging.log4j.LogManager;
025import org.apache.logging.log4j.Logger;
026import org.apache.turbine.services.InitializationException;
027import org.apache.turbine.services.TurbineBaseService;
028import org.apache.turbine.util.TurbineException;
029
030/**
031 * Service for a cron like scheduler.
032 *
033 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
034 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
035 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
036 */
037public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
038{
039    /** Logging */
040    protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);
041
042    /** The queue */
043    protected JobQueue<JobEntry> scheduleQueue = null;
044
045    /** Current status of the scheduler */
046    protected boolean enabled = false;
047
048    /** The main loop for starting jobs. */
049    protected MainLoop mainLoop;
050
051    /** The thread used to process commands. */
052    protected Thread thread;
053
054    /**
055     * Creates a new instance.
056     */
057    public AbstractSchedulerService()
058    {
059        mainLoop = null;
060        thread = null;
061    }
062
063    /**
064     * Initializes the SchedulerService.
065     *
066     * @throws InitializationException
067     *             Something went wrong in the init stage
068     */
069    @Override
070    public void init() throws InitializationException
071    {
072        try
073        {
074            setEnabled(getConfiguration().getBoolean("enabled", true));
075            scheduleQueue = new JobQueue<>();
076            mainLoop = new MainLoop();
077
078            @SuppressWarnings("unchecked") // Why is this cast necessary?
079            List<JobEntry> jobs = (List<JobEntry>)loadJobs();
080            scheduleQueue.batchLoad(jobs);
081            restart();
082
083            setInit(true);
084        }
085        catch (Exception e)
086        {
087            throw new InitializationException("Could not initialize the scheduler service", e);
088        }
089    }
090
091    /**
092     * Load all jobs from configuration storage
093     *
094     * @return the list of pre-configured jobs
095     * @throws TurbineException if jobs could not be loaded
096     */
097    protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;
098
099    /**
100     * Shutdowns the service.
101     *
102     * This methods interrupts the housekeeping thread.
103     */
104    @Override
105    public void shutdown()
106    {
107        if (getThread() != null)
108        {
109            getThread().interrupt();
110        }
111    }
112
113    /**
114     * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
115     */
116    @Override
117    public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;
118
119    /**
120     * Get a specific Job from Storage.
121     *
122     * @param oid
123     *            The int id for the job.
124     * @return A JobEntry.
125     * @throws TurbineException
126     *                job could not be retrieved.
127     */
128    @Override
129    public abstract JobEntry getJob(int oid) throws TurbineException;
130
131    /**
132     * Add a new job to the queue.
133     *
134     * @param je
135     *            A JobEntry with the job to add.
136     * @throws TurbineException
137     *             job could not be added
138     */
139    @Override
140    public void addJob(JobEntry je) throws TurbineException
141    {
142        updateJob(je);
143    }
144
145    /**
146     * Remove a job from the queue.
147     *
148     * @param je
149     *            A JobEntry with the job to remove.
150     * @throws TurbineException
151     *                job could not be removed
152     */
153    @Override
154    public abstract void removeJob(JobEntry je) throws TurbineException;
155
156    /**
157     * Add or update a job.
158     *
159     * @param je
160     *            A JobEntry with the job to modify
161     * @throws TurbineException
162     *             job could not be updated
163     */
164    @Override
165    public abstract void updateJob(JobEntry je) throws TurbineException;
166
167    /**
168     * List jobs in the queue. This is used by the scheduler UI.
169     *
170     * @return A List of jobs.
171     */
172    @Override
173    public List<JobEntry> listJobs()
174    {
175        return scheduleQueue.list();
176    }
177
178    /**
179     * Sets the enabled status of the scheduler
180     *
181     * @param enabled true to enable the scheduler
182     *
183     */
184    protected void setEnabled(boolean enabled)
185    {
186        this.enabled = enabled;
187    }
188
189    /**
190     * Determines if the scheduler service is currently enabled.
191     *
192     * @return Status of the scheduler service.
193     */
194    @Override
195    public boolean isEnabled()
196    {
197        return enabled;
198    }
199
200    /**
201     * Starts or restarts the scheduler if not already running.
202     */
203    @Override
204    public synchronized void startScheduler()
205    {
206        setEnabled(true);
207        restart();
208    }
209
210    /**
211     * Stops the scheduler if it is currently running.
212     */
213    @Override
214    public synchronized void stopScheduler()
215    {
216        log.info("Stopping job scheduler");
217        Thread thread = getThread();
218        if (thread != null)
219        {
220            thread.interrupt();
221        }
222        enabled = false;
223    }
224
225    /**
226     * Return the thread being used to process commands, or null if there is no
227     * such thread. You can use this to invoke any special methods on the
228     * thread, for example, to interrupt it.
229     *
230     * @return A Thread.
231     */
232    public synchronized Thread getThread()
233    {
234        return thread;
235    }
236
237    /**
238     * Set thread to null to indicate termination.
239     */
240    protected synchronized void clearThread()
241    {
242        thread = null;
243    }
244
245    /**
246     * Start (or restart) a thread to process commands, or wake up an existing
247     * thread if one is already running. This method can be invoked if the
248     * background thread crashed due to an unrecoverable exception in an
249     * executed command.
250     */
251    public synchronized void restart()
252    {
253        if (enabled)
254        {
255            log.info("Starting job scheduler");
256            if (thread == null)
257            {
258                // Create the the housekeeping thread of the scheduler. It will
259                // wait for the time when the next task needs to be started,
260                // and then launch a worker thread to execute the task.
261                thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
262                // Indicate that this is a system thread. JVM will quit only
263                // when there are no more enabled user threads. Settings threads
264                // spawned internally by Turbine as daemons allows commandline
265                // applications using Turbine to terminate in an orderly manner.
266                thread.setDaemon(true);
267                thread.start();
268            }
269            else
270            {
271                notify();
272            }
273        }
274    }
275
276    /**
277     * Return the next Job to execute, or null if thread is interrupted.
278     *
279     * @return A JobEntry.
280     * @throws TurbineException
281     *                a generic exception.
282     */
283    protected synchronized JobEntry nextJob() throws TurbineException
284    {
285        try
286        {
287            while (!Thread.interrupted())
288            {
289                // Grab the next job off the queue.
290                JobEntry je = scheduleQueue.getNext();
291
292                if (je == null)
293                {
294                    // Queue must be empty. Wait on it.
295                    wait();
296                }
297                else
298                {
299                    long now = System.currentTimeMillis();
300                    long when = je.getNextRuntime();
301
302                    if (when > now)
303                    {
304                        // Wait till next runtime.
305                        wait(when - now);
306                    }
307                    else
308                    {
309                        // Update the next runtime for the job.
310                        scheduleQueue.updateQueue(je);
311                        // Return the job to run it.
312                        return je;
313                    }
314                }
315            }
316        }
317        catch (InterruptedException ex)
318        {
319            // ignore
320        }
321
322        // On interrupt.
323        return null;
324    }
325
326    /**
327     * Inner class. This is isolated in its own Runnable class just so that the
328     * main class need not implement Runnable, which would allow others to
329     * directly invoke run, which is not supported.
330     */
331    protected class MainLoop implements Runnable
332    {
333        /**
334         * Method to run the class.
335         */
336        @Override
337        public void run()
338        {
339            String taskName = null;
340            try
341            {
342                while (enabled)
343                {
344                    JobEntry je = nextJob();
345                    if (je != null)
346                    {
347                        taskName = je.getTask();
348
349                        // Start the thread to run the job.
350                        Runnable wt = new WorkerThread(je);
351                        Thread helper = new Thread(wt);
352                        helper.start();
353                    }
354                    else
355                    {
356                        break;
357                    }
358                }
359            }
360            catch (Exception e)
361            {
362                log.error("Error running a Scheduled Job: {}", taskName, e);
363                enabled = false;
364            }
365            finally
366            {
367                clearThread();
368            }
369        }
370    }
371}