Skip to content
Open
129 changes: 112 additions & 17 deletions src/DynamicData.Tests/Cache/DeadlockTortureTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest
{
private const int ItemCount = 200;
private const int Iterations = 50;
private const int TimeoutSeconds = 15;
private const int TimeoutSeconds = 60;

private static async Task<bool> RunBidirectionalDeadlockTest(
Func<IObservable<IChangeSet<Person, string>>, IObservable<IChangeSet<Person, string>>> pipeline,
Action? subjectPusher = null,
int iterations = Iterations)
{
for (var iter = 0; iter < iterations; iter++)
Expand All @@ -44,11 +45,13 @@ private static async Task<bool> RunBidirectionalDeadlockTest(
using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB);
using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA);

using var barrier = new Barrier(2);
var participants = subjectPusher is null ? 2 : 3;
using var barrier = new Barrier(participants);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });
var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); });

var completed = Task.WhenAll(taskA, taskB);
var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC);
if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed)
return false;
}
Expand All @@ -64,26 +67,84 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() =>
[Fact] public async Task GroupOn_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue();

[Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue();

[Fact] public async Task Page_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task SortAndPage_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
(await RunBidirectionalDeadlockTest(
s => s.SortAndPage(SortExpressionComparer<Person>.Ascending(p => p.Age), req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task Virtualise_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
(await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue();
Comment thread
dwcullop marked this conversation as resolved.
}

[Fact] public async Task SortAndVirtualize_DoesNotDeadlock()
{
using var req = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
(await RunBidirectionalDeadlockTest(
s => s.SortAndVirtualize(SortExpressionComparer<Person>.Ascending(p => p.Age), req),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue();
}

[Fact] public async Task QueryWhenChanged_DoesNotDeadlock()
{
for (var iter = 0; iter < Iterations; iter++)
{
using var sourceA = new SourceCache<Person, string>(p => p.UniqueKey);
using var sourceB = new SourceCache<Person, string>(p => p.UniqueKey);

// QueryWhenChanged with an itemChangedTrigger exercises the Merge branch.
// A side-channel write into the other cache closes the same ABBA cycle that
// PopulateInto would close for changeset-shaped operators.
using var aToB = sourceA.Connect()
.Filter(p => p.Name.StartsWith("A"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0)));
using var bToA = sourceB.Connect()
.Filter(p => p.Name.StartsWith("B"))
.QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age))
.Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0)));

using var barrier = new Barrier(2);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); });

var completed = Task.WhenAll(taskA, taskB);
(await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter);
}
}

[Fact] public async Task TransformWithForce_DoesNotDeadlock()
{
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue();
(await RunBidirectionalDeadlockTest(
s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue();
}

[Fact] public async Task BatchIf_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null))).Should().BeTrue();
[Fact] public async Task BatchIf_DoesNotDeadlock()
{
using var pause = new BehaviorSubject<bool>(false);
(await RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue();
}

[Fact] public async Task DisposeMany_DoesNotDeadlock() =>
(await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue();
Expand All @@ -94,31 +155,65 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() =>
[Fact] public async Task AllDangerous_Stacked_DoNotDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 100));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 100));
using var force = new Subject<Func<Person, string, bool>>();
(await RunBidirectionalDeadlockTest(
s => s.AutoRefresh(p => p.Age)
s => s.GroupWithImmutableState(p => p.Age % 3)
.TransformMany(g => g.Items, p => p.UniqueKey)
.AutoRefresh(p => p.Age)
.Filter(p => p.Age >= 0)
.Transform((p, k) => new Person("X-" + p.Name, p.Age), force)
.OnItemRemoved(_ => { })
.DisposeMany()
.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age))
.Virtualise(virtReq)
.Page(pageReq),
subjectPusher: () =>
{
for (var j = 0; j < ItemCount; j++)
{
force.OnNext(static (p, _) => true);
pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50));
virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50));
}
},
iterations: Iterations * 2)).Should().BeTrue();
}

[Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock()
{
using var pageReq = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
using var pageReq2 = new BehaviorSubject<IPageRequest>(new PageRequest(1, 50));
using var virtReq = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
using var virtReq2 = new BehaviorSubject<IVirtualRequest>(new VirtualRequest(0, 50));
using var pause = new BehaviorSubject<bool>(false);
var results = await Task.WhenAll(
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq), 30),
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq), 30),
RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject<bool>(false), false, (TimeSpan?)null), 30));
RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)), iterations: 30),
RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30),
RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30),
RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30),
RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30),
RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Page(pageReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.SortAndPage(SortExpressionComparer<Person>.Ascending(p => p.Age), pageReq2),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.Sort(SortExpressionComparer<Person>.Ascending(p => p.Age)).Virtualise(virtReq),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.SortAndVirtualize(SortExpressionComparer<Person>.Ascending(p => p.Age), virtReq2),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); },
iterations: 30),
RunBidirectionalDeadlockTest(
s => s.BatchIf(pause, false, (TimeSpan?)null),
subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); },
iterations: 30));
results.Should().AllSatisfy(r => r.Should().BeTrue());
}

Expand Down
Loading
Loading