2015. június 15., hétfő

The Reactive-Streams API (part 2)

Introduction

In this blog post, I'm going to take our SingleProducer and SingleDelayedProducer classes and convert them into a reactive-streams based Subscriptions.

At first, one might think the conversion is going to be troublesome, but luckily, if you can already thing in how you'd implement the request() method on a rx.Producer, you are 75% there. The final 25% comes from the idea how you'd move the rx.Subscriber.isUnsubscribed() logic into request() since the rs.Subscriber doesn't extend rx.Subscription (nor any other resource-management interface).

The SingleSubscription

Since the SingleSubscription itself isn't that complicated, I'm going to show it in one go:

import org.reactivestreams.*;

public final class SingleSubscription<T> 
extends AtomicBoolean implements Subscription {
    private static final long serialVersionUID = 1L;
    
    final T value;                                       // (1)
    final Subscriber<? super T> child;
    volatile boolean cancelled;                          // (2)
    
    public SingleSubscription(T value, 
            Subscriber<? super T> child) {               // (3)
        this.value = Objects.requireNonNull(value);
        this.child = Objects.requireNonNull(child);
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException(
                "n > 0 required");                       // (4)
        }
        if (compareAndSet(false, true)) {
            if (!cancelled) {                            // (5)
                child.onNext(value);
                if (!cancelled) {
                    child.onComplete();
                }
            }
        }
    }
    @Override
    public void cancel() {
        cancelled = true;                                // (6)
    }
}

That's it! Wait, that's it? Yes, it is no accident that I've been showing Producer implementations so far that can be transformed into a reactive-streams Subscription with relatively little effort. But still, here are the explanation for the major properties of this new implementation:

  1. We have an instance field for the constant value and the target Subscriber as before,
  2. however, since isUnsubscribed() is not part of the RS Subscriber and unsubscription comes in the form of a cancel() call, we need to store the cancelled state ourselves, in a volatile field. If you recall, I mentioned that you can't be sure by what and when request() (or in fact, cancel()) will be called, therefore, one needs to make sure things are thread-safe.
  3. Since RS doesn't like null values, we capture them early in the constructor.
  4. My "Let them throw!" philosophy dictates that non-positive requests are programming errors which should yield a nice IllegalArgumentException.
  5. Because there is no child.isUnsubscribed() method anymore, we check the volatile cancelled variable everywhere instead.
  6. Our idempotent cancel just sets the cancelled flag atomically.

The SingleDelayedSubscription

Given the simplicity of SingleSubscription, how hard could it be to convert SingleDelayedProducer?

public final class SingleDelayedSubscription<T> 
extends AtomicInteger implements Subscription {
    /** */
    private static final long serialVersionUID = -1L;
    
    T value;
    final Subscriber<? super T> child;
    
    static final int CANCELLED = -1;                           // (1)
    static final int NO_VALUE_NO_REQUEST = 0;
    static final int NO_VALUE_HAS_REQUEST = 1;
    static final int HAS_VALUE_NO_REQUEST = 2;
    static final int HAS_VALUE_HAS_REQUEST = 3;
    
    public SingleDelayedSubscription(Subscriber<? super T> child) {
        this.child = Objects.requireNonNull(child);
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required");
        }
        for (;;) {
            int s = get();
            if (s == NO_VALUE_HAS_REQUEST
                    || s == HAS_VALUE_HAS_REQUEST
                    || s == CANCELLED) {                       // (2)
                return;
            }
            if (s == NO_VALUE_NO_REQUEST) {
                if (!compareAndSet(s, NO_VALUE_HAS_REQUEST)) {
                    continue;
                }
            }
            if (s == HAS_VALUE_NO_REQUEST) {
                if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
                    T v = value;
                    value = null;
                    child.onNext(v);
                    if (get() != CANCELLED) {                  // (3)
                        child.onComplete();
                    }
                }
            }
            return;
        }
    }
    
    public void setValue(T value) {
       Objects.requireNonNull(value);
       for (;;) {
           int s = get();
           if (s == HAS_VALUE_NO_REQUEST
                   || s == HAS_VALUE_HAS_REQUEST
                   || s == CANCELLED) {                        // (4)
               return;
           } else
           if (s == NO_VALUE_NO_REQUEST) {
               this.value = value;
               if (!compareAndSet(s, HAS_VALUE_NO_REQUEST)) {
                   continue;
               }
           } else
           if (s == NO_VALUE_HAS_REQUEST) {
               if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
                   child.onNext(value);
                   if (get() != CANCELLED) {                   // (5)
                       child.onComplete();
                   }
               }
           }
           return;
       }
    }

    @Override
    public void cancel() {
        int state = get();
        if (state != CANCELLED) {                              // (6)
            state = getAndSet(CANCELLED);
            if (state != CANCELLED) {
                value = null;
            }
        }
    }
}

Looks quite similar to the original state-machine, but it has an additional CANCELLED state (1..6), which is atomically swapped in. We don't really need to check for this state before onNext() because the preceding compareAndSet() would fail anyway, but we can check it just before calling onComplete().

Why don't we use a volatile cancelled flag instead of this new state? You could naturally do that and the resulting Subscription would be equally correct. It is a matter of personal preference: you can add an extra instance field or extend the state machine to include a cancelled state. The primary reason here, mostly, is to show an example of this latter alternative.

The RangeSubscription

I'm not going to convert all previous Producers into Subscriptions here, but I'd like to show a second example for including a cancelled state in the state machine.

