From 3f0313282d6911f29f16f448f4d820b23f08ef63 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 26 Apr 2026 11:17:50 +0200 Subject: [PATCH 1/2] Remove Utilities record and Register subsystem The Utilities record was a thin wrapper around IRegister, exposed via IFunctionStore.Utilities and Workflow.Utilities. Neither the framework runtime nor any sample/test consumed it, so the record, IRegister, Register, IUnderlyingRegister, the per-store underlying registers, and their tests are dead. The store-level _register tables are no longer created or truncated. --- .../InMemoryTests/UtilsTests/RegisterTests.cs | 61 ----- .../TestTemplates/UtilsTests/RegisterTests.cs | 158 ----------- .../WatchDogsTests/CrashableFunctionStore.cs | 2 - .../CoreRuntime/Invocation/Invoker.cs | 6 +- .../CoreRuntime/Invocation/Utilities.cs | 10 - .../CoreRuntime/Invocation/Workflow.cs | 4 +- .../FunctionsRegistry.cs | 3 - .../Storage/IFunctionStore.cs | 2 - .../Storage/InMemoryFunctionStore.cs | 10 - .../Utils/IUnderlyingRegister.cs | 13 - .../Utils/Register/IRegister.cs | 13 - .../Utils/Register/Register.cs | 28 -- .../Utils/RegisterType.cs | 7 - .../Utils/UnderlyingInMemoryRegister.cs | 78 ------ .../Utils/CrashableFunctionStore.cs | 2 - .../UtilTests/RegisterTests.cs | 65 ----- .../MariaDbFunctionStore.cs | 15 +- .../MariaDbUnderlyingRegister.cs | 215 --------------- .../UtilTests/RegisterTests.cs | 65 ----- .../PostgreSqlFunctionStore.cs | 14 +- .../PostgresSqlUnderlyingRegister.cs | 242 ----------------- .../UtilTests/RegisterTests.cs | 65 ----- .../SqlServerFunctionStore.cs | 12 +- .../SqlServerUnderlyingRegister.cs | 247 ------------------ 24 files changed, 10 insertions(+), 1327 deletions(-) delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/UtilsTests/RegisterTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/UtilsTests/RegisterTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Utilities.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Utils/IUnderlyingRegister.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Utils/Register/IRegister.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Utils/Register/Register.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Utils/RegisterType.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Utils/UnderlyingInMemoryRegister.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/UtilTests/RegisterTests.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbUnderlyingRegister.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/UtilTests/RegisterTests.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlUnderlyingRegister.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/UtilTests/RegisterTests.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerUnderlyingRegister.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/UtilsTests/RegisterTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/UtilsTests/RegisterTests.cs deleted file mode 100644 index 00e360447..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/UtilsTests/RegisterTests.cs +++ /dev/null @@ -1,61 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Utils; -using Cleipnir.ResilientFunctions.Utils.Register; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests.UtilsTests; - -[TestClass] -public class RegisterTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.UtilsTests.RegisterTests -{ - [TestMethod] - public override Task SetValueWithNoExistingValueSucceeds() - => SetValueWithNoExistingValueSucceeds(CreateInMemoryRegister()); - - [TestMethod] - public override Task CompareAndSwapWithNoExistingValueSucceeds() - => CompareAndSwapWithNoExistingValueSucceeds(CreateInMemoryRegister()); - - [TestMethod] - public override Task CompareAndSwapFailsWithNoExistingValue() - => CompareAndSwapFailsWithNoExistingValue(CreateInMemoryRegister()); - - [TestMethod] - public override Task SetValueIfEmptyFailsWhenRegisterHasExistingValue() - => SetValueIfEmptyFailsWhenRegisterHasExistingValue(CreateInMemoryRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpected() - => CompareAndSwapSucceedsIfAsExpected(CreateInMemoryRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting() - => CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(CreateInMemoryRegister()); - - [TestMethod] - public override Task ExistsIfFalseForNonExistingRegister() - => ExistsIfFalseForNonExistingRegister(CreateInMemoryRegister()); - - [TestMethod] - public override Task ExistingValueIsNullForNonExistingRegister() - => ExistingValueIsNullForNonExistingRegister(CreateInMemoryRegister()); - - [TestMethod] - public override Task DeleteSucceedsForNonExistingRegister() - => DeleteSucceedsForNonExistingRegister(CreateInMemoryRegister()); - - [TestMethod] - public override Task DeleteSucceedsForExistingRegister() - => DeleteSucceedsForExistingRegister(CreateInMemoryRegister()); - - [TestMethod] - public override Task DeleteSucceedsWithExpectedValueForExistingRegister() - => DeleteSucceedsWithExpectedValueForExistingRegister(CreateInMemoryRegister()); - - [TestMethod] - public override Task DeleteFailsWhenNonExpectedValueForExistingRegister() - => DeleteFailsWhenNonExpectedValueForExistingRegister(CreateInMemoryRegister()); - - private Task CreateInMemoryRegister() => new Register(new UnderlyingInMemoryRegister()).CastTo().ToTask(); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/UtilsTests/RegisterTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/UtilsTests/RegisterTests.cs deleted file mode 100644 index 16188de87..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/UtilsTests/RegisterTests.cs +++ /dev/null @@ -1,158 +0,0 @@ -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Cleipnir.ResilientFunctions.Utils.Register; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.UtilsTests; - -public abstract class RegisterTests -{ - public abstract Task SetValueWithNoExistingValueSucceeds(); - protected async Task SetValueWithNoExistingValueSucceeds(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.Exists(group, key).ShouldBeFalseAsync(); - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - - var value = await register.Get(group, key); - value.ShouldBe("hello world"); - - await register.Exists(group, key).ShouldBeTrueAsync(); - } - - public abstract Task SetValueIfEmptyFailsWhenRegisterHasExistingValue(); - protected async Task SetValueIfEmptyFailsWhenRegisterHasExistingValue(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - await register.SetIfEmpty(group, key, value: "hello universe").ShouldBeFalseAsync(); - - var value = await register.Get(group, key); - value.ShouldBe("hello world"); - } - - public abstract Task CompareAndSwapWithNoExistingValueSucceeds(); - protected async Task CompareAndSwapWithNoExistingValueSucceeds(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.CompareAndSwap(group, key, newValue: "hello world", expectedValue: "", setIfEmpty: true).ShouldBeTrueAsync(); - - var value = await register.Get(group, key); - value.ShouldBe("hello world"); - } - - public abstract Task CompareAndSwapFailsWithNoExistingValue(); - protected async Task CompareAndSwapFailsWithNoExistingValue(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.CompareAndSwap(group, key, newValue: "hello world", expectedValue: "", setIfEmpty: false).ShouldBeFalseAsync(); - - await register.Exists(group, key).ShouldBeFalseAsync(); - } - - public abstract Task CompareAndSwapSucceedsIfAsExpected(); - protected async Task CompareAndSwapSucceedsIfAsExpected(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - - await register.CompareAndSwap(group, key, newValue: "hello universe", expectedValue: "hello world"); - await register.SetIfEmpty(group, key, value: "hello universe").ShouldBeFalseAsync(); - - var value = await register.Get(group, key); - value.ShouldBe("hello universe"); - } - - public abstract Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(); - protected async Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - - await register.CompareAndSwap(group, key, newValue: "hello universe", expectedValue: "hello world", setIfEmpty: false); - await register.SetIfEmpty(group, key, value: "hello universe").ShouldBeFalseAsync(); - - var value = await register.Get(group, key); - value.ShouldBe("hello universe"); - } - - public abstract Task ExistsIfFalseForNonExistingRegister(); - protected async Task ExistsIfFalseForNonExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.Exists(group, key).ShouldBeFalseAsync(); - } - - public abstract Task ExistingValueIsNullForNonExistingRegister(); - protected async Task ExistingValueIsNullForNonExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.Get(group, key).ShouldBeNullAsync(); - } - - public abstract Task DeleteSucceedsForNonExistingRegister(); - protected async Task DeleteSucceedsForNonExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.Delete(group, key); - } - - public abstract Task DeleteSucceedsForExistingRegister(); - protected async Task DeleteSucceedsForExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - await register.Delete(group, key); - - await register.Exists(group, key).ShouldBeFalseAsync(); - } - - public abstract Task DeleteSucceedsWithExpectedValueForExistingRegister(); - protected async Task DeleteSucceedsWithExpectedValueForExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, key) = GetGroupAndKey(); - - await register.SetIfEmpty(group, key, value: "hello world").ShouldBeTrueAsync(); - await register.Delete(group, key, "hello world").ShouldBeTrueAsync(); - - await register.Exists(group, key).ShouldBeFalseAsync(); - } - - public abstract Task DeleteFailsWhenNonExpectedValueForExistingRegister(); - protected async Task DeleteFailsWhenNonExpectedValueForExistingRegister(Task registerTask) - { - var register = await registerTask; - var (group, name) = GetGroupAndKey(); - - await register.SetIfEmpty(group, name, value: "hello world").ShouldBeTrueAsync(); - await register.Delete(group, name, "hello universe").ShouldBeFalseAsync(); - - await register.Exists(group, name).ShouldBeTrueAsync(); - } - - private record GroupAndKey(string Group, string Key); - private static GroupAndKey GetGroupAndKey([CallerMemberName] string memberName = "") => - new(Group: nameof(RegisterTests), Key: memberName); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index c99e72ba8..c012cb425 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -22,7 +21,6 @@ public class CrashableFunctionStore : IFunctionStore private readonly CrashableEffectStore _crashableEffectStore; public IEffectsStore EffectsStore => _crashableEffectStore; public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore; - public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities; public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 0ae51144f..5ff19eeb9 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -20,7 +20,6 @@ public class Invoker private readonly InvocationHelper _invocationHelper; private readonly UnhandledExceptionHandler _unhandledExceptionHandler; - private readonly Utilities _utilities; private readonly FlowsManager _flowsManager; internal Invoker( @@ -28,7 +27,6 @@ internal Invoker( Func>> inner, InvocationHelper invocationHelper, UnhandledExceptionHandler unhandledExceptionHandler, - Utilities utilities, ReplicaId replicaId, FlowsManager flowsManager ) @@ -39,7 +37,6 @@ FlowsManager flowsManager _inner = inner; _invocationHelper = invocationHelper; _unhandledExceptionHandler = unhandledExceptionHandler; - _utilities = utilities; _flowsManager = flowsManager; } @@ -289,7 +286,7 @@ await _invocationHelper.PersistFunctionInStore( var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); - var workflow = new Workflow(flowId, storedId, effect, _utilities, correlations, queueManager, _invocationHelper.UtcNow, messageWriter); + var workflow = new Workflow(flowId, storedId, effect, correlations, queueManager, _invocationHelper.UtcNow, messageWriter); return new PreparedInvocation( persisted, @@ -346,7 +343,6 @@ private async Task PrepareForReInvocation(StoredId storedI flowId, storedId, effect, - _utilities, correlations, queueManager, _invocationHelper.UtcNow, diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Utilities.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Utilities.cs deleted file mode 100644 index 4e46268cb..000000000 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Utilities.cs +++ /dev/null @@ -1,10 +0,0 @@ -using Cleipnir.ResilientFunctions.Utils; -using Cleipnir.ResilientFunctions.Utils.Register; - -namespace Cleipnir.ResilientFunctions.CoreRuntime.Invocation; - -public record Utilities(IRegister Register) -{ - public Utilities(IUnderlyingRegister underlyingRegister) - : this(new Register(underlyingRegister)) {} -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index 265caec71..c1e01be0b 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -13,7 +13,6 @@ public class Workflow public FlowId FlowId { get; } internal StoredId StoredId { get; } public Effect Effect { get; } - public Utilities Utilities { get; } public Correlations Correlations { get; } private QueueManager _queueManager; @@ -21,11 +20,10 @@ public class Workflow private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, Utilities utilities, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) + public Workflow(FlowId flowId, StoredId storedId, Effect effect, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) { FlowId = flowId; StoredId = storedId; - Utilities = utilities; Effect = effect; Correlations = correlations; _queueManager = queueManager; diff --git a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs index 303cbab5e..eec12ec94 100644 --- a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs +++ b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs @@ -228,7 +228,6 @@ public FuncRegistration RegisterFunc( inner, invocationHelper, settingsWithDefaults.UnhandledExceptionHandler, - _functionStore.Utilities, ClusterInfo.ReplicaId, _flowsManager ); @@ -318,7 +317,6 @@ private ParamlessRegistration RegisterParamless( inner, invocationHelper, settingsWithDefaults.UnhandledExceptionHandler, - _functionStore.Utilities, ClusterInfo.ReplicaId, _flowsManager ); @@ -408,7 +406,6 @@ public ActionRegistration RegisterAction( inner, invocationHelper, settingsWithDefaults.UnhandledExceptionHandler, - _functionStore.Utilities, ClusterInfo.ReplicaId, _flowsManager ); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index 4c1f59bc5..3d9678fcb 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -1,6 +1,5 @@ using System.Collections.Generic; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage.Session; @@ -13,7 +12,6 @@ public interface IFunctionStore public IMessageStore MessageStore { get; } public IEffectsStore EffectsStore { get; } public ICorrelationStore CorrelationStore { get; } - public Utilities Utilities { get; } public IReplicaStore ReplicaStore { get; } public Task Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 9f2dd5f36..03c0cd07f 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -3,13 +3,10 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage.Session; -using Cleipnir.ResilientFunctions.Utils; -using Cleipnir.ResilientFunctions.Utils.Register; namespace Cleipnir.ResilientFunctions.Storage; @@ -25,17 +22,10 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public IEffectsStore EffectsStore => _effectsStore; private readonly InMemoryCorrelationStore _correlationStore = new(); public ICorrelationStore CorrelationStore => _correlationStore; - public Utilities Utilities { get; } public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore(); public Task Initialize() => Task.CompletedTask; - public InMemoryFunctionStore() - { - var underlyingRegister = new UnderlyingInMemoryRegister(); - Utilities = new Utilities(new Register(underlyingRegister)); - } - #region FunctionStore public virtual Task CreateFunction( diff --git a/Core/Cleipnir.ResilientFunctions/Utils/IUnderlyingRegister.cs b/Core/Cleipnir.ResilientFunctions/Utils/IUnderlyingRegister.cs deleted file mode 100644 index 7eeaa237b..000000000 --- a/Core/Cleipnir.ResilientFunctions/Utils/IUnderlyingRegister.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Utils; - -public interface IUnderlyingRegister -{ - Task SetIfEmpty(RegisterType registerType, string group, string name, string value); - Task CompareAndSwap(RegisterType registerType, string group, string name, string newValue, string expectedValue, bool setIfEmpty = true); - Task Get(RegisterType registerType, string group, string name); - Task Delete(RegisterType registerType, string group, string name, string expectedValue); - Task Delete(RegisterType registerType, string group, string name); - Task Exists(RegisterType registerType, string group, string name); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Utils/Register/IRegister.cs b/Core/Cleipnir.ResilientFunctions/Utils/Register/IRegister.cs deleted file mode 100644 index 0f015e644..000000000 --- a/Core/Cleipnir.ResilientFunctions/Utils/Register/IRegister.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Utils.Register; - -public interface IRegister -{ - Task SetIfEmpty(string group, string name, string value); - Task CompareAndSwap(string group, string name, string newValue, string expectedValue, bool setIfEmpty = true); - Task Get(string group, string name); - Task Delete(string group, string name, string expectedValue); - Task Delete(string group, string name); - Task Exists(string group, string name); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Utils/Register/Register.cs b/Core/Cleipnir.ResilientFunctions/Utils/Register/Register.cs deleted file mode 100644 index 1cfdda56f..000000000 --- a/Core/Cleipnir.ResilientFunctions/Utils/Register/Register.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Utils.Register; - -public class Register : IRegister -{ - private readonly IUnderlyingRegister _underlyingRegister; - - public Register(IUnderlyingRegister underlyingRegister) => _underlyingRegister = underlyingRegister; - - public Task SetIfEmpty(string group, string name, string value) - => _underlyingRegister.SetIfEmpty(RegisterType.Register, group, name, value); - - public Task CompareAndSwap(string group, string name, string newValue, string expectedValue, bool setIfEmpty = true) - => _underlyingRegister.CompareAndSwap(RegisterType.Register, group, name, newValue, expectedValue, setIfEmpty); - - public Task Get(string group, string name) - => _underlyingRegister.Get(RegisterType.Register, group, name); - - public Task Delete(string group, string name, string expectedValue) - => _underlyingRegister.Delete(RegisterType.Register, group, name, expectedValue); - - public Task Delete(string group, string name) - => _underlyingRegister.Delete(RegisterType.Register, group, name); - - public Task Exists(string group, string name) - => _underlyingRegister.Exists(RegisterType.Register, group, name); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Utils/RegisterType.cs b/Core/Cleipnir.ResilientFunctions/Utils/RegisterType.cs deleted file mode 100644 index 4c0fc4fc9..000000000 --- a/Core/Cleipnir.ResilientFunctions/Utils/RegisterType.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Cleipnir.ResilientFunctions.Utils; - -public enum RegisterType -{ - Register = 0, - Monitor = 1 -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Utils/UnderlyingInMemoryRegister.cs b/Core/Cleipnir.ResilientFunctions/Utils/UnderlyingInMemoryRegister.cs deleted file mode 100644 index cb71a2e29..000000000 --- a/Core/Cleipnir.ResilientFunctions/Utils/UnderlyingInMemoryRegister.cs +++ /dev/null @@ -1,78 +0,0 @@ -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; - -namespace Cleipnir.ResilientFunctions.Utils; - -public class UnderlyingInMemoryRegister : IUnderlyingRegister -{ - private readonly Dictionary _dictionary = new(); - private readonly Lock _sync = new(); - - public Task SetIfEmpty(RegisterType registerType, string group, string name, string value) - { - var id = new Id(registerType, group, name); - lock (_sync) - if (_dictionary.ContainsKey(id)) - return false.ToTask(); - else - _dictionary[id] = value; - - return true.ToTask(); - } - - public Task CompareAndSwap(RegisterType registerType, string group, string name, string newValue, string expectedValue, bool setIfEmpty = true) - { - var id = new Id(registerType, group, name); - lock (_sync) - if (!setIfEmpty && !_dictionary.ContainsKey(id)) - return false.ToTask(); - else if (!_dictionary.ContainsKey(id) || _dictionary[id].Equals(expectedValue)) - _dictionary[id] = newValue; - else - return false.ToTask(); - - return true.ToTask(); - } - - public Task Get(RegisterType registerType, string group, string name) - { - var id = new Id(registerType, group, name); - lock (_sync) - if (_dictionary.ContainsKey(id)) - return ((string?) _dictionary[id]).ToTask(); - - return default(string).ToTask(); - } - - public Task Delete(RegisterType registerType, string group, string name, string expectedValue) - { - var id = new Id(registerType, group, name); - lock (_sync) - if (!_dictionary.ContainsKey(id) || _dictionary[id].Equals(expectedValue)) - _dictionary.Remove(id); - else - return false.ToTask(); - - return true.ToTask(); - } - - public Task Delete(RegisterType registerType, string group, string name) - { - var id = new Id(registerType, group, name); - lock (_sync) - _dictionary.Remove(id); - - return Task.CompletedTask; - } - - public Task Exists(RegisterType registerType, string group, string name) - { - var id = new Id(registerType, group, name); - lock (_sync) - return _dictionary.ContainsKey(id).ToTask(); - } - - private record Id(RegisterType RegisterType, string Group, string Name); -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 49508c074..30d036e12 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Messaging; using Cleipnir.ResilientFunctions.Storage; @@ -18,7 +17,6 @@ public class CrashableFunctionStore : IFunctionStore public IMessageStore MessageStore => _inner.MessageStore; public IEffectsStore EffectsStore => _inner.EffectsStore; public ICorrelationStore CorrelationStore => _inner.CorrelationStore; - public Utilities Utilities => _inner.Utilities; public IReplicaStore ReplicaStore => _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/UtilTests/RegisterTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/UtilTests/RegisterTests.cs deleted file mode 100644 index 175509f92..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/UtilTests/RegisterTests.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System.Runtime.CompilerServices; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Cleipnir.ResilientFunctions.Utils.Register; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDb.Tests.UtilTests; - -[TestClass] -public class RegisterTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.UtilsTests.RegisterTests -{ - [TestMethod] - public override Task SetValueWithNoExistingValueSucceeds() - => SetValueWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapWithNoExistingValueSucceeds() - => CompareAndSwapWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapFailsWithNoExistingValue() - => CompareAndSwapFailsWithNoExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task SetValueIfEmptyFailsWhenRegisterHasExistingValue() - => SetValueIfEmptyFailsWhenRegisterHasExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpected() - => CompareAndSwapSucceedsIfAsExpected(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting() - => CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistsIfFalseForNonExistingRegister() - => ExistsIfFalseForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistingValueIsNullForNonExistingRegister() - => ExistingValueIsNullForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForNonExistingRegister() - => DeleteSucceedsForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForExistingRegister() - => DeleteSucceedsForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsWithExpectedValueForExistingRegister() - => DeleteSucceedsWithExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteFailsWhenNonExpectedValueForExistingRegister() - => DeleteFailsWhenNonExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - private async Task CreateAndInitializeRegister([CallerMemberName] string memberName = "") - { - var underlyingRegister = new MariaDbUnderlyingRegister(Sql.ConnectionString); - await underlyingRegister.Initialize(); - return new Register(underlyingRegister); - } -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index a2f48d3f2..d9ae6fa45 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -1,6 +1,5 @@ using System.Text; using System.Text.Json; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.MariaDB.StoreCommand; @@ -33,27 +32,21 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; - public Utilities Utilities { get; } - private readonly MariaDbUnderlyingRegister _mariaDbUnderlyingRegister; - private readonly SqlGenerator _sqlGenerator; public MariaDbFunctionStore(string connectionString, string tablePrefix = "") { tablePrefix = tablePrefix == "" ? "rfunctions" : tablePrefix; - + _connectionString = connectionString; _tablePrefix = tablePrefix; _sqlGenerator = new SqlGenerator(tablePrefix); - + _messageStore = new MariaDbMessageStore(connectionString, _sqlGenerator, tablePrefix); _effectsStore = new MariaDbEffectsStore(connectionString, tablePrefix); _correlationStore = new MariaDbCorrelationStore(connectionString, tablePrefix); - _mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix); _typeStore = new MariaDbTypeStore(connectionString, tablePrefix); _replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix); - - Utilities = new Utilities(_mariaDbUnderlyingRegister); } private string? _initializeSql; @@ -61,8 +54,7 @@ public async Task Initialize() { if (await DoTablesAlreadyExist()) return; - - await _mariaDbUnderlyingRegister.Initialize(); + await MessageStore.Initialize(); await EffectsStore.Initialize(); await CorrelationStore.Initialize(); @@ -95,7 +87,6 @@ INDEX idx_interrupted (id, interrupted) public async Task TruncateTables() { await _messageStore.TruncateTable(); - await _mariaDbUnderlyingRegister.TruncateTable(); await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbUnderlyingRegister.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbUnderlyingRegister.cs deleted file mode 100644 index d53a9aef7..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbUnderlyingRegister.cs +++ /dev/null @@ -1,215 +0,0 @@ -using Cleipnir.ResilientFunctions.Utils; -using MySqlConnector; - -namespace Cleipnir.ResilientFunctions.MariaDb; - -public class MariaDbUnderlyingRegister : IUnderlyingRegister -{ - private readonly string _connectionString; - private readonly string _tablePrefix; - - public MariaDbUnderlyingRegister(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {_tablePrefix}_register ( - registertype INT NOT NULL, - `group` VARCHAR(255) NOT NULL, - name VARCHAR(255) NOT NULL, - value VARCHAR(1024) NOT NULL, - PRIMARY KEY (registertype, `group`, name) - );"; - - await using var command = new MySqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setIfEmptySql; - public async Task SetIfEmpty(RegisterType registerType, string group, string name, string value) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _setIfEmptySql ??= @$" - INSERT IGNORE INTO {_tablePrefix}_register - (registertype, `group`, name, value) - VALUES - (?, ?, ?, ?);"; - - await using var command = new MySqlCommand(_setIfEmptySql, conn) - { - Parameters = - { - new() {Value = (int) registerType }, - new() {Value = group}, - new() {Value = name}, - new() {Value = value} - } - }; - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - - private string? _compareAndSwapUpdateSql; - private string? _compareAndSwapUpsertSql; - public async Task CompareAndSwap(RegisterType registerType, string group, string name, string newValue, string expectedValue, bool setIfEmpty = true) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - - if (!setIfEmpty) - { - _compareAndSwapUpdateSql ??= @$" - UPDATE {_tablePrefix}_register - SET value = ? - WHERE registertype = ? AND `group` = ? AND name = ? AND value = ?;"; - - await using var command = new MySqlCommand(_compareAndSwapUpdateSql, conn) - { - Parameters = - { - new() {Value = newValue}, - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name}, - new() {Value = expectedValue}, - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - else - { - _compareAndSwapUpsertSql ??= @$" - START TRANSACTION; - DELETE FROM {_tablePrefix}_register WHERE registertype = ? AND `group` = ? AND name = ? AND value = ?; - INSERT IGNORE INTO {_tablePrefix}_register - (registertype, `group`, name, value) - VALUES - (?, ?, ?, ?); - COMMIT;"; - - await using var command = new MySqlCommand(_compareAndSwapUpsertSql, conn) - { - Parameters = - { - new() { Value = (int) registerType}, - new() { Value = group }, - new() { Value = name }, - new() { Value = expectedValue }, - new() { Value = (int) registerType}, - new() { Value = group }, - new() { Value = name }, - new() { Value = newValue } - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - } - - private string? _getSql; - public async Task Get(RegisterType registerType, string group, string name) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);; - _getSql ??= @$" - SELECT value - FROM {_tablePrefix}_register - WHERE registertype = ? AND `group` = ? AND name = ?;"; - - await using var command = new MySqlCommand(_getSql, conn) - { - Parameters = - { - new() { Value = (int) registerType }, - new() { Value = group }, - new() { Value = name } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - return reader.GetString(0); - - return default; - } - - private string? _conditionalDeleteSql; - public async Task Delete(RegisterType registerType, string group, string name, string expectedValue) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - - _conditionalDeleteSql ??= $"DELETE FROM {_tablePrefix}_register WHERE registertype = ? AND `group` = ? AND name = ? AND value = ?;"; - - await using var command = new MySqlCommand(_conditionalDeleteSql, conn) - { - Parameters = - { - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name}, - new() {Value = expectedValue}, - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - - private string? _deleteSql; - public async Task Delete(RegisterType registerType, string group, string name) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - - _deleteSql ??= $"DELETE FROM {_tablePrefix}_register WHERE registertype = ? AND `group` = ? AND name = ?;"; - - await using var command = new MySqlCommand(_deleteSql, conn) - { - Parameters = - { - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name} - } - }; - await command.ExecuteScalarAsync(); - } - - private string? _existsSql; - public async Task Exists(RegisterType registerType, string group, string name) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);; - _existsSql ??= @$" - SELECT COUNT(*) - FROM {_tablePrefix}_register - WHERE registertype = ? AND `group` = ? AND name = ?;"; - - await using var command = new MySqlCommand(_existsSql, conn) - { - Parameters = - { - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name} - } - }; - - var count = (long) (await command.ExecuteScalarAsync() ?? 0); - return count > 0; - } - - private string? _truncateTableSql; - public async Task TruncateTable() - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_register"; - var command = new MySqlCommand(_truncateTableSql, conn); - await command.ExecuteNonQueryAsync(); - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/UtilTests/RegisterTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/UtilTests/RegisterTests.cs deleted file mode 100644 index a5e254539..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/UtilTests/RegisterTests.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Utils.Register; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests.UtilTests; - -[TestClass] -public class RegisterTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.UtilsTests.RegisterTests -{ - [TestMethod] - public override Task SetValueWithNoExistingValueSucceeds() - => SetValueWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapWithNoExistingValueSucceeds() - => CompareAndSwapWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapFailsWithNoExistingValue() - => CompareAndSwapFailsWithNoExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task SetValueIfEmptyFailsWhenRegisterHasExistingValue() - => SetValueIfEmptyFailsWhenRegisterHasExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpected() - => CompareAndSwapSucceedsIfAsExpected(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting() - => CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistsIfFalseForNonExistingRegister() - => ExistsIfFalseForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistingValueIsNullForNonExistingRegister() - => ExistingValueIsNullForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForNonExistingRegister() - => DeleteSucceedsForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForExistingRegister() - => DeleteSucceedsForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsWithExpectedValueForExistingRegister() - => DeleteSucceedsWithExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteFailsWhenNonExpectedValueForExistingRegister() - => DeleteFailsWhenNonExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - private async Task CreateAndInitializeRegister([CallerMemberName] string memberName = "") - { - var underlyingRegister = new PostgresSqlUnderlyingRegister(Sql.ConnectionString, tablePrefix: memberName); - await underlyingRegister.Initialize(); - return new Register(underlyingRegister); - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 6149fcd48..339f1a333 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Text.Json; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; @@ -33,9 +32,6 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly PostgreSqlDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; - public Utilities Utilities { get; } - - private readonly PostgresSqlUnderlyingRegister _postgresSqlUnderlyingRegister; private readonly SqlGenerator _sqlGenerator; public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") @@ -43,15 +39,13 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _tableName = tablePrefix == "" ? "rfunctions" : tablePrefix; _connectionString = connectionString; _sqlGenerator = new SqlGenerator(_tableName); - + _messageStore = new PostgreSqlMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName); _correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName); _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); - _postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName); _replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName); - Utilities = new Utilities(_postgresSqlUnderlyingRegister); - } + } private async Task CreateConnection() { @@ -65,8 +59,7 @@ public async Task Initialize() { if (await DoTablesAlreadyExist()) return; - - await _postgresSqlUnderlyingRegister.Initialize(); + await _messageStore.Initialize(); await _effectsStore.Initialize(); await _correlationStore.Initialize(); @@ -104,7 +97,6 @@ CREATE INDEX IF NOT EXISTS idx_{_tableName}_interrupted public async Task TruncateTables() { await _messageStore.TruncateTable(); - await _postgresSqlUnderlyingRegister.TruncateTable(); await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlUnderlyingRegister.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlUnderlyingRegister.cs deleted file mode 100644 index 97146e91a..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlUnderlyingRegister.cs +++ /dev/null @@ -1,242 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Utils; -using Npgsql; - -namespace Cleipnir.ResilientFunctions.PostgreSQL; - -public class PostgresSqlUnderlyingRegister(string connectionString, string tablePrefix = "") : IUnderlyingRegister -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {tablePrefix}_register ( - registertype INT NOT NULL, - groupname VARCHAR(255) NOT NULL, - name VARCHAR(255) NOT NULL, - value VARCHAR(255) NOT NULL, - PRIMARY KEY (registertype, groupname, name) - );"; - await using var command = new NpgsqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setIfEmptySql; - public async Task SetIfEmpty(RegisterType registerType, string group, string name, string value) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _setIfEmptySql ??= @$" - INSERT INTO {tablePrefix}_register - (registertype, groupname, name, value) - VALUES - ($1, $2, $3, $4) - ON CONFLICT DO NOTHING"; - - await using var command = new NpgsqlCommand(_setIfEmptySql, conn) - { - Parameters = - { - new() {Value = (int) registerType }, - new() {Value = group}, - new() {Value = name}, - new() {Value = value} - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - - private string? _compareAndSwapUpdateSql; - private string? _compareAndSwapDeleteExistingSql; - private string? _compareAndSwapInsertSql; - public async Task CompareAndSwap(RegisterType registerType, string group, string name, string newValue, string expectedValue, bool setIfEmpty = true) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - if (!setIfEmpty) - { - //as setIfEmpty is false then only update if expected value is found - _compareAndSwapUpdateSql ??= @$" - UPDATE {tablePrefix}_register - SET value = $1 - WHERE registertype = $2 AND groupname = $3 AND name = $4 AND value = $5"; - - await using var command = new NpgsqlCommand(_compareAndSwapUpdateSql, conn) - { - Parameters = - { - new() {Value = newValue}, - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name}, - new() {Value = expectedValue}, - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - else - { - //setIfEmpty is true - await using var batch = new NpgsqlBatch(conn); - { - _compareAndSwapDeleteExistingSql ??= @$" - DELETE FROM {tablePrefix}_register - WHERE registertype = $1 AND groupname = $2 AND name = $3 AND value = $4"; - var command = - new NpgsqlBatchCommand(_compareAndSwapDeleteExistingSql) - { - Parameters = - { - new() { Value = (int) registerType }, - new() { Value = group }, - new() { Value = name }, - new() { Value = expectedValue }, - } - }; - batch.BatchCommands.Add(command); - } - { - _compareAndSwapInsertSql ??= @$" - INSERT INTO {tablePrefix}_register - (registertype, groupname, name, value) - VALUES - ($1, $2, $3, $4) - ON CONFLICT DO NOTHING"; - - var command = new NpgsqlBatchCommand(_compareAndSwapInsertSql) - { - Parameters = - { - new() { Value = (int) registerType }, - new() { Value = group }, - new() { Value = name }, - new() { Value = newValue } - } - }; - - batch.BatchCommands.Add(command); - } - - var affectedRows = await batch.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - } - - private string? _getSql; - public async Task Get(RegisterType registerType, string group, string key) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _getSql ??= @$" - SELECT value - FROM {tablePrefix}_register - WHERE registertype = $1 AND groupname = $2 AND name = $3"; - await using var command = new NpgsqlCommand(_getSql, conn) - { - Parameters = - { - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = key} - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - return reader.GetString(0); - - return default; - } - - private string? _deleteExpectedValueSql; - public async Task Delete(RegisterType registerType, string group, string name, string expectedValue) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _deleteExpectedValueSql ??= @$" - DELETE FROM {tablePrefix}_register - WHERE registertype = $1 AND groupname = $2 AND name = $3 AND value = $4"; - - await using var command = new NpgsqlCommand(_deleteExpectedValueSql, conn) - { - Parameters = - { - new() { Value = (int) registerType}, - new() { Value = group }, - new() { Value = name }, - new() { Value = expectedValue }, - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - - private string? _deleteSql; - public async Task Delete(RegisterType registerType, string group, string name) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _deleteSql ??= @$" - DELETE FROM {tablePrefix}_register - WHERE registertype = $1 AND groupname = $2 AND name = $3"; - - await using var command = new NpgsqlCommand(_deleteSql, conn) - { - Parameters = - { - new() { Value = (int) registerType }, - new() { Value = group }, - new() { Value = name }, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _existsSql; - public async Task Exists(RegisterType registerType, string group, string name) - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _existsSql ??= @$" - SELECT COUNT(*) - FROM {tablePrefix}_register - WHERE registertype = $1 AND groupname = $2 AND name = $3"; - await using var command = new NpgsqlCommand(_existsSql, conn) - { - Parameters = - { - new() {Value = (int) registerType}, - new() {Value = group}, - new() {Value = name} - } - }; - - var count = (long?) await command.ExecuteScalarAsync(); - return count > 0; - } - - private string? _truncateTableSql; - public async Task TruncateTable() - { - await using var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - - _truncateTableSql ??= $"TRUNCATE TABLE {tablePrefix}_register"; - var command = new NpgsqlCommand(_truncateTableSql, conn); - await command.ExecuteNonQueryAsync(); - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/UtilTests/RegisterTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/UtilTests/RegisterTests.cs deleted file mode 100644 index c3b37d11b..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/UtilTests/RegisterTests.cs +++ /dev/null @@ -1,65 +0,0 @@ -using System.Runtime.CompilerServices; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Utils.Register; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests.UtilTests; - -[TestClass] -public class RegisterTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.UtilsTests.RegisterTests -{ - [TestMethod] - public override Task SetValueWithNoExistingValueSucceeds() - => SetValueWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapWithNoExistingValueSucceeds() - => CompareAndSwapWithNoExistingValueSucceeds(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapFailsWithNoExistingValue() - => CompareAndSwapFailsWithNoExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task SetValueIfEmptyFailsWhenRegisterHasExistingValue() - => SetValueIfEmptyFailsWhenRegisterHasExistingValue(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpected() - => CompareAndSwapSucceedsIfAsExpected(CreateAndInitializeRegister()); - - [TestMethod] - public override Task CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting() - => CompareAndSwapSucceedsIfAsExpectedIgnoreIfNoExisting(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistsIfFalseForNonExistingRegister() - => ExistsIfFalseForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task ExistingValueIsNullForNonExistingRegister() - => ExistingValueIsNullForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForNonExistingRegister() - => DeleteSucceedsForNonExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsForExistingRegister() - => DeleteSucceedsForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteSucceedsWithExpectedValueForExistingRegister() - => DeleteSucceedsWithExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - [TestMethod] - public override Task DeleteFailsWhenNonExpectedValueForExistingRegister() - => DeleteFailsWhenNonExpectedValueForExistingRegister(CreateAndInitializeRegister()); - - private async Task CreateAndInitializeRegister([CallerMemberName] string memberName = "") - { - var underlyingRegister = new SqlServerUnderlyingRegister(Sql.ConnectionString, tablePrefix: memberName); - await underlyingRegister.Initialize(); - return new Register(underlyingRegister); - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index 2c817b6d4..a285fa500 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -6,7 +6,6 @@ using System.Runtime.Serialization; using System.Text.Json; using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; @@ -32,12 +31,9 @@ public class SqlServerFunctionStore : IFunctionStore public ICorrelationStore CorrelationStore => _correlationStore; public ITypeStore TypeStore => _typeStore; public IMessageStore MessageStore => _messageStore; - public Utilities Utilities { get; } private readonly SqlServerReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; - private readonly SqlServerUnderlyingRegister _underlyingRegister; - private readonly SqlGenerator _sqlGenerator; public SqlServerFunctionStore(string connectionString, string tablePrefix = "") @@ -45,15 +41,13 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _tableName = tablePrefix == "" ? "RFunctions" : tablePrefix; _connectionString = connectionString; _sqlGenerator = new SqlGenerator(_tableName); - + _connFunc = CreateConnection(connectionString); _messageStore = new SqlServerMessageStore(connectionString, _sqlGenerator, _tableName); - _underlyingRegister = new SqlServerUnderlyingRegister(connectionString, _tableName); _effectsStore = new SqlServerEffectsStore(connectionString, _tableName); _correlationStore = new SqlServerCorrelationsStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _replicaStore = new SqlServerReplicaStore(connectionString, _tableName); - Utilities = new Utilities(_underlyingRegister); } private static Func> CreateConnection(string connectionString) @@ -71,8 +65,7 @@ public async Task Initialize() { if (await DoTablesAlreadyExist()) return; - - await _underlyingRegister.Initialize(); + await _messageStore.Initialize(); await _effectsStore.Initialize(); await _correlationStore.Initialize(); @@ -122,7 +115,6 @@ CREATE INDEX {_tableName}_idx_Interrupted private string? _truncateSql; public async Task TruncateTables() { - await _underlyingRegister.TruncateTable(); await _messageStore.TruncateTable(); await _effectsStore.Truncate(); await _correlationStore.Truncate(); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerUnderlyingRegister.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerUnderlyingRegister.cs deleted file mode 100644 index 103db0370..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerUnderlyingRegister.cs +++ /dev/null @@ -1,247 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Utils; -using Microsoft.Data.SqlClient; - -namespace Cleipnir.ResilientFunctions.SqlServer; - -public class SqlServerUnderlyingRegister : IUnderlyingRegister -{ - private readonly string _connectionString; - private readonly string _tablePrefix; - - public SqlServerUnderlyingRegister(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - try - { - _initializeSql ??= @$" - CREATE TABLE {_tablePrefix}_Register ( - RegisterType INT NOT NULL, - [Group] VARCHAR(255) NOT NULL, - Name VARCHAR(255) NOT NULL, - Value VARCHAR(255) NOT NULL, - PRIMARY KEY (RegisterType, [Group], Name) - );"; - await using var command = new SqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - catch (SqlException e) - { - if (e.Number != SqlError.TABLE_ALREADY_EXISTS) - throw; - } - } - - private string? _setIfEmptySql; - public async Task SetIfEmpty(RegisterType registerType, string group, string name, string value) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _setIfEmptySql ??= @$" - INSERT INTO {_tablePrefix}_Register - (RegisterType, [Group], Name, Value) - VALUES - (@RegisterType, @Group, @Name, @Value)"; - - await using var command = new SqlCommand(_setIfEmptySql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name }, - new() { ParameterName = "@Value", Value = value } - } - }; - - try - { - await command.ExecuteNonQueryAsync(); - return true; - } catch (SqlException sqlException) when (sqlException.Number == SqlError.UNIQUENESS_VIOLATION) - { - return false; - } - } - - private string? _compareAndSwapNonEmptySql; - private string? _compareAndSwapEmptySql; - public async Task CompareAndSwap(RegisterType registerType, string group, string name, string newValue, string expectedValue, bool setIfEmpty = true) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - if (!setIfEmpty) - { - //as setIfEmpty is false then only update if expected value is found - _compareAndSwapNonEmptySql ??= @$" - UPDATE {_tablePrefix}_Register - SET Value = @NewValue - WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name AND Value = @ExpectedValue"; - - await using var command = new SqlCommand(_compareAndSwapNonEmptySql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name }, - new() { ParameterName = "@NewValue", Value = newValue }, - new() { ParameterName = "@ExpectedValue", Value = expectedValue } - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } else - { - //setIfEmpty is true - _compareAndSwapEmptySql ??= @$" - BEGIN TRANSACTION; - DELETE FROM {_tablePrefix}_Register WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name AND Value = @ExpectedValue; - INSERT INTO {_tablePrefix}_Register (RegisterType, [Group], Name, Value) - VALUES (@RegisterType, @Group, @Name, @NewValue); - COMMIT TRANSACTION;"; - - await using var command = new SqlCommand(_compareAndSwapEmptySql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name }, - new() { ParameterName = "@ExpectedValue", Value = expectedValue }, - new() { ParameterName = "@NewValue", Value = newValue }, - } - }; - - try - { - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } catch (SqlException sqlException) when (sqlException.Number == SqlError.UNIQUENESS_VIOLATION) - { - return false; - } - } - } - - private string? _getSql; - public async Task Get(RegisterType registerType, string group, string name) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _getSql ??= @$" - SELECT Value - FROM {_tablePrefix}_Register - WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name"; - await using var command = new SqlCommand(_getSql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - return reader.GetString(0); - - return default; - } - - private string? _deleteExpectedValueSql; - public async Task Delete(RegisterType registerType, string group, string name, string expectedValue) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _deleteExpectedValueSql ??= @$" - DELETE FROM {_tablePrefix}_Register - WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name AND Value = @Value"; - - await using var command = new SqlCommand(_deleteExpectedValueSql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name }, - new() { ParameterName = "@Value", Value = expectedValue }, - } - }; - - var affectedRows = await command.ExecuteNonQueryAsync(); - return affectedRows > 0; - } - - private string? _deleteSql; - public async Task Delete(RegisterType registerType, string group, string name) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _deleteSql ??= @$" - DELETE FROM {_tablePrefix}_Register - WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name"; - - await using var command = new SqlCommand(_deleteSql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name } - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _existsSql; - public async Task Exists(RegisterType registerType, string group, string name) - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _existsSql ??= @$" - SELECT COUNT(*) - FROM {_tablePrefix}_Register - WHERE RegisterType = @RegisterType AND [Group] = @Group AND Name = @Name"; - await using var command = new SqlCommand(_existsSql, conn) - { - Parameters = - { - new() { ParameterName = "@RegisterType", Value = (int) registerType }, - new() { ParameterName = "@Group", Value = group }, - new() { ParameterName = "@Name", Value = name } - } - }; - - var count = (int?) await command.ExecuteScalarAsync(); - return count > 0; - } - - private string? _truncateTableSql; - public async Task TruncateTable() - { - await using var conn = new SqlConnection(_connectionString); - await conn.OpenAsync(); - - _truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_Register"; - await using var command = new SqlCommand(_truncateTableSql, conn); - await command.ExecuteNonQueryAsync(); - } -} \ No newline at end of file From 15fe066e6b6ba9e246487e02d6d160e41117e302 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sun, 26 Apr 2026 11:35:13 +0200 Subject: [PATCH 2/2] Remove Correlations and ICorrelationStore subsystem --- .../InMemoryTests/CorrelationStoreTests.cs | 32 --- .../RFunctionTests/ControlPanelTests.cs | 4 - .../RFunctionTests/RoutingTests.cs | 8 - .../TestTemplates/CorrelationStoreTests.cs | 115 ---------- .../FunctionTests/ControlPanelTests.cs | 49 +---- .../FunctionTests/RoutingTests.cs | 111 ---------- .../TestTemplates/StoreCrudTests.cs | 4 +- .../WatchDogsTests/CrashableFunctionStore.cs | 1 - .../BaseRegistration.cs | 3 - .../Invocation/InvocationHelper.cs | 6 - .../CoreRuntime/Invocation/Invoker.cs | 6 +- .../CoreRuntime/Invocation/Workflow.cs | 6 +- .../Domain/ControlPanel.cs | 15 +- .../Domain/ControlPanelFactory.cs | 3 - .../Domain/Correlations.cs | 64 ------ .../FunctionsRegistry.cs | 18 +- .../Messaging/Postman.cs | 15 +- .../Storage/ICorrelationStore.cs | 16 -- .../Storage/IFunctionStore.cs | 1 - .../Storage/InMemoryCorrelationStore.cs | 112 ---------- .../Storage/InMemoryFunctionStore.cs | 5 +- .../Utils/CrashableFunctionStore.cs | 1 - .../CorrelationStoreTests.cs | 31 --- .../RFunctionTests/ControlPanelTests.cs | 4 - .../RFunctionTests/RoutingTests.cs | 8 - .../MariaDbCorrelationStore.cs | 193 ----------------- .../MariaDbFunctionStore.cs | 7 - .../CorrelationStoreTests.cs | 32 --- .../RFunctionTests/ControlPanelTests.cs | 6 +- .../RFunctionTests/RoutingTests.cs | 8 - .../PostgreSqlCorrelationStore.cs | 196 ------------------ .../PostgreSqlFunctionStore.cs | 7 - .../CorrelationStoreTests.cs | 32 --- .../RFunctionTests/ControlPanelTests.cs | 4 - .../RFunctionTests/RoutingTests.cs | 9 - .../SqlServerCorrelationsStore.cs | 179 ---------------- .../SqlServerFunctionStore.cs | 8 +- 37 files changed, 21 insertions(+), 1298 deletions(-) delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Domain/Correlations.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs deleted file mode 100644 index 6d7db7aad..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs index 80dba307d..292389a8d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs @@ -138,10 +138,6 @@ public override Task ExistingEffectCanBeSetToFailed() public override Task SaveChangesPersistsChangedResult() => SaveChangesPersistsChangedResult(Utils.CreateInMemoryFunctionStoreTask()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs index cbdfa76d8..c9770a5e5 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs @@ -22,14 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs deleted file mode 100644 index 6955419e4..000000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs +++ /dev/null @@ -1,115 +0,0 @@ -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates; - -public abstract class CorrelationStoreTests -{ - public abstract Task SunshineScenario(); - public async Task SunshineScenario(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId, "SomeCorrelationId"); - await correlationStore - .GetCorrelations(storedId.Type, correlationId: "SomeCorrelationId") - .SelectAsync(c => c.Single()) - .ShouldBeAsync(storedId); - - await correlationStore - .GetCorrelations(storedId) - .SelectAsync(c => c.Single()) - .ShouldBeAsync("SomeCorrelationId"); - } - - public abstract Task TwoDifferentFunctionsCanUseTheSameCorrelationId(); - public async Task TwoDifferentFunctionsCanUseTheSameCorrelationId(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId1 = TestStoredId.Create(); - var storedId2 = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId1, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - await correlationStore.SetCorrelation(storedId2, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - - var instances = await correlationStore - .GetCorrelations(storedId1.Type, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - instances.Count.ShouldBe(1); - instances.Single().ShouldBe(storedId1); - - instances = await correlationStore - .GetCorrelations(storedId2.Type, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - instances.Count.ShouldBe(1); - instances.Single().ShouldBe(storedId2); - } - - public abstract Task FunctionCorrelationsCanBeDeleted(); - public async Task FunctionCorrelationsCanBeDeleted(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - await correlationStore.RemoveCorrelations(functionId); - - (await correlationStore.GetCorrelations(functionId)).ShouldBeEmpty(); - } - - public abstract Task SingleFunctionCorrelationCanBeDeleted(); - public async Task SingleFunctionCorrelationCanBeDeleted(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - await correlationStore.RemoveCorrelation(functionId, "SomeCorrelationId1"); - - await correlationStore - .GetCorrelations(functionId) - .SelectAsync(c => c.Single()) - .ShouldBeAsync("SomeCorrelationId2"); - } - - public abstract Task SingleFunctionCanHaveMultipleCorrelations(); - public async Task SingleFunctionCanHaveMultipleCorrelations(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - var correlations = await correlationStore.GetCorrelations(functionId); - correlations.Count.ShouldBe(2); - correlations.Any(c => c == "SomeCorrelationId1").ShouldBeTrue(); - correlations.Any(c => c == "SomeCorrelationId2").ShouldBeTrue(); - } - - public abstract Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(); - public async Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId1 = TestStoredId.Create(); - var storedId2 = TestStoredId.Create(storedId1.Type); - var storedId3 = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId1, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(storedId2, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(storedId3, "SomeCorrelationId1"); - - var instances = await correlationStore.GetCorrelations(storedId1.Type, "SomeCorrelationId1"); - instances.Count.ShouldBe(2); - instances.Any(i => i == storedId1).ShouldBeTrue(); - instances.Any(i => i == storedId2).ShouldBeTrue(); - } -} - diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs index c1540c03c..b7e556650 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs @@ -1225,52 +1225,16 @@ protected async Task SaveChangesPersistsChangedResult(Task store unhandledExceptionCatcher.ShouldNotHaveExceptions(); } - public abstract Task CorrelationsCanBeChanged(); - protected async Task CorrelationsCanBeChanged(Task storeTask) - { - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance) = functionId; - using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch)); - - var registration = functionsRegistry.RegisterParamless( - flowType, - async workflow => - { - await workflow.Correlations.Register("SomeCorrelation"); - - } - ); - - await registration.Run(flowInstance.Value); - - var controlPanel = await registration.ControlPanel(flowInstance.Value); - controlPanel.ShouldNotBeNull(); - - await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeTrueAsync(); - await controlPanel.Correlations.Remove("SomeCorrelation"); - await controlPanel.Correlations.Register("SomeNewCorrelation"); - - controlPanel = await registration.ControlPanel(flowInstance.Value); - controlPanel.ShouldNotBeNull(); - await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalseAsync(); - await controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrueAsync(); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - public abstract Task DeleteRemovesFunctionFromAllStores(); protected async Task DeleteRemovesFunctionFromAllStores(Task storeTask) { var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - + var store = await storeTask; var functionId = TestFlowId.Create(); var (flowType, flowInstance) = functionId; using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch)); - + var registration = functionsRegistry.RegisterParamless( flowType, inner: () => Task.CompletedTask @@ -1281,7 +1245,6 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task sto var controlPanel = await registration.ControlPanel(flowInstance.Value); controlPanel.ShouldNotBeNull(); - await controlPanel.Correlations.Register("SomeCorrelation"); await controlPanel.Effects.SetSucceeded("SomeEffect".GetHashCode()); await controlPanel.Messages.Append("Some Message"); @@ -1289,20 +1252,16 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task sto var storedId = registration.MapToStoredId(functionId.Instance); await store.GetFunction(storedId).ShouldBeNullAsync(); - + await store.MessageStore.GetMessages(storedId) .SelectAsync(msgs => msgs.Count == 0) .ShouldBeTrueAsync(); - await store.CorrelationStore.GetCorrelations(storedId) - .SelectAsync(c => c.Any()) - .ShouldBeFalseAsync(); - await store.EffectsStore .GetEffectResults(storedId) .SelectAsync(e => e.Any()) .ShouldBeFalseAsync(); - + unhandledExceptionCatcher.ShouldNotHaveExceptions(); } diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs index 52f0d7455..3fa2846dc 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs @@ -167,116 +167,6 @@ await registration.SendMessage( unhandledExceptionCatcher.ShouldNotHaveExceptions(); } - #region Route using Correlation - - public abstract Task MessageIsRoutedToParamlessInstanceUsingCorrelationId(); - protected async Task MessageIsRoutedToParamlessInstanceUsingCorrelationId(Task storeTask) - { - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance) = functionId; - - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - using var functionsRegistry = new FunctionsRegistry( - store, - new Settings(unhandledExceptionCatcher.Catch) - ); - - var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}"; - - var correlationIdRegisteredFlag = new SyncedFlag(); - var syncedFlag = new SyncedFlag(); - var syncedValue = new Synced(); - - var registration = functionsRegistry.RegisterParamless( - flowType, - inner: async workflow => - { - await workflow.RegisterCorrelation(correlationId); - correlationIdRegisteredFlag.Raise(); - - var someMessage = await workflow.Message(); - syncedValue.Value = someMessage.Value; - syncedFlag.Raise(); - } - ); - - await registration.Schedule(flowInstance); - await correlationIdRegisteredFlag.WaitForRaised(); - - await registration.RouteMessage( - new SomeCorrelatedMessage(correlationId, "SomeValue!"), - correlationId - ); - - await syncedFlag.WaitForRaised(); - syncedValue.Value.ShouldBe("SomeValue!"); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - - public abstract Task MessageIsRoutedToMultipleInstancesUsingCorrelationId(); - protected async Task MessageIsRoutedToMultipleInstancesUsingCorrelationId(Task storeTask) - { - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance1) = functionId; - var (_, flowInstance2) = TestFlowId.Create().WithTypeId(flowType); - - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - using var functionsRegistry = new FunctionsRegistry( - store, - new Settings(unhandledExceptionCatcher.Catch) - ); - - var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}"; - - var registration = functionsRegistry.RegisterParamless( - flowType, - inner: async workflow => - { - await workflow.RegisterCorrelation(correlationId); - await workflow.Message(); - } - ); - var storedType = registration.StoredType; - - await registration.Schedule(flowInstance1); - await registration.Schedule(flowInstance2); - - await BusyWait.Until(() => store - .CorrelationStore - .GetCorrelations(storedType, correlationId) - .SelectAsync(l => l.Count == 2) - ); - - await registration.RouteMessage( - new SomeCorrelatedMessage(correlationId, "SomeValue!"), - correlationId - ); - - var controlPanel1 = await registration.ControlPanel(flowInstance1); - controlPanel1.ShouldNotBeNull(); - await BusyWait.Until(async () => - { - await controlPanel1.Refresh(); - return controlPanel1.Status == Status.Succeeded; - }); - - - var controlPanel2 = await registration.ControlPanel(flowInstance2); - controlPanel2.ShouldNotBeNull(); - await BusyWait.Until(async () => - { - await controlPanel2.Refresh(); - return controlPanel2.Status == Status.Succeeded; - }); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - - #endregion - #region Paramless is started by message public abstract Task ParamlessInstanceIsStartedByMessage(); @@ -319,5 +209,4 @@ await registration.SendMessage( #endregion public record SomeMessage(string RouteTo, string Value); - public record SomeCorrelatedMessage(string Correlation, string Value); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs index b257b51fb..c89f29c01 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs @@ -184,18 +184,16 @@ await store.EffectsStore.SetEffectResult( StoredEffect.CreateCompleted(1.ToEffectId(), "SomeStateJson".ToUtf8Bytes(), alias: null).ToStoredChange(storedId, Insert), session: null ); - await store.CorrelationStore.SetCorrelation(storedId, "SomeCorrelationId"); await store.EffectsStore.SetEffectResult( storedId, new StoredEffect(2.ToEffectId(), WorkStatus.Completed, Result: null, StoredException: null, Alias: null).ToStoredChange(storedId, Insert), session: null ); await store.MessageStore.AppendMessage(storedId, new StoredMessage("SomeJson".ToUtf8Bytes(), "SomeType".ToUtf8Bytes(), Position: 0)); - + await store.DeleteFunction(storedId); await store.GetFunction(storedId).ShouldBeNullAsync(); - await store.CorrelationStore.GetCorrelations(storedId).ShouldBeEmptyAsync(); await store.EffectsStore.GetEffectResults(storedId).ShouldBeEmptyAsync(); await store.MessageStore.GetMessages(storedId).ShouldBeEmptyAsync(); } diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index c012cb425..28ad548db 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -20,7 +20,6 @@ public class CrashableFunctionStore : IFunctionStore private readonly CrashableEffectStore _crashableEffectStore; public IEffectsStore EffectsStore => _crashableEffectStore; - public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore; public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) diff --git a/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs b/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs index 86e8ef232..23aecfc58 100644 --- a/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs +++ b/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs @@ -23,9 +23,6 @@ protected BaseRegistration(StoredType storedType, Postman postman, IFunctionStor UtcNow = utcNow; } - public Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) where T : class - => Postman.RouteMessage(message, correlationId, idempotencyKey); - public StoredId MapToStoredId(FlowInstance instance) => StoredId.Create(StoredType, instance.Value); public Task Interrupt(IEnumerable instances) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 3398c3ff5..69d2763eb 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -402,12 +402,6 @@ public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList CreateExistingEffects(FlowId flowId) { diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 5ff19eeb9..b8ed35664 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -280,13 +280,11 @@ await _invocationHelper.PersistFunctionInStore( _flowsManager ); - var correlations = _invocationHelper.CreateCorrelations(flowId); - var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); - var workflow = new Workflow(flowId, storedId, effect, correlations, queueManager, _invocationHelper.UtcNow, messageWriter); + var workflow = new Workflow(flowId, storedId, effect, queueManager, _invocationHelper.UtcNow, messageWriter); return new PreparedInvocation( persisted, @@ -333,7 +331,6 @@ private async Task PrepareForReInvocation(StoredId storedI var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager); - var correlations = _invocationHelper.CreateCorrelations(flowId); var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); @@ -343,7 +340,6 @@ private async Task PrepareForReInvocation(StoredId storedI flowId, storedId, effect, - correlations, queueManager, _invocationHelper.UtcNow, messageWriter diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index c1e01be0b..aae91c5d7 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -13,26 +13,22 @@ public class Workflow public FlowId FlowId { get; } internal StoredId StoredId { get; } public Effect Effect { get; } - public Correlations Correlations { get; } private QueueManager _queueManager; private readonly UtcNow _utcNow; private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) + public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) { FlowId = flowId; StoredId = storedId; Effect = effect; - Correlations = correlations; _queueManager = queueManager; _utcNow = utcNow; MessageWriter = messageWriter; } - public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation); - public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias); public Task Delay(DateTime until, bool suspend = true, string? alias = null) { diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs index d77d58abf..43b8ac3d6 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs @@ -19,14 +19,13 @@ internal ControlPanel( Status status, long expires, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam: Unit.Instance, innerResult: Unit.Instance, effects, - messages, correlations, fatalWorkflowException, + messages, fatalWorkflowException, utcNow ) { } @@ -63,14 +62,13 @@ internal ControlPanel( Status status, long expires, TParam innerParam, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult: Unit.Instance, effects, - messages, correlations, fatalWorkflowException, + messages, fatalWorkflowException, utcNow ) { } @@ -112,13 +110,13 @@ internal ControlPanel( long expires, TParam innerParam, TReturn? innerResult, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, + FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult, effects, messages, - correlations, fatalWorkflowException, + fatalWorkflowException, utcNow ) { } @@ -170,7 +168,6 @@ internal BaseControlPanel( TReturn? innerResult, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow) { @@ -188,7 +185,6 @@ internal BaseControlPanel( : new DateTime(expires, DateTimeKind.Utc)) : null; Effects = effects; Messages = messages; - Correlations = correlations; FatalWorkflowException = fatalWorkflowException; UtcNow = utcNow; } @@ -203,8 +199,6 @@ internal BaseControlPanel( public ExistingEffects Effects { get; private set; } - public Correlations Correlations { get; private set; } - private TParam _innerParam; protected TParam InnerParam { @@ -340,7 +334,6 @@ public async Task Refresh() FatalWorkflowException = sf.FatalWorkflowException; Effects = await _invocationHelper.CreateExistingEffects(FlowId); Messages = _invocationHelper.CreateExistingMessages(FlowId); - Correlations = _invocationHelper.CreateCorrelations(FlowId); _innerParamChanged = false; } diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs index 7b55219a4..2396b3cc1 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs @@ -42,7 +42,6 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker? _correlations; - private readonly Lock _sync = new(); - - private async Task> GetCorrelations() - { - lock (_sync) - if (_correlations is not null) - return _correlations; - - var correlations = (await correlationStore.GetCorrelations(storedId)) - .ToHashSet(); - - lock (_sync) - if (_correlations is null) - return _correlations = correlations; - else - return _correlations; - } - - public async Task Register(string correlation) - { - var registered = await GetCorrelations(); - - lock (_sync) - if (registered.Contains(correlation)) - return; - - await correlationStore.SetCorrelation(storedId, correlation); - - lock (_sync) - registered.Add(correlation); - } - - public async Task Contains(string correlation) - { - var registered = await GetCorrelations(); - lock (_sync) - return registered.Contains(correlation); - } - - public async Task Remove(string correlation) - { - var registered = await GetCorrelations(); - - lock (_sync) - if (!registered.Contains(correlation)) - return; - - await correlationStore.RemoveCorrelation(storedId, correlation); - - lock (_sync) - registered.Remove(correlation); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs index eec12ec94..40c7fec6a 100644 --- a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs +++ b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs @@ -258,11 +258,7 @@ public FuncRegistration RegisterFunc( serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new FuncRegistration( flowType, @@ -347,11 +343,7 @@ private ParamlessRegistration RegisterParamless( serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new ParamlessRegistration( flowType, @@ -435,11 +427,7 @@ public ActionRegistration RegisterAction( _functionStore, serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new ActionRegistration( flowType, diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs b/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs index ebd32bcf5..0fe16be12 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs @@ -4,21 +4,14 @@ namespace Cleipnir.ResilientFunctions.Messaging; -public class Postman(StoredType storedType, ICorrelationStore correlationStore, MessageWriters messageWriters) +public class Postman(MessageWriters messageWriters) { public Task SendMessage( - StoredId instance, - TMessage message, + StoredId instance, + TMessage message, string? idempotencyKey = null ) where TMessage : class => messageWriters.For(instance).AppendMessage(message, idempotencyKey); - + public async Task SendMessages(IReadOnlyList messages) => await messageWriters.AppendMessages(messages); - - public async Task RouteMessage(TMessage message, string correlationId, string? idempotencyKey = null) where TMessage : class - { - var flowInstances = await correlationStore.GetCorrelations(storedType, correlationId); - foreach (var storedId in flowInstances) - await SendMessage(storedId, message, idempotencyKey); - } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs deleted file mode 100644 index 442383256..000000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Storage; - -public interface ICorrelationStore -{ - public Task Initialize(); - public Task Truncate(); - - public Task SetCorrelation(StoredId storedId, string correlationId); - public Task> GetCorrelations(StoredType flowType, string correlationId); - public Task> GetCorrelations(StoredId storedId); - public Task RemoveCorrelations(StoredId storedId); - public Task RemoveCorrelation(StoredId storedId, string correlationId); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index 3d9678fcb..c6f795b6c 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -11,7 +11,6 @@ public interface IFunctionStore public ITypeStore TypeStore { get; } public IMessageStore MessageStore { get; } public IEffectsStore EffectsStore { get; } - public ICorrelationStore CorrelationStore { get; } public IReplicaStore ReplicaStore { get; } public Task Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs deleted file mode 100644 index 457daef14..000000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs +++ /dev/null @@ -1,112 +0,0 @@ -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; - -namespace Cleipnir.ResilientFunctions.Storage; - -public class InMemoryCorrelationStore : ICorrelationStore -{ - private readonly Dictionary> _correlations = new(); - private readonly Dictionary> _reverseLookup = new(); - private readonly Lock _sync = new(); - - public Task Initialize() => Task.CompletedTask; - - public Task Truncate() - { - lock (_sync) - { - _correlations.Clear(); - _reverseLookup.Clear(); - } - - return Task.CompletedTask; - } - - public Task SetCorrelation(StoredId storedId, string correlationId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - _correlations[storedId] = new HashSet(); - - _correlations[storedId].Add(correlationId); - - if (!_reverseLookup.ContainsKey(correlationId)) - _reverseLookup[correlationId] = new HashSet(); - - _reverseLookup[correlationId].Add(storedId); - } - - return Task.CompletedTask; - } - - public Task> GetCorrelations(string correlationId) - { - lock (_sync) - { - if (!_reverseLookup.ContainsKey(correlationId)) - return new List().CastTo>().ToTask(); - - return _reverseLookup[correlationId].ToList().CastTo>().ToTask(); - } - } - - public Task> GetCorrelations(StoredType flowType, string correlationId) - { - lock (_sync) - { - return _correlations - .Where(kv => kv.Key.Type == flowType && kv.Value.Contains(correlationId)) - .Select(kv => kv.Key) - .ToList() - .CastTo>() - .ToTask(); - } - } - - public Task> GetCorrelations(StoredId storedId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - return new List().CastTo>().ToTask(); - - return _correlations[storedId] - .ToList() - .CastTo>() - .ToTask(); - } - } - - public Task RemoveCorrelations(StoredId storedId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - return Task.CompletedTask; - - var correlations = _correlations[storedId]; - foreach (var correlation in correlations) - _reverseLookup[correlation].Remove(storedId); - - _correlations.Remove(storedId); - } - - return Task.CompletedTask; - } - - public Task RemoveCorrelation(StoredId storedId, string correlationId) - { - lock (_sync) - { - _correlations[storedId].Remove(correlationId); - _reverseLookup[correlationId].Remove(storedId); - } - - return Task.CompletedTask; - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 03c0cd07f..62c71bb72 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -20,8 +20,6 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public IMessageStore MessageStore => this; private readonly InMemoryEffectsStore _effectsStore = new(); public IEffectsStore EffectsStore => _effectsStore; - private readonly InMemoryCorrelationStore _correlationStore = new(); - public ICorrelationStore CorrelationStore => _correlationStore; public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore(); public Task Initialize() => Task.CompletedTask; @@ -543,8 +541,7 @@ public virtual Task DeleteFunction(StoredId storedId) { _messages.Remove(storedId); _effectsStore.Remove(storedId); - _correlationStore.RemoveCorrelations(storedId); - + return _states.Remove(storedId).ToTask(); } } diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 30d036e12..1aa519ba9 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -16,7 +16,6 @@ public class CrashableFunctionStore : IFunctionStore public ITypeStore TypeStore => _inner.TypeStore; public IMessageStore MessageStore => _inner.MessageStore; public IEffectsStore EffectsStore => _inner.EffectsStore; - public ICorrelationStore CorrelationStore => _inner.CorrelationStore; public IReplicaStore ReplicaStore => _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 62208fdcb..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDb.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs index 13ca1950f..be0eba94b 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs @@ -137,10 +137,6 @@ public override Task ExistingEffectCanBeRemoved() public override Task EffectsAreOnlyFetchedOnPropertyInvocation() => EffectsAreOnlyFetchedOnPropertyInvocation(FunctionStoreFactory.Create()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs index 774549285..03a35bd38 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs @@ -21,14 +21,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs deleted file mode 100644 index 0ec0df6e4..000000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs +++ /dev/null @@ -1,193 +0,0 @@ -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using MySqlConnector; - -namespace Cleipnir.ResilientFunctions.MariaDb; - -public class MariaDbCorrelationStore : ICorrelationStore -{ - private readonly string _connectionString; - private readonly string _tablePrefix; - - public MariaDbCorrelationStore(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - - private string? _initialize; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initialize ??= @$" - CREATE TABLE IF NOT EXISTS {_tablePrefix}_correlations ( - type INT NOT NULL, - instance CHAR(32) NOT NULL, - correlation VARCHAR(200) NOT NULL, - PRIMARY KEY (type, instance, correlation), - INDEX (correlation, type, instance) - );"; - var command = new MySqlCommand(_initialize, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {_tablePrefix}_correlations"; - var command = new MySqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _setCorrelationSql ??= $@" - INSERT IGNORE INTO {_tablePrefix}_correlations - (type, instance, correlation) - VALUES - (?, ?, ?)"; - - await using var command = new MySqlCommand(_setCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid.ToString("N")}, - new() {Value = correlationId}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelationsSql; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await CreateConnection(); - _getCorrelationsSql ??= @$" - SELECT type, instance - FROM {_tablePrefix}_correlations - WHERE correlation = ?"; - await using var command = new MySqlCommand(_getCorrelationsSql, conn) - { - Parameters = - { - new() { Value = correlationId }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var states = new List(); - while (await reader.ReadAsync()) - { - var instance = reader.GetString(1).ToGuid(); - states.Add(new StoredId(instance)); - } - - return states; - } - - private string? _getInstancesForFlowTypeAndCorrelation; - public async Task> GetCorrelations(StoredType storedType, string correlationId) - { - await using var conn = await CreateConnection(); - _getInstancesForFlowTypeAndCorrelation ??= @$" - SELECT instance - FROM {_tablePrefix}_correlations - WHERE type = ? AND correlation = ?"; - await using var command = new MySqlCommand(_getInstancesForFlowTypeAndCorrelation, conn) - { - Parameters = - { - new() { Value = storedType.Value }, - new() { Value = correlationId }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = reader.GetString(0).ToGuid().ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunctionSql; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _getCorrelationsForFunctionSql ??= @$" - SELECT correlation - FROM {_tablePrefix}_correlations - WHERE type = ? AND instance = ?"; - await using var command = new MySqlCommand(_getCorrelationsForFunctionSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var correlations = new List(); - while (await reader.ReadAsync()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _removeCorrelationsSql ??= $"DELETE FROM {_tablePrefix}_correlations WHERE type = ? AND instance = ?"; - await using var command = new MySqlCommand(_removeCorrelationsSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _removeCorrelationSql ??= $"DELETE FROM {_tablePrefix}_correlations WHERE type = ? AND instance = ? AND correlation = ?"; - await using var command = new MySqlCommand(_removeCorrelationSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - new() { Value = correlationId }, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private async Task CreateConnection() - { - var conn = new MySqlConnection(_connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index d9ae6fa45..946d39033 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -25,9 +25,6 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbTypeStore _typeStore; public ITypeStore TypeStore => _typeStore; - - private readonly MariaDbCorrelationStore _correlationStore; - public ICorrelationStore CorrelationStore => _correlationStore; private readonly MariaDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -44,7 +41,6 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new MariaDbMessageStore(connectionString, _sqlGenerator, tablePrefix); _effectsStore = new MariaDbEffectsStore(connectionString, tablePrefix); - _correlationStore = new MariaDbCorrelationStore(connectionString, tablePrefix); _typeStore = new MariaDbTypeStore(connectionString, tablePrefix); _replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix); } @@ -57,7 +53,6 @@ public async Task Initialize() await MessageStore.Initialize(); await EffectsStore.Initialize(); - await CorrelationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateOpenConnection(_connectionString); @@ -88,7 +83,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -884,7 +878,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); } diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 1742b716e..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs index 6d4633631..163fa6d7b 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs @@ -137,11 +137,7 @@ public override Task ExistingEffectCanBeReplaced() [TestMethod] public override Task EffectCanBeStarted() => EffectCanBeStarted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - + [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs index ebd130795..1b1231861 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs @@ -22,14 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs deleted file mode 100644 index 3d84cf633..000000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs +++ /dev/null @@ -1,196 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Npgsql; - -namespace Cleipnir.ResilientFunctions.PostgreSQL; - -public class PostgreSqlCorrelationStore(string connectionString, string tablePrefix = "") : ICorrelationStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {tablePrefix}_correlations ( - type INT, - instance UUID, - correlation VARCHAR(255) NOT NULL, - PRIMARY KEY(type, instance, correlation) - ); - - CREATE UNIQUE INDEX IF NOT EXISTS idx_{tablePrefix}_correlations - ON {tablePrefix}_correlations(correlation, type, instance);"; - var command = new NpgsqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_correlations"; - var command = new NpgsqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _setCorrelationSql ??= $@" - INSERT INTO {tablePrefix}_correlations - (type, instance, correlation) - VALUES - ($1, $2, $3) - ON CONFLICT (type, instance, correlation) DO NOTHING"; - - await using var command = new NpgsqlCommand(_setCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - new() {Value = correlationId} - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelationsSql; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await CreateConnection(); - _getCorrelationsSql ??= @$" - SELECT type, instance - FROM {tablePrefix}_correlations - WHERE correlation = $1"; - await using var command = new NpgsqlCommand(_getCorrelationsSql, conn) - { - Parameters = - { - new() { Value = correlationId } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = new StoredId(reader.GetGuid(1)); - ids.Add(id); - } - - return ids; - } - - private string? _getInstancesForFlowTypeAndCorrelation; - public async Task> GetCorrelations(StoredType flowType, string correlationId) - { - await using var conn = await CreateConnection(); - _getInstancesForFlowTypeAndCorrelation ??= @$" - SELECT instance - FROM {tablePrefix}_correlations - WHERE type = $1 AND correlation = $2"; - await using var command = new NpgsqlCommand(_getInstancesForFlowTypeAndCorrelation, conn) - { - Parameters = - { - new() { Value = flowType.Value.ToInt() }, - new() { Value = correlationId } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = reader.GetGuid(0).ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunction; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _getCorrelationsForFunction ??= @$" - SELECT correlation - FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2"; - await using var command = new NpgsqlCommand(_getCorrelationsForFunction, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var correlations = new List(); - while (await reader.ReadAsync()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _removeCorrelationsSql ??= $@" - DELETE FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2"; - - await using var command = new NpgsqlCommand(_removeCorrelationsSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _removeCorrelationSql ??= $@" - DELETE FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2 AND correlation = $3"; - - await using var command = new NpgsqlCommand(_removeCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - new() {Value = correlationId}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private async Task CreateConnection() - { - var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 339f1a333..b14205de7 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -25,9 +25,6 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly PostgreSqlEffectsStore _effectsStore; public IEffectsStore EffectsStore => _effectsStore; - - private readonly ICorrelationStore _correlationStore; - public ICorrelationStore CorrelationStore => _correlationStore; private readonly PostgreSqlDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -42,7 +39,6 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new PostgreSqlMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName); - _correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName); _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); _replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName); } @@ -62,7 +58,6 @@ public async Task Initialize() await _messageStore.Initialize(); await _effectsStore.Initialize(); - await _correlationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateConnection(); @@ -98,7 +93,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -799,7 +793,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); } diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 15dcf29cf..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs index e81675c71..1c2d4c923 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs @@ -138,10 +138,6 @@ public override Task ExistingEffectCanBeReplaced() public override Task EffectCanBeStarted() => EffectCanBeStarted(FunctionStoreFactory.Create()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs index 1feae36b6..386a91217 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs @@ -22,15 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs deleted file mode 100644 index 7cd299d20..000000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs +++ /dev/null @@ -1,179 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Microsoft.Data.SqlClient; - -namespace Cleipnir.ResilientFunctions.SqlServer; - -public class SqlServerCorrelationsStore(string connectionString, string tablePrefix = "") : ICorrelationStore -{ - private readonly Func> _connFunc = CreateConnection(connectionString); - - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await _connFunc(); - _initializeSql ??= @$" - CREATE TABLE {tablePrefix}_Correlations ( - Type INT, - Instance UNIQUEIDENTIFIER, - Correlation NVARCHAR(200), - PRIMARY KEY (Type, Instance, Correlation) - ); - - CREATE INDEX IDX_{tablePrefix}_Correlations ON {tablePrefix}_Correlations (Correlation, Type, Instance); - "; - - await using var command = new SqlCommand(_initializeSql, conn); - try - { - await command.ExecuteNonQueryAsync(); - } catch (SqlException exception) when (exception.Number == 2714) {} - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await _connFunc(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_Correlations"; - await using var command = new SqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await _connFunc(); - _setCorrelationSql ??= $@" - MERGE INTO {tablePrefix}_Correlations - USING (VALUES (@Type, @Instance, @Correlation)) - AS source (Type, Instance, Correlation) - ON {tablePrefix}_Correlations.Type = source.Type AND - {tablePrefix}_Correlations.Instance = source.Instance AND - {tablePrefix}_Correlations.Correlation = source.Correlation - WHEN NOT MATCHED THEN - INSERT (Type, Instance, Correlation) - VALUES (source.Type, source.Instance, source.Correlation);"; - await using var command = new SqlCommand(_setCorrelationSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - command.Parameters.AddWithValue("@Correlation", correlationId); - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelations; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await _connFunc(); - _getCorrelations ??= @$" - SELECT Type, Instance - FROM {tablePrefix}_Correlations - WHERE Correlation = @CorrelationId"; - - await using var command = new SqlCommand(_getCorrelations, conn); - command.Parameters.AddWithValue("@CorrelationId", correlationId); - - var functions = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var id = reader.GetGuid(1); - functions.Add(new StoredId(id)); - } - - return functions; - } - - private string? _getInstancesForFunctionTypeAndCorrelationId; - public async Task> GetCorrelations(StoredType storedType, string correlationId) - { - await using var conn = await _connFunc(); - _getInstancesForFunctionTypeAndCorrelationId ??= @$" - SELECT Instance - FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Correlation = @Correlation"; - - await using var command = new SqlCommand(_getInstancesForFunctionTypeAndCorrelationId, conn); - command.Parameters.AddWithValue("@Type", storedType.Value.ToInt()); - command.Parameters.AddWithValue("@Correlation", correlationId); - - var ids = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var id = reader.GetGuid(0).ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunction; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await _connFunc(); - _getCorrelationsForFunction ??= @$" - SELECT Correlation - FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance"; - - await using var command = new SqlCommand(_getCorrelationsForFunction, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - - var correlations = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await _connFunc(); - _removeCorrelationsSql ??= @$" - DELETE FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance"; - - await using var command = new SqlCommand(_removeCorrelationsSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await _connFunc(); - _removeCorrelationSql ??= @$" - DELETE FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance AND Correlation = @Correlation"; - - await using var command = new SqlCommand(_removeCorrelationSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - command.Parameters.AddWithValue("@Correlation", correlationId); - - await command.ExecuteNonQueryAsync(); - } - - private static Func> CreateConnection(string connectionString) - { - return async () => - { - var connection = new SqlConnection(connectionString); - await connection.OpenAsync(); - return connection; - }; - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index a285fa500..25414058d 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -24,11 +24,9 @@ public class SqlServerFunctionStore : IFunctionStore private readonly SqlServerEffectsStore _effectsStore; private readonly SqlServerMessageStore _messageStore; - private readonly SqlServerCorrelationsStore _correlationStore; private readonly SqlServerTypeStore _typeStore; - + public IEffectsStore EffectsStore => _effectsStore; - public ICorrelationStore CorrelationStore => _correlationStore; public ITypeStore TypeStore => _typeStore; public IMessageStore MessageStore => _messageStore; private readonly SqlServerReplicaStore _replicaStore; @@ -45,7 +43,6 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _connFunc = CreateConnection(connectionString); _messageStore = new SqlServerMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new SqlServerEffectsStore(connectionString, _tableName); - _correlationStore = new SqlServerCorrelationsStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _replicaStore = new SqlServerReplicaStore(connectionString, _tableName); } @@ -68,7 +65,6 @@ public async Task Initialize() await _messageStore.Initialize(); await _effectsStore.Initialize(); - await _correlationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await _connFunc(); @@ -117,7 +113,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -925,7 +920,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); }