Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,4 @@ public override Task GetResultsReturnsEmptyDictionaryForEmptyInput()
[TestMethod]
public override Task GetResultsReturnsOnlyExistingFunctionResults()
=> GetResultsReturnsOnlyExistingFunctionResults(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultSucceedsWhenOwnerMatches()
=> SetResultSucceedsWhenOwnerMatches(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultDoesNothingWhenOwnerDoesNotMatch()
=> SetResultDoesNothingWhenOwnerDoesNotMatch(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultDoesNothingWhenFunctionDoesNotExist()
=> SetResultDoesNothingWhenFunctionDoesNotExist(FunctionStoreFactory.Create());
}
68 changes: 0 additions & 68 deletions Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2357,72 +2357,4 @@ await store.SucceedFunction(
results.ContainsKey(nonExistentFunctionId).ShouldBeFalse();
}

public abstract Task SetResultSucceedsWhenOwnerMatches();
protected async Task SetResultSucceedsWhenOwnerMatches(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var functionId = TestStoredId.Create();
var owner = ReplicaId.NewId();
var result = "test result".ToJson().ToUtf8Bytes();

// Create function with owner
await store.CreateFunction(
functionId,
"humanInstanceId",
param: Test.SimpleStoredParameter,
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks,
parent: null,
owner: owner
).ShouldNotBeNullAsync();

// Set result
await store.SetResult(functionId, result, owner);

// Verify result was set
var results = await store.GetResults([functionId]);
results[functionId].ShouldBe(result);
}

public abstract Task SetResultDoesNothingWhenOwnerDoesNotMatch();
protected async Task SetResultDoesNothingWhenOwnerDoesNotMatch(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var functionId = TestStoredId.Create();
var owner = ReplicaId.NewId();
var wrongOwner = ReplicaId.NewId();
var result = "test result".ToJson().ToUtf8Bytes();

// Create function with owner
await store.CreateFunction(
functionId,
"humanInstanceId",
param: Test.SimpleStoredParameter,
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks,
parent: null,
owner: owner
).ShouldNotBeNullAsync();

// Try to set result with wrong owner
await store.SetResult(functionId, result, wrongOwner);

// Verify result was not set
var results = await store.GetResults([functionId]);
results[functionId].ShouldBeNull();
}

public abstract Task SetResultDoesNothingWhenFunctionDoesNotExist();
protected async Task SetResultDoesNothingWhenFunctionDoesNotExist(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var nonExistentFunctionId = TestStoredId.Create();
var owner = ReplicaId.NewId();
var result = "test result".ToJson().ToUtf8Bytes();

// Try to set result for non-existent function (should not throw)
await store.SetResult(nonExistentFunctionId, result, owner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,6 @@ public Task<bool> DeleteFunction(StoredId storedId)
? Task.FromException<IReadOnlyDictionary<StoredId, byte[]?>>(new TimeoutException())
: _inner.GetResults(storedIds);

public Task SetResult(StoredId storedId, byte[] result, ReplicaId expectedReplica)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.SetResult(storedId, result, expectedReplica);

public IFunctionStore WithPrefix(string prefix) => _inner.WithPrefix(prefix);
}

Expand Down
7 changes: 3 additions & 4 deletions Core/Cleipnir.ResilientFunctions/ActionRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ public ActionRegistration(
Invoker<TParam, Unit> invoker,
ControlPanelFactory<TParam> controlPanelFactory,
MessageWriters messageWriters,
Postman postman,
UtcNow utcNow
) : base(storedType, postman, functionStore, utcNow)
) : base(storedType, functionStore, utcNow)
{
Type = flowType;
_invoker = invoker;
Expand Down Expand Up @@ -59,8 +58,8 @@ public async Task SendMessage<T>(
FlowInstance flowInstance,
T message,
string? idempotencyKey = null
) where T : class => await Postman.SendMessage(StoredId.Create(StoredType, flowInstance.Value), message, idempotencyKey);
) where T : class => await MessageWriters.For(flowInstance).AppendMessage(message, idempotencyKey);

public async Task SendMessages(IReadOnlyList<BatchedMessage> messages)
=> await Postman.SendMessages(messages);
=> await MessageWriters.AppendMessages(messages);
}
7 changes: 2 additions & 5 deletions Core/Cleipnir.ResilientFunctions/BaseRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions;

public abstract class BaseRegistration
{
private readonly IFunctionStore _functionStore;
protected Postman Postman { get; }
public StoredType StoredType { get; }
protected UtcNow UtcNow { get; }

protected BaseRegistration(StoredType storedType, Postman postman, IFunctionStore functionStore, UtcNow utcNow)
protected BaseRegistration(StoredType storedType, IFunctionStore functionStore, UtcNow utcNow)
{
_functionStore = functionStore;
StoredType = storedType;
Postman = postman;
UtcNow = utcNow;
}
}

public StoredId MapToStoredId(FlowInstance instance) => StoredId.Create(StoredType, instance.Value);

Expand Down
6 changes: 0 additions & 6 deletions Core/Cleipnir.ResilientFunctions/Domain/Result.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@

namespace Cleipnir.ResilientFunctions.Domain;

public static class Result
{
public static Result<T> SucceedWithValue<T>(T value) => new(value);
public static Result<Unit> SucceedWithUnit { get; } = new Result<Unit>(Unit.Instance);
}

public static class Succeed
{
public static Result<T> WithValue<T>(T value) => new Result<T>(value);
Expand Down
7 changes: 3 additions & 4 deletions Core/Cleipnir.ResilientFunctions/FuncRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ public FuncRegistration(
Invoker<TParam, TReturn> invoker,
ControlPanelFactory<TParam, TReturn> controlPanelFactory,
MessageWriters messageWriters,
Postman postman,
UtcNow utcNow
) : base(storedType, postman, functionStore, utcNow)
) : base(storedType, functionStore, utcNow)
{
Type = flowType;
_invoker = invoker;
Expand Down Expand Up @@ -59,8 +58,8 @@ public async Task SendMessage<T>(
FlowInstance flowInstance,
T message,
string? idempotencyKey = null
) where T : class => await Postman.SendMessage(StoredId.Create(StoredType, flowInstance.Value), message, idempotencyKey);
) where T : class => await MessageWriters.For(flowInstance).AppendMessage(message, idempotencyKey);

public async Task SendMessages(IReadOnlyList<BatchedMessage> messages)
=> await Postman.SendMessages(messages);
=> await MessageWriters.AppendMessages(messages);
}
9 changes: 0 additions & 9 deletions Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,13 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
serializer
);

