Monday, June 15, 2015

The Reactive-Streams API (part 4 - final)

Introduction


In this final post about the Reactive-Streams API, I'm going to talk about the perhaps surprising trend that one needs a SubscriptionArbiter (the cousin of ProducerArbiter) very frequently, especially if multiple sources, scheduling or other asynchrony are involved.

In RxJava 1.x, having a Producer set on a Subscriber is optional and one can call onError() and onCompleted() without any Producer. Such calls will eventually call unsubscribe() on the rx.Subscriber itself and clean up any resources.

In contrast, RS requires a Subscription to be sent through onSubscribe() before one can call the onXXX methods for any reason.
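To make the requirement concrete, here is a small sketch of a source that does nothing but fail. It is written against java.util.concurrent.Flow (Java 9+), used here only as a dependency-free stand-in for the org.reactivestreams interfaces; the names ErrorPublisher and EmptySubscription are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Flow;

/** Hypothetical helper: a Publisher that fails every Subscriber immediately. */
final class ErrorPublisher<T> implements Flow.Publisher<T> {

    /** A do-nothing Subscription, sent only to satisfy the onSubscribe-first rule. */
    enum EmptySubscription implements Flow.Subscription {
        INSTANCE;

        @Override
        public void request(long n) {
            // nothing will ever be produced
        }

        @Override
        public void cancel() {
            // nothing to release
        }
    }

    private final Throwable error;

    ErrorPublisher(Throwable error) {
        this.error = error;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> child) {
        // RS: a Subscription has to be handed over first...
        child.onSubscribe(EmptySubscription.INSTANCE);
        // ...and only then may the failure be reported.
        child.onError(error);
    }
}
```

In RxJava 1.x terms, the same source would simply call child.onError(error) and be done; the extra onSubscribe call is the price of the RS contract.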

In this blog post, I'll look at cases where this requirement can cause trouble when the operator is implemented in the classical RxJava structure.

Deferred subscribing

An operator which has to consider an error-before-Subscription case is defer(). When subscribing to a deferred Publisher, the operator calls a user factory that returns another Publisher which is then subscribed to. Now since we don't really trust user functions, we need to catch any Throwable and notify the child of the problem. However, to do so, the child Subscriber must already have received a Subscription, but if we send one up front, the generated Publisher can't send another one later. The solution is to use a SubscriptionArbiter which will allow us to send an early error or switch over to the 'real' source.

public final class OnSubscribeDefer<T>
implements OnSubscribe<T> {
    final Func0<? extends Publisher<? extends T>> factory;
    public OnSubscribeDefer(
           Func0<? extends Publisher<? extends T>> factory) {
        this.factory = factory;
    }
    @Override
    public void call(Subscriber<? super T> child) {
        
        SubscriptionArbiter sa = new SubscriptionArbiter();
        child.onSubscribe(sa);                                 // (1)  
        
        Publisher<? extends T> p;
        try {
            p = factory.call();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            child.onError(e);                                  // (2)
            return;
        }
        p.subscribe(new Subscriber<T>() {                      // (3)
            @Override
            public void onSubscribe(Subscription s) {
                sa.setSubscription(s);                         // (4)
            }

            @Override
            public void onNext(T t) {
                child.onNext(t);
            }

            @Override
            public void onError(Throwable t) {
                child.onError(t);
            }

            @Override
            public void onComplete() {
                child.onComplete();
            }
            
        });
    }
}

This time, we don't need to manage resources, but the Subscription change has to be handled:

  1. We first create an empty arbiter and set it on the child.
  2. In case the function call throws, we can now safely emit the exception through onError because child has received a Subscription in the form of the arbiter.
  3. We can't just use child directly but have to override the onSubscribe method
  4. and set the 'real' Subscription on the arbiter instead. The rest of the methods just delegate directly.
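The SubscriptionArbiter used at (1) and (4) is not repeated in this post. For readers who want something to experiment with, the following is a minimal sketch of the idea, again using java.util.concurrent.Flow as a stand-in for org.reactivestreams. The class name SimpleArbiter and the lock-based approach are assumptions made for illustration; a production-grade arbiter would more likely be lock-free and would report non-positive request amounts through onError.

```java
import java.util.concurrent.Flow;

/** Simplified, lock-based arbiter sketch (hypothetical name and design). */
final class SimpleArbiter implements Flow.Subscription {
    private Flow.Subscription actual; // current upstream, null until one arrives
    private long requested;           // demand that has not been satisfied yet
    private boolean cancelled;

    /** Installs a (new) upstream and replays outstanding demand or cancellation. */
    void setSubscription(Flow.Subscription s) {
        long replay;
        synchronized (this) {
            if (cancelled) {
                replay = -1L;
            } else {
                actual = s;
                replay = requested;
            }
        }
        // Upstream is called outside the lock to avoid holding it during callbacks.
        if (replay < 0L) {
            s.cancel();
        } else if (replay > 0L) {
            s.request(replay);
        }
    }

    /** Reduces the outstanding demand; needed if several upstreams are set over time. */
    void produced(long n) {
        synchronized (this) {
            if (requested != Long.MAX_VALUE) {
                requested = Math.max(0L, requested - n);
            }
        }
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            return; // simplification: a complete implementation would signal an error
        }
        Flow.Subscription a;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            long sum = requested + n;
            requested = sum < 0L ? Long.MAX_VALUE : sum; // cap on overflow
            a = actual;
        }
        if (a != null) {
            a.request(n);
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription a;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            a = actual;
            actual = null;
        }
        if (a != null) {
            a.cancel();
        }
    }
}
```

In the defer() case, only one upstream is ever set, so the sketch merely holds requests until the factory's Publisher calls onSubscribe and replays a cancel() that arrived too early.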

Time-delayed subscribing

Let's look at the operator delaySubscription() which delays the actual subscription to the source by some time. We can almost copy-paste the current implementation, but it won't compile due to the API changes:


public final class OnSubscribeDelayTimed<T>
implements OnSubscribe<T> {
    final Publisher<T> source;
    final Scheduler scheduler;
    final long delay;
    final TimeUnit unit;
    public OnSubscribeDelayTimed(
            Publisher<T> source, 
            long delay, TimeUnit unit, 
            Scheduler scheduler) {
        this.source = source;
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
    }
    @Override
    public void call(Subscriber child) {
        Scheduler.Worker w = scheduler.createWorker();
        
        // child.add(w);
        
        w.schedule(() -> {
            // if (!child.isUnsubscribed()) {

                source.subscribe(child);

            // }
        }, delay, unit);
    }
}


We can't add a resource to child and we can't check for unsubscription this way either. To clean up the worker, one needs a disposable-container wrapper; for the cancellation, we need something that can 'replay' cancellation to the Subscription the source is going to emit:


    @Override
    public void call(Subscriber<? super T> child) {
        Scheduler.Worker w = scheduler.createWorker();
        
        SubscriptionArbiter sa = new SubscriptionArbiter();    // (1)

        DisposableSubscription dsub = 
            new DisposableSubscription(
                sa, new DisposableList());                     // (2)
        
        dsub.add(w);                                           // (3)
        
        child.onSubscribe(dsub);                               // (4)
        
        w.schedule(() -> {
            source.subscribe(new Subscriber<T>() {             // (5)
                @Override
                public void onSubscribe(Subscription s) {
                    sa.setSubscription(s);                     // (6)
                }

                @Override
                public void onNext(T t) {
                    child.onNext(t);
                }

                @Override
                public void onError(Throwable t) {
                    child.onError(t);
                }

                @Override
                public void onComplete() {
                    child.onComplete();
                }
                
            });
        }, delay, unit);
    }

Quite an expansion compared to the original code, but for a good reason:

  1. We need a SubscriptionArbiter because we need cancellation support while the delayed subscription is pending. However, we won't have any real Subscription from source yet, so we need to hold onto any requests that may come in in the meantime.
  2. We need a way to cancel the schedule in case the child has decided to cancel() the whole operation. Since the arbiter doesn't have resource management, we need to wrap it into a disposable-container. Naturally, we don't really need a list for a single resource and you can implement your own, single resource disposable subscription.
  3. The container will take care of cancellation if we add the Worker instance.
  4. Once the arbiter+disposable are set up, we send the dsub to the child. This way, it can request() and cancel() at any time and both arbiter and dsub will forward the calls to the appropriate (re)sources.
  5. Once the scheduled action runs, we can't just hand the original child Subscriber to the source because we have already set a Subscription on it. Therefore, we need to wrap it with another Subscriber instance that mostly forwards the onXXX calls except onSubscribe.
  6. The onSubscribe needs to change: instead of setting the source Subscription on the child, we set it on the arbiter. The arbiter will then relay any accumulated requests or cancel the source immediately.
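As noted in point 2, a list-based container is more than a single Worker needs. The following is one possible sketch of a single-resource disposable subscription, written against java.util.concurrent.Flow as a stand-in for org.reactivestreams. The class name SingleResourceSubscription, the setResource method and the choice of Runnable as the cleanup action are all assumptions for illustration, not the DisposableSubscription class used above.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical wrapper: forwards request/cancel and releases one resource on cancel. */
final class SingleResourceSubscription implements Flow.Subscription {
    /** Marker stored in the reference once cancel() has run. */
    private static final Runnable DISPOSED = () -> { };

    private final Flow.Subscription actual;
    private final AtomicReference<Runnable> resource = new AtomicReference<>();

    SingleResourceSubscription(Flow.Subscription actual) {
        this.actual = actual;
    }

    /**
     * Registers the single cleanup action. If cancel() has already happened,
     * the action runs right away so the resource cannot leak.
     */
    void setResource(Runnable onCancel) {
        if (resource.compareAndSet(null, onCancel)) {
            return;
        }
        if (resource.get() == DISPOSED) {
            onCancel.run();
        } else {
            throw new IllegalStateException("Resource already set");
        }
    }

    @Override
    public void request(long n) {
        actual.request(n);
    }

    @Override
    public void cancel() {
        // getAndSet makes cancel() idempotent: only the first caller sees a non-marker value.
        Runnable r = resource.getAndSet(DISPOSED);
        if (r != DISPOSED) {
            actual.cancel();
            if (r != null) {
                r.run();
            }
        }
    }
}
```

With an RxJava 1.x Worker w, the cleanup action could be passed as w::unsubscribe, and the wrapped Subscription would be the arbiter from point 1.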

Now you might think there is an error with the Subscriber (5) because its onNext method doesn't call sa.produced(1). This does indeed cause an accounting inconsistency, but once a real Subscription is set on the arbiter, any future request(n) from child is relayed as-is to the upstream and no further call to setSubscription() will take place in this operator. The upstream receives the correct request amounts and the arbiter just keeps adding them up without affecting anything else. To be on the safe side, you can
  • still call sa.produced(1) or
  • implement an arbiter variant which allows only a single Subscription to be set and stops accumulating requests after it is set.
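The second option could look roughly like the sketch below, which accepts exactly one Subscription and stops accumulating once it has arrived. It uses java.util.concurrent.Flow as a stand-in for org.reactivestreams; the class name DeferredSubscription is hypothetical, and ignoring non-positive request amounts is a simplification.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-assignment arbiter: holds requests only until upstream arrives. */
final class DeferredSubscription implements Flow.Subscription {
    /** Marker indicating that cancel() was called. */
    private static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    private final AtomicReference<Flow.Subscription> actual = new AtomicReference<>();
    private final AtomicLong pending = new AtomicLong();

    /** Accepts the first Subscription only; later ones (or any after cancel) are cancelled. */
    void setSubscription(Flow.Subscription s) {
        if (actual.compareAndSet(null, s)) {
            long r = pending.getAndSet(0L);
            if (r != 0L) {
                s.request(r);
            }
        } else {
            s.cancel();
        }
    }

    @Override
    public void request(long n) {
        if (n <= 0L) {
            return; // simplification: a complete implementation would signal an error
        }
        Flow.Subscription a = actual.get();
        if (a != null) {
            a.request(n); // upstream is known: relay directly, no accounting
            return;
        }
        // Upstream not here yet: accumulate, capping at Long.MAX_VALUE on overflow.
        pending.accumulateAndGet(n, (x, y) -> {
            long sum = x + y;
            return sum < 0L ? Long.MAX_VALUE : sum;
        });
        // Upstream may have arrived concurrently; drain whatever is still pending.
        a = actual.get();
        if (a != null) {
            long r = pending.getAndSet(0L);
            if (r != 0L) {
                a.request(r);
            }
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription a = actual.getAndSet(CANCELLED);
        if (a != null && a != CANCELLED) {
            a.cancel();
        }
    }
}
```

Because nothing is tracked after the hand-over, the missing produced() call from the discussion above stops being an issue for this variant.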

Conclusion

In this blog post, I've shown two cases where Subscription arbitration and resource cleanup have to be managed. Luckily, not every operator needs this, but the majority will; therefore, two specialized Subscriptions are required to handle any delay in a 'real' Subscription and/or the release of resources on cancel().

Since this is going to happen frequently, I'm sure RxJava 2.0 will provide standard and efficient implementations to help conform to the Reactive-Streams specification.

As for the RS API series conclusion, the minimal set of interfaces is quite capable of capturing the requirements of a typical dataflow. It is possible to convert RxJava idioms into RS idioms, but the infrastructure and conveniences have to be re-implemented from scratch. I've shown some basic Subscriptions such as SingleSubscription, SingleDelayedSubscription, SubscriptionArbiter and DisposableSubscription that, among other similar new classes, will be the main tools to implement operators for RxJava 2.0.

In the next series, I'm going to cover the most controversial set of objects in the field of reactive programming and I'll not just talk about them in detail but show how to implement your own custom variant of the kind.
