Skip to content

Commit e7c892b

Browse files
authored
Simplify GetInterruptedFunctions to be parameterless and add interrupted index (#124)
1 parent c8e683d commit e7c892b

12 files changed

Lines changed: 50 additions & 56 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ public override Task GetInterruptedFunctionsReturnsOnlyInterruptedFunctions()
239239
=> GetInterruptedFunctionsReturnsOnlyInterruptedFunctions(FunctionStoreFactory.Create());
240240

241241
[TestMethod]
242-
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided()
243-
=> GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided(FunctionStoreFactory.Create());
242+
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneExist()
243+
=> GetInterruptedFunctionsReturnsEmptyListWhenNoneExist(FunctionStoreFactory.Create());
244244

245245
[TestMethod]
246246
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAreInterrupted()

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,8 +2008,8 @@ protected async Task GetInterruptedFunctionsReturnsOnlyInterruptedFunctions(Task
20082008
await store.Interrupt(functionId1).ShouldBeTrueAsync();
20092009
await store.Interrupt(functionId3).ShouldBeTrueAsync();
20102010

2011-
// Get interrupted functions from the set of all 4 functions
2012-
var interruptedFunctions = await store.GetInterruptedFunctions([functionId1, functionId2, functionId3, functionId4]);
2011+
// Get interrupted functions
2012+
var interruptedFunctions = await store.GetInterruptedFunctions();
20132013

20142014
// Should return only the 2 interrupted functions
20152015
interruptedFunctions.Count.ShouldBe(2);
@@ -2019,12 +2019,12 @@ protected async Task GetInterruptedFunctionsReturnsOnlyInterruptedFunctions(Task
20192019
interruptedFunctions.Any(id => id == functionId4).ShouldBeFalse();
20202020
}
20212021

2022-
public abstract Task GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided();
2023-
protected async Task GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided(Task<IFunctionStore> storeTask)
2022+
public abstract Task GetInterruptedFunctionsReturnsEmptyListWhenNoneExist();
2023+
protected async Task GetInterruptedFunctionsReturnsEmptyListWhenNoneExist(Task<IFunctionStore> storeTask)
20242024
{
20252025
var store = await storeTask;
20262026

2027-
var interruptedFunctions = await store.GetInterruptedFunctions([]);
2027+
var interruptedFunctions = await store.GetInterruptedFunctions();
20282028
interruptedFunctions.Count.ShouldBe(0);
20292029
}
20302030

@@ -2060,7 +2060,7 @@ protected async Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAre
20602060
session.ShouldBeNull();
20612061

20622062
// Don't interrupt any functions
2063-
var interruptedFunctions = await store.GetInterruptedFunctions([functionId1, functionId2]);
2063+
var interruptedFunctions = await store.GetInterruptedFunctions();
20642064

20652065
interruptedFunctions.Count.ShouldBe(0);
20662066
}
@@ -2101,10 +2101,13 @@ protected async Task GetInterruptedFunctionsReturnsEmptyListWhenQueriedIdsDoNotE
21012101
await store.Interrupt(functionId1).ShouldBeTrueAsync();
21022102
await store.Interrupt(functionId2).ShouldBeTrueAsync();
21032103

2104-
// Query for non-existent IDs
2105-
var interruptedFunctions = await store.GetInterruptedFunctions([nonExistentId1, nonExistentId2]);
2104+
// No functions are interrupted despite existing
2105+
var interruptedFunctions = await store.GetInterruptedFunctions();
21062106

2107-
interruptedFunctions.Count.ShouldBe(0);
2107+
// Should return the 2 interrupted functions
2108+
interruptedFunctions.Count.ShouldBe(2);
2109+
interruptedFunctions.Any(id => id == functionId1).ShouldBeTrue();
2110+
interruptedFunctions.Any(id => id == functionId2).ShouldBeTrue();
21082111
}
21092112

