Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/HttpPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Client;
using Dexpace.Sdk.Core.Configuration;
using Dexpace.Sdk.Core.Errors;
using Dexpace.Sdk.Core.Http.Request;
using Dexpace.Sdk.Core.Http.Response;

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// The entry point for sending an HTTP request through the configured policy chain.
/// </summary>
/// <remarks>
/// <para>
/// Instances are created exclusively by <see cref="PipelineBuilder.Build"/>. The pipeline is
/// immutable after construction: the sorted policy array and transport are captured at build time.
/// </para>
/// <para>
/// <b>Sync bridge.</b> <see cref="Send"/> blocks the calling thread by driving the async chain
/// synchronously via <c>GetAwaiter().GetResult()</c>. Callers on a thread pool should prefer
/// <see cref="SendAsync"/> to avoid thread starvation.
/// </para>
/// </remarks>
public sealed class HttpPipeline
{
private readonly HttpPipelinePolicy[] _policies;
private readonly IAsyncHttpClient _transport;

internal HttpPipeline(HttpPipelinePolicy[] policies, IAsyncHttpClient transport)
{
_policies = policies;
_transport = transport;
}

/// <summary>
/// Asynchronously sends <paramref name="request"/> through the pipeline and returns the
/// response produced by the terminal transport.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="options">Client options that apply to this call.</param>
/// <param name="cancellationToken">An optional token to cancel the call.</param>
/// <returns>
/// A <see cref="ValueTask{TResult}"/> that completes with the <see cref="Response"/> once
/// the pipeline chain has finished.
/// </returns>
/// <exception cref="PipelineAbortedException">
/// No policy or the transport produced a <see cref="Response"/> by the time the chain
/// completed (i.e. the pipeline was short-circuited without setting a response).
/// </exception>
public async ValueTask<Response> SendAsync(
Request request,
DexpaceClientOptions options,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(options);

var context = new PipelineContext(request, options, cancellationToken);
await new PipelineRunner(_policies, 0, _transport).RunAsync(context).ConfigureAwait(false);

return context.Response
?? throw new PipelineAbortedException(
"The pipeline completed without producing a response.");
}

/// <summary>
/// Synchronously sends <paramref name="request"/> through the pipeline and returns the
/// response. Blocks the calling thread until the async chain completes.
/// </summary>
/// <param name="request">The request to send.</param>
/// <param name="options">Client options that apply to this call.</param>
/// <param name="cancellationToken">An optional token to cancel the call.</param>
/// <returns>The <see cref="Response"/> produced by the pipeline.</returns>
/// <exception cref="PipelineAbortedException">
/// The pipeline completed without producing a response.
/// </exception>
public Response Send(
Request request,
DexpaceClientOptions options,
CancellationToken cancellationToken = default) =>
SendAsync(request, options, cancellationToken).AsTask().GetAwaiter().GetResult();
}
46 changes: 46 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/HttpPipelinePolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// Base class for every policy in the HTTP pipeline.
/// </summary>
/// <remarks>
/// <para>
/// A policy participates in the request/response lifecycle by implementing
/// <see cref="ProcessAsync"/>. Before calling <c>next.RunAsync</c>, a policy may mutate
/// <see cref="PipelineContext.Request"/>; after the call returns, it may inspect or replace
/// <see cref="PipelineContext.Response"/>.
/// </para>
/// <para>
/// <b>Re-entrancy.</b> <see cref="PipelineRunner"/> is a value type, so a policy may call
/// <c>next.RunAsync</c> more than once — this is how redirect and retry policies work.
/// </para>
/// <para>
/// <b>Async-only in v1.</b> There is no synchronous <c>Process</c> override on this base class.
/// The sync entry point on the pipeline drives the async chain via a blocking
/// bridge; concrete policy subclasses are only required to implement the async path.
/// </para>
/// </remarks>
public abstract class HttpPipelinePolicy
{
/// <summary>
/// The stage at which this policy is inserted in the pipeline.
/// </summary>
public abstract PipelineStage Stage { get; }

/// <summary>
/// Asynchronously participates in processing the request/response.
/// </summary>
/// <param name="context">
/// The mutable context carrying the current <see cref="PipelineContext.Request"/>,
/// <see cref="PipelineContext.Response"/>, and ancillary state for this call.
/// </param>
/// <param name="continuation">
/// The continuation that runs the remaining policies and eventually invokes the transport.
/// Call this to forward the request; omit the call to short-circuit the chain.
/// </param>
/// <returns>A <see cref="ValueTask"/> that completes when the policy has finished.</returns>
public abstract ValueTask ProcessAsync(PipelineContext context, PipelineRunner continuation);
}
161 changes: 161 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/PipelineBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Client;

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// Builds an <see cref="HttpPipeline"/> from an ordered set of <see cref="HttpPipelinePolicy"/>
/// instances and a terminal transport.
/// </summary>
/// <remarks>
/// <para>
/// Policies are kept in an internal list. <see cref="Add"/> appends to that list;
/// <c>InsertBefore&lt;T&gt;</c>, <c>InsertAfter&lt;T&gt;</c>, <c>Replace&lt;T&gt;</c>, and
/// <c>Remove&lt;T&gt;</c> operate relative to the first policy of runtime type <c>T</c>.
/// </para>
/// <para>
/// <see cref="Build"/> performs a <b>stable sort by <see cref="HttpPipelinePolicy.Stage"/></b>
/// (preserving list order within a stage) and then validates pillar-stage cardinality:
/// stages marked as pillar admit exactly one policy. A violation throws
/// <see cref="InvalidOperationException"/> with an actionable message naming the offending stage.
/// </para>
/// </remarks>
public sealed class PipelineBuilder
{
private readonly List<HttpPipelinePolicy> _list = [];

/// <summary>
/// Appends <paramref name="policy"/> to the internal list. The stage-based sort happens at
/// <see cref="Build"/> time, not here.
/// </summary>
/// <param name="policy">The policy to add.</param>
/// <returns>This builder (fluent interface).</returns>
public PipelineBuilder Add(HttpPipelinePolicy policy)
{
ArgumentNullException.ThrowIfNull(policy);
_list.Add(policy);
return this;
}

/// <summary>
/// Inserts <paramref name="policy"/> immediately before the first policy of runtime type
/// <typeparamref name="T"/> in the current list.
/// </summary>
/// <typeparam name="T">The type to search for.</typeparam>
/// <param name="policy">The policy to insert.</param>
/// <returns>This builder (fluent interface).</returns>
/// <exception cref="InvalidOperationException">
/// No policy of type <typeparamref name="T"/> is present in the list.
/// </exception>
public PipelineBuilder InsertBefore<T>(HttpPipelinePolicy policy)
where T : HttpPipelinePolicy
{
ArgumentNullException.ThrowIfNull(policy);
var index = FindFirst<T>();
_list.Insert(index, policy);
return this;
}

/// <summary>
/// Inserts <paramref name="policy"/> immediately after the first policy of runtime type
/// <typeparamref name="T"/> in the current list.
/// </summary>
/// <typeparam name="T">The type to search for.</typeparam>
/// <param name="policy">The policy to insert.</param>
/// <returns>This builder (fluent interface).</returns>
/// <exception cref="InvalidOperationException">
/// No policy of type <typeparamref name="T"/> is present in the list.
/// </exception>
public PipelineBuilder InsertAfter<T>(HttpPipelinePolicy policy)
where T : HttpPipelinePolicy
{
ArgumentNullException.ThrowIfNull(policy);
var index = FindFirst<T>();
_list.Insert(index + 1, policy);
return this;
}

/// <summary>
/// Replaces the first policy of runtime type <typeparamref name="T"/> with
/// <paramref name="policy"/>.
/// </summary>
/// <typeparam name="T">The type to replace.</typeparam>
/// <param name="policy">The replacement policy.</param>
/// <returns>This builder (fluent interface).</returns>
/// <exception cref="InvalidOperationException">
/// No policy of type <typeparamref name="T"/> is present in the list.
/// </exception>
public PipelineBuilder Replace<T>(HttpPipelinePolicy policy)
where T : HttpPipelinePolicy
{
ArgumentNullException.ThrowIfNull(policy);
var index = FindFirst<T>();
_list[index] = policy;
return this;
}

/// <summary>
/// Removes every policy of runtime type <typeparamref name="T"/> from the list.
/// If none are present, this is a no-op.
/// </summary>
/// <typeparam name="T">The type to remove.</typeparam>
/// <returns>This builder (fluent interface).</returns>
public PipelineBuilder Remove<T>()
where T : HttpPipelinePolicy
{
_list.RemoveAll(p => p is T);
return this;
}

/// <summary>
/// Stable-sorts the registered policies by <see cref="HttpPipelinePolicy.Stage"/>, validates
/// pillar-stage cardinality, and constructs the <see cref="HttpPipeline"/> with the given
/// <paramref name="transport"/> as the terminal.
/// </summary>
/// <param name="transport">
/// The terminal transport; invoked after all policies have run.
/// </param>
/// <returns>A fully configured <see cref="HttpPipeline"/>.</returns>
/// <exception cref="InvalidOperationException">
/// A pillar stage contains more than one policy.
/// </exception>
public HttpPipeline Build(IAsyncHttpClient transport)
{
ArgumentNullException.ThrowIfNull(transport);

// Stable sort by Stage value
HttpPipelinePolicy[] sorted = [.. _list.OrderBy(p => (int)p.Stage)];

// Validate pillar cardinality
foreach (var stage in PipelineStageHelper.PillarStages)
{
var count = sorted.Count(p => p.Stage == stage);
if (count > 1)
{
throw new InvalidOperationException(
$"Pipeline stage '{stage}' is a pillar stage and may contain at most one policy, " +
$"but {count} policies were registered for it. " +
$"Remove the duplicate or use a non-pillar stage.");
}
}

return new HttpPipeline(sorted, transport);
}

// Returns the index of the first policy of type T, or throws.
private int FindFirst<T>() where T : HttpPipelinePolicy
{
for (var i = 0; i < _list.Count; i++)
{
if (_list[i] is T)
{
return i;
}
}

throw new InvalidOperationException(
$"No policy of type '{typeof(T).Name}' is registered in this builder.");
}
}
63 changes: 63 additions & 0 deletions src/Dexpace.Sdk.Core/Pipeline/PipelineRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2026 dexpace and Omar Aljarrah.
// Licensed under the MIT License. See LICENSE in the repository root for details.

