diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs deleted file mode 100644 index 6d7db7aa..00000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs index 80dba307..292389a8 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/ControlPanelTests.cs @@ -138,10 +138,6 @@ public override Task ExistingEffectCanBeSetToFailed() public override Task SaveChangesPersistsChangedResult() => SaveChangesPersistsChangedResult(Utils.CreateInMemoryFunctionStoreTask()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs index cbdfa76d..c9770a5e 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/RoutingTests.cs @@ -22,14 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs deleted file mode 100644 index 6955419e..00000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/CorrelationStoreTests.cs +++ /dev/null @@ -1,115 +0,0 @@ -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates; - -public abstract class CorrelationStoreTests -{ - public abstract Task SunshineScenario(); - public async Task SunshineScenario(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId, "SomeCorrelationId"); - await correlationStore - .GetCorrelations(storedId.Type, correlationId: "SomeCorrelationId") - .SelectAsync(c => c.Single()) - .ShouldBeAsync(storedId); - - await correlationStore - .GetCorrelations(storedId) - .SelectAsync(c => c.Single()) - .ShouldBeAsync("SomeCorrelationId"); - } - - public abstract Task TwoDifferentFunctionsCanUseTheSameCorrelationId(); - public async Task TwoDifferentFunctionsCanUseTheSameCorrelationId(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId1 = TestStoredId.Create(); - var storedId2 = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId1, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - await correlationStore.SetCorrelation(storedId2, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - - var instances = await correlationStore - .GetCorrelations(storedId1.Type, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - instances.Count.ShouldBe(1); - instances.Single().ShouldBe(storedId1); - - instances = await correlationStore - .GetCorrelations(storedId2.Type, correlationId: "TwoDifferentFunctionsCanUseTheSameCorrelationId"); - instances.Count.ShouldBe(1); - instances.Single().ShouldBe(storedId2); - } - - public abstract Task FunctionCorrelationsCanBeDeleted(); - public async Task FunctionCorrelationsCanBeDeleted(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - await correlationStore.RemoveCorrelations(functionId); - - (await correlationStore.GetCorrelations(functionId)).ShouldBeEmpty(); - } - - public abstract Task SingleFunctionCorrelationCanBeDeleted(); - public async Task SingleFunctionCorrelationCanBeDeleted(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - await correlationStore.RemoveCorrelation(functionId, "SomeCorrelationId1"); - - await correlationStore - .GetCorrelations(functionId) - .SelectAsync(c => c.Single()) - .ShouldBeAsync("SomeCorrelationId2"); - } - - public abstract Task SingleFunctionCanHaveMultipleCorrelations(); - public async Task SingleFunctionCanHaveMultipleCorrelations(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var functionId = TestStoredId.Create(); - - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(functionId, "SomeCorrelationId2"); - - var correlations = await correlationStore.GetCorrelations(functionId); - correlations.Count.ShouldBe(2); - correlations.Any(c => c == "SomeCorrelationId1").ShouldBeTrue(); - correlations.Any(c => c == "SomeCorrelationId2").ShouldBeTrue(); - } - - public abstract Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(); - public async Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(Task storeTask) - { - var correlationStore = await storeTask.SelectAsync(s => s.CorrelationStore); - var storedId1 = TestStoredId.Create(); - var storedId2 = TestStoredId.Create(storedId1.Type); - var storedId3 = TestStoredId.Create(); - - await correlationStore.SetCorrelation(storedId1, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(storedId2, "SomeCorrelationId1"); - await correlationStore.SetCorrelation(storedId3, "SomeCorrelationId1"); - - var instances = await correlationStore.GetCorrelations(storedId1.Type, "SomeCorrelationId1"); - instances.Count.ShouldBe(2); - instances.Any(i => i == storedId1).ShouldBeTrue(); - instances.Any(i => i == storedId2).ShouldBeTrue(); - } -} - diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs index c1540c03..b7e55665 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/ControlPanelTests.cs @@ -1225,52 +1225,16 @@ protected async Task SaveChangesPersistsChangedResult(Task store unhandledExceptionCatcher.ShouldNotHaveExceptions(); } - public abstract Task CorrelationsCanBeChanged(); - protected async Task CorrelationsCanBeChanged(Task storeTask) - { - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance) = functionId; - using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch)); - - var registration = functionsRegistry.RegisterParamless( - flowType, - async workflow => - { - await workflow.Correlations.Register("SomeCorrelation"); - - } - ); - - await registration.Run(flowInstance.Value); - - var controlPanel = await registration.ControlPanel(flowInstance.Value); - controlPanel.ShouldNotBeNull(); - - await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeTrueAsync(); - await controlPanel.Correlations.Remove("SomeCorrelation"); - await controlPanel.Correlations.Register("SomeNewCorrelation"); - - controlPanel = await registration.ControlPanel(flowInstance.Value); - controlPanel.ShouldNotBeNull(); - await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalseAsync(); - await controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrueAsync(); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - public abstract Task DeleteRemovesFunctionFromAllStores(); protected async Task DeleteRemovesFunctionFromAllStores(Task storeTask) { var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - + var store = await storeTask; var functionId = TestFlowId.Create(); var (flowType, flowInstance) = functionId; using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch)); - + var registration = functionsRegistry.RegisterParamless( flowType, inner: () => Task.CompletedTask @@ -1281,7 +1245,6 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task sto var controlPanel = await registration.ControlPanel(flowInstance.Value); controlPanel.ShouldNotBeNull(); - await controlPanel.Correlations.Register("SomeCorrelation"); await controlPanel.Effects.SetSucceeded("SomeEffect".GetHashCode()); await controlPanel.Messages.Append("Some Message"); @@ -1289,20 +1252,16 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task sto var storedId = registration.MapToStoredId(functionId.Instance); await store.GetFunction(storedId).ShouldBeNullAsync(); - + await store.MessageStore.GetMessages(storedId) .SelectAsync(msgs => msgs.Count == 0) .ShouldBeTrueAsync(); - await store.CorrelationStore.GetCorrelations(storedId) - .SelectAsync(c => c.Any()) - .ShouldBeFalseAsync(); - await store.EffectsStore .GetEffectResults(storedId) .SelectAsync(e => e.Any()) .ShouldBeFalseAsync(); - + unhandledExceptionCatcher.ShouldNotHaveExceptions(); } diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs index 52f0d745..3fa2846d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/RoutingTests.cs @@ -167,116 +167,6 @@ await registration.SendMessage( unhandledExceptionCatcher.ShouldNotHaveExceptions(); } - #region Route using Correlation - - public abstract Task MessageIsRoutedToParamlessInstanceUsingCorrelationId(); - protected async Task MessageIsRoutedToParamlessInstanceUsingCorrelationId(Task storeTask) - { - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance) = functionId; - - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - using var functionsRegistry = new FunctionsRegistry( - store, - new Settings(unhandledExceptionCatcher.Catch) - ); - - var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}"; - - var correlationIdRegisteredFlag = new SyncedFlag(); - var syncedFlag = new SyncedFlag(); - var syncedValue = new Synced(); - - var registration = functionsRegistry.RegisterParamless( - flowType, - inner: async workflow => - { - await workflow.RegisterCorrelation(correlationId); - correlationIdRegisteredFlag.Raise(); - - var someMessage = await workflow.Message(); - syncedValue.Value = someMessage.Value; - syncedFlag.Raise(); - } - ); - - await registration.Schedule(flowInstance); - await correlationIdRegisteredFlag.WaitForRaised(); - - await registration.RouteMessage( - new SomeCorrelatedMessage(correlationId, "SomeValue!"), - correlationId - ); - - await syncedFlag.WaitForRaised(); - syncedValue.Value.ShouldBe("SomeValue!"); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - - public abstract Task MessageIsRoutedToMultipleInstancesUsingCorrelationId(); - protected async Task MessageIsRoutedToMultipleInstancesUsingCorrelationId(Task storeTask) - { - var store = await storeTask; - var functionId = TestFlowId.Create(); - var (flowType, flowInstance1) = functionId; - var (_, flowInstance2) = TestFlowId.Create().WithTypeId(flowType); - - var unhandledExceptionCatcher = new UnhandledExceptionCatcher(); - using var functionsRegistry = new FunctionsRegistry( - store, - new Settings(unhandledExceptionCatcher.Catch) - ); - - var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}"; - - var registration = functionsRegistry.RegisterParamless( - flowType, - inner: async workflow => - { - await workflow.RegisterCorrelation(correlationId); - await workflow.Message(); - } - ); - var storedType = registration.StoredType; - - await registration.Schedule(flowInstance1); - await registration.Schedule(flowInstance2); - - await BusyWait.Until(() => store - .CorrelationStore - .GetCorrelations(storedType, correlationId) - .SelectAsync(l => l.Count == 2) - ); - - await registration.RouteMessage( - new SomeCorrelatedMessage(correlationId, "SomeValue!"), - correlationId - ); - - var controlPanel1 = await registration.ControlPanel(flowInstance1); - controlPanel1.ShouldNotBeNull(); - await BusyWait.Until(async () => - { - await controlPanel1.Refresh(); - return controlPanel1.Status == Status.Succeeded; - }); - - - var controlPanel2 = await registration.ControlPanel(flowInstance2); - controlPanel2.ShouldNotBeNull(); - await BusyWait.Until(async () => - { - await controlPanel2.Refresh(); - return controlPanel2.Status == Status.Succeeded; - }); - - unhandledExceptionCatcher.ShouldNotHaveExceptions(); - } - - #endregion - #region Paramless is started by message public abstract Task ParamlessInstanceIsStartedByMessage(); @@ -319,5 +209,4 @@ await registration.SendMessage( #endregion public record SomeMessage(string RouteTo, string Value); - public record SomeCorrelatedMessage(string Correlation, string Value); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs index b257b51f..c89f29c0 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs @@ -184,18 +184,16 @@ await store.EffectsStore.SetEffectResult( StoredEffect.CreateCompleted(1.ToEffectId(), "SomeStateJson".ToUtf8Bytes(), alias: null).ToStoredChange(storedId, Insert), session: null ); - await store.CorrelationStore.SetCorrelation(storedId, "SomeCorrelationId"); await store.EffectsStore.SetEffectResult( storedId, new StoredEffect(2.ToEffectId(), WorkStatus.Completed, Result: null, StoredException: null, Alias: null).ToStoredChange(storedId, Insert), session: null ); await store.MessageStore.AppendMessage(storedId, new StoredMessage("SomeJson".ToUtf8Bytes(), "SomeType".ToUtf8Bytes(), Position: 0)); - + await store.DeleteFunction(storedId); await store.GetFunction(storedId).ShouldBeNullAsync(); - await store.CorrelationStore.GetCorrelations(storedId).ShouldBeEmptyAsync(); await store.EffectsStore.GetEffectResults(storedId).ShouldBeEmptyAsync(); await store.MessageStore.GetMessages(storedId).ShouldBeEmptyAsync(); } diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index c012cb42..28ad548d 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -20,7 +20,6 @@ public class CrashableFunctionStore : IFunctionStore private readonly CrashableEffectStore _crashableEffectStore; public IEffectsStore EffectsStore => _crashableEffectStore; - public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore; public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) diff --git a/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs b/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs index 86e8ef23..23aecfc5 100644 --- a/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs +++ b/Core/Cleipnir.ResilientFunctions/BaseRegistration.cs @@ -23,9 +23,6 @@ protected BaseRegistration(StoredType storedType, Postman postman, IFunctionStor UtcNow = utcNow; } - public Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) where T : class - => Postman.RouteMessage(message, correlationId, idempotencyKey); - public StoredId MapToStoredId(FlowInstance instance) => StoredId.Create(StoredType, instance.Value); public Task Interrupt(IEnumerable instances) diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs index 3398c3ff..69d2763e 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs @@ -402,12 +402,6 @@ public Effect CreateEffect(StoredId storedId, FlowId flowId, IReadOnlyList CreateExistingEffects(FlowId flowId) { diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs index 5ff19eeb..b8ed3566 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs @@ -280,13 +280,11 @@ await _invocationHelper.PersistFunctionInStore( _flowsManager ); - var correlations = _invocationHelper.CreateCorrelations(flowId); - var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); var messageWriter = _invocationHelper.CreateMessageWriter(storedId); - var workflow = new Workflow(flowId, storedId, effect, correlations, queueManager, _invocationHelper.UtcNow, messageWriter); + var workflow = new Workflow(flowId, storedId, effect, queueManager, _invocationHelper.UtcNow, messageWriter); return new PreparedInvocation( persisted, @@ -333,7 +331,6 @@ private async Task PrepareForReInvocation(StoredId storedI var effect = _invocationHelper.CreateEffect(storedId, flowId, effects, flowTimeouts, storageSession, _flowsManager); - var correlations = _invocationHelper.CreateCorrelations(flowId); var flowState = _flowsManager.CreateFlow(storedId, flowTimeouts); var queueManager = _invocationHelper.CreateQueueManager(flowId, storedId, effect, flowState, flowTimeouts, _unhandledExceptionHandler); disposables.Add(queueManager); @@ -343,7 +340,6 @@ private async Task PrepareForReInvocation(StoredId storedI flowId, storedId, effect, - correlations, queueManager, _invocationHelper.UtcNow, messageWriter diff --git a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs index c1e01be0..aae91c5d 100644 --- a/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs +++ b/Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Workflow.cs @@ -13,26 +13,22 @@ public class Workflow public FlowId FlowId { get; } internal StoredId StoredId { get; } public Effect Effect { get; } - public Correlations Correlations { get; } private QueueManager _queueManager; private readonly UtcNow _utcNow; private MessageWriter MessageWriter { get; } - public Workflow(FlowId flowId, StoredId storedId, Effect effect, Correlations correlations, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) + public Workflow(FlowId flowId, StoredId storedId, Effect effect, QueueManager queueManager, UtcNow utcNow, MessageWriter messageWriter) { FlowId = flowId; StoredId = storedId; Effect = effect; - Correlations = correlations; _queueManager = queueManager; _utcNow = utcNow; MessageWriter = messageWriter; } - public async Task RegisterCorrelation(string correlation) => await Correlations.Register(correlation); - public Task Delay(TimeSpan @for, bool suspend = true, string? alias = null) => Delay(until: _utcNow() + @for, suspend, alias); public Task Delay(DateTime until, bool suspend = true, string? alias = null) { diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs index d77d58ab..43b8ac3d 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanel.cs @@ -19,14 +19,13 @@ internal ControlPanel( Status status, long expires, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam: Unit.Instance, innerResult: Unit.Instance, effects, - messages, correlations, fatalWorkflowException, + messages, fatalWorkflowException, utcNow ) { } @@ -63,14 +62,13 @@ internal ControlPanel( Status status, long expires, TParam innerParam, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult: Unit.Instance, effects, - messages, correlations, fatalWorkflowException, + messages, fatalWorkflowException, utcNow ) { } @@ -112,13 +110,13 @@ internal ControlPanel( long expires, TParam innerParam, TReturn? innerResult, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, + FatalWorkflowException? fatalWorkflowException, UtcNow utcNow ) : base( invoker, invocationHelper, flowId, storedId, ownerReplica, status, expires, innerParam, innerResult, effects, messages, - correlations, fatalWorkflowException, + fatalWorkflowException, utcNow ) { } @@ -170,7 +168,6 @@ internal BaseControlPanel( TReturn? innerResult, ExistingEffects effects, ExistingMessages messages, - Correlations correlations, FatalWorkflowException? fatalWorkflowException, UtcNow utcNow) { @@ -188,7 +185,6 @@ internal BaseControlPanel( : new DateTime(expires, DateTimeKind.Utc)) : null; Effects = effects; Messages = messages; - Correlations = correlations; FatalWorkflowException = fatalWorkflowException; UtcNow = utcNow; } @@ -203,8 +199,6 @@ internal BaseControlPanel( public ExistingEffects Effects { get; private set; } - public Correlations Correlations { get; private set; } - private TParam _innerParam; protected TParam InnerParam { @@ -340,7 +334,6 @@ public async Task Refresh() FatalWorkflowException = sf.FatalWorkflowException; Effects = await _invocationHelper.CreateExistingEffects(FlowId); Messages = _invocationHelper.CreateExistingMessages(FlowId); - Correlations = _invocationHelper.CreateCorrelations(FlowId); _innerParamChanged = false; } diff --git a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs index 7b55219a..2396b3cc 100644 --- a/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs +++ b/Core/Cleipnir.ResilientFunctions/Domain/ControlPanelFactory.cs @@ -42,7 +42,6 @@ internal ControlPanelFactory(FlowType flowType, StoredType storedType, Invoker? _correlations; - private readonly Lock _sync = new(); - - private async Task> GetCorrelations() - { - lock (_sync) - if (_correlations is not null) - return _correlations; - - var correlations = (await correlationStore.GetCorrelations(storedId)) - .ToHashSet(); - - lock (_sync) - if (_correlations is null) - return _correlations = correlations; - else - return _correlations; - } - - public async Task Register(string correlation) - { - var registered = await GetCorrelations(); - - lock (_sync) - if (registered.Contains(correlation)) - return; - - await correlationStore.SetCorrelation(storedId, correlation); - - lock (_sync) - registered.Add(correlation); - } - - public async Task Contains(string correlation) - { - var registered = await GetCorrelations(); - lock (_sync) - return registered.Contains(correlation); - } - - public async Task Remove(string correlation) - { - var registered = await GetCorrelations(); - - lock (_sync) - if (!registered.Contains(correlation)) - return; - - await correlationStore.RemoveCorrelation(storedId, correlation); - - lock (_sync) - registered.Remove(correlation); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs index eec12ec9..40c7fec6 100644 --- a/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs +++ b/Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs @@ -258,11 +258,7 @@ public FuncRegistration RegisterFunc( serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new FuncRegistration( flowType, @@ -347,11 +343,7 @@ private ParamlessRegistration RegisterParamless( serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new ParamlessRegistration( flowType, @@ -435,11 +427,7 @@ public ActionRegistration RegisterAction( _functionStore, serializer ); - var postman = new Postman( - storedType, - _functionStore.CorrelationStore, - messageWriters - ); + var postman = new Postman(messageWriters); var registration = new ActionRegistration( flowType, diff --git a/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs b/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs index ebd32bcf..0fe16be1 100644 --- a/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs +++ b/Core/Cleipnir.ResilientFunctions/Messaging/Postman.cs @@ -4,21 +4,14 @@ namespace Cleipnir.ResilientFunctions.Messaging; -public class Postman(StoredType storedType, ICorrelationStore correlationStore, MessageWriters messageWriters) +public class Postman(MessageWriters messageWriters) { public Task SendMessage( - StoredId instance, - TMessage message, + StoredId instance, + TMessage message, string? idempotencyKey = null ) where TMessage : class => messageWriters.For(instance).AppendMessage(message, idempotencyKey); - + public async Task SendMessages(IReadOnlyList messages) => await messageWriters.AppendMessages(messages); - - public async Task RouteMessage(TMessage message, string correlationId, string? idempotencyKey = null) where TMessage : class - { - var flowInstances = await correlationStore.GetCorrelations(storedType, correlationId); - foreach (var storedId in flowInstances) - await SendMessage(storedId, message, idempotencyKey); - } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs deleted file mode 100644 index 44238325..00000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/ICorrelationStore.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Storage; - -public interface ICorrelationStore -{ - public Task Initialize(); - public Task Truncate(); - - public Task SetCorrelation(StoredId storedId, string correlationId); - public Task> GetCorrelations(StoredType flowType, string correlationId); - public Task> GetCorrelations(StoredId storedId); - public Task RemoveCorrelations(StoredId storedId); - public Task RemoveCorrelation(StoredId storedId, string correlationId); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index 3d9678fc..c6f795b6 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -11,7 +11,6 @@ public interface IFunctionStore public ITypeStore TypeStore { get; } public IMessageStore MessageStore { get; } public IEffectsStore EffectsStore { get; } - public ICorrelationStore CorrelationStore { get; } public IReplicaStore ReplicaStore { get; } public Task Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs deleted file mode 100644 index 457daef1..00000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryCorrelationStore.cs +++ /dev/null @@ -1,112 +0,0 @@ -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; - -namespace Cleipnir.ResilientFunctions.Storage; - -public class InMemoryCorrelationStore : ICorrelationStore -{ - private readonly Dictionary> _correlations = new(); - private readonly Dictionary> _reverseLookup = new(); - private readonly Lock _sync = new(); - - public Task Initialize() => Task.CompletedTask; - - public Task Truncate() - { - lock (_sync) - { - _correlations.Clear(); - _reverseLookup.Clear(); - } - - return Task.CompletedTask; - } - - public Task SetCorrelation(StoredId storedId, string correlationId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - _correlations[storedId] = new HashSet(); - - _correlations[storedId].Add(correlationId); - - if (!_reverseLookup.ContainsKey(correlationId)) - _reverseLookup[correlationId] = new HashSet(); - - _reverseLookup[correlationId].Add(storedId); - } - - return Task.CompletedTask; - } - - public Task> GetCorrelations(string correlationId) - { - lock (_sync) - { - if (!_reverseLookup.ContainsKey(correlationId)) - return new List().CastTo>().ToTask(); - - return _reverseLookup[correlationId].ToList().CastTo>().ToTask(); - } - } - - public Task> GetCorrelations(StoredType flowType, string correlationId) - { - lock (_sync) - { - return _correlations - .Where(kv => kv.Key.Type == flowType && kv.Value.Contains(correlationId)) - .Select(kv => kv.Key) - .ToList() - .CastTo>() - .ToTask(); - } - } - - public Task> GetCorrelations(StoredId storedId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - return new List().CastTo>().ToTask(); - - return _correlations[storedId] - .ToList() - .CastTo>() - .ToTask(); - } - } - - public Task RemoveCorrelations(StoredId storedId) - { - lock (_sync) - { - if (!_correlations.ContainsKey(storedId)) - return Task.CompletedTask; - - var correlations = _correlations[storedId]; - foreach (var correlation in correlations) - _reverseLookup[correlation].Remove(storedId); - - _correlations.Remove(storedId); - } - - return Task.CompletedTask; - } - - public Task RemoveCorrelation(StoredId storedId, string correlationId) - { - lock (_sync) - { - _correlations[storedId].Remove(correlationId); - _reverseLookup[correlationId].Remove(storedId); - } - - return Task.CompletedTask; - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 03c0cd07..62c71bb7 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -20,8 +20,6 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public IMessageStore MessageStore => this; private readonly InMemoryEffectsStore _effectsStore = new(); public IEffectsStore EffectsStore => _effectsStore; - private readonly InMemoryCorrelationStore _correlationStore = new(); - public ICorrelationStore CorrelationStore => _correlationStore; public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore(); public Task Initialize() => Task.CompletedTask; @@ -543,8 +541,7 @@ public virtual Task DeleteFunction(StoredId storedId) { _messages.Remove(storedId); _effectsStore.Remove(storedId); - _correlationStore.RemoveCorrelations(storedId); - + return _states.Remove(storedId).ToTask(); } } diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 30d036e1..1aa519ba 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -16,7 +16,6 @@ public class CrashableFunctionStore : IFunctionStore public ITypeStore TypeStore => _inner.TypeStore; public IMessageStore MessageStore => _inner.MessageStore; public IEffectsStore EffectsStore => _inner.EffectsStore; - public ICorrelationStore CorrelationStore => _inner.CorrelationStore; public IReplicaStore ReplicaStore => _inner.ReplicaStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 62208fdc..00000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDb.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs index 13ca1950..be0eba94 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/ControlPanelTests.cs @@ -137,10 +137,6 @@ public override Task ExistingEffectCanBeRemoved() public override Task EffectsAreOnlyFetchedOnPropertyInvocation() => EffectsAreOnlyFetchedOnPropertyInvocation(FunctionStoreFactory.Create()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs index 77454928..03a35bd3 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/RoutingTests.cs @@ -21,14 +21,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs deleted file mode 100644 index 0ec0df6e..00000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbCorrelationStore.cs +++ /dev/null @@ -1,193 +0,0 @@ -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using MySqlConnector; - -namespace Cleipnir.ResilientFunctions.MariaDb; - -public class MariaDbCorrelationStore : ICorrelationStore -{ - private readonly string _connectionString; - private readonly string _tablePrefix; - - public MariaDbCorrelationStore(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - - private string? _initialize; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initialize ??= @$" - CREATE TABLE IF NOT EXISTS {_tablePrefix}_correlations ( - type INT NOT NULL, - instance CHAR(32) NOT NULL, - correlation VARCHAR(200) NOT NULL, - PRIMARY KEY (type, instance, correlation), - INDEX (correlation, type, instance) - );"; - var command = new MySqlCommand(_initialize, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {_tablePrefix}_correlations"; - var command = new MySqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _setCorrelationSql ??= $@" - INSERT IGNORE INTO {_tablePrefix}_correlations - (type, instance, correlation) - VALUES - (?, ?, ?)"; - - await using var command = new MySqlCommand(_setCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid.ToString("N")}, - new() {Value = correlationId}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelationsSql; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await CreateConnection(); - _getCorrelationsSql ??= @$" - SELECT type, instance - FROM {_tablePrefix}_correlations - WHERE correlation = ?"; - await using var command = new MySqlCommand(_getCorrelationsSql, conn) - { - Parameters = - { - new() { Value = correlationId }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var states = new List(); - while (await reader.ReadAsync()) - { - var instance = reader.GetString(1).ToGuid(); - states.Add(new StoredId(instance)); - } - - return states; - } - - private string? _getInstancesForFlowTypeAndCorrelation; - public async Task> GetCorrelations(StoredType storedType, string correlationId) - { - await using var conn = await CreateConnection(); - _getInstancesForFlowTypeAndCorrelation ??= @$" - SELECT instance - FROM {_tablePrefix}_correlations - WHERE type = ? AND correlation = ?"; - await using var command = new MySqlCommand(_getInstancesForFlowTypeAndCorrelation, conn) - { - Parameters = - { - new() { Value = storedType.Value }, - new() { Value = correlationId }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = reader.GetString(0).ToGuid().ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunctionSql; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _getCorrelationsForFunctionSql ??= @$" - SELECT correlation - FROM {_tablePrefix}_correlations - WHERE type = ? AND instance = ?"; - await using var command = new MySqlCommand(_getCorrelationsForFunctionSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var correlations = new List(); - while (await reader.ReadAsync()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _removeCorrelationsSql ??= $"DELETE FROM {_tablePrefix}_correlations WHERE type = ? AND instance = ?"; - await using var command = new MySqlCommand(_removeCorrelationsSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _removeCorrelationSql ??= $"DELETE FROM {_tablePrefix}_correlations WHERE type = ? AND instance = ? AND correlation = ?"; - await using var command = new MySqlCommand(_removeCorrelationSql, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid.ToString("N") }, - new() { Value = correlationId }, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private async Task CreateConnection() - { - var conn = new MySqlConnection(_connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index d9ae6fa4..946d3903 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -25,9 +25,6 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbTypeStore _typeStore; public ITypeStore TypeStore => _typeStore; - - private readonly MariaDbCorrelationStore _correlationStore; - public ICorrelationStore CorrelationStore => _correlationStore; private readonly MariaDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -44,7 +41,6 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new MariaDbMessageStore(connectionString, _sqlGenerator, tablePrefix); _effectsStore = new MariaDbEffectsStore(connectionString, tablePrefix); - _correlationStore = new MariaDbCorrelationStore(connectionString, tablePrefix); _typeStore = new MariaDbTypeStore(connectionString, tablePrefix); _replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix); } @@ -57,7 +53,6 @@ public async Task Initialize() await MessageStore.Initialize(); await EffectsStore.Initialize(); - await CorrelationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateOpenConnection(_connectionString); @@ -88,7 +83,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -884,7 +878,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); } diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 1742b716..00000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs index 6d463363..163fa6d7 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/ControlPanelTests.cs @@ -137,11 +137,7 @@ public override Task ExistingEffectCanBeReplaced() [TestMethod] public override Task EffectCanBeStarted() => EffectCanBeStarted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - + [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs index ebd13079..1b123186 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/RoutingTests.cs @@ -22,14 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs deleted file mode 100644 index 3d84cf63..00000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlCorrelationStore.cs +++ /dev/null @@ -1,196 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Npgsql; - -namespace Cleipnir.ResilientFunctions.PostgreSQL; - -public class PostgreSqlCorrelationStore(string connectionString, string tablePrefix = "") : ICorrelationStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {tablePrefix}_correlations ( - type INT, - instance UUID, - correlation VARCHAR(255) NOT NULL, - PRIMARY KEY(type, instance, correlation) - ); - - CREATE UNIQUE INDEX IF NOT EXISTS idx_{tablePrefix}_correlations - ON {tablePrefix}_correlations(correlation, type, instance);"; - var command = new NpgsqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_correlations"; - var command = new NpgsqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _setCorrelationSql ??= $@" - INSERT INTO {tablePrefix}_correlations - (type, instance, correlation) - VALUES - ($1, $2, $3) - ON CONFLICT (type, instance, correlation) DO NOTHING"; - - await using var command = new NpgsqlCommand(_setCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - new() {Value = correlationId} - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelationsSql; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await CreateConnection(); - _getCorrelationsSql ??= @$" - SELECT type, instance - FROM {tablePrefix}_correlations - WHERE correlation = $1"; - await using var command = new NpgsqlCommand(_getCorrelationsSql, conn) - { - Parameters = - { - new() { Value = correlationId } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = new StoredId(reader.GetGuid(1)); - ids.Add(id); - } - - return ids; - } - - private string? _getInstancesForFlowTypeAndCorrelation; - public async Task> GetCorrelations(StoredType flowType, string correlationId) - { - await using var conn = await CreateConnection(); - _getInstancesForFlowTypeAndCorrelation ??= @$" - SELECT instance - FROM {tablePrefix}_correlations - WHERE type = $1 AND correlation = $2"; - await using var command = new NpgsqlCommand(_getInstancesForFlowTypeAndCorrelation, conn) - { - Parameters = - { - new() { Value = flowType.Value.ToInt() }, - new() { Value = correlationId } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var ids = new List(); - while (await reader.ReadAsync()) - { - var id = reader.GetGuid(0).ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunction; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _getCorrelationsForFunction ??= @$" - SELECT correlation - FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2"; - await using var command = new NpgsqlCommand(_getCorrelationsForFunction, conn) - { - Parameters = - { - new() { Value = storedId.Type.Value.ToInt() }, - new() { Value = storedId.AsGuid } - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - - var correlations = new List(); - while (await reader.ReadAsync()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await CreateConnection(); - _removeCorrelationsSql ??= $@" - DELETE FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2"; - - await using var command = new NpgsqlCommand(_removeCorrelationsSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await CreateConnection(); - _removeCorrelationSql ??= $@" - DELETE FROM {tablePrefix}_correlations - WHERE type = $1 AND instance = $2 AND correlation = $3"; - - await using var command = new NpgsqlCommand(_removeCorrelationSql, conn) - { - Parameters = - { - new() {Value = storedId.Type.Value.ToInt()}, - new() {Value = storedId.AsGuid}, - new() {Value = correlationId}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private async Task CreateConnection() - { - var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 339f1a33..b14205de 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -25,9 +25,6 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly PostgreSqlEffectsStore _effectsStore; public IEffectsStore EffectsStore => _effectsStore; - - private readonly ICorrelationStore _correlationStore; - public ICorrelationStore CorrelationStore => _correlationStore; private readonly PostgreSqlDbReplicaStore _replicaStore; public IReplicaStore ReplicaStore => _replicaStore; @@ -42,7 +39,6 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new PostgreSqlMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName); - _correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName); _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); _replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName); } @@ -62,7 +58,6 @@ public async Task Initialize() await _messageStore.Initialize(); await _effectsStore.Initialize(); - await _correlationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await CreateConnection(); @@ -98,7 +93,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -799,7 +793,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); } diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs deleted file mode 100644 index 15dcf29c..00000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/CorrelationStoreTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests; - -[TestClass] -public class CorrelationStoreTests : Cleipnir.ResilientFunctions.Tests.TestTemplates.CorrelationStoreTests -{ - [TestMethod] - public override Task SunshineScenario() - => SunshineScenario(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task TwoDifferentFunctionsCanUseTheSameCorrelationId() - => TwoDifferentFunctionsCanUseTheSameCorrelationId(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionCorrelationsCanBeDeleted() - => FunctionCorrelationsCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCorrelationCanBeDeleted() - => SingleFunctionCorrelationCanBeDeleted(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task SingleFunctionCanHaveMultipleCorrelations() - => SingleFunctionCanHaveMultipleCorrelations(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation() - => FunctionInstancesCanBeFetchedForFunctionTypeAndCorrelation(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs index e81675c7..1c2d4c92 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/ControlPanelTests.cs @@ -138,10 +138,6 @@ public override Task ExistingEffectCanBeReplaced() public override Task EffectCanBeStarted() => EffectCanBeStarted(FunctionStoreFactory.Create()); - [TestMethod] - public override Task CorrelationsCanBeChanged() - => CorrelationsCanBeChanged(FunctionStoreFactory.Create()); - [TestMethod] public override Task DeleteRemovesFunctionFromAllStores() => DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create()); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs index 1feae36b..386a9121 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/RFunctionTests/RoutingTests.cs @@ -22,15 +22,6 @@ public override Task MessageIsRoutedToFuncInstance() public override Task MessageIsRoutedUsingRoutingInfo() => MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create()); - [TestMethod] - public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId() - => MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create()); - - - [TestMethod] - public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId() - => MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create()); - [TestMethod] public override Task ParamlessInstanceIsStartedByMessage() => ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create()); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs deleted file mode 100644 index 7cd299d2..00000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerCorrelationsStore.cs +++ /dev/null @@ -1,179 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Microsoft.Data.SqlClient; - -namespace Cleipnir.ResilientFunctions.SqlServer; - -public class SqlServerCorrelationsStore(string connectionString, string tablePrefix = "") : ICorrelationStore -{ - private readonly Func> _connFunc = CreateConnection(connectionString); - - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await _connFunc(); - _initializeSql ??= @$" - CREATE TABLE {tablePrefix}_Correlations ( - Type INT, - Instance UNIQUEIDENTIFIER, - Correlation NVARCHAR(200), - PRIMARY KEY (Type, Instance, Correlation) - ); - - CREATE INDEX IDX_{tablePrefix}_Correlations ON {tablePrefix}_Correlations (Correlation, Type, Instance); - "; - - await using var command = new SqlCommand(_initializeSql, conn); - try - { - await command.ExecuteNonQueryAsync(); - } catch (SqlException exception) when (exception.Number == 2714) {} - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await _connFunc(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_Correlations"; - await using var command = new SqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _setCorrelationSql; - public async Task SetCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await _connFunc(); - _setCorrelationSql ??= $@" - MERGE INTO {tablePrefix}_Correlations - USING (VALUES (@Type, @Instance, @Correlation)) - AS source (Type, Instance, Correlation) - ON {tablePrefix}_Correlations.Type = source.Type AND - {tablePrefix}_Correlations.Instance = source.Instance AND - {tablePrefix}_Correlations.Correlation = source.Correlation - WHEN NOT MATCHED THEN - INSERT (Type, Instance, Correlation) - VALUES (source.Type, source.Instance, source.Correlation);"; - await using var command = new SqlCommand(_setCorrelationSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - command.Parameters.AddWithValue("@Correlation", correlationId); - - await command.ExecuteNonQueryAsync(); - } - - private string? _getCorrelations; - public async Task> GetCorrelations(string correlationId) - { - await using var conn = await _connFunc(); - _getCorrelations ??= @$" - SELECT Type, Instance - FROM {tablePrefix}_Correlations - WHERE Correlation = @CorrelationId"; - - await using var command = new SqlCommand(_getCorrelations, conn); - command.Parameters.AddWithValue("@CorrelationId", correlationId); - - var functions = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var id = reader.GetGuid(1); - functions.Add(new StoredId(id)); - } - - return functions; - } - - private string? _getInstancesForFunctionTypeAndCorrelationId; - public async Task> GetCorrelations(StoredType storedType, string correlationId) - { - await using var conn = await _connFunc(); - _getInstancesForFunctionTypeAndCorrelationId ??= @$" - SELECT Instance - FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Correlation = @Correlation"; - - await using var command = new SqlCommand(_getInstancesForFunctionTypeAndCorrelationId, conn); - command.Parameters.AddWithValue("@Type", storedType.Value.ToInt()); - command.Parameters.AddWithValue("@Correlation", correlationId); - - var ids = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var id = reader.GetGuid(0).ToStoredId(); - ids.Add(id); - } - - return ids; - } - - private string? _getCorrelationsForFunction; - public async Task> GetCorrelations(StoredId storedId) - { - await using var conn = await _connFunc(); - _getCorrelationsForFunction ??= @$" - SELECT Correlation - FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance"; - - await using var command = new SqlCommand(_getCorrelationsForFunction, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - - var correlations = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (reader.HasRows && reader.Read()) - { - var correlation = reader.GetString(0); - correlations.Add(correlation); - } - - return correlations; - } - - private string? _removeCorrelationsSql; - public async Task RemoveCorrelations(StoredId storedId) - { - await using var conn = await _connFunc(); - _removeCorrelationsSql ??= @$" - DELETE FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance"; - - await using var command = new SqlCommand(_removeCorrelationsSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - - await command.ExecuteNonQueryAsync(); - } - - private string? _removeCorrelationSql; - public async Task RemoveCorrelation(StoredId storedId, string correlationId) - { - await using var conn = await _connFunc(); - _removeCorrelationSql ??= @$" - DELETE FROM {tablePrefix}_Correlations - WHERE Type = @Type AND Instance = @Instance AND Correlation = @Correlation"; - - await using var command = new SqlCommand(_removeCorrelationSql, conn); - command.Parameters.AddWithValue("@Type", storedId.Type.Value.ToInt()); - command.Parameters.AddWithValue("@Instance", storedId.AsGuid); - command.Parameters.AddWithValue("@Correlation", correlationId); - - await command.ExecuteNonQueryAsync(); - } - - private static Func> CreateConnection(string connectionString) - { - return async () => - { - var connection = new SqlConnection(connectionString); - await connection.OpenAsync(); - return connection; - }; - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index a285fa50..25414058 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -24,11 +24,9 @@ public class SqlServerFunctionStore : IFunctionStore private readonly SqlServerEffectsStore _effectsStore; private readonly SqlServerMessageStore _messageStore; - private readonly SqlServerCorrelationsStore _correlationStore; private readonly SqlServerTypeStore _typeStore; - + public IEffectsStore EffectsStore => _effectsStore; - public ICorrelationStore CorrelationStore => _correlationStore; public ITypeStore TypeStore => _typeStore; public IMessageStore MessageStore => _messageStore; private readonly SqlServerReplicaStore _replicaStore; @@ -45,7 +43,6 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _connFunc = CreateConnection(connectionString); _messageStore = new SqlServerMessageStore(connectionString, _sqlGenerator, _tableName); _effectsStore = new SqlServerEffectsStore(connectionString, _tableName); - _correlationStore = new SqlServerCorrelationsStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _replicaStore = new SqlServerReplicaStore(connectionString, _tableName); } @@ -68,7 +65,6 @@ public async Task Initialize() await _messageStore.Initialize(); await _effectsStore.Initialize(); - await _correlationStore.Initialize(); await _typeStore.Initialize(); await _replicaStore.Initialize(); await using var conn = await _connFunc(); @@ -117,7 +113,6 @@ public async Task TruncateTables() { await _messageStore.TruncateTable(); await _effectsStore.Truncate(); - await _correlationStore.Truncate(); await _typeStore.Truncate(); await _replicaStore.Truncate(); @@ -925,7 +920,6 @@ public async Task DeleteFunction(StoredId storedId) { await _messageStore.Truncate(storedId); await _effectsStore.Remove(storedId); - await _correlationStore.RemoveCorrelations(storedId); return await DeleteStoredFunction(storedId); }