Add embedded HTTP proxy server for streaming HTTP triggers (fixes #781) #877

Draft: ahmedmuhsin wants to merge 8 commits into Azure:dev from ahmedmuhsin:feat/http-proxy-streaming

ahmedmuhsin (Contributor) commented Jun 12, 2026

Summary

Adds an embedded HTTP proxy server to the Java worker so HTTP-triggered functions can stream both request and response bodies, instead of being forced through the buffered gRPC InvocationRequest / InvocationResponse path.

Fixes #781.

Background

Today the entire request body of every HTTP-triggered Java function is buffered into a single gRPC message before the worker sees it, and the entire response body is buffered before the host sends it back. This rules out SSE and chunked streaming from Java and makes large uploads and downloads impractical.

The .NET isolated worker solved this by having the host hand the worker a forwarded HTTP request via a side channel. This PR brings the same capability to the Java worker.

Design

The worker now advertises an HttpUri capability at startup pointing at an embedded HttpProxyServer (JDK com.sun.net.httpserver.HttpServer, ephemeral port, loopback only). When the host detects this capability, it forwards HTTP-trigger requests to that URI in addition to the regular gRPC InvocationRequest. The proxy correlates the HTTP request with the matching gRPC invocation via the x-ms-invocation-id header and an HttpInvocationCoordinator slot.

Once correlated:

  • Buffered functions (existing HttpRequestMessage<byte[]> / String): unchanged behavior — body is read from the proxy exchange and placed into the gRPC TypedData, response is written back through the exchange.
  • Streaming input (new HttpRequestMessage<InputStream>): the body bridge is skipped; the live HttpExchange is stashed in a ThreadLocal on RpcHttpRequestDataSource so the binding can hand the user an InputStream reading directly from the socket.
  • Streaming output (new HttpResponseMessage.Builder.bodyStream(InputStream) / bodyStream(IOConsumer<OutputStream>)): the broker returns an HttpInvocationOutcome that the proxy handler unwraps and pipes directly to the exchange OutputStream with chunked Transfer-Encoding.

What changed

Worker (this PR)

  • http/HttpProxyServer + HttpProxyHandler + HttpInvocationCoordinator — embedded proxy infrastructure
  • JavaWorkerClient advertises the HttpUri capability on startup
  • InvocationRequestHandler.executeProxiedHttp — detects streaming-input signatures via JavaFunctionBroker.methodHasStreamingHttpBody, installs/clears the ThreadLocal<HttpExchange> around the broker call, and skips the body bridge
  • binding/RpcHttpRequestDataSource — adds setCurrentExchange / currentExchange + a streaming branch in HTTP_DATA_OPERATIONS for HttpRequestMessage<InputStream>
  • broker/JavaFunctionBroker.HttpInvocationOutcome — DTO carrying both the gRPC return value and the raw streaming-body callback
  • New end-to-end test HttpProxyEndToEndTest (7 tests) exercising buffered/streaming combinations against a real JavaWorkerClient + proxy server
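The install/clear discipline around the broker call can be illustrated with a minimal, self-contained sketch. `ExchangeScope`, `callWith`, and `current` are hypothetical names chosen for this example and are not the worker's API; the exchange is typed as `Object` only to keep the sketch free of dependencies.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a per-thread side channel; not the worker's actual code.
final class ExchangeScope {
    private static final ThreadLocal<Object> CURRENT = new ThreadLocal<>();

    /** Returns the value installed for the calling thread, or null if none. */
    static Object current() {
        return CURRENT.get();
    }

    /** Installs the exchange, runs the body, and always clears the slot afterwards. */
    static <T> T callWith(Object exchange, Supplier<T> body) {
        CURRENT.set(exchange);
        try {
            return body.get();
        } finally {
            // Clearing in finally keeps a pooled thread from leaking the
            // exchange into the next invocation, even when the body throws.
            CURRENT.remove();
        }
    }
}
```

The point of the `finally` block is that dispatch threads are pooled, so a value left behind would be visible to whichever invocation runs on that thread next.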

Library (companion PR)

Streaming output requires new API on HttpResponseMessage.Builder (bodyStream(InputStream) and bodyStream(IOConsumer<OutputStream>)). That sits in azure-functions-java-additions and ships as 1.4.0-SNAPSHOT. Companion PR: Azure/azure-functions-java-additions#55.

Testing

  • 146 unit + integration tests pass (mvn test)
  • New HttpProxyEndToEndTest covers:
    • Buffered request + buffered response round-trip
    • Buffered request + streaming InputStream response (asserts Transfer-Encoding: chunked)
    • Buffered request + streaming IOConsumer<OutputStream> response (SSE-style)
    • Streaming InputStream request + buffered response (asserts body bridge is skipped and ThreadLocal is cleared)
    • Full streaming request + streaming response echo
    • HttpInvocationCoordinator slot release after success
    • Broker exception → HTTP 500 with error message in body

Backward compatibility

  • Existing HttpRequestMessage<byte[]> / String functions are unchanged — the proxy detects buffered signatures via reflection and feeds the body through the existing gRPC pipeline.
  • If the host doesn't honor the HttpUri capability, the worker falls back to the legacy gRPC-only path (the proxy server just sits idle on loopback).

Related

Introduce a new com.microsoft.azure.functions.worker.http package that lays the foundation for receiving HTTP-triggered invocations directly from the Functions host via the HttpUri capability (parity with the Go, Python, and .NET isolated workers).

* HttpProxyServer wraps the JDK built-in com.sun.net.httpserver.HttpServer. Binds to 127.0.0.1 on an ephemeral port, dispatches requests on a cached executor mirroring the existing gRPC dispatch pool (unbounded growth, daemon threads, 15s shutdown drain). No new third-party dependencies.

* HttpInvocationCoordinator synchronizes the two halves of each invocation - the HTTP request and the gRPC InvocationRequest - via a ConcurrentMap of per-invocation slots keyed by invocationId. Each slot exposes CompletableFutures for HTTP arrival, gRPC arrival, and completion.

* ProxyConfig captures bind address/port only. No body-size or timeout caps; matches Go/Python/.NET behavior of relying on the platform for those guards.

Pure infrastructure commit: nothing is wired into the worker lifecycle yet. 15 new unit tests cover both rendezvous orderings, slot release/failure semantics, ephemeral port binding, request routing, and close-after-start behavior.
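As a rough sketch of the server setup described above (loopback bind, ephemeral port, cached daemon-thread executor), the following uses only the JDK `com.sun.net.httpserver` API. The class name `LoopbackProxySketch`, the thread name, and the 400/204 status codes are assumptions made for illustration; the real handler does considerably more.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not the PR's HttpProxyServer.
final class LoopbackProxySketch {

    /** Starts a server on 127.0.0.1 with an OS-assigned (ephemeral) port. */
    static HttpServer start() {
        try {
            // Port 0 asks the OS for any free port; backlog 0 uses the system default.
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            ExecutorService pool = Executors.newCachedThreadPool(task -> {
                Thread t = new Thread(task, "http-proxy-sketch");
                t.setDaemon(true); // daemon threads do not block JVM exit
                return t;
            });
            server.setExecutor(pool);
            server.createContext("/", exchange -> {
                // Header lookup is case-insensitive in the JDK Headers class.
                String id = exchange.getRequestHeaders().getFirst("x-ms-invocation-id");
                int status = (id == null || id.isEmpty()) ? 400 : 204; // assumed codes
                exchange.sendResponseHeaders(status, -1); // -1 means no response body
                exchange.close();
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds the URI a worker could advertise for this server. */
    static String advertisedUri(HttpServer server) {
        return "http://127.0.0.1:" + server.getAddress().getPort();
    }
}
```

Calling `server.stop(15)` on the returned instance would wait up to 15 seconds for in-flight exchanges, which is one way to express the drain period mentioned above.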

Adds HttpProxyHandler and HttpBodyBridge to connect the embedded HTTP
proxy server to the existing gRPC invocation dispatch path.

HttpProxyHandler parks each forwarded request on
HttpInvocationCoordinator (keyed by x-ms-invocation-id), then blocks on
the slot's completion future so the connection stays open until the
gRPC side has fully written the response. Missing header / duplicate
arrival / invocation failure paths return appropriate HTTP error
responses.

HttpBodyBridge handles the request body <-> protobuf body conversion
and writes RpcHttp responses back to HttpExchange. Body classification
mirrors the host's PopulateBody behavior (application/json -> JSON
TypedData, text/form-encoded/xml/js -> string, otherwise bytes) so
downstream RpcHttpRequestDataSource logic continues to work unchanged.
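The classification rule can be sketched as a small pure function. This is an approximation under stated assumptions: the exact media types treated as "string" (here `text/*`, form-encoded, `application/xml`, `application/javascript`) are guesses from the summary above, and `BodyKindSketch` is a hypothetical name, not part of HttpBodyBridge.

```java
import java.util.Locale;

// Hypothetical sketch of content-type based body classification.
final class BodyKindSketch {
    enum Kind { JSON, STRING, BYTES }

    static Kind classify(String contentType) {
        if (contentType == null) {
            return Kind.BYTES; // no content type: treat the body as opaque bytes
        }
        // Drop parameters such as "; charset=utf-8" before comparing.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        if (mediaType.equals("application/json")) {
            return Kind.JSON;
        }
        if (mediaType.startsWith("text/")
                || mediaType.equals("application/x-www-form-urlencoded")
                || mediaType.equals("application/xml")
                || mediaType.equals("application/javascript")) {
            return Kind.STRING; // assumed set of textual media types
        }
        return Kind.BYTES;
    }
}
```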

API refactor: HttpInvocationCoordinator.register*Arrival() now returns
the HttpInvocationSlot itself instead of the opposite side's future,
because the HTTP handler needs to await completion() (not grpcArrival).
HttpInvocationSlot is now public to expose this surface.
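A minimal sketch of the slot-returning rendezvous, assuming a map of per-invocation slots with one future per side plus a completion future. `RendezvousSketch`, `Slot`, and the field names are hypothetical, and payloads are typed as `Object` to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; not the PR's HttpInvocationCoordinator.
final class RendezvousSketch {
    static final class Slot {
        final CompletableFuture<Object> httpArrival = new CompletableFuture<>();
        final CompletableFuture<Object> grpcArrival = new CompletableFuture<>();
        final CompletableFuture<Void> completion = new CompletableFuture<>();
    }

    private final ConcurrentMap<String, Slot> slots = new ConcurrentHashMap<>();

    /** Either side may arrive first; whoever is first creates the slot. */
    Slot registerHttpArrival(String invocationId, Object exchange) {
        Slot slot = slots.computeIfAbsent(invocationId, id -> new Slot());
        if (!slot.httpArrival.complete(exchange)) {
            throw new IllegalStateException("duplicate HTTP arrival for " + invocationId);
        }
        return slot; // the caller can await slot.completion
    }

    Slot registerGrpcArrival(String invocationId, Object request) {
        Slot slot = slots.computeIfAbsent(invocationId, id -> new Slot());
        if (!slot.grpcArrival.complete(request)) {
            throw new IllegalStateException("duplicate gRPC arrival for " + invocationId);
        }
        return slot;
    }

    /** Marks the invocation finished and frees the slot. */
    void release(String invocationId) {
        Slot slot = slots.remove(invocationId);
        if (slot != null) {
            slot.completion.complete(null);
        }
    }

    /** Propagates a failure to whoever is waiting on completion. */
    void fail(String invocationId, Throwable cause) {
        Slot slot = slots.remove(invocationId);
        if (slot != null) {
            slot.completion.completeExceptionally(cause);
        }
    }

    int activeCount() {
        return slots.size();
    }
}
```

Returning the slot rather than the opposite side's future lets the HTTP handler block on `completion` while the gRPC side waits on `httpArrival`, both from the same object.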

Tests:
- HttpBodyBridgeTest: 15 tests covering content-type classification,
  charset handling, body enrichment, response writing, and status
  parsing.
- HttpProxyHandlerTest: 4 tests covering missing-header,
  successful-completion, invocation-failure, and duplicate-arrival
  paths.
- HttpInvocationCoordinatorTest: updated for the new slot-returning
  API; added completionFutureResolvesOnRelease.

No worker wiring yet -- the proxy server is not started and no
capability is advertised. This change just stages the parts that the
next commit will glue into JavaWorkerClient + the request handlers.

Wires HttpProxyServer and HttpInvocationCoordinator into the worker
lifecycle. When the FUNCTIONS_JAVA_DISABLE_HTTP_PROXY environment
variable is not set to true:

- JavaWorkerClient constructs a single HttpProxyServer and a single
  HttpInvocationCoordinator, shared across all handler instances.
- WorkerInitRequestHandler starts the proxy server during worker init,
  advertises HttpUri = "http://127.0.0.1:<port>" and
  RequiresRouteParameters = "true" so the host forwards HTTP traffic
  for HTTP-triggered functions directly to the worker.
- InvocationRequestHandler detects requests that contain an HTTP input
  binding, rendezvouses with the matching HttpExchange via the
  coordinator, folds the request body into the gRPC InvocationRequest,
  invokes the user function, writes the RpcHttp response back to the
  exchange, and releases the slot. On failure it fails the slot so the
  HTTP handler returns 500.
- Non-HTTP invocations (queue, timer, etc.) bypass the coordinator and
  execute on the existing gRPC-only path.
- JavaWorkerClient.close() stops the proxy server before tearing down
  the gRPC peer so in-flight HTTP handlers drain cleanly.

Fixes Azure#781: requests sent with Transfer-Encoding: chunked previously
arrived at the worker with an empty body because of a null-comparison
bug in the host's RpcHttp content-length handling. Moving body delivery
to HTTP (matching Go / Python / .NET isolated workers) sidesteps that
host bug entirely.

The escape hatch FUNCTIONS_JAVA_DISABLE_HTTP_PROXY=true restores the
old behavior if anything regresses.
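One plausible way to read the escape hatch, shown as a sketch: the proxy stays enabled unless the variable equals "true". Case-insensitive matching and whitespace trimming are assumptions here, and `ProxyToggleSketch` is a hypothetical name.

```java
// Hypothetical sketch of the opt-out check; the worker's parsing may differ.
final class ProxyToggleSketch {

    /** Returns true unless the raw environment value is "true" (assumed case-insensitive). */
    static boolean proxyEnabled(String envValue) {
        String normalized = (envValue == null) ? null : envValue.trim();
        // equalsIgnoreCase(null) is false, so an unset variable keeps the proxy on.
        return !"true".equalsIgnoreCase(normalized);
    }

    /** Convenience wrapper that reads the variable named in this commit. */
    static boolean proxyEnabledFromEnvironment() {
        return proxyEnabled(System.getenv("FUNCTIONS_JAVA_DISABLE_HTTP_PROXY"));
    }
}
```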

Test verification: full test suite (mvn -pl . test) passes 111/111
tests including 35 new tests across HttpProxyServer, coordinator,
handler, and body bridge.

…am()

When a function returns an HttpResponseMessage whose body is an
InputStream or HttpResponseMessage.IOConsumer<OutputStream>, the HTTP
proxy dispatch path now writes the body directly to the underlying
HttpExchange response stream using chunked transfer-encoding instead
of buffering the entire payload through a protobuf TypedData. This
enables Server-Sent Events, large file downloads, and other long-lived
streaming responses without first materializing the whole body in
memory.

Design highlights:
- RpcHttpDataTarget.toRpcHttpData detects streaming bodies and skips
  the RpcUnspecifiedDataTarget serialization step, leaving the RpcHttp
  body field unset so the status + headers envelope still flows through
  the existing pipeline unchanged.
- BindingDataStore exposes getHttpResponseRawBody() so the dispatch
  layer can recover the raw (unserialized) body without ripping apart
  the protobuf reply.
- JavaFunctionBroker.invokeMethodForHttpProxy is a new, separately named
  method (not a modification of invokeMethod) that returns an
  HttpInvocationOutcome containing both the protobuf reply and the raw
  body. The original invokeMethod is left untouched for backward
  compatibility with the existing gRPC dispatch path.
- HttpBodyBridge.writeStreamingResponse(InputStream) and
  writeStreamingResponse(IOConsumer) handle the actual streaming
  write; both use sendResponseHeaders(status, 0) to select chunked
  transfer-encoding (or close-delimited for HTTP/1.0 clients) and
  close/flush the streams reliably in try-with-resources.
- InvocationRequestHandler.executeProxiedHttp dispatches based on the
  raw body type, falling back to the existing buffered writer when the
  body is not a streaming type.
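The streaming write can be sketched with the JDK server alone. In `com.sun.net.httpserver`, passing a response length of 0 to `sendResponseHeaders` selects chunked transfer-encoding. `StreamingResponseSketch`, `writeStreaming`, and `demoBody` are hypothetical names; the sketch assumes Java 11+ for `InputStream.transferTo` and `java.net.http.HttpClient`.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not the PR's HttpBodyBridge.
final class StreamingResponseSketch {

    /** Copies the body to the exchange without knowing its length up front. */
    static void writeStreaming(HttpExchange exchange, int status, InputStream body)
            throws IOException {
        // Length 0 tells the JDK server to use chunked transfer-encoding.
        exchange.sendResponseHeaders(status, 0);
        try (InputStream in = body; OutputStream out = exchange.getResponseBody()) {
            in.transferTo(out); // closing the output stream ends the response
        }
    }

    /** Starts a throwaway server, fetches one streamed response, returns its body. */
    static String demoBody() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/", ex -> writeStreaming(ex, 200,
                    new ByteArrayInputStream("hello stream".getBytes(StandardCharsets.UTF_8))));
            server.start();
            try {
                URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/");
                HttpClient client = HttpClient.newBuilder()
                        .version(HttpClient.Version.HTTP_1_1)
                        .build();
                HttpResponse<String> response = client.send(
                        HttpRequest.newBuilder(uri).build(),
                        HttpResponse.BodyHandlers.ofString());
                return response.body();
            } finally {
                server.stop(0);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

An `IOConsumer<OutputStream>` variant would follow the same shape, handing `exchange.getResponseBody()` to the callback instead of copying from an `InputStream`.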

The pom.xml is bumped to consume azure-functions-java-core-library
1.4.0-SNAPSHOT, which adds the bodyStream() overloads and the
IOConsumer functional interface.

Tests added: 5 new HttpBodyBridge streaming tests, 8 new
RpcHttpDataTarget tests, 6 new BindingDataStore tests. Total worker
test suite: 130 passing (was 111).

When a function declares its trigger parameter as
HttpRequestMessage<InputStream> (or any InputStream subtype), the HTTP
proxy dispatch path now hands the function the live request body
stream from the HttpExchange instead of eagerly buffering the entire
payload into a protobuf TypedData. This enables streaming uploads,
multi-gigabyte request bodies, and proxy-style scenarios where the
function consumes the body incrementally without first materializing
it in memory.

Design highlights:
- RpcHttpRequestDataSource captures a per-thread HttpExchange via a
  ThreadLocal at construction time; its HTTP_DATA_OPERATIONS resolver
  detects InputStream-assignable type arguments and returns
  exchange.getRequestBody() directly when the exchange is present.
  Otherwise the existing buffered bodyDataSource path is preserved
  unchanged.
- JavaFunctionBroker.methodHasStreamingHttpBody(id) introspects the
  function signature so the dispatch layer can decide upfront whether
  to skip the eager body read.
- InvocationRequestHandler.executeProxiedHttp consults the pre-check;
  when true it skips HttpBodyBridge.enrichRequestWithBody (which would
  consume the stream), installs the exchange on the ThreadLocal across
  the broker invocation, and clears it in a finally block so the
  thread can be safely reused.
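The signature pre-check can be approximated with plain reflection over generic parameter types. In this sketch `Request<T>` is a hypothetical stand-in for `HttpRequestMessage<T>`, and the sample methods exist only so the check has something to inspect; the broker's real lookup is keyed by function id rather than method name.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical sketch of detecting an InputStream-assignable body type argument.
final class StreamingSignatureSketch {

    /** Stand-in for HttpRequestMessage<T>; not the library type. */
    interface Request<T> {
        T getBody();
    }

    static boolean hasStreamingBody(Method method) {
        for (Type param : method.getGenericParameterTypes()) {
            if (!(param instanceof ParameterizedType)) {
                continue; // raw Request or an unrelated non-generic parameter
            }
            ParameterizedType pt = (ParameterizedType) param;
            if (pt.getRawType() != Request.class) {
                continue;
            }
            Type arg = pt.getActualTypeArguments()[0];
            if (arg instanceof Class && InputStream.class.isAssignableFrom((Class<?>) arg)) {
                return true; // InputStream or any subtype
            }
        }
        return false;
    }

    /** Looks up a sample method by name; unknown names report false. */
    static boolean hasStreamingBody(String sampleName) {
        for (Method m : StreamingSignatureSketch.class.getDeclaredMethods()) {
            if (m.getName().equals(sampleName)) {
                return hasStreamingBody(m);
            }
        }
        return false;
    }

    // Sample signatures used only for the check above.
    static void streaming(Request<InputStream> request) { }
    static void subtype(Request<FileInputStream> request) { }
    static void buffered(Request<String> request) { }
    @SuppressWarnings("rawtypes")
    static void raw(Request request) { }
}
```

Deciding this before the body is read matters because an eager read would drain the socket stream that the function expects to consume itself.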

The existing buffered path is the default for all other signatures
(HttpRequestMessage<String>, HttpRequestMessage<byte[]>, raw
HttpRequestMessage, JSON-bound POJOs) and behaves identically to
before. Non-proxy gRPC dispatch is unaffected.

Tests added:
- RpcHttpRequestDataSourceTest: 3 streaming-input cases (live stream
  handed back when exchange captured; ClassCastException on
  fallthrough when no exchange; ThreadLocal cleared correctly).
- JavaFunctionBrokerStreamingTest: 6 signature-introspection cases
  (InputStream, InputStream subtype, String, raw, non-HTTP, unknown
  id).

Wires HttpProxyServer + HttpProxyHandler + HttpInvocationCoordinator +
InvocationRequestHandler together with a mocked JavaFunctionBroker and
exercises seven scenarios end-to-end:

  1. buffered request / buffered response  (sanity for the existing path)
  2. buffered request / streaming InputStream response
  3. buffered request / streaming IOConsumer (SSE) response
  4. streaming request / buffered response  (validates the live exchange
     is exposed via the per-thread side channel and the body is NOT
     folded into the protobuf envelope)
  5. full streaming (request + response)    (echoes uploaded bytes back)
  6. coordinator slot is released after a successful invocation
  7. broker exception surfaces as an HTTP 500 to the client

The tests fire real HTTP requests against an embedded HttpServer on an
ephemeral port and drive the simulated gRPC arrival on the test thread,
exercising the rendezvous, body propagation, and response writeback in
their integrated form.

Supporting library-side changes (no behavior impact on the production
path):

- JavaFunctionBroker.HttpInvocationOutcome: constructor is now public
  so tests outside the broker package can construct the DTO without
  reflection. It was already a plain DTO with public getters.

- RpcHttpRequestDataSource.currentExchange(): public accessor for the
  per-thread HttpExchange installed by setCurrentExchange(). Used by
  the integration tests to assert that the dispatch layer correctly
  installs and clears the side channel around the broker invocation.

- HttpInvocationCoordinator.activeInvocationCount(): widened from
  package-private to public so the integration tests (in the handler
  package) can assert slot release.

…source

The worker depends on `azure-functions-java-core-library:1.4.0-SNAPSHOT`
during development of cross-repo changes, but ADO CI agents only resolve
from Maven Central + OSS Sonatype snapshots. There was no way to validate
a worker PR against a fork/branch of azure-functions-java-additions
without first publishing the snapshot.

This adds three opt-in pipeline parameters to `public-build.yml`:

  - buildAdditionsFromSource (bool, default: false)
  - additionsRepoUrl         (string, default: official Azure repo)
  - additionsBranch          (string, default: dev)

When the toggle is true, an "Install azure-functions-java-additions
from source" step runs before the worker's mvn build in every job
(Build, TestWindows, TestLinux, TestDocker). The step delegates to
the existing `installAdditionsLocally.ps1`, which is parameterized to
accept a repo URL + branch and made idempotent for CI re-runs.

When the toggle is false (the default for normal PRs against dev) the
step is excluded at template-expansion time via `${{ if eq(...) }}`,
so there is zero impact on existing builds.

To use: queue the PR pipeline manually from the ADO UI, check
"Build azure-functions-java-additions from source", and optionally
override the repo URL/branch to point at a fork (e.g.
`https://github.com/ahmedmuhsin/azure-functions-java-additions.git`
on `feat/http-response-bodystream`).

GitHub-triggered ADO PR runs can't set pipeline parameters interactively,
so flip the defaults to point at the fork branch that contains the matching
unpublished library snapshot:

  buildAdditionsFromSource: true
  additionsRepoUrl:        https://github.com/ahmedmuhsin/azure-functions-java-additions.git
  additionsBranch:         feat/http-response-bodystream

REVERT THIS COMMIT before merge — once the companion PR
(Azure/azure-functions-java-additions#55) lands and a matching
`azure-functions-java-core-library` version is published to Maven Central,
the defaults should return to:

  buildAdditionsFromSource: false
  additionsRepoUrl:        https://github.com/Azure/azure-functions-java-additions.git
  additionsBranch:         dev

Development

Successfully merging this pull request may close these issues.

Http Requests with Transfer-Encoding: chunked report an empty body