Introduction
In this final part about Subjects, I'll show how to implement a PublishSubject. To make things a bit more interesting, I'll implement it in a way that prevents PublishSubject from overflowing its child subscribers when they don't request fast enough.
PublishSubject
The main body of the PublishSubject will look very similar to UnicastSubject from the last post, therefore, I'm going to skip those details and focus on the State class.
The State class will look different because in PublishSubject, there could be many Subscribers at once, each with its own unsubscribed, requested and wip counters. Therefore State can't implement Producer and Subscription anymore. We need to implement those in a new class, SubscriberState, and have an instance for each child Subscriber.
One thing before detailing the classes, namely the backpressure handling strategy. We want to give users the ability to specify the backpressure behavior of the PublishSubject, so they don't have to apply onBackpressureXXX methods on the output. For this, I define an enum with 3 values:
enum BackpressureStrategy {
    DROP,
    BUFFER,
    ERROR
}

public static <T> PublishSubject<T> createWith(
        BackpressureStrategy strategy) {
    State<T> state = new State<>(strategy);
    return new PublishSubject<>(state);
}
The values should be self-explanatory: drop values, buffer values until the child can consume them, or kick out the child Subscriber with an error.
Now let's see the skeleton of the State class:
static final class State<T> implements OnSubscribe<T>, Observer<T> {
    final BackpressureStrategy strategy;

    @SuppressWarnings("unchecked")
    volatile SubscriberState<T>[] subscribers = EMPTY;

    @SuppressWarnings("rawtypes")
    static final SubscriberState[] EMPTY = new SubscriberState[0];

    @SuppressWarnings("rawtypes")
    static final SubscriberState[] TERMINATED = new SubscriberState[0];

    volatile boolean done;
    Throwable error;

    public State(BackpressureStrategy strategy) {
        this.strategy = strategy;
    }

    boolean add(SubscriberState<T> subscriber) {
        // TODO Auto-generated method stub
    }

    void remove(SubscriberState<T> subscriber) {
        // TODO Auto-generated method stub
    }

    // returns the last known array of subscribers
    SubscriberState<T>[] terminate() {
        // TODO Auto-generated method stub
    }

    @Override
    public void call(Subscriber<? super T> t) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onNext(T t) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onError(Throwable e) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onCompleted() {
        // TODO Auto-generated method stub
    }
}
Nothing standing out so far. We have a volatile array of SubscriberState instances, add, remove and terminate methods. By using EMPTY, we will avoid allocating an empty array whenever all subscribers unsubscribe. This pattern should be familiar from an earlier post about Subscription containers. Now let's see the implementation of add().
boolean add(SubscriberState<T> subscriber) {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a == TERMINATED) {
            return false;
        }
        int n = a.length;

        @SuppressWarnings("unchecked")
        SubscriberState<T>[] b = new SubscriberState[n + 1];

        System.arraycopy(a, 0, b, 0, n);
        b[n] = subscriber;
        subscribers = b;
        return true;
    }
}
For the sake of diversity, the State class will use synchronized instead of an atomic CAS loop. The block is essentially a copy-on-write implementation. The benefit of such implementation is that looping through the current array of subscribers is faster and relies on the observation that many Subjects don't actually serve too many Subscribers at once. If, however, one encounters a case where the number of subscribers is large, one can use any list or set based container inside the block instead. The drawback there is that one needs a safe way to iterate over the collection which may only be possible by doing a defensive copy all the time.
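To make the trade-off concrete, here is a minimal sketch of the set-based alternative mentioned above. The class name SetRegistry and its snapshot() method are hypothetical and not part of RxJava; the sketch only assumes that the emitter iterates over the copy returned by snapshot() instead of the live set.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical set-based alternative to the copy-on-write array. */
final class SetRegistry<S> {
    private final Set<S> set = new LinkedHashSet<>();
    private boolean terminated;

    /** Returns false if the registry has already been terminated. */
    synchronized boolean add(S s) {
        if (terminated) {
            return false;
        }
        set.add(s);              // no array copy on add
        return true;
    }

    synchronized void remove(S s) {
        set.remove(s);           // no linear search, no array copy
    }

    /** The defensive copy: this cost is paid on every emission. */
    synchronized List<S> snapshot() {
        return new ArrayList<>(set);
    }

    /** Marks the registry terminated and returns the last known members. */
    synchronized List<S> terminate() {
        terminated = true;
        List<S> last = new ArrayList<>(set);
        set.clear();
        return last;
    }
}
```

Adding and removing become cheap, but every onNext now allocates a list, which is the opposite of the copy-on-write array where the allocation happens on the rarer subscribe and unsubscribe calls.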
Let's see the remove() method:
@SuppressWarnings("unchecked")
void remove(SubscriberState<T> subscriber) {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a == TERMINATED || a == EMPTY) {
            return;
        }
        int n = a.length;
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (a[i] == subscriber) {
                j = i;
                break;
            }
        }

        if (j < 0) {
            return;
        }
        SubscriberState<T>[] b;
        if (n == 1) {
            b = EMPTY;
        } else {
            b = new SubscriberState[n - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, n - j - 1);
        }
        subscribers = b;
    }
}
Again, it is a copy-on-write implementation with the reuse of the empty array.
Next comes the terminate() method:
@SuppressWarnings("unchecked")
SubscriberState<T>[] terminate() {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a != TERMINATED) {
            subscribers = TERMINATED;
        }
        return a;
    }
}
Here, we check if the current state isn't terminated and if so, we set the terminated array and return the last array of known subscribers.
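For comparison, this is roughly how the same three methods could look with an atomic CAS loop instead of synchronized. It is only a sketch: the CasRegistry class is hypothetical, and it stores plain Object references to stay self-contained. Note that EMPTY and TERMINATED are both zero-length arrays and are told apart by reference, never by content.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical lock-free variant of add(), remove() and terminate(). */
final class CasRegistry {
    static final Object[] EMPTY = new Object[0];
    // A distinct zero-length instance, compared by identity only.
    static final Object[] TERMINATED = new Object[0];

    final AtomicReference<Object[]> subscribers = new AtomicReference<>(EMPTY);

    boolean add(Object s) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            Object[] b = new Object[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = s;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
            // another thread changed the array in the meantime: retry
        }
    }

    void remove(Object s) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == TERMINATED || a == EMPTY) {
                return;
            }
            int n = a.length;
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == s) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            Object[] b;
            if (n == 1) {
                b = EMPTY;   // reuse the shared empty array
            } else {
                b = new Object[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    /** Swaps in TERMINATED and returns whatever array was there before. */
    Object[] terminate() {
        return subscribers.getAndSet(TERMINATED);
    }
}
```

Just like the synchronized version, a second terminate() call returns the TERMINATED array itself, so looping over the result is a no-op.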
Implementing the call() method is now possible:
@Override
public void call(Subscriber<? super T> t) {
    SubscriberState<T> innerState =
        new SubscriberState<>(t, this);                  // (1)
    t.add(innerState);                                   // (2)
    t.setProducer(innerState);

    if (add(innerState)) {                               // (3)
        if (strategy == BackpressureStrategy.BUFFER) {   // (4)
            innerState.drain();
        } else
        if (innerState.unsubscribed) {                   // (5)
            remove(innerState);
        }
    } else {
        Throwable e = error;                             // (6)
        if (e != null) {
            t.onError(e);
        } else {
            t.onCompleted();
        }
    }
}
- We create and wrap the child subscriber into a SubscriberState that will manage the dispatching of events for each subscriber individually.
- We add it to the child subscriber as a Subscription for unsubscription and request handling.
- We try to add the innerState object into the array of subscribers, which may fail if the subject is concurrently terminating.
- In case we run in a buffering mode, we need to start draining the buffer.
- Even if the add() succeeded, the child might have unsubscribed concurrently and the subscribers array may still contain a reference to it. In this case, we explicitly try to remove it again.
- If the add() failed on line (3), it means the subject reached its terminal state and we need to emit the terminal event to the child.
Implementing the onXXX methods is relatively simple, and they all conform to a similar pattern:
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    for (SubscriberState<T> innerState : subscribers) {
        innerState.onNext(t);
    }
}

@Override
public void onError(Throwable e) {
    if (done) {
        return;
    }
    error = e;
    done = true;
    for (SubscriberState<T> innerState : terminate()) {
        innerState.onError(e);
    }
}

@Override
public void onCompleted() {
    if (done) {
        return;
    }
    done = true;
    for (SubscriberState<T> innerState : terminate()) {
        innerState.onCompleted();
    }
}
They simply loop over the known subscribers and emit the event to them.
So far, we have just forwarded the event handling to another class. It's time we handle the event delivery to the actual child subscriber in SubscriberState.
static final class SubscriberState<T>
implements Producer, Subscription, Observer<T> {
    final Subscriber<? super T> child;                        // (1)
    final State<T> state;                                     // (2)
    final BackpressureStrategy strategy;                      // (3)

    final AtomicLong requested = new AtomicLong();            // (4)

    final AtomicInteger wip = new AtomicInteger();            // (5)

    volatile boolean unsubscribed;                            // (6)

    volatile boolean done;
    Throwable error;

    final Queue<T> queue;                                     // (7)

    public SubscriberState(
            Subscriber<? super T> child, State<T> state) {
        this.child = child;
        this.state = state;
        this.strategy = state.strategy;
        Queue<T> q = null;
        if (strategy == BackpressureStrategy.BUFFER) {        // (8)
            q = new SpscLinkedAtomicQueue<>();
        }
        this.queue = q;
    }

    @Override
    public void onNext(T t) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onError(Throwable e) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onCompleted() {
        // TODO Auto-generated method stub
    }

    @Override
    public void request(long n) {
        // TODO Auto-generated method stub
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }

    @Override
    public void unsubscribe() {
        // TODO Auto-generated method stub
    }

    void drain() {
        // TODO Auto-generated method stub
    }
}
- We keep a reference to the actual subscriber,
- we keep a reference to the State so we can remove this SubscriberState from it when the child unsubscribes,
- we keep a local reference to the BackpressureStrategy to avoid one dereference,
- we track the requested amount,
- we want to implement a queue-drain logic and require a work-in-progress indicator,
- we want to know if the child called unsubscribe(),
- in case the backpressure strategy is BUFFER, we need to store values temporarily and
- finally, we instantiate the queue only if the strategy is BUFFER.
@Override
public void onNext(T t) {
    if (unsubscribed) {
        return;
    }
    switch (strategy) {
    case BUFFER:
        queue.offer(t);                                // (1)
        drain();
        break;
    case DROP: {
        long r = requested.get();                      // (2)
        if (r != 0L) {
            child.onNext(t);
            if (r != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        }
        break;
    }
    case ERROR: {
        long r = requested.get();                      // (3)
        if (r != 0L) {
            child.onNext(t);
            if (r != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        } else {
            unsubscribe();
            child.onError(
                new MissingBackpressureException());
        }

        break;
    }
    default:
    }
}
The method looks complicated because it handles all strategies together:
- When running in BUFFER mode, we queue the value and call drain.
- In DROP mode, we check and decrement the requested amount and drop values if there are no requests.
- In ERROR mode, if the requested amount is zero, we unsubscribe and send out a MissingBackpressureException.
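The DROP and ERROR branches can be tried out in isolation with a small stand-in that has no RxJava dependency. Everything in this sketch is hypothetical: the StrategySketch class, the Consumer callbacks that play the role of child.onNext() and child.onError(), and the IllegalStateException that stands in for MissingBackpressureException. It also assumes emit() is called from a single thread, as onNext() is.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical stand-in for the DROP and ERROR branches of onNext(). */
final class StrategySketch<T> {
    enum Mode { DROP, ERROR }

    final Mode mode;
    final Consumer<T> onNext;            // plays the role of child.onNext()
    final Consumer<Throwable> onError;   // plays the role of child.onError()
    final AtomicLong requested = new AtomicLong();
    volatile boolean unsubscribed;

    StrategySketch(Mode mode, Consumer<T> onNext, Consumer<Throwable> onError) {
        this.mode = mode;
        this.onNext = onNext;
        this.onError = onError;
    }

    /** Simplified: no overflow capping, small positive amounts only. */
    void request(long n) {
        requested.addAndGet(n);
    }

    void emit(T t) {
        if (unsubscribed) {
            return;
        }
        long r = requested.get();
        if (r != 0L) {
            onNext.accept(t);
            if (r != Long.MAX_VALUE) {
                requested.decrementAndGet();   // consume one unit of demand
            }
        } else if (mode == Mode.ERROR) {
            unsubscribed = true;               // kick the child out
            onError.accept(new IllegalStateException("no outstanding requests"));
        }
        // DROP with zero demand: the value is silently discarded
    }
}
```

With DROP, a child that requested 2 and is offered 3 values simply never sees the third one; with ERROR, the same child receives an error in place of the value it could not accept and nothing afterwards.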
The implementation of onError() and onCompleted(), again, is really similar and nothing complicated:
@Override
public void onError(Throwable e) {
    if (unsubscribed) {
        return;
    }
    if (strategy == BackpressureStrategy.BUFFER) {
        error = e;
        done = true;
        drain();
    } else {
        child.onError(e);
    }
}

@Override
public void onCompleted() {
    if (unsubscribed) {
        return;
    }
    if (strategy == BackpressureStrategy.BUFFER) {
        done = true;
        drain();
    } else {
        child.onCompleted();
    }
}
The next three methods, request(), isUnsubscribed() and unsubscribe(), should be familiar to you:
@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n > 0) {
        BackpressureUtils.getAndAddRequest(requested, n);
        if (strategy == BackpressureStrategy.BUFFER) {
            drain();
        }
    }
}

@Override
public boolean isUnsubscribed() {
    return unsubscribed;
}

@Override
public void unsubscribe() {
    if (!unsubscribed) {
        unsubscribed = true;
        state.remove(this);
        if (strategy == BackpressureStrategy.BUFFER) {
            if (wip.getAndIncrement() == 0) {
                queue.clear();
            }
        }
    }
}
We only need to call drain and clear the queue in case we run in a BUFFER mode.
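The request() method relies on BackpressureUtils.getAndAddRequest to accumulate demand without overflowing. As a reminder of what such a capped addition has to do, here is a standalone sketch; the RequestMath class and getAndAddCapped method are hypothetical names and this is not the library's source.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical capped-addition helper for request accounting. */
final class RequestMath {
    /**
     * Atomically adds n to requested, capping the sum at Long.MAX_VALUE,
     * and returns the value seen before the addition.
     */
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE;   // already unbounded, nothing to add
            }
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;      // the addition overflowed: cap it
            }
            if (requested.compareAndSet(r, u)) {
                return r;
            }
        }
    }
}
```

The cap is what lets the emission code treat Long.MAX_VALUE as "unbounded" and skip decrementing the counter in that case.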
Last but not least, let's see the drain() method.
void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }

    int missed = 1;

    Queue<T> q = queue;
    Subscriber<? super T> child = this.child;

    for (;;) {

        if (checkTerminated(done, q.isEmpty(), child)) {
            return;
        }

        long r = requested.get();
        boolean unbounded = r == Long.MAX_VALUE;
        long e = 0L;

        while (r != 0) {
            boolean d = done;
            T v = q.poll();
            boolean empty = v == null;

            if (checkTerminated(d, empty, child)) {
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            r--;
            e--;
        }

        // e counts emissions as a negative number
        if (e != 0) {
            if (!unbounded) {
                requested.addAndGet(e);
            }
        }

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            return;
        }
    }
}
The drain loop should look quite familiar at this point. Nothing new or special here.
Finally, let's see the checkTerminated() method which now has the responsibility to clean up.
boolean checkTerminated(boolean done,
        boolean empty,
        Subscriber<? super T> child) {
    if (unsubscribed) {
        queue.clear();                     // (1)
        state.remove(this);
        return true;
    }
    if (done && empty) {
        unsubscribed = true;               // (2)
        Throwable e = error;
        if (e != null) {
            child.onError(e);
        } else {
            child.onCompleted();
        }
        return true;
    }
    return false;
}
If we detect unsubscription, we clear the queue and remove ourselves from the active set of subscribers (1). However, we don't need to call remove when reaching a done and empty state (2) because at this point, the state contains no subscribers anymore and is also terminated.
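To experiment with the BUFFER path outside RxJava, the drain() and checkTerminated() pair can be reduced to the following sketch. The DrainSketch class, its callbacks and the use of ConcurrentLinkedQueue are my assumptions for the sake of a self-contained example; only the completion case is modelled. The sketch also adds one check that the listing above does not have: after the inner loop it looks for termination again when demand has just run out, so that completion is not held back until the next request if the last queued item used up the final unit of demand.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical, RxJava-free reduction of the BUFFER drain logic. */
final class DrainSketch<T> {
    final Queue<T> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final AtomicInteger wip = new AtomicInteger();
    final Consumer<T> onNext;
    final Runnable onCompleted;
    volatile boolean done;
    volatile boolean unsubscribed;

    DrainSketch(Consumer<T> onNext, Runnable onCompleted) {
        this.onNext = onNext;
        this.onCompleted = onCompleted;
    }

    void offer(T t) { queue.offer(t); drain(); }
    void complete() { done = true; drain(); }
    /** Simplified: no overflow capping, small positive amounts only. */
    void request(long n) { requested.addAndGet(n); drain(); }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;                        // another caller is already draining
        }
        int missed = 1;
        for (;;) {
            if (checkTerminated(done, queue.isEmpty())) {
                return;
            }
            long r = requested.get();
            boolean unbounded = r == Long.MAX_VALUE;
            long e = 0L;
            while (r != 0L) {
                boolean d = done;          // read done before polling
                T v = queue.poll();
                boolean empty = v == null;
                if (checkTerminated(d, empty)) {
                    return;
                }
                if (empty) {
                    break;
                }
                onNext.accept(v);
                r--;
                e--;
            }
            // Extra check, not in the listing above: demand just ran out.
            if (r == 0L && checkTerminated(done, queue.isEmpty())) {
                return;
            }
            if (e != 0L && !unbounded) {
                requested.addAndGet(e);    // e is negative
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    boolean checkTerminated(boolean d, boolean empty) {
        if (unsubscribed) {
            queue.clear();
            return true;
        }
        if (d && empty) {
            unsubscribed = true;
            onCompleted.run();
            return true;
        }
        return false;
    }
}
```

Offering three values, requesting two, completing and then requesting one more delivers all three values followed by a single completion signal.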
A word about BehaviorSubject.
BehaviorSubject is a special kind of subject sitting between a PublishSubject and ReplaySubject: it replays the very last onNext event before relaying other events from then on. One might think it can be emulated via a size-bound ReplaySubject, however, their terminal behaviors differ. ReplaySubject of 1 will always replay 1 event and 1 terminal event whereas a terminated BehaviorSubject won't retain the very last onNext event and will emit only the terminal event.
From a concurrency perspective, the single value-replay creates complications when there is a race between a subscription and onNext. Since the requirement is that there can't be missed events from the point on where the subscription happens, one has to somehow capture the last value, emit it and then emit any other value.
In the current 1.x implementation of BehaviorSubject, this is achieved via a per-subscriber lock and a split mode: first and next. When the subscription happens, the subscribing thread tries to enter into the first mode which reads out the last value from the subject and emits it. If there is a concurrent onNext call, it is temporarily blocked. Once the first mode finishes, it switches to next mode and now onNext calls are immediately relayed.
protected void emitNext(Object n, final NotificationLite<T> nl) {
    if (!fastPath) {
        synchronized (this) {
            first = false;
            if (emitting) {
                if (queue == null) {
                    queue = new ArrayList<Object>();
                }
                queue.add(n);
                return;
            }
        }
        fastPath = true;
    }
    nl.accept(actual, n);
}

protected void emitFirst(Object n, final NotificationLite<T> nl) {
    synchronized (this) {
        if (!first || emitting) {
            return;
        }
        first = false;
        emitting = n != null;
    }
    if (n != null) {
        emitLoop(null, n, nl);
    }
}
Basically, it is an asymmetric emitter loop: if emitNext() wins, emitFirst() won't run. That is acceptable: when the subscription happens asynchronously, who is to say the first onNext() the child receives wasn't the last one at that moment?
There is, however, a very subtle bug still lurking in this approach. It is possible emitFirst() will emit the same value twice!
In the right interleaved conditions, onNext sets the last value in the state, emitFirst picks up the last state. Then onNext tries to run emitNext() which finds an emitting state and queues the value. Finally, emitFirst notices there is still work to do and dequeues the value and now we have emitted the same value twice.
The solution, although it works, is a bit complicated and can be seen here with RxJava 2.x. Basically one has to add a version tag to the value, lock out onNext() for a small duration and drop old indexed values when emitting. The clear drawback is that we now have another lock in the execution path and in theory, any concurrently subscribing child can now block the emitter thread. A lock-free approach is possible, but it requires allocation of an immutable value+index every time an onNext event appears.
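The idea of dropping old indexed values can be shown with a few lines. This is a sketch of the lock-free flavor, not the 2.x code: the VersionedSketch, Indexed, Source and Child names are hypothetical, and the sketch assumes that calls to Child.emit() are already serialized (for example by an emitter loop).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of version-tagged values. */
final class VersionedSketch {
    /** Immutable value + index pair, allocated for every onNext. */
    static final class Indexed<T> {
        final T value;
        final long index;
        Indexed(T value, long index) {
            this.value = value;
            this.index = index;
        }
    }

    /** Holds the latest value of the subject together with its index. */
    static final class Source<T> {
        final AtomicReference<Indexed<T>> last = new AtomicReference<>();

        Indexed<T> next(T value) {
            for (;;) {
                Indexed<T> cur = last.get();
                long idx = cur == null ? 1L : cur.index + 1L;
                Indexed<T> n = new Indexed<>(value, idx);
                if (last.compareAndSet(cur, n)) {
                    return n;
                }
            }
        }
    }

    /** Per-subscriber side; emit() must be called in a serialized fashion. */
    static final class Child<T> {
        final List<T> received = new ArrayList<>();
        long lastIndex;   // index of the newest value delivered so far

        void emit(Indexed<T> v) {
            if (v.index <= lastIndex) {
                return;   // already seen via the other path: drop it
            }
            lastIndex = v.index;
            received.add(v.value);
        }
    }
}
```

If the subscribing path picks up the latest value and the regular emission path then queues that very same value, both carry the same index, so the second delivery is skipped and the child sees the value only once.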
Conclusion
In this post, I've shown how to implement a PublishSubject with 3 different kinds of backpressure-handling strategies. I consider this the final piece about Subjects.
If you look into RxJava 1.x, you may see that standard Subjects aren't implemented this way, however, 2.x Subjects are. This is no accident: the 2.x implementation comes from lessons learned from the 1.x implementation.
In the next blog post-series, we're going to utilize the knowledge about Subject internals and I'm going to show how to implement ConnectableObservables.