AbstractSchedulerService.java

package org.apache.turbine.services.schedule;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.turbine.services.InitializationException;
import org.apache.turbine.services.TurbineBaseService;
import org.apache.turbine.util.TurbineException;

/**
 * Base class for cron-like scheduler services.
 * <p>
 * A single housekeeping thread waits on this object's monitor until the
 * first job in the queue is due and then hands that job to a thread pool for
 * execution. Concrete subclasses supply the job storage by implementing
 * {@link #loadJobs()}, {@link #newJob(int, int, int, int, int, String)},
 * {@link #getJob(int)}, {@link #removeJob(JobEntry)} and
 * {@link #updateJob(JobEntry)}.
 *
 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
 */
public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
{
    /** Logging */
    protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);

    /** The queue of scheduled jobs; created in {@link #init()}. */
    protected JobQueue<JobEntry> scheduleQueue = null;

    /** Current status of the scheduler */
    private final AtomicBoolean enabled = new AtomicBoolean(false);

    /** The housekeeping thread, or null while no such thread is running. */
    protected Thread houseKeepingThread;

    /** The thread pool used to process jobs; created in {@link #init()}. */
    protected ExecutorService threadPool;

    /**
     * Initializes the SchedulerService.
     * <p>
     * Reads the "enabled" configuration flag (default {@code true}), creates
     * the job queue and the worker thread pool, loads the pre-configured jobs
     * and starts the housekeeping thread if the scheduler is enabled.
     *
     * @throws InitializationException
     *             Something went wrong in the init stage
     */
    @Override
    public void init() throws InitializationException
    {
        try
        {
            setEnabled(getConfiguration().getBoolean("enabled", true));
            scheduleQueue = new JobQueue<>();
            // The pattern is expanded with a per-thread counter, so "%d" is
            // required to give every worker thread a distinct name.
            threadPool = Executors.newCachedThreadPool(
                    new BasicThreadFactory.Builder()
                        .namingPattern("Turbine-ScheduledJob-%d")
                        .daemon(true)
                        .build());

            // loadJobs() is declared as List<? extends JobEntry>, which is not
            // assignable to List<JobEntry> without an unchecked cast.
            @SuppressWarnings("unchecked")
            List<JobEntry> jobs = (List<JobEntry>)loadJobs();
            scheduleQueue.batchLoad(jobs);
            restart();

            setInit(true);
        }
        catch (Exception e)
        {
            throw new InitializationException("Could not initialize the scheduler service", e);
        }
    }

    /**
     * Load all jobs from configuration storage
     *
     * @return the list of pre-configured jobs
     * @throws TurbineException if jobs could not be loaded
     */
    protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;

    /**
     * Shuts down the service.
     * <p>
     * This method interrupts the housekeeping thread (if one is running) and
     * shuts down the worker thread pool (if it has been created).
     */
    @Override
    public void shutdown()
    {
        // Read the thread reference once: it may be cleared concurrently by
        // the housekeeping thread itself when it terminates.
        Thread thread = getThread();
        if (thread != null)
        {
            thread.interrupt();
        }

        // The pool only exists after init() got far enough to create it.
        ExecutorService pool = threadPool;
        if (pool != null)
        {
            pool.shutdownNow();
        }
    }

    /**
     * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
     */
    @Override
    public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;

    /**
     * Get a specific Job from Storage.
     *
     * @param oid
     *            The int id for the job.
     * @return A JobEntry.
     * @throws TurbineException
     *                job could not be retrieved.
     */
    @Override
    public abstract JobEntry getJob(int oid) throws TurbineException;

    /**
     * Add a new job to the queue. This implementation delegates to
     * {@link #updateJob(JobEntry)}.
     *
     * @param je
     *            A JobEntry with the job to add.
     * @throws TurbineException
     *             job could not be added
     */
    @Override
    public void addJob(JobEntry je) throws TurbineException
    {
        updateJob(je);
    }

    /**
     * Remove a job from the queue.
     *
     * @param je
     *            A JobEntry with the job to remove.
     * @throws TurbineException
     *                job could not be removed
     */
    @Override
    public abstract void removeJob(JobEntry je) throws TurbineException;

    /**
     * Add or update a job.
     *
     * @param je
     *            A JobEntry with the job to modify
     * @throws TurbineException
     *             job could not be updated
     */
    @Override
    public abstract void updateJob(JobEntry je) throws TurbineException;

    /**
     * List jobs in the queue. This is used by the scheduler UI.
     *
     * @return A List of jobs.
     */
    @Override
    public List<JobEntry> listJobs()
    {
        return scheduleQueue.list();
    }

    /**
     * Sets the enabled status of the scheduler
     *
     * @param enabled true to enable the scheduler
     *
     */
    protected void setEnabled(boolean enabled)
    {
        this.enabled.set(enabled);
    }

    /**
     * Determines if the scheduler service is currently enabled.
     *
     * @return Status of the scheduler service.
     */
    @Override
    public boolean isEnabled()
    {
        return enabled.get();
    }

    /**
     * Starts or restarts the scheduler if not already running.
     */
    @Override
    public synchronized void startScheduler()
    {
        setEnabled(true);
        restart();
    }

    /**
     * Stops the scheduler if it is currently running.
     * <p>
     * The housekeeping thread is interrupted first and the scheduler is then
     * marked as disabled.
     */
    @Override
    public synchronized void stopScheduler()
    {
        log.info("Stopping job scheduler");
        Thread thread = getThread();
        if (thread != null)
        {
            thread.interrupt();
        }
        setEnabled(false);
    }

    /**
     * Return the thread being used to process commands, or null if there is no
     * such thread. You can use this to invoke any special methods on the
     * thread, for example, to interrupt it.
     *
     * @return A Thread.
     */
    public synchronized Thread getThread()
    {
        return houseKeepingThread;
    }

    /**
     * Set thread to null to indicate termination.
     */
    protected synchronized void clearThread()
    {
        houseKeepingThread = null;
    }

    /**
     * Start (or restart) a thread to process commands, or wake up an existing
     * thread if one is already running. This method can be invoked if the
     * background thread crashed due to an unrecoverable exception in an
     * executed command. It does nothing while the scheduler is disabled.
     */
    public synchronized void restart()
    {
        if (enabled.get())
        {
            log.info("Starting job scheduler");
            if (houseKeepingThread == null)
            {
                // Create the housekeeping thread of the scheduler. It will
                // wait for the time when the next task needs to be started,
                // and then launch a worker thread to execute the task.
                houseKeepingThread = new Thread(this::houseKeeping, ScheduleService.SERVICE_NAME);
                // Indicate that this is a system thread. JVM will quit only
                // when there are no more enabled user threads. Settings threads
                // spawned internally by Turbine as daemons allows commandline
                // applications using Turbine to terminate in an orderly manner.
                houseKeepingThread.setDaemon(true);
                houseKeepingThread.start();
            }
            else
            {
                // NOTE(review): if the existing thread has already been
                // interrupted but has not yet called clearThread(), this only
                // notifies it and no replacement thread is started - confirm
                // whether callers can hit that window.
                //
                // Wake every waiter: this object's monitor is reachable by
                // other code, so a single notify() could be consumed by a
                // thread other than the housekeeping thread. nextJob()
                // re-checks its condition in a loop, so extra wake-ups are
                // harmless.
                notifyAll();
            }
        }
    }

    /**
     * Return the next Job to execute, or null if thread is interrupted.
     * <p>
     * Blocks on this object's monitor while the queue is empty or while the
     * first job in the queue is not yet due. Before a due job is returned,
     * the queue is asked to update that job's position.
     *
     * @return A JobEntry.
     * @throws TurbineException
     *                a generic exception.
     */
    protected synchronized JobEntry nextJob() throws TurbineException
    {
        try
        {
            // Thread.interrupted() also clears the interrupt flag.
            while (!Thread.interrupted())
            {
                // Look at the first job in the queue.
                JobEntry je = scheduleQueue.getFirst();

                if (je == null)
                {
                    // Queue must be empty. Wait on it.
                    wait();
                }
                else
                {
                    long now = System.currentTimeMillis();
                    long when = je.getNextRuntime();

                    if (when > now)
                    {
                        // Wait till next runtime.
                        wait(when - now);
                    }
                    else
                    {
                        // Update the next runtime for the job.
                        scheduleQueue.updateQueue(je);
                        // Return the job to run it.
                        return je;
                    }
                }
            }
        }
        catch (InterruptedException ignored)
        {
            // Deliberately not propagated: an interrupt is the signal to
            // stop waiting, and it is reported to the caller as null below.
        }

        // On interrupt.
        return null;
    }

    /**
     * Main loop of the housekeeping thread.
     * <p>
     * While the scheduler is enabled, fetches the next due job and submits it
     * to the thread pool. The loop ends when {@link #nextJob()} returns null
     * or when an exception occurs; in the latter case the error is logged and
     * the scheduler is disabled. The thread reference is always cleared on
     * exit.
     */
    private void houseKeeping()
    {
        String taskName = null;
        try
        {
            while (enabled.get())
            {
                JobEntry je = nextJob();
                if (je == null)
                {
                    // nextJob() reports an interrupt as null: stop looping.
                    break;
                }

                taskName = je.getTask();

                // Get a thread to run the job.
                threadPool.execute(new WorkerThread(je));
            }
        }
        catch (Exception e)
        {
            log.error("Error running a Scheduled Job: {}", taskName, e);
            setEnabled(false);
        }
        finally
        {
            clearThread();
        }
    }
}