Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,16 @@ public IReadOnlyList<ActivityRecord> Snapshot()
}

public ActivityGateState GetGateState()
=> new(Name, IsBlocked, ActiveCount, Snapshot());
{
lock (_gate)
{
var activities = _activities.Values
.OrderBy(static activity => activity.StartedAt)
.ThenBy(static activity => activity.Id, StringComparer.Ordinal)
.ToArray();
return new(Name, activities.Length > 0, activities.Length, activities);
}
}

public static Builder Create(string name = "activity-tracker") => new(name);

Expand Down
59 changes: 47 additions & 12 deletions src/PatternKit.Core/Messaging/Channels/MessageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,53 @@ public MessageChannelReceiveResult<TPayload> TryReceive()
}

public IReadOnlyList<Message<TPayload>> Drain(Func<Message<TPayload>, bool>? predicate = null)
=> DrainWithState(predicate).Removed;

internal MessageChannelDrainResult<TPayload> DrainWithState(Func<Message<TPayload>, bool>? predicate = null)
{
var removed = new List<Message<TPayload>>();
Message<TPayload>[] snapshot;

lock (_gate)
{
var messages = _messages.ToArray();
var retained = new List<Message<TPayload>>(messages.Length);
snapshot = _messages.ToArray();

var removed = predicate is null
? snapshot
: snapshot.Where(predicate).ToArray();
Comment thread
JerrettDavis marked this conversation as resolved.
if (removed.Length == 0)
return new MessageChannelDrainResult<TPayload>([], Count);

var removedCounts = removed
.GroupBy(static message => message)
.ToDictionary(static group => group.Key, static group => group.Count());
var actualRemoved = new List<Message<TPayload>>(removed.Length);
var retained = new List<Message<TPayload>>();

foreach (var message in messages)
lock (_gate)
{
while (_messages.Count > 0)
{
if (predicate is null || predicate(message))
removed.Add(message);
var message = _messages.Dequeue();
if (removedCounts.TryGetValue(message, out var remainingRemovals))
{
actualRemoved.Add(message);
if (remainingRemovals == 1)
removedCounts.Remove(message);
else
removedCounts[message] = remainingRemovals - 1;
}
else
{
retained.Add(message);
}
}

var remainingCount = retained.Count;
_messages.Clear();
foreach (var message in retained)
_messages.Enqueue(message);
}

return removed;
return new MessageChannelDrainResult<TPayload>(actualRemoved, remainingCount);
}
}

