Skip to content

Commit c990303

Browse files
committed
Extract TryTakeMessage method in QueueManager Subscribe loop
1 parent 4e93e3a commit c990303

1 file changed

Lines changed: 17 additions & 17 deletions

File tree

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,22 @@ public async Task FetchMessagesOnce()
157157
}
158158
}
159159

160+
private (MessageData? Matched, int PositionToRemoveIndex, Task PulseTask) TryTakeMessage(MessagePredicate predicate)
161+
{
162+
lock (_lock)
163+
{
164+
for (var i = 0; i < _toDeliver.Count; i++)
165+
if (predicate(_toDeliver[i].Envelope))
166+
{
167+
var matched = _toDeliver[i];
168+
_toDeliver.RemoveAt(i);
169+
return (matched, _nextToRemoveIndex++, _pulse.Task);
170+
}
171+
172+
return (null, 0, _pulse.Task);
173+
}
174+
}
175+
160176
public async Task FetchMessages()
161177
{
162178
while (!_disposed && _thrownException == null)
@@ -242,23 +258,7 @@ private void PulseAll()
242258
if (_thrownException != null)
243259
throw _thrownException;
244260

245-
MessageData? matched = null;
246-
var positionToRemoveIndex = 0;
247-
Task pulseTask;
248-
lock (_lock)
249-
{
250-
for (var i = 0; i < _toDeliver.Count; i++)
251-
if (predicate(_toDeliver[i].Envelope))
252-
{
253-
matched = _toDeliver[i];
254-
_toDeliver.RemoveAt(i);
255-
positionToRemoveIndex = _nextToRemoveIndex++;
256-
break;
257-
}
258-
259-
pulseTask = _pulse.Task;
260-
}
261-
261+
var (matched, positionToRemoveIndex, pulseTask) = TryTakeMessage(predicate);
262262
if (matched != null)
263263
{
264264
var toRemoveId = new EffectId([-1, 0, positionToRemoveIndex]);

0 commit comments

Comments
 (0)