Introduction
After the lengthy first part describing a modestly complex RangeProducer, let's spend some time with two simpler Producers.
You may think, why didn't I start with these? There are two main reasons for this: a) describing the RangeProducer gives more insight (IMO) and b) these producers can be derived from the concepts applied in the RangeProducer.
The single-producer
You should be familiar with the just(T value) operator of RxJava, so let me tell you a secret: it doesn't honor backpressure or unsubscription. Whenever one subscribes to it, it sends out its single onNext and onCompleted events unconditionally:

    Observable<Integer> source = Observable.just(1);

    TestSubscriber<Integer> ts = new TestSubscriber<>();
    ts.requestMore(0);                                    // (1)

    source.unsafeSubscribe(ts);

    ts.getOnNextEvents().forEach(System.out::println);

    System.out.println("--");

    ts.unsubscribe();                                     // (2)

    source.unsafeSubscribe(ts);

    ts.getOnNextEvents().forEach(System.out::println);

Even though we request zero at (1), we get its single value printed regardless, and despite calling unsubscribe() on the TestSubscriber at (2), we get that single value again (now the TestSubscriber holds and prints two 1s).
Is just() broken? Partially yes. Its ignorance towards unsubscription can be rationalized by saying just() does its best effort (read nothing) to cancel emissions.
However, from the perspective of backpressure, it is broken, but no bugs manifest in the current RxJava implementation that could show this (i.e., no MissingBackpressureException) because most bounded operator-buffers can effortlessly hold onto a single value even if unrequested. Since the operator does so little, it excels in benchmarks and use cases that use this single value source.
The next major version of RxJava - 2.0 - will be natively reactive-streams-jvm compliant, which strictly forbids this lack of backpressure support. One can use onBackpressureBuffer() to fix this up, but such an extra operator is quite an overkill; instead, we can implement a proper Producer for the operator just().
As always, I'm going through the implementation step-by-step. Let's begin with the 'trivial' surrounding class:
    import java.util.concurrent.atomic.AtomicBoolean;

    import rx.Observable;
    import rx.Producer;
    import rx.Subscriber;

    public final class SingleProducer<T>
    extends AtomicBoolean implements Producer {          // (1)
        final Subscriber<? super T> child;               // (2)
        final T value;

        public SingleProducer(
                Subscriber<? super T> child,
                T value) {
            this.child = child;
            this.value = value;
        }

        @Override
        public void request(long n) {
            // logic comes here
        }
    }

    Integer value = 1;

    Observable<Integer> just = Observable.create(child -> {
        child.setProducer(
            new SingleProducer<>(child, value));         // (3)
    });
Our SingleProducer is built upon an AtomicBoolean directly (1) which saves on allocation and naturally has to hold onto the child Subscriber in order to emit its payload. A just observable now is pretty simple: we need to set the SingleProducer instance on the subscribing child via setProducer and all else will be taken care of - once the request() method is actually filled in:
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();    // (1)
        }
        if (n > 0 && compareAndSet(false, true)) {   // (2)
            if (!child.isUnsubscribed()) {
                child.onNext(value);                 // (3)
            }
            if (!child.isUnsubscribed()) {
                child.onCompleted();                 // (4)
            }
        }
    }

It turns out the request() method is pretty simple too, because the entire request accounting can be reduced to a single CAS from false to true that is then left that way:
- We throw an IllegalArgumentException if request is negative, as usual.
- If the request is positive and the CAS succeeds, we can safely enter the 'drain loop'. It doesn't matter if downstream requested 1 or a million, the producer will only emit a single value ever.
- After the eager unsubscription check, we emit the stored value.
- Then we conditionally emit an onCompleted() event, just in case the child unsubscribed in the meantime.
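To see the effect of the single CAS without pulling in RxJava, here is a minimal sketch of the same request() logic. RecordingChild is a hypothetical stand-in that only records events (it is not the RxJava Subscriber), and SingleProducerSketch and SingleProducerDemo are illustration-only names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the child Subscriber; it only records events.
final class RecordingChild {
    final List<String> events = new ArrayList<>();
    boolean unsubscribed;

    boolean isUnsubscribed() { return unsubscribed; }
    void onNext(Object value) { events.add("onNext(" + value + ")"); }
    void onCompleted() { events.add("onCompleted"); }
}

// Same logic as the request() method above, written against the stand-in.
final class SingleProducerSketch<T> extends AtomicBoolean {
    private static final long serialVersionUID = 1L;
    final RecordingChild child;
    final T value;

    SingleProducerSketch(RecordingChild child, T value) {
        this.child = child;
        this.value = value;
    }

    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        // Only the first positive request wins the CAS and emits.
        if (n > 0 && compareAndSet(false, true)) {
            if (!child.isUnsubscribed()) {
                child.onNext(value);
            }
            if (!child.isUnsubscribed()) {
                child.onCompleted();
            }
        }
    }
}

final class SingleProducerDemo {
    // Applies the given request amounts in order and returns what the child saw.
    static List<String> run(long... requests) {
        RecordingChild child = new RecordingChild();
        SingleProducerSketch<Integer> p = new SingleProducerSketch<>(child, 1);
        for (long n : requests) {
            p.request(n);
        }
        return child.events;
    }

    public static void main(String[] args) {
        System.out.println(run(0));        // prints []
        System.out.println(run(0, 5, 3));  // prints [onNext(1), onCompleted]
    }
}
```

A request of zero leaves the flag untouched, so nothing is emitted until the first positive request arrives; any later request finds the flag already set and does nothing.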
The single-delayed-producer
What if the single value to be emitted isn't known at subscription time but becomes available later on (usually through some async activity)? Again, ignoring backpressure and/or relying on onBackpressureBuffer is a possibility, but we'd like to have some efficient infrastructure to make this happen.
To accomplish this, we need to think in terms of atomic state transitions that are triggered by the arrival of the value and the arrival of the downstream request. We have 4 possible states to deal with:
- No request arrived and no value is available: constant will be NO_REQUEST_NO_VALUE = 0
- No request arrived but the value is available: constant will be NO_REQUEST_HAS_VALUE = 1
- A positive request arrived but no value is available: constant will be HAS_REQUEST_NO_VALUE = 2
- A positive request arrived and a value is available: constant will be HAS_REQUEST_HAS_VALUE = 3
We will have a state variable (by extending AtomicInteger) and we will try to CAS into one of the 4 states once the conditions are met.
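Before looking at the real class, it may help to see the four states as a tiny transition table. The sketch below is not part of the producer; DelayedStates, onRequest, onValue and emits are hypothetical names used only to illustrate which transitions exist and which of them is the one that has to emit.

```java
// Illustration only: the allowed transitions between the four states.
final class DelayedStates {
    static final int NO_REQUEST_NO_VALUE = 0;
    static final int NO_REQUEST_HAS_VALUE = 1;
    static final int HAS_REQUEST_NO_VALUE = 2;
    static final int HAS_REQUEST_HAS_VALUE = 3;

    // Next state when a positive request arrives.
    static int onRequest(int s) {
        switch (s) {
            case NO_REQUEST_NO_VALUE:  return HAS_REQUEST_NO_VALUE;
            case NO_REQUEST_HAS_VALUE: return HAS_REQUEST_HAS_VALUE;
            default:                   return s; // already requested: no-op
        }
    }

    // Next state when the value arrives.
    static int onValue(int s) {
        switch (s) {
            case NO_REQUEST_NO_VALUE:  return NO_REQUEST_HAS_VALUE;
            case HAS_REQUEST_NO_VALUE: return HAS_REQUEST_HAS_VALUE;
            default:                   return s; // value already there: no-op
        }
    }

    // True if this particular transition is the one that must emit.
    static boolean emits(int before, int after) {
        return before != after && after == HAS_REQUEST_HAS_VALUE;
    }

    public static void main(String[] args) {
        int s = onRequest(NO_REQUEST_NO_VALUE);
        System.out.println(s);                     // prints 2
        System.out.println(emits(s, onValue(s)));  // prints true
    }
}
```

Whichever event arrives second performs the transition into the final state and therefore does the emission; there is no way back from any state, which is what makes the retry loops in the real methods safe.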
First, we start with the surrounding class and simple demo printout:
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;

    import rx.Observable;
    import rx.Producer;
    import rx.Subscriber;

    public class SingleDelayedProducer<T>
    extends AtomicInteger implements Producer {
        private static final long serialVersionUID = 1L;

        final Subscriber<? super T> child;

        T value;                                          // (1)

        static final int NO_REQUEST_NO_VALUE = 0;
        static final int NO_REQUEST_HAS_VALUE = 1;
        static final int HAS_REQUEST_NO_VALUE = 2;
        static final int HAS_REQUEST_HAS_VALUE = 3;

        public SingleDelayedProducer(
                Subscriber<? super T> child) {
            this.child = child;
        }

        @Override
        public void request(long n) {
            // implement request
        }

        public void set(T value) {
            // implement set
        }
    }

    Observable<Integer> justDelayed = Observable.create(child -> {
        SingleDelayedProducer<Integer> p =
            new SingleDelayedProducer<>(child);

        ForkJoinPool.commonPool().submit(() -> {
            try {
                Thread.sleep(500);                        // (2)
            } catch (InterruptedException ex) {
                child.onError(ex);
                return;
            }
            p.set(1);                                     // (3)
        });

        child.setProducer(p);
    });

    justDelayed.subscribe(System.out::println);

    Thread.sleep(1000);
In SingleDelayedProducer, we can't use final for value as it has to change after construction (1). When we build an Observable with it, the async task started when the child subscribes sleeps a bit (2) before the single value is emitted to the producer (3).
To implement the request() method, we try to transition from NO_REQUEST_NO_VALUE into HAS_REQUEST_NO_VALUE and quit or transition from NO_REQUEST_HAS_VALUE into HAS_REQUEST_HAS_VALUE and emit the stored value:
    // ...
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();      // (1)
        }
        if (n == 0) {
            return;
        }
        for (;;) {                                     // (2)
            int s = get();
            if (s == NO_REQUEST_NO_VALUE) {            // (3)
                if (!compareAndSet(
                        NO_REQUEST_NO_VALUE,
                        HAS_REQUEST_NO_VALUE)) {
                    continue;                          // (4)
                }
            } else if (s == NO_REQUEST_HAS_VALUE) {    // (5)
                if (compareAndSet(
                        NO_REQUEST_HAS_VALUE,
                        HAS_REQUEST_HAS_VALUE)) {
                    if (!child.isUnsubscribed()) {     // (6)
                        child.onNext(value);
                    }
                    if (!child.isUnsubscribed()) {
                        child.onCompleted();
                    }
                }                                      // (7)
            }
            return;                                    // (8)
        }
    }
    // ...

The code looks complicated, but only because of those long constant names, so bear with me:
- We do the negative check and zero check on the requested amount as usual.
- We need a for loop because concurrent calls to either request() or set() can change the state and we need to retry the state transition (or simply quit).
- In the first test, we check if there were no-requests and no-values so far and try to transition into the has-request but no-values state. If successful, we can quit safely because the work should be done by any subsequent calls to set().
- If we fail the CAS, it means the state has been changed concurrently and we need to retry and reassess the situation.
- In case the value arrived earlier, we try to transition into the has-request and has-value state.
- If that CAS succeeds, we (conditionally) emit this single value and an onCompleted event, then quit. This CAS also makes sure any further call to request() or set() is essentially a no-op.
- Otherwise, the failed CAS is an indication that the state has changed: a concurrent request() call got ahead of us and took over the emission, so we can just fall through and quit the loop (we can do this because there are no back-transitions).
- If the producer is in any other state, we simply quit the loop.
The set() method looks remarkably similar, but checks and attempts different state-transitions.
    // ...
        public void set(T value) {
            for (;;) {                                   // (1)
                int s = get();
                if (s == NO_REQUEST_NO_VALUE) {
                    this.value = value;                  // (2)
                    if (!compareAndSet(                  // (3)
                            NO_REQUEST_NO_VALUE,
                            NO_REQUEST_HAS_VALUE)) {
                        continue;
                    }
                } else if (s == HAS_REQUEST_NO_VALUE) {  // (4)
                    if (compareAndSet(
                            HAS_REQUEST_NO_VALUE,
                            HAS_REQUEST_HAS_VALUE)) {
                        if (!child.isUnsubscribed()) {
                            child.onNext(value);
                        }
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                    }
                }
                return;                                  // (5)
            }
        }
    }
- Since the state can be changed by a concurrent request, we may need to loop.
- First, we set the value we just received. Note that if there are concurrent calls to set(), there is a race on this variable because multiple threads attempt to set it and do the transition. In this example, we are sure there will be only a single call to set(), so this won't be a problem. If you have a publisher where concurrent set calls are allowed, you need to establish a protocol about whose value should win. (A simple approach would be to make value volatile and let the cache-coherency decide the winning value.)
- If the transition succeeds, we can quit the loop; emitting the value is now the responsibility of a subsequent request() call. Otherwise, we need to loop again since the state has changed.
- If there was a request already and we can transition into the final has-request and has-value state, the value and completion events can be emitted. If the CAS fails, it means some concurrent call to one of the methods has already done the emission and we can just fall through and quit.
- In any other state, there is nothing to do further and we can quit the loop.
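The claim that request() and set() together emit exactly once, whichever thread gets there first, can be checked with a small self-contained sketch. It repeats the two methods against CountingChild, a hypothetical stand-in that only counts events (not the RxJava Subscriber); DelayedSketch and DelayedRace are likewise illustration-only names, and the sketch assumes a single set() call per producer, as in the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the child Subscriber; it only counts events.
final class CountingChild {
    final AtomicInteger nexts = new AtomicInteger();
    final AtomicInteger completions = new AtomicInteger();

    boolean isUnsubscribed() { return false; }
    void onNext(Object value) { nexts.incrementAndGet(); }
    void onCompleted() { completions.incrementAndGet(); }
}

// Same transitions as SingleDelayedProducer, written against the stand-in.
final class DelayedSketch<T> extends AtomicInteger {
    private static final long serialVersionUID = 1L;
    static final int NO_REQUEST_NO_VALUE = 0;
    static final int NO_REQUEST_HAS_VALUE = 1;
    static final int HAS_REQUEST_NO_VALUE = 2;
    static final int HAS_REQUEST_HAS_VALUE = 3;

    final CountingChild child;
    T value;

    DelayedSketch(CountingChild child) { this.child = child; }

    void request(long n) {
        if (n < 0) throw new IllegalArgumentException("n >= 0 required");
        if (n == 0) return;
        for (;;) {
            int s = get();
            if (s == NO_REQUEST_NO_VALUE) {
                if (!compareAndSet(s, HAS_REQUEST_NO_VALUE)) continue;
            } else if (s == NO_REQUEST_HAS_VALUE) {
                if (compareAndSet(s, HAS_REQUEST_HAS_VALUE)) emit(value);
            }
            return;
        }
    }

    void set(T v) {
        for (;;) {
            int s = get();
            if (s == NO_REQUEST_NO_VALUE) {
                value = v; // published by the successful CAS below
                if (!compareAndSet(s, NO_REQUEST_HAS_VALUE)) continue;
            } else if (s == HAS_REQUEST_NO_VALUE) {
                if (compareAndSet(s, HAS_REQUEST_HAS_VALUE)) emit(v);
            }
            return;
        }
    }

    private void emit(T v) {
        if (!child.isUnsubscribed()) child.onNext(v);
        if (!child.isUnsubscribed()) child.onCompleted();
    }
}

final class DelayedRace {
    // Races set(1) against request(1) on two threads; returns true if every
    // round delivered exactly one onNext and one onCompleted.
    static boolean exactlyOnce(int rounds) {
        for (int i = 0; i < rounds; i++) {
            CountingChild child = new CountingChild();
            DelayedSketch<Integer> p = new DelayedSketch<>(child);
            CountDownLatch start = new CountDownLatch(1);
            Thread setter = new Thread(() -> { await(start); p.set(1); });
            Thread requester = new Thread(() -> { await(start); p.request(1); });
            setter.start();
            requester.start();
            start.countDown();
            join(setter);
            join(requester);
            if (child.nexts.get() != 1 || child.completions.get() != 1) return false;
        }
        return true;
    }

    static void await(CountDownLatch latch) {
        try { latch.await(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }

    static void join(Thread t) {
        try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnce(1000));
    }
}
```

A passing run is of course no proof of correctness, but it exercises both orderings: sometimes set() wins and request() emits, sometimes the other way around, and in both cases the final CAS into HAS_REQUEST_HAS_VALUE is won by exactly one thread.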
Conclusion
In this blog post, I've implemented and described two single-value-emitting, backpressure- and unsubscription-aware Producers. While the direct value-emitting variant is less commonly used in practice, the delayed version is required in many situations, especially if one wants to emit values from onError() - for example, in onErrorReturn() - or from onCompleted() - for example, in toList() and buffer() - where generally one just forwards the terminal event to the child subscriber.
In the next part, I'm going to extend the delayed producer concept further (the multi-value direct emission is what from() does) and make it capable of emitting not just one, but many elements on downstream request.
I like the state machine with CAS transitions example, makes reasoning about state in the face of concurrency a bit easier.
"If the transition succeeds, we can quit the loop and it is not the responsibility of request() to emit the value. ", should it be "If the transition succeeds, we can quit the loop and it is the responsibility of request() to emit the value. "?
The sentence is correct as is. If the CAS fails, the state has changed and it should try again. If the CAS succeeds, the execution falls through to the return statement.