A következő címkéjű bejegyzések mutatása: backpressure. Összes bejegyzés megjelenítése
A következő címkéjű bejegyzések mutatása: backpressure. Összes bejegyzés megjelenítése

2017. október 19., csütörtök

Android LiveData API: a quick look

Introduction


Threading and lifecycle are one of the top concerns when developing applications for the Android platform. UI has to be interacted with on a dedicated thread (main thread) but in order to keep the UI responsible to user input and rendering, blocking or CPU intensive calculations should be kept off it.

In addition, views can get destroyed and recreated in a way that is outside of a given application's control unlike a desktop Swing application. This means background tasks must be stopped and listeners removed to prevent leaking references to now logically dead objects.

RxJava and RxAndroid can help with threading concerns and there are other libraries that tap into the various lifecycle events; in general, this means someone will call dispose() on a particular flow or clear() on a CompositeDisposable to mass-cancel multiples of them.

Having a rich set of transformative and coordinating operators along with support for normal values, errors and finite sequences may be overwhelming compared to a classical Listener-based API. Google's LiveData is one of such classical Listener style APIs but unlike Swing's ActionListener for example, there are explicit requirements that interaction with the LiveData object itself happens on the main thread and signals will be dispatched from the main thread to Observers to it.

LiveData API


Unfortunately, I wasn't able to locate a public repository for the LiveData sources and had to rely on the sources downloaded from Google's Maven repository:

compile "android.arch.lifecycle:reactivestreams:1+"

There is an interoperation library associated with LiveData that allows presenting and consuming events from any Reactive-Streams Publisher. This will transitively import the actual LiveData library. Note that LiveData is currently considered beta and may change arbitrarily before release. That said, I don't think the core structure and premise will actually change.

The main consumer type is the android.arch.lifecycle.Observer with its single onChanged(T) method. Given an arbitrary LiveData instance, one can use observe(LifecycleOwner, Observer) or observeForever(Observer) methods to attach the listener to a source. The onChanged(T) will be called on the main thread. Cancelling an observation requires one to call removeObserver() for which one has to remember both the original LiveData object and the Observer instance itself. All the methods return void. The LifecycleOwner will be listened for destroy events, at which point the Observers are removed from the LiveData instance that depended on that specific source. This is similar to RxBinding's bindToLifecycle() approach composed onto a flow.

The LiveData abstract class doesn't feature any transformative or coordinating operators on itself, but the library features an utility class, Transformations, currently with a map and switchMap operations. Both will execute the transformations and emissions on the main thread, but otherwise they match the behavior of the same RxJava operators.

In theory, one could interpret LiveData as a cold source, but its usage patterns point to a hot source, just like a BehaviorSubject in RxJava; LiveData remembers the last value and emits it first to new consumers. To signal to consumers, LiveData has protected methods setValue and postValue. setValue requires the value to be set on the main thread and postValue will schedule a task to the main thread to call setValue there. To manually invoke these methods, a MutableLiveData object is available where the two methods have been made public.


Possible shortcomings


The main shortcoming I see with LiveData is the ubiquitous "return to main thread" mentality. For example, querying data from a LiveData-enabled datasource and preprocessing it should happen in a background thread as well before the aggregated data is sent back to the main thread for displaying them. Unfortunately, the datasource will notify the preprocessing Observer on the main thread for each resulting item, which Observer then has to schedule tasks on a background thread for all those items. The main thread is involved too early in such flows which wastes time for the main thread.

A second shortcoming could be the lack of terminal event notifications included in the Observer API. Of course, one can define an union-class with the value, error and complete subtypes, just like RxJava's Notification object, to work around this problem. Needless to say, such wrapper will cost allocation and indirection.

Lastly, addListener/removeListener pairs compose relatively poorly and requires the operator to remember both the original LiveData and the Observer that was used when the link was established. An onSubscribe(Disposable) style cancellation support worked out much nicer in RxJava 2. Of course, having a similar method in Observer would break its lambda-friendly Single Abstract Method structure.


Reactive-Streams interoperation


Consuming a LiveData as Publisher


RxJava has infrastructure support to work with such classical Listener-style sources. Specifically, the create() operator on Observable and Flowable show such an example usage in its JavaDoc. Consequently, talking to LiveData is a straightforward routine:


LiveData<String> source = new MutableLiveData<String>();
Flowable<String> f = Flowable.<String>create(emitter -> {
   Observer<String> o = v -> {
       emitter.onNext(v);
   };
   
   source.observeForever(o);
   emitter.setCancellable(() -> 
       AndroidSchedulers.mainThread()
       .scheduleDirect(() -> 
            mld.removeObserver(o)
       )
   );
}, BackpressureStrategy.LATEST);


The Reactive-Streams bridge in LiveDataReactiveStreams, of course, has no access to Flowable and has to work out the Subscription management manually. Unfortunately, the toPublisher() implementation is wrong. If there was a public repository, I could contribute a correct one (without using RxJava of course).

Let's see the implementation piece by piece to demonstrate how not to write Publishers. (The file is marked as Apache 2.0 open source, should be fine to quote it here, right?)


    public static <T> Publisher<T> toPublisher(
            final LifecycleOwner lifecycle, final LiveData<T> liveData) {

        return new Publisher<T>() {
            boolean mObserving;
            boolean mCanceled;
            long mRequested;
            @Nullable
            T mLatest;

            @Override
            public void subscribe(final Subscriber<T> subscriber) {
                // continued below
            }
        };
    }


The first red flag comes from the mutable, non thread-safe state of the anonymous Publisher. Hot sources may have mutable fields but those are protected with proper synchronization primitives (see PublishProcessor for example). However, these fields should be part of the state of the Subscription that is sent to a Subscriber as Publishers are expected to be consumed by any number of Subscribers and interaction with one Subscriber should not affect the interactions with the others.

Of course, a Publisher can chose to only service a single Subscriber (as with UnicastProcessor), but for that, there is no field to let the subscribe() know there is already someone consuming the Publisher.

Next, the Observer instance is defined with a backpressure strategy of keeping the latest if the downstream is not ready.


    final Observer observer = new Observer() {
        @Override
        public void onChanged(@Nullable T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested > 0) {
                mLatest = null;
                subscriber.onNext(t);
                if (mRequested != Long.MAX_VALUE) {
                    mRequested--;
                }
            } else {
                mLatest = t;
            }
        }
    };


