Introduction
There are cases where mapping an upstream value of type T has to be mapped to type U , one-for-one, but the mapping process itself involves asynchronous work. With RxJava, this is a de-facto use case for concatMap, concatMapEager and flatMap, depending on the concurrency expectations about the mapping itself (i.e., one at a time, multiple at once but in-order and arbitrary order respectively).
Let's assume we don't want to run multiple concurrent mapping thus concatMap would suffice. We can (and will in a future post) write that operator, but we should face two additional challenges: the standard Java 9 Flow API has no notion of 0..1 reactive type so we have to restrict the inner Flow.Publisher to at most one element (take(1)); and we'd sometimes zip the original and the mapped result into a third type R.
These requirements warrant their own custom operator, enter mapWhen().
The mapWhen operator
I must admit, the name comes from Reactor-Core after they picked my implementation named mapAsync() from RxJava 2 Extensions. It certainly matches the naming of other operators, such retryWhen(), but arguably the function parameter signature is different (i.e., not a Publisher -> Publisher transformation):public static <T, U> Flow.Publisher<U> mapWhen(Flow.Publisher<T> source, Function<? super T, ? extends Flow.Publisher<U>> mapper) { return mapWhen(source, mapper, (t, u) -> u); } public static <T, U, R> Flow.Publisher<R> mapWhen(Flow.Publisher<T> source, Function<? super T, ? extends Flow.Publisher<U>> mapper, BiFunction<? super T, ? super U, ? extend R> combiner ) { return new FlowMapWhen<>(source, mapper, combiner); }
One would think that supporting the combiner case with the same operator implementation adds unreasonable overhead. We'll see later that this is not the case because both the original and mapped value will be available in a way that makes application (t, u) -> u bi-function a trivial, and when JIT-ed, a fall-through case.
I'll omit the outer FlowMapWhen class' implementation because it is just a boilerplate forwarding the Flow.Subscriber and the two functional callbacks to the actual MapWhenSubscriber operator implementation. Let's see the skeleton of the MapWhenSubscriber:
// inside class FlowMapWhen static final class MapWhenSubscriber<T, U, R> implements Flow.Subscriber<T>, Flow.Subscription { final Flow.Subscriber<? super R> downstream; final Function<? super T, ? extends Flow.Publisher<U>> mapper; final BiFunction<? super T, ? super U, ? extend R> combiner; final T[] buffer; long producerIndex; long consumerIndex; Flow.Subscription upstream; int wip; boolean done; Throwable error; boolean cancelled; long emitted; int consumed; long requested; U mapperResult; int mapperState; MapWhenInnerSubscriber<T, U, R> inner; // ------------------------------------- static final VarHandle WIP; static final VarHandle ERROR; static final VarHandle DONE; static final VarHandle CANCELLED; static final VarHandle REQUESTED; static final VarHandle BUFFER; static final VarHandle MAPPER_STATE; static final VarHandle INNER; static final MapWhenInnerSubscriber<Object, Object, Object> INNER_CANCELLED = new MapWhenInnerSubscriber<>(null); static { Lookup lk = MethodHandles.lookup(); try { WIP = lk.findVarHandle( MapWhenSubscriber.class, "wip", int.class); ERROR = lk.findVarHandle( MapWhenSubscriber.class, "error", Throwable.class); DONE = lk.findVarHandle( MapWhenSubscriber.class, "done", boolean.class); CANCELLED = lk.findVarHandle( MapWhenSubscriber.class, "cancelled", boolean.class); REQUESTED = lk.findVarHandle( MapWhenSubscriber.class, "requested", long.class); MAPPER_STATE = lk.findVarHandle( MapWhenSubscriber.class, "mapperState", int.class); INNER = lk.findVarHandle( MapWhenSubscriber.class, "inner", MapWhenInnerSubscriber.class); } catch (Throwable ex) { throw new InternalError(ex); } BUFFER = MethodHandles.arrayElementVarHandle(Object[].class); } // ------------------------------------- MapWhenSubscriber( Flow.Subscriber<? super R> downstream, Function<? super T, ? extends Flow.Publisher<U>> mapper, BiFunction<? super T, ? super U, ? extend R> combiner) { this.downstream = downstream; this.mapper = mapper; this.combiner = combiner; this.buffer = (T[])new Object[Flow.defaultBufferSize()]; } @Override public void onSubscribe(Flow.Subscription s) { // TODO implement } @Override public void onNext(T item) { // TODO implement } @Override public void onError(Throwable throwable) { // TODO implement } @Override public void onComplete() { // TODO implement } @Override public void request(long n) { // TODO implement } @Override public void cancel() { // TODO implement } void innerSuccess(U result) { // TODO implement } void innerError(Throwable throwable) { // TODO implement } void innerComplete() { // TODO implement } void updateError(Throwable throwable) { // TODO implement } void drain() { // TODO implement } }
We have quite a set of fields so let's describe them from top to bottom:
- downstream represents the recipient of each mapped value.
- mapper is the primary mapping function that creates a Flow.Publisher for each individual upstream value
- combiner is the secondary mapping function that turns the original upstream value and the asnyc-mapped value into the end result.
- buffer is a power-of-2 fixed size circular buffer for prefetching and holding the upstream values efficiently. Since we'll match the buffer size with backpressure requests, it will never overflow. Accessing the array elements will have to be atomic, thus we will use the BUFFER VarHandle to accomplish that.
- producerIndex and consumerIndex are pointers into the buffer and will be updated by the onNext()'s thread and the drain() thread respectively.
- upstream is the connection to the upstream Flow.Publisher; we'll request from it in a stable prefetch manner.
- wip is the work-in-progress counter for the usual queue-drain serialization approach: all sorts of events (requesting, upstream values, mapping responses) will be in flight but only one thread can act on them at a time. Changing this value has to be atomic thus the WIP companion VarHandle is also present.
- done indicates the upstream has finished emitting events. It could be volatile but we can save some nanoseconds by using the DONE VarHandle and by using setRelease otherwise not available. This is correct because, as we'll see, setting done is always followed by a full-barrier atomic integer to wip.
- cancelled is an indication from downstream that the processing should stop. We'll check this inside the drain loop. Again, this could be also simply a volatile field but we'll use compareAndSet in cancel() to make the cancellation process happen at most once. For this, the VarHandle CANCELLED will be used.
- emitted tracks how many result items have been sent to the downstream consumer. When the emitted amount is equal to the requested amount, we quit emitting temporarily until the downstream requests more. No concurrent access happens to this variable because it will be only incremented from within the drain-loop.
- consumed tracks the number of items processed from the upstream so that when a certain threshold has been reached, a replenishing request() call will be issued for more. Reusing consumerIndex is not an option because we have to reset the counter whenever such replenishing request() call happens.
- requested tracks the total amount the downstream consumer has requested. This has to be atomically updated and the REQUESTED VarHandle will help us ensure that. Tracking requested and emitted separately instead of decrementing the requested amount saves us again a few nanoseconds.
- mapperResult stores the result of the mapper's activity or null if there was no response.
- mapperState indicates where the inner mapping is at: 0 indicates no mapper Flow.Publisher is being observed, 1 indicates observation is in progress but no values yet, 2 indicates a value has been received and the main flow can continue and 3 indicates the observation completed without any value. Since observing and changing this state can happen from different threads, the MAPPER_STATE VarHandle's atomics will ensure proper visibility.
- inner holds onto the currently running MapWhenInnerSubscriber observing the generated Flow.Publisher for an upstream value. If the downstream cancels the entire flow, the operator should prevent any further subscription to an inner Flow.Publisher that may happen concurrently. The VarHandle INNER helps with this "terminal atomics" scenario through a static cancellation indicator instance INNER_CANCELLED of type MapWhenInnerSubscriber.
static final class MapWhenInnerSubscriber<T, U, R> extends AtomicReference<Flow.Subscription> implements Flow.Subscriber<U>, Flow.Subscription { final MapWhenSubscriber<T, U, R> parent; MapWhenInnerSubscriber(MapWhenSubscriber<T, U, R> parent) { this.parent = parent; } @Override public void onSubscribe(Flow.Subscription s) { if (compareAndSet(null, s)) { s.request(Long.MAX_VALUE); } else { s.cancel(); } } @Override public void onNext(U item) { Flow.Subscription upstream = getPlain(); if (upstream != this) { setPlain(this); upstream.cancel(); parent.innerSuccess(item); } } @Override public void onError(Throwable throwable) { Flow.Subscription upstream = getPlain(); if (upstream != this) { setPlain(this); parent.innerError(throwable); } } @Override public void onComplete() { Flow.Subscription upstream = getPlain(); if (upstream != this) { setPlain(this); parent.innerComplete(); } } void cancel() { Flow.Subscription upstream = getAndSet(this); if (upstream != null && upstream != this) { upstream.cancel(); } } }
When working with inner subscriber such as this, I found it practical to delegate most logic back to the parent coordinator class; sometimes this allows the reuse of the inner Flow.Subscriber implementation, other times it allows us to focus on the behavior of the operator now confined into a single parent class.
One new pattern that the MapWhenInnerSubscriber shows is the use of getPlain() and setPlain(). These were Java 9 additions to the AtomicXXX classes that allows us to access its contents without barriers. Since onNext(), onError() and onComplete() are guaranteed to execute sequentially, we don't really need atomics in the case when an onNext() should prevent any subsequent effects of an onError() on onComplete(). Usually, a library has its standard cancelled Flow.Subscription indicator defined somewhere but we will simply resort to using this to indicate a cancelled MapWhenInnerSubscriber, hence it also implements Flow.Subscription to support this case.
Now let's get back to the main MapWhenSubscriber and implement the methods.
onSubscribe
@Override public void onSubscribe(Flow.Subscription s) { this.upstream = s; downstream.onSubscribe(this); s.request(Flow.defaultBufferSize()); }
A typical stable-prefetch onSubscribe implementation: save the upstream handle, introduce ourselves to downstream and then request the fixed amount from upstream.
onNext
@Override public void onNext(T item) { T[] buf = buffer; int mask = buf.length - 1; long pi = producerIndex; BUFFER.setRelease(buf, (int)pi & mask, item); producerIndex = pi + 1; drain(); }
The code before drain() is usually hidden behind a Queue.offer() implementation; you may recognize algorithm from the JCTools library's SpscArrayQueue implementation. Of course I could have just used that but then we'd miss a nice use for VarHandles, that unlike field updaters, can target array elements directly. Compared to the JCTools version, there are two important differences: no look-ahead, no item padding (avoid false sharing) and no overflow detection. The latter is omitted because we are in a backpressured flow: remember we sized the buffer with Flow.defaultBufferSize() and requested the same amount? By definition a reactive flow should not emit more than requested. Note that we don't map the upstream item into a Flow.Publisher and subscribe MapWhenInnerSubscriber here, unlike a flatMap() for example, because we want only one of them to be active at a time and we want to wait for it to signal.
onError
Before we implement onError(), a helper method needs to be introduced that captures a behavior required by later handler methods as well:
void updateError(Throwable throwable) { for (;;) { Throwable current = (Throwable)ERROR.getAcquire(this); Throwable next; if (current == null) { next = throwable; } else { next = new Throwable(); next.addSuppressed(current); next.addSuppressed(throwable); } if (ERROR.compareAndSet(this, current, next)) { break; } } }
Perhaps the easier behavior is to delay errors until all upstream values have been processed and present the accumulated errors at once at the end. The trouble is that both the upstream and the inner Flow.Publisher may signal error at the same time. To handle this case, we do something like a copy-on-write atomic update. If error delaying is not preferred, a simple ERROR.compareAndSet(this, null, throwable) suffices (don't forget to do something with throwable in case the compareAndSet returns false!).
@Override public void onError(Throwable throwable) { updateError(throwable); DONE.setRelease(this, true); drain(); }
Handling the upstream error requires us to update the errors we are collecting, indicating the upstream has completed and trigger a drain() that will perform the necessary emissions of events.
onComplete
@Override public void onComplete() { DONE.setRelease(this, true); drain(); }
Handling the normal upstream completion is trivial (and similar to onError): indicate the upstream completed and issue drain() to emit events as the current overall state indicates.
request
@Override public void request(long n) { if (n <= 0L) { updateError(new IllegalArgumentException("non-negative request expected")); } else { for (;;) { long current = (long)REQUESTED.getAcquire(this); long next = current + n; if (next < 0L) { next = Long.MAX_VALUE; } if (REQUESTED.compareAndSet(this, current, next)) { break; } } } drain(); }
Handling downstream requests is done by aggregating them atomically and capping it to Long.MAX_VALUE. Non-positive requests are honored by an IllegalArgumentException as the spec requires. Either way, drain() will actually deliver the relevant events.
cancel
@Override public void cancel() { if (CANCELLED.compareAndSet(this, false, true)) { upstream.cancel(); MapWhenInnerSubscriber inner = (MapWhenInnerSubscriber)INNER.getAndSet(this, INNER_CANCELLED); if (inner != null && inner != INNER_CANCELLED) { inner.cancel(); } if ((int)WIP.getAndAdd(this, 1) == 0) { innerResult = null; Arrays.fill(buffer, null); } } }
We atomically change into the cancelled state (exactly once) and cancel the upstream. Since there might be an active inner observation going on, we have to cancel that as well and prevent any further observation to take place by storing INNER_CANCELLED. Note the unfortunate need for casting the returned object from the VarHandle invocation. The last part, incrementing the wip counter is there to make sure the buffer and any current result is cleaned up even when there is no drain loop running at the moment.
innerSuccess
@Override public void innerSuccess(U result) { this.innerResult = result; MAPPER_STATE.setRelease(this, 2); drain(); }
Handling the emission from the inner source requires storing the mapped result into the field, indicating that there is a value available (2) and issuing a drain() to perform the signal emissions accordingly.
innerError
@Override public void innerError(Throwable throwable) { updateError(throwable); MAPPER_STATE.setRelease(this, 3); drain(); }
Handling the error from the MapWhenInnerSubscriber looks similar, update the errors, indicate the empty state (3) for the inner result and issue a drain().
innerComplete
@Override public void innerComplete() { MAPPER_STATE.setRelease(this, 3); drain(); }
Pretty much a copy-paste of innerError(), minus the updating of the error of course.
drain
Arguably, the methods described in previous sections were only to prepare the state for the actual workhorse: the drain() method which is a corner stone for most lock-free operator implementations. Since we are to coalesce the handling of upstream values with the handling of inner results, the drain() method will be described in several code listings.First, let's write the skeleton drain loop:
void drain() { if ((int)WIP.getAndAdd(this, 1) != 0) { return; } int missed = 1; Flow.Subscriber<? super R> downstream = this.downstream; T[] buf = buffer; int mask = buf.length - 1; int limit = buf.length - (buf.length >> 2); long ci = consumerIndex; int c = consumed; long e = emitted; for (;;) { long r = (long)REQUESTED.getAcquire(this); while (e != r) { // TODO implement } if (e == r) { // TODO implement } consumerIndex = ci; consumed = c; emitted = e; missed = (int)WIP.getAndAdd(this, -missed) - missed; if (missed) { break; } } }
So far, this is a typical drain loop that loads frequently needed values into local variables, can emit when there are unfulfilled requests and saves any changes to the tracking variables before (potentially) giving up the emission thread. Unfortunately, there is no VarHandle.addAndGet (unlike AtomicInteger) thus we have to get the effect from its dual getAndAdd: we have to manually subtract the value from the current value returned by the method to get back the "current" after-value of the wip field.
The next step is to implement the while loop. It has two main responsibilities: figuring out there are no more upstream values and acting on the result of the inner Flow.Publisher.
while (e != r) { if ((boolean)CANCELLED.getAcquire(this)) { mapperResult = null; Arrays.fill(buf, null); return; } boolean d = (boolean)DONE.getAcquire(this); T value = (T)buf[(int)ci & mask]; boolean empty = value == null; if (d && empty) { Throwable error = (Throwable)ERROR.getAcquire(this); if (error == null) { downstream.onComplete(); } else { downstream.onError(error); } return; } if (empty) { break; } // TODO implement the rest }
First, we detect a cancellation and clean up the internal storage of the operator before quitting. Then, we gather the state of the operator. Is it done? Is there a current upstream value to work with?
If the upstream is done and there are no further buffered values, we check for an error and terminate the downstream accordingly. Note that the buffer is not "dequeued" for the consumerIndex (ci) until its associated inner Flow.Publisher has produced its result. If the upstream has simply not produced its next item, we quit the while loop.
The next step is to determine what to do with the upstream value. There are four cases to handle:
int state = (int)MAPPER_STATE.getAcquire(this); if (state == 0) { Flow.Publisher<U> publisher; try { publisher = Objects.requireNonNull(mapper.apply(value)); } catch (Throwable ex) { updateError(ex); MAPPER_STATE.setRelease(3); continue; } MapWhenInnerSubscriber<T, U, R> next = new MapWhenInnerSubscriber<>(this); MapWhenInnerSubscriber current = (MapWhenInnerSubscriber)INNER.getAcquire(this); if (current != INNER_CANCELLED && INNER.compareAndSet(this, current, next)) { MAPPER_STATE.setRelease(1); publisher.subscribe(next); } else { return; } } else if (state == 2) { // TODO implement } else if (state == 3) { // TODO implement } else { break; }
State 0 is when there is no ongoing inner observation and given an upstream value, we can setup that observation. First, we map the upstream value into a Flow.Publisher instance, checking nullness along the way. If this mapping crashes for some reason, we update the error tracking and jump to state 3 similar to innerError(). Otherwise, we prepare a new MapWhenInnerSubscriber and unless there has been a concurrent cancellation, we atomically store it in the inner field, update the state to 1 and subscribe to the Flow.Publisher with it. Whether or not this inner source is synchronous or not, the drain loop protects against reentrance.
State 1 indicates an ongoing inner observation in which case there is nothing to do but quit the while loop and wait for another drain() invocation by one of the innerXXX() methods.
State 2 happens when the inner observation produces a result of type U:
if (state == 2) { U result = mapperResult; mapperResult = null; R output; try { output = Objects.requireNonNull(combiner.apply(value, result)); } catch (Throwable ex) { updateError(ex); MAPPER_STATE.setRelease(3); continue; } downstream.onNext(output); e++; BUFFER.setRelease(buf, (int)ci & mask, null); ci++; if (++c == limit) { c = 0; upstream.request(limit); } MAPPER_STATE.setRelease(0); }
We pick up the inner result, apply the combiner function to it to get back the output for the downstream. Again, if the mapping crashes, we update the error tracking and move to state 3. Once the emission happened, we increment the emission counter, release the buffer slot for the original item, increment the consumer index for getting the next upstream value later on. We increment the number of items consumed from upstream (c) and once it hits the prefetch limit (75% of the buffer size), we reset the count and request more from upstream. Finally, we move back to state 0 to allow the handling the next item.
The last state, 3, handles the case when there was no inner result at all; we have to move on to the next upstream item:
if (state == 3) { BUFFER.setRelease(buf, (int)ci & mask, null); ci++; if (++c == limit) { c = 0; upstream.request(limit); } MAPPER_STATE.setRelease(0); }
This time, there is nothing to combine and we don't have anything of type R to emit, thus what remains is the release of the buffer slot. The emission count is unchanged in this case but we still have to replenish as we have just handled an item from upstream. Finally, we move back to state 0.
The last section deals with the case when a terminal state has been reached but the downstream didn't request anything. The Reactive-Streams specification allows signalling a terminal event without requests and it is often desirable to end a stream eagerly.
if (e == r) { if ((boolean)CANCELLED.getAcquire(this)) { mapperResult = null; Arrays.fill(buf, null); return; } if ((boolean)DONE.getAcquire(this) && buf[(int)ci & mask] == null) { Throwable error = (Throwable)ERROR.getAcquire(this); if (error == null) { downstream.onComplete(); } else { downstream.onError(error); } return; } }
Practically, it is just the front half of the code from the while loop in a more concise fashion: cleanup if cancellation happened when there was no request or see if we run out of upstream items and no more will arrive.
Conclusion
I think we can agree that these types of operators can become quite complicated due to the potential asynchronous execution of its parts. Working out the state management is a difficult without some experience and even then, ensuring proper visibility (and finding bugs due the lack of it) is an advanced developer responsibility.Perhaps the main takeaway for mapWhen is that when events from multiple streams have to be managed, the safest bet is to use the queue-drain approach. Other method should focus on preparing the "queue" or state of the operator, including proper visibility, so that the draining thread can see and act upon that state while also honoring the sequential call requirements of the specification.
In the next post, we'll see how one can handle the task of merging multiple streams while keeping the order of the resulting sequence according to a Comparator.
There is property/local variable named `s` inside `onNext`/`onError`/`onComplete`, you it is being compared with `this`. Can you please correct the code in the article
VálaszTörlésThanks
Fixed.
Törlés