Skip to content

Commit 58981a5

Browse files
committed
Refactor QueueManager to use Monitor-like pulse/wait pattern for message delivery
1 parent fc2aa65 commit 58981a5

3 files changed

Lines changed: 66 additions & 131 deletions

File tree

Core/Cleipnir.ResilientFunctions/CoreRuntime/FlowsManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void Interrupt(IEnumerable<StoredId> ids)
6767

6868
var queueManager = _dict[id].QueueManager;
6969
InterruptThreads(id);
70-
Task.Run(() => queueManager.FetchAndTryToDeliver());
70+
Task.Run(() => queueManager.FetchMessagesOnce());
7171
}
7272
}
7373
}

Core/Cleipnir.ResilientFunctions/Queuing/QueueClient.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public Task<Envelope> PullEnvelope<T>(Workflow workflow, EffectId parentId, Func
5757
if (!effect.Contains(messageId))
5858
{
5959
var result = await queueManager.Subscribe(
60-
messageId,
6160
envelope => filter?.Invoke(envelope) ?? true,
6261
timeout,
6362
timeoutId,

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 65 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,21 @@ public class QueueManager(
2828
TimeSpan? maxIdempotencyKeyTtl = null)
2929
: IDisposable
3030
{
31-
private readonly Dictionary<EffectId, Subscription> _subscribers = new();
3231
private readonly Lock _lock = new();
33-
32+
3433
private readonly EffectId _toRemoveNextIndex = new([-1, 0]);
3534
private readonly EffectId _idempotencyKeysId = new([-1, -1]);
3635
private readonly List<MessageData> _toDeliver = new();
3736
private readonly HashSet<long> _fetchedPositions = new();
3837

3938
private IdempotencyKeys? _idempotencyKeys;
4039
private int _nextToRemoveIndex = 0;
41-
private readonly SemaphoreSlim _deliverySemaphore = new(1, 1);
4240
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1);
4341
private bool _initialized = false;
4442
private volatile bool _disposed;
45-
43+
4644
private volatile Exception? _thrownException = null;
45+
private TaskCompletionSource _pulse = new(TaskCreationOptions.RunContinuationsAsynchronously);
4746

4847
private async Task Initialize()
4948
{
@@ -99,19 +98,13 @@ public async Task FetchMessagesOnce()
9998
{
10099
if (_disposed)
101100
throw new ObjectDisposedException($"{nameof(QueueManager)} is disposed already");
102-
101+
103102
await _semaphoreSlim.WaitAsync();
104103
try
105104
{
106105
if (_thrownException != null)
107-
{
108-
foreach (var (_, subscription) in _subscribers)
109-
subscription.Tcs.TrySetException(_thrownException);
110-
_subscribers.Clear();
111-
112106
return;
113-
}
114-
107+
115108
List<long> skipPositions;
116109
lock (_lock)
117110
skipPositions = _fetchedPositions.ToList();
@@ -153,33 +146,17 @@ public async Task FetchMessagesOnce()
153146
{
154147
unhandledExceptionHandler.Invoke(flowId.Type, e);
155148
_thrownException = e;
156-
157-
lock (_lock)
158-
{
159-
foreach (var (_, subscription) in _subscribers)
160-
subscription.Tcs.TrySetException(_thrownException);
161-
_subscribers.Clear();
162-
}
163-
164149
return;
165150
}
166151
}
167-
168-
if (messages.Any())
169-
_ = DeliverMessages();
170152
}
171153
finally
172154
{
155+
PulseAll();
173156
_semaphoreSlim.Release();
174157
}
175158
}
176159

177-
public async Task FetchAndTryToDeliver()
178-
{
179-
await FetchMessagesOnce();
180-
await DeliverMessages();
181-
}
182-
183160
public async Task FetchMessages()
184161
{
185162
while (!_disposed && _thrownException == null)
@@ -228,84 +205,20 @@ public async Task AfterFlush()
228205
}
229206
}
230207

231-
private async Task DeliverMessages()
208+
private void PulseAll()
232209
{
233-
await _deliverySemaphore.WaitAsync();
234-
try
235-
{
236-
while (true)
237-
{
238-
List<MessageData> messages;
239-
List<KeyValuePair<EffectId, Subscription>> subscribers;
240-
lock (_lock)
241-
{
242-
messages = _toDeliver.ToList();
243-
subscribers = _subscribers.ToList();
244-
}
245-
246-
var delivered = false;
247-
foreach (var envelopeWithPosition in messages)
248-
{
249-
if (delivered) break;
250-
251-
foreach (var idAndSubscription in subscribers)
252-
{
253-
var (effectId, subscription) = idAndSubscription;
254-
if (subscription.Predicate(envelopeWithPosition.Envelope))
255-
{
256-
int positionToRemoveIndex;
257-
lock (_lock)
258-
{
259-
if (!_subscribers.ContainsKey(effectId)) //might have been removed by timeout
260-
continue;
261-
262-
_toDeliver.Remove(envelopeWithPosition);
263-
_subscribers.Remove(effectId);
264-
positionToRemoveIndex = _nextToRemoveIndex++;
265-
}
266-
267-
var toRemoveId = new EffectId([-1, 0, positionToRemoveIndex]);
268-
await effect.Upserts(
269-
new List<EffectResult>(
270-
[
271-
new EffectResult(_toRemoveNextIndex, positionToRemoveIndex, Alias: null),
272-
new EffectResult(toRemoveId, envelopeWithPosition.Position, Alias: null),
273-
new EffectResult(subscription.MessageContentId, envelopeWithPosition.MessageContentBytes, Alias: null),
274-
new EffectResult(subscription.MessageTypeId, envelopeWithPosition.MessageTypeBytes, Alias: null),
275-
new EffectResult(subscription.ReceiverId, envelopeWithPosition.Receiver, Alias: null),
276-
new EffectResult(subscription.SenderId, envelopeWithPosition.Sender, Alias: null),
277-
]).Concat(envelopeWithPosition.IdempotencyKeyResult == null
278-
? []
279-
: [envelopeWithPosition.IdempotencyKeyResult]),
280-
flush: false
281-
);
282-
283-
subscription.Tcs.SetResult(envelopeWithPosition.Envelope);
284-
285-
delivered = true;
286-
break;
287-
}
288-
}
289-
}
290-
291-
if (!delivered)
292-
break;
293-
}
294-
}
295-
catch (Exception e)
296-
{
297-
unhandledExceptionHandler.Invoke(flowId.Type, e);
298-
}
299-
finally
210+
TaskCompletionSource oldPulse;
211+
lock (_lock)
300212
{
301-
_deliverySemaphore.Release();
213+
oldPulse = _pulse;
214+
_pulse = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
302215
}
216+
oldPulse.TrySetResult();
303217
}
304-
218+
305219
public async Task<Envelope?> Subscribe(
306-
EffectId effectId,
307-
MessagePredicate predicate,
308-
DateTime? timeout,
220+
MessagePredicate predicate,
221+
DateTime? timeout,
309222
EffectId timeoutId,
310223
EffectId messageId,
311224
EffectId messageTypeId,
@@ -316,37 +229,69 @@ await effect.Upserts(
316229
if (_thrownException != null)
317230
throw _thrownException;
318231

319-
var tcs = new TaskCompletionSource<Envelope?>();
320-
lock (_lock)
321-
_subscribers[effectId] = new Subscription(predicate, tcs, timeout, messageId, messageTypeId, receiverId, senderId);
232+
await FetchMessagesOnce();
322233

323234
var timeoutTask = timeout != null
324235
? timeouts.AddTimeout(timeoutId, timeout.Value)
325236
: new TaskCompletionSource().Task;
326237

327-
_ = DeliverMessages();
238+
var maxWaitTask = Task.Delay(maxWait ?? settings.MessagesDefaultMaxWaitForCompletion);
328239

329-
await Task.WhenAny(tcs.Task, timeoutTask, Task.Delay(maxWait ?? settings.MessagesDefaultMaxWaitForCompletion));
330-
331-
var shouldSuspend = false;
332-
lock (_lock)
240+
while (true)
333241
{
334-
if (!tcs.Task.IsCompleted && timeoutTask.IsCompleted)
242+
if (_thrownException != null)
243+
throw _thrownException;
244+
245+
MessageData? matched = null;
246+
var positionToRemoveIndex = 0;
247+
Task pulseTask;
248+
lock (_lock)
335249
{
336-
_subscribers.Remove(effectId);
337-
tcs.TrySetResult(null);
250+
for (var i = 0; i < _toDeliver.Count; i++)
251+
if (predicate(_toDeliver[i].Envelope))
252+
{
253+
matched = _toDeliver[i];
254+
_toDeliver.RemoveAt(i);
255+
positionToRemoveIndex = _nextToRemoveIndex++;
256+
break;
257+
}
258+
259+
pulseTask = _pulse.Task;
338260
}
339261

340-
if (!tcs.Task.IsCompleted)
341-
shouldSuspend = true;
342-
else
262+
if (matched != null)
263+
{
264+
var toRemoveId = new EffectId([-1, 0, positionToRemoveIndex]);
265+
await effect.Upserts(
266+
new List<EffectResult>(
267+
[
268+
new EffectResult(_toRemoveNextIndex, positionToRemoveIndex, Alias: null),
269+
new EffectResult(toRemoveId, matched.Position, Alias: null),
270+
new EffectResult(messageId, matched.MessageContentBytes, Alias: null),
271+
new EffectResult(messageTypeId, matched.MessageTypeBytes, Alias: null),
272+
new EffectResult(receiverId, matched.Receiver, Alias: null),
273+
new EffectResult(senderId, matched.Sender, Alias: null),
274+
]).Concat(matched.IdempotencyKeyResult == null
275+
? []
276+
: [matched.IdempotencyKeyResult]),
277+
flush: false
278+
);
279+
343280
timeouts.RemoveTimeout(timeoutId);
344-
}
281+
return matched.Envelope;
282+
}
345283

346-
if (shouldSuspend)
347-
await flowsManager.Suspend(storedId);
284+
await Task.WhenAny(pulseTask, timeoutTask, maxWaitTask);
348285

349-
return await tcs.Task;
286+
if (timeoutTask.IsCompleted)
287+
return null;
288+
289+
if (maxWaitTask.IsCompleted)
290+
{
291+
await flowsManager.Suspend(storedId);
292+
return null;
293+
}
294+
}
350295
}
351296

352297
public record MessageData(
@@ -359,15 +304,6 @@ public record MessageData(
359304
string? Sender
360305
);
361306

362-
private record Subscription(
363-
MessagePredicate Predicate,
364-
TaskCompletionSource<Envelope?> Tcs,
365-
DateTime? Timeout,
366-
EffectId MessageContentId,
367-
EffectId MessageTypeId,
368-
EffectId ReceiverId,
369-
EffectId SenderId);
370-
371307
public void Dispose()
372308
{
373309
_disposed = true;

0 commit comments

Comments
 (0)