var postman = new Postman(messageWriters);

var registration = new FuncRegistration<TParam, TReturn>(
flowType,
storedType,
_functionStore,
invoker,
controlPanels,
messageWriters,
postman,
_settings.UtcNow
);
_functions[flowType] = registration;
Expand Down Expand Up @@ -343,16 +340,13 @@ private ParamlessRegistration RegisterParamless(
serializer
);

var postman = new Postman(messageWriters);

var registration = new ParamlessRegistration(
flowType,
storedType,
_functionStore,
invoker,
controlPanels,
messageWriters,
postman,
_settings.UtcNow
);
_functions[flowType] = registration;
Expand Down Expand Up @@ -427,16 +421,13 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
_functionStore,
serializer
);
var postman = new Postman(messageWriters);

var registration = new ActionRegistration<TParam>(
flowType,
storedType,
_functionStore,
rActionInvoker,
controlPanels,
messageWriters,
postman,
_settings.UtcNow
);
_functions[flowType] = registration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
namespace Cleipnir.ResilientFunctions.Messaging;
namespace Cleipnir.ResilientFunctions.Messaging;

public record MessageAndIdempotencyKey(object Message, string? IdempotencyKey = null);

public static class MessageAndIdempotencyKeyExtensions
{
public static MessageAndIdempotencyKey ToMessageAndIdempotencyKey(this object message, string? idempotencyKey = null)
=> new(message, idempotencyKey);
}
public record MessageAndIdempotencyKey(object Message, string? IdempotencyKey = null);
17 changes: 0 additions & 17 deletions Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs

