Skip to content

Commit 8d7fff4

Browse files
committed
Add synchronous UpsertWithoutFlush method to EffectResults and Effect
1 parent b39d0c4 commit 8d7fff4

4 files changed

Lines changed: 55 additions & 0 deletions

File tree

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/EffectTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,8 @@ public override Task RunParallelleTest()
149149
[TestMethod]
150150
public override Task UtcNowEffectSunshineTest()
151151
=> UtcNowEffectSunshineTest(FunctionStoreFactory.Create());
152+
153+
[TestMethod]
154+
public override Task UpsertWithoutFlushIsNotStoredUntilFlushed()
155+
=> UpsertWithoutFlushIsNotStoredUntilFlushed(FunctionStoreFactory.Create());
152156
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/EffectTests.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,4 +1494,47 @@ async Task (workflow) =>
14941494
syncedList.Count.ShouldBe(2);
14951495
syncedList[0].ShouldBe(syncedList[1]);
14961496
}
1497+
1498+
public abstract Task UpsertWithoutFlushIsNotStoredUntilFlushed();
1499+
public async Task UpsertWithoutFlushIsNotStoredUntilFlushed(Task<IFunctionStore> storeTask)
1500+
{
1501+
var store = await storeTask;
1502+
var storedId = TestStoredId.Create();
1503+
var session = await store.CreateFunction(
1504+
storedId,
1505+
"SomeInstance",
1506+
param: null,
1507+
leaseExpiration: 0,
1508+
postponeUntil: null,
1509+
timestamp: 0,
1510+
parent: null,
1511+
owner: ReplicaId.NewId()
1512+
);
1513+
var effectStore = store.EffectsStore;
1514+
var effectResults = new EffectResults(
1515+
TestFlowId.Create(),
1516+
storedId,
1517+
new List<StoredEffect>(),
1518+
effectStore,
1519+
DefaultSerializer.Instance,
1520+
session,
1521+
clearChildren: true
1522+
);
1523+
1524+
var effectId = new EffectId([1]);
1525+
effectResults.UpsertWithoutFlush(effectId, alias: null, 42);
1526+
1527+
// before flush nothing should be stored
1528+
var fetchedResults = await effectStore.GetEffectResults(storedId);
1529+
fetchedResults.Count.ShouldBe(0);
1530+
1531+
// after flush the value should be persisted
1532+
await effectResults.Flush();
1533+
1534+
fetchedResults = await effectStore.GetEffectResults(storedId);
1535+
fetchedResults.Count.ShouldBe(1);
1536+
var storedEffect = fetchedResults.Single();
1537+
storedEffect.EffectId.ShouldBe(effectId);
1538+
((int)DefaultSerializer.Instance.Deserialize(storedEffect.Result!, typeof(int))).ShouldBe(42);
1539+
}
14971540
}

Core/Cleipnir.ResilientFunctions/Domain/Effect.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ internal async Task Upsert<T>(string alias, T value, bool flush = true)
6565
flush
6666
);
6767
internal Task Upsert<T>(EffectId effectId, T value, string? alias, bool flush) => effectResults.Upsert(effectId, alias, value, flush);
68+
internal void UpsertWithoutFlush<T>(EffectId effectId, T value, string? alias) => effectResults.UpsertWithoutFlush(effectId, alias, value);
6869

6970
internal Task Upserts(IEnumerable<EffectResult> values, bool flush)
7071
=> effectResults.Upserts(values, flush);

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ await FlushOrAddToPending(
151151
clearChildren: false
152152
);
153153
}
154+
155+
internal void UpsertWithoutFlush<T>(EffectId effectId, string? alias, T value)
156+
{
157+
var serializedValue = _serializer.Serialize(value!, typeof(T));
158+
var storedEffect = StoredEffect.CreateCompleted(effectId, serializedValue, alias);
159+
AddToPending(storedEffect.EffectId, storedEffect, delete: false, clearChildren: false);
160+
}
154161

155162
internal async Task Upserts(IEnumerable<EffectResult> values, bool flush)
156163
{

0 commit comments

Comments
 (0)