public IReadOnlyList<Message<TPayload>> Snapshot()
Expand Down Expand Up @@ -300,11 +325,11 @@ private ChannelPurger(

public ChannelPurgeResult<TPayload> Purge()
{
var purged = _channel.Drain(_predicate);
foreach (var message in purged)
var result = _channel.DrainWithState(_predicate);
foreach (var message in result.Removed)
_audit?.Invoke(new(Name, _channel.Name, message));

return new(Name, _channel.Name, purged.Count, _channel.Count, purged);
return new(Name, _channel.Name, result.Removed.Count, result.RemainingCount, result.Removed);
}

public static Builder Create(string name = "channel-purger") => new(name);
Expand Down Expand Up @@ -385,6 +410,16 @@ public ChannelPurgeResult(
public IReadOnlyList<Message<TPayload>> PurgedMessages { get; }
}

internal sealed class MessageChannelDrainResult<TPayload>
{
public MessageChannelDrainResult(IReadOnlyList<Message<TPayload>> removed, int remainingCount)
=> (Removed, RemainingCount) = (removed, remainingCount);

public IReadOnlyList<Message<TPayload>> Removed { get; }

public int RemainingCount { get; }
}

public sealed class InvalidMessageChannel<TPayload>
{
private readonly MessageChannel<InvalidMessage<TPayload>> _invalidChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public DurableSubscriberResult<TPayload> CatchUp(MessageStoreQuery? query = null
}
}

if (messageFailed && _errorPolicy == DurableSubscriberErrorPolicy.StopOnFirstFailure)
if (messageFailed)
break;

if (!messageFailed)
Expand Down
14 changes: 8 additions & 6 deletions src/PatternKit.Core/Messaging/Routing/DynamicRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ private DynamicRouter(RouteEntry[] routes, RouteHandler? @default)
=> (_routes, _default) = (routes, @default);

/// <summary>Current ordered route names.</summary>
public IReadOnlyList<string> RouteNames => _routes.Select(static route => route.Name).ToArray();
public IReadOnlyList<string> RouteNames => Volatile.Read(ref _routes).Select(static route => route.Name).ToArray();

/// <summary>Registers or replaces a route in the runtime route table.</summary>
public DynamicRouter<TPayload, TResult> Register(string name, int order, RoutePredicate predicate, RouteHandler handler)
Expand All @@ -38,12 +38,13 @@ public DynamicRouter<TPayload, TResult> Register(string name, int order, RoutePr
var entry = new RouteEntry(name, order, predicate, handler);
lock (_gate)
{
_routes = _routes
var next = _routes
.Where(route => !string.Equals(route.Name, name, StringComparison.Ordinal))
.Append(entry)
.OrderBy(static route => route.Order)
.ThenBy(static route => route.Name, StringComparer.Ordinal)
.ToArray();
Volatile.Write(ref _routes, next);
}

return this;
Expand All @@ -57,11 +58,12 @@ public bool Unregister(string name)

lock (_gate)
{
var next = _routes.Where(route => !string.Equals(route.Name, name, StringComparison.Ordinal)).ToArray();
if (next.Length == _routes.Length)
var snapshot = Volatile.Read(ref _routes);
var next = snapshot.Where(route => !string.Equals(route.Name, name, StringComparison.Ordinal)).ToArray();
if (next.Length == snapshot.Length)
return false;

_routes = next;
Volatile.Write(ref _routes, next);
return true;
}
}
Expand All @@ -73,7 +75,7 @@ public TResult Route(Message<TPayload> message, MessageContext? context = null)
throw new ArgumentNullException(nameof(message));

var effectiveContext = context ?? MessageContext.From(message);
var snapshot = _routes;
var snapshot = Volatile.Read(ref _routes);
foreach (var route in snapshot)
if (route.Predicate(message, effectiveContext))
return route.Handler(message, effectiveContext);
Expand Down
94 changes: 78 additions & 16 deletions src/PatternKit.Generators/CacheAside/CacheAsidePolicyGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,93 @@ private static string GenerateSource(
sb.AppendLine();
}

sb.Append(GetAccessibility(type.DeclaredAccessibility)).Append(' ');
var indent = "";
foreach (var containingType in GetContainingTypes(type))
{
AppendTypeDeclaration(sb, containingType, indent);
sb.Append(indent).AppendLine("{");
indent += " ";
}

AppendTypeDeclaration(sb, type, indent);
sb.AppendLine("{");
sb.Append(indent).Append(" public static global::PatternKit.Cloud.CacheAside.CacheAsidePolicy<").Append(resultTypeName).Append("> ").Append(factoryMethodName).AppendLine("()");
sb.Append(indent).AppendLine(" {");
sb.Append(indent).Append(" var builder = global::PatternKit.Cloud.CacheAside.CacheAsidePolicy<").Append(resultTypeName).Append(">.Create(\"").Append(Escape(policyName)).AppendLine("\");");

if (timeToLiveMilliseconds > 0)
sb.Append(indent).Append(" builder.WithTimeToLive(global::System.TimeSpan.FromMilliseconds(").Append(timeToLiveMilliseconds).AppendLine("));");
else
sb.Append(indent).AppendLine(" builder.WithoutExpiration();");

if (predicate is not null)
sb.Append(indent).Append(" builder.CacheWhen(static value => ").Append(predicate.Name).AppendLine("(value));");

sb.Append(indent).AppendLine(" return builder.Build();");
sb.Append(indent).AppendLine(" }");
sb.Append(indent).AppendLine("}");

while (indent.Length > 0)
{
indent = indent.Substring(4);
sb.Append(indent).AppendLine("}");
}

return sb.ToString();
}

private static IReadOnlyList<INamedTypeSymbol> GetContainingTypes(INamedTypeSymbol type)
{
var stack = new Stack<INamedTypeSymbol>();
for (var current = type.ContainingType; current is not null; current = current.ContainingType)
stack.Push(current);
return stack.ToArray();
}