Apart from the wrong shared state of the mCanceled, mRequested and mLatest, the code relies on the fact that both the onChanged invocation and the request() call happens on the main thread (see below). Having a per Subscriber atomic mRequested instead would save a trip to the main thread for an increment.

The next segment deals with the request() call from the downstream.


    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(final long n) {
            if (n < 0 || mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    // Prevent overflowage.
                    mRequested = mRequested + n >= mRequested
                               ? mRequested + n : Long.MAX_VALUE;
                    if (!mObserving) {
                         mObserving = true;
                         liveData.observe(lifecycle, observer);
                    } else if (mLatest != null) {
                         observer.onChanged(mLatest);
                         mLatest = null;
                    }
                }
            });
        }



The first (smaller) problem is that non-positive requests should be rewarded with an onError(IllegalArgumentException) properly serialized with any onNext calls. Here, it is ignored. In addition, request() may be called from any thread and such call may not see mCanceled properly. The mObserving adds an odd delay to start observing the actual LiveData object. (Since most sequences will issue a request() almost immediately, I'm not sure what benefit this can give, however, there were voices pretty keen on such behavior recently who couldn't understand that to control the first invocation of request(), one would have to consume a Publisher directly with a Subscriber and not have any intermediate operators between the two.)

Lastly, let's see cancel():

    @Override
    public void cancel() {
        if (mCanceled) {
            return;
        }
        ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
            @Override
            public void run() {
                if (mCanceled) {
                    return;
                }
                if (mObserving) {
                    liveData.removeObserver(observer);
                    mObserving = false;
                }
                mLatest = null;
                mCanceled = true;
            }
       });
   }


Since removeObserver has to be called from the main thread (which by itself could be inconvenient), a task is scheduled which will also update mCanceled to true. Since cancel() can be invoked from any thread, a non-volatile mCanceled may not be visible. In addition, since the mCanceled is set to true on the main thread, there could be a significant delay between a cancel() and its effects, letting onChanged() calls still through. Setting a volatile mCanceled true right in the cancel() method could have the stopping effects much earlier.

(In fact, this was a problem with our early unsubscribeOn() where the Reactive-Streams TCK would complain about still receiving too events after it has cancelled the flow. The solution was to set a volatile cancelled flag that would be read by onNext and drop events until the upstream.cancel() got executed asynchronously.)

It is relatively simple to fix the mistakes here: combine Observer and Subscription into one class and move the mXXX fields into this class.

Exposing a Publisher as LiveData


The unfortunate lack of terminal events in LiveData forces us to deal with only onNext calls. There is no way to tell an Observer that no more events will come (unless agreeing upon an event container type) and then get rid of them en-masse (there is no clearObservers()).

With RxJava, this could be done relatively easily:


    MutableLiveData<String> mld2 = new MutableLiveData<>();
    Disposable d = Flowable.range(1, 5).map(Object::toString)
            .subscribe(mld2::postValue);


Of course, one has to manage the Disposable as usual and possibly decide, when to subscribe() to a cold source with respect to available Observers of the (Mutable)LiveData. A refCount-like behavior is possible but for that, one has to implement a custom LiveData object.

Apparently, there exist events on the LiveData class, onActive and onInactive, that will be called when the first Observer arrives and the last Observer leaves respectively. The ReactiveStreamsLiveData.fromPublisher() uses them in its implementation:

    private static class PublisherLiveData<T> extends LiveData<T> {
        private WeakReference<Subscription> mSubscriptionRef;
        private final Publisher mPublisher;
        private final Object mLock = new Object();

        PublisherLiveData(@NonNull final Publisher publisher) {
            mPublisher = publisher;
        }

        // the rest is below
}


The mSubscriptionRef holds the current active connection's Subscription and the mLock protects it so that a concurrent call to onSubscribe may not clash with the deactivation of the LiveData object. As we'll see in a short, this is not enough because the race can leave the upstream's Subscription from an old activation not cancelled and thus still posting to the LiveData object.

The onActivate event should subscribe to the source and relay its onNext events:

    @Override
    protected void onActive() {
        super.onActive();

        mPublisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Don't worry about backpressure. If the stream is too noisy then
                // backpressure can be handled upstream.
                synchronized (mLock) {
                    s.request(Long.MAX_VALUE);
                    mSubscriptionRef = new WeakReference<>(s);
                }
            }

            @Override
            public void onNext(final T t) {
                postValue(t);
            }

            @Override
            public void onError(Throwable t) {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
                // Errors should be handled upstream, so propagate as a crash.
                throw new RuntimeException(t);
            }

            @Override
            public void onComplete() {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
            }
        });
    }


Here, apart from the mSubscriptionRef management, the only small mistake is the call to request() from within the lock itself. A bigger mistake is that onError should not throw.

The onInactive should cancel any existing Subscription if the Subscription representing that connection has been received:


    @Override
    protected void onInactive() {
        super.onInactive();
        synchronized (mLock) {
            WeakReference<Subscription> subscriptionRef = mSubscriptionRef;
            if (subscriptionRef != null) {
                Subscription subscription = subscriptionRef.get();
                if (subscription != null) {
                    subscription.cancel();
                }
                mSubscriptionRef = null;
            }
        }
    }


Unfortunately, the call to onSubscribe is not guaranteed to happen on the same thread the Publisher.subscribe() is invoked. Even though RxJava's Flowable almost always does so, other Reactive-Streams implementations are free to delay the call to onSubscribe and issue it from any thread. (We deal with such situations in Flowable via the deferred requesting/cancellation mechanism.)

Consider the following scenario. We know onActive and onInactive execute on the main thread thus they are serialized in respect to each other. Let's assume a Publisher will signal onSubscribe in a delayed manner when subscribed to. The main thread executes onActive that calls subscribe() on the Publisher. Then the main thread executes onInactive() after. Since the Publisher hasn't called onSubscribe yet, there is no mSubscriptionRef and thus there is nothing to cancel. The onSubscribe() is then invoked by the Publisher and it starts streaming events (to nobody practically). Now the main thread executes onActive again, triggering yet another subscription to the Publisher. After onSubscribe() is called again, the previous Subscription is overwritten and there are now twice as many onNext events posted to the LiveData (these could be duplicates in case the Publisher is hot/multicasting, or arbitrary items from various stages of a cold Publisher). Since the previous Subscription is lost, it is not possible to stop the first subscription other than letting it terminate on its own pace.

