2015. május 12., kedd

Operator concurrency primitives: producers (part 1)

Introduction

Backpressure was officially introduced in RxJava 1.0 as the main way of exercising flow control and bound the until-then unbounded work queues of various operators. Operator backpressure implementation is based on the concept of coroutines, of which I don't really know anything about. There is perhaps a more intuitive way to understand backpressure through the implementation and use of various Producers.

If you are somewhat familiar with the reactive-streams-jvm specification, you can discover the similarities between its Subscription and RxJava's Producer. However, since this blog post is mostly about RxJava 1.0, I'm going to explain things from RxJava's perspective and with RxJava's own constructs.

Producer is an interface in RxJava and is the main way to communicate backpressure-related information, aka request for n more elements, across the operator chain. It has a sole method called request(long n) and is provided to Subscribers through the setProducer method from an upstream operator when the operator chain is established during subscription.

Now before one rejoices Producer being a @FunctionalInterface, I have to disappoint said person and tell that it is highly unlikely one can come up with a meaningful producer without state or other methods supporting cancellation and indicator for values-produced.

One can think of the Producer as a means to have long values flow upstream (against the flow of onXXX events travelling downstream), but I instead think of both (value and request) streams as always coming down; it helps reason about their concurrent behavior in respect of each other (i.e., as if you single stepped both next to each other in some debugger) which has to happen with most advanced operators to ensure their correctness.

So instead of such conceptual approach, I'll demonstrate producers by implementing a very basic producer that emits a certain number of values, just like Observable.range().

The range producer

When I first learned about producers in RxJava, I struggled with understanding most of the few implemented by the Netflix team, but luckily, I understood the producers behind range() and from() almost immediately.

Basically, in the producer of the Observable.range() factory method, the downstream asks for n values and the producer produces n (ever incrementing) values:

Observable<Integer> range = Observable.create(child -> {   // (1)
    int[] index = new int[] { 0 };                         // (2)
    Producer p = n -> {                                    // (3)
       int j = index[0];
       for (int i = 0; i < n; i++) {                       // (4)
           child.onNext(j);
           j++;
           if (j == 100) {                                 // (5)
              child.onCompleted();
              return;
           }
       }
       index[0] = j;                                       // (6)
    };

    child.setProducer(p);                                  // (7)
});

Warning! This example lacks some important features I will add later on so please don't run off and implement your producers like this!

Let's step through this (simple) range Observable:

  1. We are using the create() factory method with an OnSubscribe lambda to gain hold onto the subscribing child Subscriber.
  2. We like lambdas but we need to store the index where each request ended to allow resuming the sequence of values.
  3. Our Producer implementation is also a lambda (extremely unlikely in real life), which whenever called, reads the last known index.
  4. We need to produce the requested amount of incremental values via a loop from 0 to n - 1, emitting the current value of j and incrementing it after that.
  5. We check if we reached a total of 100 emissions and complete the child.
  6. In case the emission count of j didn't reach 100, we store its value in index and await further requests (by returning to whoever called request()).
  7. Finally, we let the child Subscriber know about our producer p.
If we try to subscribe to our range Observable in a synchronous manner, it seems to work quite well:

range.subscribe(
    System.out::println, 
    Throwable::printStackTrace, 
    () -> System.out.println("Done"));

TestSubscriber<Integer> ts = new TestSubscriber<>();
ts.requestMore(0);

range.subscribe(ts);

ts.requestMore(25);
ts.getOnNextEvents().forEach(System.out::println);

ts.requestMore(10);
ts.getOnNextEvents().forEach(System.out::println);

ts.requestMore(65);
ts.getOnNextEvents().forEach(System.out::println);

However, the moment we try to observe it through a complex chain of operators, including observeOn, quite strange things can happen: it may emit all values, print duplicates or even hang or throw a MissingBackpressureException.

The reason for such undefined behavior is that our Producer is not thread-safe and not reentrant-safe either.

The thread-safety requirement of a Producer.request() method comes from the fact that many asynchronous operator may request from another thread than the producer is producing and thus clashing on the producer's internal state.

The reentrant-safety requirement comes from the potential that certain operators don't request in distinct batches but perhaps request some large value upfront and request tiny bits from their onNext method (i.e. our child.onNext()) which then calls request() again to emit some duplicate values (remember, j is not written back into index yet).

(On a side note, these requirements are strangely not enforced the reactive-streams-jvm specs despite my arguments and experiments in implementing a fluent API on top of the specs. Although some operators can get away by being only reentrant-safe, my opinion is that being conservative and adhering both requirements in non-trivial producers is the general way to go.)

