A Polling Java Future
Sometimes in Java we want to act on a state transition in some object. Ideally, the object lets us register a listener that is called when the state changes. Unfortunately, this is not always possible, especially when the class comes from a third-party library and is outside our control. In that case, we need to poll the state regularly to detect the change we are interested in and then invoke the callback we want. The Java concurrency library offers a neat abstraction for this with CompletableFuture. In this post we wrap the polling in such a future, which can then be used by a wide variety of frameworks, especially those for reactive programming.
As a starting point we need an object that we want to observe. Let us assume that this object implements org.springframework.context.Phased. Essentially, this interface looks like this:
public interface Phased {
    int getPhase();
}
We want to create a CompletableFuture that is completed when the observed phase reaches a certain value, say 1 for this post. So we want to implement the following method:
public <P extends Phased> CompletableFuture<P> awaitPhaseOne(P observable) {
    // poll observable until observable.getPhase() == 1
    return CompletableFuture.failedFuture(
        new IllegalStateException("not yet implemented")
    );
}
For now, we just return a failed future until we have the correct implementation. But let us have a look at the method signature first: the method returns a future that, once completed, holds the observed object. This allows consumers to execute further operations on this object:
awaitPhaseOne(observable).thenApply(o -> doSomething(o));
This return type is not the only possibility; it just serves as a useful example.
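To illustrate what other shapes a consumer might derive from such a future, here is a minimal sketch built only on standard CompletableFuture operations. Phased is re-declared locally as a stand-in for the Spring interface so that the snippet compiles without Spring, and ConsumerSketch, phaseOf and asSignal are hypothetical names that do not appear elsewhere in this post.

```java
import java.util.concurrent.CompletableFuture;

final class ConsumerSketch {

    // Local stand-in for org.springframework.context.Phased (assumption: same single method).
    interface Phased {
        int getPhase();
    }

    // Maps the completed observable to its phase value.
    static <P extends Phased> CompletableFuture<Integer> phaseOf(CompletableFuture<P> future) {
        return future.thenApply(Phased::getPhase);
    }

    // Drops the payload for callers that only care that the phase was reached.
    static <P extends Phased> CompletableFuture<Void> asSignal(CompletableFuture<P> future) {
        return future.thenAccept(ignored -> { });
    }
}
```

Both helpers only transform an existing future, so they could be applied to the result of awaitPhaseOne(...) without changing the polling itself.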
Let us now look at how we can use java.util.concurrent to implement our polling future. We will need three ingredients:
- the CompletableFuture that we want to create
- a Runnable that does the actual polling
- a ScheduledExecutorService that executes the Runnable
Putting it all together, we arrive at the following implementation:
public <P extends Phased> CompletableFuture<P> awaitPhaseOne(P observable) {
    ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<P> future = new CompletableFuture<>();
    ScheduledFuture<?> scheduled = executor.scheduleAtFixedRate(
        // the Runnable executed in each polling run
        () -> {
            if (observable.getPhase() == 1) {
                future.complete(observable);
            }
        },
        100, // initial delay
        200, // period between executions
        TimeUnit.MILLISECONDS // time unit
    );
    future.whenComplete((obs, thrown) -> scheduled.cancel(true));
    return future;
}
We start by creating a new executor.
This is a very simplistic approach that we will revisit later.
Next we create the future, which is the CompletableFuture we want to return.
We have not checked our observable yet, so the future is not completed.
The polling is now started by scheduling a Runnable with the executor.
We use a lambda expression that checks the phase of our observable.
If we find the desired phase 1, we complete the future.
The runnable is scheduled to start polling after 100 milliseconds with an interval of 200 milliseconds.
Of course, these values should be configurable in a production setting.
Before we return the future, we need to ensure that the polling is stopped once the future is completed.
This is done with a small callback using future.whenComplete.
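The steps above could be made configurable roughly as follows. This is a sketch under stated assumptions rather than a finished implementation: Phased is a local stand-in for the Spring interface, ConfigurablePolling and MutablePhase are hypothetical names, and shutting down the executor in the callback is an addition that only makes sense while each call still owns its own executor.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ConfigurablePolling {

    // Local stand-in for org.springframework.context.Phased.
    interface Phased {
        int getPhase();
    }

    // Hypothetical observable whose phase can be changed from another thread.
    static final class MutablePhase implements Phased {
        private final AtomicInteger phase = new AtomicInteger();

        @Override
        public int getPhase() {
            return phase.get();
        }

        void setPhase(int newPhase) {
            phase.set(newPhase);
        }
    }

    // period must be at least one millisecond, otherwise
    // scheduleAtFixedRate throws an IllegalArgumentException.
    static <P extends Phased> CompletableFuture<P> awaitPhaseOne(
            P observable, Duration initialDelay, Duration period) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<P> future = new CompletableFuture<>();
        ScheduledFuture<?> scheduled = executor.scheduleAtFixedRate(
                () -> {
                    if (observable.getPhase() == 1) {
                        future.complete(observable);
                    }
                },
                initialDelay.toMillis(),
                period.toMillis(),
                TimeUnit.MILLISECONDS);
        // Runs on normal completion, on failure, and when the caller cancels the future.
        future.whenComplete((obs, thrown) -> {
            scheduled.cancel(true);
            executor.shutdown(); // assumption: the executor is private to this call
        });
        return future;
    }
}
```

Because whenComplete also fires on cancellation, a consumer that loses interest can call future.cancel(true) and the polling stops as well.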
Our implementation nicely separates the code that tests the observable from the polling infrastructure.
The underlying design is a closure consisting of the Runnable accessing both the observable and the CompletableFuture.
A ScheduledExecutorService allows scheduling at a fixed rate, as used in our implementation, or with a fixed delay between executions.
More sophisticated schedules, such as exponential back-off, require custom scheduling of the runs.
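One way such custom scheduling might look is a task that reschedules itself with a growing delay instead of relying on scheduleAtFixedRate. The sketch below is an assumption-laden illustration: BackoffPolling and its parameters are hypothetical names, the condition is passed in as a plain BooleanSupplier, and the delay simply doubles until it reaches a cap.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class BackoffPolling {

    // Completes the returned future once the condition yields true.
    // The delay between polls doubles after every miss, up to maxDelayMillis.
    static CompletableFuture<Void> await(ScheduledExecutorService executor,
                                         BooleanSupplier condition,
                                         long initialDelayMillis,
                                         long maxDelayMillis) {
        if (initialDelayMillis <= 0 || maxDelayMillis < initialDelayMillis) {
            throw new IllegalArgumentException("delays must be positive and max >= initial");
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        scheduleNext(executor, condition, future, initialDelayMillis, maxDelayMillis);
        return future;
    }

    private static void scheduleNext(ScheduledExecutorService executor,
                                     BooleanSupplier condition,
                                     CompletableFuture<Void> future,
                                     long delayMillis,
                                     long maxDelayMillis) {
        executor.schedule(() -> {
            if (future.isDone()) {
                return; // completed or cancelled by the caller: stop rescheduling
            }
            try {
                if (condition.getAsBoolean()) {
                    future.complete(null);
                } else {
                    long nextDelay = Math.min(delayMillis * 2, maxDelayMillis);
                    scheduleNext(executor, condition, future, nextDelay, maxDelayMillis);
                }
            } catch (RuntimeException e) {
                // covers a failing condition as well as a rejected reschedule
                future.completeExceptionally(e);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Since every run is a one-shot task, there is no ScheduledFuture to cancel; checking future.isDone() at the start of each run is what ends the chain.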
Let us come back to the creation of the ScheduledExecutorService: with the code above, a new instance is created for every call to the method awaitPhaseOne().
Each invocation therefore spawns a new thread, and because the executor is never shut down, that thread is not released afterwards.
If the method is called at high frequency, this becomes a severe performance issue.
We can solve this issue by using a globally managed ScheduledExecutorService either in the surrounding class or from some other part of our application.
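A shared executor could be set up along the following lines. Everything here is a sketch: SharedPollingExecutor and the thread name prefix are hypothetical, and the daemon threads and the remove-on-cancel policy are optional additions that this post does not otherwise rely on.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedPollingExecutor {

    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

    // One instance for the whole application (hypothetical holder).
    static final ScheduledExecutorService INSTANCE = create(1);

    static ScheduledThreadPoolExecutor create(int corePoolSize) {
        ThreadFactory daemonThreads = runnable -> {
            Thread thread = new Thread(runnable, "polling-" + THREAD_COUNTER.incrementAndGet());
            thread.setDaemon(true); // do not keep the JVM alive just for polling
            return thread;
        };
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(corePoolSize, daemonThreads);
        // Remove cancelled polling tasks from the work queue immediately
        // instead of keeping them until their next scheduled run.
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }
}
```

In an application with a managed lifecycle, the executor would more likely be created and shut down by the surrounding container than held in a static field.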
Another optimization is to check the polling condition before scheduling the Runnable and to return immediately if it is already satisfied. Combining this pre-check with an extracted ScheduledExecutorService, we get the following implementation:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.context.Phased;

public class PollingFuture {
    private final ScheduledExecutorService executor;

    public PollingFuture(int corePoolSize) {
        this.executor = Executors.newScheduledThreadPool(corePoolSize);
    }

    public <P extends Phased> CompletableFuture<P> awaitPhaseOne(P observable) {
        // pre-check: nothing to poll if the phase is already reached
        if (isPhaseOne(observable)) {
            return CompletableFuture.completedFuture(observable);
        }
        CompletableFuture<P> future = new CompletableFuture<>();
        ScheduledFuture<?> scheduled = executor.scheduleAtFixedRate(
            // the Runnable executed in each polling run
            () -> {
                if (isPhaseOne(observable)) {
                    future.complete(observable);
                }
            },
            100, // initial delay
            200, // period between executions
            TimeUnit.MILLISECONDS // time unit
        );
        // stop polling once the future is completed or cancelled
        future.whenComplete((obs, thrown) -> scheduled.cancel(true));
        return future;
    }

    private <P extends Phased> boolean isPhaseOne(P observable) {
        return observable.getPhase() == 1;
    }
}
Note that the first check of the polling condition is now executed on the calling thread during the method invocation.
If the check is long-running, this early check may cost more than is gained by not scheduling the Runnable.
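If the check is expensive, one alternative is to drop the synchronous pre-check and schedule the first run with an initial delay of zero, so that even the first evaluation happens on a pool thread. The following is a minimal sketch of that idea; OffThreadPreCheck is a hypothetical name and the condition is passed as a BooleanSupplier to keep the snippet independent of Spring.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class OffThreadPreCheck {

    static CompletableFuture<Void> await(ScheduledExecutorService executor,
                                         BooleanSupplier condition,
                                         long periodMillis) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        ScheduledFuture<?> scheduled = executor.scheduleAtFixedRate(
                () -> {
                    if (condition.getAsBoolean()) {
                        future.complete(null);
                    }
                },
                0, // first check as soon as a pool thread is free, not on the caller's thread
                periodMillis,
                TimeUnit.MILLISECONDS);
        // If the first run already completed the future, this callback runs right here.
        future.whenComplete((result, thrown) -> scheduled.cancel(true));
        return future;
    }
}
```

The trade-off is that a condition which is already satisfied now costs a task submission instead of a direct method call.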
The logic that decides when to complete the CompletableFuture is now extracted into the private method isPhaseOne(...).
Everything else is the glue code to create the polling future.
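Since only the condition is specific to Phased, the glue code could be generalized by passing the condition in as a Predicate. The sketch below assumes a hypothetical class GenericPollingFuture that receives its executor from outside. It also forwards exceptions thrown by the condition to the future, because scheduleAtFixedRate suppresses all further runs after an exception and the future would otherwise never complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class GenericPollingFuture {

    private final ScheduledExecutorService executor;

    GenericPollingFuture(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    <T> CompletableFuture<T> await(T observable, Predicate<? super T> condition) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ScheduledFuture<?> scheduled = executor.scheduleAtFixedRate(
                () -> {
                    try {
                        if (condition.test(observable)) {
                            future.complete(observable);
                        }
                    } catch (RuntimeException e) {
                        // Propagate the failure instead of silently ending the polling.
                        future.completeExceptionally(e);
                    }
                },
                100, // initial delay
                200, // period between executions
                TimeUnit.MILLISECONDS);
        future.whenComplete((result, thrown) -> scheduled.cancel(true));
        return future;
    }
}
```

With such a class, the example from this post would reduce to a call like polling.await(observable, o -> o.getPhase() == 1).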
In summary, we developed a small class that allows us to wrap polling for some condition into a CompletableFuture. We have seen that the implementation requires handling a ScheduledExecutorService. The resulting future can be used in different asynchronous applications, e.g. with Spring WebFlux, a reactive stack for building web applications.