Wednesday, September 30, 2015

Subjects (part 3 - final)

Introduction


In this final part about Subjects, I'll show how to implement a PublishSubject. To make things a bit more interesting, I'll implement it in a way that prevents the PublishSubject from overflowing its child subscribers when they don't request fast enough.


PublishSubject

The main body of the PublishSubject will look very similar to UnicastSubject from the last post, therefore, I'm going to skip those details and focus on the State class.

The State class will look different because in PublishSubject, there can be many Subscribers at once, each with its own unsubscribed flag, requested amount and wip counter. Therefore, State can't implement Producer and Subscription anymore. We need to implement those in a new class, SubscriberState, and have an instance for each child Subscriber.

One thing before detailing the classes: the backpressure handling strategy. We want to give the user the ability to specify the backpressure behavior of the PublishSubject, so they don't have to apply onBackpressureXXX methods on the output. For this, I define an enum with 3 values:


enum BackpressureStrategy {
    DROP,
    BUFFER,
    ERROR
}

public static <T> PublishSubject<T> createWith(
        BackpressureStrategy strategy) {
    State<T> state = new State<>(strategy);
    return new PublishSubject<>(state);
}

The values should be self-explanatory: drop values, buffer values until the child can consume them, or kick out the child Subscriber with an error.

Now let's see the skeleton of the State class:


static final class State<T> 
implements OnSubscribe<T>, Observer<T> {
    final BackpressureStrategy strategy;
    
    @SuppressWarnings("unchecked")
    volatile SubscriberState<T>[] subscribers = EMPTY;
    
    @SuppressWarnings("rawtypes")
    static final SubscriberState[] EMPTY = new SubscriberState[0];

    @SuppressWarnings("rawtypes")
    static final SubscriberState[] TERMINATED = 
        new SubscriberState[0];
    
    volatile boolean done;
    Throwable error;
    
    public State(BackpressureStrategy strategy) {
        this.strategy = strategy;
    }
    
    boolean add(SubscriberState<T> subscriber) {
        // TODO Auto-generated method stub
        
    }
    
    void remove(SubscriberState<T> subscriber) {
        // TODO Auto-generated method stub
        
    }
    
    SubscriberState<T>[] terminate() {
        // TODO Auto-generated method stub
        
    }
    
    
    @Override
    public void call(Subscriber<? super T> t) {
        // TODO Auto-generated method stub
        
    }
    
    @Override
    public void onNext(T t) {
        // TODO Auto-generated method stub
        
    }
    
    @Override
    public void onError(Throwable e) {
        // TODO Auto-generated method stub
        
    }
    
    @Override
    public void onCompleted() {
        // TODO Auto-generated method stub
        
    }
}

Nothing standing out so far. We have a volatile array of SubscriberState instances, add, remove and terminate methods. By using EMPTY, we will avoid allocating an empty array whenever all subscribers unsubscribe. This pattern should be familiar from an earlier post about Subscription containers. Now let's see the implementation of add().


boolean add(SubscriberState<T> subscriber) {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a == TERMINATED) {
            return false;
        }
        int n = a.length;
        
        @SuppressWarnings("unchecked")
        SubscriberState<T>[] b = new SubscriberState[n + 1];
        
        System.arraycopy(a, 0, b, 0, n);
        b[n] = subscriber;
        subscribers = b;
        return true;
    }
}


For the sake of diversity, the State class will use synchronized instead of an atomic CAS loop. The block is essentially a copy-on-write implementation. The benefit of such implementation is that looping through the current array of subscribers is faster and relies on the observation that many Subjects don't actually serve too many Subscribers at once. If, however, one encounters a case where the number of subscribers is large, one can use any list or set based container inside the block instead. The drawback there is that one needs a safe way to iterate over the collection which may only be possible by doing a defensive copy all the time.
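
For comparison, here is a minimal sketch of what the atomic CAS loop alternative could look like. It is not taken from RxJava: the class name CasSubscriberArray is hypothetical, and it stores plain Object references instead of SubscriberState<T> to stay self-contained. The copy-on-write idea is the same, only the synchronized block is replaced by a compareAndSet retry loop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the subscriber tracking part of State.
final class CasSubscriberArray {
    static final Object[] EMPTY = new Object[0];
    // A distinct instance, so it can be told apart from EMPTY by identity.
    static final Object[] TERMINATED = new Object[0];

    final AtomicReference<Object[]> subscribers =
        new AtomicReference<>(EMPTY);

    boolean add(Object subscriber) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;              // already terminated
            }
            int n = a.length;
            Object[] b = new Object[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = subscriber;
            // Publish the copy only if nobody changed the array meanwhile;
            // otherwise retry with the fresh array.
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    Object[] terminate() {
        // Atomically swap in TERMINATED and return the previous array.
        return subscribers.getAndSet(TERMINATED);
    }
}
```

The trade-off is the usual one: no monitor is held, but a contended add() may allocate and discard a copy before it succeeds.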

Let's see the remove() method:

@SuppressWarnings("unchecked")
void remove(SubscriberState<T> subscriber) {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a == TERMINATED || a == EMPTY) {
            return;
        }
        int n = a.length;
        
        int j = -1;
        for (int i = 0; i < n; i++) {
            if (a[i] == subscriber) {
                j = i;
                break;
            }
        }
        
        if (j < 0) {
            return;
        }
        SubscriberState<T>[] b;
        if (n == 1) {
            b = EMPTY;
        } else {
            b = new SubscriberState[n - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, n - j - 1);
        }
        subscribers = b;
    }
}

Again, it is a copy-on-write implementation with the reuse of the empty array.

Next comes the terminate() method:


@SuppressWarnings("unchecked")
SubscriberState<T>[] terminate() {
    synchronized (this) {
        SubscriberState<T>[] a = subscribers;
        if (a != TERMINATED) {
            subscribers = TERMINATED;
        }
        return a;
    }
}

Here, if the current state isn't terminated yet, we swap in the TERMINATED array. Either way, we return the previous array: the last array of known subscribers, or the empty TERMINATED array if termination already happened.

Implementing the call() method is now possible:


@Override
public void call(Subscriber<? super T> t) {
    SubscriberState<T> innerState = 
        new SubscriberState<>(t, this);                // (1)
    t.add(innerState);                                 // (2)
    t.setProducer(innerState);
    
    if (add(innerState)) {                             // (3)
        if (strategy == BackpressureStrategy.BUFFER) { // (4)
            innerState.drain();
        } else
        if (innerState.unsubscribed) {                 // (5)
            remove(innerState);
        }
    } else {
        Throwable e = error;                           // (6)
        if (e != null) {
            t.onError(e);
        } else {
            t.onCompleted();
        }
    }
}


  1. We create and wrap the child subscriber into a SubscriberState that will manage the dispatching of events for each subscriber individually.
  2. We add it to the child subscriber as a Subscription for unsubscription and request handling.
  3. We try to add the innerState object into the array of subscribers, which may fail if the subject is concurrently terminating.
  4. In case we run in a buffering mode, we need to start draining the buffer.
  5. Even if the add() succeeded, the child might have unsubscribed concurrently and the subscribers array may still contain a reference to it. In this case, we explicitly try to remove it again.
  6. If the add() failed on line (3), it means the subject reached its terminal state and we need to emit the terminal event to the child.
Implementing the onXXX methods is relatively simple; they all conform to a similar pattern:

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    for (SubscriberState<T> innerState : subscribers) {
        innerState.onNext(t);
    }
}

@Override
public void onError(Throwable e) {
    if (done) {
        return;
    }
    error = e;
    done = true;
    for (SubscriberState<T> innerState : terminate()) {
        innerState.onError(e);
    }
}

@Override
public void onCompleted() {
    if (done) {
        return;
    }
    done = true;
    for (SubscriberState<T> innerState : terminate()) {
        innerState.onCompleted();
    }
}

They simply loop over the known subscribers and emit the event to them.

So far, we have just forwarded the event handling to another class. It's time we handle the event delivery to the actual child subscriber in SubscriberState.


static final class SubscriberState<T> 
implements Producer, Subscription, Observer<T> {
    final Subscriber<? super T> child;                      // (1)
    final State<T> state;                                   // (2)
    final BackpressureStrategy strategy;                    // (3)
    
    final AtomicLong requested = new AtomicLong();          // (4)
    
    final AtomicInteger wip = new AtomicInteger();          // (5)
    
    volatile boolean unsubscribed;                          // (6)

    volatile boolean done;
    Throwable error;

    final Queue<T> queue;                                   // (7)
    
    public SubscriberState(
            Subscriber<? super T> child, State<T> state) {
        this.child = child;
        this.state = state;
        this.strategy = state.strategy;
        Queue<T> q = null;
        if (strategy == BackpressureStrategy.BUFFER) {      // (8)
            q = new SpscLinkedAtomicQueue<>();
        }
        this.queue = q;
    }
    
    @Override
    public void onNext(T t) {
        // TODO Auto-generated method stub
    }
    
    @Override
    public void onError(Throwable e) {
        // TODO Auto-generated method stub
    }
    
    @Override
    public void onCompleted() {
        // TODO Auto-generated method stub
    }
    
    @Override
    public void request(long n) {
        // TODO Auto-generated method stub
    }
    
    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }
    
    @Override
    public void unsubscribe() {
        // TODO Auto-generated method stub
    }

    void drain() {
        // TODO Auto-generated method stub
    }
}

  1. We keep a reference to the actual subscriber,
  2. we want to remove the SubscriberState in case the child unsubscribes from the state,
  3. we keep a local reference to the BackpressureStrategy to avoid one dereference,
  4. we track the requested amount,
  5. we want to implement a queue-drain logic and require a work-in-progress indicator, 
  6. we want to know if the child called unsubscribe(),
  7. in case the backpressure strategy is BUFFER, we need to store values temporarily and
  8. finally, we instantiate the queue only if the strategy is BUFFER.
Now let's see the unimplemented methods one by one:


@Override
public void onNext(T t) {
    if (unsubscribed) {
        return;
    }
    switch (strategy) {
    case BUFFER:
        queue.offer(t);                               // (1)
        drain();
        break;
    case DROP: {
        long r = requested.get();                     // (2)
        if (r != 0L) {
            child.onNext(t);
            if (r != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        }
        break;
    }
    case ERROR: {
        long r = requested.get();                     // (3)
        if (r != 0L) {
            child.onNext(t);
            if (r != Long.MAX_VALUE) {
                requested.decrementAndGet();
            }
        } else {
            unsubscribe();
            child.onError(
                new MissingBackpressureException());
        }
        
        break;
    }
    default:
    }
}

The method looks complicated because it handles all strategies together:

  1. When running in BUFFER mode, we queue the value and call drain.
  2. In DROP mode, we check and decrement the requested amount and drop values if there are no requests.
  3. In ERROR mode, if the requested amount is zero, we unsubscribe and send out a MissingBackpressureException.
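
To make the difference between the three branches tangible, here is a single-threaded model of them. This is only a sketch: StrategySketch and deliver() are hypothetical names, the child is replaced by a list of strings, the requested amount is a plain long that never gets replenished, and the unbounded (Long.MAX_VALUE) case is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class StrategySketch {
    enum Strategy { DROP, BUFFER, ERROR }

    // Feeds the values to a child that requested a fixed amount up front
    // and records what that child would observe.
    static List<String> deliver(Strategy strategy, long requested,
            int... values) {
        List<String> seen = new ArrayList<>();
        Queue<Integer> queue = new ArrayDeque<>();
        for (int v : values) {
            switch (strategy) {
            case BUFFER:
                queue.offer(v);
                // stand-in for drain(): emit while there is demand
                while (requested != 0 && !queue.isEmpty()) {
                    seen.add("onNext(" + queue.poll() + ")");
                    requested--;
                }
                break;
            case DROP:
                if (requested != 0) {
                    seen.add("onNext(" + v + ")");
                    requested--;
                }                          // else: value is dropped
                break;
            case ERROR:
                if (requested != 0) {
                    seen.add("onNext(" + v + ")");
                    requested--;
                } else {
                    seen.add("onError(MissingBackpressure)");
                    return seen;           // child is kicked out
                }
                break;
            default:
            }
        }
        if (strategy == Strategy.BUFFER) {
            seen.add("buffered=" + queue.size());
        }
        return seen;
    }
}
```

With a request of 2 and the values 1, 2, 3, all three strategies deliver 1 and 2; they differ only in what happens to 3: it is silently dropped, kept in the queue, or turned into an error.
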
The implementation of onError() and onCompleted(), again, is really similar and nothing complicated:

@Override
public void onError(Throwable e) {
    if (unsubscribed) {
        return;
    }
    if (strategy == BackpressureStrategy.BUFFER) {
        error = e;
        done = true;
        drain();
    } else {
        child.onError(e);
    }
}

@Override
public void onCompleted() {
    if (unsubscribed) {
        return;
    }
    if (strategy == BackpressureStrategy.BUFFER) {
        done = true;
        drain();
    } else {
        child.onCompleted();
    }
}

The next three methods, request(), isUnsubscribed() and unsubscribe(), should be familiar to you:


@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n > 0) {
        BackpressureUtils.getAndAddRequest(requested, n);
        if (strategy == BackpressureStrategy.BUFFER) {
            drain();
        }
    }
}

@Override
public boolean isUnsubscribed() {
    return unsubscribed;
}

@Override
public void unsubscribe() {
    if (!unsubscribed) {
        unsubscribed = true;
        state.remove(this);
        if (strategy == BackpressureStrategy.BUFFER) {
            if (wip.getAndIncrement() == 0) {
                queue.clear();
            }
        }
    }
}

We only need to call drain and clear the queue in case we run in a BUFFER mode.

Last but not least, let's see the drain() method.


void drain() {
    if (wip.getAndIncrement() != 0) {
        return;
    }
    
    int missed = 1;

    Queue<T> q = queue;
    Subscriber<? super T> child = this.child;

    for (;;) {

        if (checkTerminated(done, q.isEmpty(), child)) {
            return;
        }

        long r = requested.get();
        boolean unbounded = r == Long.MAX_VALUE;
        long e = 0L;

        while (r != 0) {
            boolean d = done;
            T v = q.poll();
            boolean empty = v == null;

            if (checkTerminated(d, empty, child)) {
                return;
            }

            if (empty) {
                break;
            }

            child.onNext(v);

            r--;
            e--;
        }

        if (e != 0) {
            if (!unbounded) {
                requested.addAndGet(e);
            }
        }

        missed = wip.addAndGet(-missed);
        if (missed == 0) {
            return;
        }
    }
}

The drain loop should look quite familiar at this point. Nothing new or special here.
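
If you want to play with the loop outside of RxJava, the following stripped-down sketch may help. It is an illustration under several assumptions: QueueDrainSketch is a hypothetical class, the child is replaced by a plain list, termination and unsubscription are left out, and request() uses a plain addAndGet without the overflow capping that BackpressureUtils provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class QueueDrainSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final AtomicInteger wip = new AtomicInteger();
    // Stand-in for the child; only touched inside the drain loop.
    final List<Integer> received = new ArrayList<>();

    void onNext(int v) {
        queue.offer(v);
        drain();
    }

    void request(long n) {
        requested.addAndGet(n);   // assumption: no overflow handling
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;               // someone else is draining; work signalled
        }
        int missed = 1;
        for (;;) {
            long r = requested.get();
            long e = 0L;          // number of values emitted in this pass
            while (e != r) {
                Integer v = queue.poll();
                if (v == null) {
                    break;
                }
                received.add(v);
                e++;
            }
            if (e != 0L && r != Long.MAX_VALUE) {
                requested.addAndGet(-e);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

Calling onNext() three times without any request leaves all three values in the queue; a subsequent request(2) emits the first two and keeps the third for later.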

Finally, let's see the checkTerminated() method which now has the responsibility to clean up.


boolean checkTerminated(boolean done, 
        boolean empty, 
        Subscriber<? super T> child) {
    if (unsubscribed) {
        queue.clear();                     // (1)
        state.remove(this);
        return true;
    }
    if (done && empty) {
        unsubscribed = true;               // (2)
        Throwable e = error;
        if (e != null) {
            child.onError(e);
        } else {
            child.onCompleted();
        }
        return true;
    }
    return false;
}

If we detect unsubscription, we clear the queue and remove ourselves from the active set of subscribers (1). However, we don't need to call remove when reaching a done and empty state (2) because at this point, the state contains no subscribers anymore and is also terminated.

A word about BehaviorSubject.

BehaviorSubject is a special kind of subject sitting between a PublishSubject and a ReplaySubject: it replays the very last onNext event before relaying other events from then on. One might think it can be emulated via a size-bound ReplaySubject; however, their terminal behaviors differ. A ReplaySubject of 1 will always replay 1 event and 1 terminal event, whereas a terminated BehaviorSubject won't retain the very last onNext event and will emit only the terminal event.

From a concurrency perspective, the single value-replay creates complications when there is a race between a subscription and onNext. Since the requirement is that no events may be missed from the point where the subscription happens, one has to somehow capture the last value, emit it and then emit any other value.

In the current 1.x implementation of BehaviorSubject, this is achieved via a per-subscriber lock and a split mode: first and next. When the subscription happens, the subscribing thread tries to enter into the first mode which reads out the last value from the subject and emits it. If there is a concurrent onNext call, it is temporarily blocked. Once the first mode finishes, it switches to next mode and now onNext calls are immediately relayed.


protected void emitNext(Object n, final NotificationLite<T> nl) {
    if (!fastPath) {
        synchronized (this) {
            first = false;
            if (emitting) {
                if (queue == null) {
                    queue = new ArrayList<Object>();
                }
                queue.add(n);
                return;
            }
        }
        fastPath = true;
    }
    nl.accept(actual, n);
}

protected void emitFirst(Object n, final NotificationLite<T> nl) {
    synchronized (this) {
        if (!first || emitting) {
            return;
        }
        first = false;
        emitting = n != null;
    }
    if (n != null) {
        emitLoop(null, n, nl);
    }
}

Basically, it is an asymmetric emitter loop: if emitNext() wins, emitFirst() won't run. That is acceptable, because when the subscription happens asynchronously, who is to say the first onNext() the child receives wasn't the latest one at that moment?

There is, however, a very subtle bug still lurking in this approach. It is possible emitFirst() will emit the same value twice!

With the right interleaving, onNext sets the last value in the state and emitFirst picks up that value. Then onNext runs emitNext(), which finds the emitting flag set and queues the same value. Finally, emitFirst notices there is still work to do, dequeues the value, and now we have emitted the same value twice.

The solution, although it works, is a bit complicated and can be seen here with RxJava 2.x. Basically, one has to add a version tag to the value, lock out onNext() for a small duration and drop old indexed values when emitting. The clear drawback is that we now have another lock in the execution path and, in theory, any concurrently subscribing child can now block the emitter thread. A lock-free approach is possible, but it requires the allocation of an immutable value+index pair every time an onNext event appears.
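
To illustrate the value+index idea in isolation, here is a minimal sketch. It is not the RxJava 2.x implementation: IndexedValueSketch, Indexed and Child are hypothetical names, onNext is assumed to be called sequentially (as the Observer contract requires), and calls to Child.accept() are assumed to be serialized by something like the emitter loop shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class IndexedValueSketch {
    // Immutable value + index pair; one instance is allocated per onNext.
    static final class Indexed {
        final String value;
        final long index;
        Indexed(String value, long index) {
            this.value = value;
            this.index = index;
        }
    }

    final AtomicReference<Indexed> latest = new AtomicReference<>();

    // Stores the new pair and returns it so it can be relayed to children.
    // Assumption: only one thread calls onNext at a time.
    Indexed onNext(String value) {
        Indexed prev = latest.get();
        long index = prev == null ? 1L : prev.index + 1L;
        Indexed next = new Indexed(value, index);
        latest.set(next);
        return next;
    }

    static final class Child {
        long lastIndex;                          // index delivered last
        final List<String> received = new ArrayList<>();

        // Assumption: invocations are serialized by the caller.
        void accept(Indexed v) {
            if (v != null && v.index > lastIndex) {
                lastIndex = v.index;
                received.add(v.value);
            }                                    // else: stale or duplicate
        }
    }
}
```

In the problematic interleaving, the child first receives the snapshot read from latest and then the very same pair again through the relay path; since the index hasn't grown, the second delivery is ignored.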

Conclusion

In this post, I've shown how to implement a PublishSubject with 3 different kinds of backpressure-handling strategies. I consider this as the final piece about Subjects.

If you look into RxJava 1.x, you may see that the standard Subjects aren't implemented this way; the 2.x Subjects, however, are. This is no accident: the 2.x implementation comes from lessons learned from the 1.x implementation.

In the next blog post-series, we're going to utilize the knowledge about Subject internals and I'm going to show how to implement ConnectableObservables.

Subjects (part 2)

Introduction


Sorry for the delay in posting, I was busy with my professional work and I've been implementing a fully reactive-streams compliant RxJava 2.0 in the meantime.

In this blog post, I'm going to talk about the requirements of building Subjects, the related structures and algorithms and I'm going to build a backpressure-aware special subject, UnicastSubject, with them.

Requirements

Since subjects implement both Observer and Observable, they have to conform to both:

  • [Observer] onXXX events have to be sequential and expected to be sequential
  • [Observer] should conform to the pattern onNext* (onError | onCompleted)?
  • [Observable] subscribing to it should be thread-safe
In addition, since subjects can reach a terminal state via onError() or onCompleted(), we must deal with the situation when a Subscriber subscribes to a Subject after such an event. Clearly, keeping the Subscriber hanging at this point isn't a good idea. The standard RxJava subjects, therefore, re-emit their terminal event to such latecomers (ReplaySubject may emit onNext events before that though).

Given that, we want the UnicastSubject to allow only a single Subscriber, buffer incoming events until this single Subscriber subscribes and replay/relay events while conforming to the backpressure requests of the Subscriber.

Luckily, we already saw all components needed for implementing the UnicastSubject: tracking the Subscriber's presence and using queue-drain to replay/relay events to it.



UnicastSubject


Let's start by building a skeleton class UnicastSubject:

public final class UnicastSubject<T> 
extends Subject<T, T> {

    public static <T> UnicastSubject<T> create() {   // (1)
        State<T> state = new State<>();
        
        return new UnicastSubject<>(state);
    }
    
    final State<T> state;                            // (2)
    
    protected UnicastSubject(State<T> state) {       // (3)
        super(state);
        this.state = state;
    }
    
    @Override
    public void onNext(T t) {
        // implement
    }
    
    @Override
    public void onError(Throwable e) {
        // implement
    }
    
    @Override
    public void onCompleted() {
        // implement
    }
    
    @Override
    public boolean hasObservers() {
        // implement
    }
    
    static class State<T> implements 
        OnSubscribe<T>, Observer<T>, 
        Producer, Subscription {                       // (4)
        
    }
}

The main drawback of the Java language regarding RxJava is that there are no extension methods: methods that appear to be part of a class but in reality, they are static methods somewhere else and the compiler turns a fluent invocation to them into a regular imperative static method call. Therefore, a fluent API requires a wrapper class, Observable, to hold onto all operators and methods.

Since we need to customize the subscription actions for each custom Observable, the Observable class has a protected constructor with a callback, OnSubscribe<T> on it. However, Subjects need to both handle the OnSubscribe<T> calls and the onXXX methods at the same time. Java forbids calling instance methods before the constructor calls super, therefore, the following code doesn't compile:

public class MyObservable<T> extends Observable<T> {
    protected MyObservable() {
        super(this::handleSubscriber);
    }

    void handleSubscriber(Subscriber<? super T> s) {
    }
}

The workaround is somewhat awkward but works: use a static factory method to create the Subject and have it create a state object that is shared between the two roles. The state object is passed to the constructor as the OnSubscribe<T> target and also serves as the state of the Observable itself (1).
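
The pattern doesn't depend on RxJava, so here it is boiled down to plain Java. Everything in this sketch is a hypothetical stand-in: MiniObservable plays the role of Observable, java.util.function.Consumer plays the role of both OnSubscribe and Subscriber, and MiniSubject supports just one child with no backpressure.

```java
import java.util.function.Consumer;

// Stand-in for Observable: takes the subscription callback in its constructor.
class MiniObservable<T> {
    final Consumer<Consumer<? super T>> onSubscribe;

    protected MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    final void subscribe(Consumer<? super T> child) {
        onSubscribe.accept(child);
    }
}

final class MiniSubject<T> extends MiniObservable<T> {

    // The shared state: it is the subscription callback and holds the child.
    static final class State<T> implements Consumer<Consumer<? super T>> {
        volatile Consumer<? super T> child;

        @Override
        public void accept(Consumer<? super T> s) {
            child = s;
        }

        void onNext(T t) {
            Consumer<? super T> c = child;
            if (c != null) {
                c.accept(t);
            }
        }
    }

    final State<T> state;

    // The factory creates the state before the constructor runs, so
    // nothing has to reference 'this' ahead of the super(...) call.
    static <T> MiniSubject<T> create() {
        State<T> state = new State<>();
        return new MiniSubject<>(state);
    }

    private MiniSubject(State<T> state) {
        super(state);
        this.state = state;
    }

    void onNext(T t) {
        state.onNext(t);
    }
}
```

A value sent before subscribe() is simply lost in this toy version; the UnicastSubject in this post buffers it instead.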

Note that most 1.x Observable extensions, such as the standard Subjects use separate objects for state and subscription handling. 2.x has been an improvement in this regard and Subjects also use a single state object for both tasks, similar to what I'm showing here.

Given a single State<T> object, we use it as the OnSubscribe<T> callback and store it inside the UnicastSubject (2, 3). The State class itself implements a bunch of interfaces (4):


  • OnSubscribe for handling subscription, 
  • Observer to conveniently have onXXX methods that the UnicastSubject.onXXX methods delegate to, 
  • Producer, since we know there will be only a single Subscriber which needs only a single Producer to communicate with (saving on allocation and on cross-communication) and
  • Subscription for handling the unsubscription call coming from the child directly (again saving on allocation and cross-communication).
The implementation of the main onXXX methods is straightforward delegation into the state object:

    // ...
    @Override
    public void onNext(T t) {
        state.onNext(t);
    }
    
    @Override
    public void onError(Throwable e) {
        state.onError(e);
    }
    
    @Override
    public void onCompleted() {
        state.onCompleted();
    }
    
    @Override
    public boolean hasObservers() {
        return state.child != null;
    }
    // ...


(For brevity, I'm not going to implement the Subject state peeking methods in this post.)

Now let's build the State class. First, I'll add the state variables:


static final class State<T> implements 
    OnSubscribe<T>, Observer<T>, Producer, Subscription {

        volatile Subscriber<? super T> child;                  // (1)
        
        final AtomicBoolean once = new AtomicBoolean();        // (2)
        
        final Queue<T> queue = new SpscLinkedAtomicQueue<>();  // (3)
        
        volatile boolean done;                                 // (4)
        Throwable error;
        
        volatile boolean unsubscribed;                         // (5)

        final AtomicLong requested = new AtomicLong();         // (6)

        final AtomicInteger wip = new AtomicInteger();         // (7)
        // ...

We have a couple of components here:

  1. The child field will hold onto the only subscriber when it arrives and will be set back to null once it leaves. It has to be volatile because hasObservers() needs to check it in a thread-safe manner.
  2. We need to make sure only a single Subscriber is let in during the entire lifetime of the subject and we do this via an atomic boolean. Naturally, this field could have been inlined into State by having State extend AtomicBoolean. An alternative, although it requires more work, would have been to use child directly and have a private static Subscriber instance that indicates a terminal state.
  3. This queue will hold onto the incoming values until there is a Subscriber or said subscriber requests some elements. Here, I'm using a single-producer single-consumer linked queue implementation from RxJava, but this queue is relatively expensive due to constant node allocation. In an upcoming PR and in RxJava 2.x, we have a SpscLinkedArrayQueue (it is a slightly modified version of the JCTools SpscUnboundedArrayQueue) that amortizes this allocation by using 'islands' of Spsc buffers.
  4. We hold onto the terminal event (which might be an error) in these two fields. Since error will be written once before done and read after done, it doesn't need to be a volatile in itself.
  5. Since the child may go at any time, it would be infeasible to keep buffering events indefinitely since no one else could observe them after that. This flag, along with the done flag will be used in the onXXX methods to drop events.
  6. We need to keep track of the requested amount of the child subscriber so we emit values no more than requested.
  7. The wip field is part of the queue-drain approach explained in an earlier post and makes sure only a single thread is emitting values to the child subscriber, if any.
In the next step, let's implement the call() method of OnSubscribe and manage the incoming Subscribers:


@Override
public void call(Subscriber<? super T> t) {
    if (!once.get() && once.compareAndSet(false, true)) {  // (1)
        t.add(this);
        t.setProducer(this);                               // (2)
        child = t;
        drain();                                           // (3)
    } else {
        if (done) {                                        // (4)
            Throwable e = error;
            if (e != null) {                               // (5)
                t.onError(e);
            } else {
                t.onCompleted();
            }
        } else {
            t.onError(new IllegalStateException(
                "Only one subscriber allowed."));          // (6)
        }
    }
}


  1. If once is not set and we succeed setting it to true atomically, we now have our single child subscriber.
  2. It is important that setting up the unsubscription callback and the producer happens before the store to the child field, otherwise, an asynchronous onNext call may end up running before this setup. This isn't much of a problem in RxJava 1.x but it is of a large concern in a reactive-streams compliant Publisher (to which we'd like to port our code more easily.)
  3. Once the producer/unsubscription are set up, we set the child reference. Whether or not the child is cancelled at this point, we need to call drain which will take care of replaying buffered values and cleaning up if the child has unsubscribed in the meantime.
  4. If there is/was a subscriber already there, we need to check if the subject is actually terminated.
  5. If the subject is terminated, we simply emit the terminal event (error or completion), similar to how the standard subjects behave.
  6. Otherwise, if there is a subscriber and the subject isn't terminated, we simply emit an IllegalStateException explaining the situation.
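
The admission check in (1) can be exercised on its own. The following sketch uses hypothetical names (OnceGateSketch, tryAdmit, raceAdmissions) and lets a number of threads race for the single slot; regardless of scheduling, compareAndSet lets exactly one of them through.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OnceGateSketch {
    final AtomicBoolean once = new AtomicBoolean();

    // The plain read is a cheap pre-check; the CAS decides the winner.
    boolean tryAdmit() {
        return !once.get() && once.compareAndSet(false, true);
    }

    // Starts the given number of threads, releases them at the same
    // time and counts how many of them were admitted.
    static int raceAdmissions(int threads) {
        OnceGateSketch gate = new OnceGateSketch();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (gate.tryAdmit()) {
                    winners.incrementAndGet();
                }
            });
            ts[i].start();
        }
        start.countDown();
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```
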
Next comes onNext, which could be implemented in a simple way and in a more complicated way. Let's see the simpler way:


@Override
public void onNext(T t) {
    if (done || unsubscribed) {
        return;
    }
    queue.offer(t);
    drain();
}

Not very exciting, is it? If the subject isn't done or isn't unsubscribed, offer the value to the queue and call drain(). (Note that I'm omitting the null handling here for brevity, again).

The complicated way is to have a fast-path implementation that bypasses the queue if there is no contention, but more importantly, when the child has caught up and thus the queue is empty. Note, however, that such caught-up state is not permanent because the child can slow down and not request in time. In this case, we still have to put the value into the queue.


@Override
public void onNext(T t) {
    if (done || unsubscribed) {
        return;
    }
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {  // (1)
        long r = requested.get();                     // (2)
        if (r != 0 && queue.isEmpty()) {              // (3)
            child.onNext(t);
            if (r != Long.MAX_VALUE) {                // (4)
                requested.decrementAndGet();
            }
            if (wip.decrementAndGet() == 0) {         // (5)
                return;
            }
        } else {
            queue.offer(t);                           // (6)
        }
    } else {
        queue.offer(t);                               // (7)
        if (wip.getAndIncrement() != 0) {
            return;
        }
    }
    drainLoop();                                      // (8)
}

This looks more interesting. Here is how it works:

  1. This is the entry to the fast-path; if we manage to set the wip counter from 0 to 1, we are in.
  2. We retrieve the requested amount.
  3. We need to check if the requested amount is non-zero and if the queue is empty. This queue check is crucial because otherwise the fast-path would skip the contents of the queue and thus reorder the events.
  4. If the queue happens to be empty and the child runs in bounded mode, we emit the event and decrement the requested amount.
  5. We also decrement the wip count and if it is zero, we simply return. If it is non-zero, it means a concurrent request() call arrived and we need to deal with it. The execution stays in "emission mode" and continues on line (8).
  6. If there is no request or the queue is non-empty, we enqueue the current value and let the drainLoop() deal with the situation (8).
  7. If we couldn't enter the fast-path, we offer the value into the queue and try to enter the drain loop by incrementing wip. If it was actually zero, we enter the drain loop; otherwise, the increment signals to the running drain loop that there is more work to do.
  8. Finally, while still in "emission mode", we jump to the drain loop.
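
To observe when the fast-path is actually taken, here is a self-contained model of the method. It rests on a few assumptions: FastPathSketch is a hypothetical class, the child is a plain list, the fastPathHits counter exists only for demonstration, termination and unsubscription are omitted, and request() doesn't cap on overflow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class FastPathSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final AtomicInteger wip = new AtomicInteger();
    final List<Integer> received = new ArrayList<>(); // stand-in child
    int fastPathHits;                                 // demo-only counter

    void onNext(int v) {
        if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
            long r = requested.get();
            if (r != 0L && queue.isEmpty()) {
                received.add(v);                      // bypass the queue
                fastPathHits++;
                if (r != Long.MAX_VALUE) {
                    requested.decrementAndGet();
                }
                if (wip.decrementAndGet() == 0) {
                    return;
                }
            } else {
                queue.offer(v);                       // no demand or backlog
            }
        } else {
            queue.offer(v);
            if (wip.getAndIncrement() != 0) {
                return;
            }
        }
        drainLoop();
    }

    void request(long n) {
        requested.addAndGet(n);                       // no overflow capping
        if (wip.getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {
        int missed = 1;
        for (;;) {
            long r = requested.get();
            long e = 0L;
            while (e != r) {
                Integer v = queue.poll();
                if (v == null) {
                    break;
                }
                received.add(v);
                e++;
            }
            if (e != 0L && r != Long.MAX_VALUE) {
                requested.addAndGet(-e);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

A value arriving without outstanding requests goes through the queue; once the child has requested ahead and the queue is empty, subsequent values take the fast-path until the requested amount runs out again.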

The implementation of the onError and onCompleted isn't that different from the simpler onNext implementation:

@Override
public void onError(Throwable e) {
    if (done || unsubscribed) {
        return;
    }
    error = e;
    done = true;
    drain();
}

@Override
public void onCompleted() {
    if (done || unsubscribed) {
        return;
    }
    done = true;
    drain();
}

They are quite straightforward: unless done or unsubscribed, set an error, then done and then call drain().

Handling child requests and unsubscription isn't that complicated either:


@Override
public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required");
    }
    if (n > 0) {
        BackpressureUtils.getAndAddRequest(requested, n);      // (1)
        drain();
    }
}

@Override
public boolean isUnsubscribed() {
    return unsubscribed;
}

@Override
public void unsubscribe() {
    if (!unsubscribed) {
        unsubscribed = true;
        if (wip.getAndIncrement() == 0) {                      // (2)
            clear();
        }
    }
}

In request() (1), we add the amount to the requested value and then call drain. The unsubscribe() is a bit more interesting (2). We set the unsubscribed flag (the get and then set isn't atomic, but it doesn't matter here) then increment the wip counter, which may seem odd. The idea here is that if there is no contention, the transition from 0 to 1 makes sure the cleanup code runs only once and also prevents any further attempts to enter the emission loop by other means. If there is a drain loop running, this will indicate more work is available and since drainLoop() will check for unsubscription before any other action, drainLoop() will call the cleanup for us.
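The addition in request() (1) can't be a plain addAndGet because repeated requests could overflow the counter into negative territory. Here is a sketch of the capped addition that, to my understanding, BackpressureUtils.getAndAddRequest performs; RequestMath and getAndAddCapped are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper illustrating a capped, CAS-based request addition.
final class RequestMath {
    private RequestMath() { }

    // Adds n to requested, capping the sum at Long.MAX_VALUE,
    // and returns the value seen before the addition.
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;   // overflow: treat it as unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }
}
```

Since Long.MAX_VALUE stands for "unbounded" in the drain loop, capping at that value keeps the two parts consistent.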

Moving on, the clear() and drain() methods are also short:


void clear() {
    queue.clear();
    child = null;
}

void drain() {
    if (wip.getAndIncrement() == 0) {
        drainLoop();
    }
}

The clear() method clears the queue and nulls out the child. Since this only runs while wip != 0, there is no chance that the child becomes null while onNext is about to invoke a method on it, which would yield a NullPointerException. The drain() method simply increments wip and if it was zero, enters the drainLoop().
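If you want to convince yourself that the wip counter really lets only one thread into the loop at a time, the following stand-alone sketch hammers drain() from several threads. DrainDemo and its fields are hypothetical names; the loop body is just a placeholder for the real emission work.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: many threads call drain(), only one runs the loop at a time.
final class DrainDemo {
    final AtomicInteger wip = new AtomicInteger();
    final AtomicInteger inside = new AtomicInteger();
    volatile boolean overlapped;
    int passes;                      // only touched by the thread that owns the loop

    void drain() {
        if (wip.getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {
        int missed = 1;
        for (;;) {
            if (inside.incrementAndGet() != 1) {
                overlapped = true;   // two threads in the loop body: should never happen
            }
            passes++;
            inside.decrementAndGet();

            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Returns true if no two threads were ever inside the loop body together.
    static boolean run(int threads, int callsPerThread) {
        final DrainDemo d = new DrainDemo();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    d.drain();
                }
            });
            ts[i].start();
        }
        try {
            for (Thread t : ts) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return !d.overlapped && d.wip.get() == 0 && d.passes > 0;
    }
}
```

Note that passes is usually smaller than the total number of drain() calls: a single pass can account for several missed calls at once.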

Okay, this is what we've been waiting for, the drainLoop method:


void drainLoop() {
    int missed = 1;                                           // (1)
    
    final Queue<T> q = queue;
    Subscriber<? super T> child = this.child;                 // (2)
    
    for (;;) {
        
        if (child != null) {                                  // (3)
            
            if (checkTerminated(done, q.isEmpty(), child)) {  // (4)
                return;
            }
            
            long r = requested.get();
            boolean unbounded = r == Long.MAX_VALUE;
            long e = 0L;                                      // (5)
            
            while (r != 0L) {
                boolean d = done;
                T v = q.poll();
                boolean empty = v == null;                    // (6)
                
                if (checkTerminated(d, empty, child)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                child.onNext(v);
                
                r--;
                e--;                                          // (7)
            }
            
            if (e != 0) {
                if (!unbounded) {
                    requested.addAndGet(e);                   // (8)
                }
            }
        }
        
        missed = wip.addAndGet(-missed);                      // (9)
        if (missed == 0) {
            return;
        }
        
        if (child == null) {                                  // (10)
            child = this.child;
        }
    }
}

Should look quite familiar. Let's dig into it:

  1. Instead of decrementing the wip counter one by one, we first assume we only missed 1 drain call. Later on, if we happened to miss multiple calls, missed will be bigger and we'll subtract all of them at once on line (9), reducing the likelihood of looping too many times.
  2. We cache the child and queue fields to avoid re-reading them all the time.
  3. If we don't have a child subscriber, there is nothing we can do at that point.
  4. We have to check if a terminal state has been reached. Since onError and onCompleted travel without requests, we need to do the check before checking the requested amount and quit the loop if so. It is important to remember that checking the done flag before the emptiness of the queue is mandatory, because values added to the queue happen before the setting of the done flag.
  5. We read the requested amount, check if it is unbounded and prepare an emission counter.
  6. We read the done flag, poll an element from the queue and if that value is null, we set an empty flag. The call to checkTerminated again makes sure the unsubscription and terminal events are handled.
  7. We decrement the requested amount and the emission count. By decrementing instead of incrementing, we save a negation on line (8).
  8. If there was emission and the request amount is not unbounded, we decrement the requested amount by the emission amount (e is negative).
  9. Once all that could be done emission-wise, we update the wip counter by subtracting the known missed amount. It is possible more calls were missed and thus the return value won't be zero. In this case, we loop again, otherwise we quit the drain loop.
  10. If we missed an event, it is possible a subscriber arrived thus the child field has to be re-read if we know it as being null locally.
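The request accounting of steps (5), (7) and (8) can be tried out in isolation. The sketch below assumes a single thread and leaves out the terminal checks; EmissionCounter and drainOnce are hypothetical names.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper showing the negative emission counter in isolation.
final class EmissionCounter {
    private EmissionCounter() { }

    // Emits up to the requested amount from q into out and returns
    // how many items were emitted.
    static long drainOnce(Queue<String> q, AtomicLong requested, List<String> out) {
        long r = requested.get();
        boolean unbounded = r == Long.MAX_VALUE;
        long e = 0L;

        while (r != 0L) {
            String v = q.poll();
            if (v == null) {
                break;
            }
            out.add(v);
            r--;
            e--;                          // counts downwards: e is zero or negative
        }

        if (e != 0L && !unbounded) {
            requested.addAndGet(e);       // adding a negative number subtracts
        }
        return -e;
    }
}
```

In unbounded mode the requested amount stays at Long.MAX_VALUE no matter how many items were emitted, which is why the update is skipped there.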

The final method is checkTerminated, and then we are done. Depending on whether one wants to delay errors or not, there are two possible implementations. If errors should be delayed, the method looks like this:

boolean checkTerminated(boolean done, boolean empty, 
        Subscriber<? super T> child) {
    if (unsubscribed) {                              // (1)
        clear();
        return true;
    }
    if (done && empty) {                             // (2)
        unsubscribed = true;
        this.child = null;
        Throwable e = error;
        if (e != null) {
            child.onError(e);
        } else {
            child.onCompleted();
        }
        return true;
    }
    return false;
}

Here, we first check the unsubscribed flag; if it is set, we clear the queue, null out the child field and indicate the loop should simply quit (1). Otherwise, we check if the source is done and the queue is empty, at which point we set the unsubscribed flag (for convenience), null out the child and emit the terminal event (2). Any other case will keep the loop running (i.e., done but queue not empty).

The alternative implementation sends an error as soon as it is detected, ignoring the contents of the queue.


boolean checkTerminated(boolean done, boolean empty, 
        Subscriber<? super T> child) {
    if (unsubscribed) {
        clear();
        return true;
    }
    if (done) {
        Throwable e = error;                            // (1)
        if (e != null) {
            unsubscribed = true;
            clear();
            child.onError(e);
            return true;
        } else if (empty) {                             // (2)
            unsubscribed = true;
            this.child = null;
            child.onCompleted();
            return true;
        }
    }
    return false;
}

If the done flag is set, we check if there is an associated error as well. In this case, we clear the queue, null out the child via clear() and emit the error (1). Otherwise, we need to check if the queue is empty since we first have to emit the events and only then complete the child (2). There is no need to clear the queue since we know it to be empty at this point, hence no call to clear().
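To make the difference between the two variants visible, here is a small single-threaded simulation. It is only a sketch: ErrorPolicyDemo is a hypothetical class, the child is assumed to be unbounded, and the signals are recorded as strings instead of being delivered to a real Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation of the delayed and the eager error policies.
final class ErrorPolicyDemo {
    private ErrorPolicyDemo() { }

    // Drains q and records the signals a child would receive.
    static List<String> drain(Queue<Integer> q, boolean done,
            Throwable error, boolean delayError) {
        List<String> signals = new ArrayList<>();
        for (;;) {
            boolean empty = q.isEmpty();
            if (done) {
                if (error != null && (!delayError || empty)) {
                    q.clear();           // eager mode drops whatever is still queued
                    signals.add("onError(" + error.getMessage() + ")");
                    return signals;
                }
                if (error == null && empty) {
                    signals.add("onCompleted()");
                    return signals;
                }
            }
            if (empty) {
                return signals;          // not done yet and nothing to emit
            }
            signals.add("onNext(" + q.poll() + ")");
        }
    }
}
```

With the values 1 and 2 still queued when the error arrives, the delayed variant records both onNext signals followed by the onError, whereas the eager variant records only the onError.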


Conclusion

In this blog post, I've shown how to build a custom UnicastSubject that honors backpressure and allows only a single Subscriber to subscribe to it.

In the next blog post, I'll show how to handle multiple Subscribers within a subject by re-implementing the PublishSubject.