Solving the safety issues with range

We can now apply the principles of serializing access we saw in the previous post to the producer's request method. However, since the requested amount is a number too, we can merge it with the wip counter to save memory. In addition, we can extend AtomicLong and implement Producer directly to save on the allocation of the request counter itself (a win of 24+ bytes per producer instance).

Because the logic in the producer is going to be a bit longer, first I show its surrounding class I'll name RangeProducer:

public final class RangeProducer 
extends AtomicLong implements Producer {
    private static final long serialVersionUID = 1;

    final Subscriber<? super Integer> child;                  // (1)
    int index;                                                // (2)
    int remaining;                                            // (3)
    
    public RangeProducer(
            Subscriber<? super Integer> child, 
            int start, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException();             // (4)
        }
        this.child = child;
        this.index = start;
        this.remaining = count;
    }

    @Override
    public void request(long n) {
        // the logic comes here
    }
}

The structure at this point is quite straightforward:

  1. Since we are going to produce values for the child to consume, we need to get hold onto its reference which was implicit in the very first example.
  2. The index, now an instance, holds the next value to be emitted.
  3. The remaining number of values to emit. We will decrement this after each value has been emitted and quit emitting once it reached zero.
  4. We consider a non-positive count as an illegal argument in this operator which opens up a slight optimization path when the range's OnSubscribe logic is implemented: if count is zero, one can immediately call onCompleted() on the child and thus skipping the cost of handling a producer instance.
First, we need to perform some request accounting:

    // ...
    @Override
    public void request(long n) {
        if (n < 0) { 
            throw new IllegalArgumentException();           // (1)
        }
        if (n == 0) {
            return;                                         // (2)
        }
        long r;
        for (;;) {
            r = get();                                      // (3)
            long u = r + n;                                 // (4)
        
            if (u < 0) {
                u = Long.MAX_VALUE;                         // (5)
            }
        
            if (compareAndSet(r, u)) {                      // (6)
                break;
            }
        }
        // ... will be continued


  1. We check for negative request values. Such value is always a bug and we indicate this by throwing the unchecked IllegalArgumentException.
  2. Requesting zero is a no-op, but we need to filter it out since a 0 to 0 transition would trigger the drain loop for concurrent callers.
  3. We need to track the total number of requests and we hold it in the underlying AtomicLong. A CAS loop is necessary because the request amount has to be capped at Long.MAX_VALUE and the change must be atomic as well.
  4. Given the current request amount, add the new requested amount the request() method received.
  5. Because the addition can overflow, we cap the new request amount at Long.MAX_VALUE, which is can be considered as infinite.
  6. We try to atomically swap in the new request value of u and break the loop if successful. Otherwise, some concurrent request call (or an emitter) might have changed the current request amount and we need to retry.
In fact, this update loop is so common in RxJava that we have a utility method for it: BackpressureUtils.getAndAddRequested().


        if (r != 0) {
            return;                             // (1)
        }

        r = n;                                  // (2)

        for (;;) {
            int i = index;                      // (3)
            int k = remaining;
            int e = 0;

            while (r > 0 && k > 0) {            // (4)
                child.onNext(i);
                k--;
                if (k == 0) {                   // (5)
                    child.onCompleted();
                    return;
                }
                i++;                            // (6)
                e++;
                r--;
            }

            remaining = k;                      // (7)
            index = i;

            r = addAndGet(-e);                  // (8)

            if (r == 0) {
                break;                          // (9)
            }
        }
    } // end of method
}

I admit, the final part of the request method looks quite complicated, but understanding it gives a significant insight into how producers can be constructed:


  1. Once the loop breaks, we have the previous request amount in r and we can start emitting if it is 0. Otherwise, it means there is an emission going on.
  2. First, we assume the current request amount is the parameter n. Of course, a concurrent call might have changed the underlying AtomicLong's value, but we will get a fresh value for it in (8) and thus save on a really unnecessary memory barrier.
  3. We load the state into local variables to avoid re-reading them due to the pontential atomics involved in downstream.
  4. Here comes the drain part of the operation. We loop if there the downstream has requested (r) and we still have remaining values (k) to emit. Once the current value is emitted, we decrement the remaining value counter first.
  5. If no more values remained, we call onCompleted() and quit without changing any other value. Since we don't decrement the underlying request value to zero, the producer remains in drain-state indefinitely and acts as a no-op to any subsequent request() calls after completion.
  6. Otherwise, we increment the index, increment an emission counter and decrement the cached requested amount.
  7. Once the loop finishes we store the cached remaining and index values back to the instance variables. Note that their visibility is ensured by the compareAndSet() and the getAndAdd() method pair: the changes to them get properly published for any subsequent entry on the drain loop at (2).
  8. This is the closing part of the request accounting. We decrement the requested count by the amount emitted and take the difference atomically. It is analogous to the final decrementAndGet() in the plain queue-drain approach and allows either safely quitting the drain loop or continue with it if any concurrent requests arrived in the meantime.
  9. With all the emissions accounted, if we got back zero, it means the downstream is not interested in any new value for now and the loop can quit. If there was a concurrent request just after (8), it will increment the value from 0 to its n and enter the emission loop to continue the work.
