Wednesday, October 7, 2015

Operator internals: All, Any, Exists

Introduction

The all operator checks whether a given condition (predicate) holds for every element of the upstream. It emits a single true value once the upstream completes, or emits false immediately when the predicate returns false for some element. The any operator is its logical inverse and looks quite like all, except that it emits true immediately when the predicate returns true and emits false if the upstream completes without a match, which includes an empty upstream. Both can and do support backpressure.
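To make the semantics concrete, here is a minimal sketch over a plain Iterable rather than a reactive source. The class name AllAnySemantics and its two static methods are made up for illustration and are not part of RxJava; the sketch only mirrors the short-circuiting and the empty-source results described above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Predicate;

// Hypothetical helper, not an RxJava class: synchronous analogue of all/any.
final class AllAnySemantics {
    /** True unless some element fails the predicate; an empty source yields true. */
    static <T> boolean all(Iterable<T> source, Predicate<? super T> predicate) {
        for (T t : source) {
            if (!predicate.test(t)) {
                return false; // short-circuit on the first failure
            }
        }
        return true;
    }

    /** True as soon as some element passes the predicate; an empty source yields false. */
    static <T> boolean any(Iterable<T> source, Predicate<? super T> predicate) {
        for (T t : source) {
            if (predicate.test(t)) {
                return true; // short-circuit on the first match
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(all(Arrays.asList(1, 2, 3), x -> x > 0));           // true
        System.out.println(any(Arrays.asList(1, 2, 3), x -> x > 2));           // true
        System.out.println(all(Collections.<Integer>emptyList(), x -> x > 0)); // true
        System.out.println(any(Collections.<Integer>emptyList(), x -> x > 0)); // false
    }
}
```

Note that any(p) gives the same answer as negating all with the negated predicate, which is why the two operators can share almost all of their structure.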

We need to consider the following properties/requirements with these operators:


  • Since the output is a single value, one doesn't need to play around with request accounting and can let the operator request Long.MAX_VALUE from upstream. This gives the added benefit that it may trigger a fast-path and thus run with reduced overhead.
  • Since the output is a single value, emitted even if the upstream is empty, the operator has to track whether the downstream has requested anything and emit the result only once it has.

Implementation: 1.x

The 1.x implementation is straightforward. The Subscriber requests Long.MAX_VALUE and uses the SingleDelayedProducer to delay the emission of the resulting boolean until the downstream actually requests.

Since backpressure handling is optional in 1.x, the operator can't emit false directly from onNext when the predicate returns false, because only the SingleDelayedProducer knows whether there was actually a request call or not.
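The idea of delaying a single value until the downstream requests can be sketched as a small four-state machine. The class below, DelayedValue, is a hypothetical stand-in written for this post and is not the RxJava SingleDelayedProducer; the downstream is reduced to a plain Consumer, and the state names are assumptions modeled on the HAS_REQUEST_HAS_VALUE constant that appears later in this post. It also assumes set is called at most once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of a delayed single-value emitter; not the RxJava class.
final class DelayedValue<T> {
    static final int NO_REQUEST_NO_VALUE = 0;
    static final int NO_REQUEST_HAS_VALUE = 1;
    static final int HAS_REQUEST_NO_VALUE = 2;
    static final int HAS_REQUEST_HAS_VALUE = 3; // terminal: the value has been emitted

    private final AtomicInteger state = new AtomicInteger(NO_REQUEST_NO_VALUE);
    private final Consumer<? super T> downstream;
    private T value;

    DelayedValue(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** Called by the operator once the result is known (assumed to happen at most once). */
    void set(T v) {
        for (;;) {
            int s = state.get();
            if (s == NO_REQUEST_NO_VALUE) {
                value = v; // written before the CAS so a later request() can read it
                if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
                    return; // no demand yet: keep the value for later
                }
            } else if (s == HAS_REQUEST_NO_VALUE) {
                if (state.compareAndSet(HAS_REQUEST_NO_VALUE, HAS_REQUEST_HAS_VALUE)) {
                    downstream.accept(v); // demand was already there: emit now
                }
                return;
            } else {
                return; // a value was already set
            }
        }
    }

    /** Called when the downstream requests. */
    void request(long n) {
        if (n <= 0) {
            return;
        }
        for (;;) {
            int s = state.get();
            if (s == NO_REQUEST_NO_VALUE) {
                if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
                    return; // remember the demand; set() will emit
                }
            } else if (s == NO_REQUEST_HAS_VALUE) {
                if (state.compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
                    downstream.accept(value); // the value was waiting: emit now
                }
                return;
            } else {
                return; // demand already recorded or value already emitted
            }
        }
    }

    public static void main(String[] args) {
        DelayedValue<Boolean> d = new DelayedValue<>(v -> System.out.println("emitted " + v));
        d.set(false);  // nothing is printed yet
        d.request(1);  // prints "emitted false"
    }
}
```

Whichever of set and request arrives second performs the emission, so the value reaches the downstream exactly once and never before a request.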


Implementation: 2.x

The 2.x implementation is a bit longer because I chose to inline the behavior of the SingleDelayedProducer and thus save on allocation costs.

The backpressure requirement still holds but with one exception: since the call to onSubscribe is mandatory, onNext is only ever called if there was a request to it. Therefore, the operator has to insert itself between the upstream and the downstream request-wise.

Failing the predicate in onNext no longer requires buffering the value; it can simply be emitted directly (because we know there was at least a request(1) beforehand). An empty upstream, however, still requires "buffering" the result until a request comes along. The related state machine is quite similar to the one described in an earlier post. The notable difference is that we know the delayed emission will always be true, so there is no need for an instance variable to hold it until needed.
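The request and onComplete side of such an inlined state machine could look roughly like the sketch below. It is not the RxJava 2.x source: BooleanSink, AllCompletionGate and terminate are names invented here, the state constants other than HAS_REQUEST_HAS_VALUE are assumptions, and the state lives in an AtomicInteger field for simplicity. The point it illustrates is that no value field is needed, because the only thing ever delayed is the constant true.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical minimal downstream, standing in for a Subscriber<Boolean>. */
interface BooleanSink {
    void onNext(boolean value);
    void onComplete();
}

/** Sketch only: decides when the constant true may be emitted. */
final class AllCompletionGate {
    static final int NO_REQUEST_NO_VALUE = 0;
    static final int NO_REQUEST_HAS_VALUE = 1;  // upstream completed, no demand yet
    static final int HAS_REQUEST_NO_VALUE = 2;  // demand arrived, upstream still running
    static final int HAS_REQUEST_HAS_VALUE = 3; // terminal

    private final AtomicInteger state = new AtomicInteger(NO_REQUEST_NO_VALUE);
    private final BooleanSink actual;

    AllCompletionGate(BooleanSink actual) {
        this.actual = actual;
    }

    /** Downstream demand arrived. */
    void request(long n) {
        if (n <= 0) {
            return; // a real operator would signal an error for non-positive requests
        }
        for (;;) {
            int s = state.get();
            if (s == NO_REQUEST_HAS_VALUE) {
                if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
                    emitTrue(); // completion happened earlier: emit now
                }
                return;
            }
            if (s != NO_REQUEST_NO_VALUE) {
                return; // demand already recorded, or terminated
            }
            if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
                return;
            }
        }
    }

    /** Upstream completed without any element failing the predicate. */
    void onComplete() {
        for (;;) {
            int s = state.get();
            if (s == HAS_REQUEST_NO_VALUE) {
                if (state.compareAndSet(s, HAS_REQUEST_HAS_VALUE)) {
                    emitTrue(); // demand was already there: emit now
                }
                return;
            }
            if (s != NO_REQUEST_NO_VALUE) {
                return; // already completed, or terminated
            }
            if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
                return; // wait for the first request
            }
        }
    }

    /** Jump to the terminal state so that a later request() emits nothing. */
    void terminate() {
        state.lazySet(HAS_REQUEST_HAS_VALUE);
    }

    private void emitTrue() {
        actual.onNext(true);
        actual.onComplete();
    }

    public static void main(String[] args) {
        AllCompletionGate gate = new AllCompletionGate(new BooleanSink() {
            @Override public void onNext(boolean value) { System.out.println(value); }
            @Override public void onComplete() { System.out.println("done"); }
        });
        gate.onComplete(); // empty upstream: nothing is printed yet
        gate.request(1);   // prints "true" and then "done"
    }
}
```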

It is worth looking at the onNext() method in AllSubscriber:


@Override
public void onNext(T t) {
    if (done) {                             // (1)
        return;
    }
    boolean b;
    try {
        b = predicate.test(t);
    } catch (Throwable e) {                 // (2)
        lazySet(HAS_REQUEST_HAS_VALUE);
        done = true;
        s.cancel();
        actual.onError(e);
        return;
    }
    if (!b) {
        lazySet(HAS_REQUEST_HAS_VALUE);     // (3)
        done = true;
        s.cancel();
        actual.onNext(false);
        actual.onComplete();
    }
}


  1. Cancellation is best effort in both Reactive-Streams and 1.x Observables and one can't rely upon the cancellation alone. The done flag drops all events after the termination/cancellation of the operator.
  2. Callbacks can crash, in which case we set the state-machine to its terminal value HAS_REQUEST_HAS_VALUE which should prevent any value emission in a request call. In addition we set the done flag and call cancel on the Subscription.
  3. If the predicate returned false, we can shortcut the stream by cancelling it and emitting the constant false as the result. Here, the state machine is also brought to its terminal state.
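For comparison, the any operator's onNext would mirror the listing above with the test inverted. The sketch below is an illustration under assumptions, not the RxJava source: Cancellable, ResultSink and AnySketch are hypothetical stand-ins for Subscription, Subscriber<Boolean> and the operator's subscriber, and the delayed emission of false on completion is deliberately reduced to a direct call, so it ignores backpressure.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for the cancel half of a Subscription. */
interface Cancellable {
    void cancel();
}

/** Hypothetical stand-in for a Subscriber<Boolean>. */
interface ResultSink {
    void onNext(boolean value);
    void onError(Throwable e);
    void onComplete();
}

/** Sketch of any: the mirror image of all's onNext, without the request state machine. */
final class AnySketch<T> {
    private final Predicate<? super T> predicate;
    private final Cancellable s;
    private final ResultSink actual;
    private boolean done;

    AnySketch(Predicate<? super T> predicate, Cancellable s, ResultSink actual) {
        this.predicate = predicate;
        this.s = s;
        this.actual = actual;
    }

    void onNext(T t) {
        if (done) {
            return; // drop events that arrive after termination
        }
        boolean b;
        try {
            b = predicate.test(t);
        } catch (Throwable e) {
            done = true;
            s.cancel();
            actual.onError(e);
            return;
        }
        if (b) { // inverted compared to all: a match ends the stream with true
            done = true;
            s.cancel();
            actual.onNext(true);
            actual.onComplete();
        }
    }

    // Simplification: emits false directly; a real operator would wait for a request.
    void onComplete() {
        if (done) {
            return;
        }
        done = true;
        actual.onNext(false);
        actual.onComplete();
    }
}
```

Feeding this sketch the values 1, 2, 3 with the predicate x == 2 cancels the upstream at the second value and emits true; the third value and the upstream's completion are then dropped by the done check.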


Conclusion

The all and any operators are among the simpler operators, 2/10 maybe, but one needs to recognize that an empty upstream would overflow the downstream in a naive implementation, and thus there is a need for the SingleDelayedProducer to bridge the gap.