One possible solution is to host the Subscriber in an AtomicReference inside the custom LiveData, plus have that Subscriber store the incoming Subscription in an AtomicReference too. Therefore, an onInactive can get rid of the previous Subscriber and at the same time, instruct it to cancel the Subscription incoming through onSubscribe without requesting from it:


    static final class PublisherLiveData<T> extends LiveData<T> {
        final Publisher<T> mPublisher;
        final AtomicReference<SubscriberLiveData> mSubscriber;

        PublisherLiveData(@NonNull final Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }

        @Override
        protected void onActive() {
            super.onActive();
            SubscriberLiveData s = new SubscriberLiveData();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }

        @Override
        protected void onInactive() {
            super.onInactive();
            SubscriberLiveData s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancel();
            }
        }

        final class SubscriberLiveData 
        extends AtomicReference<Subscription>
        implements Subscriber<T>, Subscription {
            @Override
            public void onSubscribe(Subscription s) {
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                postValue(item);
            }

            @Override
            public void onError(Throwable ex) {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
                ex.printStackTrace();
            }

            @Override
            public void onComplete() {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
            }

            @Override
            public void cancel() {
                Subscription s = getAndSet(this);
                if (s != null && s != this) {
                    s.cancel();
                }
            }
            
            @Override
            public void request(long n) {
                // never called
            }
        }
    }

Although onActive and onInactive would update mSubscriber from the main thread, the atomics is preferable since we want to set the current Subscriber to null once the Publisher terminated, thus playing nice with the GC. (For comparison, the refCount in RxJava is more involved because connection and disconnection may be triggered from any thread at any time.)


Conclusion


The LiveData class can be considered a classical (gen 0) reactive-push component which has practically one purpose on Android: notify its Observers of data (changes) on the main thread. If the typical thread crossing in the app is mainly from a background thread to the main thread, LiveData may be just enough to suit one's needs. It's one step above Agera after all since LiveData actually transmits data to be acted upon, not just "something happened" signals. However, if possibly failing and finite sources are present, which also have to be coordinated, transformed and kept on background thread(s) as long as possible, I believe RxJava is a better choice.

