Tuesday, June 16, 2015

Subjects (part 1)

Introduction


I have the feeling many would like to bury Subjects, yet I'm going to do a multi-post series about them.

Some consider them the mutable state of the reactive world, which I don't disagree with, but then they go on to say in big letters "don't use them" while showing more examples of using Observable.create():

Observable.create(s -> {
   int i = 0;
   while (true) {
       s.onNext(i++);
   }
}).subscribe(System.out::println);

What about unsubscription and backpressure? No amount of operators can fix that for this source, yet you can apply onBackpressureXXX strategies on a Subject at any time.

Subjects are by no means bad or broken, but as with other components of the reactive paradigm, one must learn when and how to use them. Those who lecture about them should reconsider when and how they introduce Subjects to their audience. I suggest introducing them after the regular fluent operators but before talking about create().

In this series, I'm going to try to introduce Subjects, detail their requirements and structure, and show how one can build their own custom Subject.

Imperative eventing

Let's assume you'd like to emit your own events, but you can't be sure when those events will be fired or how many of them there will be. Clearly, just() and from() can't help here, and you don't want to create() an Observable which spins on some state either.

The best would be an object that is both Observable, so clients can chain onto it, and Observer so you can emit values and terminal events as well. The combination of these two is what's called Subject (or Processor in Reactive-Streams naming).

You can think of them as a way of multicasting your values to interested parties without the hassle of handling thread safety for the incoming Subscribers.

Subject changeEvents = ...

changeEvents.subscribe(System.out::println);
changeEvents.subscribe(System.out::println);

changeEvents.onNext("Added");
Thread.sleep(10);
changeEvents.onNext("Removed");
Thread.sleep(10);
changeEvents.onCompleted();

Subjects are generified, and sometimes you want them to emit the same type as they receive, sometimes a completely different type. In C#, you can define two versions with different arities:

interface ISubject<T>: IObserver<T>, IObservable<T> { }

interface ISubject<T, U>: IObserver<T>, IObservable<U> { }

however, Java's type erasure doesn't let you do this; therefore, RxJava went with the two-parameter version:

abstract class Subject<T, R>
extends Observable<R> implements Observer<T> { }

Since RxJava's fluent API entry point is a class itself, in order to retain the composition possibility, Subject has to extend Observable and implement Observer. Perhaps a more interesting option would be to extend Subscriber instead of Observer to gain some resource-management and backpressure options, but Java doesn't let you extend more than one base class either and having the composable methods available is more important.

So in the example above, you'd have:

Subject<String, String> changeEvents = ...

Subjects are also considered hot observables: they 'operate' even if you don't have a Subscriber listening into them, similar to how a radio station doesn't stop playing music if you turn off your radio; who is there is there, who isn't there isn't.
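To make the "hot" behavior concrete, here is a minimal stdlib-only sketch of just the multicasting part of a PublishSubject. MiniPublishSubject and its Consumer-based subscribe() are hypothetical names, not RxJava's API; the sketch ignores backpressure, unsubscription and terminal events and only shows that a subscriber sees the values emitted after it joined.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a PublishSubject: no backpressure,
// no unsubscription, no error/completion handling.
final class MiniPublishSubject<T> {
    // Copy-on-write list: subscribe() may run concurrently with onNext()
    // without any locking on the caller's side.
    private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<? super T> subscriber) {
        subscribers.add(subscriber);
    }

    // Per the Observer contract, onNext itself must still be called sequentially.
    void onNext(T value) {
        for (Consumer<? super T> s : subscribers) {
            s.accept(value); // only the currently registered subscribers get it
        }
    }
}
```

A subscriber that joins between the two onNext calls only ever sees "Removed", much like tuning in to the radio mid-song.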

The Subject suitable for our example is called PublishSubject in RxJava:

Subject<String, String> changeEvents = PublishSubject.create();

Certainly, having a create() factory method helps to avoid the repetition of the type parameters, but why can't one just call new PublishSubject<>() like in Rx.NET?

The reason is how the fluent API is made possible by an Observable class. If you recall, you have to provide an OnSubscribe instance when you create an Observable via its create() method; the Observable remembers that instance and calls call() on it whenever a Subscriber subscribes through subscribe().

Unlike cold Observables, Subjects have to track their clients (so everyone gets the same data), and this tracking has to happen both inside the OnSubscribe and in the Subject body itself. Unfortunately, the OnSubscribe has to be handed to the Observable's constructor via super(...), and at that point Java doesn't let it refer to the Subject instance or its fields; therefore, the shared state between the two has to be extracted into a separate class and presented to both of them. This is non-trivial, so the whole Subject instantiation and setup process is hidden behind factory methods such as PublishSubject.create(). (Once we get to Subject construction, I'll explain this with the help of examples.)
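A stdlib-only sketch of that shape, under simplifying assumptions: Source, OnSubscribe, SimpleSubject and State are hypothetical stand-ins for Observable, OnSubscribe and a Subject, with Consumer in place of Subscriber. The OnSubscribe handed to super(...) cannot capture this, so the subscriber registry is created first in a separate State object, and a private constructor plus a create() factory keep that wiring out of the user's sight.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical miniature of the Observable/OnSubscribe split (not RxJava's classes).
interface OnSubscribe<T> {
    void call(Consumer<? super T> subscriber);
}

class Source<T> {
    private final OnSubscribe<T> onSubscribe;

    protected Source(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Same shape as Observable.subscribe() delegating to OnSubscribe.call().
    final void subscribe(Consumer<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }
}

final class SimpleSubject<T> extends Source<T> {
    // The state shared by the OnSubscribe and the Subject body.
    static final class State<T> {
        final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();
    }

    private final State<T> state;

    // Private: the OnSubscribe is built before super(...) runs and cannot
    // capture 'this', so the shared State is created outside and passed in.
    private SimpleSubject(OnSubscribe<T> onSubscribe, State<T> state) {
        super(onSubscribe);
        this.state = state;
    }

    static <T> SimpleSubject<T> create() {
        State<T> state = new State<>();
        OnSubscribe<T> onSubscribe = state.subscribers::add; // registers new clients
        return new SimpleSubject<>(onSubscribe, state);
    }

    // The Subject body reaches the same clients through the shared State.
    void onNext(T value) {
        for (Consumer<? super T> s : state.subscribers) {
            s.accept(value);
        }
    }
}
```

The caller of create() never sees State, just as PublishSubject.create() hides the real Subject's shared state.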

Flavors

Sometimes you don't just want to dispatch your events as you see fit without caring about your subscriber audience.

For example, you are a TV network and just throwing out a large amount of TV series and their episodes on a weekly basis. However, your clients can't keep up with the sheer amount, but they don't want to skip any of the episodes either. Therefore, their smart TV or the cable provider itself offers the capability to cache these episodes, starting at some point, and allows the subscriber to watch them in sequence, at its own pace and without leaving any out.

In the programming world, you'd want to emit events and allow clients to run at a different pace but still receive all of your events, perhaps even after you stopped generating those events: a late Subscriber comes in and you'd like to replay all events that have been accumulated during the active times.

This is called a ReplaySubject.

By default, you can create an unbounded ReplaySubject, which will cache everything it receives and replay it to Subscribers, including any terminal events.

However, some use cases require limiting the retention time or amount of the cached elements so later Subscribers don't get everything from the very beginning. The RxJava API offers three additional replay modes:

  • createWithSize(n) will retain at most n elements,
  • createWithTime(t, u) will retain elements that are younger than t and
  • createWithTimeAndSize(n, t, u) will retain at most n elements which are also younger than t.
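The retention rule of createWithSize(n) can be illustrated with a hypothetical, single-threaded replay buffer (SizeBoundReplay is not an RxJava class, and terminal events are left out for brevity): it keeps at most n values, evicting the oldest, and replays them to each new subscriber before the live values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch of ReplaySubject.createWithSize(n) semantics.
final class SizeBoundReplay<T> {
    private final int maxSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();

    SizeBoundReplay(int maxSize) {
        this.maxSize = maxSize;
    }

    void onNext(T value) {
        buffer.addLast(value);
        if (buffer.size() > maxSize) {
            buffer.removeFirst(); // evict the oldest retained element
        }
        for (Consumer<? super T> s : subscribers) {
            s.accept(value); // live delivery to current subscribers
        }
    }

    // Replays the retained values first, then registers for live ones.
    void subscribe(Consumer<? super T> subscriber) {
        for (T value : buffer) {
            subscriber.accept(value);
        }
        subscribers.add(subscriber);
    }
}
```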
This could be enough, but there exist further cases that warrant a unique Subject implementation.

For example, consider an asynchronous computation that wants to emit just a single value followed by a completion event. ReplaySubject does work, but it is too verbose and its overhead might be unacceptable for such a simple task. Therefore, RxJava offers another Subject called AsyncSubject. It remembers the very last element it received, and once onCompleted is called, all current and future Subscribers will receive just that single value followed by the completion event. But unlike ReplaySubject, if one calls onError on an AsyncSubject, any previously received value is ignored and all Subscribers will just receive the Throwable from then on.
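A hypothetical, single-threaded sketch of the AsyncSubject rules just described; the Listener interface and MiniAsyncSubject are made up for illustration and skip thread safety. Only the last value is kept, it is released on onCompleted, and onError discards it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer-like interface, only for this sketch.
interface Listener<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical single-threaded sketch of AsyncSubject semantics.
final class MiniAsyncSubject<T> {
    private final List<Listener<? super T>> listeners = new ArrayList<>();
    private T last;
    private boolean hasValue;
    private boolean done;
    private Throwable error;

    void onNext(T value) {
        if (!done) {
            last = value; // only the latest value is remembered
            hasValue = true;
        }
    }

    void onError(Throwable t) {
        if (done) return;
        done = true;
        error = t;
        last = null; // any previously received value is dropped
        hasValue = false;
        for (Listener<? super T> l : listeners) l.onError(t);
        listeners.clear();
    }

    void onCompleted() {
        if (done) return;
        done = true;
        for (Listener<? super T> l : listeners) emitFinal(l);
        listeners.clear();
    }

    void subscribe(Listener<? super T> listener) {
        if (!done) {
            listeners.add(listener); // wait for the terminal event
        } else if (error != null) {
            listener.onError(error);
        } else {
            emitFinal(listener);     // late subscribers get the same final value
        }
    }

    private void emitFinal(Listener<? super T> l) {
        if (hasValue) l.onNext(last);
        l.onCompleted();
    }
}
```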

The final case that is covered in RxJava via a Subject variant is when one would like to have a single value stored. Subscribers to it should immediately receive this value and any subsequent value in case a new value is onNext'd on the Subject. It is called BehaviorSubject, and some also call it the reactive property. Again, a ReplaySubject of size 1 would also do, but unlike ReplaySubject, sending an onCompleted() will evict the single stored value, and subsequent Subscribers will receive only the onCompleted (or onError) event and no value. You can create a BehaviorSubject with or without an initial value.
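Likewise, here is a hypothetical single-threaded sketch of the BehaviorSubject rules (errors left out): a new subscriber first gets the current value, and after completion the stored value is evicted so late subscribers receive no value. MiniBehaviorSubject is not RxJava's class, and its subscribe() returning false merely stands in for "you only get onCompleted".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch of BehaviorSubject semantics
// (onNext and onCompleted only).
final class MiniBehaviorSubject<T> {
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();
    private T current;
    private boolean hasValue;
    private boolean completed;

    MiniBehaviorSubject() { }                // no initial value

    MiniBehaviorSubject(T initialValue) {    // with an initial value
        current = initialValue;
        hasValue = true;
    }

    void onNext(T value) {
        if (completed) return;
        current = value;
        hasValue = true;
        for (Consumer<? super T> s : subscribers) s.accept(value);
    }

    void onCompleted() {
        completed = true;
        current = null;   // completion evicts the stored value
        hasValue = false;
        subscribers.clear();
    }

    // Returns false if already completed: the late subscriber gets no value.
    boolean subscribe(Consumer<? super T> subscriber) {
        if (completed) return false;
        if (hasValue) subscriber.accept(current); // latest value first
        subscribers.add(subscriber);
        return true;
    }
}
```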

A reactive list

After such dry text, let's see how we can put these Subjects to use. Imagine you want to build a list class that sends out notifications whenever its contents change. We want to send out events in case of an addition, removal or update of an element and have

  • a notification channel for the type of change,
  • a notification channel of the value causing the change, which also remembers the last value that was involved in a change and
  • a notification channel that holds onto the last 10 elements added to the list.
I'll call this class ReactiveList:

public final class ReactiveList<T> {
    
    public enum ChangeType { 
        ADD, REMOVE, UPDATE                      // (1)
    };
    
    final List<T> list = new ArrayList<>();      // (2)
    
    final PublishSubject<ChangeType> changes = 
            PublishSubject.create();             // (3)
    final BehaviorSubject<T> changeValues = 
            BehaviorSubject.create();
    final ReplaySubject<T> latestAdded = 
            ReplaySubject.createWithSize(10);
    
    public Observable<ChangeType> changes() {    // (4)
        return changes;
    }
    
    public Observable<T> changeValues() {
        return changeValues;
    }
    
    public Observable<T> latestAdded() {
        return latestAdded;
    }
    
    public void add(T value) {
        // implement
    }
    public void remove(T value) {
        // implement
    }
    public void replace(T value, T newValue) {
        // implement
    }
}

Our ReactiveList consists of the following notable elements:
  1. We define an enum for the three kinds of change events: add, remove and update.
  2. The data will be stored in a regular j.u.List instance.
  3. We specify the Subjects for the various event outputs and
  4. we specify methods that return an Observable of those outputs.
Now let's see the implementation of the mutation methods:

    
    // ...
    public void add(T value) {
        list.add(value);
        changes.onNext(ChangeType.ADD);
        changeValues.onNext(value);
        latestAdded.onNext(value);
    }
    public void remove(T value) {
        if (list.remove(value)) {
            changes.onNext(ChangeType.REMOVE);
            changeValues.onNext(value);
        }
    }
    public void replace(T value, T newValue) {
        int index = list.indexOf(value);
        if (index >= 0) {
            list.set(index, newValue);
            changes.onNext(ChangeType.UPDATE);
            changeValues.onNext(newValue);
        }
    }
}


Simply put, we perform the relevant changes and call onNext on the subjects to propagate the relevant change information.

A more reactive list

What if we want the inputs to the ReactiveList to be reactive as well by providing an Observer surface for them instead?

For simplicity, I'll drop the replace() functionality but add a new method list() that will return a snapshot Observable of the contents of the list at the call time.

Now that we go for full reactivity, one has to consider concurrent calls to the Subjects. Subjects implement the Observer interface and thus inherit the requirement that the onXXX methods have to be called sequentially and not concurrently. We could, of course, use what we learned about serialized access, but such behavior is required so often that Subject has a dedicated method for it: toSerialized(), which returns a Subject that can be accessed fully concurrently.
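To give an idea of what serialized access involves, here is a hypothetical sketch of the "emitter loop" technique using only the JDK: the first caller becomes the emitter, concurrent callers just enqueue their value, and the emitter drains the queue before giving up its role. SerializedSink is an illustrative name; RxJava's own serialization (behind toSerialized() and SerializedObserver) is more elaborate and also handles terminal events.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical emitter-loop sketch: concurrent callers never run the
// downstream consumer at the same time.
final class SerializedSink<T> {
    private final Consumer<? super T> actual;
    private final Queue<T> queue = new ArrayDeque<>(); // guarded by 'this'
    private boolean emitting;                           // guarded by 'this'

    SerializedSink(Consumer<? super T> actual) {
        this.actual = actual;
    }

    void onNext(T value) {
        synchronized (this) {
            if (emitting) {
                queue.offer(value); // another thread is emitting: it will deliver this
                return;
            }
            emitting = true;        // this thread becomes the emitter
        }
        actual.accept(value);
        for (;;) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    emitting = false; // queue drained: release the emitter role
                    return;
                }
            }
            actual.accept(next);      // deliver outside the lock
        }
    }
}
```

The sketch does not accept null values (ArrayDeque rejects them), and an exception thrown by the consumer would leave it stuck in the emitting state; both are acceptable simplifications for illustration only.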

public class MoreReactiveList<T> {
    public enum ChangeType { 
        ADD, REMOVE 
    };
    
    final List<T> list = new ArrayList<>();
    
    final Subject<ChangeType, ChangeType> changes;  // (1)
    final Subject<T, T> changeValues;
    final Observer<T> addObserver;                  // (2)
    final Observer<T> removeObserver;
    
    public Observable<ChangeType> changes() {
        return changes;
    }
    
    public Observable<T> changeValues() {
        return changeValues;
    }
    
    public Observable<T> list() {                   // (3)
        List<T> copy = new ArrayList<>();
        synchronized (list) {
            copy.addAll(list);
        }
        return Observable.from(copy);
    }
    
    public Observer<T> adder() {                    // (4)
        return addObserver;
    }
    
    public Observer<T> remover() {
        return removeObserver;
    }

    void onAdd(T value) {                           // (5)
        synchronized (list) {
            list.add(value);
        }
        changes.onNext(ChangeType.ADD);
        changeValues.onNext(value);
    }

    void onRemove(T value) {
        synchronized (list) {
            if (!list.remove(value)) {
                return;
            }
        }
        changes.onNext(ChangeType.REMOVE);
        changeValues.onNext(value);
    }

    void clear() {
        synchronized (list) {
            list.clear();
        }
    }

    public MoreReactiveList() {
        // implement
    }
}

The data types and structure have changed a little:

  1. Since we are going to serialize the Subjects, we lose their concrete type and have to specify them as Subject<T, R>.
  2. We will have a single Observer for each of the add- and remove-channels.
  3. Here we have the new list() method that creates a snapshot of the underlying list and returns an Observable of it. Note the synchronization around the list itself: the class will be manipulated concurrently and we need to ensure thread safety.
  4. We return the final Observer instances for the add- and remove-channels through the methods.
  5. The actual list manipulation has been moved to the package-private onAdd and onRemove methods.
Now let's see the missing MoreReactiveList() constructor's logic:

    // ...
    public MoreReactiveList() {
        changes = 
                PublishSubject.<ChangeType>create()
                .toSerialized();                     // (1)

        changeValues = 
                BehaviorSubject.<T>create()
                .toSerialized();                     
        
        addObserver = new SerializedObserver<>(      // (2)
            Observers.create(
                this::onAdd,
                t -> {
                    clear();
                    changes.onError(t);
                    changeValues.onError(t);
                },
                () -> { 
                    clear();
                    changes.onCompleted();
                    changeValues.onCompleted();
                }
        ));
        removeObserver = new SerializedObserver<>(   // (3)
            Observers.create(
                this::onRemove,
                t -> {
                    clear();
                    changes.onError(t);
                    changeValues.onError(t);
                },
                () -> { 
                    clear();
                    changes.onCompleted();
                    changeValues.onCompleted();
                }
        ));
    }
}

It works as follows:

  1. We set up the output channels similarly to ReactiveList, but this time we serialize the access to them.
  2. The addObserver is a serialized observer that forwards its onNext method to the onAdd method. If it receives a terminal event, the list is cleared and the event is propagated to both output channels.
  3. The removeObserver is built up similarly to (2) with the exception that it forwards to onRemove.
Let's try it out:

MoreReactiveList<Long> list = new MoreReactiveList<>();

Observable.timer(0, 1, TimeUnit.SECONDS)
    .take(10)
    .subscribe(list.adder());

Observable.timer(4, 1, TimeUnit.SECONDS)
    .take(10)
    .subscribe(list.remover());

list.changes().subscribe(System.out::println);

list.changes()
.flatMap(e -> list.list().toList())
.subscribe(System.out::println);

list.changeValues().toBlocking().forEach(System.out::println);

In the example, we create a list and two timers which start 4 seconds apart and emit items every second. We use the first timer to add to the list and the second to remove from it. We then observe the change types, print the current state of the list whenever its contents change, and blockingly observe the change-causing values themselves until completion.

Conclusion

In this blog post, I've introduced the Subjects and their RxJava flavors, then shown two reactive objects that make use of their capabilities.

In the next post, I'll explain the requirements, structures and algorithms that are necessary to implement a custom Subject.

Monday, June 15, 2015

The Reactive-Streams API (part 4 - final)

Introduction


In this final post about the Reactive-Streams API, I'm going to talk about the perhaps surprising fact that one needs a SubscriptionArbiter (the cousin of ProducerArbiter) very frequently, especially if there are multiple sources, scheduling or other asynchrony involved.

In RxJava 1.x, having a Producer set on a Subscriber is optional and one can call onError() and onCompleted() without any Producer. Such calls will eventually call unsubscribe() on the rx.Subscriber itself and clean up any resources.

In contrast, RS requires a Subscription to be sent through onSubscribe() before one can call the onXXX methods for any reason.

In this blog post, I'll look at cases where this requirement can cause trouble when the operator is implemented in the classical RxJava structure.

Deferred subscribing

An operator which has to consider an error-before-Subscription case is defer(). When subscribing to a deferred Publisher, the operator calls a user factory that returns another Publisher, which is then subscribed to. Since we don't really trust user functions, we need to catch any Throwable and notify the child of the problem. However, to do that, the child Subscriber must already have received a Subscription, and if we send one ourselves, the generated Publisher can't send its own. The solution is to use a SubscriptionArbiter, which allows us to send an early error or switch over to the 'real' source.

public final class OnSubscribeDefer<T>
implements OnSubscribe<T> {
    final Func0<? extends Publisher<? extends T>> factory;
    public OnSubscribeDefer(
           Func0<? extends Publisher<? extends T>> factory) {
        this.factory = factory;
    }
    @Override
    public void call(Subscriber<? super T> child) {
        
        SubscriptionArbiter sa = new SubscriptionArbiter();
        child.onSubscribe(sa);                                 // (1)  
        
        Publisher<? extends T> p;
        try {
            p = factory.call();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            child.onError(e);                                  // (2)
            return;
        }
        p.subscribe(new Subscriber<T>() {                      // (3)
            @Override
            public void onSubscribe(Subscription s) {
                sa.setSubscription(s);                         // (4)
            }

            @Override
            public void onNext(T t) {
                child.onNext(t);
            }

            @Override
            public void onError(Throwable t) {
                child.onError(t);
            }

            @Override
            public void onComplete() {
                child.onComplete();
            }
            
        });
    }
}

This time, we don't need to manage resources, but the Subscription change has to be handled:

  1. We first create an empty arbiter and set it on the child.
  2. In case the function call throws, we can now safely emit the exception through onError because child has received a Subscription in the form of the arbiter.
  3. We can't just use child directly but have to override the onSubscribe method
  4. and set the 'real' Subscription on the arbiter instead. The rest of the methods just delegate directly.
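The same steps can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive-Streams Publisher/Subscriber/Subscription types one-to-one. Defer and its nested Arbiter are hypothetical, simplified stand-ins for OnSubscribeDefer and SubscriptionArbiter: this arbiter accepts only a single real Subscription, and catching RuntimeException replaces the Throwable plus Exceptions.throwIfFatal() combination, since a Supplier cannot throw checked exceptions.

```java
import java.util.concurrent.Flow;
import java.util.function.Supplier;

// Hypothetical sketch of defer() over java.util.concurrent.Flow.
final class Defer {
    private Defer() { }

    // Minimal single-use arbiter: remembers request/cancel calls made before
    // the real Subscription arrives and replays them once it does.
    static final class Arbiter implements Flow.Subscription {
        private Flow.Subscription actual;
        private long pending;
        private boolean cancelled;

        @Override
        public void request(long n) {
            Flow.Subscription s;
            synchronized (this) {
                s = actual;
                if (s == null) {
                    long r = pending + n;
                    pending = r < 0 ? Long.MAX_VALUE : r; // cap on overflow
                    return;
                }
            }
            s.request(n); // call outside the lock
        }

        @Override
        public void cancel() {
            Flow.Subscription s;
            synchronized (this) {
                cancelled = true;
                s = actual;
            }
            if (s != null) {
                s.cancel();
            }
        }

        void setSubscription(Flow.Subscription s) {
            long toRequest;
            synchronized (this) {
                if (cancelled) {
                    toRequest = -1;
                } else {
                    actual = s;
                    toRequest = pending;
                    pending = 0;
                }
            }
            if (toRequest < 0) {
                s.cancel();            // the child cancelled before the source arrived
            } else if (toRequest > 0) {
                s.request(toRequest);  // replay the accumulated demand
            }
        }
    }

    static <T> Flow.Publisher<T> defer(Supplier<? extends Flow.Publisher<? extends T>> factory) {
        return child -> {
            Arbiter arbiter = new Arbiter();
            child.onSubscribe(arbiter);       // (1) the child gets a Subscription first
            Flow.Publisher<? extends T> p;
            try {
                p = factory.get();
            } catch (RuntimeException e) {
                child.onError(e);             // (2) legal: onSubscribe already happened
                return;
            }
            p.subscribe(new Flow.Subscriber<T>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    arbiter.setSubscription(s); // (3)-(4) real Subscription goes to the arbiter
                }
                @Override public void onNext(T item) { child.onNext(item); }
                @Override public void onError(Throwable t) { child.onError(t); }
                @Override public void onComplete() { child.onComplete(); }
            });
        };
    }
}
```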

Time-delayed subscribing

Let's look at the operator delaySubscription(), which delays the actual subscription to the source by some time. We can almost copy-paste the current implementation, but it won't compile due to the API changes:


public final class OnSubscribeDelayTimed<T>
implements OnSubscribe<T> {
    final Publisher<T> source;
    final Scheduler scheduler;
    final long delay;
    final TimeUnit unit;
    public OnSubscribeDelayTimed(
            Publisher<T> source, 
            long delay, TimeUnit unit, 
            Scheduler scheduler) {
        this.source = source;
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }
    @Override
    public void call(Subscriber child) {
        Scheduler.Worker w = scheduler.createWorker();
        
        // child.add(w);
        
        w.schedule(() -> {
            // if (!child.isUnsubscribed()) {

                source.subscribe(child);

            // }
        }, delay, unit);
    }
}


We can't add a resource to child, and we can't check for unsubscription this way either. In order to clean up the worker, one needs a disposable-container wrapper; for the cancellation, we need something that can 'replay' cancellation to the Subscription the source is going to emit:


    @Override
    public void call(Subscriber<? super T> child) {
        Scheduler.Worker w = scheduler.createWorker();
        
        SubscriptionArbiter sa = new SubscriptionArbiter();    // (1)

        DisposableSubscription dsub = 
            new DisposableSubscription(
                sa, new DisposableList());                     // (2)
        
        dsub.add(w);                                           // (3)
        
        child.onSubscribe(dsub);                               // (4)
        
        w.schedule(() -> {
            source.subscribe(new Subscriber<T>() {             // (5)
                @Override
                public void onSubscribe(Subscription s) {
                    sa.setSubscription(s);                     // (6)
                }

                @Override
                public void onNext(T t) {
                    child.onNext(t);
                }

                @Override
                public void onError(Throwable t) {
                    child.onError(t);
                }

                @Override
                public void onComplete() {
                    child.onComplete();
                }
                
            });
        }, delay, unit);
    }

Quite expanded logic compared to the original code, but for a good reason:

  1. We need a SubscriptionArbiter because we need cancellation support while the delayed subscription happens, however, we won't have any real Subscription from source yet so we need to hold onto any requests that may come in in the meantime.
  2. We need a way to cancel the schedule in case the child has decided to cancel() the whole operation. Since the arbiter doesn't have resource management, we need to wrap it into a disposable-container. Naturally, we don't really need a list for a single resource and you can implement your own, single resource disposable subscription.
  3. The container will take care of cancellation if we add the Worker instance.
  4. Once the arbiter+disposable are set up, we send the dsub to the child. This way, it can request() and cancel() at any time and both arbiter and dsub will forward the calls to the appropriate (re)sources.
  5. Once the scheduled action runs, we can't just hand the original child Subscriber to the source, because the child has already received a Subscription (dsub). Therefore, we need to wrap it with another Subscriber instance that forwards the onXXX calls except onSubscribe.
  6. The onSubscribe needs to change: instead of setting the source Subscription on the child, we set it on the arbiter. The arbiter will then relay any accumulated requests or cancel the source immediately.
Now you might think there is an error with the Subscriber (5) because in its onNext methods, it doesn't call sa.produced(1). This causes an accounting inconsistency indeed, but once a real Subscription is set on the arbiter, any future request(n) from child is relayed as-is to the upstream and no further call to setSubscription() will take place in this operator. The upstream receives the correct request amounts and the arbiter just keeps adding them up without having any effect on anything else. To be on the safe side, you can
  • still call sa.produced(1) or
  • implement an arbiter variant which allows only a single Subscription to be set and stops accumulating requests after it is set.
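Where produced() really matters is an arbiter that may switch between several Subscriptions, as concat-like operators do. The hypothetical, lock-based SwitchingArbiter below (built on java.util.concurrent.Flow as a stand-in for the RS interfaces, and not an RxJava 2.0 class) tracks requested-minus-produced, so a replacement Subscription is asked only for what the child still expects. It assumes the previous source has already terminated when setSubscription() is called and therefore doesn't cancel it.

```java
import java.util.concurrent.Flow;

// Hypothetical switching arbiter: remembers outstanding demand across sources.
final class SwitchingArbiter implements Flow.Subscription {
    private Flow.Subscription current;
    private long outstanding; // requested but not yet produced
    private boolean cancelled;

    @Override
    public void request(long n) {
        Flow.Subscription s;
        synchronized (this) {
            outstanding = addCap(outstanding, n);
            s = current;
        }
        if (s != null) {
            s.request(n);
        }
    }

    // Called by the operator for every n items it forwarded downstream.
    synchronized void produced(long n) {
        if (outstanding != Long.MAX_VALUE) {
            outstanding -= n;
        }
    }

    // Assumes the previous Subscription (if any) has already terminated.
    void setSubscription(Flow.Subscription s) {
        long remaining;
        boolean cancel;
        synchronized (this) {
            cancel = cancelled;
            if (!cancel) {
                current = s;
            }
            remaining = outstanding;
        }
        if (cancel) {
            s.cancel();
        } else if (remaining > 0) {
            s.request(remaining); // only what the child still expects
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription s;
        synchronized (this) {
            cancelled = true;
            s = current;
        }
        if (s != null) {
            s.cancel();
        }
    }

    private static long addCap(long a, long b) {
        long r = a + b;
        return r < 0 ? Long.MAX_VALUE : r; // saturate instead of overflowing
    }
}
```

With a single source, as in delaySubscription(), skipping produced() only leaves this bookkeeping stale without affecting the requests that reach the upstream, which is the point made above.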

Conclusion

In this blog post, I've shown two cases where Subscription arbitration and resource cleanup have to be managed. Luckily, not all operators need these behaviors, but the majority will; therefore, two specialized Subscriptions are required to handle any delay in a 'real' Subscription and/or the release of resources on cancel().

Since this is going to happen frequently, I'm sure RxJava 2.0 will provide standard and efficient implementations to help conform to the Reactive-Streams specification.

As for the RS API series conclusion, the minimal set of interfaces is quite capable of capturing the requirements of a typical dataflow. It is possible to convert RxJava idioms into RS idioms, but the infrastructure and conveniences have to be re-implemented from scratch. I've shown some basic Subscriptions such as SingleSubscription, SingleDelayedSubscription, SubscriptionArbiter and DisposableSubscription that, along with other similar new classes, will be the main tools to implement operators for RxJava 2.0.

In the next series, I'm going to cover the most controversial set of objects in the field of reactive programming and I'll not just talk about them in detail but show how to implement your own custom variant of the kind.

The Reactive-Streams API (part 3)

Introduction


In this episode, I'm going to talk about how the rx.Subscriber's ability to register resources can be converted into the Reactive-Streams API. Since RS doesn't specify any means of resource management, we need to introduce resource containers ourselves (the renamed equivalents of today's rx.Subscription containers) and attach them to the RS Subscriber's cancellation logic in some way.

Subscription vs. Subscription

To avoid confusion, RxJava 2.0 will replace the XXXSubscription classes and interfaces with XXXDisposable ones. I'm not going to detail these classes here, but only show the new base interfaces for resource management:

interface Disposable {
    boolean isDisposed();
    void dispose();
}

interface DisposableCollection extends Disposable {
    boolean add(Disposable resource);
    boolean remove(Disposable resource);
    boolean removeSilently(Disposable resource);

    void clear();
    boolean hasDisposables();
    boolean contains(Disposable resource);
}


The usage rules will remain the same: thread safety and idempotence.
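As an illustration of those two rules, here is a hypothetical, simplified container in the spirit of the DisposableList that appears in these posts (its real implementation isn't shown, so only add(), isDisposed() and dispose() are sketched, and the Disposable interface is re-declared so the block stands alone): dispose() is idempotent, resources are released outside the lock, and adding to an already disposed container disposes the newcomer immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Re-declared minimal Disposable, matching the interface above.
interface Disposable {
    boolean isDisposed();
    void dispose();
}

// Hypothetical list-based disposable container.
final class DisposableList implements Disposable {
    private List<Disposable> resources = new ArrayList<>(); // null once disposed

    // Returns false (and disposes the resource) if already disposed.
    boolean add(Disposable resource) {
        synchronized (this) {
            if (resources != null) {
                resources.add(resource);
                return true;
            }
        }
        resource.dispose();
        return false;
    }

    @Override
    public synchronized boolean isDisposed() {
        return resources == null;
    }

    @Override
    public void dispose() {
        List<Disposable> toDispose;
        synchronized (this) {
            if (resources == null) {
                return; // idempotent: repeated calls do nothing
            }
            toDispose = resources;
            resources = null;
        }
        for (Disposable d : toDispose) {
            d.dispose(); // run foreign code outside the lock
        }
    }
}
```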


The DisposableSubscription

The most basic way of adding resource management is to wrap around an existing Subscription with one that hijacks the cancel() method and calls dispose() on the underlying composite:

public final class DisposableSubscription
implements Disposable, Subscription {
    final Subscription actual;
    final DisposableCollection collection;
    public DisposableSubscription(
            Subscription actual, 
            DisposableCollection collection) {
        this.actual = Objects.requireNonNull(actual);
        this.collection = Objects.requireNonNull(collection);
    }
    public boolean add(Disposable resource) {
        return collection.add(resource);
    }
    public boolean remove(Disposable resource) {
        return collection.remove(resource);
    }
    public boolean removeSilently(Disposable resource) {
        return collection.removeSilently(resource);
    }
    @Override
    public boolean isDisposed() {
        return collection.isDisposed();
    }
    @Override
    public void dispose() {
        cancel();
    }
    
    @Override
    public void cancel() {
        collection.dispose();
        actual.cancel();
    }
    @Override
    public void request(long n) {
        actual.request(n);
    }
}

By implementing the Disposable interface, the DisposableSubscription itself can now be added to a disposable-container and participate in a complex dispose network. However, most of the time you'd want to avoid extra allocation, therefore, the structure above will be likely inlined into some other class, perhaps into the Subscriber participating in a lift() call.

If you remember the RxJava specification and pitfall #2, you shouldn't unsubscribe your downstream, because it is likely you'll trigger a premature release of its resources.

(This is currently somewhat of an issue with RxAndroid's LifecycleObservable, where one can attach a takeUntil()-like operator in the middle of a chain which, instead of sending out onCompleted(), unsubscribes its downstream Subscriber.)

With RS, such downstream unsubscription is practically infeasible. Each level can only either pass the Subscription as-is (and thus can't attach resources to it) or wrap it with a DisposableSubscription-like class and forward that downstream as just a Subscription. Calling cancel() at your level can't reach the cancel() of a downstream class that wraps your Subscription.

As always, you still can shoot yourself in the foot, but it takes much more effort than in RxJava today and the rule won't change: you shouldn't try to cancel/dispose your downstream's resources nor share them across the chain of operators.


TakeUntil

Now let's see how one can implement the takeUntil() operator with such externalized resource management (split into code parts for better readability):

public final class OperatorTakeUntil<T> 
implements Operator<T, T> {
    final Publisher<?> other;
    public OperatorTakeUntil(Publisher<?> other) {
        this.other = Objects.requireNonNull(other);
    }
    @Override
    public Subscriber<? super T> call(
            Subscriber<? super T> child) {
        Subscriber<T> serial = 
            new SerializedSubscriber<>(child);

        SubscriptionArbiter arbiter = 
            new SubscriptionArbiter();                       // (1)
        serial.onSubscribe(arbiter);
        
        SerialDisposable sdUntil = new SerialDisposable();   // (2)
        SerialDisposable sdParent = new SerialDisposable();  // (3)

So far, it looks similar to the current RxJava implementation: we wrap the child to serialize out the potentially asynchronous onError() or onCompleted() emitted by the other Publisher.

We create a SubscriptionArbiter (a conversion of ProducerArbiter) for the following reason: let's assume we are still in the call() method when the other source, already subscribed to, emits a notification of any kind. We'd like to forward it to the child Subscriber; however, unless the child has a Subscription available, we can't call its onXXX methods just yet. Such a 'real' Subscription through the main path only arrives once the operator chain starts to call onSubscribe(). I'll talk about this in more detail in the next blog post.

However, because of the way cancellation is presented to Subscribers, we need a way (2) to pass the Subscription from the other Subscriber back to the parent Subscriber so that, if either of them reaches a terminal state, it can cancel() the other one. Since the other may receive its Subscription and trigger the cancellation before the parent even receives its own, we need to be prepared to cancel the parent's Subscription on reception as well (3).

            // ...
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                DisposableSubscription dsub = 
                        new DisposableSubscription(s, 
                        new DisposableList());              // (1)
                dsub.add(sdUntil);                          // (2)
                sdParent.set(dsub);
                arbiter.setSubscription(dsub);              // (3)
            }
            @Override
            public void onNext(T t) {
                serial.onNext(t);
            }
            @Override
            public void onError(Throwable t) {
                serial.onError(t);
                sdParent.dispose();                         // (4)
            }
            @Override
            public void onComplete() {
                serial.onComplete();
                sdParent.dispose();
            }
        };


The parent Subscriber is slightly different: we need to handle the incoming Subscription and wire up the cancellation network:

  1. We create a DisposableSubscription wrapper with a List-based underlying collection.
  2. We add the SerialDisposable (which will hold the Disposable of the other Subscription) to this composite. We also set the DisposableSubscription on sdParent, which allows the other Subscriber to finish, and cancel the main source, even before the parent one has started.
  3. We hand the wrapper to the arbiter, through which it reaches the serialized child.
  4. In case an error or completion event happens, we'll make sure the composite is cancelled/disposed on the spot. Since the composite holds a reference to the other Subscriber's subscription, this will also cancel that stream.
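To make the 'on the spot' behaviour of step 4 more concrete, here is a minimal sketch of a List-based composite with a terminal state: once disposed, anything added afterwards is disposed immediately. It assumes a hypothetical `Disposable` interface and a made-up `TerminalCompositeDisposable` class; the post's own DisposableList may well differ in details such as locking.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the post's Disposable interface. */
interface Disposable {
    void dispose();
}

/**
 * Hypothetical List-based composite with a terminal state: once dispose() has run,
 * any resource added later is disposed on the spot instead of being stored.
 */
final class TerminalCompositeDisposable implements Disposable {
    private List<Disposable> resources = new ArrayList<>(); // guarded by this
    private boolean disposed;                               // guarded by this

    public void add(Disposable d) {
        synchronized (this) {
            if (!disposed) {
                resources.add(d);
                return;
            }
        }
        d.dispose(); // arrived too late: cancel it immediately, outside the lock
    }

    public synchronized boolean isDisposed() {
        return disposed;
    }

    @Override
    public void dispose() {
        List<Disposable> toDispose;
        synchronized (this) {
            if (disposed) {
                return; // idempotent
            }
            disposed = true;
            toDispose = resources;
            resources = null;
        }
        for (Disposable d : toDispose) {
            d.dispose(); // call foreign code outside the lock
        }
    }
}
```

Disposing outside the lock is a design choice of this sketch: cancelling a Subscription may run arbitrary upstream code, which is better not executed while holding the composite's monitor.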
As the final segment, we need to create a Subscriber to the other stream and ensure it completes the parent:

        // ...
        Subscriber<Object> until = new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                sdUntil.set(Disposables.create(s::cancel));    // (1)
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Object t) {
                parent.onComplete();                           // (2)
            }
            @Override
            public void onError(Throwable t) {
                parent.onError(t);
            }
            @Override
            public void onComplete() {
                parent.onComplete();
            }
        };
        
        this.other.subscribe(until);
        
        return parent;
    }
}

We receive the Subscription from the other source (1) and wrap it into a Disposable (similar to how RxJava's Subscriptions.create() does this now); the first onNext() from the other source then completes the parent (2). Due to the terminal nature of Disposables, even if the main chain completes before the other chain has even had the chance to receive a Subscription, the SerialDisposable will already be disposed and will immediately dispose the other's Subscription.

Note that since the ability to cancel, and thus dispose resources, depends on timing and ordering, one usually ends up needing some disposable containers up-front (e.g., sdParent, sdUntil) so that, regardless of which Subscription arrives when, they can cancel each other out as necessary.
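The terminal nature of the SerialDisposable can be sketched as follows. This is an illustration under assumptions: the `Disposable` interface and the `TerminalSerialDisposable` class are hypothetical, and replacing the held value disposes the previous one, in the style of RxJava's SerialSubscription.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the post's Disposable interface. */
interface Disposable {
    void dispose();
}

/** Hypothetical serial container: holds at most one Disposable and has a terminal DISPOSED state. */
final class TerminalSerialDisposable implements Disposable {
    /** Sentinel marking the terminal, disposed state. */
    private static final Disposable DISPOSED = () -> { };

    private final AtomicReference<Disposable> current = new AtomicReference<>();

    public void set(Disposable next) {
        for (;;) {
            Disposable prev = current.get();
            if (prev == DISPOSED) {
                next.dispose();  // container already terminated: cancel the latecomer
                return;
            }
            if (current.compareAndSet(prev, next)) {
                if (prev != null) {
                    prev.dispose(); // assumed serial semantics: replacing disposes the old value
                }
                return;
            }
        }
    }

    public boolean isDisposed() {
        return current.get() == DISPOSED;
    }

    @Override
    public void dispose() {
        Disposable prev = current.getAndSet(DISPOSED); // swap in the terminal state atomically
        if (prev != DISPOSED && prev != null) {
            prev.dispose();
        }
    }
}
```

In the takeUntil() scenario, this corresponds to the main chain disposing sdUntil before the other source's onSubscribe() runs: the late set() then cancels the other Subscription immediately.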


TakeUntil v2

Looking at our takeUntil() implementation, one can discover that by reorganizing the various Subscriptions, it is possible to clean up the mess the Disposables have caused.

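    // ...
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> serial = new SerializedSubscriber<>(child);

        SubscriptionArbiter sa = new SubscriptionArbiter();        // (1)

        DisposableSubscription dsub = 
            new DisposableSubscription(sa, new DisposableList());  // (2)
        
        serial.onSubscribe(dsub);                                  // (3)
        
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                dsub.add(Disposables.create(s::cancel));           // (4)
                sa.setSubscription(s);                             // (5)
            }
            @Override
            public void onNext(T t) {
                serial.onNext(t);
            }
            @Override
            public void onError(Throwable t) {
                serial.onError(t);
                dsub.dispose();
            }
            @Override
            public void onComplete() {
                serial.onComplete();
                dsub.dispose();
            }
        };
        
        Subscriber<Object> until = new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                dsub.add(Disposables.create(s::cancel));           // (6)
                s.request(Long.MAX_VALUE);
            }
            @Override
            public void onNext(Object t) {
                parent.onComplete();
            }
            @Override
            public void onError(Throwable t) {
                parent.onError(t);
            }
            @Override
            public void onComplete() {
                parent.onComplete();
            }
        };
        this.other.subscribe(until);
        return parent;
    }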

It works as follows:

  1. We create the SubscriptionArbiter as before,
  2. then it is wrapped by a DisposableSubscription,
  3. which is then pushed downstream. The pair will make sure cancellation and requests are accumulated until the arbiter receives a 'real' Subscription.
  4. Once the main source's Subscriber receives its Subscription, we turn it into a Disposable and add it to the dsub composite,
  5. followed by updating the arbiter's current subscription: any accumulated requests and any cancellation is now 'replayed' to the upstream Subscription.
  6. Once the other source sends its Subscription, we turn it into a Disposable as well and request everything.
Naturally, the parent will now call dsub.dispose() in its onError() and onComplete() methods.
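The request and cancellation accumulation that the SubscriptionArbiter performs in steps 3 and 5 could look roughly like the following lock-based sketch. The `SimpleArbiter` name and the local `Subscription` stand-in (mirroring org.reactivestreams.Subscription so the code compiles without the library) are assumptions; a real arbiter may well be lock-free and may support switching between sources.

```java
/** Minimal stand-in for org.reactivestreams.Subscription, so the sketch compiles on its own. */
interface Subscription {
    void request(long n);
    void cancel();
}

/**
 * Hypothetical lock-based arbiter: request() and cancel() calls arriving before the
 * 'real' Subscription are accumulated and then replayed onto it by setSubscription().
 * This sketch accepts a single upstream Subscription only.
 */
final class SimpleArbiter implements Subscription {
    private Subscription actual; // guarded by this
    private long missed;         // requests accumulated while 'actual' was null
    private boolean cancelled;   // guarded by this

    public void setSubscription(Subscription s) {
        boolean cancelNow;
        long r;
        synchronized (this) {
            if (actual != null) {
                throw new IllegalStateException("Subscription already set");
            }
            cancelNow = cancelled;
            r = missed;
            missed = 0L;
            if (!cancelNow) {
                actual = s;
            }
        }
        if (cancelNow) {
            s.cancel();          // downstream already cancelled: cancel the late arrival
        } else if (r != 0L) {
            s.request(r);        // replay the accumulated requests
        }
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required");
        }
        Subscription a;
        synchronized (this) {
            a = actual;
            if (a == null) {
                long u = missed + n;
                missed = u < 0L ? Long.MAX_VALUE : u; // cap at 'unbounded'
                return;
            }
        }
        a.request(n);            // forward outside the lock
    }

    @Override
    public void cancel() {
        Subscription a;
        synchronized (this) {
            if (cancelled) {
                return;          // idempotent
            }
            cancelled = true;
            a = actual;
        }
        if (a != null) {
            a.cancel();
        }
    }
}
```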

Let's think about the various cancellation paths:

  • Downstream's cancellation: it will cancel the dsub, dsub will cancel the arbiter, the arbiter will immediately cancel any Subscription it receives.
  • Main completion: it will cancel the dsub, dsub will cancel the arbiter and the arbiter will cancel the upstream's Subscription. In addition, dsub will immediately cancel the other's subscription if added.
  • Other completion: it will cancel the dsub, dsub will cancel the arbiter. Once the main receives a Subscription, either dsub or the arbiter will cancel it immediately.

Conclusion

In this blog post, I've talked about how resource management can be performed with respect to Reactive-Streams' Subscribers and Subscriptions, and shown an example implementation of a takeUntil() operator.

Although it looks like we have to allocate at least as many objects as RxJava does, note that many operators don't really require resource management themselves (example: take()) or don't even need to wrap the Subscription instance in any way (example: map()).

In the next and final post about the RS API, I'm going to talk about the increased need for various arbiter classes (briefly mentioned in the takeUntil() example): one can't leave a Subscriber without a Subscription and expect cancellation to work, and one can't call onSubscribe() multiple times either if the source changes.

The Reactive-Streams API (part 2)

Introduction

In this blog post, I'm going to take our SingleProducer and SingleDelayedProducer classes and convert them into reactive-streams-based Subscriptions.

At first, one might think the conversion is going to be troublesome, but luckily, if you can already think in terms of how you'd implement the request() method of an rx.Producer, you are 75% there. The final 25% comes from figuring out how to move the rx.Subscriber.isUnsubscribed() logic into request(), since the rs.Subscriber doesn't extend rx.Subscription (nor any other resource-management interface).

The SingleSubscription

Since the SingleSubscription itself isn't that complicated, I'm going to show it in one go:

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.*;

public final class SingleSubscription<T> 
extends AtomicBoolean implements Subscription {
    private static final long serialVersionUID = 1L;
    
    final T value;                                       // (1)
    final Subscriber<? super T> child;
    volatile boolean cancelled;                          // (2)
    
    public SingleSubscription(T value, 
            Subscriber<? super T> child) {               // (3)
        this.value = Objects.requireNonNull(value);
        this.child = Objects.requireNonNull(child);
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException(
                "n > 0 required");                       // (4)
        }
        if (compareAndSet(false, true)) {
            if (!cancelled) {                            // (5)
                child.onNext(value);
                if (!cancelled) {
                    child.onComplete();
                }
            }
        }
    }
    @Override
    public void cancel() {
        cancelled = true;                                // (6)
    }
}

That's it! Wait, that's it? Yes, it is no accident that the Producer implementations I've shown so far can be transformed into reactive-streams Subscriptions with relatively little effort. Still, here is the explanation of the major properties of this new implementation:

  1. We have an instance field for the constant value and the target Subscriber as before,
  2. however, since isUnsubscribed() is not part of the RS Subscriber and unsubscription comes in the form of a cancel() call, we need to store the cancelled state ourselves, in a volatile field. If you recall, I mentioned that you can't be sure by what and when request() (or in fact, cancel()) will be called, therefore, one needs to make sure things are thread-safe.
  3. Since RS doesn't allow null values, we reject them early, in the constructor.
  4. My "Let them throw!" philosophy dictates that non-positive requests are programming errors which should yield a nice IllegalArgumentException.
  5. Because there is no child.isUnsubscribed() method anymore, we check the volatile cancelled variable everywhere instead.
  6. Our idempotent cancel just sets the cancelled flag atomically.

The SingleDelayedSubscription

Given the simplicity of SingleSubscription, how hard could it be to convert SingleDelayedProducer?

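import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.*;

public final class SingleDelayedSubscription<T> 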
extends AtomicInteger implements Subscription {
    /** */
    private static final long serialVersionUID = -1L;
    
    T value;
    final Subscriber<? super T> child;
    
    static final int CANCELLED = -1;                           // (1)
    static final int NO_VALUE_NO_REQUEST = 0;
    static final int NO_VALUE_HAS_REQUEST = 1;
    static final int HAS_VALUE_NO_REQUEST = 2;
    static final int HAS_VALUE_HAS_REQUEST = 3;
    
    public SingleDelayedSubscription(Subscriber<? super T> child) {
        this.child = Objects.requireNonNull(child);
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required");
        }
        for (;;) {
            int s = get();
            if (s == NO_VALUE_HAS_REQUEST
                    || s == HAS_VALUE_HAS_REQUEST
                    || s == CANCELLED) {                       // (2)
                return;
            }
            if (s == NO_VALUE_NO_REQUEST) {
                if (!compareAndSet(s, NO_VALUE_HAS_REQUEST)) {
                    continue;
                }
            }
            if (s == HAS_VALUE_NO_REQUEST) {
                if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
                    T v = value;
                    value = null;
                    child.onNext(v);
                    if (get() != CANCELLED) {                  // (3)
                        child.onComplete();
                    }
                }
            }
            return;
        }
    }
    
    public void setValue(T value) {
        Objects.requireNonNull(value);
        for (;;) {
            int s = get();
            if (s == HAS_VALUE_NO_REQUEST
                    || s == HAS_VALUE_HAS_REQUEST
                    || s == CANCELLED) {                        // (4)
                return;
            } else if (s == NO_VALUE_NO_REQUEST) {
                this.value = value;
                if (!compareAndSet(s, HAS_VALUE_NO_REQUEST)) {
                    continue;
                }
            } else if (s == NO_VALUE_HAS_REQUEST) {
                if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) {
                    child.onNext(value);
                    if (get() != CANCELLED) {                   // (5)
                        child.onComplete();
                    }
                }
            }
            return;
        }
    }

    @Override
    public void cancel() {
        int state = get();
        if (state != CANCELLED) {                              // (6)
            state = getAndSet(CANCELLED);
            if (state != CANCELLED) {
                value = null;
            }
        }
    }
}

Looks quite similar to the original state-machine, but it has an additional CANCELLED state (1..6), which is atomically swapped in. We don't really need to check for this state before onNext() because the preceding compareAndSet() would fail anyway, but we can check it just before calling onComplete().

Why don't we use a volatile cancelled flag instead of this new state? You could naturally do that, and the resulting Subscription would be equally correct. It is a matter of personal preference: you can add an extra instance field or extend the state machine to include a cancelled state. The primary reason here is to show an example of the latter alternative.
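For comparison, the volatile-flag alternative might look like the sketch below: the AtomicInteger keeps only the four value/request states, and cancellation lives in a separate field that every path checks. The `FlagSingleDelayedSubscription` class and the local stand-in interfaces (mirroring the org.reactivestreams ones so the code compiles without the library) are hypothetical; this is one possible variant, not the implementation from the post.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal stand-ins for the org.reactivestreams interfaces. */
interface Subscription {
    void request(long n);
    void cancel();
}

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

/** Hypothetical variant: four-state machine plus a separate volatile cancelled flag. */
final class FlagSingleDelayedSubscription<T> extends AtomicInteger implements Subscription {
    private static final long serialVersionUID = 1L;

    static final int NO_VALUE_NO_REQUEST = 0;
    static final int NO_VALUE_HAS_REQUEST = 1;
    static final int HAS_VALUE_NO_REQUEST = 2;
    static final int HAS_VALUE_HAS_REQUEST = 3;

    final Subscriber<? super T> child;
    T value;
    volatile boolean cancelled;   // the extra field replacing the CANCELLED state

    FlagSingleDelayedSubscription(Subscriber<? super T> child) {
        this.child = Objects.requireNonNull(child);
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required");
        }
        for (;;) {
            int s = get();
            if (cancelled || s == NO_VALUE_HAS_REQUEST || s == HAS_VALUE_HAS_REQUEST) {
                return;
            }
            if (s == NO_VALUE_NO_REQUEST) {
                if (compareAndSet(s, NO_VALUE_HAS_REQUEST)) {
                    return;               // setValue() will emit once the value arrives
                }
            } else if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) { // s == HAS_VALUE_NO_REQUEST
                T v = value;
                value = null;
                emit(v);
                return;
            }
        }
    }

    public void setValue(T v) {
        Objects.requireNonNull(v);
        for (;;) {
            int s = get();
            if (cancelled || s == HAS_VALUE_NO_REQUEST || s == HAS_VALUE_HAS_REQUEST) {
                return;
            }
            if (s == NO_VALUE_NO_REQUEST) {
                value = v;                // published by the volatile CAS below
                if (compareAndSet(s, HAS_VALUE_NO_REQUEST)) {
                    return;               // request() will emit later
                }
            } else if (compareAndSet(s, HAS_VALUE_HAS_REQUEST)) { // s == NO_VALUE_HAS_REQUEST
                emit(v);
                return;
            }
        }
    }

    private void emit(T v) {
        if (!cancelled) {
            child.onNext(v);
            if (!cancelled) {
                child.onComplete();
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;  // idempotent volatile write; value is not cleared to avoid racing emit()
    }
}
```

The price is one extra field per instance; in exchange, none of the compareAndSet() loops needs to know about a fifth state.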

The RangeSubscription

I'm not going to convert all previous Producers into Subscriptions here, but I'd like to show a second example for including a cancelled state in the state machine.

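import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.*;

public final class RangeSubscription 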
extends AtomicLong implements Subscription {
    /** */
    private static final long serialVersionUID = 1L;
    
    final Subscriber<? super Integer> child;
    int index;
    final int max;
    
    static final long CANCELLED = Long.MIN_VALUE;          // (1)
    
    public RangeSubscription(
            Subscriber<? super Integer> child, 
            int start, int count) {
        this.child = Objects.requireNonNull(child);
        this.index = start;
        this.max = start + count;
    }
    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException(
                "n > 0 required");
        }
        long r;
        for (;;) {
            r = get();
            if (r == CANCELLED) {                          // (2)
                return;
            }
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            if (compareAndSet(r, u)) {
                break;
            }
        }
        if (r != 0L) {
            return;
        }
        for (;;) {
            r = get();
            if (r == CANCELLED) {                          // (3)
                return;
            }
            int i = index;
            int m = max;
            long e = 0;
            if (i == m) {                                  // empty range: complete right away
                child.onComplete();
                return;
            }
            while (r > 0L && i < m) {
                child.onNext(i);
                if (get() == CANCELLED) {                  // (4)
                    return;
                }
                i++;
                if (i == m) {
                    child.onComplete();
                    return;
                }
                r--;
                e++;
            }
            index = i;
            if (e != 0) {
                for (;;) {
                    r = get();
                    if (r == CANCELLED) {                  // (5)
                        return;
                    }
                    long u = r - e;
                    if (u < 0) {
                        throw new IllegalStateException(
                                "more produced than requested!");
                    }
                    if (compareAndSet(r, u)) {
                        r = u;                             // continue with what's still requested
                        break;
                    }
                }
            }
            if (r <= 0L) {
                break;
            }
        }
    }
    @Override
    public void cancel() {
        if (get() != CANCELLED) {                          // (6)
            getAndSet(CANCELLED);
        }
    }
}

For brevity, I've omitted the fast-path logic here. The rest is, again, similar to the original RangeProducer structure, but now that the cancellation state is merged into the requested accounting, we need to re-read the current requested amount and check for a CANCELLED value (1) almost everywhere (2..5). Note that the emission accounting can't be a simple getAndAdd() anymore: subtracting from the CANCELLED marker would overwrite it (Long.MIN_VALUE minus the emitted count wraps around into a huge positive value), and even if CANCELLED were -1, one could, in theory, emit Long.MAX_VALUE - 1 and wrap the counter, losing the cancelled state information. Again, getAndSet() swaps in the terminal state atomically and in an idempotent fashion (6).
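The point about the emission accounting can be checked in a few lines: blindly subtracting the emitted amount from the Long.MIN_VALUE marker wraps it into a large positive 'request', while a CAS loop like the one at (5) leaves the marker intact. The `ProducedAccounting` helper is hypothetical and only isolates the accounting step.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper contrasting naive and CAS-based emission accounting. */
final class ProducedAccounting {
    static final long CANCELLED = Long.MIN_VALUE;

    /** Naive accounting: Long.MIN_VALUE - 5 wraps around to Long.MAX_VALUE - 4. */
    static long producedNaive(AtomicLong requested, long e) {
        return requested.addAndGet(-e);
    }

    /** CAS-based accounting: leaves CANCELLED untouched and reports it to the caller. */
    static long producedCas(AtomicLong requested, long e) {
        for (;;) {
            long r = requested.get();
            if (r == CANCELLED) {
                return CANCELLED;
            }
            long u = r - e;
            if (u < 0L) {
                throw new IllegalStateException("more produced than requested!");
            }
            if (requested.compareAndSet(r, u)) {
                return u;
            }
        }
    }
}
```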


Conclusion

In this part, I've shown two approaches to convert from a rx.Producer into an RS Subscription and keep the unsubscription behavior intact. Naturally, they involve tradeoffs: instance size if using a separate cancellation flag or algorithm complexity if cancellation is woven into the state machine.

In the next part, I'm going to show how one can deal with the loss of another rx.Subscriber functionality: the add(rx.Subscription) option to associate resources with the downstream Subscriber.

Saturday, June 13, 2015

The Reactive-Streams API (part 1)

Introduction


The Reactive-Streams API, heavily influenced by RxJava, is (in my opinion) the most beautiful API describing (a)synchronous, cancellable and backpressure-able dataflows to date, replacing the original and limited IObservable/IObserver approach invented by Erik Meijer.

Back in April, I spent a week and tried porting RxJava 1.x over this new API. At first, the biggest hurdle was the lack of resource-management of the new Subscriber interface and how RxJava's Producer and Subscription interfaces were merged into a single Subscription interface. However, once I started implementing various infrastructure supporting classes, I started appreciating the new design, and perhaps the most important outcome is how the entire backpressure awareness just clicked into the right place in my mind.

With this new insight, I started looking at RxJava's operators and Producer implementations and I was suddenly able to discover bugs and optimization possibilities quite easily, not to mention, start a blog series about how things (should) work in RxJava.

In this blog series, I'm going to talk about this new reactive-streams API and its relation to current RxJava 1.x constructs.

Package and naming confusions

Knowing RxJava inside out, perhaps the first stumbling block with reactive-streams (RS) is how concepts are named differently.

rx.Observable is called org.reactivestreams.Publisher and is now an interface. Since Java doesn't and probably won't support extension methods, having a Publisher interface returned as a source doesn't really help with composing operators over it. Luckily, the current RxJava fluent approach can still remain as is by extending this new interface.

Unsubscription through the rx.Subscription and backpressure through the rx.Producer interfaces are now somewhat unified into a single interface, named org.reactivestreams.Subscription. The new interface uses cancel() instead of unsubscribe() and doesn't have (nor really need) the means to check for an isUnsubscribed() state. Since RS doesn't specify any means of resource management, one will constantly need to wrap the new cancel() method into a resource management instance to be able to use it with the containers. Due to this name confusion, RxJava 2.0 will most likely change the resource management classes and interfaces into the C#-esque Disposables.

There is no simple rx.Observer equivalent, but rx.Subscriber looks somewhat similar to org.reactivestreams.Subscriber. Instead of being unsubscribable directly, the new Subscriber receives a 'control' object that lets it cancel the connection and request more values. In addition, instead of having two methods, onStart() and setProducer() (which cause quite some confusion about how one should jump-start a Subscriber in RxJava), there is only a single onSubscribe() method taking an RS Subscription. Code that used to call the protected rx.Subscriber.request(n) now has to store this RS Subscription instance and call rs.Subscription.request(n) on it.
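As a minimal sketch of this change, assuming local stand-ins for the two RS interfaces (so it compiles without the library) and a hypothetical `OneByOneSubscriber` class, a Subscriber that requests one value at a time has to keep the received Subscription around itself:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-ins for the org.reactivestreams interfaces. */
interface Subscription {
    void request(long n);
    void cancel();
}

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

/** Hypothetical Subscriber that keeps the received Subscription and requests one value at a time. */
final class OneByOneSubscriber<T> implements Subscriber<T> {
    final List<T> received = new ArrayList<>();
    boolean completed;
    private Subscription subscription;   // the 'control' object handed over by onSubscribe()

    @Override
    public void onSubscribe(Subscription s) {
        this.subscription = s;  // store it: there is no protected request(n) to call anymore
        s.request(1);           // the single jump-start point (no onStart()/setProducer() split)
    }

    @Override
    public void onNext(T t) {
        received.add(t);
        subscription.request(1); // formerly: request(1) on the rx.Subscriber itself
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```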

There were some debates about what the completion signal's method should be called, and it was settled on onComplete() instead of RxJava's and Rx.NET's onCompleted() name. Hopefully, our IDEs' auto-complete feature will save us in this regard.

Finally, RS specifies a combined interface of rs.Publisher and rs.Subscriber by the name of Processor, which resembles the Subjects API inside RxJava. Since this Processor is an interface, the only change to rx.Subject will be to implement it.


Java 9+ Flow

There are some clashes between RxJava and RS, but you might have heard that Doug Lea wants to include the reactive-streams idiom in Java 9 under the container class java.util.concurrent.Flow. The naming matches RS, but the packages are different. I was, and still am, skeptical about this move:
  • This "common platform" won't be that common: only Java 9+ users will benefit from it. Having RS, which is Java 6+ and thus Android-friendly, is a better option; most projects already include many other libraries and plus 2 (RS + RxJava 2.0) is not an issue.
  • Java 9 won't use it internally at all. No new async NIO, networking, file I/O or UI events based on the Observer++ pattern seem to be planned. Not to mention, without fluent API support, most users can't go zero-dependency with it. The situation could be remedied if Flow were backported to previous Java JDKs (with the same package location) in an update. Still, without extension methods in Java, users will always have to rely on extra libraries and instance wrappings.
  • I'm not sure the whole reactive paradigm is final in terms of capturing properties: for example, Applied Duality's AsyncObservable, which captures the latency of onXXX methods, might some day be incorporated into RS 2.0 (although I don't really see the need for it). Swapping out the dependencies for RS 2.0 and RxJava 3.0 might be much simpler than changing the JDK's Flow API. On a historical note, Rx.NET got its IObservable/IObserver included in the BCL, and I think they painted themselves into a corner: in order to join the RS paradigm, they now have to figure out a way to have the new set of interfaces live alongside the old one, similar to how pre-generics and post-generics collections ended up. Note that I assume here they accept RS as the new basis for Rx.NET 3.0 and don't try to do something more complicated.
 

Critique of RS

I'm completely satisfied with the four new interfaces of RS, but less so with the textual specification about them. I believe it is over-restrictive and prevents some sensible API implementations.

No exceptions other than NullPointerException

Although methods are allowed to throw NPE if their parameters are null, the components can still end up in an invalid state, or Subscription.request() could be called with an invalid, non-positive value. The current RS 1.0 specification doesn't allow handling these cases the way most Java APIs do, by throwing IllegalArgumentException or IllegalStateException.

One can argue that such exceptions should be sent through the onError() method, but I'll show a potential situation that makes this impossible when adhering to the specification.

Let's assume one has a Subscriber and accidentally subscribes it to two different Publishers. By definition, this is not allowed and the onSubscribe() method should reject a second Subscription:

    // ...
    Subscription subscription;
    @Override
    public void onSubscribe(Subscription s) {
        if (subscription != null) {
            s.cancel();
            onError(new IllegalStateException("§x.y: ..."));
            return;
        }
        this.subscription = s;
        s.request(Long.MAX_VALUE);
    }

This may appear to work, but the moment the first Publisher goes async, two problems arise:
  • The second onSubscribe() call's onError() is now, potentially, concurrently invoked with the first Publisher's onNext() call, which is correctly forbidden by the specification.
  • Storing the first subscription can't be plain and requires an AtomicReference in case there is a subscription race due to multiple uses, adding overhead to a Subscriber implementation.
Therefore, the only safe way to implement onSubscribe() could be something like this:

    // ...
    final AtomicReference<Subscription> subscription = new AtomicReference<>();
    @Override
    public void onSubscribe(Subscription s) {
        if (!subscription.compareAndSet(null, s)) {
            s.cancel();
            new IllegalStateException("§x.y: ...").printStackTrace();
            return;
        }
        s.request(Long.MAX_VALUE);
    }

and hope the standard out or log is monitored, or one needs to serialize the Subscriber's onXXX() methods all the time.

Instead of this, I'd simply throw the IllegalStateException and make the subscribe() call fail.
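Put together, the 'Let them throw!' variant of the once-only onSubscribe() might look like the following sketch. Note that it deliberately deviates from RS 1.0, which only permits NullPointerException; the `StrictSubscriber` class and the local interface stand-ins are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Minimal stand-ins for the org.reactivestreams interfaces. */
interface Subscription {
    void request(long n);
    void cancel();
}

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

/** Hypothetical Subscriber that accepts exactly one Subscription and throws on any further one. */
final class StrictSubscriber<T> implements Subscriber<T> {
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();

    @Override
    public void onSubscribe(Subscription s) {
        Objects.requireNonNull(s, "s is null");            // NPE is what RS 1.0 allows
        if (!subscription.compareAndSet(null, s)) {        // safe even if two sources race
            s.cancel();                                    // don't leave the second source running
            // Not allowed by RS 1.0: bounce the error back at the caller instead of onError()
            throw new IllegalStateException("Subscription already set");
        }
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T t) {
        // process t
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // done
    }
}
```

Because the error travels up the caller's stack, it never races with the first Publisher's onNext() calls, which is the whole point of throwing instead of signalling onError().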

Illegal request amount

Requests through Subscriptions must be positive; otherwise, the Subscriber should be notified via onError().

    // ...
    @Override
    public void request(long n) {
        if (n <= 0) {
            subscriber.onError(new IllegalArgumentException("§x.y: ..."));
            return;
        }
        // ...
    }


Again, the problem comes when request() is invoked asynchronously, for example, due to an observeOn() trying to replenish its input queue while the Subscription is still producing values through onNext(). The onError() is now racing with onNext() again, which is forbidden.
The resolution and my suggestions are similar to the IllegalStateException case: one can print/log the error or instead throw it back at the caller.


The request() can't throw NPE

Even though the various onXXX() methods can at least throw a NullPointerException, Subscription.request() can't. One has to try and bounce such an exception back to the Subscriber, and if that fails as well, there is nowhere it could go but to standard out or a log.


Processors require Subscriptions

RS is heavily biased towards cold observable sequences, where having a Subscription is a correct interoperation element. However, hot Observables, such as RxJava Subjects, may or may not receive Subscriptions before their onNext() is called.

For example, if one uses a PublishSubject to multicast an Observable by calling Observable.subscribe(PublishSubject), the PublishSubject will receive a Subscription. However, if one uses PublishSubject to multicast mouse movement events, there is no way it can be cancelled (or backpressured) and thus having a Subscription there is unnecessary.

Therefore, one either has to use some dummy Subscription or, as I see it, make the call to onSubscribe() optional for Processors.
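The dummy Subscription option could be as small as the following enum singleton, which a hot Processor would receive via onSubscribe() before values start flowing. The `EmptySubscription` name and the local `Subscription` stand-in are assumptions; rejecting non-positive requests follows the 'Let them throw!' stance rather than RS 1.0.

```java
/** Minimal stand-in for org.reactivestreams.Subscription. */
interface Subscription {
    void request(long n);
    void cancel();
}

/** Hypothetical no-op Subscription for hot sources that can be neither cancelled nor backpressured. */
enum EmptySubscription implements Subscription {
    INSTANCE;

    @Override
    public void request(long n) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required");
        }
        // nothing to request from: values (e.g., mouse moves) are pushed regardless
    }

    @Override
    public void cancel() {
        // nothing to cancel
    }
}
```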


Processors have to support backpressure

Backpressure support might be impossible or infeasible with Processors: mouse moves can't be backpressured, therefore, a Subject would need to buffer and/or drop overflowing values. Implementing it for AsyncSubject is trivial, but other Subject types have to be turned into some kind of hybrid ReplaySubject. (Naturally, PublishSubject can by default drop unrequested values for its subscribers as if they weren't there, but the continuous value delivery guarantee of Subjects is too valuable to be subverted in my opinion.)

A conforming implementation would impose quite an overhead for all subscribers by preventing 'write-through' and potentially lock-stepping them and slowing the processing down to the slowest requestor.

Instead, I'd weaken the related specification points and allow optional backpressure support for Processors.


Subscriptions will be conservative regardless

The specification suggests that Subscriptions can be implemented without thinking about concurrency too often. If you remember the requirements of Producers in RxJava, I mentioned both thread-safety and reentrant-safety.

Reentrant-safety will still be a critical part of the Subscription.request() implementation, to avoid unnecessary recursion and to handle the case when onNext() calls request() before it processes the current value, which would otherwise trigger another (potentially) recursive call to onNext() ad infinitum.

The need for thread-safety can be 'proven' by the behavior of the following chain. Let's assume a chain is subscribed on the main thread, outside any RxJava schedulers, and starts producing values through an observeOn() operator. The consumer of the sequence then periodically requests more data, and this request() call is forwarded to the source, which is still emitting from the main thread. At this point, two threads are in or entering the request() method, and without proper atomics/synchronization, the reentrancy check and proper request accounting wouldn't work. One might think observeOn() could serialize access to its Subscription (the main source), but since the source runs outside the Schedulers, such serialization would be a no-op from the downstream's perspective and ineffective from the upstream's perspective.

Since the source's Subscription can't possibly know the call pattern of its request() method, it has to be conservative and use one of the approaches I blogged about with respect to Producers. Naturally, this also implies that the cancel() method has to touch a volatile variable in any implementation.
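A minimal sketch of such a conservative request() follows, assuming a hypothetical `CountingSubscription` that emits 0 up to count - 1 and local stand-ins for the RS interfaces. The requested amount lives in an AtomicLong; only the call that moves it away from zero runs the emission loop, so both a reentrant request() from within onNext() and a concurrent one from another thread merely add to the counter. This is the same request-accounting idea the Producers used, not the exact code of any library.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/** Minimal stand-ins for the org.reactivestreams interfaces. */
interface Subscription {
    void request(long n);
    void cancel();
}

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

/** Hypothetical source emitting 0 .. count-1 with thread-safe, reentrant-safe request(). */
final class CountingSubscription extends AtomicLong implements Subscription {
    private static final long serialVersionUID = 1L;

    final Subscriber<? super Integer> child;
    final int count;
    int index;                  // only touched by the thread currently running the loop
    volatile boolean cancelled; // cancel() may be called from any thread

    CountingSubscription(Subscriber<? super Integer> child, int count) {
        this.child = Objects.requireNonNull(child);
        this.count = count;
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required");
        }
        for (;;) {                                   // add n, capping at Long.MAX_VALUE
            long r = get();
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;
            }
            if (compareAndSet(r, u)) {
                if (r != 0L) {
                    return;                          // someone is already emitting
                }
                break;
            }
        }
        for (;;) {
            long requested = get();
            long emitted = 0L;
            while (emitted != requested && index < count) {
                if (cancelled) {
                    return;
                }
                child.onNext(index++);
                emitted++;
            }
            if (index == count) {
                if (!cancelled) {
                    child.onComplete();
                }
                return;
            }
            if (addAndGet(-emitted) == 0L) {
                return;                              // no outstanding requests: leave the loop
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
```

If onNext() calls request(1) for each value, the call stack stays one onNext() deep regardless of how many values are emitted, instead of growing with each element.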

Conclusion

These concerns didn't just arise while writing this post; I've already tried to notify the RS designers about them. Unfortunately, my concerns didn't get through and my code didn't win the arguments. We could argue about my issues endlessly, but instead, I'd rather write code and prove my points there.

Regardless, I find RS to be a good basis to build upon, with some slight philosophical adjustments:
  • Error handling: to paraphrase the latest Godzilla movie: "Let them throw!".
  • Processors have the option to do backpressure and cancellation on their Subscriber side.
  • Most non-trivial Subscriptions have to be conservative and made threadsafe.
In the next post, I'll start with the two most basic Producers of RxJava, namely the SingleProducer and SingleDelayedProducer and show how they can be made into an RS SingleSubscription and SingleDelayedSubscription.