001package org.apache.turbine.services.schedule; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.util.List; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.commons.lang3.concurrent.BasicThreadFactory; 028import org.apache.logging.log4j.LogManager; 029import org.apache.logging.log4j.Logger; 030import org.apache.turbine.services.InitializationException; 031import org.apache.turbine.services.TurbineBaseService; 032import org.apache.turbine.util.TurbineException; 033 034/** 035 * Service for a cron like scheduler. 036 * 037 * @author <a href="mailto:mbryson@mont.mindspring.com">Dave Bryson</a> 038 * @author <a href="mailto:quintonm@bellsouth.net">Quinton McCombs</a> 039 * @version $Id: TorqueSchedulerService.java 534527 2007-05-02 16:10:59Z tv $ 040 */ 041public abstract class AbstractSchedulerService extends TurbineBaseService implements ScheduleService 042{ 043 /** Logging */ 044 protected static final Logger log = LogManager.getLogger(ScheduleService.LOGGER_NAME); 045 046 /** The queue */ 047 protected JobQueue<JobEntry> scheduleQueue = null; 048 049 /** Current status of the scheduler */ 050 private AtomicBoolean enabled = new AtomicBoolean(false); 051 052 /** The housekeeping thread. */ 053 protected Thread houseKeepingThread; 054 055 /** The thread pool used to process jobs. */ 056 protected ExecutorService threadPool; 057 058 /** 059 * Initializes the SchedulerService. 060 * 061 * @throws InitializationException 062 * Something went wrong in the init stage 063 */ 064 @Override 065 public void init() throws InitializationException 066 { 067 try 068 { 069 setEnabled(getConfiguration().getBoolean("enabled", true)); 070 scheduleQueue = new JobQueue<>(); 071 threadPool = Executors.newCachedThreadPool( 072 new BasicThreadFactory.Builder() 073 .namingPattern("Turbine-ScheduledJob-") 074 .daemon(true) 075 .build()); 076 077 @SuppressWarnings("unchecked") // Why is this cast necessary? 078 List<JobEntry> jobs = (List<JobEntry>)loadJobs(); 079 scheduleQueue.batchLoad(jobs); 080 restart(); 081 082 setInit(true); 083 } 084 catch (Exception e) 085 { 086 throw new InitializationException("Could not initialize the scheduler service", e); 087 } 088 } 089 090 /** 091 * Load all jobs from configuration storage 092 * 093 * @return the list of pre-configured jobs 094 * @throws TurbineException if jobs could not be loaded 095 */ 096 protected abstract List<? extends JobEntry> loadJobs() throws TurbineException; 097 098 /** 099 * Shutdowns the service. 100 * 101 * This methods interrupts the housekeeping thread. 102 */ 103 @Override 104 public void shutdown() 105 { 106 if (getThread() != null) 107 { 108 getThread().interrupt(); 109 } 110 111 threadPool.shutdownNow(); 112 } 113 114 /** 115 * @see org.apache.turbine.services.schedule.ScheduleService#newJob(int, int, int, int, int, java.lang.String) 116 */ 117 @Override 118 public abstract JobEntry newJob(int sec, int min, int hour, int wd, int day_mo, String task) throws TurbineException; 119 120 /** 121 * Get a specific Job from Storage. 122 * 123 * @param oid 124 * The int id for the job. 125 * @return A JobEntry. 126 * @throws TurbineException 127 * job could not be retrieved. 128 */ 129 @Override 130 public abstract JobEntry getJob(int oid) throws TurbineException; 131 132 /** 133 * Add a new job to the queue. 134 * 135 * @param je 136 * A JobEntry with the job to add. 137 * @throws TurbineException 138 * job could not be added 139 */ 140 @Override 141 public void addJob(JobEntry je) throws TurbineException 142 { 143 updateJob(je); 144 } 145 146 /** 147 * Remove a job from the queue. 148 * 149 * @param je 150 * A JobEntry with the job to remove. 151 * @throws TurbineException 152 * job could not be removed 153 */ 154 @Override 155 public abstract void removeJob(JobEntry je) throws TurbineException; 156 157 /** 158 * Add or update a job. 159 * 160 * @param je 161 * A JobEntry with the job to modify 162 * @throws TurbineException 163 * job could not be updated 164 */ 165 @Override 166 public abstract void updateJob(JobEntry je) throws TurbineException; 167 168 /** 169 * List jobs in the queue. This is used by the scheduler UI. 170 * 171 * @return A List of jobs. 172 */ 173 @Override 174 public List<JobEntry> listJobs() 175 { 176 return scheduleQueue.list(); 177 } 178 179 /** 180 * Sets the enabled status of the scheduler 181 * 182 * @param enabled true to enable the scheduler 183 * 184 */ 185 protected void setEnabled(boolean enabled) 186 { 187 this.enabled.set(enabled); 188 } 189 190 /** 191 * Determines if the scheduler service is currently enabled. 192 * 193 * @return Status of the scheduler service. 194 */ 195 @Override 196 public boolean isEnabled() 197 { 198 return enabled.get(); 199 } 200 201 /** 202 * Starts or restarts the scheduler if not already running. 203 */ 204 @Override 205 public synchronized void startScheduler() 206 { 207 setEnabled(true); 208 restart(); 209 } 210 211 /** 212 * Stops the scheduler if it is currently running. 213 */ 214 @Override 215 public synchronized void stopScheduler() 216 { 217 log.info("Stopping job scheduler"); 218 Thread thread = getThread(); 219 if (thread != null) 220 { 221 thread.interrupt(); 222 } 223 setEnabled(false); 224 } 225 226 /** 227 * Return the thread being used to process commands, or null if there is no 228 * such thread. You can use this to invoke any special methods on the 229 * thread, for example, to interrupt it. 230 * 231 * @return A Thread. 232 */ 233 public synchronized Thread getThread() 234 { 235 return houseKeepingThread; 236 } 237 238 /** 239 * Set thread to null to indicate termination. 240 */ 241 protected synchronized void clearThread() 242 { 243 houseKeepingThread = null; 244 } 245 246 /** 247 * Start (or restart) a thread to process commands, or wake up an existing 248 * thread if one is already running. This method can be invoked if the 249 * background thread crashed due to an unrecoverable exception in an 250 * executed command. 251 */ 252 public synchronized void restart() 253 { 254 if (enabled.get()) 255 { 256 log.info("Starting job scheduler"); 257 if (houseKeepingThread == null) 258 { 259 // Create the the housekeeping thread of the scheduler. It will 260 // wait for the time when the next task needs to be started, 261 // and then launch a worker thread to execute the task. 262 houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME); 263 // Indicate that this is a system thread. JVM will quit only 264 // when there are no more enabled user threads. Settings threads 265 // spawned internally by Turbine as daemons allows commandline 266 // applications using Turbine to terminate in an orderly manner. 267 houseKeepingThread.setDaemon(true); 268 houseKeepingThread.start(); 269 } 270 else 271 { 272 notify(); 273 } 274 } 275 } 276 277 /** 278 * Return the next Job to execute, or null if thread is interrupted. 279 * 280 * @return A JobEntry. 281 * @throws TurbineException 282 * a generic exception. 283 */ 284 protected synchronized JobEntry nextJob() throws TurbineException 285 { 286 try 287 { 288 while (!Thread.interrupted()) 289 { 290 // Grab the next job off the queue. 291 //JobEntry je = scheduleQueue.getNext(); 292 JobEntry je = scheduleQueue.getFirst(); 293 294 if (je == null) 295 { 296 // Queue must be empty. Wait on it. 297 wait(); 298 } 299 else 300 { 301 long now = System.currentTimeMillis(); 302 long when = je.getNextRuntime(); 303 304 if (when > now) 305 { 306 // Wait till next runtime. 307 wait(when - now); 308 } 309 else 310 { 311 // Update the next runtime for the job. 312 scheduleQueue.updateQueue(je); 313 // Return the job to run it. 314 return je; 315 } 316 } 317 } 318 } 319 catch (InterruptedException ex) 320 { 321 // ignore 322 } 323 324 // On interrupt. 325 return null; 326 } 327 328 /** 329 * Create the the housekeeping thread of the scheduler. It will 330 * wait for the time when the next task needs to be started, 331 * and then launch a worker thread to execute the task. 332 */ 333 private void houseKeeping() 334 { 335 String taskName = null; 336 try 337 { 338 while (enabled.get()) 339 { 340 JobEntry je = nextJob(); 341 if (je != null) 342 { 343 taskName = je.getTask(); 344 345 // Get a thread to run the job. 346 threadPool.execute(new WorkerThread(je)); 347 } 348 else 349 { 350 break; 351 } 352 } 353 } 354 catch (Exception e) 355 { 356 log.error("Error running a Scheduled Job: {}", taskName, e); 357 setEnabled(false); 358 } 359 finally 360 { 361 clearThread(); 362 } 363 } 364}