public final class RangeSubscription 
extends AtomicLong implements Subscription {
    /** */
    private static final long serialVersionUID = 1L;
    
    final Subscriber<? super Integer> child;
    int index;
    final int max;
    
    static final long CANCELLED = Long.MIN_VALUE;          // (1)
    
    public RangeSubscription(
            Subscriber<? super Integer> child, 
            int start, int count) {
        this.child = Objects.requireNonNull(child);
        this.index = start;
        this.max = start + count;
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException(
                "n > required");
        }
        long r;
        for (;;) {
            r = get();
            if (r == CANCELLED) {                          // (2)
                return;
            }
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            if (compareAndSet(r, u)) {
                break;
            }
        }
        if (r != 0L) {
            return;
        }
        for (;;) {
            r = get();
            if (r == CANCELLED) {                          // (3)
                 return;
            }
            int i = index;
            int m = max;
            long e = 0;
            while (r > 0L && i < m) {
                child.onNext(i);
                if (get() == CANCELLED) {                  // (4)
                    return;
                }
                i++;
                if (i == m) {
                    child.onComplete();
                    return;
                }
                r--;
                e++;
            }
            index = i;
            if (e != 0) {
                for (;;) {
                    r = get();
                    if (r == CANCELLED) {                  // (5)
                        return;
                    }
                    long u = r - e;
                    if (u < 0) {
                        throw new IllegalStateException(
                                "more produced than requested!");
                    }
                    if (compareAndSet(r, u)) {
                        break;
                    }
                }
            }
            if (r <= 0L) {
                break;
            }
        }
    }
    @Override
    public void cancel() {
        if (get() != CANCELLED) {                          // (6)
            getAndSet(CANCELLED);
        }
    }
}

For brevity, I've omitted the fast-path logic here. The rest is, again, similar to the original RangeProducer structure, but now that the cancellation state is merged into the requested accounting, we need to re-read the current requested amount and check for a CANCELLED value (1) almost everywhere (2..5). Note that the emission accounting can't be a simple getAndAdd() anymore, because even if CANCELLED would be -1, one could, in theory, emit Long.MAX_VALUE - 1 and wrap the counter, losing the cancelled state information. Again, using getAndSet() to swap in the terminal state atomically and in idempotent fashion (6).


Conclusion

In this part, I've shown two approaches to convert from a rx.Producer into an RS Subscription and keep the unsubscription behavior intact. Naturally, they involve tradeoffs: instance size if using a separate cancellation flag or algorithm complexity if cancellation is woven into the state machine.

In the next part, I'm going to show how one can deal with the loss of another rx.Subscriber functionality: the add(rx.Subscriber) option to associate resources with the downstream Subscriber.

15 megjegyzés:

  1. Should RangeSubscription.index be volatile?

    VálaszTörlés
    Válaszok
    1. Index is only accessed inside the emission loop, guarded on both sides with full-barrier atomic increments/decrements.

      Törlés
  2. Can you explain further how index is protected? It looks to me like index is susceptible to stale reads since it is non volatile. Which full-barrier atomic increments/decrements are you referring to?
    Thanks.

    VálaszTörlés
    Válaszok
    1. I was refering to the increment done by the compareAndSet() calls. Why do you think once between the the two compareAndSet, index would be stale? The Java Memory Model guarantees that index is properly read. In its terms, the change to the requested amount happens before the read to index and the write to index happens before the change to the requested amount.

      Törlés
  3. I think I am starting to get it. Are you saying that the AtomicLong.compareAndSet causes a full memory fence which guarantees that index is coherent?

    I have another question. You derive your RangeSubscription from AtomicLong rather than have an AtomicLong contained in RangeSubscription. Is it just a style preference to use inheritance rather than composition, or is there a performance / memory layout advantage that inspired your decision?

    Thanks

    VálaszTörlés
  4. It saves one allocation, combines the headers and gives access to intrinsified Unsafe calls. Lately, AtomicLongFieldUpdater is also relatively close to the Unsafe perf but we can't use them because there are problems with certain Android devices that mess up field names. By not composing but inlining, we can be a lot faster with a less memory.

    VálaszTörlés
  5. Awesome explanation, thanks.

    Back to the non-volatile index, does the AtomicLong.compareAndSet cause a full memory fence which guarantees that index is coherent?

    VálaszTörlés
  6. Thank you very much for your patience! I am learning.

    VálaszTörlés
  7. In the second for loop of `RangeSubscription.request()` implementation, there are two self increment of `i`, we only need the first one, right?

    VálaszTörlés
    Válaszok
    1. Right, fixed. The second one was unnecessary, the first is needed to detect end of sequence and issue an onComplete.

      Törlés
  8. One more concurrency question.

    Imagine if I subscribe to an observable and observeOn(io()). In the subscribe(onNext) Consumer, I interact with a mutable collection.
    I know that io() has multiple threads and different threads could be used on subsequent callbacks to onNext Consumer. Because of this, do I need to synchronize the collection to make it thread-safe (or use a concurrent-safe collection)? Or does rxjava take care of synchronization issues leaving me free to regard it as a single-threaded app?
    Thanks again
    Rodney

    VálaszTörlés
    Válaszok
    1. You get 1 thread from the io() for individual subscribe() and onNext is called in a sequential manner. If you don't use the collection elsewhere but in onNext, no synchronization is needed.

      Törlés
    2. I ran an experiment and confirmed experimentally that the subscribe() onNext is indeed always called from only one thread - always the same thread. Can you point me to the class in rxjava2 that enforces this requirement?
      Many thanks for your wonderful software and level of support!!

      Törlés
  9. Thank you so much for the fast reply. To clarify my understanding, it sounds like onNext Consumer.accept() will always be invoked by the exact same thread id from the io() pool?

    VálaszTörlés