Introduction
The amb operator, shorthand for ambiguous, subscribes to a set of source Observables and keeps relaying events from the first Observable that signaled any event while unsubscribing and ignoring the rest. The operator can and does support backpressure.
From the operator building's perspective, we need to consider the following properties/requirements:
- The number of source Observables is known when the downstream subscribes.
- We need to track all subscriptions in a collection other than CompositeSubscription because there is no way to cherry pick one and unsubscribe the rest.
- One has to relay the downstream request to all sources.
- Unsubscription, requesting and even the choosing of a "winner" may happen while subscribing to the source Observables.
The two major versions are implemented slightly differently.
Implementation: 1.x
The 1.x implementation is slightly more verbose. It uses a simple ConcurrentLinkedQueue to keep track of the subscribers, AmbSubscriber. In addition, the winner AmbSubscriber is kept in an AtomicReference.
In case an unsubscription happens, we need to attach a callback to the child subscriber which when called, will loop through the collection of AmbSubscribers and unsubscribes them one by one.
When the subscription happens, a loop goes through all available sources, instantiated an AmbSubscriber and subscribes. Since unsubscription can happen at any time or any previous source may have already won, the loop has to check for both condition and quit early.
Once all sources have been subscribed, the child subscriber receives its Producer. The task of this producer is to dispatch all requests to every AmbSubscriber, or in case of a winner, dispatch the request only to that particular AmbSubscriber.
It may seem odd, but if there is a winner before the Producer is set, there is no need to set the Producer because the winner has obviously ignored any backpressure and started emitting anyways.
In the AmbSubscriber, any event fired will check if the current AmbSubscriber is the winner or not. If so, the event is relayed. Otherwise, an atomic dance happens where the AmbSubscriber tries to CAS itself into the winning position and if successful, it unsubscribes the others. If the CAS failed, the AmbSubscriber unsubscribes itself.
Implementation: 2.x
The 2.x implementation is less verbose and exploits the fact that the number of source Observables is known. Therefore, an array of AmbInnerSubscriber is used and the "winner" indicator is now an volatile integer field backed by a field updater.
When the child subscribes, a loop first creates every AmbInnerSubscriber, sets a custom Subscription on the child (which is the coordinator class itself) and then subscribes to each source Observable. This second loop also checks for a winner in the process.
The winner field has multiple meanings depending on the state of the operator. Minus one indicates the child cancelled, zero means there is no winner yet and any positive number indicates the index plus 1 of the winner AmbInnerSubscriber.
In the reactive-streams world, there is an increased likelihood a Subscription arrives later than any request or cancellation attempt, therefore, one has to be prepared for it. Therefore, AmbInnerSubscriber has to keep its Subscription in a volatile field plus it has to track all the missed requests in another. This pattern is so common with 2.x, it is worth detailing it here:
class AmbSubscriber<T> extends AtomicReference<Subscription> implements Subscriber<T>, Subscription { volatile long missedRequested; static final AtomicLongFieldUpdater MISSED_REQUESTED = ...; static final Subscription CANCELLED = ...; }
The class implements Subscriber, naturally, and Subscription for convenience (so we have request() and cancel() to implement). The class also has a static final field holding an empty implementation of the Subscription interface. We will use this instance to indicate a cancelled state and also notify any late-coming request() or onSubscribe() to do nothing. By extending AtomicReference directly, we will keep the incoming Subscription in a (hidden) instance field and access it via atomic methods of this.
Let's see the implementation of the cancel() method first:
This atomic getAndSet() should look familiar by now. When called, if the current subscription is not the constant CANCELLED, we getAndSet it to cancelled. The atomicity guarantees that there will be only one thread that experiences a non-CANCELLED previous state in which case we call cancel on it. Note that cancel() may be called before onSubscribe thus the current subscription may be null.
Let's see the implementation of the cancel() method first:
@Override public void cancel() { Subscription s = get(); if (s != CANCELLED) { s = getAndSet(CANCELLED); if (s != CANCELLED && s != null) { s.cancel(); } } }
This atomic getAndSet() should look familiar by now. When called, if the current subscription is not the constant CANCELLED, we getAndSet it to cancelled. The atomicity guarantees that there will be only one thread that experiences a non-CANCELLED previous state in which case we call cancel on it. Note that cancel() may be called before onSubscribe thus the current subscription may be null.
Next, let's see the onSubscribe() method:
@Override public void onSubscribe(Subscription s) { if (!compareAndSet(null, s)) { // (1) s.cancel(); // (2) if (get() != CANCELLED) { // (3) SubscriptionHelper.reportSubscriptionSet(); } return; } long r = MISSED_REQUESTED.getAndSet(this, 0L); // (4) if (r != 0L) { // (5) s.request(r); } }
- First, we try to CAS in the incoming Subscription and replace a null value.
- If there is already a Subscription, we cancel the incoming one in any case.
- It is possible, although unlikely, multiple calls to onSubscribe happens due to bogous source. If the current value isn't the cancelled indicator, we have to report the incident in some way and just quit.
- If the CAS succeeded, we now have to take all missed requested amount via getAndSet.
- If there were in fact missed requests, we do request that amount from the Subscription at hand.
Finally, let's look at the request() method:
@Override public void request(long n) { Subscription s = get(); if (s != null) { // (1) s.request(n); } else { BackpressureHelper.add(MISSED_REQUESTED, this, n); // (2) s = get(); if (s != null && s != CANCELLED) { // (3) long r = MISSED_REQUESTED.getAndSet(this, 0L); // (4) if (r != 0L) { // (5) s.request(r); } } } }
- When the request is called, first we check if the current Subscription isn't null. If so, we request the amount directly. The current Subscription might be the CANCELLED instance in which case this call is a no-op.
- We use the backpressure-helper routine to safely add the number to the missedRequested field (which caps at Long.MAX_VALUE). 2.x Bug: validation of n is missing here.
- Once the missed amount has been added, we need to check the Subscription again since it might have been set asynchronously.
- If not null and not cancelled, we call getAndSet the missed amount. This makes sure the missed amount is either retrieved by this method or by the onSubscribe method atomically.
- If the missed amount is non-zero, we request it from the Subscription. Otherwise, the onSubscribe has already taken any missed value for us.
The other onXXX methods of the AmbInnerSubscriber work similarly to the 1.x version. There is a local won field (no need for volatile) that if set, serves as a fast-path for delivering events. If it is false, there is an attempt to win the race and if won, the won field is set to true. Otherwise the AmbInnerSubscriber cancels the Subscription at hand (which shouldn't be null at this point as RS requires calling onSubscribe before any other onXXX methods).
2.x Bug: If the AmbSubscriber wins, it doesn't cancel the other AmbSubscribers and thus they remain subscribed indefinitely.
Conclusion
The operator amb isn't a complicated operator, 5/10 maybe, but it requires some custom logic to deal with unsubscription/cancellation and request dispatching.While reviewing the 2.x implementation, I found two oversights that can be easily addressed via a PR.
Nincsenek megjegyzések:
Megjegyzés küldése