21102113
public abstract Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFunctions();
@@ -2171,15 +2174,15 @@ protected async Task GetInterruptedFunctionsOnlyReturnsMatchingInterruptedFuncti
21712174
await store.Interrupt(functionId3).ShouldBeTrueAsync();
21722175
await store.Interrupt(functionId4).ShouldBeTrueAsync();
21732176

2174-
// Query for only functions 2 and 4
2175-
var interruptedFunctions = await store.GetInterruptedFunctions([functionId2, functionId4]);
2177+
// Get all interrupted functions
2178+
var interruptedFunctions = await store.GetInterruptedFunctions();
21762179

2177-
// Should return only the 2 queried interrupted functions
2178-
interruptedFunctions.Count.ShouldBe(2);
2180+
// Should return all 4 interrupted functions
2181+
interruptedFunctions.Count.ShouldBe(4);
2182+
interruptedFunctions.Any(id => id == functionId1).ShouldBeTrue();
21792183
interruptedFunctions.Any(id => id == functionId2).ShouldBeTrue();
2184+
interruptedFunctions.Any(id => id == functionId3).ShouldBeTrue();
21802185
interruptedFunctions.Any(id => id == functionId4).ShouldBeTrue();
2181-
interruptedFunctions.Any(id => id == functionId1).ShouldBeFalse();
2182-
interruptedFunctions.Any(id => id == functionId3).ShouldBeFalse();
21832186
}
21842187

21852188
public abstract Task GetResultsReturnsResultsForExistingFunctions();

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ public Task<IReadOnlyList<StoredId>> GetSucceededFunctions(long completedBefore)
9797
? Task.FromException<IReadOnlyList<StoredId>>(new TimeoutException())
9898
: _inner.GetSucceededFunctions(completedBefore);
9999

100-
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids)
100+
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions()
101101
=> _crashed
102102
? Task.FromException<IReadOnlyList<StoredId>>(new TimeoutException())
103-
: _inner.GetInterruptedFunctions(ids);
103+
: _inner.GetInterruptedFunctions();
104104

105105
public Task<bool> SetFunctionState(
106106
StoredId storedId, Status status, byte[]? storedParameter,

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ Task<int> BulkScheduleFunctions(
4141

4242
Task<IReadOnlyList<StoredId>> GetExpiredFunctions(long expiresBefore);
4343
Task<IReadOnlyList<StoredId>> GetSucceededFunctions(long completedBefore);
44-
Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids);
44+
Task<IReadOnlyList<StoredId>> GetInterruptedFunctions();
4545

4646
Task<bool> SetParameters(
4747
StoredId storedId,

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,15 +240,11 @@ public Task<IReadOnlyList<StoredId>> GetSucceededFunctions(long completedBefore)
240240
.ToTask();
241241
}
242242

243-
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids)
243+
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions()
244244
{
245-
var idsList = ids.ToHashSet();
246-
if (idsList.Count == 0)
247-
return Array.Empty<StoredId>().CastTo<IReadOnlyList<StoredId>>().ToTask();
248-
249245
lock (_sync)
250246
return _states
251-
.Where(kv => idsList.Contains(kv.Key) && kv.Value.Interrupted)
247+
.Where(kv => kv.Value.Interrupted)
252248
.Select(kv => kv.Key)
253249
.ToList()
254250
.CastTo<IReadOnlyList<StoredId>>()

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ public Task<IReadOnlyList<StoredId>> GetSucceededFunctions(long completedBefore)
7979
? Task.FromException<IReadOnlyList<StoredId>>(new TimeoutException())
8080
: _inner.GetSucceededFunctions(completedBefore);
8181

82-
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids)
82+
public Task<IReadOnlyList<StoredId>> GetInterruptedFunctions()
8383
=> _crashed
8484
? Task.FromException<IReadOnlyList<StoredId>>(new TimeoutException())
85-
: _inner.GetInterruptedFunctions(ids);
85+
: _inner.GetInterruptedFunctions();
8686

8787
public Task<bool> SetParameters(StoredId storedId, byte[]? param, byte[]? result, ReplicaId? expectedReplica)
8888
=> _crashed

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ public override Task GetInterruptedFunctionsReturnsOnlyInterruptedFunctions()
228228
=> GetInterruptedFunctionsReturnsOnlyInterruptedFunctions(FunctionStoreFactory.Create());
229229

