Skip to content

Commit b39d0c4

Browse files
authored
Remove maxWait parameter from QueueClient and Workflow.Message methods (#122)
1 parent c990303 commit b39d0c4

13 files changed

Lines changed: 93 additions & 155 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/MessagingTests.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,6 @@ public override Task FunctionIsSuspendedWhenAwaitedMessageDoesNotAlreadyExist()
1818
FunctionStoreFactory.Create()
1919
);
2020

21-
[TestMethod]
22-
public override Task TimeoutEventCausesSuspendedFunctionToBeReInvoked()
23-
=> TimeoutEventCausesSuspendedFunctionToBeReInvoked(
24-
FunctionStoreFactory.Create()
25-
);
26-
2721
[TestMethod]
2822
public override Task ScheduleInvocationWithPublishResultToSpecifiedFunctionId()
2923
=> ScheduleInvocationWithPublishResultToSpecifiedFunctionId(FunctionStoreFactory.Create());

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesSubscriptionTests.cs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected async Task QueueClientCanPullSingleMessage(Task<IFunctionStore> functi
8484

8585
var rFunc = functionsRegistry.RegisterFunc(
8686
nameof(QueueClientCanPullSingleMessage),
87-
inner: (string _, Workflow workflow) => workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1))
87+
inner: (string _, Workflow workflow) => workflow.Message<string>()
8888
);
8989

9090
var scheduled = await rFunc.Schedule("instanceId", "");
@@ -114,11 +114,11 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
114114
{
115115
storedId = workflow.StoredId;
116116

117-
var message1 = await workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1));
117+
var message1 = await workflow.Message<string>();
118118
await workflow.Delay(TimeSpan.FromMilliseconds(100));
119-
var message2 = await workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1));
119+
var message2 = await workflow.Message<string>();
120120
await workflow.Delay(TimeSpan.FromMilliseconds(100));
121-
var message3 = await workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1));
121+
var message3 = await workflow.Message<string>();
122122
await workflow.Delay(TimeSpan.FromMilliseconds(100));
123123

124124
return $"{message1},{message2},{message3}";
@@ -155,7 +155,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
155155
nameof(QueueClientReturnsNullAfterTimeout),
156156
inner: async Task<string?> (string _, Workflow workflow) =>
157157
{
158-
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(100), maxWait: TimeSpan.FromMinutes(1));
158+
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(100));
159159
return message;
160160
}
161161
);
@@ -196,7 +196,7 @@ protected async Task QueueClientPullsFiveMessagesAndTimesOutOnSixth(Task<IFuncti
196196

197197
for (var i = 0; i < 6; i++)
198198
{
199-
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(250), maxWait: TimeSpan.FromMinutes(1));
199+
var message = await workflow.Message<string>(TimeSpan.FromMilliseconds(250));
200200
messages.Add(message ?? "NULL");
201201
}
202202

@@ -242,8 +242,8 @@ protected async Task OnlyFirstMessageWithSameIdempotencyKeyIsDeliveredAndBothAre
242242
{
243243
storedId = workflow.StoredId;
244244

245-
var message1 = await workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1));
246-
var message2 = await workflow.Message<string>(TimeSpan.FromSeconds(1), maxWait: TimeSpan.FromMinutes(1));
245+
var message1 = await workflow.Message<string>();
246+
var message2 = await workflow.Message<string>(TimeSpan.FromSeconds(1));
247247

