Introduction
Ensuring certain computations happen on the right thread, usually off the main thread, is a very common development task when dealing with reactive flows. When building up tools for Java 9's Flow API, one can decide to add this thread-switching support to each operator directly - see the range() operator from the start of the series -, or have a standalone stage for this purpose.
This is a tradeoff. Inlining thread switching avoids bogging down the source thread like the thread-stealing behavior of most of the queue-drain approach presented so far. A separate operator allows better composition and may even allow working with exotic asynchrony-providing components.
The observeOn operator
In Java, threading support is provided via the Executor, ExecutorService and ScheduledExecutorService-based API. Executor is is the most basic one of them which only provides a single execute(Runnable) method. This allows creating an Executor from a lambda:
Executor trampoline = Runnable::run; Executor swing = SwingUtilities::invokeLater; Executor pool = ForkJoinPool.commonPool();
As the least common denominator, we'll use Executor in defining our observeOn operator:
public static <T> Flow.Publisher<T> observeOn( Flow.Publisher<T> source, Executor exec, int prefetch) { return new ObserveOnPublisher<>(source, exec, prefetch); }
Crossing an asynchronous boundary requires the temporary storage of an event until the other side can pick it up. The queue-drain approach can provide a nice bounded queue we can size with prefetch. In addition, the so-called stable-prefetch request management (shown in the mapFilter operator before) allows minimizing the overhead of requesting more items.
First, let's see the skeleton of the operator's main Flow.Subscriber implementation:
static final class ObserveOnSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription, Runnable { final Flow.Subscriber<? super T> downstream; final Executor exec; final int prefetch; final Queue<T> queue; Flow.Subscription upstream; int wip; static final VarHandle WIP = VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, "wip", int.class); long requested; static final VarHandle REQUESTED = VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, "requested", long.class); boolean done; static final VarHandle DONE = VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, "done", boolean.class); boolean cancelled; static final VarHandle CANCELLED = VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, "cancelled", boolean.class); Throwable error; long emitted; int consumed; ObserveOnSubscriber( Flow.Subscriber<? super T> downstream, Executor exec, int prefetch ) { this.downstream = downstream; this.exec = exec; this.prefetch = prefetch; this.queue = new SpscArrayQueue<>(prefetch); } @Override public void onSubscribe(Flow.Subscription s) { // TODO implement } @Override public void onNext(T item) { // TODO implement } @Override public void onError(Throwable throwable) { // TODO implement } @Override public void onComplete() { // TODO implement } @Override public void request(long n) { // TODO implement } @Override public void cancel() { // TODO implement } void schedule() { // TODO implement } @Override public void run() { // TODO implement } }
The SpscArrayQueue is available from the JCTools library. VH is a shortand utility class for getting a VarHandle for a particular field:
public final class VH { public static VarHandle find( MethodHandles.Lookup lookup, Class clazz, String field, Class type) { try { return lookup.findVarHandle(clazz, field, type); } catch (Throwable ex) { throw new InternalError(ex); } } }
If you are using IntelliJ IDEA, the latest version has nice parameter matching support for findVarHandle and highlights it if the field or type doesn't match the target class' member definition. Unfortunately, the method throws a checked exception which requires wrapping it in try-catch unavailable at field initialization time. By factoring the logic out, we lose that support but gain some other convenience. Needless to say, if the field names or types are off, we'll get a nice InternalError during unit tests anyway. Note that the MethodHandles.Lookup instance has to be provided because otherwise the find method could not access the fields of the target class due to visibility restrictions enforced by the JVM otherwise.
Now let's describe the fields quickly:
- downstream comes from the parent Flow.Publisher's subscribe method as usual.
- exec is the Executor the operator will use
- prefetch defines the number of items to request from the upstream when the connection is established. A specific proportion (75%) will be used for replenishing items.
- queue will hold up to the given prefetch amount of items. Since there is only one thread calling onNext and one thread draining the queue, we'll use a bounded single-producer single-consumer queue.
- upstream is received through onSubscribe from the upstream source that allows requesting and cancelling the flow.
- wip will ensure there is only one drain running at a time, executing on the given Executor. This is the same trampolining logic as with other concurrent operators: a transition from 0 to 1 will start the drain process and any further increments to wip will indicate additional work has to be done by the drain process.
- requested will track the items requested by the downstream as the requesting is decoupled by the operator. The reason for this is that the operator uses a bounded buffer and there is no way to expect the downstream to request only as much as the buffer can hold at a given moment.
- done indicates the upstream has finished emitting items. Calling onComplete on the downstream immediately is not an option as there could be items still queued up.
- cancelled indicates the downstream issued a cancel() call and both the drain loop and the upstream has to stop producing events.
- error holds the Throwable from the upstream. We'll only read it if done == true ensuring proper cross-thread visibility.
- emitted counts how many items have been emitted towards the downstream. Decrementing the requested field is possible but generally expensive due to unavoidable atomics (and may not auto-batch decrements at all, thus each item incurs the cost).
- consumed counts how many items have been taken from the queue. It can be reset to zero when a certain limit is reached (75% of prefetch for example) and issue a request() towards the upstream, asking for a fixed number of items periodically.
@Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(this); s.request(prefetch); } @Override public void onNext(T item) { queue.offer(item); schedule(); } @Override public void onError(Throwable throwable) { error = throwable; DONE.setRelease(this, true); schedule(); } @Override public void onComplete() { DONE.setRelease(this, true); schedule(); }
The implementation of Flow.Subscription part is similarly not that complicated. The handling of non-positive request amount is left out for brevity. We have to trigger the draining logic when the downstream requests in case the upstream has items queued up already due to possible speed difference.
@Override public void request(long n) { for (;;) { long a = (long)REQUESTED.getAcquire(this); long b = a + b; if (b < 0L) { b = Long.MAX_VALUE; } if (REQUESTED.compareAndSet(this, a, b)) { schedule(); break; } } @Override public void cancel() { if (CANCELLED.compareAndSet(this, false, true)) { upstream.cancel(); if ((int)WIP.getAndAdd(this, 1) == 0) { queue.clear(); } } }
The request() uses the typical, capped atomic addition of the requested amount. The cancel() method sets the cancelled flag once, cancels the upstream and if there is no draining happening at the moment, clears the queue to help the GC.
The schedule() method conceptually works like any drain() method we've implemented so far. The difference is that the drain loop inside it has to be run by the Executor (potentially) on another thread:
void schedule() { if ((int)WIP.getAndAdd(this, 1) == 0) { exec.execute(this); } }
By implementing Runnable directly, it saves us creating the drain task all the time; here, all the state that need to be communicated between the caller of schedule() and the run() method is done through fields. Since the transition from 0 to 1 and later on, N back to 0 happens on a single thread, the underlying Executor can be a pool of any threads; this type of trampolining will make the Executor to pick one thread from this pool and while the drain lasts, prevent any other thread from the same pool to start another drain run (which would violate the Flow protocol and/or reorder events).
Finally, let's see the implementation of run() that holds the rest of a typical drain-loop:
@Override public void run() { int missed = 1; Flow.Subscriber<? super T> a = downstream; Queue<T> q = queue; long e = emitted; int c = consumed; int limit = prefetch - (prefetch >> 2); for (;;) { long r = (long)REQUESTED.getAcquire(this); while (e != r) { // TODO implement } if (e == r) { // TODO implement } emitted = e; consumed = c; missed = (int)WIP.getAndAdd(this, -missed) - missed; if (missed == 0) { break; } } }
The pattern for the drain loop is pretty standard: the missing counter will detect if there is more work to be done, we load fields into local variables to avoid fetching them from last-level-cache due to all the atomics around. Lastly, we have the usual while loop that keeps running until we run out of requests or upstream events, then checking if by reaching a terminal state in the operator, the terminal events can be emitted without actual requests or not.
There is nothing unusual at this point in the inner while loop and the if statements:
while (e != r) { if ((boolean)CANCELLED.getAcquire(this)) { q.clear(); return; } boolean d = (boolean)DONE.getAcquire(this); T v = q.poll(); boolean empty = v == null; if (d && empty) { Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (empty) { break; } a.onNext(v); e++; if (++c == limit) { c = 0; upstream.request(limit); } } if (e == r) { if ((boolean)CANCELLED.getAcquire(this)) { q.clear(); return; } boolean d = (boolean)DONE.getAcquire(this); boolean empty = q.isEmpty(); if (d && empty) { Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } }
Refitting operators with async drain
As hinted during the conclusion of the orderedMerge() operator, sometimes we'd want to drain the events of a multi-source operator on another thread. If one thinks about the structure shown in the previous section, the solution to the problem should be clear: propagate an Executor instance into the operator, implement Runnable, keep the WIP increment in drain() and move the rest of the method into the run() method:
final Executor exec; // ... void drain() { if (getAndIncrement() != 0) { return; } exec.execute(this); } @Override public void run() { int missed = 1; Flow.Subscriber downstream = this.downstream; Comparator comparator = comparator; OrderedMergeInnerSubscriber[] subscribers = this.subscribers; int n = subscribers.length; Object[] values = this.values; long e = emitted; for (;;) { long r = (long)REQUESTED.getAcquire(this); for (;;) { /* unchanged, left out for brevity */ } emitted = e; missed = addAndGet(-missed); if (missed == 0) { break; } } }
Cancelling the drain task
So far, we used the void execute(Runnable) method and relied upon cancelled to stop the drain process itself. However, there are a couple of problems:
- What if the onNext call on downstream blocks or takes a long time?
- What if the Executor is so busy the drain doesn't execute within a timeout?
One way of dealing with this situation is to use the more advanced ExecutorService API and work with instances provided by the java.util.concurrent.Executors utility class (or the ForkJoinPool.commonPool()). Their submit() method returns a Future instance we can call cancel() on. The benefit is that all the Thread interruption infrastructure is provided and we don't have to write that ourselves:
final ExecutorService execService; // ... void schedule() { if ((int)WIP.getAndAdd(this, 1) == 0) { Future<?> future = exec.submit(this); // do something with "future" } }
At this point we have to make future available to the cancellation routine in some thread-safe fashion because cancel() can be called at any time from any thread, even while the schedule() method is executing. The solution is to use the deferred cancellation pattern. First we have to define a field to store the current Future of the running drain task and an associated VarHandle to perform atomic operations with the reference:
Future<?> future; static final VarHandle FUTURE = VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, "future", Future.class); static final Future<?> CANCELLED_TASK = new FutureTask<>(() -> { }, null);
Next, we have to update the schedule() method to store the Future obtained from the ExecutorService.submit call not just atomically, but also avoiding a peculiar race.
void schedule() { if ((int)WIP.getAndAdd(this, 1) == 0) { Future<?> old = (Future<?>)FUTURE.getAcquire(this); if (old != CANCELLED_TASK) { Future<?> future = exec.submit(this); if (!FUTURE.compareAndSet(this, old, future)) { old = (Future<?>)FUTURE.getAcquire(this); if (old == CANCELLED_TASK) { future.cancel(true); } } } } }
Once the submit() returns, we might no longer be under the protection of the wip != 0 and thus a concurrent call to schedule() (via request() for example) may have triggered another drain run with another Future instance. If we'd update the future reference unconditionally, that newer Future instance would be knocked out and the task-cancellation would no longer work properly.
In addition, a concurrent cancellation may have swapped in the CANCELLED_TASK indicator in which case we should cancel the Future right away. There is no need to loop this construct because if the CAS fails, it is either due to a cancellation or a newer Future task. The former requires cancelling the returned Future and the latter can be ignored because the returned Future is practically finished anyway.
@Override public void cancel() { if (CANCELLED.compareAndSet(this, false, true)) { upstream.cancel(); Future<?> future = (Future<?>)FUTURE.getAndSet(this, CANCELLED_TASK); if (future != null && future != CANCELLED_TASK) {
future.cancel(true); } if ((int)WIP.getAndAdd(this, 1) == 0) { queue.clear(); } } }
The cancel() method requires less changes: we atomically replace the current future with the CANCELLED_TASK indicator and if otherwise the operator was not cancelled already, we cancel the non-null Future instance and carry on.
One can, of course, go extra lengths and make sure the future field is nulled out in between drain runs to avoid leaking references with certain executor service-alikes.
Conclusion
In this post I've shown how one can implement a thread-switching operator, observeOn in RxJava terms, with the Java 9 Flow API and the standard Java async-providing in options such as Executors. The presented algorithms which automatically trampoline the draining of the queued events allows using all sorts of Executors, including a completely synchronous Runnable::run Executor that executes the task on the caller thread. Since the operator's logic is largely independent of how the asynchrony is established, it can be tested quite easily in a synchronous unit test by itself or as part of a complicated chain of potentially asynchronous flows.
In practice, one may have consider the handling of a RejectedExecutionException thrown by the execute/submit methods on certain ExecutorServices. The trouble is that even though downstream.onError can be called - as no drain() is/can run at that time -, the downstream may really expect errors also delivered on the same background thread as the onNext items. In addition, one has to suppress other onXXX signals from the upstream similar to how a failed user-provided function in an onNext has to suppress any subsequent onError or onComplete call by the upstream because these terminal events are not required to stop emitting events. This part is left to the library developer to decide on his/her own.
In the next post, we'll see how one can stop a flow if items didn't arrive in a timely manner.
Nincsenek megjegyzések:
Megjegyzés küldése