Skip to content

Eliminate Rx Merge gate in queue-serialized operators#1097

Open
dwcullop wants to merge 1 commit into
reactivemarbles:mainfrom
dwcullop:fix/operator-merge-gate-deadlock
Open

Eliminate Rx Merge gate in queue-serialized operators#1097
dwcullop wants to merge 1 commit into
reactivemarbles:mainfrom
dwcullop:fix/operator-merge-gate-deadlock

Conversation

@dwcullop
Copy link
Copy Markdown
Member

Problem

PR #1079 moved cross-cache operators from Synchronize(lock) to SynchronizeSafe, which routes every notification through a SharedDeliveryQueue that releases the lock before invoking downstream observers. The goal was that no operator-level lock would ever be held across a cross-cache call, so two operators on a bidirectional pipeline could not form an ABBA cycle.

Six operators completed the queue routing but then combined their already-serialized inputs with Observable.Merge before delivery:

  • Page
  • Virtualise
  • AutoRefresh
  • Sort (the conditional branch when comparerChangedObservable or resorter is present)
  • GroupOnImmutable
  • QueryWhenChanged (the itemChangedTrigger branch)

Rx's Observable.Merge installs a private _gate and holds it for the full duration of every downstream OnNext. When downstream delivery walks into another cache's writer lock, the two Merge gates on the two operators reconstruct the ABBA cycle that the queue-drain design was supposed to eliminate.

DeadlockTortureTest.Page_DoesNotDeadlock (added in #1079) caught this for Page as an intermittent CI failure. The other five operators have the same latent bug; the existing torture test does not exercise their merge branches with cross-cache writes.

Fix

Add IObservable<T>.UnsynchronizedMerge, a drop-in alternative to Observable.Merge that performs no synchronization of its own. It preserves Merge's terminal semantics (completes only after every source completes; first error terminates; subscription happens in argument order) but does not install a gate.

UnsynchronizedMerge is safe to use only when every input is already serialized. In this library that precondition is satisfied by routing each input through the same SharedDeliveryQueue via SynchronizeSafe(queue) before the merge. The queue's drain loop guarantees that at most one notification is in flight to the shared observer at a time, so the additional gate that Observable.Merge would install is redundant.

All six operators above are rewritten to use UnsynchronizedMerge. The pattern at every call site is unchanged except for the method name:

// Before
request.Merge(dataChange).Where(...).SubscribeSafe(observer)

// After
request.UnsynchronizedMerge(dataChange).Where(...).SubscribeSafe(observer)

Sort's three-source case becomes a single UnsynchronizedMerge call with two params arguments instead of nested .Merge().Merge(), which removes one of the two gates the chained form created.

Why FullJoin is not changed

FullJoin uses the same Merge syntax but its two inputs come from leftCache.Connect() and rightCache.Connect() on independently materialized AsObservableCache() stages that share no queue. There, the Merge gate is the only thing serializing the two cache deliveries before they mutate joinedCache. Removing it without alternative serialization would race the joined cache. FullJoin is left alone.

Test coverage

DeadlockTortureTest is expanded so the same fixture catches a future regression in any of the six operators:

  • New [Fact] GroupWithImmutableState_DoesNotDeadlock.
  • New [Fact] QueryWhenChanged_DoesNotDeadlock — uses a side-channel .Subscribe(_ => otherCache.AddOrUpdate(...)) to close the ABBA cycle, since QueryWhenChanged does not produce a changeset that PopulateInto can consume.
  • AllDangerous_Stacked_DoNotDeadlock now stacks GroupWithImmutableState and Virtualise into the existing kitchen-sink pipeline.
  • MultiplePairs_Simultaneous_NoDeadlock gains a GroupWithImmutableState lane.

Operators with their own existing standalone tests in the fixture (AutoRefresh, Page, Sort, Virtualise) are already covered.

Verification

  • DeadlockTortureTest fixture: 14/14 pass at xUnit.MaxParallelThreads=16, 10 consecutive runs, zero failures.
  • Targeted unit tests for Sort + Virtualise + Page + AutoRefresh + Group* + QueryWhenChanged: 422/422 pass.
  • Full test suite at xUnit.MaxParallelThreads=4: 2321 passed, 0 failed, 1 skipped.

reactivemarbles#1079 moved cross-cache operators from Synchronize(lock) to SynchronizeSafe,
which routes deliveries through a SharedDeliveryQueue that releases the lock
before invoking downstream observers. The intent was to make the lock no
longer held across cross-cache calls, so two operators on a bidirectional
pipeline could not form an ABBA cycle.

Six operators (Page, Virtualise, AutoRefresh, Sort, GroupOnImmutable, and
QueryWhenChanged) routed every input through the queue but then combined the
inputs with Observable.Merge before delivery. Rx's Merge installs its own
private gate and holds it for the full duration of every downstream OnNext.
When downstream delivery walks into another cache's writer lock, the two
Merge gates on the two operators reconstruct the ABBA cycle that the queue-
drain design was supposed to eliminate. DeadlockTortureTest.Page_DoesNotDeadlock
caught this for Page; the other five had the same latent bug.

This adds IObservable<T>.UnsynchronizedMerge, a drop-in alternative to
Observable.Merge that performs no synchronization of its own. It is safe to
use only when every input is already serialized (in this library, by routing
through the same SharedDeliveryQueue). All six operators now use it.

Sort's three-source case becomes a single UnsynchronizedMerge call instead of
nested .Merge().Merge(), removing one of the two gates that the chained form
created.

FullJoin uses the same Merge syntax but its two inputs come from independently
materialized AsObservableCache().Connect() streams that share no queue. The
Merge gate is the only thing serializing them; this PR leaves FullJoin alone.

DeadlockTortureTest grows three new cases (GroupWithImmutableState, QueryWhenChanged,
and Virtualise added to the stacked + multi-pair scenarios) so a future regression
in any of the six operators is caught by the existing torture fixture.

Verified: 14/14 DeadlockTortureTest pass at MaxParallelThreads=16 across 10
iterations; 422/422 Sort/Virtualise/Page/AutoRefresh/Group/QueryWhenChanged
unit tests pass; full Cache + List suite passes (2321 passed, 1 skipped).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant