Introduction
There are situations when the same elements of a source should be dispatched to multiple consumers. Certainly, if the source supports multiple subscribers and is deterministic (such as our previous async range), one can just instantiate the flow multiple times. However, if the source doesn't support multiple subscribers or each subscription ends up being unique and/or non-deterministic, that simple approach doesn't work anymore.
We need a way to have a single realization of the (upstream) source yet allow multiple consumers. Since we are dealing with Flow.Publishers that require backpressure management, such an intermediate solution has to coordinate requests from its Flow.Subscribers in addition to handling the dynamic subscription and unsubscription (cancellation) of said Flow.Subscribers while the flow is active. Enter MulticastProcessor.
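To make the goal concrete, here is a minimal usage sketch of the processor developed in this post; source stands for some single-use Flow.Publisher and the subscriber variables are hypothetical placeholders:

MulticastProcessor<Integer> processor = new MulticastProcessor<>(128);

// multiple consumers share the single upstream realization
processor.subscribe(firstSubscriber);
processor.subscribe(secondSubscriber);

// subscribing the processor itself triggers the single upstream run
source.subscribe(processor);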
Flow.Processor recap
What is a Processor? By definition, it is a combination of a Flow.Publisher and a Flow.Subscriber: it can act as a source that can be subscribed to via subscribe(), and the processor itself can be handed to somebody else's Flow.Publisher.subscribe(). It has a mixed history, as the idea comes from the original Observer pattern (i.e., java.util.Observable) and from Rx.NET's Subject, which allows dispatching signals to multiple Observers in an imperative (and synchronous) fashion.
The Flow.Processor in Java 9 defines two type arguments, one for its input side (Flow.Subscriber) and one for its output side (Flow.Publisher). The idea behind it was that a Flow.Processor can act as a transformation step between an upstream and a downstream. However, such a transformation often mandates that the Flow.Processor implementation accept only a single Flow.Subscriber during its entire lifetime. Since the implementation has to follow the Reactive Streams specification nonetheless, this adds a lot of overhead to the flow. As demonstrated in previous posts, when a flow is realized, there is only one subscriber per stage in it and thus the Flow.Processor overhead can be completely avoided.
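For reference, the JDK 9 interface itself (nested inside java.util.concurrent.Flow) is just the combination of the two roles:

// nested in java.util.concurrent.Flow (JDK 9):
// T is the input (Subscriber) side, R the output (Publisher) side
public static interface Processor<T, R>
        extends Subscriber<T>, Publisher<R> {
}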
Still, restricting the number of Flow.Subscribers has some use in operators such as groupBy() and window() where the inner Flow.Publishers are actually Flow.Processors that work in a unicast mode: holding onto elements until the consumer is ready to receive them while letting the main source progress with other groups/windows.
Of course, when the items in these inner Flow.Publishers are required by multiple processing stages, a separate multicasting stage, in the form of a Flow.Processor, can come in quite handy.
MulticastProcessor
The first step is to define the skeleton of the class.
public final class MulticastProcessor<T> implements Flow.Processor<T, T> {

    final int prefetch;

    final SpscArrayQueue<T> queue;

    /* Some other fields, see below. */

    public MulticastProcessor(int prefetch) {
        this.prefetch = prefetch;
        this.queue = new SpscArrayQueue<>(prefetch);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // TODO implement
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable t) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
}
The class has to implement 5 public methods from the Flow.Processor interface. Since the intended usage is to have multiple Flow.Subscribers and be able to subscribe the MulticastProcessor itself to some other Flow.Publisher, both the upstream's Flow.Subscription and any Flow.Subscribers have to be tracked. The input and output types are the same as there won't be any item transformation happening in this particular Flow.Processor implementation.
We need a queue to store the upstream values because some of the downstream Flow.Subscribers may not be ready to receive items and we don't want any discontinuity in item delivery for them. A JCTools single-producer single-consumer queue suffices because the items coming from the upstream are guaranteed to arrive from a single thread at a time, and the dispatch logic consuming the queue will be serialized by the MulticastProcessor as well.
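As a reminder of that queue's contract, here is a minimal sketch (the capacity value is arbitrary):

import org.jctools.queues.SpscArrayQueue;

// single producer calls offer() (the upstream's onNext),
// single consumer calls poll() (the serialized drain loop)
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(128);

queue.offer(1);               // producer side
Integer item = queue.poll();  // consumer side, null when empty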
Since the two sides can operate asynchronously to each other, we will need some atomics around the state of the MulticastProcessor:
Flow.Subscription upstream;
static final VarHandle UPSTREAM =
        VH.find(MethodHandles.lookup(), MulticastProcessor.class,
                "upstream", Flow.Subscription.class);

MulticastSubscription<T>[] subscribers = EMPTY;
static final VarHandle SUBSCRIBERS =
        VH.find(MethodHandles.lookup(), MulticastProcessor.class,
                "subscribers", MulticastSubscription[].class);

int wip;
static final VarHandle WIP =
        VH.find(MethodHandles.lookup(), MulticastProcessor.class,
                "wip", int.class);

static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];

static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0];

volatile boolean done;
Throwable error;

int consumed;
The upstream stores the Flow.Subscription received from the source to be multicasted. Flow.Subscribers can come and go independently and as such, a typical copy-on-write array structure with terminal state can be employed. If the terminal state has been reached, any subsequent subscribe() attempt will either call onError() or onComplete() on the Flow.Subscriber, depending on how the MulticastProcessor itself has been terminated (i.e., the error field is null or not). The VH utility class has been introduced in a previous post.
One could ask, why do we need the wip field? The upstream will invoke onNext synchronously on the MulticastProcessor, so a simple loop over the current array of Flow.Subscribers should suffice, right? Not exactly. Because we want every Flow.Subscriber to receive the same items in the same order without skipping, they will move in a so-called lockstep mode. If a Flow.Subscriber is not ready to receive, the others won't receive data even if they could. First, once this single Flow.Subscriber becomes ready to receive (it called request()), we have to try and drain the queue for the available items, which can happen on a different thread than the one the upstream is calling onNext from. Second, if a Flow.Subscriber that was preventing the others from making progress by not requesting decides to cancel, its departure may enable the others to receive the available items right away.
Due to the prefetching nature of the MulticastProcessor, we won't be able to use the TERMINATED array alone as a completion indicator, because the queue may still hold non-dispatched elements when an onComplete() or onError() arrives. Therefore, a separate done flag is employed, which tells the drain logic later on not to expect any further items in the queue.
Each individual Flow.Subscriber needs its own state so that its request amounts can be tracked; for this purpose, the MulticastSubscription class is employed:
static final class MulticastSubscription<T> extends AtomicLong
implements Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    final MulticastProcessor<T> parent;

    long emitted;

    MulticastSubscription(Flow.Subscriber<? super T> downstream,
            MulticastProcessor<T> parent) {
        this.downstream = downstream;
        this.parent = parent;
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            cancel();
            // for brevity, serialization is omitted here, see HalfSerializer.
            downstream.onError(new IllegalArgumentException());
        } else {
            for (;;) {
                long current = (long)getAcquire();
                if (current == Long.MIN_VALUE || current == Long.MAX_VALUE) {
                    break;
                }
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (compareAndSet(current, next)) {
                    parent.drain();
                    break;
                }
            }
        }
    }

    @Override
    public void cancel() {
        if ((long)getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
            parent.remove(this);
            parent.drain();
        }
    }
}
The MulticastSubscription takes a reference to the actual downstream Flow.Subscriber and to the parent MulticastProcessor so it can trigger removal and item draining. The emitted field counts how many times downstream.onNext() was invoked, which lets us know how many more items this particular subscription can take. The AtomicLong base class tracks the total amount requested by the downstream, so the remaining amount can be calculated as getAcquire() - emitted (this calculation happens on a single thread). A stored value of Long.MIN_VALUE indicates the downstream has cancelled the subscription.
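As a worked example of this accounting (the numbers are made up for illustration):

// hypothetical state of one MulticastSubscription:
long total = 10L;                 // getAcquire(): downstream requested 10 in total
long emitted = 7L;                // 7 items have been delivered so far
long remaining = total - emitted; // 3 more items may be emitted
// getAcquire() == Long.MIN_VALUE would mean the downstream has cancelled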
The parent MulticastProcessor has to provide a few services to handle the various MulticastSubscription instances: drain() to start emitting items if possible and remove() to let a particular Flow.Subscriber go. We'll also need remove()'s pair, add(), to register a new Flow.Subscriber. First, let's see add() and remove():
boolean add(MulticastSubscription<T> ms) {
    for (;;) {
        MulticastSubscription<T>[] current =
            (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
        if (current == TERMINATED) {
            return false;
        }
        int n = current.length;
        MulticastSubscription<T>[] next = new MulticastSubscription[n + 1];
        System.arraycopy(current, 0, next, 0, n);
        next[n] = ms;
        if (SUBSCRIBERS.compareAndSet(this, current, next)) {
            return true;
        }
    }
}
The add() method should look familiar. We read the current array of MulticastSubscriptions and see if it is the terminal indicator (in which case we return false, indicating the MulticastProcessor is in a terminal state and no new Flow.Subscribers are accepted); if not, we copy and extend the array to incorporate the incoming new MulticastSubscription. The compareAndSet then ensures the changes are applied atomically and, in case of a concurrent change, the process is retried.
void remove(MulticastSubscription<T> ms) {
    for (;;) {
        MulticastSubscription<T>[] current =
            (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
        int n = current.length;
        if (n == 0) {
            break;
        }
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (ms == current[i]) {
                j = i;
                break;
            }
        }
        if (j < 0) {
            break;
        }
        MulticastSubscription<T>[] next;
        if (n == 1) {
            next = EMPTY;
        } else {
            // the new array is one element shorter
            next = new MulticastSubscription[n - 1];
            System.arraycopy(current, 0, next, 0, j);
            System.arraycopy(current, j + 1, next, j, n - j - 1);
        }
        if (SUBSCRIBERS.compareAndSet(this, current, next)) {
            break;
        }
    }
}
The remove() method is fairly typical again. We read the current array and if it is empty (due to having no subscribers or being terminated) we quit. Otherwise we search for the index of the MulticastSubscription and if found, a new array is created and items copied around the index into it. The compareAndSet again ensures proper visibility and retry in case of concurrent state changes. If the current array happens to be one element long, we avoid creating a new empty array and simply reuse the shared static EMPTY instance.
We can now implement the subscribe() method:
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber == null");
    MulticastSubscription<T> ms = new MulticastSubscription<T>(subscriber, this);
    subscriber.onSubscribe(ms);
    if (add(ms)) {
        if ((long)ms.getAcquire() == Long.MIN_VALUE) {
            remove(ms);
        }
        drain();
    } else {
        if ((long)ms.getAcquire() != Long.MIN_VALUE) {
            Throwable ex = error;
            if (ex == null) {
                ms.downstream.onComplete();
            } else {
                ms.downstream.onError(ex);
            }
        }
    }
}
First, we verify the subscriber is not null. Then, a unique MulticastSubscription instance is associated with the Flow.Subscriber. We try to register this with the MulticastProcessor via add(). This may fail if the MulticastProcessor has reached its terminal state, in which case we emit the terminal event, unless the Flow.Subscriber has cancelled in the meantime (the request accounting shows Long.MIN_VALUE). Since we sent out the Flow.Subscription via onSubscribe before calling add(), a cancellation can happen concurrently with add(), and the remove() triggered by MulticastSubscription.cancel() may not find the subscription in the array: calling remove() again ensures the reference is not retained in this corner case. In either case, the additional call to drain() ensures items are dispatched as soon as possible to the (remaining) parties.
One could ask, why not call add() first and then call onSubscribe(), since as long as there is no request() from the downstream, the visibility of the MulticastSubscription shouldn't be a problem, right? Not exactly. The Reactive Streams specification allows emitting onError or onComplete without a prior request; in this case, since the reference to the MulticastSubscription would become visible sooner, the onError/onComplete could be invoked before onSubscribe, which is a violation of the specification (and will likely lead to a NullPointerException down the line). Reversing the call order avoids this violation but requires the aforementioned add(), check-for-cancellation, remove() sequence to avoid leaking cancelled subscriptions.
Let's see the implementation of the remaining 4 public methods:
@Override
public void onSubscribe(Flow.Subscription s) {
    Objects.requireNonNull(s, "s == null");
    if (UPSTREAM.compareAndSet(this, null, s)) {
        s.request(prefetch);
    } else {
        s.cancel();
    }
}

@Override
public void onNext(T item) {
    Objects.requireNonNull(item, "item == null");
    queue.offer(item);
    drain();
}

@Override
public void onError(Throwable t) {
    Objects.requireNonNull(t, "t == null");
    error = t;
    done = true;
    drain();
}

@Override
public void onComplete() {
    done = true;
    drain();
}
The onSubscribe atomically sets the Flow.Subscription if not already set, cancels it otherwise. The other onXXX methods queue the item/error/done indicator and call drain() to dispatch events if possible. The Reactive Streams specification requires throwing NullPointerException if the input parameters are null.
Lastly, we can now implement the drain() method, piece by piece for better clarity. It is implemented as our typical queue-drain loop:
void drain() {
    if ((int)WIP.getAndAdd(this, 1) != 0) {
        return;
    }

    int missed = 1;

    for (;;) {

        // TODO implement

        missed = (int)WIP.getAndAdd(this, -missed) - missed;
        if (missed == 0) {
            break;
        }
    }
}
Usually, the inner loop starts by reading the requested amount; however, there is no single requested amount in this case: we have to find the smallest number of items that can be emitted to all current Flow.Subscribers:
// for (;;) {

    MulticastSubscription<T>[] current =
        (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
    int n = current.length;

    if (n != 0) {
        long requested = Long.MAX_VALUE;

        for (MulticastSubscription<T> ms : current) {
            long r = ms.getAcquire();
            if (r != Long.MIN_VALUE) {
                requested = Math.min(requested, r - ms.emitted);
            }
        }

        // TODO try emitting, handle terminal case
    }

// }
Remember, the amount of items a particular MulticastSubscription can receive is its total requested amount minus the emitted items to it so far.
Next, we try to emit any available item from the queue:
// if (n != 0) {
    // ...

    while (requested != 0) {
        boolean d = done;

        T v = queue.poll();
        boolean empty = v == null;

        if (d && empty) {
            // TODO terminal case
            return;
        }

        if (empty) {
            break;
        }

        for (MulticastSubscription<T> ms : current) {
            if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onNext(v);
                ms.emitted++;
            }
        }

        requested--;

        if (++consumed == (prefetch >> 1)) {
            upstream.request(consumed);
            consumed = 0;
        }
    }

    // TODO no request - no more items case
// }
In this inner loop, which exits once requested reaches zero, we try to get the next available item from the queue and, if found, emit it to the non-cancelled Flow.Subscribers inside the MulticastSubscriptions. Since the MulticastProcessor works in a stable-prefetch mode, we have to track consumption via consumed and request more once a certain number of items have been taken; here, once half of the prefetch amount has been consumed, we request that same amount from the upstream.
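For example, with a prefetch of 128 (an arbitrary but typical value), the replenishment works out as follows:

int prefetch = 128;
int limit = prefetch >> 1;   // 64: the replenishment threshold
// onSubscribe() requests 128 up front; then after every 64 items
// drained, upstream.request(64) tops the in-flight amount back up,
// keeping between 64 and 128 items outstanding at most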
One case to be implemented is when the upstream has terminated and we emitted all queued items:
// if (d && empty) {
    Throwable ex = error;

    current = (MulticastSubscription<T>[])SUBSCRIBERS.getAndSet(this, TERMINATED);

    if (ex == null) {
        for (MulticastSubscription<T> ms : current) {
            if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onComplete();
            }
        }
    } else {
        for (MulticastSubscription<T> ms : current) {
            if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onError(ex);
            }
        }
    }
    return;
// }
We atomically swap in the terminal array which gives us the last array of MulticastSubscriptions. We have to do this because more subscribers could have arrived since the previous SUBSCRIBERS.getAcquire() and would be left non-terminated and hanging forever. The non-cancelled Flow.Subscribers are terminated with onError or onComplete based on the error field's contents.
Lastly, we have to deal with the case when none of the MulticastSubscriptions have requested yet (or they have run out of requests) and the upstream has terminated without any further values stored in the queue:
// while (requested != 0) {
//     ...
// }

if (requested == 0) {
    boolean d = done;
    boolean empty = queue.isEmpty();

    if (d && empty) {
        Throwable ex = error;

        current = (MulticastSubscription<T>[])SUBSCRIBERS.getAndSet(this, TERMINATED);

        if (ex == null) {
            for (MulticastSubscription<T> ms : current) {
                if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                    ms.downstream.onComplete();
                }
            }
        } else {
            for (MulticastSubscription<T> ms : current) {
                if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                    ms.downstream.onError(ex);
                }
            }
        }
        return;
    }
}
Unsurprisingly, this looks almost like the first part of the while (requested != 0) loop, except that we don't poll an item even if one is available, because we couldn't put it back and would lose it.
Testing with the Reactive Streams TCK
Now that the code is complete, we want to make sure the implementation properly follows the Reactive Streams specification. We can do this via its Test Compatibility Kit (TCK); however, since Reactive Streams is a separate library predating the Java 9 Flow API, we need some adapters to utilize the TCK. Fortunately, the upcoming 1.0.2 release features both such adapters and a Java 9 Flow-based port of the TCK itself. For that, one only needs to import the tck-flow artifact (and its dependencies if doing it manually!):
compile "org.reactivestreams:reactive-streams-tck-flow:1.0.2-RC2"
At the time of writing this blog post, only the release candidate version is available, hence the -RC2 postfix.
The TCK contains the IdentityFlowProcessorVerification class we must extend in our test. Note that the TCK uses TestNG instead of JUnit.
@Test
public class MulticastProcessorTest extends IdentityFlowProcessorVerification<Integer> {

    public MulticastProcessorTest() {
        super(new TestEnvironment(50));
    }

    @Override
    public ExecutorService publisherExecutorService() {
        return Executors.newCachedThreadPool();
    }

    @Override
    public Integer createElement(int element) {
        return element;
    }

    @Override
    protected Flow.Publisher<Integer> createFailedFlowPublisher() {
        MulticastProcessor<Integer> mp = new MulticastProcessor<>(128);
        mp.onError(new IOException());
        return mp;
    }

    @Override
    protected Flow.Processor<Integer, Integer> createIdentityFlowProcessor(int bufferSize) {
        return new MulticastProcessor<>(bufferSize);
    }
}
At first glance, there is no complication in writing the test. We have to specify a TestEnvironment with a default timeout (50 milliseconds should be enough) and we have to specify a helper ExecutorService from which the test items will be emitted. Those test items are created via createElement so the target type of the Flow.Processor under test can be matched (a plain Integer in this case). The createFailedFlowPublisher has to return an immediately failing Flow.Publisher (remember, a Flow.Processor is one) and the createIdentityFlowProcessor should return a fresh and empty processor instance.
If we run the test, 3 of the 68 tests will fail (plus 24 get ignored as they don't apply or can't be properly verified as of now):
- required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
- required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo
- required_exerciseWhiteboxHappyPath
From these, failures 1) and 2) are due to how the TCK expects a Flow.Processor to behave by default: independent requests from independent Flow.Subscribers must reach the upstream and produce elements independently. Unfortunately, our MulticastProcessor uses prefetch and lockstep mechanisms, thus if only one Flow.Subscriber requested and the others didn't, nobody gets items. Fortunately, 1.0.2 has been enhanced in this regard and one only has to override a method of the IdentityFlowProcessorVerification class:
@Override
public boolean doesCoordinatedEmission() {
    return true;
}
This will instruct tests 1) and 2) to try a different request-emission pattern and they should now pass.
Now let's look at 3), which fails with a different error: "Did not receive expected cancelling of upstream subscription within 50 ms". Indeed, looking at the code, the upstream's Flow.Subscription.cancel() is never called on a normal path (only when onSubscribe is called twice).
What's happening? This is another default expectation of the TCK: when all Flow.Subscribers cancel, the Flow.Processor implementation is expected to cancel its upstream promptly. This may not be the desired business behavior, as one may want to keep dispatching even if Flow.Subscribers come and go and, temporarily, there is no downstream to send events to. Unfortunately, there is no switch for this in the TCK and we have to modify the MulticastProcessor to anticipate this expectation. This is similar to the concept and behavior of refCount() in RxJava.
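For comparison, a minimal RxJava 2 sketch of that refCount() behavior (not part of this implementation):

import io.reactivex.Flowable;

// share one upstream subscription; connect on the first subscriber
// and cancel the upstream when the last subscriber is gone
Flowable<Integer> shared = Flowable.range(1, 10)
        .publish()
        .refCount();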
Luckily, it requires only a small set of changes. We can conveniently detect when all the Flow.Subscribers have cancelled in the remove() method: instead of setting the subscribers array to EMPTY, we set it to TERMINATED and cancel() the upstream instance:
void remove(MulticastSubscription<T> ms) {
    for (;;) {
        // the index search is omitted for brevity

        MulticastSubscription<T>[] next;
        if (n == 1) {
            if (UPSTREAM.getAcquire(this) == null) {
                next = EMPTY;
            } else {
                next = TERMINATED;
            }
        } else {
            next = new MulticastSubscription[n - 1];
            System.arraycopy(current, 0, next, 0, j);
            System.arraycopy(current, j + 1, next, j, n - j - 1);
        }
        if (SUBSCRIBERS.compareAndSet(this, current, next)) {
            if (next == TERMINATED) {
                ((Flow.Subscription)UPSTREAM.getAcquire(this)).cancel();
            }
            break;
        }
    }
}
This change allows Flow.Subscribers to come and go without terminating the MulticastProcessor as long as it hasn't been subscribed to an upstream Flow.Publisher yet; once it is live, the departure of the last Flow.Subscriber cancels the upstream.
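A small, hypothetical illustration of the resulting lifecycle (s1, s2 and source are made-up placeholders):

MulticastProcessor<Integer> mp = new MulticastProcessor<>(128);

// before any upstream is attached, subscribers may come and go freely:
mp.subscribe(s1);
// s1 cancels; mp stays usable because UPSTREAM is still null

source.subscribe(mp);   // upstream attached, mp is now "live"
mp.subscribe(s2);
// s2 cancels -> the subscribers array becomes TERMINATED
// and the upstream Flow.Subscription gets cancelled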
Conclusion
In this post, I showed how a multicasting Flow.Processor can be implemented and then tested with the Reactive Streams TCK. In fact, one reason I was holding off demonstrating this MulticastProcessor was the limits of the TCK itself: prior to 1.0.2-RC2, now available from Maven, there was no standard adapter and no support for a lockstepping Flow.Publisher implementation. Now that I have contributed both and the maintainers have released them, we can use the TCK to experiment with various Java 9-based Flow.Processor and Flow.Publisher implementations.
We have applied the usual patterns: copy-on-write with terminal state, queue-drain and request accounting, all of which should be quite familiar to veteran readers of this blog. Once the tests indicated issues, we could simply adjust the thread-safe, lock-free logic to match their expectations.