Nevertheless, there are standard interoperation bridges provided with LiveData which would allow it to work with or pose as a Reactive-Streams Publisher, but unfortunately, the implementation of this bridge has concurrency flaws that could be easily fixed by a PR, if there was an open repository for the library (because you'd want to post unit tests as well).

2017. szeptember 11., hétfő

Interoperation between RxJava and Kotlin Coroutines

Introduction


Writing imperative-looking code with Kotlin Coroutines is certainly an attractive property of it, but I'd think things can get quite convoluted pretty fast once, for example, Selectors are involved.

I haven't gotten there to look at what Selectors are, I only read that they can help you implement a flatMap like stream combiner. We are not goind to do that now, RxJava can do it for us after all.

However, the reasonable question arises: if I have a coroutine generator, a coroutine transformation or simply want to receive items from a Flowable, how can I make RxJava work with these coroutines?

Easily with the combined magic of Kotlin Coroutines and RxJava coroutines!


Suspendable Emitter


A generator is a source-like construct that emits items followed by a terminal signal. It should be familiar from RxJava as the Flowable.generate() operator. It gives you a FlowableEmitter and the usual onNext, onError and onComplete calls on it.

One limitation is that you can call onNext only once per invocation of your (Bi)Consumer lambda that receives the emitter. The reason is that we can't block a second call to onNext and we don't want to buffer it either; therefore, RxJava cooperates with the developer.

Compiler supported suspension and state machine built by it, however, allow us to prevent a second call from getting through by suspending it until there is a demand from the downstream, which then resumes the coroutine where it left off. Therefore, we can lift the single onNext requirement for our Coroutine-based generator.

So let's define the SuspendEmitter interface


interface SuspendEmitter<in T> : CoroutineScope {

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


By extending the CoroutineScope, we provide useful infrastructure (i.e., coroutineContext, isActive) to the block that will target our SuspendEmitter. One can argue that why use onError and onComplete since a coroutine can throw and simply end. The reason is that this way, a coroutine can terminate the sequence from a transformation we'll see later, just like our recent mapFilter operator allows it.


The flow-producer

Given our context providing interface for a generator coroutine, let's define the generator method the user will call:


fun <T> produceFlow(generator: suspend SuspendEmitter.() -> Unit) : Flowable<T> {
    return Produce(generator)
}


(For those unfamiliar with the Kotlin syntax, the SuspendEmitter.() -> Unit is practically a one parameter lambda of signature (param: SuspendEmitter) -> Unit where, when the lambda is implemented, accessing methods of param do not need to be qualified by it, thus you can write onNext(1) instead of param.onNext(1).)

We have to implement a Flowable that interacts with a suspendable generator function in some fashion. When implementing source-like operators, one usually has to write a Subscription instance and call Subscriber.onSubscribe() with it.

class Produce<T>(private val generator: suspend SuspendEmitter<T>.() -> Unit) : 
        Flowable<T>() {
    override fun subscribeActual(s: Subscriber<in T>) {
        launch(Unconfined) {
            val parent = ProduceSubscription(s)
            parent.setJob(coroutineContext[Job])
            s.onSubscribe(parent)
            generator(parent)
        }
    }
}


Since the generator is a suspendable coroutine, we need a context where it can run. The Unconfined context gives us a trampolined execution environment where resumptions of suspended coroutines are not confined to any particular thread, as if you'd run with the trampoline() Scheduler in RxJava.

We create our Subscription, attach the Job of the coroutine context itself to bridge the cancellation from a downstream Subscription.cancel(), signal the custom Subscription to the downstream and then execute the provided producer block by supplying it the parent which also implements SuspendEmitter.

So far, nothing is too hairy or convoluted, however, the interaction between regular trampolined coroutines of RxJava and the Kotlin Coroutine infrastructure is more involved.

Non-blocking await/notify

We will need a way to get the generator coroutine suspended if there are no downstream requests and we have to resume that coroutine when the downstream does request an amount. This resembles the wait-notify pair of a typical BlockingQueue implementation where a blocking emission due to a full queue gets unblocked by a notification by a concurrent take()/poll() invocation. Since we don't want to block and the coroutine infrastructure supports programmatic resuming of a coroutine, we'll use this feature in two helper methods establishing a non-blocking wait-notify exchange:


typealias Cont = Continuation<Unit>

fun notify(ref: AtomicReference<Cont?>) {
    while (true) {
        val cont = ref.get()
        val next : Cont?
        if (cont != null && cont != TOKEN) {
            if (ref.compareAndSet(cont, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(cont, TOKEN)) {
                break;
            }
        }
    }
}


We will use a valueless Continuation<Unit>, Cont for short, and atomics to place an indicator or an actual continuation object in an AtomicReference. The notify() atomically performs the following logic: if there is a real continuation in the reference, we clear it and then call resume on it to trigger the resumption. Otherwise, we set it to the shared TOKEN object indicating that when the other side, await, wanted to get continued, it can do so immediately.

fun await(ref: AtomicReference<Cont?>, cont: Cont) {
    while (true) {
        val a = ref.get()
        if (a == TOKEN) {
            if (ref.compareAndSet(a, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(a, cont)) {
                break;
            }
        }

    }
}


The await() method uses the same reference and the continuation instance provided by a suspendCoroutine in its code block.The method atomically checks if there is a TOKEN and if so, it calls resume on the continuation parameter after clearing the TOKEN from the reference. Otherwise, it stores the continuation in the reference and quits.

val TOKEN: Cont = object: Cont {
    override val context: CoroutineContext
        get() = throw UnsupportedOperationException()

    override fun resume(value: Unit) {
        throw UnsupportedOperationException()
    }

    override fun resumeWithException(exception: Throwable) {
        throw UnsupportedOperationException()
    }

}


Finally, the TOKEN is just an empty implementation of a Continuation - we should never call its methods as the object reference itself serves only a purpose of indicator for an immediate resumption.



The ProduceSubscription  

Now we can implement the ProduceSubscription class. First, let's see the skeleton with the relevant fields:

open class ProduceSubscription<T>(
        private val actual: Subscriber<in T>,
        private val ctx : CoroutineContext
) : Subscription, SuspendEmitter<T> {

    companion object {
        val CANCELLED = Object()
    }

    @Suppress("DEPRECATION")
    override val context: CoroutineContext
        get() = ctx!!

    override val isActive: Boolean
        get() = job.get() != CANCELLED

    private val job = AtomicReference<Any>()

    private val requested = AtomicLong()

    private val resume = AtomicReference<Cont?>()

    private var done: Boolean = false

    override suspend fun onNext(t: T) {
        // TODO implement
    }

    override suspend fun onError(t: Throwable) {
        // TODO implement
    }

    override suspend fun onComplete() {
        // TODO implement
    }

    override fun cancel() {
        // TODO implement
    }

    override fun request(n: Long) {
        // TODO implement
    }

    fun setJob(j: Job?) {
        // TODO implement
    }
}

We see the methods of both Subscription and SuspendEmitter along with a couple of fields/properties:


  • It takes the downstream's Subscriber and the CoroutineContext it will provide to the produce callback in the operator.
  • We will use the companion object's CANCELLED value to indicate the the parent job we get from the coroutineContext is cancelled exactly once.
  • It considers being active when the job object is not the CANCELLED indicator
  • Of which Job is then stored in the job AtomicReference.
  • We have to track the requested amount from downstream via an AtomicLong.
  • The resume AtomicReference stores the continuation to be used with the non-blocking await-notify shown in the previous section.
  • Finally, we have the done flag indicating the generator coroutine called onError or onComplete at most once.
Perhaps the main difficulty lies in the implementation of the onNext method as it is the primary interaction point between a coroutine that has to be suspended if there are no requests:


    override suspend fun onNext(t: T) {
        if (job.get() == CANCELLED) {
            suspendCoroutine<Unit> { }
        }
        val r = requested.get()
        if (r == 0L) {
            suspendCoroutine<Unit> { cont -> await(resume, cont)  }
        }

        actual.onNext(t)

        if (job.get() == CANCELLED) {
            suspendCoroutine<Unit> { }
        }
        if (resume.get() == TOKEN) {
            resume.compareAndSet(TOKEN, null)
        }
        if (r != Long.MAX_VALUE) {
            requested.decrementAndGet()
        }
    }


First we check if the downstream has cancelled the generator in which case we should get out of the coroutine entirely. I'm not sure if there is a more appropriate way for doing this other than suspending indefinely.

Next, we check the request amount and if it is zero, we suspend the current coroutine by using our non-blocking await mechanism. Once notified, or there was at least one requested item, the code should continue with the emission of the item. This could trigger an in-sequence cancellation and we suspend the coroutine indefinitely again.

Since the downstream can immediately request some amount due to the s.onSubscribe(parent) call in the operator, before the generator can even run and call onNext, we may have a TOKEN in the resume field, that would otherwise incorrectly indicate the next call to await it can resume immediately, violating the backpressure we expect. I know this sounds convoluted, but I learned it the hard way...

Finally, we decrement the request amount if not unbounded.

The onError and onComplete look pretty much alike:


    override suspend fun onError(t: Throwable) {
        if (!done) {
            done = true
            actual.onError(t)
            cancel()
        }
        suspendCoroutine<Unit> { }
    }

    override suspend fun onComplete() {
        if (!done) {
            done = true
            actual.onComplete()
            cancel()
        }
        suspendCoroutine<Unit> { }
    }


We set the done flag to true, emit the relevant event to the downstream and then cancel the job/Subscription we are running with. I defensively suspend the coroutine afterwards.

Next we see how cancel() and setJob() works:

    override fun cancel() {
        val o = job.getAndSet(CANCELLED)
        if (o != CANCELLED) {
            (o as Job).cancel()
        }
    }

    fun setJob(j: Job?) {
        while (true) {
            val o = job.get()
            if (o == CANCELLED) {
                j?.cancel()
                break
            }
            if (job.compareAndSet(o, j)) {
                break
            }
        }
    }


They are pretty much implemented along RxJava's typical deferred cancellation mechanism. cancel() atomically swaps in the CANCELLED indicator and calls cancel on the Job it contained. setJob() atomically set the Job instance or cancels it if cancel() swapped in the CANCELLED indicator just before that.

Lastly, the request() implementation that is responsible for accounting downstream requests and resuming the suspended generator if inside onNext().

    override fun request(n: Long) {
        if (BackpressureHelper.add(requested, n) == 0L) {
            notify(resume)
        }
    }


In the RxJava world, a transition from 0 to n triggers the emission loop in a range() operator for example. Here, we notify a possibly suspended coroutine that will resume from the await() method we implemented.

Testing it is simple with RxJava:


val f = produceFlow {
    for (i in 0 until 10) {
         println("Generating $i")
         onNext(i)
    }
    onComplete()
}

f.test(0)
.assertEmpty()
.requestMore(5)
.assertValues(0, 1, 2, 3, 4)
.requestMore(5)
.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


Outstanding!

The flow-transformer

Now that we have a way to emit items, we would like to emit an item in response to an upstream value, like the map() operator but with a suspendable coroutine function. RxJava's map is confined to return one item in exchange for one upstream item.

With coroutines and the ProduceSubscription described in the previous section, we could emit any number of items without overflowing a Subscriber!

Let's define our API and a skeleton implementation for it first:


fun <T, R> Flowable<T>.transform(
        transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R> {
    return Transform(this, transformer)
}

class Transform<T, R>(
        private val source: Flowable<T>, 
        private val transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R>() {
    override fun subscribeActual(s: Subscriber<in R>) {
        // TODO implement
    }
}


We define a transform extension method on Flowable with a suspendable transformer that takes our SuspendEmitter, the upstream's value and returns nothing.

This time, we have an upstream we have to subscribe to via a regular FlowableSubscriber from RxJava, call the coroutine in some way and make sure we keep calling the upstream for more values as we have to deal with the backpressure of the coroutine itself transitively.

The first step into this direction is the handling of the upstream's own Subscription we get through Subscriber.onSubscribe. We have to attach that to the Subscription we show to the downstream Subscriber. Since we will use the ProduceSubscription anyway, we extend it and override its cancel() for this purpose:


class ProduceWithResource<T>(
        actual: Subscriber<in T>,
        ctx : CoroutineContext
) : ProduceSubscription<T>(actual, ctx) {
    private val resource = AtomicReference<Subscription>()

    fun setResource(s: Subscription) {
        SubscriptionHelper.replace(resource, s)
    }

    override fun cancel() {
        SubscriptionHelper.cancel(resource)
        super.cancel()
    }
}


We simply use the deferred cancellation helper for Subscriptions.

Now let's see how we can prepare the context for running the coroutine inside the transform operator's subscribeActual() method:

    val ctx = newCoroutineContext(Unconfined)
    val parent = ProduceWithResource(s, ctx)
    s.onSubscribe(parent)
    source.subscribe(object: FlowableSubscriber<T> {

        var upstream : Subscription? = null

        val wip = AtomicInteger()
        var error: Throwable? = null

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }
    })


First we create an unconfinded context where each invocation of the transformer coroutine will execute and suspend in. We create the producer that can hold an additional Subscription and send it to the downstream Subscriber. Finally, we subscribe to the upstream with a FlowableSubscriber.

In this custom FlowableSubscriber, we will have request from upstream, thus we save the Subscription we'll get from it. The wip and error fields will be used to achieve something similar to a half-serialization. I'll explain it once the methods are implemented.

Handling onSubscribe() is straightforward and typical for an RxJava operator:


    override fun onSubscribe(s: Subscription) {
        upstream = s
        parent.setResource(s)
        s.request(1)
    }


We store the upstream's subscription locally and in the ProducerWithResource to link up the cancellation across the operator. Then we request one item; this is partly due to simplifying the interaction between a suspended coroutine and the upstream producer. Using larger prefetch would require the use of some intermediate queue - possible, but left for the reader as an exercise. (Finally, we found a use for request(1)!)

Next, onNext():

    override fun onNext(t: T) {
        launch(ctx) {
           parent.setJob(coroutineContext[Job])

           wip.getAndIncrement()

           transformer(parent, t)

           if (wip.decrementAndGet() == 0) {
               upstream!!.request(1)
           } else {
               val ex = error;
               if (ex == null) {
                   s.onComplete()
               } else {
                   s.onError(ex)
               }
               parent.cancel()
           }
       }
    }

First, the Job of the actual coroutineContext has to be stored so a downstream cancellation can can call its Job.cancel() method. We have to do this because we will go in and out of the launch() when the upstream sends an item.

Next, the wip counter is incremented, which may seem odd. The reason for this is that if the transformer coroutine gets suspended, the execution returns to the caller of onNext(), a regular RxJava producer of some sorts. If this producer has reached its end, it will call onError or onComplete as these can be issued without request. As we'll see a bit later, forwarding these signals cuts out any pending emission from the suspended coroutine, therefore, we use the pattern of a half-serializer to save this terminal indication.

The transformer is executed with the parent ProducerWithResource instance that handles the suspendable onNext emissions towards the downstream.

Once the transformer's job has been done, the execution (resumes) with the atomic decrement of the wip counter. If it successfully decrements to 0, there was no terminal event signalled from the upstream while the transformer was suspended, thus we can request the next item from the upstream RxJava source.

The onError and onComplete are much simpler fortunately:


    override fun onError(t: Throwable) {
        error = t
        if (wip.getAndIncrement() == 0) {
            s.onError(t)
            parent.cancel()
        }
    }

    override fun onComplete() {
        if (wip.getAndIncrement() == 0) {
            s.onComplete()
            parent.cancel()
        }
    }


We store the Throwable (in onError only), then atomically increment the wip counter. If there was no ongoing coroutine, we are safe to emit the terminal event and cleanup/cancel the contextual Job we may still be referencing. If the original wip value was 1, the increment bumps it to 2 and the decrement in onNext() will detect the terminal condition and act accordingly.

Let's test it (by reusing the generator for fun)!

    f.transform({
        if (it % 2 == 0) {
            onNext(it)
        }
    })
    .test()
    .assertResult(0, 2, 4, 6, 8)

    f.transform({
        onNext(it)
        onNext(it + 1)
    })
    .test()
    .assertResult(0, 1, 1, 2, 2, 3, 3, 4, 4,
            5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10)

    f.transform({
        launch(CommonPool) {
            onNext(it + 1)
        }
    })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


We can filter or amplify a source, synchronously or asynchronously if necessary with a single operator! Excellent!

The receiver

The last operation we'd do is, given a Flowable flow, we'd like to return to the coroutine world and consume the flow. For that, a ReceiverChannel seems to be appropriate output type as it can be for-each looped nicely.

Let's define the extension method toReceiver() with a skeleton as well:


suspend fun <T> Flowable<T>.toReceiver(capacityHint: Int = 128) : ReceiveChannel<T> {
    val queue = Channel<T>(capacityHint)

    val upstream = AtomicReference<Subscription>()
    val error = AtomicReference<Throwable>()
    val wip = AtomicInteger()

    subscribe(object: FlowableSubscriber<T> {

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

    })

    return // TODO implement
}


First, a Channel of type T and the given capacity is created. It is followed by the AtomicReference that will hold the source Flowable's Subscription, which will have to be linked up with the consumer to propagate cancellation. Next, since the upstream may signal terminal events while the channel is suspended in a send() we'll use - similar to the ProducerWithResource.onNext() situation, we will use the same AtomicInteger-based technique. The error AtomicReference will serve as the intermediary when handing over the terminal event to the channel.

Let's see the FlowableSubscriber implementation first:

        override fun onSubscribe(s: Subscription) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1)
            }
        }

        override fun onNext(t: T) {
            launch (Unconfined) {
                wip.getAndIncrement()

                queue.send(t);

                if (wip.decrementAndGet() == 0) {
                    upstream.get().request(1)
                } else {
                    queue.cancel(error.get());
                }
            }
        }

        override fun onComplete() {
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel();
                }
            }
        }

        override fun onError(t: Throwable) {
            error.lazySet(t)
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel(t);
                }
            }
        }