private static void AppendTypeDeclaration(StringBuilder sb, INamedTypeSymbol type, string indent)
{
sb.Append(indent).Append(GetAccessibility(type.DeclaredAccessibility)).Append(' ');
if (type.IsStatic)
sb.Append("static ");
else if (type.IsAbstract && type.TypeKind == TypeKind.Class)
sb.Append("abstract ");
else if (type.IsSealed && type.TypeKind == TypeKind.Class)
sb.Append("sealed ");
sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ').Append(type.Name).AppendLine();
sb.AppendLine("{");
sb.Append(" public static global::PatternKit.Cloud.CacheAside.CacheAsidePolicy<").Append(resultTypeName).Append("> ").Append(factoryMethodName).AppendLine("()");
sb.AppendLine(" {");
sb.Append(" var builder = global::PatternKit.Cloud.CacheAside.CacheAsidePolicy<").Append(resultTypeName).Append(">.Create(\"").Append(Escape(policyName)).AppendLine("\");");
sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ')
.Append(type.Name).Append(GetTypeParameterList(type)).Append(GetConstraintClauses(type)).AppendLine();
}

if (timeToLiveMilliseconds > 0)
sb.Append(" builder.WithTimeToLive(global::System.TimeSpan.FromMilliseconds(").Append(timeToLiveMilliseconds).AppendLine("));");
else
sb.AppendLine(" builder.WithoutExpiration();");
private static string GetTypeParameterList(INamedTypeSymbol type)
=> type.TypeParameters.Length == 0
? string.Empty
: "<" + string.Join(", ", type.TypeParameters.Select(static parameter => parameter.Name)) + ">";

if (predicate is not null)
sb.Append(" builder.CacheWhen(static value => ").Append(predicate.Name).AppendLine("(value));");
private static string GetConstraintClauses(INamedTypeSymbol type)
{
if (type.TypeParameters.Length == 0)
return string.Empty;

sb.AppendLine(" return builder.Build();");
sb.AppendLine(" }");
sb.AppendLine("}");
return sb.ToString();
var clauses = new List<string>();
foreach (var parameter in type.TypeParameters)
{
var constraints = new List<string>();
if (parameter.HasReferenceTypeConstraint)
constraints.Add(parameter.ReferenceTypeConstraintNullableAnnotation == NullableAnnotation.Annotated ? "class?" : "class");
if (parameter.HasNotNullConstraint)
constraints.Add("notnull");
if (parameter.HasUnmanagedTypeConstraint)
constraints.Add("unmanaged");
else if (parameter.HasValueTypeConstraint)
constraints.Add("struct");

constraints.AddRange(parameter.ConstraintTypes.Select(static constraint => constraint.ToDisplayString(TypeFormat)));
if (parameter.HasConstructorConstraint)
constraints.Add("new()");
if (constraints.Count > 0)
clauses.Add($" where {parameter.Name} : {string.Join(", ", constraints)}");
}

return string.Concat(clauses);
}

private static bool IsCachePredicate(IMethodSymbol method, ITypeSymbol resultType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,30 +151,92 @@ private static string GenerateSource(
sb.AppendLine();
}

sb.Append(GetAccessibility(type.DeclaredAccessibility)).Append(' ');
var indent = "";
foreach (var containingType in GetContainingTypes(type))
{
AppendTypeDeclaration(sb, containingType, indent);
sb.Append(indent).AppendLine("{");
indent += " ";
}

AppendTypeDeclaration(sb, type, indent);
sb.AppendLine("{");
sb.Append(indent).Append(" public static global::PatternKit.Cloud.CircuitBreaker.CircuitBreakerPolicy<").Append(resultTypeName).Append("> ").Append(factoryMethodName).AppendLine("()");
sb.Append(indent).AppendLine(" {");
sb.Append(indent).Append(" var builder = global::PatternKit.Cloud.CircuitBreaker.CircuitBreakerPolicy<").Append(resultTypeName).Append(">.Create(\"").Append(Escape(policyName)).AppendLine("\")");
sb.Append(indent).Append(" .WithFailureThreshold(").Append(failureThreshold).AppendLine(")");
sb.Append(indent).Append(" .WithBreakDuration(global::System.TimeSpan.FromMilliseconds(").Append(breakDurationMilliseconds).AppendLine("));");

if (resultPredicate is not null)
sb.Append(indent).Append(" builder.HandleResult(static result => ").Append(resultPredicate.Name).AppendLine("(result));");
if (exceptionPredicate is not null)
sb.Append(indent).Append(" builder.HandleException(static exception => ").Append(exceptionPredicate.Name).AppendLine("(exception));");

sb.Append(indent).AppendLine(" return builder.Build();");
sb.Append(indent).AppendLine(" }");
sb.Append(indent).AppendLine("}");

while (indent.Length > 0)
{
indent = indent.Substring(4);
sb.Append(indent).AppendLine("}");
}

return sb.ToString();
}

