<< The Spiel >>
Java 5 brings a new arsenal of weaponry when it comes to creating highly scalable concurrent applications. The JVM has been improved to allow classes to take advantage of hardware-level concurrency support, and a rich set of new concurrency classes has been provided to make it easier to develop thread-safe, well-tested, high-performance concurrent applications.
The following code example makes full use of these new features and leverages the power of Spring to fully automate an asynchronous job to run periodically for a given duration (or forever if required). There is also a couple of extra goodies in the bag. Not only do jobs get kicked off, we also get a result back when they finish. And if the unimaginable happens and the job hangs, no problem, a timeout will occur, thus ensuring that no threads are left hanging around to hinder our application’s robustness.
<< The Brains >>
package com.twopaths.core.asynchronous;
import static java.util.concurrent.TimeUnit.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
public class AsynchronousProcessor<T>
implements Runnable, ApplicationContextAware {
protected final Log logger = LogFactory.getLog(this.getClass());
protected static boolean started = false;
private ScheduledFuture handle;
// Specifies which bean in spring to instantiate as a new job
// Default is job
private String jobBean = "job";
// Specified initial delay before kicking off the first job
// Default is 1 second
private int initialDelay = 1;
// Specifies the delay between finishing and starting a new job
// Default is 10 seconds
private int delay = 10;
// Specifies how long to wait for a job to complete before timing out
// Default is 60 seconds
private int timeout = 60;
// Specifies how long job will keep running
// Default is 1 hour. Set to zero to run forever.
private int duration = 60 * 60;
protected final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
protected final ExecutorService THREADPOOL =
Executors.newCachedThreadPool();
private ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
}
public AsynchronousProcessor() {
this.start();
}
protected synchronized void start() {
if (!this.started) { // Ensures that it only starts up once
this.started = true;
final ScheduledFuture handle =
scheduler.scheduleWithFixedDelay(this, this.getInitialDelay(),
this.getDelay(), SECONDS);
if (this.getDuration() > 0) {
scheduler.schedule(new Runnable() {
public void run() {handle.cancel(true);}
}, this.getDuration(), SECONDS);
}
this.setHandle(handle);
}
}
public void run() {
try {
Callable<T> job =
(Callable<T>) applicationContext.getBean(this.getJobBean());
T result = this.call(job, this.getTimeout());
this.logger.info(result);
} catch (Exception e) {
this.logger.error("Exception occured when running job", e);
}
}
protected <T> T call(Callable<T> c, long timeout)
throws InterruptedException, ExecutionException, TimeoutException {
FutureTask<T> t = new FutureTask<T>(c);
THREADPOOL.execute(t);
T result = t.get(timeout, SECONDS);
return result;
}
public void destroy() {
if ((this.getHandle() != null) && (!this.getHandle().isCancelled())) {
this.getHandle().cancel(true);
}
}
public String getJobBean() {
return jobBean;
}
public void setJobBean(String jobBean) {
this.jobBean = jobBean;
}
public ScheduledFuture getHandle() {
return handle;
}
protected void setHandle(ScheduledFuture handle) {
this.handle = handle;
}
public int getDelay() {
return delay;
}
public void setDelay(int delay) {
this.delay = delay;
}
public int getInitialDelay() {
return initialDelay;
}
public void setInitialDelay(int initialDelay) {
this.initialDelay = initialDelay;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getDuration() {
return duration;
}
public void setDuration(int duration) {
this.duration = duration;
}
}
<< The Slave >>
package com.twopaths.core.asynchronous;
import java.util.concurrent.Callable;
public class HelloWorldJob<T> implements Callable<T> {
private String message = "Hello World"; // default
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public T call() {
return (T) message;
}
}
<< The Glue >>
<?xml version=”1.0″ encoding=”UTF-8″?>
<beans xmlns=”http://www.springframework.org/schema/beans”>
<bean id=”job”
class=”com.twopaths.core.asynchronous.HelloWorldJob”
scope=”prototype”/>
<bean id=”asynchronousProcessor”
class=”com.twopaths.core.asynchronous.AsynchronousProcessor”
destroy-method=”destroy”/>
</beans>