The FlowableSubscriber implementation, practically, performs the same bookeeping as the transformer() operator did, with the exception that the closing of the channel has to happen in a launch-provided context.

However, this is only the producer half of the channel, we still need the consumer part, more specifically, the consumer-reemitter. Luckily, the build in produce() operator of the Coroutines library help with it. Why not return the channel directly? Because we need a way to detect if the channel is closed from the consumer's end and Channel doesn't allow us to register a completion handler for it. However, the Job inside the coroutineContext of produce() does:

    return produce(Unconfined) {
        coroutineContext[Job]?.invokeOnCompletion { 
            SubscriptionHelper.cancel(upstream) 
        }

        for (v in queue) send(v)
    }


Let's test this last operator:

runBlocking {
    for (i in f.toReceiver()) {
         println(i)
    }
    println("Done")

    for (i in f.subscribeOn(Schedulers.single()).toReceiver()) {
         println("Async $i")
    }
    println("Async Done")
}


Well done!

Conclusion

In this blog post, I demonstrated how one can write three operators, produceFlow, transform and toReceiver, that can interoperate with RxJava's own, backpressure enabled Flowable type reasonably well.

This should prove that both technologies, at the end, can be combined by the developer as seen fit for the target domain or business requirements.

This was somewhat a heated week for me so for now, until something interesting comes up in this topic, me writing about Kotlin Coroutines will be ... suspended.

