package org.apache.turbine.services.schedule;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.turbine.services.InitializationException;
import org.apache.turbine.services.TurbineBaseService;
import org.apache.turbine.util.TurbineException;

/**
 * Service for a cron like scheduler.
 *
 * <p>
 * A single housekeeping thread (see {@link MainLoop}) waits on this object's
 * monitor until the next job in {@link #scheduleQueue} is due and then starts
 * a separate thread to execute it. Concrete subclasses supply the job storage
 * ({@link #loadJobs()}, {@link #getJob(int)}, {@link #updateJob(JobEntry)},
 * {@link #removeJob(JobEntry)}).
 * </p>
 *
 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a>
 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a>
 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $
 */
public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService
{
    /** Logging */
    protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME);

    /** The queue of scheduled jobs; created in {@link #init()}. */
    protected JobQueue<JobEntry> scheduleQueue = null;

    /**
     * Current status of the scheduler.
     *
     * Declared volatile because it is read without holding this object's
     * monitor by {@link #isEnabled()} and by the housekeeping thread's loop
     * condition, and written without it when the housekeeping thread fails.
     */
    protected volatile boolean enabled = false;

    /** The main loop for starting jobs; created in {@link #init()}. */
    protected MainLoop mainLoop;

    /**
     * The thread used to process commands, or null while no housekeeping
     * thread is running.
     */
    protected Thread thread;

    /**
     * Creates a new instance. No housekeeping thread exists until
     * {@link #restart()} creates one.
     */
    public AbstractSchedulerService()
    {
        mainLoop = null;
        thread = null;
    }

    /**
     * Initializes the SchedulerService: reads the "enabled" configuration
     * flag (default true), creates the queue and the main loop, loads the
     * pre-configured jobs into the queue and starts the housekeeping thread
     * if the scheduler is enabled.
     *
     * @throws InitializationException
     *             Something went wrong in the init stage
     */
    @Override
    public void init() throws InitializationException
    {
        try
        {
            setEnabled(getConfiguration().getBoolean("enabled", true));
            scheduleQueue = new JobQueue<>();
            mainLoop = new MainLoop();

            // loadJobs() returns List<? extends JobEntry>, which is not
            // assignable to List<JobEntry> because generic types are
            // invariant, hence the unchecked cast.
            // NOTE(review): presumably batchLoad() requires List<JobEntry>
            // and only reads from the list - confirm against JobQueue.
            @SuppressWarnings("unchecked")
            List<JobEntry> jobs = (List<JobEntry>) loadJobs();
            scheduleQueue.batchLoad(jobs);
            restart();

            setInit(true);
        }
        catch (Exception e)
        {
            throw new InitializationException("Could not initialize the scheduler service", e);
        }
    }

    /**
     * Load all jobs from configuration storage
     *
     * @return the list of pre-configured jobs
     * @throws TurbineException if jobs could not be loaded
     */
    protected abstract List<? extends JobEntry> loadJobs() throws TurbineException;

    /**
     * Shuts down the service.
     *
     * This method interrupts the housekeeping thread, if there is one.
     */
    @Override
    public void shutdown()
    {
        // Read the reference exactly once: the housekeeping thread may call
        // clearThread() at any time, so a second getThread() call could
        // return null after the null check has already passed.
        Thread housekeeper = getThread();
        if (housekeeper != null)
        {
            housekeeper.interrupt();
        }
    }

    /**
     * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String)
     */
    @Override
    public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException;

    /**
     * Get a specific Job from Storage.
     *
     * @param oid
     *            The int id for the job.
     * @return A JobEntry.
     * @throws TurbineException
     *             job could not be retrieved.
     */
    @Override
    public abstract JobEntry getJob(int oid) throws TurbineException;

    /**
     * Add a new job to the queue. Delegates to {@link #updateJob(JobEntry)}.
     *
     * @param je
     *            A JobEntry with the job to add.
     * @throws TurbineException
     *             job could not be added
     */
    @Override
    public void addJob(JobEntry je) throws TurbineException
    {
        updateJob(je);
    }

    /**
     * Remove a job from the queue.
     *
     * @param je
     *            A JobEntry with the job to remove.
     * @throws TurbineException
     *             job could not be removed
     */
    @Override
    public abstract void removeJob(JobEntry je) throws TurbineException;

    /**
     * Add or update a job.
     *
     * @param je
     *            A JobEntry with the job to modify
     * @throws TurbineException
     *             job could not be updated
     */
    @Override
    public abstract void updateJob(JobEntry je) throws TurbineException;

    /**
     * List jobs in the queue. This is used by the scheduler UI.
     *
     * @return A List of jobs.
     */
    @Override
    public List<JobEntry> listJobs()
    {
        return scheduleQueue.list();
    }

    /**
     * Sets the enabled status of the scheduler
     *
     * @param enabled true to enable the scheduler
     */
    protected void setEnabled(boolean enabled)
    {
        this.enabled = enabled;
    }

    /**
     * Determines if the scheduler service is currently enabled.
     *
     * @return Status of the scheduler service.
     */
    @Override
    public boolean isEnabled()
    {
        return enabled;
    }

    /**
     * Starts or restarts the scheduler if not already running.
     */
    @Override
    public synchronized void startScheduler()
    {
        setEnabled(true);
        restart();
    }

    /**
     * Stops the scheduler if it is currently running: interrupts the
     * housekeeping thread (if any) and marks the scheduler as disabled.
     */
    @Override
    public synchronized void stopScheduler()
    {
        log.info("Stopping job scheduler");
        Thread housekeeper = getThread();
        if (housekeeper != null)
        {
            housekeeper.interrupt();
        }
        setEnabled(false);
    }

    /**
     * Return the thread being used to process commands, or null if there is no
     * such thread. You can use this to invoke any special methods on the
     * thread, for example, to interrupt it.
     *
     * @return A Thread.
     */
    public synchronized Thread getThread()
    {
        return thread;
    }

    /**
     * Set thread to null to indicate termination.
     */
    protected synchronized void clearThread()
    {
        thread = null;
    }

    /**
     * Start (or restart) a thread to process commands, or wake up an existing
     * thread if one is already running. This method can be invoked if the
     * background thread crashed due to an unrecoverable exception in an
     * executed command. Does nothing while the scheduler is disabled.
     */
    public synchronized void restart()
    {
        if (enabled)
        {
            log.info("Starting job scheduler");
            if (thread == null)
            {
                // Create the housekeeping thread of the scheduler. It will
                // wait for the time when the next task needs to be started,
                // and then launch a worker thread to execute the task.
                thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
                // Indicate that this is a system thread. JVM will quit only
                // when there are no more enabled user threads. Setting threads
                // spawned internally by Turbine as daemons allows commandline
                // applications using Turbine to terminate in an orderly manner.
                thread.setDaemon(true);
                thread.start();
            }
            else
            {
                // Wake the housekeeping thread waiting in nextJob() so that
                // it re-examines the queue.
                notify();
            }
        }
    }

    /**
     * Return the next Job to execute, or null if thread is interrupted.
     *
     * Blocks on this object's monitor while the queue is empty or the head
     * job is not yet due; {@link #restart()} wakes it up via notify().
     *
     * @return A JobEntry.
     * @throws TurbineException
     *             a generic exception.
     */
    protected synchronized JobEntry nextJob() throws TurbineException
    {
        try
        {
            // Thread.interrupted() also clears the interrupt flag.
            while (!Thread.interrupted())
            {
                // Grab the next job off the queue.
                JobEntry je = scheduleQueue.getNext();

                if (je == null)
                {
                    // Queue must be empty. Wait on it.
                    wait();
                }
                else
                {
                    long now = System.currentTimeMillis();
                    long when = je.getNextRuntime();

                    if (when > now)
                    {
                        // Wait till next runtime.
                        wait(when - now);
                    }
                    else
                    {
                        // Update the next runtime for the job.
                        scheduleQueue.updateQueue(je);
                        // Return the job to run it.
                        return je;
                    }
                }
            }
        }
        catch (InterruptedException ignored)
        {
            // Deliberately not propagated: an interrupt is the signal to stop
            // (see shutdown() and stopScheduler()) and is reported to the
            // caller through the null return value below.
        }

        // On interrupt.
        return null;
    }

    /**
     * Inner class. This is isolated in its own Runnable class just so that the
     * main class need not implement Runnable, which would allow others to
     * directly invoke run, which is not supported.
     */
    protected class MainLoop implements Runnable
    {
        /**
         * Body of the housekeeping thread. While the scheduler is enabled it
         * fetches the next due job and starts a new thread to run it. The
         * loop ends when {@link AbstractSchedulerService#nextJob()} returns
         * null (interrupt) or when an exception occurs, in which case the
         * scheduler is disabled. In every case the thread reference of the
         * enclosing service is cleared on exit.
         */
        @Override
        public void run()
        {
            // Remembered only so a failure can be logged with the task name.
            String taskName = null;
            try
            {
                while (enabled)
                {
                    JobEntry je = nextJob();
                    if (je != null)
                    {
                        taskName = je.getTask();

                        // Start the thread to run the job.
                        Runnable wt = new WorkerThread(je);
                        Thread helper = new Thread(wt);
                        helper.start();
                    }
                    else
                    {
                        break;
                    }
                }
            }
            catch (Exception e)
            {
                log.error("Error running a Scheduled Job: {}", taskName, e);
                enabled = false;
            }
            finally
            {
                clearThread();
            }
        }
    }
}