Skip to content

Commit d500fd0

Browse files
committed
Add FlushlessUpserts method to EffectResults and Effect and use it in QueueManager
1 parent 888f5c3 commit d500fd0

8 files changed

Lines changed: 85 additions & 3 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,8 @@ public override Task UtcNowEffectSunshineTest()
153153
[TestMethod]
154154
public override Task FlushlessUpsertIsNotStoredUntilFlushed()
155155
=> FlushlessUpsertIsNotStoredUntilFlushed(FunctionStoreFactory.Create());
156+
157+
[TestMethod]
158+
public override Task FlushlessUpsertsAreNotStoredUntilFlushed()
159+
=> FlushlessUpsertsAreNotStoredUntilFlushed(FunctionStoreFactory.Create());
156160
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1537,4 +1537,55 @@ public async Task FlushlessUpsertIsNotStoredUntilFlushed(Task<IFunctionStore> st
15371537
storedEffect.EffectId.ShouldBe(effectId);
15381538
((int)DefaultSerializer.Instance.Deserialize(storedEffect.Result!, typeof(int))).ShouldBe(42);
15391539
}
1540+
1541+
public abstract Task FlushlessUpsertsAreNotStoredUntilFlushed();
1542+
public async Task FlushlessUpsertsAreNotStoredUntilFlushed(Task<IFunctionStore> storeTask)
1543+
{
1544+
var store = await storeTask;
1545+
var storedId = TestStoredId.Create();
1546+
var session = await store.CreateFunction(
1547+
storedId,
1548+
"SomeInstance",
1549+
param: null,
1550+
leaseExpiration: 0,
1551+
postponeUntil: null,
1552+
timestamp: 0,
1553+
parent: null,
1554+
owner: ReplicaId.NewId()
1555+
);
1556+
var effectStore = store.EffectsStore;
1557+
var effectResults = new EffectResults(
1558+
TestFlowId.Create(),
1559+
storedId,
1560+
new List<StoredEffect>(),
1561+
effectStore,
1562+
DefaultSerializer.Instance,
1563+
session,
1564+
clearChildren: true
1565+
);
1566+
1567+
var effectId1 = new EffectId([1]);
1568+
var effectId2 = new EffectId([2]);
1569+
effectResults.FlushlessUpserts(
1570+
[
1571+
new EffectResult(effectId1, 42, Alias: null),
1572+
new EffectResult(effectId2, "hello", Alias: null),
1573+
]);
1574+
1575+
// before flush nothing should be stored
1576+
var fetchedResults = await effectStore.GetEffectResults(storedId);
1577+
fetchedResults.Count.ShouldBe(0);
1578+
1579+
// after flush the values should be persisted
1580+
await effectResults.Flush();
1581+
1582+
fetchedResults = await effectStore.GetEffectResults(storedId);
1583+
fetchedResults.Count.ShouldBe(2);
1584+
((int)DefaultSerializer.Instance.Deserialize(
1585+
fetchedResults.Single(r => r.EffectId == effectId1).Result!, typeof(int)
1586+
)).ShouldBe(42);
1587+
((string)DefaultSerializer.Instance.Deserialize(
1588+
fetchedResults.Single(r => r.EffectId == effectId2).Result!, typeof(string)
1589+
)).ShouldBe("hello");
1590+
}
15401591
}

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ internal async Task Upsert<T>(string alias, T value, bool flush = true)
6969

7070
internal Task Upserts(IEnumerable<EffectResult> values, bool flush)
7171
=> effectResults.Upserts(values, flush);
72+
internal void FlushlessUpserts(IEnumerable<EffectResult> values)
73+
=> effectResults.FlushlessUpserts(values);
7274

7375
internal bool TryGet<T>(string alias, out T? value)
7476
{

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,20 @@ internal async Task Upserts(IEnumerable<EffectResult> values, bool flush)
175175
if (flush)
176176
await Flush();
177177
}
178+
179+
internal void FlushlessUpserts(IEnumerable<EffectResult> values)
180+
{
181+
var storedEffects = values
182+
.Select(t =>
183+
{
184+
var bytes = _serializer.Serialize(t.Value!, t.Value?.GetType() ?? typeof(object));
185+
return new { Id = t.Id, Bytes = bytes, Alias = t.Alias };
186+
})
187+
.Select(a => StoredEffect.CreateCompleted(a.Id, a.Bytes, a.Alias))
188+
.ToList();
189+
190+
AddToPending(storedEffects);
191+
}
178192

179193
public bool TryGet<T>(EffectId effectId, out T? value)
180194
{

Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void PulseAll()
263263
if (matched != null)
264264
{
265265
var toRemoveId = new EffectId([-1, 0, positionToRemoveIndex]);
266-
await effect.Upserts(
266+
effect.FlushlessUpserts(
267267
new List<EffectResult>(
268268
[
269269
new EffectResult(_toRemoveNextIndex, positionToRemoveIndex, Alias: null),
@@ -274,8 +274,7 @@ await effect.Upserts(
274274
new EffectResult(senderId, matched.Sender, Alias: null),
275275
]).Concat(matched.IdempotencyKeyResult == null
276276
? []
277-
: [matched.IdempotencyKeyResult]),
278-
flush: false
277+
: [matched.IdempotencyKeyResult])
279278
);
280279

281280
timeouts.RemoveTimeout(timeoutId);

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,8 @@ public override Task UtcNowEffectSunshineTest()
152152
[TestMethod]
153153
public override Task FlushlessUpsertIsNotStoredUntilFlushed()
154154
=> FlushlessUpsertIsNotStoredUntilFlushed(FunctionStoreFactory.Create());
155+
156+
[TestMethod]
157+
public override Task FlushlessUpsertsAreNotStoredUntilFlushed()
158+
=> FlushlessUpsertsAreNotStoredUntilFlushed(FunctionStoreFactory.Create());
155159
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,8 @@ public override Task UtcNowEffectSunshineTest()
153153
[TestMethod]
154154
public override Task FlushlessUpsertIsNotStoredUntilFlushed()
155155
=> FlushlessUpsertIsNotStoredUntilFlushed(FunctionStoreFactory.Create());
156+
157+
[TestMethod]
158+
public override Task FlushlessUpsertsAreNotStoredUntilFlushed()
159+
=> FlushlessUpsertsAreNotStoredUntilFlushed(FunctionStoreFactory.Create());
156160
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,8 @@ public override Task UtcNowEffectSunshineTest()
153153
[TestMethod]
154154
public override Task FlushlessUpsertIsNotStoredUntilFlushed()
155155
=> FlushlessUpsertIsNotStoredUntilFlushed(FunctionStoreFactory.Create());
156+
157+
[TestMethod]
158+
public override Task FlushlessUpsertsAreNotStoredUntilFlushed()
159+
=> FlushlessUpsertsAreNotStoredUntilFlushed(FunctionStoreFactory.Create());
156160
}

0 commit comments

Comments
 (0)