Callable with the timeout in Java ExecutorService

Callable with the timeout in Java ExecutorService

ExecutorService is a concurrency framework introduced in JDK 5. It empowers Java developers to write multithreaded applications without being concerned about thread management such as Thread or Runnable. ExecutorService does all that and keeps all threads (Callable) in a thread pool for reusability. That avoids the expensive cost of thread creation. Using ExecutorService, however, means that we have less flexibility. One of which is timeout control of a spun thread. In this article, we focus on how to use callable or runnable with the timeout in Java ExecutorService.

Set future timeout

With great power comes great responsibility. Without in-depth familiarity with the ExecutorService, the code is built on the wrong foundation, and debugging it will be tedious. To set a timeout for a ExecutorService managed Runnable, or Callable we need to know how to interrupt the said Runnable or Callable.

Interrupting a Runnable, or Callable that is controlled via ExecutorService is possible through its Future that returns after calling submit() method of the executor.

Future.get() accepts timeout with TimeUnit. Something like this:

Future<?> future = executorService.submit(aCallable);
future.get(2, TimeUnit.seconds);

The above code sets a two seconds timeout on the Callable. If the task does not finish in two seconds, TimeoutException will be thrown which should be handled properly. Otherwise, the Callable continues to run. The best approach is to call the future.cancel(true) like the below:

try {
    futureTask.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException ex) {
   ex.printStackTrace();
} catch (TimeoutException timeoutException) {
   futureTask.cancel(true);
}

However, that is not the end of the story. The cancel method does not force the Callable to stop. To ensure that, let’s look at the Javadoc of the cancel(boolean mayInterruptIfRunning),

Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been canceled, or could not be canceled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.

After this method returns, subsequent calls to isDone() will always return true. Subsequent calls to isCancelled() will always return true if this method returned true.

As Javadoc mentioned, cancel attempts to cancel the execution of the tasks. But what is it depend on? It depends on the Callable implementation.

Let’s assume that our Runnable, or Callable in the example is a dummy loop like this:

public class DummyRunnable implements Runnable {
    @Override
    public void run() {
        for(long i=0; i<=10000000;i++) {
            System.out.println(i);
        }
    }
}

With such an implementation calling Future.cancel(true) does not stop the task because it does not handle interruption at all. In fact, Future.cancel(true) sets the interrupt flag to true in the Callable thread that requires a proper handling. Correct interruption handling is as follows:

public class DummyRunnable implements Runnable {
    @Override
    public void run() {
        for(long i=0; i<=10000000;i++) {
            System.out.println(i);
            if(Thread.currentThread().isInterrupted()) { 
                return;
            }
        }
    }
}

In the above snippet, we check whether the thread is interrupted. If yes, the method exits and won’t continue further. Otherwise, it continues as usual.

To put everything together, we can rewrite the code like the one below,

import java.util.concurrent.*;

public class Service {

    private final ExecutorService executorService;

    public Service() {
        executorService = Executors.newSingleThreadExecutor();
    }

    public void testExecutorServiceInterrputCallable() {
            setTimeout(executorService.submit(() -> someBlockingTask()), 1);
            System.out.println("Continue main thread!");
            shutdownExecutorService();
    }

    private void setTimeout(Future<?> futureTask, int timeoutInSeconds) {
        try {
            futureTask.get(timeoutInSeconds, TimeUnit.SECONDS); // 1
        } catch (InterruptedException | ExecutionException ex) {
            ex.printStackTrace();
        } catch (TimeoutException timeoutException) {
            futureTask.cancel(true); // 2
        }
    }

    private void someBlockingTask() {
        for(long i=0; i<=10000000;i++) {
            System.out.println(i);
            if(Thread.currentThread().isInterrupted()) { // 3
                System.out.println("The callable has been interrupted!");
                return;
            }
        }
    }

    private void shutdownExecutorService() {
        executorService.shutdown();
        if(!executorService.isShutdown()) {
            executorService.shutdownNow();
        }
    }
}

Common misconceptions

In many online articles and StackOverflow threads, wrong advice is provided to readers. That happens mainly because of some incorrect assumptions. Authors or commenters obviously did not read the documentation deeply enough and took methods’ functionalities at the face value based on their names.

Here, we gathered some of the most common mistakes:

  1. Future.get(long, TimeUnit) stops the running Runnable, or Callable.
  2. Future.cancel(true) stops the running Runnable, Callable.
  3. No need to check isInterrupted() method.
  4. Thread.interrupted(), is the same as Thread.currentThread().isInterrupted().

The above assumptions are all wrong. The correct assumptions are:

  1. Future.get(long, TimeUnit) causes the Future to throw TimeoutException that requires explicit error handling
  2. Future.cancel(true) only sets the interrupted flag of the running Runnable, or Callable thread and nothing else
  3. Without proper checking in the Runnable, or Callable thread will not stop
  4. Thread.interrupted() is a static method that checks whether the thread is interrupted since last call or not. It then resets the flag. On the other hand, Thread.currentThread().isInterrupted() checks whether the thread is interrupted at all or not. It does not reset any flag.

Conclusion

In this article, we discussed setting a callable with the timeout in Java ExecutorService. Not only we should pass the timeout when spinning a thread, but we also have to write the code in a way that stops when it receives the interrupt signal. Lastly, we discussed some common misconceptions and pitfalls in online resources. If you are interested to know more about ExecutorService, we recommend reading Java Concurrency in Practice book by Brian Goetz, available on Amazon.

Inline/featured images credits