Java 9 Flow API: ordered merge

Introduction

Sometimes, one has several ordered sequences of events and would like to merge them into one single flow. Since one element from a sequence should come before another element in another sequence, we need a way to keep comparing elements with each other from different sequences.

Unfortunately, zip() doesn't work because it takes a row of available items and item #2 from sequence #2 may come before item #1 from stream #3. Plus, if one stream is shorter than the others, the end sequence stops. Similarly, flatMap() doesn't work because it takes the next item from any inner source sequence the moment it is available without any ordering considerations at that point. At least it emits all items from all sources (provided there are no errors of course).

Therefore, we need something between the two operators: one that collects up a row of items from the sources, decides which is the smallest/largest of them based on some comparison logic and only emits that. It then awaits a fresh item from that specific source (or completion) and repeats the picking of the smallest/largest item as long as there are requests for it.

Such operator, let's call it orderedMerge(), has an implication about the number of its inner source sequences: it has to be fixed. The reason for it is that it has to pick the smallest/largest of the available items in order for the output to be in order. If there is still a source missing, it can't know for sure the others are smaller/larger that any of the upcoming item from that missing source will produce.

The second implication is, what happens if the sources themselves are not ordered? The logic presented in this post still works, but the end output won't be totally ordered. It will act like some priority queue instead: picking important items first before turning to less important ones.


The inner consumer

Operators handling multiple sources often need a way to prefetch item from these sources and give out them on demand to some joining logic. This mainly happens by prefetching a fixed amount, putting items in a queue, calling the parent coordinator's drain() method, and batching out replenishing calls from the coordinator if the so-called stable-prefetch backpressure is required.

For this purpose, let's see how the inner consumer, OrderedMergeInnerSubscriber, of orderedMerge() could look like:


static final class OrderedMergeInnerSubscriber<T> 
extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T>, Flow.Subscription {

    final OrderedMergeCoordinator<T> parent;

    final int prefetch;

    final int limit;

    final Queue<T> queue;

    int consumed;

    volatile boolean done;

    OrderedMergeInnerSubscriber(
        OrderedMergeCoordinator<T> parent,
        int prefetch
    ) {
        this.parent = parent;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
        this.queue = new ConcurrentLinkedQueue<>()
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }
}

