Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ public override Task ExistingEffectCanBeSetToFailed()
public override Task SaveChangesPersistsChangedResult()
=> SaveChangesPersistsChangedResult(Utils.CreateInMemoryFunctionStoreTask());

[TestMethod]
public override Task CorrelationsCanBeChanged()
=> CorrelationsCanBeChanged(FunctionStoreFactory.Create());

[TestMethod]
public override Task DeleteRemovesFunctionFromAllStores()
=> DeleteRemovesFunctionFromAllStores(FunctionStoreFactory.Create());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ public override Task MessageIsRoutedToFuncInstance()
public override Task MessageIsRoutedUsingRoutingInfo()
=> MessageIsRoutedUsingRoutingInfo(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageIsRoutedToParamlessInstanceUsingCorrelationId()
=> MessageIsRoutedToParamlessInstanceUsingCorrelationId(FunctionStoreFactory.Create());

[TestMethod]
public override Task MessageIsRoutedToMultipleInstancesUsingCorrelationId()
=> MessageIsRoutedToMultipleInstancesUsingCorrelationId(FunctionStoreFactory.Create());

[TestMethod]
public override Task ParamlessInstanceIsStartedByMessage()
=> ParamlessInstanceIsStartedByMessage(FunctionStoreFactory.Create());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1225,52 +1225,16 @@ protected async Task SaveChangesPersistsChangedResult(Task<IFunctionStore> store
unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task CorrelationsCanBeChanged();
protected async Task CorrelationsCanBeChanged(Task<IFunctionStore> storeTask)
{
var unhandledExceptionCatcher = new UnhandledExceptionCatcher();

var store = await storeTask;
var functionId = TestFlowId.Create();
var (flowType, flowInstance) = functionId;
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch));

var registration = functionsRegistry.RegisterParamless(
flowType,
async workflow =>
{
await workflow.Correlations.Register("SomeCorrelation");

}
);

await registration.Run(flowInstance.Value);

var controlPanel = await registration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();

await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeTrueAsync();
await controlPanel.Correlations.Remove("SomeCorrelation");
await controlPanel.Correlations.Register("SomeNewCorrelation");

controlPanel = await registration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();
await controlPanel.Correlations.Contains("SomeCorrelation").ShouldBeFalseAsync();
await controlPanel.Correlations.Contains("SomeNewCorrelation").ShouldBeTrueAsync();

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task DeleteRemovesFunctionFromAllStores();
protected async Task DeleteRemovesFunctionFromAllStores(Task<IFunctionStore> storeTask)
{
var unhandledExceptionCatcher = new UnhandledExceptionCatcher();

var store = await storeTask;
var functionId = TestFlowId.Create();
var (flowType, flowInstance) = functionId;
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionCatcher.Catch));

var registration = functionsRegistry.RegisterParamless(
flowType,
inner: () => Task.CompletedTask
Expand All @@ -1281,28 +1245,23 @@ protected async Task DeleteRemovesFunctionFromAllStores(Task<IFunctionStore> sto
var controlPanel = await registration.ControlPanel(flowInstance.Value);
controlPanel.ShouldNotBeNull();

await controlPanel.Correlations.Register("SomeCorrelation");
await controlPanel.Effects.SetSucceeded("SomeEffect".GetHashCode());
await controlPanel.Messages.Append("Some Message");

await controlPanel.Delete();

var storedId = registration.MapToStoredId(functionId.Instance);
await store.GetFunction(storedId).ShouldBeNullAsync();

await store.MessageStore.GetMessages(storedId)
.SelectAsync(msgs => msgs.Count == 0)
.ShouldBeTrueAsync();

await store.CorrelationStore.GetCorrelations(storedId)
.SelectAsync(c => c.Any())
.ShouldBeFalseAsync();

await store.EffectsStore
.GetEffectResults(storedId)
.SelectAsync(e => e.Any())
.ShouldBeFalseAsync();

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,116 +167,6 @@ await registration.SendMessage(
unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

#region Route using Correlation

public abstract Task MessageIsRoutedToParamlessInstanceUsingCorrelationId();
protected async Task MessageIsRoutedToParamlessInstanceUsingCorrelationId(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var functionId = TestFlowId.Create();
var (flowType, flowInstance) = functionId;

var unhandledExceptionCatcher = new UnhandledExceptionCatcher();
using var functionsRegistry = new FunctionsRegistry(
store,
new Settings(unhandledExceptionCatcher.Catch)
);

var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}";

var correlationIdRegisteredFlag = new SyncedFlag();
var syncedFlag = new SyncedFlag();
var syncedValue = new Synced<string>();

var registration = functionsRegistry.RegisterParamless(
flowType,
inner: async workflow =>
{
await workflow.RegisterCorrelation(correlationId);
correlationIdRegisteredFlag.Raise();

var someMessage = await workflow.Message<SomeCorrelatedMessage>();
syncedValue.Value = someMessage.Value;
syncedFlag.Raise();
}
);

await registration.Schedule(flowInstance);
await correlationIdRegisteredFlag.WaitForRaised();

await registration.RouteMessage(
new SomeCorrelatedMessage(correlationId, "SomeValue!"),
correlationId
);

await syncedFlag.WaitForRaised();
syncedValue.Value.ShouldBe("SomeValue!");

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

public abstract Task MessageIsRoutedToMultipleInstancesUsingCorrelationId();
protected async Task MessageIsRoutedToMultipleInstancesUsingCorrelationId(Task<IFunctionStore> storeTask)
{
var store = await storeTask;
var functionId = TestFlowId.Create();
var (flowType, flowInstance1) = functionId;
var (_, flowInstance2) = TestFlowId.Create().WithTypeId(flowType);

var unhandledExceptionCatcher = new UnhandledExceptionCatcher();
using var functionsRegistry = new FunctionsRegistry(
store,
new Settings(unhandledExceptionCatcher.Catch)
);

var correlationId = $"SomeCorrelationId_{Guid.NewGuid().ToString()}";

var registration = functionsRegistry.RegisterParamless(
flowType,
inner: async workflow =>
{
await workflow.RegisterCorrelation(correlationId);
await workflow.Message<SomeCorrelatedMessage>();
}
);
var storedType = registration.StoredType;

await registration.Schedule(flowInstance1);
await registration.Schedule(flowInstance2);

await BusyWait.Until(() => store
.CorrelationStore
.GetCorrelations(storedType, correlationId)
.SelectAsync(l => l.Count == 2)
);

await registration.RouteMessage(
new SomeCorrelatedMessage(correlationId, "SomeValue!"),
correlationId
);

var controlPanel1 = await registration.ControlPanel(flowInstance1);
controlPanel1.ShouldNotBeNull();
await BusyWait.Until(async () =>
{
await controlPanel1.Refresh();
return controlPanel1.Status == Status.Succeeded;
});


var controlPanel2 = await registration.ControlPanel(flowInstance2);
controlPanel2.ShouldNotBeNull();
await BusyWait.Until(async () =>
{
await controlPanel2.Refresh();
return controlPanel2.Status == Status.Succeeded;
});

unhandledExceptionCatcher.ShouldNotHaveExceptions();
}

#endregion

#region Paramless is started by message

public abstract Task ParamlessInstanceIsStartedByMessage();
Expand Down Expand Up @@ -319,5 +209,4 @@ await registration.SendMessage(
#endregion

public record SomeMessage(string RouteTo, string Value);
public record SomeCorrelatedMessage(string Correlation, string Value);
}
Loading