Skip to content

Commit 5e7ff85

Browse files
authored
Add ResetInterrupted method to function stores (#125)
1 parent e7c892b commit 5e7ff85

15 files changed

Lines changed: 140 additions & 77 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ public override Task RestartingExecutionShouldFailWhenExpectedEpochDoesNotMatch(
9898
public override Task RestartingFunctionShouldSetInterruptedToFalse()
9999
=> RestartingFunctionShouldSetInterruptedToFalse(FunctionStoreFactory.Create());
100100

101+
[TestMethod]
102+
public override Task ResetInterruptedClearsInterruptedFlag()
103+
=> ResetInterruptedClearsInterruptedFlag(FunctionStoreFactory.Create());
104+
101105
[TestMethod]
102106
public override Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated()
103107
=> MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated(FunctionStoreFactory.Create());

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -689,16 +689,58 @@ public async Task RestartingFunctionShouldSetInterruptedToFalse(Task<IFunctionSt
689689
session.ShouldBeNull();
690690

691691
await store.Interrupt(functionId).ShouldBeTrueAsync();
692-
await store.Interrupted(functionId).ShouldBeAsync(true);
692+
(await store.GetInterruptedFunctions()).Any(id => id == functionId).ShouldBeTrue();
693693

694694
await store.RestartExecution(
695-
functionId,
695+
functionId,
696696
owner: ReplicaId.NewId()
697697
).ShouldNotBeNullAsync();
698-
699-
await store.Interrupted(functionId).ShouldBeAsync(false);
698+
699+
(await store.GetInterruptedFunctions()).Any(id => id == functionId).ShouldBeFalse();
700700
}
701-
701+
702+
public abstract Task ResetInterruptedClearsInterruptedFlag();
703+
protected async Task ResetInterruptedClearsInterruptedFlag(Task<IFunctionStore> storeTask)
704+
{
705+
var store = await storeTask;
706+
var functionId1 = TestStoredId.Create();
707+
var functionId2 = StoredId.Create(functionId1.Type, Guid.NewGuid().ToString());
708+
709+
await store.CreateFunction(
710+
functionId1,
711+
"humanInstanceId1",
712+
param: Test.SimpleStoredParameter,
713+
leaseExpiration: DateTime.UtcNow.Ticks,
714+
postponeUntil: null,
715+
timestamp: DateTime.UtcNow.Ticks,
716+
parent: null,
717+
owner: null
718+
);
719+
720+
await store.CreateFunction(
721+
functionId2,
722+
"humanInstanceId2",
723+
param: Test.SimpleStoredParameter,
724+
leaseExpiration: DateTime.UtcNow.Ticks,
725+
postponeUntil: null,
726+
timestamp: DateTime.UtcNow.Ticks,
727+
parent: null,
728+
owner: null
729+
);
730+
731+
await store.Interrupt(functionId1).ShouldBeTrueAsync();
732+
await store.Interrupt(functionId2).ShouldBeTrueAsync();
733+
734+
(await store.GetInterruptedFunctions()).Count.ShouldBe(2);
735+
736+
await store.ResetInterrupted([functionId1]);
737+
738+
var interrupted = await store.GetInterruptedFunctions();
739+
interrupted.Count.ShouldBe(1);
740+
interrupted.Any(id => id == functionId2).ShouldBeTrue();
741+
interrupted.Any(id => id == functionId1).ShouldBeFalse();
742+
}
743+
702744
public abstract Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated();
703745
public async Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated(Task<IFunctionStore> storeTask)
704746
{
@@ -1039,7 +1081,7 @@ protected async Task InterruptCountForNonExistingFunctionIsNull(Task<IFunctionSt
10391081
{
10401082
var functionId = TestStoredId.Create();
10411083
var store = await storeTask;
1042-
(await store.Interrupted(functionId)).ShouldBeNull();
1084+
(await store.GetFunction(functionId)).ShouldBeNull();
10431085
}
10441086

10451087
public abstract Task DefaultStateCanSetAndFetchedAfterwards();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,8 @@ public Task<bool> Interrupt(StoredId storedId)
187187

188188
public Task Interrupt(IReadOnlyList<StoredId> storedIds) => _inner.Interrupt(storedIds);
189189

190-
public Task<bool?> Interrupted(StoredId storedId)
191-
=> _crashed
192-
? Task.FromException<bool?>(new TimeoutException())
193-
: _inner.Interrupted(storedId);
194-
190+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds) => _inner.ResetInterrupted(storedIds);
191+
195192
public Task<bool> SetParameters(StoredId storedId, byte[]? storedParameter, byte[]? storedResult, ReplicaId? expectedReplica)
196193
=> _crashed
197194
? Task.FromException<bool>(new TimeoutException())

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ Task<bool> SuspendFunction(
104104

105105
Task<bool> Interrupt(StoredId storedId);
106106
Task Interrupt(IReadOnlyList<StoredId> storedIds);
107-
Task<bool?> Interrupted(StoredId storedId);
107+
Task ResetInterrupted(IReadOnlyList<StoredId> storedIds);
108108

109109
Task<Status?> GetFunctionStatus(StoredId storedId);
110110
Task<IReadOnlyList<StatusAndId>> GetFunctionsStatus(IEnumerable<StoredId> storedIds);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -450,15 +450,16 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
450450
await Interrupt(storedId);
451451
}
452452

453-
public Task<bool?> Interrupted(StoredId storedId)
453+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
454454
{
455455
lock (_sync)
456456
{
457-
if (!_states.ContainsKey(storedId))
458-
return Task.FromResult(default(bool?));
459-
460-
return ((bool?) _states[storedId].Interrupted).ToTask();
457+
foreach (var storedId in storedIds)
458+
if (_states.TryGetValue(storedId, out var state))
459+
state.Interrupted = false;
461460
}
461+
462+
return Task.CompletedTask;
462463
}
463464

464465
public Task<Status?> GetFunctionStatus(StoredId storedId)

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,11 @@ public Task Interrupt(IReadOnlyList<StoredId> storedIds)
170170
? Task.FromException(new TimeoutException())
171171
: _inner.Interrupt(storedIds);
172172

173-
public Task<bool?> Interrupted(StoredId storedId)
173+
public Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
174174
=> _crashed
175-
? Task.FromException<bool?>(new TimeoutException())
176-
: _inner.Interrupted(storedId);
177-
175+
? Task.FromException(new TimeoutException())
176+
: _inner.ResetInterrupted(storedIds);
177+
178178
public Task<Status?> GetFunctionStatus(StoredId storedId)
179179
=> _crashed
180180
? Task.FromException<Status?>(new TimeoutException())

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ public override Task RestartingExecutionShouldFailWhenExpectedEpochDoesNotMatch(
8787
public override Task RestartingFunctionShouldSetInterruptedToFalse()
8888
=> RestartingFunctionShouldSetInterruptedToFalse(FunctionStoreFactory.Create());
8989

90+
[TestMethod]
91+
public override Task ResetInterruptedClearsInterruptedFlag()
92+
=> ResetInterruptedClearsInterruptedFlag(FunctionStoreFactory.Create());
93+
9094
[TestMethod]
9195
public override Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated()
9296
=> MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated(FunctionStoreFactory.Create());

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -670,12 +670,22 @@ public async Task Interrupt(IReadOnlyList<StoredId> storedIds)
670670
{
671671
if (storedIds.Count == 0)
672672
return;
673-
673+
674674
await using var conn = await CreateOpenConnection(_connectionString);
675675
await using var cmd = _sqlGenerator.Interrupt(storedIds).ToSqlCommand(conn);
676676
await cmd.ExecuteNonQueryAsync();
677677
}
678678

679+
public async Task ResetInterrupted(IReadOnlyList<StoredId> storedIds)
680+
{
681+
if (storedIds.Count == 0)
682+
return;
683+
684+
await using var conn = await CreateOpenConnection(_connectionString);
685+
await using var cmd = _sqlGenerator.ResetInterrupted(storedIds).ToSqlCommand(conn);
686+
await cmd.ExecuteNonQueryAsync();
687+
}
688+
679689
private string? _setParametersSql;
680690
public async Task<bool> SetParameters(
681691
StoredId storedId,
@@ -709,26 +719,6 @@ public async Task<bool> SetParameters(
709719
return affectedRows == 1;
710720
}
711721

712-
private string? _getInterruptCountSql;
713-
public async Task<bool?> Interrupted(StoredId storedId)
714-
{
715-
await using var conn = await CreateOpenConnection(_connectionString);
716-
717-
_getInterruptCountSql ??= $@"
718-
SELECT interrupted
719-
FROM {_tablePrefix}
720-
WHERE id = ?;";
721-
722-
await using var command = new MySqlCommand(_getInterruptCountSql, conn)
723-
{
724-
Parameters =
725-
{
726-
new() { Value = storedId.AsGuid.ToString("N") },
727-
}
728-
};
729-
730-
return (bool?) await command.ExecuteScalarAsync();
731-
}
732722

733723
private string? _getFunctionStatusSql;
734724
public async Task<Status?> GetFunctionStatus(StoredId storedId)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,17 @@ ELSE expires
3333

3434
return StoreCommand.Create(sql);
3535
}
36-
36+
37+
public StoreCommand ResetInterrupted(IEnumerable<StoredId> storedIds)
38+
{
39+
var sql = @$"
40+
UPDATE {tablePrefix}
41+
SET interrupted = FALSE
42+
WHERE Id IN ({storedIds.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ")});";
43+
44+
return StoreCommand.Create(sql);
45+
}
46+
3747
private string? _getEffectResultsSql;
3848
public StoreCommand GetEffects(StoredId storedId)
3949
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ public override Task RestartingExecutionShouldFailWhenExpectedEpochDoesNotMatch(
9090
public override Task RestartingFunctionShouldSetInterruptedToFalse()
9191
=> RestartingFunctionShouldSetInterruptedToFalse(FunctionStoreFactory.Create());
9292

93+
[TestMethod]
94+
public override Task ResetInterruptedClearsInterruptedFlag()
95+
=> ResetInterruptedClearsInterruptedFlag(FunctionStoreFactory.Create());
96+
9397
[TestMethod]
9498
public override Task MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated()
9599
=> MessagesCanBeFetchedAfterFunctionWithInitialMessagesHasBeenCreated(FunctionStoreFactory.Create());

0 commit comments

Comments
 (0)