We'll need a reference to the coordinator of the operator, the prefetch amount that we will use the 75% of for replenishing requests, a queue - ConcurrentLinkedQueue for simplicity, but bounded SpscArrayQueue from JCTools works here as well, and a counter of how many items have been consumed so far to know when to replenish.


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (compareAndSet(null, s)) {
            s.request(prefetch);
        } else {
            s.cancel();
        }
    }

    @Override
    public void onNext(T item) {
        queue.offer(item);
        parent.drain();
    }

    @Override
    public void onError(Throwable throwable) {
        parent.onInnerError(this, throwable);
    }

    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }

    @Override
    public void request(long n) {
        int c = consumed + 1;
        if (c == limit) {
            consumed = 0;
            Flow.Subscription s = get();
            if (s != this) {
                s.request(c);
            }
        } else {
            consumed = c;
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription s = getAndSet(this);
        if (s != null && s != this) {
            s.cancel();
        }
    }


I'd say there is nothing too complicated here.

  • onSubscribe() saves the upstream Flow.Subscription in the AtomicReference the operator extends if not already cancelled. If successful, the prefetch amount is requested.
  • onNext() stores the item in the queue and calls drain() on parent to handle that case.
  • onError() defers the error signal to be handled by the parent coordinator: the parent may save up the errors or cancel the whole flow at once.
  • onComplete() sets the complete indicator, which tells the parent this particular source will not produce more values and thus can be skipped when looking for the next smallest/largest items to emit
  • request() will only be called by the parent to replenish one item from its perspective once the previous item has been successfully chosen as the next item to be emitted towards downstream. Since replenishing one by one is costly, we batch up those via the consumed counter. Once that counter reaches the limit (75% of prefetch), a request is issued to the upstream. Since the AtomicReference will hold itself as a cancellation indicator, we don't want to call request on ourselves. It's important to state that request() will be guaranteed to be called from one thread at a time by the virtue of the queue-drain approach the coordinator implements below.
  • cancel() atomically swaps in the this as a terminal indicator and cancels the non-null, non-this Flow.Subscription if present.

The coordinator

Since there is no primary source in this orderedMerge() operator, it acts somewhat like a plain source of events. Therefore, we have to implement it on top of the Flow.Subscription to interact with the downstream. For convenience for this blog, we'll define the operator to take a variable number of Flow.Publisher sources (which at runtime ways ends up a fixed-size array):


@SafeVarargs
public static <T> Flow.Publisher<T> orderedMerge(
        Comparator<? super T> comparator, 
        int prefetch,
        Flow.Publisher<? extends T>... sources) {
    return new OrderedMergePublisher<>(sources, prefetch, comparator);
}

final class OrderedMergePublisher<T> implements Flow.Publisher<T> {

    final Flow.Publisher<? extends T>[] sources;

    final int prefetch;

    final Comparator<? super T> comparator;

    OrderedMergePublisher(
            Flow.Publisher<? extends T>[] sources,
            int prefetch,
            Comparator<? super T> comparator) {
        this.sources = sources;
        this.prefetch = prefetch;
        this.comparator = comparator;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
         // TODO implement
    }
}


The boilerplate of writing an operator is nothing special: save up on the parameters to be used by the implementation. We allow customization via a Comparator interface. If T is self-comparable, you can use Comparators.naturalOrder() from Java itself.

The coordinator implementation has to hold onto the inner OrderedMergeInnerSubscribers for mass cancellation support, subscribe them to the sources and work out the emissions from them. Let's see the non-exciting parts of it:

static final class OrderedMergeSubscription<T>
extends AtomicInteger implements Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    final OrderedMergeInnerSubscriber<T>[] subscribers;

    final Comparator<? super T> comparator;

    final Object[] values;

    static final Object DONE = new Object();

    Throwable error;

    boolean cancelled;

    long requested;

    long emitted;

    // -------------------------------------------------

    static final VarHandle ERROR;
 
    static final VarHandle DONE;
 
    static final VarHandle CANCELLED;
 
    static final VarHandle REQUESTED;

    static {
        Lookup lk = MethodHandles.lookup();
        try {
            ERROR = lk.findVarHandle(
                OrderedMergeSubscription.class, "error", Throwable.class);
            CANCELLED = lk.findVarHandle(
                OrderedMergeSubscription.class, "cancelled", boolean.class);
            REQUESTED = lk.findVarHandle(
                OrderedMergeSubscription.class, "requested", long.class);
        } catch (Throwable ex) {
            throw new InternalError(ex);
        }
    }

    OrderedMergeSubscription(
            Flow.Subscriber<? super T> downstream,
            Comparator<? super T> comparator,
            int prefetch,
            int n) {
        this.downstream = downstream;
        this.comparator = comparator;
        this.subscribers = new OrderedMergeInnerSubscriber[n];
        for (int i = 0; i < n; i++) {
             this.subscriber[i] = new OrderedMergeInnerSubscriber<>(this, prefetch);
        }
        this.values = new Object[n];
    }

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        // TODO implement
    }
     
    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        // TODO implement
    }

    void updateError(Throwable ex) {
        // TODO implement
    }
}

We have a couple of fields and methods here, some should be familiar in its naming and intended purpose:


  • downstream will receive the ordered sequence of items
  • subscribers holds onto the fixed set of inner OrderedMergeInnerSubscribers, each will be subscribed to a particular Flow.Publisher and the total number of them won't ever change in this operator.
  • comparator will compare elements from various sources
  • values holds onto the next available value from each source. This allows the merging algorithm to work with queues that don't support peek() (such as RxJava 2's on queue implementations) and otherwise has nice properties such as locality, avoiding accessing internals of the inner subscribers' queues and the overhead of a peek()-poll() pair all the time.
  • The DONE constant will indicate a particular source has no further elements and can be ignored (without looking at its subscriber).
  • error will gather the errors signalled by the sources and emitted together once all sources terminated. There is an ERROR VarHandle for concurrent access to this field.
  • cancelled indicates the downstream has issued a cancel() call to stop the flow. The CANCELLED VarHandle will allow us to use compareAndSet() to cancel at most once.
  • requested accumulates the requests done by the downstream via its REQUESTED VarHandle.
  • emitted counts how many items were emitted and will be compared against requested to detect when to pause emitting.
There is no separate done indicator field because we will deduce this state by detecting that all values items are marked as DONE.