using Dexpace.Sdk.Core.Client;

namespace Dexpace.Sdk.Core.Pipeline;

/// <summary>
/// The "next" continuation passed to each <see cref="HttpPipelinePolicy.ProcessAsync"/> call.
/// Advances the policy index and ultimately invokes the transport.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="PipelineRunner"/> is a <c>readonly struct</c> so it carries zero allocation per
/// policy hop. A policy may call <see cref="RunAsync"/> more than once (e.g. retry, redirect)
/// because the runner is immutable — each call re-advances from the same index with its own
/// in-flight state.
/// </para>
/// <para>
/// Callers must not retain or share a <see cref="PipelineRunner"/> beyond the duration of
/// <see cref="HttpPipelinePolicy.ProcessAsync"/>.
/// </para>
/// </remarks>
public readonly struct PipelineRunner
{
private readonly HttpPipelinePolicy[] _policies;
private readonly int _index;
private readonly IAsyncHttpClient _transport;

/// <summary>
/// Initializes a runner. Called by the pipeline entry point and recursively by
/// <see cref="RunAsync"/>.
/// </summary>
/// <param name="policies">The ordered (sorted-by-stage) policy array.</param>
/// <param name="index">The index of the next policy to invoke.</param>
/// <param name="transport">The terminal transport invoked when all policies have run.</param>
internal PipelineRunner(HttpPipelinePolicy[] policies, int index, IAsyncHttpClient transport)
{
_policies = policies;
_index = index;
_transport = transport;
}

/// <summary>
/// Runs the remainder of the pipeline starting at the current index, then invokes the
/// transport if no earlier policy short-circuited.
/// </summary>
/// <param name="context">The mutable context for the current call.</param>
/// <returns>A <see cref="ValueTask"/> that completes when the pipeline tail has run.</returns>
public async ValueTask RunAsync(PipelineContext context)
{
if (_index >= _policies.Length)
{
context.Response = await _transport
.ExecuteAsync(context.Request, context.CancellationToken)
.ConfigureAwait(false);
return;
}

var next = new PipelineRunner(_policies, _index + 1, _transport);
await _policies[_index].ProcessAsync(context, next).ConfigureAwait(false);
}
}
Loading