230230
[TestMethod]
231-
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided()
232-
=> GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided(FunctionStoreFactory.Create());
231+
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneExist()
232+
=> GetInterruptedFunctionsReturnsEmptyListWhenNoneExist(FunctionStoreFactory.Create());
233233

234234
[TestMethod]
235235
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAreInterrupted()

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ id CHAR(32) PRIMARY KEY,
8888
parent CHAR(32) NULL,
8989
owner CHAR(32) NULL,
9090
effects LONGBLOB NULL,
91-
INDEX (expires, id, status)
91+
INDEX (expires, id, status),
92+
INDEX idx_interrupted (id, interrupted)
9293
);";
9394

9495
await using var command = new MySqlCommand(_initializeSql, conn);
@@ -441,16 +442,12 @@ SELECT id
441442
return ids;
442443
}
443444

444-
public async Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids)
445+
public async Task<IReadOnlyList<StoredId>> GetInterruptedFunctions()
445446
{
446-
var inSql = ids.Select(id => $"'{id.AsGuid:N}'").StringJoin(", ");
447-
if (string.IsNullOrEmpty(inSql))
448-
return [];
449-
450447
var sql = @$"
451448
SELECT id
452449
FROM {_tablePrefix}
453-
WHERE interrupted = TRUE AND id IN ({inSql})";
450+
WHERE interrupted = TRUE";
454451

455452
await using var conn = await CreateOpenConnection(_connectionString);
456453
await using var command = new MySqlCommand(sql, conn);

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,8 @@ public override Task GetInterruptedFunctionsReturnsOnlyInterruptedFunctions()
231231
=> GetInterruptedFunctionsReturnsOnlyInterruptedFunctions(FunctionStoreFactory.Create());
232232

233233
[TestMethod]
234-
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided()
235-
=> GetInterruptedFunctionsReturnsEmptyListWhenNoIdsProvided(FunctionStoreFactory.Create());
234+
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneExist()
235+
=> GetInterruptedFunctionsReturnsEmptyListWhenNoneExist(FunctionStoreFactory.Create());
236236

237237
[TestMethod]
238238
public override Task GetInterruptedFunctionsReturnsEmptyListWhenNoneFunctionsAreInterrupted()

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ parent UUID NULL
9494
9595
CREATE INDEX IF NOT EXISTS idx_{_tableName}_expires
9696
ON {_tableName}(expires, id)
97-
WHERE status = {(int) Status.Postponed};";
97+
WHERE status = {(int) Status.Postponed};
98+
99+
CREATE INDEX IF NOT EXISTS idx_{_tableName}_interrupted
100+
ON {_tableName}(id)
101+
WHERE interrupted = TRUE;";
98102

99103
await using var command = new NpgsqlCommand(_initializeSql, conn);
100104
await command.ExecuteNonQueryAsync();
@@ -410,20 +414,15 @@ SELECT id
410414
return ids;
411415
}
412416

413-
public async Task<IReadOnlyList<StoredId>> GetInterruptedFunctions(IEnumerable<StoredId> ids)
417+
public async Task<IReadOnlyList<StoredId>> GetInterruptedFunctions()
414418
{
415-
var idsArray = ids.Select(id => id.AsGuid).ToArray();
416-
if (idsArray.Length == 0)
417-
return [];
418-
419419
var sql = @$"
420420
SELECT id
421421
FROM {_tableName}
422-
WHERE interrupted = TRUE AND id = ANY($1)";
422+
WHERE interrupted = TRUE";
423423

424424
await using var conn = await CreateConnection();
425425
await using var command = new NpgsqlCommand(sql, conn);
426-
command.Parameters.Add(new NpgsqlParameter { Value = idsArray });
427426

428427
await using var reader = await command.ExecuteReaderAsync();
429428
var interruptedIds = new List<StoredId>();

0 commit comments

Comments
 (0)