Now let's see the shorter methods implemented:


    // ...

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        for (int i = 0; i < sources.length; i++) {
            sources[i].subscribe(subscribers[i]);
        }
    }
     
    @Override
    public void request(long n) {
        if (n <= 0L) {
            updateError(new IllegalArgumentException("non-negative request expected"));
        } else {
            for (;;) {
                long current = (long)REQUESTED.getAcquire(this);
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, current, next)) {
                    break;
                }
            }
        }
        drain();
    }

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                inner.cancel();
            }

            if (getAndIncrement() == 0) {
                Arrays.fill(values, null);

                for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                    inner.queue.clear();
                }
            }
        }
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        update(ex);
        sender.done = true;
        drain();
    }

    void updateError(Throwable ex) {
        for (;;) {
            Throwable current = (Throwable)ERROR.getAcquire(this);
            Throwable next;
            if (current == null) {
                next = throwable;
            } else {
                next = new Throwable();
                next.addSuppressed(current);
                next.addSuppressed(throwable);
            }
            if (ERROR.compareAndSet(this, current, next)) {
                break;
            }
        }
    }

    void drain() {
        // TODO implement
    }

}


The subscribe() method simply subscribes to all sources with the prepared array of OrderedMergeInnerSubscribers. The cancel() method cancels all inner subscribers and then enters a half-open drain mode where both the values array and each queue of the inner subscribers is cleared in order to help the GC. Both request() and updateError() should be familiar from the previous post of the series.

What's left is the drain() logic itself.


void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    Flow.Subscriber<? super T> downstream = this.downstream;

    Comparator<? super T> comparator = comparator;

    OrderedMergeInnerSubscriber<T>[] subscribers = this.subscribers;
    int n = subscribers.length;

    Object[] values = this.values;

    long e = emitted;

    for (;;) {
         long r = (long)REQUESTED.getAcquire(this);

         for (;;) {
              // TODO implement
         }

         emitted = e;
         missed = addAndGet(-missed);
         if (missed == 0) {
             break;
         }
    }
}


We start out with the usual drain-exclusion logic: transitioning the work-in-progress counter of this (by extending AtomicInteger) from zero to one allows one thread to enter and perform the emissions. We load the frequently accessed components into local fields and do an almost-classical for-loop with missed accounting to determine when to leave the loop.

Note that in the loop, after reading the current request amount we don't have the usual while (e != r) and if (e == r) cases. The reason for this is that we can have one shared loop for both the cases when backpressure is applied and when there are no further source items to merge and can terminate the sequence without a request from downstream.


// inner for(;;) {

if ((boolean)CANCELLED.getAcquire(this)) {
    Arrays.fill(values, null);

    for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
        inner.queue.clear();
    }
    return;
}

int done = 0;
int nonEmpty = 0;
for (int i = 0; i < n; i++) {
    Object o = values[i]
    if (o == DONE) {
        done++;
        nonEmpty++;
    } else
    if (o == null) {
        boolean innerDone = subscribers[i].done;
        o = subscribers[i].queue.poll();
        if (o != null) {
            values[i] = o;
            nonEmpty++;
        } else if (innerDone) {
            values[i] = DONE;
            done++;
            nonEmpty++;
        }
    } else {
        nonEmpty++;
    }
}


The first part is check if there was a cancellation from downstream. If so, we clear the internal state of the coordinator and each queue then quit. Next for each subscriber of a source, we have to poll the next available item into the common values array if there is not already an item available there. In addition, we account how many of those items indicate a completed source and how many of them has actual items.

Note that checking a source for completion has to happen before polling for the next item from its queue. As explained before on this blog, since Flow.Subscriber methods are invoked in a strict protocol order where an onComplete always happens after any previous onNext calls, if we detect done to be true and then get a null from the queue, we know there can't be any further items from that source. Otherwise, polling and seeing an empty queue first then checking done opens a window when the source quickly produces items and completes between these two checks.

Next, we handle the overall state of the operator:

if (done == n) {
    Throwable ex = (Throwable)ERROR.getAcquire(this);
    if (ex == null) {
        downstream.onComplete();
    } else {
        downstream.onError(ex);
    }
    return;
}

if (nonEmpty != n || e == r) {
    break;
}


If all of the elements turn out to be DONE, that means we exhausted all sources and can terminate the downstream accordingly (considering if there was any error along the way). If not all values slot have something or the downstream is not ready to receive an item, we break out this inner look and the outer loop will see if more work has to be done or not.

Finally, we find the smallest item from the available values:


    T min = null;
    int minIndex = -1;

    int i = 0;
    for (Object o : values) {
        if (o != DONE) {
            if (min == null || comparator.compare(min, (T)o) > 0) {
                min = (T)o;
                minIndex = i;
            }      
        }
        i++;
    }

    values[minIndex] = null;

    downstream.onNext(min);

    e++;
    subscribers[minIndex].request(1);

} // of the inner for (;;)


Once we know there we can emit an item, we'll find the smallest one among the non-DONE entries along with its index. Since we checked that not all entries are DONE before, min must end up non-null and minIndex non-negative. We clear the appropriate values entry indicating the next cycle should poll for more items from that particular source, we emit the found minimum item, increment the emission counter and signal the winning source to produce one more item.

Conclusion

The orderedMerge() operator shown in this post is perhaps one of the shortest and more comprehensible among the other ones, even if considering the lack of infrastructure with Java 9 Flows (i.e., cancelled indicator). The queue-drain approach present in many of the typical reactive operators can be observed here as well.

Since the operator collects you a row of values available, it can be relatively easily turned into a zip() operator:


  • done != 0 indicates one or more sources run out of items thus the sequence can be completed. Note that the non-done inner subscribers have to be cancelled and cleared before the downstream gets the terminal event.
  • instead of the loop that compares items, one copies the values array, clears the original one, applies a function to the copy and emit the result of that function call.


You can also turn it into a (rather clumsy) merge() operator that uses a round-robin collection strategy to pick the item to be emitted: an index (also saved into a field for subsequent drain rounds) that indicates which next slot to consider for emission if nonEmpty != 0, skipping over the DONE entries along the way.

However, there is one likely problem that troubles such value-collecting-and-emitting operators; it does - what Stephane Maldini, Project Reactor lead, once called our RxJava 2 / Reactor-Core 3 algorithms do - thread-stealing. Given all the sources, one of them will be doing the collecting and emitting the smallest item for all the other sources while that thread itself likely won't make any progress unless it finds a small pause in the onslaught of source items so the drain loop can quit.

This may be undesirable at times and there is a solution for it. To get there, we will investigate how thread switching in mid-flow can be implemented within the Java 9 Flow API in the next post.