248248
return Tuple.Create(message1, message2);
249249
}
@@ -297,8 +297,7 @@ protected async Task MultipleIterationsWithDuplicateIdempotencyKeysProcessCorrec
297297
while (message != "stop")
298298
{
299299
message = await workflow.Message<string>(
300-
TimeSpan.FromMilliseconds(100),
301-
maxWait: TimeSpan.FromMinutes(1)
300+
TimeSpan.FromMilliseconds(100)
302301
);
303302

304303
if (message is null)
@@ -364,18 +363,15 @@ protected async Task QueueClientFilterParameterFiltersMessages(Task<IFunctionSto
364363
{
365364
// Pull only messages that start with "even-"
366365
var message1 = await workflow.Message<string>(
367-
m => m.StartsWith("even-"),
368-
maxWait: TimeSpan.FromMinutes(1)
366+
m => m.StartsWith("even-")
369367
);
370368

371369
var message2 = await workflow.Message<string>(
372-
m => m.StartsWith("even-"),
373-
maxWait: TimeSpan.FromMinutes(1)
370+
m => m.StartsWith("even-")
374371
);
375372

376373
var message3 = await workflow.Message<string>(
377-
m => m.StartsWith("even-"),
378-
maxWait: TimeSpan.FromMinutes(1)
374+
m => m.StartsWith("even-")
379375
);
380376

381377
return $"{message1},{message2},{message3}";
@@ -408,17 +404,17 @@ protected async Task QueueClientWorksWithCustomSerializer(Task<IFunctionStore> f
408404
// Use default serializer to ensure serialization works correctly
409405
using var functionsRegistry = new FunctionsRegistry(
410406
functionStore,
411-
new Settings(unhandledExceptionCatcher.Catch)
407+
new Settings(unhandledExceptionCatcher.Catch, messagesDefaultMaxWaitForCompletion: TimeSpan.FromMinutes(1))
412408
);
413409

414410
var rFunc = functionsRegistry.RegisterFunc(
415411
nameof(QueueClientWorksWithCustomSerializer),
416412
inner: async Task<string> (string _, Workflow workflow) =>
417413
{
418414
// Pull different types of messages to verify serialization works
419-
var message1 = await workflow.Message<string>(maxWait: TimeSpan.FromSeconds(5));
420-
var message2 = await workflow.Message<WrappedInt>(maxWait: TimeSpan.FromSeconds(5));
421-
var message3 = await workflow.Message<TestRecord>(maxWait: TimeSpan.FromSeconds(5));
415+
var message1 = await workflow.Message<string>();
416+
var message2 = await workflow.Message<WrappedInt>();
417+
var message3 = await workflow.Message<TestRecord>();
422418

423419
return $"{message1},{message2.Value},{message3.Value}";
424420
}
@@ -452,7 +448,7 @@ protected async Task BatchedMessagesAreDeliveredToMultipleFlows(Task<IFunctionSt
452448

453449
var rFunc = functionsRegistry.RegisterFunc(
454450
nameof(BatchedMessagesAreDeliveredToMultipleFlows),
455-
inner: (string _, Workflow workflow) => workflow.Message<string>(maxWait: TimeSpan.FromMinutes(1))
451+
inner: (string _, Workflow workflow) => workflow.Message<string>()
456452
);
457453

458454
// Send batched messages first
@@ -503,7 +499,7 @@ protected async Task QueueClientSupportsMultiFlowMessageExchange(Task<IFunctionS
503499
for (var i = 0; i < 10; i++)
504500
{
505501
await pongRegistration!.SendMessage("Pong", new Ping(i), idempotencyKey: $"Pong{i}");
506-
await workflow.Message<Pong>(pong => pong.Number == i, maxWait: TimeSpan.FromMinutes(1));
502+
await workflow.Message<Pong>(pong => pong.Number == i);
507503
}
508504

509505
return "completed";
@@ -516,7 +512,7 @@ protected async Task QueueClientSupportsMultiFlowMessageExchange(Task<IFunctionS
516512
{
517513
for (var i = 0; i < 10; i++)
518514
{
519-
await workflow.Message<Ping>(ping => ping.Number == i, maxWait: TimeSpan.FromMinutes(1));
515+
await workflow.Message<Ping>(ping => ping.Number == i);
520516
await pingRegistration!.SendMessage("Ping", new Pong(i), idempotencyKey: $"Ping{i}");
521517
}
522518

@@ -580,8 +576,7 @@ protected async Task QueueManagerFailsOnMessageDeserializationError(Task<IFuncti
580576

581577
var message = await queueClient.Pull<GoodMessage>(
582578
workflow,
583-
workflow.Effect.CreateNextImplicitId(),
584-
maxWait: TimeSpan.FromMinutes(1)
579+
workflow.Effect.CreateNextImplicitId()
585580
);
586581

587582
return message.Value;
@@ -646,8 +641,7 @@ protected async Task RegisteredTimeoutIsRemovedWhenPullingMessage(Task<IFunction
646641
var message = await queueClient.Pull<string>(
647642
workflow,
648643
workflow.Effect.CreateNextImplicitId(),
649-
timeout: TimeSpan.FromMinutes(5),
650-
maxWait: TimeSpan.FromMinutes(1)
644+
timeout: TimeSpan.FromMinutes(5)
651645
);
652646

653647
// Verify timeout is removed after successful pull
@@ -702,8 +696,7 @@ protected async Task PullEnvelopeReturnsEnvelopeWithReceiverAndSender(Task<IFunc
702696
var envelope = await queueClient.PullEnvelope<string>(
703697
workflow,
704698
workflow.Effect.CreateNextImplicitId(),
705-
filter: _ => true,
706-
maxWait: TimeSpan.FromMinutes(1)
699+
filter: _ => true
707700
);
708701

709702
return $"{envelope.Message}|{envelope.Receiver}|{envelope.Sender}";

Core/Cleipnir.ResilientFunctions.Tests/Messaging/TestTemplates/MessagesTests.cs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ protected async Task MessagesSunshineScenario(Task<IFunctionStore> functionStore
4848
);
4949

5050
queueClient = await queueManager.CreateQueueClient();
51-
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
51+
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
5252

5353
return message;
5454
}
@@ -100,7 +100,7 @@ protected async Task QueueClientReturnsNullAfterTimeout(Task<IFunctionStore> fun
100100
var flowsManager = new FlowsManager(() => DateTime.UtcNow);
101101
flowsManager.AddFlow(workflow.StoredId, () => { }, queueManager, flowTimeouts);
102102
var queueClient = await queueManager.CreateQueueClient();
103-
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100), maxWait: TimeSpan.FromMinutes(1));
103+
var message = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), TimeSpan.FromMilliseconds(100));
104104

105105
return message;
106106
}
@@ -151,8 +151,7 @@ protected async Task MessagesFirstOfTypesReturnsNoneForFirstOfTypesOnTimeout(Tas
151151
workflow,
152152
workflow.Effect.CreateNextImplicitId(),
153153
TimeSpan.Zero,
154-
filter: m => m is string or int,
155-
maxWait: TimeSpan.FromMinutes(1)
154+
filter: m => m is string or int
156155
);
157156

158157
return message == null ? "NONE" : message.ToString()!;
@@ -200,8 +199,7 @@ protected async Task MessagesFirstOfTypesReturnsFirstForFirstOfTypesOnFirst(Task
200199
var message = await queueClient.Pull<object>(
201200
workflow,
202201
workflow.Effect.CreateNextImplicitId(),
203-
filter: m => m is string or int,
204-
maxWait: TimeSpan.FromMinutes(1)
202+
filter: m => m is string or int
205203
);
206204

207205
return message!.ToString()!;
@@ -250,8 +248,7 @@ protected async Task MessagesFirstOfTypesReturnsSecondForFirstOfTypesOnSecond(Ta
250248
var queueClient = await queueManager.CreateQueueClient();
251249
var message = await queueClient.Pull<string>(
252250
workflow,
253-
workflow.Effect.CreateNextImplicitId(),
254-
maxWait: TimeSpan.FromMinutes(1)
251+
workflow.Effect.CreateNextImplicitId()
255252
);
256253

257254
return message;
@@ -300,8 +297,8 @@ protected async Task SecondEventWithExistingIdempotencyKeyIsIgnored(Task<IFuncti
300297
);
301298

302299
var queueClient = await queueManager.CreateQueueClient();
303-
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
304-
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
300+
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
301+
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
305302

306303
return Tuple.Create(message1, message2);
307304
}
@@ -354,8 +351,8 @@ protected async Task QueueClientCanPullMultipleMessages(Task<IFunctionStore> fun
354351

355352
var queueClient = await queueManager.CreateQueueClient();
356353

357-
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
358-
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
354+
var message1 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
355+
var message2 = await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
359356

360357
return $"{message1},{message2}";
361358
}
@@ -399,15 +396,15 @@ async Task (workflow) =>
399396
unhandledExceptionHandler,
400397
new FlowTimeouts(),
401398
() => DateTime.UtcNow,
402-
SettingsWithDefaults.Default,
399+
SettingsWithDefaults.Default with { MessagesDefaultMaxWaitForCompletion = TimeSpan.FromMinutes(1) },
403400
new FlowsManager(() => DateTime.UtcNow)
404401
);
405402

406403
var queueClient = await queueManager.CreateQueueClient();
407404
lock (queueClients)
408405
queueClients[workflow.FlowId.Instance.Value] = queueClient;
409406

410-
await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromMinutes(1));
407+
await queueClient.Pull<string>(workflow, workflow.Effect.CreateNextImplicitId());
411408
}
412409
);
413410

@@ -470,7 +467,7 @@ async Task (workflow) =>
470467

471468
while (true)
472469
{
473-
var message = await queueClient.Pull<object>(workflow, workflow.Effect.CreateNextImplicitId(), maxWait: TimeSpan.FromSeconds(10));
470+
var message = await queueClient.Pull<object>(workflow, workflow.Effect.CreateNextImplicitId());
474471
if (message is string s)
475472
await workflow.Effect.Capture(() => messages.Add(s));
476473
else
@@ -536,7 +533,7 @@ async Task (workflow) =>
536533
for (var i = 0; i < 10; i++)
537534
{
538535
await pongRegistration.SendMessage("Pong", new Ping(i), idempotencyKey: $"Pong{i}");
539-
await queueClient.Pull<Pong>(workflow, workflow.Effect.CreateNextImplicitId(), filter: pong => pong.Number == i, maxWait: TimeSpan.FromMinutes(1));
536+
await queueClient.Pull<Pong>(workflow, workflow.Effect.CreateNextImplicitId(), filter: pong => pong.Number == i);
540537
}
541538
});
542539

@@ -562,7 +559,7 @@ async Task (workflow) =>
562559

563560
for (var i = 0; i < 10; i++)
564561
{
565-
await queueClient.Pull<Ping>(workflow, workflow.Effect.CreateNextImplicitId(), filter: ping => ping.Number == i, maxWait: TimeSpan.FromMinutes(1));
562+
await queueClient.Pull<Ping>(workflow, workflow.Effect.CreateNextImplicitId(), filter: ping => ping.Number == i);
566563
await pingRegistration.SendMessage("Ping", new Pong(i), idempotencyKey: $"Ping{i}");
567564
}
568565
});

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ async Task(string param, Workflow workflow) =>
620620
{
621621
for (var i = 0; i < 2; i++)
622622
{
623-
var msg = await workflow.Message<string>(maxWait: TimeSpan.FromSeconds(10));
623+
var msg = await workflow.Message<string>();
624624
syncedList.Add(msg);
625625
}
626626

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/MessagingTests.cs

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -70,40 +70,6 @@ await Should.ThrowAsync<InvocationSuspendedException>(() =>
7070
unhandledExceptionHandler.ShouldNotHaveExceptions();
7171
}
7272

73-
public abstract Task TimeoutEventCausesSuspendedFunctionToBeReInvoked();
74-
public async Task TimeoutEventCausesSuspendedFunctionToBeReInvoked(Task<IFunctionStore> functionStore)
75-
{
76-
var store = await functionStore;
77-
78-
var functionId = new FlowId(nameof(TimeoutEventCausesSuspendedFunctionToBeReInvoked),"instanceId");
79-
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
80-
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler.Catch));
81-
82-
var tomorrow = DateTime.UtcNow.AddDays(1);
83-
var rFunc = functionsRegistry.RegisterFunc(
84-
functionId.Type,
85-
inner: async Task<string?> (string _, Workflow workflow)
86-
=> await workflow.Message<string>(waitUntil: tomorrow)
87-
);
88-
89-
await rFunc.Schedule(functionId.Instance.Value, param: "");
90-
91-
var controlPanel = await rFunc.ControlPanel("instanceId");
92-
controlPanel.ShouldNotBeNull();
93-
94-
await BusyWait.Until(async () =>
95-
{
96-
await controlPanel.Refresh();
97-
return controlPanel.Status == Status.Postponed;
98-
}, maxWait: TimeSpan.FromSeconds(10));
99-
100-
await controlPanel.Refresh();
101-
controlPanel.Status.ShouldBe(Status.Postponed);
102-
controlPanel.PostponedUntil.ShouldBe(tomorrow);
103-
104-
unhandledExceptionHandler.ShouldNotHaveExceptions();
105-
}
106-
10773
public abstract Task ScheduleInvocationWithPublishResultToSpecifiedFunctionId();
10874
public async Task ScheduleInvocationWithPublishResultToSpecifiedFunctionId(Task<IFunctionStore> functionStore)
10975
{

0 commit comments

Comments
 (0)