private static IReadOnlyList<INamedTypeSymbol> GetContainingTypes(INamedTypeSymbol type)
{
var stack = new Stack<INamedTypeSymbol>();
for (var current = type.ContainingType; current is not null; current = current.ContainingType)
stack.Push(current);
return stack.ToArray();
}

private static void AppendTypeDeclaration(StringBuilder sb, INamedTypeSymbol type, string indent)
{
sb.Append(indent).Append(GetAccessibility(type.DeclaredAccessibility)).Append(' ');
if (type.IsStatic)
sb.Append("static ");
else if (type.IsAbstract && type.TypeKind == TypeKind.Class)
sb.Append("abstract ");
else if (type.IsSealed && type.TypeKind == TypeKind.Class)
sb.Append("sealed ");
sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ').Append(type.Name).AppendLine();
sb.AppendLine("{");
sb.Append(" public static global::PatternKit.Cloud.CircuitBreaker.CircuitBreakerPolicy<").Append(resultTypeName).Append("> ").Append(factoryMethodName).AppendLine("()");
sb.AppendLine(" {");
sb.Append(" var builder = global::PatternKit.Cloud.CircuitBreaker.CircuitBreakerPolicy<").Append(resultTypeName).Append(">.Create(\"").Append(Escape(policyName)).AppendLine("\")");
sb.Append(" .WithFailureThreshold(").Append(failureThreshold).AppendLine(")");
sb.Append(" .WithBreakDuration(global::System.TimeSpan.FromMilliseconds(").Append(breakDurationMilliseconds).AppendLine("));");
sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ')
.Append(type.Name).Append(GetTypeParameterList(type)).Append(GetConstraintClauses(type)).AppendLine();
}

if (resultPredicate is not null)
sb.Append(" builder.HandleResult(static result => ").Append(resultPredicate.Name).AppendLine("(result));");
if (exceptionPredicate is not null)
sb.Append(" builder.HandleException(static exception => ").Append(exceptionPredicate.Name).AppendLine("(exception));");
private static string GetTypeParameterList(INamedTypeSymbol type)
=> type.TypeParameters.Length == 0
? string.Empty
: "<" + string.Join(", ", type.TypeParameters.Select(static parameter => parameter.Name)) + ">";

sb.AppendLine(" return builder.Build();");
sb.AppendLine(" }");
sb.AppendLine("}");
return sb.ToString();
private static string GetConstraintClauses(INamedTypeSymbol type)
{
if (type.TypeParameters.Length == 0)
return string.Empty;

var clauses = new List<string>();
foreach (var parameter in type.TypeParameters)
{
var constraints = new List<string>();
if (parameter.HasReferenceTypeConstraint)
constraints.Add(parameter.ReferenceTypeConstraintNullableAnnotation == NullableAnnotation.Annotated ? "class?" : "class");
if (parameter.HasNotNullConstraint)
constraints.Add("notnull");
if (parameter.HasUnmanagedTypeConstraint)
constraints.Add("unmanaged");
else if (parameter.HasValueTypeConstraint)
constraints.Add("struct");

constraints.AddRange(parameter.ConstraintTypes.Select(static constraint => constraint.ToDisplayString(TypeFormat)));
if (parameter.HasConstructorConstraint)
constraints.Add("new()");
if (constraints.Count > 0)
clauses.Add($" where {parameter.Name} : {string.Join(", ", constraints)}");
}

return string.Concat(clauses);
}

private static bool IsResultPredicate(IMethodSymbol method, ITypeSymbol resultType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ private static IEnumerable<IPropertySymbol> GetContractProperties(INamedTypeSymb
component.GetMembers()
.OfType<IPropertySymbol>()
.Where(p => !p.IsStatic && !p.IsIndexer && p.GetMethod is not null && !HasIgnore(p))
.Where(p => component.TypeKind != TypeKind.Class || p.IsAbstract || p.IsVirtual || (p.IsOverride && !p.IsSealed))
.OrderBy(p => p.Name, StringComparer.Ordinal);

private static bool IsPartial(SyntaxNode node) =>
Expand Down
Loading
Loading