This file was deleted.

7 changes: 3 additions & 4 deletions Core/Cleipnir.ResilientFunctions/ParamlessRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ public ParamlessRegistration(
Invoker<Unit, Unit> invoker,
ControlPanelFactory controlPanelFactory,
MessageWriters messageWriters,
Postman postman,
UtcNow utcNow
) : base(storedType, postman, functionStore, utcNow)
) : base(storedType, functionStore, utcNow)
{
Type = flowType;
_invoker = invoker;
Expand Down Expand Up @@ -70,9 +69,9 @@ public async Task SendMessage<T>(
await Schedule(flowInstance);
}

await Postman.SendMessage(StoredId.Create(StoredType, flowInstance.Value), message, idempotencyKey);
await MessageWriters.For(flowInstance).AppendMessage(message, idempotencyKey);
}

public async Task SendMessages(IReadOnlyList<BatchedMessage> messages)
=> await Postman.SendMessages(messages);
=> await MessageWriters.AppendMessages(messages);
}
1 change: 0 additions & 1 deletion Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,4 @@ Task<bool> SuspendFunction(

IFunctionStore WithPrefix(string prefix);
Task<IReadOnlyDictionary<StoredId, byte[]?>> GetResults(IEnumerable<StoredId> storedIds);
Task SetResult(StoredId storedId, byte[] result, ReplicaId expectedReplica);
}
16 changes: 0 additions & 16 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -563,22 +563,6 @@ public virtual Task<bool> DeleteFunction(StoredId storedId)
}
}

public Task SetResult(StoredId storedId, byte[] result, ReplicaId expectedReplica)
{
lock (_sync)
{
if (!_states.ContainsKey(storedId))
return Task.CompletedTask;

var state = _states[storedId];
if (state.Owner != expectedReplica)
return Task.CompletedTask;

state.Result = result;
return Task.CompletedTask;
}
}

private class InnerState
{
public StoredId StoredId { get; init; } = null!;
Expand Down
5 changes: 0 additions & 5 deletions Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,6 @@ public Task<bool> DeleteFunction(StoredId storedId)
? Task.FromException<IReadOnlyDictionary<StoredId, byte[]?>>(new TimeoutException())
: _inner.GetResults(storedIds);

public Task SetResult(StoredId storedId, byte[] result, ReplicaId expectedReplica)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.SetResult(storedId, result, expectedReplica);

public IFunctionStore WithPrefix(string prefix)
=> _inner.WithPrefix(prefix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,4 @@ public override Task GetResultsReturnsEmptyDictionaryForEmptyInput()
[TestMethod]
public override Task GetResultsReturnsOnlyExistingFunctionResults()
=> GetResultsReturnsOnlyExistingFunctionResults(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultSucceedsWhenOwnerMatches()
=> SetResultSucceedsWhenOwnerMatches(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultDoesNothingWhenOwnerDoesNotMatch()
=> SetResultDoesNothingWhenOwnerDoesNotMatch(FunctionStoreFactory.Create());

[TestMethod]
public override Task SetResultDoesNothingWhenFunctionDoesNotExist()
=> SetResultDoesNothingWhenFunctionDoesNotExist(FunctionStoreFactory.Create());
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,28 +913,6 @@ public IFunctionStore WithPrefix(string prefix)
return results;
}

private string? _setResultSql;
public async Task SetResult(StoredId storedId, byte[] result, ReplicaId expectedReplica)
{
await using var conn = await CreateOpenConnection(_connectionString);
_setResultSql ??= $@"
UPDATE {_tablePrefix}
SET result_json = ?
WHERE id = ? AND owner = ?";

await using var command = new MySqlCommand(_setResultSql, conn)
{
Parameters =
{
new() { Value = result },
new() { Value = storedId.AsGuid.ToString("N") },
new() { Value = expectedReplica.AsGuid.ToString("N") }
}
};

await command.ExecuteNonQueryAsync();
}

private string? _deleteFunctionSql;
private async Task<bool> DeleteStoredFunction(StoredId storedId)
{
Expand Down
Loading