Thursday, December 14, 2017

Java 9 Flow API: Multicasting via a Processor

Introduction


There are situations when the same elements of a source should be dispatched to multiple consumers. Certainly, if the source supports multiple subscribers and is deterministic (such as our previous async range), one can just instantiate the flow multiple times. However, if the source doesn't support multiple subscribers or each subscription ends up being unique and/or non-deterministic, that simple approach doesn't work anymore.

We need a way to have a single realization of the (upstream) source yet allow multiple consumers. Since we are dealing with Flow.Publishers that require backpressure management, such an intermediate solution has to coordinate requests from its Flow.Subscribers in addition to handling the dynamic subscription and unsubscription (cancellation) of said Flow.Subscribers while the flow is active. Enter MulticastProcessor.


Flow.Processor recap


What is a Processor? By definition, it is a combination of a Flow.Publisher and a Flow.Subscriber: it can act as a source that others subscribe to via its subscribe() method, and the processor itself can be passed to somebody else's Flow.Publisher.subscribe(). It has a mixed history, as the idea comes from the original Observer pattern (i.e., java.util.Observable) and from Rx.NET's Subject, which allows dispatching signals to multiple Observers in an imperative (and synchronous) fashion.

The Flow.Processor in Java 9 defines two type arguments, one for its input side (Flow.Subscriber) and one for its output side (Flow.Publisher). The idea behind it was that a Flow.Processor can act as a transformation step between an upstream and a downstream. However, such a transformation often mandates that the Flow.Processor implementation accept only a single Flow.Subscriber during its entire lifetime. Since the implementation has to follow the Reactive Streams specification nonetheless, this adds a lot of overhead to the flow. As demonstrated in previous posts, when a flow is realized, there is only one subscriber per stage in it, and thus the Flow.Processor overhead can be completely avoided.

Still, restricting the number of Flow.Subscribers has some use in operators such as groupBy() and window() where the inner Flow.Publishers are actually Flow.Processors that work in a unicast mode: holding onto elements until the consumer is ready to receive them while letting the main source progress with other groups/windows.

Of course, when the items in these inner Flow.Publishers are required by multiple processing stages, a separate multicasting stage, in the form of a Flow.Processor, can come in quite handy.
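
As a quick preview of where we are heading, here is a minimal usage sketch of the MulticastProcessor developed below (the SubmissionPublisher upstream and the console-printing subscriber are just illustrative choices of mine):


MulticastProcessor<Integer> mp = new MulticastProcessor<>(128);

// two independent consumers receive the same items in lockstep
mp.subscribe(consumer("A"));
mp.subscribe(consumer("B"));

// a single realization of the upstream, multicast to both consumers
SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
source.subscribe(mp);

for (int i = 1; i <= 5; i++) {
    source.submit(i);
}
source.close();


where consumer() creates a simple, unbounded-requesting Flow.Subscriber:


static Flow.Subscriber<Integer> consumer(String name) {
    return new Flow.Subscriber<Integer>() {
        @Override
        public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println(name + " -> " + item);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println(name + " -> done");
        }
    };
}
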


MulticastProcessor


The first step is to define the skeleton of the class.


public final class MulticastProcessor<T> implements Flow.Processor<T, T> {

    final int prefetch;

    final SpscArrayQueue<T> queue;    

    /* Some other fields, see below. */

    public MulticastProcessor(int prefetch) {
        this.prefetch = prefetch;
        this.queue = new SpscArrayQueue<>(prefetch);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // TODO implement
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable t) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }     
}


The class has to implement 5 public methods from the Flow.Processor interface. Since the intended usage is to have multiple Flow.Subscribers and be able to subscribe the MulticastProcessor itself to some other Flow.Publisher, both the upstream's Flow.Subscription and any Flow.Subscribers have to be tracked. The input and output types are the same as there won't be any item transformation happening in this particular Flow.Processor implementation.

We need a queue to store the upstream values because some of the downstream Flow.Subscribers may not be ready to receive items and we don't want any discontinuity in item delivery for them. A JCTools single-producer single-consumer queue suffices because the items coming from the upstream are guaranteed to arrive from a single thread at a time and the dispatch logic consuming the queue will be serialized by the MulticastProcessor as well.
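
The SpscArrayQueue used here comes from the JCTools library; if you are coding along, a Gradle dependency along these lines pulls it in (the version shown is merely the one current around the time of writing):


    compile "org.jctools:jctools-core:2.1.1"
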

Since the two sides can operate asynchronously to each other, we will need some atomics around the state of the MulticastProcessor:

    Flow.Subscription upstream;

    static final VarHandle UPSTREAM = 
        VH.find(MethodHandles.lookup(), MulticastProcessor.class, 
        "upstream", Flow.Subscription.class);

    MulticastSubscription<T>[] subscribers = EMPTY;
    static final VarHandle SUBSCRIBERS = 
        VH.find(MethodHandles.lookup(), MulticastProcessor.class, 
        "subscribers", MulticastSubscription[].class);

    int wip;
    static final VarHandle WIP = 
        VH.find(MethodHandles.lookup(), MulticastProcessor.class, 
        "wip", int.class);

    static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];

    static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0];

    volatile boolean done;
    Throwable error;

    int consumed;


The upstream stores the Flow.Subscription received from the source to be multicasted. Flow.Subscribers can come and go independently and as such, a typical copy-on-write array structure with terminal state can be employed. If the terminal state has been reached, any subsequent subscribe() attempt will either call onError() or onComplete() on the Flow.Subscriber, depending on how the MulticastProcessor itself has been terminated (i.e., the error field is null or not). The VH utility class has been introduced in a previous post.

One could ask, why do we need the wip field? The upstream will invoke onNext synchronously on the MulticastProcessor, so a simple loop over the current array of Flow.Subscribers should suffice, right? Not exactly. Because we want every Flow.Subscriber to receive the same items in the same order without skipping, they will move in a so-called lockstep mode. If a Flow.Subscriber is not ready to receive, the others won't receive data even if they could. Once this single Flow.Subscriber is ready to receive (it called request()), we have to try and drain the queue for the available items, which can happen on a different thread than the one the upstream is calling onNext from. Second, if a Flow.Subscriber decides to cancel and was preventing the others from making progress by not requesting, its departure may enable the others to receive the available items right away.

Due to the prefetching nature of MulticastProcessor, we won't be able to use the TERMINATED array as the completion indicator because the queue may hold non-dispatched elements when an onComplete() or onError() arrives. Therefore, a separate done flag is employed, which tells the drain logic later on that no further items will arrive from the upstream.

Each individual Flow.Subscriber needs its own state so that its requested amounts can be tracked; for this, the MulticastSubscription class is employed:


static final class MulticastSubscription<T> 
extends AtomicLong
implements Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    final MulticastProcessor<T> parent;

    long emitted;
    
    MulticastSubscription(Flow.Subscriber<? super T> downstream, 
            MulticastProcessor<T> parent) {
        this.downstream = downstream;
        this.parent = parent;
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            cancel();
            // for brevity, serialization is omitted here, see HalfSerializer.
            downstream.onError(new IllegalArgumentException());
        } else {
            for (;;) {
                long current = (long)getAcquire();
                if (current == Long.MIN_VALUE || current == Long.MAX_VALUE) {
                    break;
                }
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (compareAndSet(current, next)) {
                    parent.drain();
                    break;
                }
            }
        }
    }

    @Override
    public void cancel() {
        if ((long)getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
            parent.remove(this);
            parent.drain();
        }
    }
}


The MulticastSubscription takes a reference to the actual downstream Flow.Subscriber and the parent MulticastProcessor to trigger removal and item draining. The emitted field counts how many times downstream.onNext() was invoked, in order to know how many more items this particular subscription can take. The AtomicLong itself tracks the total downstream requested amount, so the remaining amount can be calculated as getAcquire() - emitted (this calculation should happen on a single thread). For example, if the downstream requested 3 and later 2 more while 4 items have been emitted to it so far, it can still take 5 - 4 = 1 item before having to wait for further requests. A value of Long.MIN_VALUE indicates the downstream has cancelled the subscription.

The parent MulticastProcessor requires a few services to handle the various MulticastSubscription instances: drain() to start emitting items if possible, remove() to let a particular Flow.Subscriber go. We'll need its pair, add(), to register a new Flow.Subscriber to work with. First, let's see add() and remove():


    boolean add(MulticastSubscription<T> ms) {
        for (;;) {
            MulticastSubscription<T>[] current = 
                (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
            if (current == TERMINATED) {
                return false;
            }
            int n = current.length;
            MulticastSubscription<T>[] next = new MulticastSubscription[n + 1];
            System.arraycopy(current, 0, next, 0, n);
            next[n] = ms;
            if (SUBSCRIBERS.compareAndSet(this, current, next)) {
                return true;
            }
        }
    }


The add() method should look familiar. We read the current array of MulticastSubscriptions and see if it is the terminal indicator (in which case we return false, indicating the MulticastProcessor is in a terminal state and no new Flow.Subscribers are accepted); if not, we copy and extend the array to incorporate the incoming new MulticastSubscription. The compareAndSet then ensures the changes are applied atomically and, in case of a concurrent change, the process is retried.

    void remove(MulticastSubscription<T> ms) {
        for (;;) {
            MulticastSubscription<T>[] current = 
                (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
            int n = current.length;
            if (n == 0) {
                break;
            }

            int j = -1;
            for (int i = 0; i < n; i++) {
                 if (ms == current[i]) {
                     j = i;
                     break;
                 }
            }

            if (j < 0) {
                 break;
            }

            MulticastSubscription<T>[] next;
            if (n == 1) {
                next = EMPTY;
            } else {
                next = new MulticastSubscription[n - 1];
                System.arraycopy(current, 0, next, 0, j);
                System.arraycopy(current, j + 1, next, j, n - j - 1);
            }

            if (SUBSCRIBERS.compareAndSet(this, current, next)) {
                break;
            }
        }
    }


The remove() method is fairly typical again. We read the current array and if it is empty (due to having no subscribers or being terminated) we quit. Otherwise we search for the index of the MulticastSubscription and if found, a new array is created and items copied around the index into it. The compareAndSet again ensures proper visibility and retry in case of concurrent state changes. If the current array happens to be one element long, we avoid creating a new empty array and simply reuse the shared static EMPTY instance.

We can now implement the subscribe() method:


    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber == null");

        MulticastSubscription<T> ms = new MulticastSubscription<T>(subscriber, this);
        subscriber.onSubscribe(ms);

        if (add(ms)) {
            if ((long)ms.getAcquire() == Long.MIN_VALUE) {
                remove(ms);
            }
            drain();
        } else {
            if ((long)ms.getAcquire() != Long.MIN_VALUE) {
               Throwable ex = error;
               if (ex == null) {
                   ms.downstream.onComplete();
               } else {
                   ms.downstream.onError(ex);
               }
            }
        }
    }


First, we verify that subscriber is not null. Then, a unique instance of MulticastSubscription is associated with the Flow.Subscriber. We try to register this with the MulticastProcessor via add(). This may fail in case the MulticastProcessor has reached its terminal state, in which case we emit the terminal event, unless the Flow.Subscriber has cancelled in the meantime (the request accounting shows Long.MIN_VALUE). Cancellation can happen concurrently with the add() call (remember, we handed out the Flow.Subscription via onSubscribe() beforehand), so the remove() triggered by MulticastSubscription.cancel() may not have found the instance in the array yet: calling remove() again ensures the reference is not retained in this corner case. In either case, the additional call to drain() will ensure items are dispatched as soon as possible to the (remaining) parties.

One could ask, why not call add() first and then call onSubscribe(), since as long as there is no request() from the downstream, the visibility of the MulticastSubscription shouldn't be a problem, right? Not exactly. The Reactive Streams specification allows emitting onError or onComplete without a prior request; in this case, since the reference to the MulticastSubscription would be visible sooner, the onError/onComplete could be invoked before onSubscribe, which is a violation of the specification (and would likely lead to a NullPointerException down the line). Reversing the call order avoids this violation but requires the aforementioned add(), check-if-cancelled, remove() logic to avoid leaking cancelled subscriptions.

Let's see the implementation of the remaining 4 public methods:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "s == null");

        if (UPSTREAM.compareAndSet(this, null, s)) {
            s.request(prefetch);
        } else {
            s.cancel();
        }
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item, "item == null");

        queue.offer(item);
        drain();
    }

    @Override
    public void onError(Throwable t) {
        Objects.requireNonNull(t, "t == null");

        error = t;
        done = true;
        drain();
    }

    @Override
    public void onComplete() {
        done = true;
        drain();
    }


The onSubscribe atomically stores the Flow.Subscription if none is set yet and cancels the incoming one otherwise. The other onXXX methods enqueue the item or set the error/done indicator, then call drain() to dispatch events if possible. The Reactive Streams specification requires throwing a NullPointerException if the input parameters are null.

Lastly, we can now implement the drain() method, piece by piece for better clarity. It is implemented as our typical queue-drain loop:

    void drain() {
        if ((int)WIP.getAndAdd(this, 1) != 0) {
            return;
        }

        int missed = 1;

        for (;;) {
            
            // TODO implement

            missed = (int)WIP.getAndAdd(this, -missed) - missed;
            if (missed == 0) {
                break;
            }
        }
    }


Usually, the inner loop starts by reading the requested amount; however, we don't have a single requested amount in this case: we have to find the smallest number of items that can be emitted to all current Flow.Subscribers:

    // for (;;) {

    MulticastSubscription<T>[] current = 
        (MulticastSubscription<T>[])SUBSCRIBERS.getAcquire(this);
    int n = current.length;

    if (n != 0) {
        long requested = Long.MAX_VALUE;
        for (MulticastSubscription<T> ms : current) {
            long r = ms.getAcquire();
            if (r != Long.MIN_VALUE) {
                requested = Math.min(requested, r - ms.emitted);
            }
        }

        // TODO try emitting, see below
    }

    // }


Remember, the amount of items a particular MulticastSubscription can receive is its total requested amount minus the emitted items to it so far.

Next, we try to emit any available item from the queue:

    // if (n != 0) { ...
 
    while (requested != 0) {

        boolean d = done;
        T v = queue.poll();
        boolean empty = v == null;

        if (d && empty) {
            // TODO terminal case
            return;
        }

        if (empty) {
            break;
        }

        for (MulticastSubscription<T> ms : current) {
            if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onNext(v);
                ms.emitted++;
            }
        }

        requested--;

        if (++consumed == (prefetch >> 1)) {
            upstream.request(consumed);
            consumed = 0;
        }
    }

    // TODO no request - no more items case

    // }


In this inner loop, which we quit once requested reaches zero, we try to get the next available item from the queue and, if found, emit it to the non-cancelled Flow.Subscribers inside the MulticastSubscriptions. Since the MulticastProcessor works in a stable-prefetch mode, we have to track the consumption via consumed and request more once a certain amount of items has been taken. Here, we request half of the prefetch amount whenever that many items have been consumed (for example, with prefetch = 128, every 64 consumed items trigger upstream.request(64)).

One case to be implemented is when the upstream has terminated and we emitted all queued items:

    // if (d && empty) {
    Throwable ex = error;
    current = (MulticastSubscription<T>[])SUBSCRIBERS.getAndSet(this, TERMINATED);
    if (ex == null) {
        for (MulticastSubscription<T> ms : current) {
             if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onComplete();
             }
        }
    } else {
        for (MulticastSubscription<T> ms : current) {
             if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                ms.downstream.onError(ex);
             }
        }
    }
    return;
    // }


We atomically swap in the terminal array which gives us the last array of MulticastSubscriptions. We have to do this because more subscribers could have arrived since the previous SUBSCRIBERS.getAcquire() and would be left non-terminated and hanging forever. The non-cancelled Flow.Subscribers are terminated with onError or onComplete based on the error field's contents.

Lastly, we have to deal with the case when none of the Flow.Subscribers have requested yet (or they have run out of requests) and the upstream has terminated without any further values stored in the queue:

    // while (requested != 0) {
    // ...
    // }

    if (requested == 0) {
        boolean d = done;
        boolean empty = queue.isEmpty();

        if (d && empty) {
            Throwable ex = error;
            current = (MulticastSubscription<T>[])SUBSCRIBERS.getAndSet(this, TERMINATED);
            if (ex == null) {
                for (MulticastSubscription<T> ms : current) {
                     if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                        ms.downstream.onComplete();
                     }
                }
            } else {
                for (MulticastSubscription<T> ms : current) {
                     if ((long)ms.getAcquire() != Long.MIN_VALUE) {
                        ms.downstream.onError(ex);
                     }
                }
            }
            return;
        }
    }


Unsurprisingly, this looks almost like the first part of the while (requested != 0) loop, except that we don't poll the item if there is one, since we couldn't put it back and would lose it.


Testing with the Reactive Streams TCK


Now that the code is complete, we want to make sure the implementation properly follows the Reactive Streams specification. We can do this via its Technology Compatibility Kit (TCK); however, since Reactive Streams is a separate library predating the Java 9 Flow API, we need some adapters to utilize the TCK. Fortunately, the upcoming 1.0.2 release features both such adapters and a Java 9 Flow-based port of the TCK itself. For that, one only needs to import the tck-flow artifact (and its dependencies if doing it manually!):


    compile "org.reactivestreams:reactive-streams-tck-flow:1.0.2-RC2"


At the time of writing this blog post, only the release candidate version is available, hence the -RC2 suffix.

The TCK contains the IdentityFlowProcessorVerification we must implement in our tests. Note that the TCK uses TestNG instead of JUnit.


@Test
public class MulticastProcessorTest extends IdentityFlowProcessorVerification<Integer> {

    public MulticastProcessorTest() {
        super(new TestEnvironment(50));
    }

    @Override
    public ExecutorService publisherExecutorService() {
        return Executors.newCachedThreadPool();
    }

    @Override
    public Integer createElement(int element) {
        return element;
    }

    @Override
    protected Flow.Publisher<Integer> createFailedFlowPublisher() {
        MulticastProcessor<Integer> mp = new MulticastProcessor<>(128);
        mp.onError(new IOException());
        return mp;
    }

    @Override
    protected Flow.Processor<Integer, Integer> createIdentityFlowProcessor(int bufferSize) {
        return new MulticastProcessor<>(bufferSize);
    }
}


At first glance, there is no complication in writing the test. We have to specify a TestEnvironment with a default timeout (50 milliseconds should be enough), and we have to provide a helper ExecutorService from which test items will be emitted. Those test items will be created via createElement so the target type of the Flow.Processor to be tested can be matched (a plain Integer in this case). The createFailedFlowPublisher verifies an immediately failing Flow.Publisher (remember, a Flow.Processor is one) and the createIdentityFlowProcessor should return a fresh and empty processor instance.

If we run the test, 3 of the 68 tests will fail (plus 24 get ignored as they don't apply or can't be properly verified as of now):


  1. required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
  2. required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo
  3. required_exerciseWhiteboxHappyPath


From these, failures 1) and 2) are due to how the TCK expects a Flow.Processor to behave by default: independent requests from independent Flow.Subscribers must reach the upstream and produce elements independently. Unfortunately, our MulticastProcessor uses prefetch and lockstep mechanisms, thus if only one Flow.Subscriber requested and the others didn't, nobody gets items. Fortunately, 1.0.2 has been enhanced in this regard and one only has to override a method in the IdentityFlowProcessorVerification class:


    @Override
    public boolean doesCoordinatedEmission() {
        return true;
    }


This will instruct tests 1) and 2) to try a different request-emission pattern and they should now pass.

Now let's look at 3), which fails with a different exception: "Did not receive expected cancelling of upstream subscription within 50 ms". Indeed, looking at the code, the upstream's Flow.Subscription.cancel() is never called on a normal path (only when onSubscribe is called twice).

What's happening? This is another default expectation of the TCK: when all Flow.Subscribers cancel, the Flow.Processor implementation is expected to cancel its upstream promptly. This may not be the desired business behavior, as one may want to keep dispatching even if Flow.Subscribers come and go and, temporarily, there is no downstream to send events to. Unfortunately, there is no switch for this in the TCK, so we have to modify the MulticastProcessor to accommodate this expectation. This is similar to the concept and behavior of refCount() in RxJava.

Luckily, it requires only a small set of changes. We can detect when all the Flow.Subscribers have cancelled conveniently in the remove() method and instead of setting the subscribers array to EMPTY, we set it to TERMINATED and cancel() the upstream instance:

    void remove(MulticastSubscription<T> ms) {
        for (;;) {

            // the find index is omitted for brevity

            MulticastSubscription<T>[] next;
            if (n == 1) {
                if (UPSTREAM.getAcquire(this) == null) {
                    next = EMPTY;
                } else {
                    next = TERMINATED;
                }
            } else {
                next = new MulticastSubscription[n - 1];
                System.arraycopy(current, 0, next, 0, j);
                System.arraycopy(current, j + 1, next, j, n - j - 1);
            }

            if (SUBSCRIBERS.compareAndSet(this, current, next)) {
                if (next == TERMINATED) {
                    ((Flow.Subscription)UPSTREAM.getAcquire(this)).cancel();
                }
                break;
            }
        }
    }


This change allows Flow.Subscribers to come and go, without terminating the MulticastProcessor itself, as long as it hasn't been subscribed to an upstream Flow.Publisher yet; once a live upstream is present, the departure of the last Flow.Subscriber terminates the processor and cancels that upstream.


Conclusion


In this post, I showed how a multicasting Flow.Processor can be implemented and then tested with the Reactive Streams TCK. In fact, one reason I was holding off demonstrating this MulticastProcessor was the limits of the TCK itself: prior to 1.0.2-RC2, now available from Maven, there was no standard adapter and no support for a lockstepping Flow.Publisher implementation. Now that I have contributed both and the maintainers have released them, we can use the TCK to experiment with various Java 9-based Flow.Processor and Flow.Publisher implementations.

We have applied the usual patterns: copy-on-write with terminal state, queue-drain and request accounting, which should be quite familiar to veteran readers of this blog. Once the tests indicated issues, we could simply adjust the implementation to match their expectations by tweaking its thread-safe, lock-free logic.

Wednesday, November 29, 2017

When multiple subscribeOn()s do have effect

Introduction


In many tutorials and explanations, it has been said that having multiple subscribeOn()s has no effect and only the one closest to the source wins. I often phrase this as "no practical effect". However, it is possible to demonstrate cases where multiple subscribeOn()s do have an actual effect.

What is subscribeOn again?


The most precise definition of this operator I can formulate is as follows:


  • subscribeOn changes where (on what thread) the (side) effects of calling subscribe() on the parent/upstream Observable (Flowable, Single, etc.) happen.


So what do these subscription (side) effects look like in code?


Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.blockingSubscribe(System.out::println);

// Prints:
// -------
// 0: RxCachedThreadScheduler-1
// 1: RxCachedThreadScheduler-1
// 2: RxCachedThreadScheduler-1
// 3: RxCachedThreadScheduler-1
// 4: RxCachedThreadScheduler-1
// 5: RxCachedThreadScheduler-1
// 6: RxCachedThreadScheduler-1
// 7: RxCachedThreadScheduler-1
// 8: RxCachedThreadScheduler-1
// 9: RxCachedThreadScheduler-1


In this example, the effect of subscribing is that the body of the ObservableOnSubscribe starts running on the thread provided via the io() Scheduler.

Applying yet another subscribeOn after the first one won't change what is printed to the console.

Most source-like operators, such as create(), fromCallable(), fromIterable(), do have subscription side-effects as they often start emitting event(s) immediately.
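
For instance, fromCallable()'s subscription side-effect is the invocation of the Callable itself, so the subscribeOn() closest to it determines the thread that code runs on (a small illustration of my own, in the same spirit as the example above):


Observable.fromCallable(() -> "computed on " + Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.blockingSubscribe(System.out::println);

// Prints something like:
// -------
// computed on RxCachedThreadScheduler-1
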

Most instance operators, such as map(), filter(), take(), don't have subscription side-effects on their own and just subscribe() to their upstream.

Instance operators with subscription side-effects


However, there are a couple of instance operators that do have subscription side-effects. Specifically, any operator that offers a way to specify a per subscriber initial state via some callback.


Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.collect(() -> {
       List<String> list = new ArrayList<>();
       list.add(Thread.currentThread().getName());
       return list;
    }, 
    (a, b) -> a.add(b)
)
.blockingSubscribe(list -> list.forEach(System.out::println));


This will print the following:


main
0: RxCachedThreadScheduler-1
1: RxCachedThreadScheduler-1
2: RxCachedThreadScheduler-1
3: RxCachedThreadScheduler-1
4: RxCachedThreadScheduler-1
5: RxCachedThreadScheduler-1
6: RxCachedThreadScheduler-1
7: RxCachedThreadScheduler-1
8: RxCachedThreadScheduler-1
9: RxCachedThreadScheduler-1


The first printout, main, is due to the fact that the blockingSubscribe() is subscribing on the main thread and the collect() operator performs its initialization side-effect on the main thread as well.

Now let's see what happens if we add subscribeOn() after collect:


Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
        emitter.onNext(i + ": " + Thread.currentThread().getName());
    }
    emitter.onComplete();
})
.subscribeOn(Schedulers.io())
.collect(() -> {
       List<String> list = new ArrayList<>();
       list.add(Thread.currentThread().getName());
       return list;
    }, 
    (a, b) -> a.add(b)
)
.subscribeOn(Schedulers.computation())  // <---------------------------------------
.blockingSubscribe(list -> list.forEach(System.out::println));

will print the following:

RxComputationThreadPool-1
0: RxCachedThreadScheduler-1
1: RxCachedThreadScheduler-1
2: RxCachedThreadScheduler-1
3: RxCachedThreadScheduler-1
4: RxCachedThreadScheduler-1
5: RxCachedThreadScheduler-1
6: RxCachedThreadScheduler-1
7: RxCachedThreadScheduler-1
8: RxCachedThreadScheduler-1
9: RxCachedThreadScheduler-1


Because subscribeOn(Schedulers.computation()) performs its subscribe() call on one of the computation threads, the subscription side-effect of creating the list (for collecting Strings from the upstream) happens on that thread. Then, subscribeOn(Schedulers.io()) switches to an io thread where the subscription side-effect of emitting the Strings begins.


Conclusion


Having multiple subscribeOn()s in the same chain often has no practical effect because each one just changes the thread on which its upstream gets subscribed, and most instance operators don't perform any side-effects in their subscribeActual() implementation. Source operators often do have side-effects, but since they reside at the top of a chain and subscribe() travels upstream, only the subscribeOn() closest to the source determines where the code in the source's subscribeActual() gets executed.

However, when an intermediate operator has subscription side-effects, such as executing a user-provided callback to perform some per-subscriber initialization, the thread switching of a downstream subscribeOn() will have an effect on this initialization. When is this property relevant? For example, when establishing that initial state is computation-heavy or involves blocking calls.

This may seem like an odd property, but it can be derived from first principles such as how, where and when control- and item-flow happen in the various operators involved.

Thursday, October 19, 2017

Android LiveData API: a quick look

Introduction


Threading and lifecycle are among the top concerns when developing applications for the Android platform. The UI has to be interacted with on a dedicated thread (the main thread), but in order to keep the UI responsive to user input and rendering, blocking or CPU-intensive calculations should be kept off it.

In addition, views can get destroyed and recreated in ways that are outside of a given application's control, unlike in a desktop Swing application. This means background tasks must be stopped and listeners removed to prevent leaking references to now logically dead objects.

RxJava and RxAndroid can help with threading concerns and there are other libraries that tap into the various lifecycle events; in general, this means someone will call dispose() on a particular flow or clear() on a CompositeDisposable to mass-cancel multiples of them.

Having a rich set of transformative and coordinating operators along with support for normal values, errors and finite sequences may be overwhelming compared to a classical Listener-based API. Google's LiveData is one such classical Listener-style API, but unlike Swing's ActionListener for example, there are explicit requirements that interaction with the LiveData object itself happens on the main thread and that signals are dispatched to its Observers on the main thread.

LiveData API


Unfortunately, I wasn't able to locate a public repository for the LiveData sources and had to rely on the sources downloaded from Google's Maven repository:

compile "android.arch.lifecycle:reactivestreams:1+"

There is an interoperation library associated with LiveData that allows presenting and consuming events from any Reactive-Streams Publisher. This will transitively import the actual LiveData library. Note that LiveData is currently considered beta and may change arbitrarily before release. That said, I don't think the core structure and premise will actually change.

The main consumer type is the android.arch.lifecycle.Observer with its single onChanged(T) method. Given an arbitrary LiveData instance, one can use the observe(LifecycleOwner, Observer) or observeForever(Observer) methods to attach the listener to a source. The onChanged(T) will be called on the main thread. Cancelling an observation requires a call to removeObserver(), for which one has to remember both the original LiveData object and the Observer instance itself. All these methods return void. The LifecycleOwner is watched for destroy events, at which point the Observers registered through it are removed from the LiveData instance. This is similar to RxBinding's bindToLifecycle() approach composed onto a flow.

The LiveData abstract class doesn't feature any transformative or coordinating operators on itself, but the library features a utility class, Transformations, currently with map and switchMap operations. Both will execute the transformations and emissions on the main thread, but otherwise they match the behavior of the same RxJava operators.
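
For example, a mapped LiveData could be set up along these lines (a hypothetical sketch of mine; both the transformation and the delivery happen on the main thread):


MutableLiveData<Integer> celsius = new MutableLiveData<>();

LiveData<String> display = Transformations.map(celsius, c -> "Temperature: " + c);

// attach a simple observer; must happen on the main thread
display.observeForever(System.out::println);

celsius.setValue(21);   // the observer prints "Temperature: 21"
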

In theory, one could interpret LiveData as a cold source, but its usage patterns point to a hot source, just like a BehaviorSubject in RxJava; LiveData remembers the last value and emits it first to new consumers. To signal to consumers, LiveData has protected methods setValue and postValue. setValue requires the value to be set on the main thread and postValue will schedule a task to the main thread to call setValue there. To manually invoke these methods, a MutableLiveData object is available where the two methods have been made public.
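
To make the shape of this API concrete, here is a small, hypothetical sketch of attaching and detaching an Observer on a MutableLiveData (the observe/observeForever, setValue and removeObserver calls are expected to happen on the main thread):


MutableLiveData<String> liveData = new MutableLiveData<>();

Observer<String> observer = value -> System.out.println("Received: " + value);

liveData.observeForever(observer);

liveData.setValue("hello");    // dispatched to the observer on the main thread
liveData.postValue("world");   // allowed from any thread; setValue runs later on the main thread

// detaching requires both the LiveData and the Observer references
liveData.removeObserver(observer);
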


Possible shortcomings


The main shortcoming I see with LiveData is the ubiquitous "return to the main thread" mentality. For example, querying data from a LiveData-enabled datasource and preprocessing it should happen on a background thread as well before the aggregated data is sent back to the main thread for display. Unfortunately, the datasource will notify the preprocessing Observer on the main thread for each resulting item, and that Observer then has to schedule tasks on a background thread for all those items. The main thread is involved too early in such flows, which wastes its time.

A second shortcoming is the lack of terminal event notifications in the Observer API. Of course, one can define a union class with value, error and complete subtypes, just like RxJava's Notification object, to work around this problem (see the sketch below). Needless to say, such a wrapper costs allocation and indirection.
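
Such a wrapper could look like the following hypothetical Signal class (my own sketch, similar in spirit to RxJava's Notification); a LiveData<Signal<T>> could then carry items, an error or the completion indicator through the single onChanged() callback:


final class Signal<T> {
    final T value;          // non-null for an item
    final Throwable error;  // non-null for an error
    final boolean complete; // true for completion

    private Signal(T value, Throwable error, boolean complete) {
        this.value = value;
        this.error = error;
        this.complete = complete;
    }

    static <T> Signal<T> next(T value) {
        return new Signal<>(value, null, false);
    }

    static <T> Signal<T> error(Throwable error) {
        return new Signal<>(null, error, false);
    }

    static <T> Signal<T> complete() {
        return new Signal<>(null, null, true);
    }
}
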

Lastly, addListener/removeListener pairs compose relatively poorly and require the operator to remember both the original LiveData and the Observer that was used when the link was established. The onSubscribe(Disposable)-style cancellation support worked out much nicer in RxJava 2. Of course, having a similar method in Observer would break its lambda-friendly Single Abstract Method structure.


Reactive-Streams interoperation


Consuming a LiveData as Publisher


RxJava has infrastructure support to work with such classical Listener-style sources. Specifically, the create() operators on Observable and Flowable show such an example usage in their JavaDoc. Consequently, talking to LiveData is a straightforward routine:


LiveData<String> source = new MutableLiveData<String>();
Flowable<String> f = Flowable.<String>create(emitter -> {
   Observer<String> o = v -> {
       emitter.onNext(v);
   };
   
   source.observeForever(o);
   emitter.setCancellable(() -> 
       AndroidSchedulers.mainThread()
       .scheduleDirect(() -> 
            source.removeObserver(o)
       )
   );
}, BackpressureStrategy.LATEST);


The Reactive-Streams bridge in LiveDataReactiveStreams, of course, has no access to Flowable and has to work out the Subscription management manually. Unfortunately, the toPublisher() implementation is wrong. If there were a public repository, I could contribute a correct one (without using RxJava, of course).

Let's see the implementation piece by piece to demonstrate how not to write Publishers. (The file is marked as Apache 2.0 open source, should be fine to quote it here, right?)


    public static <T> Publisher<T> toPublisher(
            final LifecycleOwner lifecycle, final LiveData<T> liveData) {

        return new Publisher<T>() {
            boolean mObserving;
            boolean mCanceled;
            long mRequested;
            @Nullable
            T mLatest;

            @Override
            public void subscribe(final Subscriber<T> subscriber) {
                // continued below
            }
        };
    }


The first red flag comes from the mutable, non-thread-safe state of the anonymous Publisher. Hot sources may have mutable fields, but those are protected with proper synchronization primitives (see PublishProcessor for example). However, these fields should be part of the state of the Subscription that is sent to a Subscriber, since Publishers are expected to be consumed by any number of Subscribers and interaction with one Subscriber should not affect the interactions with the others.

Of course, a Publisher can choose to service only a single Subscriber (as with UnicastProcessor), but even for that, there is no field to let subscribe() know there is already someone consuming the Publisher.

Next, the Observer instance is defined with a backpressure strategy of keeping the latest if the downstream is not ready.


    final Observer observer = new Observer() {
        @Override
        public void onChanged(@Nullable T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested > 0) {
                mLatest = null;
                subscriber.onNext(t);
                if (mRequested != Long.MAX_VALUE) {
                    mRequested--;
                }
            } else {
                mLatest = t;
            }
        }
    };


Apart from the wrongly shared state of mCanceled, mRequested and mLatest, the code relies on the fact that both the onChanged invocation and the request() call happen on the main thread (see below). Having a per-Subscriber atomic mRequested instead would save a trip to the main thread for an increment.

The next segment deals with the request() call from the downstream.


    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(final long n) {
            if (n < 0 || mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    // Prevent overflowage.
                    mRequested = mRequested + n >= mRequested
                               ? mRequested + n : Long.MAX_VALUE;
                    if (!mObserving) {
                         mObserving = true;
                         liveData.observe(lifecycle, observer);
                    } else if (mLatest != null) {
                         observer.onChanged(mLatest);
                         mLatest = null;
                    }
                }
            });
        }



The first (smaller) problem is that non-positive requests should be rewarded with an onError(IllegalArgumentException) properly serialized with any onNext calls. Here, it is ignored. In addition, request() may be called from any thread, and such a call may not see mCanceled properly. The mObserving flag adds an odd delay before observing the actual LiveData object starts. (Since most sequences will issue a request() almost immediately, I'm not sure what benefit this can give; however, there were voices pretty keen on such behavior recently who didn't realize that to control the first invocation of request(), one would have to consume a Publisher directly with a Subscriber and not have any intermediate operators between the two.)

Lastly, let's see cancel():

    @Override
    public void cancel() {
        if (mCanceled) {
            return;
        }
        ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
            @Override
            public void run() {
                if (mCanceled) {
                    return;
                }
                if (mObserving) {
                    liveData.removeObserver(observer);
                    mObserving = false;
                }
                mLatest = null;
                mCanceled = true;
            }
       });
   }


Since removeObserver has to be called from the main thread (which by itself can be inconvenient), a task is scheduled which will also update mCanceled to true. Since cancel() can be invoked from any thread, a non-volatile mCanceled may not be visible. In addition, since mCanceled is only set to true on the main thread, there can be a significant delay between a cancel() and its effects, letting onChanged() calls still slip through. Setting a volatile mCanceled to true right in the cancel() method would have the stopping effect much earlier.

(In fact, this was a problem with our early unsubscribeOn() where the Reactive-Streams TCK would complain about still receiving events after it had cancelled the flow. The solution was to set a volatile cancelled flag that would be read by onNext and drop events until the upstream.cancel() got executed asynchronously.)

It is relatively simple to fix the mistakes here: combine Observer and Subscription into one class and move the mXXX fields into this class.
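
For illustration, such a combined class could be shaped along the following lines (my own sketch, not the library's actual code): each incoming Subscriber gets its own instance acting as both the lifecycle Observer and the Reactive-Streams Subscription, so the formerly shared mXXX fields become per-Subscriber state.


    static final class LiveDataSubscription<T> implements Observer<T>, Subscription {

        final Subscriber<? super T> downstream;
        final LifecycleOwner lifecycle;
        final LiveData<T> liveData;

        final AtomicLong requested = new AtomicLong();
        volatile boolean canceled;
        boolean observing;   // accessed on the main thread only
        T latest;            // accessed on the main thread only

        LiveDataSubscription(Subscriber<? super T> downstream,
                LifecycleOwner lifecycle, LiveData<T> liveData) {
            this.downstream = downstream;
            this.lifecycle = lifecycle;
            this.liveData = liveData;
        }

        @Override
        public void onChanged(T t) {
            // invoked by LiveData on the main thread
            if (canceled) {
                return;
            }
            if (requested.get() != 0L) {
                latest = null;
                downstream.onNext(t);
                if (requested.get() != Long.MAX_VALUE) {
                    requested.decrementAndGet();
                }
            } else {
                latest = t;
            }
        }

        @Override
        public void request(long n) {
            if (n <= 0L || canceled) {
                return; // signaling IllegalArgumentException is omitted in this sketch
            }
            // make the new amount visible without waiting for the main thread
            for (;;) {
                long current = requested.get();
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (requested.compareAndSet(current, next)) {
                    break;
                }
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(() -> {
                if (canceled) {
                    return;
                }
                if (!observing) {
                    observing = true;
                    liveData.observe(lifecycle, this);
                } else if (latest != null) {
                    T v = latest;
                    latest = null;
                    onChanged(v);
                }
            });
        }

        @Override
        public void cancel() {
            if (!canceled) {
                canceled = true;   // visible right away, stops onChanged from emitting
                ArchTaskExecutor.getInstance().executeOnMainThread(() ->
                        liveData.removeObserver(this));
            }
        }
    }


The enclosing toPublisher() would then only have to create a fresh LiveDataSubscription for each incoming Subscriber and hand it to that Subscriber's onSubscribe().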

Exposing a Publisher as LiveData


The unfortunate lack of terminal events in LiveData forces us to deal with only onNext calls. There is no way to tell an Observer that no more events will come (unless agreeing upon an event container type) and then get rid of them en-masse (there is no clearObservers()).

With RxJava, this could be done relatively easily:


    MutableLiveData<String> mld2 = new MutableLiveData<>();
    Disposable d = Flowable.range(1, 5).map(Object::toString)
            .subscribe(mld2::postValue);


Of course, one has to manage the Disposable as usual and possibly decide when to subscribe() to a cold source with respect to the available Observers of the (Mutable)LiveData. A refCount-like behavior is possible, but for that, one has to implement a custom LiveData object.

Apparently, there are callbacks on the LiveData class, onActive and onInactive, that are called when the first Observer arrives and the last Observer leaves, respectively. The LiveDataReactiveStreams.fromPublisher() uses them in its implementation:

    private static class PublisherLiveData<T> extends LiveData<T> {
        private WeakReference<Subscription> mSubscriptionRef;
        private final Publisher mPublisher;
        private final Object mLock = new Object();

        PublisherLiveData(@NonNull final Publisher publisher) {
            mPublisher = publisher;
        }

        // the rest is below
}


The mSubscriptionRef holds the current active connection's Subscription and the mLock protects it so that a concurrent call to onSubscribe may not clash with the deactivation of the LiveData object. As we'll see shortly, this is not enough, because a race can leave the upstream's Subscription from an old activation uncancelled and thus still posting to the LiveData object.

The onActive callback should subscribe to the source and relay its onNext events:

    @Override
    protected void onActive() {
        super.onActive();

        mPublisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Don't worry about backpressure. If the stream is too noisy then
                // backpressure can be handled upstream.
                synchronized (mLock) {
                    s.request(Long.MAX_VALUE);
                    mSubscriptionRef = new WeakReference<>(s);
                }
            }

            @Override
            public void onNext(final T t) {
                postValue(t);
            }

            @Override
            public void onError(Throwable t) {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
                // Errors should be handled upstream, so propagate as a crash.
                throw new RuntimeException(t);
            }

            @Override
            public void onComplete() {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
            }
        });
    }


Here, apart from the mSubscriptionRef management, the only small mistake is the call to request() from within the lock itself. A bigger mistake is that onError should not throw.

The onInactive should cancel any existing Subscription if the Subscription representing that connection has been received:


    @Override
    protected void onInactive() {
        super.onInactive();
        synchronized (mLock) {
            WeakReference<Subscription> subscriptionRef = mSubscriptionRef;
            if (subscriptionRef != null) {
                Subscription subscription = subscriptionRef.get();
                if (subscription != null) {
                    subscription.cancel();
                }
                mSubscriptionRef = null;
            }
        }
    }


Unfortunately, the call to onSubscribe is not guaranteed to happen on the same thread the Publisher.subscribe() is invoked. Even though RxJava's Flowable almost always does so, other Reactive-Streams implementations are free to delay the call to onSubscribe and issue it from any thread. (We deal with such situations in Flowable via the deferred requesting/cancellation mechanism.)

Consider the following scenario. We know onActive and onInactive execute on the main thread, thus they are serialized with respect to each other. Let's assume a Publisher will signal onSubscribe in a delayed manner when subscribed to. The main thread executes onActive, which calls subscribe() on the Publisher. Then the main thread executes onInactive() afterwards. Since the Publisher hasn't called onSubscribe yet, there is no mSubscriptionRef and thus there is nothing to cancel. The onSubscribe() is then invoked by the Publisher and it starts streaming events (to practically nobody). Now the main thread executes onActive again, triggering yet another subscription to the Publisher. After onSubscribe() is called again, the previous Subscription is overwritten and there are now twice as many onNext events posted to the LiveData (these could be duplicates in case the Publisher is hot/multicasting, or arbitrary items from various stages of a cold Publisher). Since the previous Subscription is lost, it is not possible to stop the first subscription other than letting it terminate at its own pace.

One possible solution is to host the Subscriber in an AtomicReference inside the custom LiveData, plus have that Subscriber store the incoming Subscription in an AtomicReference too. Therefore, an onInactive can get rid of the previous Subscriber and at the same time, instruct it to cancel the Subscription incoming through onSubscribe without requesting from it:


    static final class PublisherLiveData<T> extends LiveData<T> {
        final Publisher<T> mPublisher;
        final AtomicReference<SubscriberLiveData> mSubscriber;

        PublisherLiveData(@NonNull final Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }

        @Override
        protected void onActive() {
            super.onActive();
            SubscriberLiveData s = new SubscriberLiveData();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }

        @Override
        protected void onInactive() {
            super.onInactive();
            SubscriberLiveData s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancel();
            }
        }

        final class SubscriberLiveData 
        extends AtomicReference<Subscription>
        implements Subscriber<T>, Subscription {
            @Override
            public void onSubscribe(Subscription s) {
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                postValue(item);
            }

            @Override
            public void onError(Throwable ex) {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
                ex.printStackTrace();
            }

            @Override
            public void onComplete() {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
            }

            @Override
            public void cancel() {
                Subscription s = getAndSet(this);
                if (s != null && s != this) {
                    s.cancel();
                }
            }
            
            @Override
            public void request(long n) {
                // never called
            }
        }
    }

Although onActive and onInactive update mSubscriber from the main thread, using atomics is preferable since we want to set the current Subscriber to null once the Publisher terminates, thus playing nice with the GC. (For comparison, the refCount in RxJava is more involved because connection and disconnection may be triggered from any thread at any time.)


Conclusion


The LiveData class can be considered a classical (gen 0) reactive-push component which has practically one purpose on Android: notify its Observers of data (changes) on the main thread. If the typical thread crossing in the app is mainly from a background thread to the main thread, LiveData may be just enough to suit one's needs. It's one step above Agera after all since LiveData actually transmits data to be acted upon, not just "something happened" signals. However, if possibly failing and finite sources are present, which also have to be coordinated, transformed and kept on background thread(s) as long as possible, I believe RxJava is a better choice.

Nevertheless, there are standard interoperation bridges provided with LiveData which allow it to work with or pose as a Reactive-Streams Publisher, but unfortunately, the implementation of this bridge has concurrency flaws that could be easily fixed via a PR, if there were an open repository for the library (because you'd want to post unit tests as well).