Before I conclude this post, there is one missing feature: unsubscription. Downstream can unsubscribe at any time and from any thread asynchronously, yet the example Producer keeps emitting regardless. To avoid wasting CPU cycles we need to check if the child subscriber is still interested in receiving values by calling isUnsubscribed() on it in the inner loop.

            // ...
            if (child.isUnsubscribed()) {              // (1)
                return;
            }
            while (r > 0 && k > 0) {
                child.onNext(i);
                if (child.isUnsubscribed()) {          // (2)
                    return;
                }
                k--;
                if (k == 0) {
                    child.onCompleted();
                    return;
                }
            // ... the rest is the same

Depending on your eagerness, the isUnsubscribed() check can be performed at more locations, but generally you'd want to do it just before the inner loop (1) and just after the value emission (2). The former will skip the loop entirely and the latter will skip any subsequent emissions of values and the onCompleted().

Note that RxJava implementation is a bit inconsistent about receiving completion events after unsubscription; most operators will work just fine while others can cause some unintentional side effects. Generally, the guidelines allow for some value and completion event emissions to slip through as unsubscription is considered best effort only.

If you find some operator that reacts strangely to events after it unsubscribed, don't hesitate to post on the RxJava issue list.


Conclusion

In this block post, I've shown how simple Producers can be built step by step and how their logic works. I've also explained a set of pitfalls that such simple Producer has to avoid in order to be compliant with the RxJava requirements. As for closing the post, here is the entire source code of our fully-functional, runnable range operator (gist):

public final class RxRange 
implements OnSubscribe<Integer> {
    final int start;
    final int count;
    public RxRange(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException();
        }
        this.start = start;
        this.count = count;
    }
    @Override
    public void call(Subscriber t) {
        if (count == 0) {
            t.onCompleted();
            return;
        }
        RangeProducer p = new RangeProducer(t, start, count);
        t.setProducer(p);
    }
    
    public Observable<integer> toObservable() {
        return Observable.create(this);
    }
    
    static final class RangeProducer 
    extends AtomicLong implements Producer {
        /** */
        private static final long serialVersionUID = 
            5318571951669533517L;
        final Subscriber<? super Integer> child;
        int index;
        int remaining;
        public RangeProducer(
                Subscriber<? super Integer> child,
                int start, int count) {
            this.child = child;
            this.index = start;
            this.remaining = count;
        }
        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException();
            }
            if (n == 0) {
                return;
            }
            if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
                return;
            }
            long r = n;
            for (;;) {
                if (child.isUnsubscribed()) {
                    return;
                }
                int i = index;
                int k = remaining;
                int e = 0;
                
                while (r > 0 && k > 0) {
                    child.onNext(i);
                    if (child.isUnsubscribed()) {
                        return;
                    }
                    k--;
                    if (k == 0) {
                        child.onCompleted();
                        return;
                    }
                    e++;
                    i++;
                    r--;
                }
                index = i;
                remaining = k;
                
                r = addAndGet(-e);
                
                if (r == 0) {
                    return;
                }
            }
        }
    }
    
    public static void main(String[] args) {
        Observable<Integer> range = 
            new RxRange(1, 10).toObservable();
        
        range.take(5).subscribe(
            System.out::println,
            Throwable::printStackTrace,
            () -> System.out.println("Done")
        );
    }
}

In the next post, I'll continue to talk about more advanced producers that come up quite frequently in backpressure-aware operators: single-producers and single-delayed-producers.

3 megjegyzés:

  1. Dear David, I have a question, why do you introduce extra local variables `i` and `k` instead of using `index` and `remaining` directly?

    VálaszTörlés
  2. To avoid re-reading and writing those fields and increase the likelihood those end up in CPU registers, making the whole process quite efficient.

    VálaszTörlés