Introduction
In the previous part, I introduced the concepts around operator fusion. In this post, I'll detail the API and protocols required to make operator fusion happen.
In its current form, operator fusion works between two subsequent operators and is based on their ability to identify each other and, in the case of micro-fusion, to switch to a different protocol than Reactive-Streams (RS) if both agree.
Macro-fusion constructs
The primary targets of macro-fusion are the single element sources: just(), empty() and fromCallable(). Firing up the complete RS infrastructure for such single elements is quite expensive, yet half of the API use in RxJava and Reactor comes from these. Therefore, RxJava introduced Single and Reactor introduced Mono to help as much as possible and offer (ever increasingly) optimized operators on them.
However, knowing at assembly time that a source will generate 0 or 1 element is also a great help in regular Observable / Flux use. In addition, knowing the source is a constant helps inlining it via some custom operator.
Creating 0 or 1 element synchronous sources
To indicate a source returns a single value, the Reactive-Streams-Commons (Rsc) project (and Reactor, building on it) established a contract:
If a Publisher implements java.util.concurrent.Callable, it is considered a 0 or 1 element source.
You can implement Callable and return a non-null value that can be computed synchronously. You can also return null, which indicates an empty result. (Remember, RS doesn't allow null values over onNext.) The call to call() will happen during subscription time.
public class MySingleSource implements Publisher<Object>, Callable<Object> {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new ScalarSubscription<>(s, System.currentTimeMillis()));
}
@Override
public Object call() throws Exception {
return System.currentTimeMillis();
}
}
If the 0 or 1 element source is known to be a constant, it can be the subject of assembly-time optimizations. For example, if it returns null, indicating emptiness (like empty()), only a handful of operators have any effect on it (those that don't work on the items themselves) and the assembly process can just return empty().
We can extend Callable with a new interface ScalarCallable to indicate a 0 or 1 element constant source.
public interface ScalarCallable<T> extends Callable<T> {
@Override
T call();
}
By extending Callable, any place that expects a 0 or 1 element dynamic source can also work with a constant source. The reverse is not true; places expecting a constant source won't execute an arbitrary Callable (which could block or trigger side-effects) during assembly time:
public class MyScalarSource implements Publisher<Object>, ScalarCallable<Object> {
@Override
public void subscribe(Subscriber<? super Object> s) {
s.onSubscribe(new ScalarSubscription<>(s, 1));
}
@Override
public Object call() {
return 1;
}
}
Note that ScalarCallable overrides the call() method and removes the throws Exception clause: for one, scalar constants should not throw, and consumers should not need to wrap the call() into a try-catch.
Consuming 0 or 1 element synchronous sources
Consuming Callable and ScalarCallable is a matter of instanceof checks, performed at subscription time and assembly time respectively, followed by the extraction of the single value through call().
For example, a macro-fusion in the operator count() could check for a scalar value and return a constant 0 for an empty source or 1 for a single-valued source:
public final Flux<Long> count() {
    if (this instanceof ScalarCallable) {
        T value = ((ScalarCallable<T>) this).call();
        return just(value == null ? 0L : 1L);
    }
    return new FluxCount<>(this);
}
Another example is to have a shortcut in flatMap(), concatMap() or switchMap() for 0 or 1 element sources. In this case, there is no need to run the full infrastructure; we can just subscribe to the Publisher returned by the mapping function.
Note that since the mapper function can itself have side effects, assembly-time optimization can't be applied here and a new source operator has to be introduced.
public final <R> Px<R> flatMap(
Function<? super T, ? extends Publisher<? extends R>> mapper) {
if (this instanceof Callable) {
return new PublisherCallableMap<>((Callable<T>)this, mapper);
}
return new PublisherFlatMap<>(this, mapper, ...);
}
(Remark: Px stands for Publisher Extensions in Rsc and is the base type for Rsc's fluent API - more of a convenience in tests and benchmarks to avoid spelling out all those PublisherXXX classes than a fully fledged API entry point.)
public final class PublisherCallableMap<T, R> implements Publisher<R> {
final Callable<? extends T> source;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
public PublisherCallableMap(
Callable<? extends T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber<? super R> s) {
T value;
try {
value = source.call(); // (1)
} catch (Throwable ex) {
ExceptionHelper.throwIfFatal(ex);
EmptySubscription.error(s, ex);
return;
}
if (value == null) {
EmptySubscription.complete(s);
return;
}
Publisher<? extends R> p;
try {
p = mapper.apply(value); // (2)
} catch (Throwable ex) {
ExceptionHelper.throwIfFatal(ex);
EmptySubscription.error(s, ex);
return;
}
if (p == null) {
EmptySubscription.error(s,
    new NullPointerException("The mapper returned null"));
return;
}
if (p instanceof Callable) { // (3)
R result;
try {
result = ((Callable<R>)p).call();
} catch (Throwable ex) {
ExceptionHelper.throwIfFatal(ex);
EmptySubscription.error(s, ex);
return;
}
if (result == null) {
EmptySubscription.complete(s);
return;
}
s.onSubscribe(new ScalarSubscription<>(s, result));
return;
}
p.subscribe(s);
}
}
First (1), we extract the single value from the underlying Callable instance. If it is null, we complete the Subscriber immediately. Otherwise, we call the mapper, which returns a Publisher (2). Since this Publisher could itself be a Callable, we do the extraction again (3) and either complete or set a backpressure-enabled ScalarSubscription on the Subscriber. Because call() can throw, we catch any exception, handle the fatal ones in a library-specific way and signal the non-fatal ones to the Subscriber (while setting its Subscription at the right time).
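To illustrate the end result, here is a hypothetical usage, assuming a fluent just() and a lambda-accepting subscribe() in the style of the chains shown later in this post. Both just(1) and the mapped just(v + 1) implement Callable, so the macro-fused flatMap() above routes through PublisherCallableMap and the whole chain collapses into a single ScalarSubscription:
// no queues or request-accounting atomics are involved in this chain
just(1)
    .flatMap(v -> just(v + 1))
    .subscribe(System.out::println); // prints 2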
Caution with Callable
Since Callable is an established interface, one must be careful with classes that implement both Publisher and Callable where, functionally, the Callable means something different than a shortcut to a 0 or 1 element source.
My hope is that since RS is relatively new and only a few people have actually implemented operators with it, we can avoid any pitfalls related to this combined interface approach.
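As a purely hypothetical illustration of the hazard (the class and its intent are made up), consider a Publisher that implements Callable for an unrelated reason:
// Hypothetical hazard: this source implements Callable so an external scheduler can
// run it as a task, but a fusing library would interpret it as a 0 or 1 element
// source, invoke call() at subscription time and never call subscribe() at all.
public class PollingSource implements Publisher<Long>, Callable<Long> {
    @Override
    public void subscribe(Subscriber<? super Long> s) {
        // intended behavior: emit many values over time (omitted for brevity)
    }
    @Override
    public Long call() {
        // intended as a one-shot maintenance task, not as "the single element"
        return 0L;
    }
}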
Micro-fusion constructs
Unlike macro-fusion, micro-fusion requires a protocol switch between two subsequent operators; instead of using the standard RS method calls, some or all of them get replaced by other method calls. This allows sharing internal structures or state between the two.
In theory, in a pair of operators, the upstream operator could be the initiator and work with the internals of the downstream operator. In practice, so far, we have implemented fusion the other way around: the downstream operator works with the internals of the upstream operator.
However, going for a full custom interaction is not advised because that may lead to a complete custom implementation and duplication of a lot of code. (That being said, unfortunately,
ConditionalSubscriber requires code duplication to avoid casting.)
Currently, Rsc and Reactor can do two kinds of micro-fusion: conditional and queue-based. On a second dimension, we can think of 3 kinds of operators:
- sources that support fusion (range(), UnicastProcessor)
- intermediate operators that may support fusion (concatMap, observeOn, groupBy, window):
  - front fusion (concatMap)
  - back fusion (groupBy)
  - transitive fusion (map, filter)
- consumers (flatMap inner, zip)
The third dimension appears with queue-based fusion, where the source can be synchronous (e.g., fromArray) or asynchronous (UnicastProcessor).
Conditional micro-fusion
The conditional micro-fusion ability is indicated by an interface: ConditionalSubscriber extending Subscriber with one extra method:
public interface ConditionalSubscriber<T> extends Subscriber<T> {
boolean onNextIf(T t);
}
If a source or intermediate operator sees that its consumer is a ConditionalSubscriber it may call the onNextIf method. (By nature, this means a synchronous execution and response, thus conditional fusion is for synchronous cases only.)
If the method returns true, the value has been consumed as usual. If the method returns false, it means the value was dropped and a new value can be sent immediately. This avoids a request(1) call for a replenishment in filter and other operators as well.
Sidenote: You may ask, why is this important? A call to request() usually ends up in an atomic CAS, costing 21-45 cycles for each dropped element.
To work with ConditionalSubscribers in source operators, you may have to first switch on the incoming Subscriber's type and use a separate implementation to avoid casting the downstream Subscriber all the time.
@Override
public void subscribe(Subscriber<? super Integer> s) {
if (s instanceof ConditionalSubscriber) {
    s.onSubscribe(new RangeConditionalSubscription<>(
        (ConditionalSubscriber<? super Integer>) s, start, count));
} else {
    s.onSubscribe(new RangeSubscription<>(s, start, count));
}
}
The implementation can then use the onNextIf method during emissions. For example, the fast path can be rewritten as follows:
for (long i = start; i < (long)start + count; i++) {
if (cancelled) {
return;
}
s.onNextIf((int)i);
}
if (!cancelled) {
s.onComplete();
}
You may ask, why call onNextIf if we don't care about the return value? For composition reasons. Even though this path in range() doesn't need the return value, if the downstream is also calling onNextIf further down, this can avoid a whole chain of unnecessary request(1) calls.
The slow path is more interesting in this regard:
long i = index;
long end = (long)start + count;
long r = requested;
long e = 0L;
while (i != end && e != r) {
if (cancelled) {
return;
}
if (s.onNextIf((int)i)) {
e++;
}
i++;
}
if (i == end) {
if (!cancelled) {
s.onComplete();
}
return;
}
if (e != 0L) {
    index = i;
    REQUESTED.addAndGet(this, -e);
}
In the while loop, if the
onNextIf returns false, we don't increment the emission count which means the next integer value can come immediately. If a downstream consumer requests only 1 and then drops all values, the loop can exhaust the available integers and not call the atomic
addAndGet even once.
Since filter is one of the most common operators in a chain, one should be prepared to work with ConditionalSubscriber even if one's operator doesn't interfere with the number of events flowing through. For example, map() and filter() often appear together, so it is advised that map() also support conditional fusion by switching on the Subscriber's type just like above and using a ConditionalSubscriber-based Subscriber:
static final class MapConditionalSubscriber<T, R> implements ConditionalSubscriber<T> {
final ConditionalSubscriber<? super R> actual;
final Function<? super T, ? extends R> mapper;
boolean done;
Subscription s;
// ...
@Override
public boolean onNextIf(T t) {
if (done) {
return false; // already terminated, drop the value
}
R v;
try {
v = mapper.apply(t);
} catch (Throwable ex) {
ExceptionHelper.throwIfFatal(ex);
s.cancel();
onError(ex);
return true; // terminated: report as consumed so the caller won't replenish
}
if (v == null) {
s.cancel();
onError(new NullPointerException("..."));
return true; // terminated: report as consumed so the caller won't replenish
}
return actual.onNextIf(v);
}
// ...
}
The final case for conditional micro-fusion is the "terminal" operator or consumer implementation. Luckily, one usually doesn't have to provide two implementations, one ConditionalSubscriber and one plain Subscriber, but can have them together. Callers that can work with the ConditionalSubscriber part will do so; the rest will just use the regular Subscriber methods:
static final class FilterSubscriber<T> implements ConditionalSubscriber<T> {
final Subscriber<? super T> actual;
final Predicate<? super T> predicate;
boolean done;
Subscription s;
// ...
@Override
public void onNext(T t) {
if (!onNextIf(t)) {
s.request(1);
}
}
@Override
public boolean onNextIf(T t) {
if (done) {
return false; // already terminated, drop the value
}
boolean pass;
try {
pass = predicate.test(t);
} catch (Throwable ex) {
ExceptionHelper.throwIfFatal(ex);
s.cancel();
onError(ex);
return true; // terminated: report as consumed so the caller won't replenish
}
if (pass) {
actual.onNext(t);
return true;
}
return false;
}
// ...
}
In conclusion, conditional micro-fusion is a relatively simple but sometimes verbose way of avoiding
request(1) calls and the resulting per-item overhead.
Queue-based micro-fusion
Believe me when I tell you, this is the most complicated thing, so far, in the reactive landscape. Not because it requires complicated structures or algorithms, but because of the implications for operators and the combinatorial explosion of what happens when op1 is followed by op2 and how they can or can't fuse.
Queue-based micro-fusion is built upon the observation that many operators employ a queue to handle backpressure-related or asynchrony-related cases when notifying the downstream, and adjacent operators happen to face their queues towards each other.
For example, UnicastProcessor has a back-queue that holds values until the downstream requests them, whereas concatMap has a front-queue that holds the source values to be mapped into Publishers. When the two are subscribed together, each value goes from one queue into the other, forming a dequeue-enqueue pair with nothing functional between the two other than the atomics overhead of request management and wip accounting.
Clearly, if we could use a single queue between the two and decrease the atomics overhead along with it, we'd have much lower overhead in terms of computation and memory usage.
However, what if there is an operator between the two that does something with the values? What if the fusion shouldn't happen in this case?
To solve this coordination problem, we can reuse the onSubscribe(Subscription) rail in RS and extend the protocol. Enter QueueSubscription.
public interface QueueSubscription<T> extends Queue<T>, Subscription {
int NONE = 0;
int SYNC = 1;
int ASYNC = 2;
int ANY = SYNC | ASYNC;
int THREAD_BOUNDARY = 4;
int requestFusion(int mode);
@Override
default boolean offer(T t) {
throw new UnsupportedOperationException();
}
// ...
}
The QueueSubscription is a combination of the Queue and Subscription interfaces, adding a new requestFusion() method. Other than the following methods kept from the base interfaces, all other methods default to throwing UnsupportedOperationException as we won't need them. (Java 7 note: yes, you may have to do this manually for classes that can't extend a base class.):
- void request(long n)
- void cancel()
- T poll()
- boolean isEmpty()
- void clear()
(Some libraries may choose to implement size() as well, for diagnostic purposes.)
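As a sketch of what such manual defaulting could look like when targeting Java 7 (where the interface can't carry default methods), a shared abstract base class can host the throwing methods; the class name here is made up and not part of Rsc:
// Only request(), cancel(), poll(), isEmpty() and clear() get real implementations
// in subclasses; the unused Queue/Collection methods just throw.
public abstract class BaseQueueSubscription<T> implements QueueSubscription<T> {
    @Override
    public final boolean offer(T t) {
        throw new UnsupportedOperationException("Should not be called");
    }
    @Override
    public final T peek() {
        throw new UnsupportedOperationException("Should not be called");
    }
    @Override
    public final boolean add(T t) {
        throw new UnsupportedOperationException("Should not be called");
    }
    @Override
    public final java.util.Iterator<T> iterator() {
        throw new UnsupportedOperationException("Should not be called");
    }
    // ... the remaining Queue/Collection methods throw the same way
}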
When a source supports queue-based fusion, it can send a
QueueSubscription implementation through
onSubscribe. Those who can deal with it can act on it, the rest will simply see it as a regular
Subscription.
The idea is that those who can deal with it can use it as a
Queue instead of instantiating their own, saving on allocation and overhead at the same time. In addition, a source such as
range() can itself pretend to be a queue, returning the next value through
poll() or null if no more integers remain.
Since there are cases where fusion can't or should not happen, we need to perform a protocol switch during the subscription phase of a flow. This switch can be requested via the requestFusion() method, which takes and returns the constants from the interface.
(Sidenote: I know enums would be more readable, but EnumSet has a nice additional overhead, you know...)
As an input, it can take:
- SYNC - indicates the consumer wants to work with a synchronous upstream, with often known length
- ASYNC - indicates the consumer wants to work with an asynchronous upstream with often unknown length and emission timing
- ANY - indicates a consumer can work with both SYNC and ASYNC upstream
- THREAD_BOUNDARY, combined with SYNC, ASYNC or ANY - indicates that the consumer constitutes a thread boundary and poll() will happen on some other thread.
It can return:
- NONE - fusion can't happen/rejected
- SYNC - synchronous fusion mode activated
- ASYNC - asynchronous fusion mode activated
If the upstream is unable to work in the requested mode or is sensitive to thread-boundary effects, it can return NONE. In this case, the flow behaves just like the regular, non-fused RS stream would. (Note that conditional fusion may still be an option.)
Because fusion is optional, a successfully negotiated mode requires a different mode of execution in either or both parties. In addition, this mode switch has to happen before any events fly through the chain; therefore, onSubscribe is an ideal place for it due to the underlying RS protocol spec.
Both SYNC and ASYNC modes have extra rules implementors must adhere to.
In SYNC mode, consumers should never call request() and producers should never return null from poll() unless they mean completion. Since the only interaction between the two are through poll() and isEmpty(), sources have no opportunity to call onError but must throw a runtime exception from these two methods. On the other side, consumers now have to wrap these methods into try-catches and handle/unwrap exceptions there.
In ASYNC mode, the producer enqueues events in its own queue and has to signal their availability to the consumer. The best way to do this is through onNext. One can either signal the value itself or null - this is the only place where a null is allowed. On the consumer side, the onNext value is now meaningless and should be ignored. The other methods, onError, onComplete, request and cancel, are used as in regular RS flows. In this mode, poll() can return null to indicate a temporary lack of values; termination is still indicated by onError and onComplete as usual.
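Here is a minimal sketch of the consumer side of SYNC mode, with the cancellation flag passed in explicitly and the downstream request accounting a real drain loop would also need left out for brevity:
// SYNC mode: poll() replaces onNext, null means the upstream has completed, and a
// thrown exception replaces onError; request() must not be called at all.
static <T> void drainSync(QueueSubscription<T> qs, Subscriber<? super T> actual,
        java.util.concurrent.atomic.AtomicBoolean cancelled) {
    for (;;) {
        if (cancelled.get()) {
            qs.clear();
            return;
        }
        T v;
        try {
            v = qs.poll();
        } catch (Throwable ex) {
            actual.onError(ex);
            return;
        }
        if (v == null) {
            actual.onComplete();
            return;
        }
        actual.onNext(v);
    }
}
// In ASYNC mode, a null poll() would only mean "no value right now"; termination
// still arrives through onError/onComplete.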
Implementing fusion-enabled sources
Now let's see the API in action. First, let's make range() fusion enabled:
static final class RangeSubscription implements QueueSubscription<Integer> {
// ... the Subscription part is the same
@Override
public Integer poll() {
long i = index;
if (i == (long)start + count) {
return null;
}
index = i + 1;
return (int)i;
}
@Override
public boolean isEmpty() {
return index == (long)start + count;
}
@Override
public void clear() {
index = (long)start + count;
}
@Override
public int requestFusion(int mode) {
return SYNC;
}
}
There is no sign of request accounting whatsoever, because in fused mode range() works in synchronous pull mode; the consumer handles backpressure by calling poll() whenever it needs the next value.
UnicastProcessor (which is somewhat like
onBackpressureBuffer()) can support fusion in
ASYNC mode specifically:
public final class UnicastProcessor<T> implements Processor<T, T>, QueueSubscription<T> {
volatile Subscriber<? super T> actual;
final Queue<T> queue;
int mode;
// ...
@Override
public void onNext(T t) {
    Subscriber<? super T> a = actual;
    queue.offer(t);
    if (mode == ASYNC && a != null) {
        a.onNext(null); // signal availability; the consumer will poll() the value
    } else {
        drain();
    }
}
@Override
public int requestFusion(int m) {
if ((m & ASYNC) != 0) {
mode = ASYNC;
return ASYNC;
}
return NONE;
}
@Override
public T poll() {
return queue.poll();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public void clear() {
queue.clear();
}
@Override
public void subscribe(Subscriber<? super T> s) {
if (ONCE.compareAndSet(this, 0, 1)) {
s.onSubscribe(this);
actual = s;
if (cancelled) {
actual = null;
} else {
if (mode != NONE) {
if (done) {
if (error != null) {
s.onError(error);
} else {
s.onComplete();
}
} else {
s.onNext(null);
}
} else {
drain();
}
}
} else {
EmptySubscription.error(s, new IllegalStateException("..."));
}
}
}
The fusion mode requires the following behavior changes:
- onNext has to call actual.onNext instead of drain(),
- requestFusion has to see if the downstream actually wants ASYNC fusion,
- the queue methods have to be delegated to the instance queue,
- the subscribe() has to call actual.onNext instead of drain() as well.
Doesn't look too complicated, does it? At this point, you can check your understanding of supporting fusion through an exercise: can
UnicastProcessor support
SYNC fusion and if so, when and how; if not, why not?
Implementing fusion-enabled intermediate operators
In practice, there are usually some intermediate operators between a fuseable source and a fusion-enabled consumer. Unfortunately, this can break fusion (thus reverting to the classical RS mode) or, worse, the data may skip the intermediate operator altogether, causing all sorts of failures.
The latter manifests itself when an operator forwards the Subscription it received via its onSubscribe method. Now imagine if map() does this; what would be the output of the following sequence:
range(0, 10).map(v -> v + 1).concatMap(v -> just(v)).subscribe(System.out::println);
In a classical flow, you'd get values 1 through 10 printed to the console. If both range() and concatMap() do fusion but map() forwards its Subscription, the surprising output is 0 through 9! This can affect any operator.
The solution is to require all operators that don't want to participate in fusion to never forward the upstream's Subscription verbatim. A possible manifestation of this rule is to implement Subscription yourself:
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
// ...
@Override
public void onSubscribe(Subscription s) {
this.s = s;
actual.onSubscribe(this);
}
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
// ...
}
In practice, many operators that manipulate requests or cancellation do this anyway, so the indirection is an acceptable trade-off for the benefit of a lower-overhead dataflow in general.
This rule, unfortunately, affects cross-library behavior. Even though other libraries may not speak the same fusion protocol, they could end up forwarding Subscriptions, thus if you go into and out of some other library, the same problem may appear again. Generally, libraries are supposed to have a method such as hide() or asObservable() to hide the identity of a source and prevent the propagation of unwanted internal features.
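For example, assuming a Reactor-style Flux.hide(), the identity of a source - and with it any Callable or QueueSubscription capability - can be concealed before handing it to other code:
// The hidden source is seen as a plain Publisher: no Callable, no QueueSubscription,
// so downstream operators fall back to the regular RS protocol.
Flux<Integer> exported = Flux.range(1, 10).hide();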
Luckily,
map() can participate in the fusion: it only has to be fuseable itself, mediate the
requestFusion between its upstream and downstream, plus place itself at the exit point:
poll().
static final class MapSubscriber<T, R> implements Subscriber<T>, QueueSubscription<R> {
final Subscriber<? super R> actual;
final Function<? super T, ? extends R> mapper;
QueueSubscription<T> qs;
Subscription s;
int mode;
// ...
@Override
public void onSubscribe(Subscription s) {
this.s = s;
if (s instanceof QueueSubscription) {
qs = (QueueSubscription<T>)s;
}
actual.onSubscribe(this);
}
@Override
public void onNext(T t) {
if (mode == NONE) {
// error handling omitted for brevity
actual.onNext(mapper.apply(t));
} else {
actual.onNext(null);
}
}
@Override
public int requestFusion(int m) {
if (qs == null || (m & THREAD_BOUNDARY) != 0) {
return NONE;
}
int u = qs.requestFusion(m);
mode = u;
return u;
}
@Override
public R poll() {
T t = qs.poll();
if (t == null) {
return null;
}
return mapper.apply(t);
}
@Override
public boolean isEmpty() {
return qs.isEmpty();
}
@Override
public void clear() {
qs.clear();
}
}
The operator
map() can implement
QueueSubscription itself and have a field for the potential upstream's
QueueSubscription as well. In
requestFusion, if the upstream does support fusion and the downstream isn't a boundary, the request is forwarded to upstream; rejected otherwise.
Now
poll() can't just forward to the upstream because the types are different. Here comes the mapper function that is applied to the upstream's value. Note that
null indicates termination or temporary lack of values and should not be mapped.
The main reason THREAD_BOUNDARY was introduced as a flag is map(), or in a broader sense, the restriction on where user-supplied computations happen. In fusion mode, the execution of the mapper function happens on the exit side of the queue, which could be on some other thread. Now imagine you have a heavy computation in map() which would run off the main thread before reaching an observeOn. When unfused, the result of the computation is queued up in observeOn, then dequeued on the target thread (let's say the main thread). However, if fusion were allowed, the target thread would be the one doing the poll(), and now the heavy calculation runs on the main thread.
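As an illustration (heavyComputation, mainThread and render are placeholders, not real API):
// Unfused: heavyComputation(v) runs on the emitting thread and only the result is
// queued into observeOn. If map() fused all the way through to observeOn's queue,
// poll() on the main thread would end up executing heavyComputation(v) there instead;
// the THREAD_BOUNDARY flag is what makes map() reject that fusion.
range(1, 1_000)
    .map(v -> heavyComputation(v))
    .observeOn(mainThread)
    .subscribe(v -> render(v));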
The operator
filter() can be implemented in a similar fashion, but our old
request(1) comes back unfortunately:
static final class FilterSubscriber<T> implements Subscriber<T>, QueueSubscription<T> {
// ...
@Override
public T poll() {
for (;;) {
T v = qs.poll();
if (v == null || cancelled) {
return null;
}
if (predicate.test(v)) {
return v;
}
if (mode == ASYNC) {
qs.request(1);
}
}
}
@Override
public boolean isEmpty() {
return qs.isEmpty();
}
// ...
}
Since
filter() drops values, we need to loop in
poll() until the
predicate matches or no more upstream values are available for some reason. If the
predicate doesn't match, we have to replenish our
ASYNC source (remember, you are not supposed to call
request() in sync mode!).
Implementing fusion-enabled consumers
Generally, operator fusion is not very useful with (and doesn't really happen at) end-subscribers, such as your favorite Subscriber subclass or subscribe(System.out::println).
The consumers I'm talking about can be considered intermediate operators as well, but since all operators are basically custom Subscribers subscribed to the upstream, they are consumers too.
As I mentioned, many operators feature an internal queue on their front side (e.g., concatMap, observeOn) or when they consume some inner Publisher (e.g., flatMap, zip). These are the primary consumers and drivers of the fusion lifecycle.
Now that we are familiar with how observeOn is implemented, let's see how we can enable fusion with it:
static final class ObserveOnSubscriber<T> implements Subscriber<T>, Subscription {
Queue<T> queue;
int mode;
Subscription s;
// ...
@Override
public void onSubscribe(Subscription s) {
this.s = s;
if (s instanceof QueueSubscription) {
QueueSubscription<T> qs = (QueueSubscription<T>)s;
int m = qs.requestFusion(QueueSubscription.ANY
| QueueSubscription.THREAD_BOUNDARY);
if (m == QueueSubscription.SYNC) {
queue = qs;
mode = m;
done = true;
actual.onSubscribe(this);
return;
}
if (m == QueueSubscription.ASYNC) {
queue = qs;
mode = m;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
}
queue = new SpscArrayQueue<>(prefetch);
actual.onSubscribe(this);
s.request(prefetch);
}
@Override
public void onNext(T t) {
if (mode == QueueSubscription.NONE) {
queue.offer(t);
}
drain();
}
void drain() {
// ...
if (mode != QueueSubscription.SYNC) {
    s.request(p);
}
// ...
}
// ...
}
Enabling fusion has two implications: 1)
queue can no longer be final but has to be created in
onSubscribe, 2)
onNext should not offer if fusion is enabled.
The fusion mode is requested in the
onSubscribe after identifying the upstream as
QueueSubscription. Since the algorithm inside
drain() only sees the
Queue interface and doesn't particularly care when values are available in the queue, we request the
ANY mode from upstream in addition to indicating the consumer is also a
THREAD_BOUNDARY. This should prevent fusion from unexpectedly moving some user-defined function to the poll() side, that is, onto another thread.
If
SYNC mode is granted, we assign the
QueueSubscription to our
queue and call
onSubscribe on the downstream
Subscriber. In this mode, the prefetch amount is not requested in accordance with the synchronous fusion protocol. The big win in
SYNC mode is the fact that if
poll() returns
null, that is an indication of termination. We already exploit this in the standard queue-drain algorithm: if the
done flag is set and the queue reports
null/empty, we have completed. Note however, that we have to adjust the drain algorithm a bit because we can't call
request in
SYNC mode anymore.
If
ASYNC mode is granted, we store the
queue again, but can't set the
done flag as we don't know when the upstream finishes -
poll() returning
null is just the indication of unavailability of values at the time. In addition, once the downstream
Subscriber is notified, we still have to signal a
prefetch-request to upstream, so it can trigger its own sources even further up.
Note that once
requestFusion returns
SYNC or
ASYNC, there is no going back (you may try to call
requestFusion() again, which may change the mode, but that's undefined behavior at the moment and may be forbidden entirely in the future), and definitely not after elements have already been delivered in any mode.
General warnings around micro-fusion
In my experience, some of my colleagues tend to become enthusiastic about micro-fusion; they want to apply it everywhere. Whenever an operator has any queue, they see fusion happening.
I must warn against such relentlessness because fusion has requirements and implications, and is generally subject to cost-benefit trade-offs:
- If an operator is a thread boundary, my current understanding is that you can't fuse both its front and back side at the same time.
- Fusion can shift computations in time and sometimes in location (even without an explicit boundary).
- The fact an operator has a queue doesn't mean it can be exposed/replaced. A good example of this is combineLatest: my current understanding is that the post-processing of the queue elements makes this infeasible for back side fusion. Another example is flatMap where I'm not convinced the collector logic can be integrated into a poll()/isEmpty() back-side fusion.
- Some sources, such as 0 or 1 element ones, are likely not worth it and are better off with macro-fusion.
- Fusion is an extra behavior which can itself be buggy or, in fact, hide a bug on the regular path (e.g., groupBy) and requires extra care. In addition, it increases the test method count because now you have to test with and without fusion (see hide()).
To cheer you up, there is a great counter-example: an operator that supports full fusion, front and back side at the same time: flattenIterable, or as you may know it, concatMapIterable / flatMapIterable.
Conclusion
In this post, I've detailed the structures and protocols of operator fusion and shown some examples of how it can be utilized in source, intermediate and terminal operators.
Since operator fusion is an active research area, I can't say these are all the optimizations that can happen, and we are eager to hear about interesting chains of operators where fusion can happen or, in contrast, where fusion should not happen. See the Rsc repository for examples of all kinds of fusion.
In addition, I hope these fusion protocols will be standardized and become part of Reactive-Streams 2.0, allowing full, efficient cross-library operation that maintains fusion as long as possible.
My next topic will be to finish up the series about
ConnectableObservables.