Add embedded HTTP proxy server for streaming HTTP triggers (fixes #781)#877
Draft
ahmedmuhsin wants to merge 8 commits into
Draft
Add embedded HTTP proxy server for streaming HTTP triggers (fixes #781)#877ahmedmuhsin wants to merge 8 commits into
ahmedmuhsin wants to merge 8 commits into
Conversation
Introduce a new com.microsoft.azure.functions.worker.http package that lays the foundation for receiving HTTP-triggered invocations directly from the Functions host via the HttpUri capability (parity with the Go, Python, and .NET isolated workers). * HttpProxyServer wraps the JDK built-in com.sun.net.httpserver.HttpServer. Binds to 127.0.0.1 on an ephemeral port, dispatches requests on a cached executor mirroring the existing gRPC dispatch pool (unbounded growth, daemon threads, 15s shutdown drain). No new third-party dependencies. * HttpInvocationCoordinator synchronizes the two halves of each invocation - the HTTP request and the gRPC InvocationRequest - via a ConcurrentMap of per-invocation slots keyed by invocationId. Each slot exposes CompletableFutures for HTTP arrival, gRPC arrival, and completion. * ProxyConfig captures bind address/port only. No body-size or timeout caps; matches Go/Python/.NET behavior of relying on the platform for those guards. Pure infrastructure commit: nothing is wired into the worker lifecycle yet. 15 new unit tests cover both rendezvous orderings, slot release/failure semantics, ephemeral port binding, request routing, and close-after-start behavior.
Adds HttpProxyHandler and HttpBodyBridge to connect the embedded HTTP proxy server to the existing gRPC invocation dispatch path. HttpProxyHandler parks each forwarded request on HttpInvocationCoordinator (keyed by x-ms-invocation-id), then blocks on the slot's completion future so the connection stays open until the gRPC side has fully written the response. Missing header / duplicate arrival / invocation failure paths return appropriate HTTP error responses. HttpBodyBridge handles the request body <-> protobuf body conversion and writes RpcHttp responses back to HttpExchange. Body classification mirrors the host's PopulateBody behavior (application/json -> JSON TypedData, text/form-encoded/xml/js -> string, otherwise bytes) so downstream RpcHttpRequestDataSource logic continues to work unchanged. API refactor: HttpInvocationCoordinator.register*Arrival() now returns the HttpInvocationSlot itself instead of the opposite side's future, because the HTTP handler needs to await completion() (not grpcArrival). HttpInvocationSlot is now public to expose this surface. Tests: - HttpBodyBridgeTest: 15 tests covering content-type classification, charset handling, body enrichment, response writing, and status parsing. - HttpProxyHandlerTest: 4 tests covering missing-header, successful-completion, invocation-failure, and duplicate-arrival paths. - HttpInvocationCoordinatorTest: updated for the new slot-returning API; added completionFutureResolvesOnRelease. No worker wiring yet -- the proxy server is not started and no capability is advertised. This change just stages the parts that the next commit will glue into JavaWorkerClient + the request handlers.
Wires HttpProxyServer and HttpInvocationCoordinator into the worker lifecycle. When the FUNCTIONS_JAVA_DISABLE_HTTP_PROXY environment variable is not set to true: - JavaWorkerClient constructs a single HttpProxyServer and a single HttpInvocationCoordinator, shared across all handler instances. - WorkerInitRequestHandler starts the proxy server during worker init, advertises HttpUri = "http://127.0.0.1:<port>" and RequiresRouteParameters = "true" so the host forwards HTTP traffic for HTTP-triggered functions directly to the worker. - InvocationRequestHandler detects requests that contain an HTTP input binding, rendezvous-es with the matching HttpExchange via the coordinator, folds the request body into the gRPC InvocationRequest, invokes the user function, writes the RpcHttp response back to the exchange, and releases the slot. Failures fail the slot so the HTTP handler returns 500. - Non-HTTP invocations (queue, timer, etc.) bypass the coordinator and execute on the existing gRPC-only path. - JavaWorkerClient.close() stops the proxy server before tearing down the gRPC peer so in-flight HTTP handlers drain cleanly. Fixes Azure#781: requests sent with Transfer-Encoding: chunked previously arrived at the worker with an empty body because of a null-comparison bug in the host's RpcHttp content-length handling. Moving body delivery to HTTP (matching Go / Python / .NET isolated workers) sidesteps that host bug entirely. The escape hatch FUNCTIONS_JAVA_DISABLE_HTTP_PROXY=true restores the old behavior if anything regresses. Test verification: full test suite (mvn -pl . test) passes 111/111 tests including 35 new tests across HttpProxyServer, coordinator, handler, and body bridge.
…am() When a function returns an HttpResponseMessage whose body is an InputStream or HttpResponseMessage.IOConsumer<OutputStream>, the HTTP proxy dispatch path now writes the body directly to the underlying HttpExchange response stream using chunked transfer-encoding instead of buffering the entire payload through a protobuf TypedData. This enables Server-Sent Events, large file downloads, and other long-lived streaming responses without first materializing the whole body in memory. Design highlights: - RpcHttpDataTarget.toRpcHttpData detects streaming bodies and skips the RpcUnspecifiedDataTarget serialization step, leaving the RpcHttp body field unset so the status + headers envelope still flows through the existing pipeline unchanged. - BindingDataStore exposes getHttpResponseRawBody() so the dispatch layer can recover the raw (unserialized) body without ripping apart the protobuf reply. - JavaFunctionBroker.invokeMethodForHttpProxy is a new method overload (not a modification of invokeMethod) that returns an HttpInvocationOutcome containing both the protobuf reply and the raw body. The original invokeMethod is left untouched for backward compatibility with the existing gRPC dispatch path. - HttpBodyBridge.writeStreamingResponse(InputStream) and writeStreamingResponse(IOConsumer) handle the actual streaming write; both use sendResponseHeaders(status, 0) to select chunked transfer-encoding (or close-delimited for HTTP/1.0 clients) and close/flush the streams reliably in try-with-resources. - InvocationRequestHandler.executeProxiedHttp dispatches based on the raw body type, falling back to the existing buffered writer when the body is not a streaming type. The pom.xml is bumped to consume azure-functions-java-core-library 1.4.0-SNAPSHOT, which adds the bodyStream() overloads and the IOConsumer functional interface. Tests added: 5 new HttpBodyBridge streaming tests, 8 new RpcHttpDataTarget tests, 6 new BindingDataStore tests. Total worker test suite: 130 passing (was 111).
When a function declares its trigger parameter as HttpRequestMessage<InputStream> (or any InputStream subtype), the HTTP proxy dispatch path now hands the function the live request body stream from the HttpExchange instead of eagerly buffering the entire payload into a protobuf TypedData. This enables streaming uploads, multi-gigabyte request bodies, and proxy-style scenarios where the function consumes the body incrementally without first materializing it in memory. Design highlights: - RpcHttpRequestDataSource captures a per-thread HttpExchange via a ThreadLocal at construction time; its HTTP_DATA_OPERATIONS resolver detects InputStream-assignable type arguments and returns exchange.getRequestBody() directly when the exchange is present. Otherwise the existing buffered bodyDataSource path is preserved unchanged. - JavaFunctionBroker.methodHasStreamingHttpBody(id) introspects the function signature so the dispatch layer can decide upfront whether to skip the eager body read. - InvocationRequestHandler.executeProxiedHttp consults the pre-check; when true it skips HttpBodyBridge.enrichRequestWithBody (which would consume the stream), installs the exchange on the ThreadLocal across the broker invocation, and clears it in a finally block so the thread can be safely reused. The existing buffered path is the default for all other signatures (HttpRequestMessage<String>, HttpRequestMessage<byte[]>, raw HttpRequestMessage, JSON-bound POJOs) and behaves identically to before. Non-proxy gRPC dispatch is unaffected. Tests added: - RpcHttpRequestDataSourceTest: 3 streaming-input cases (live stream handed back when exchange captured; ClassCastException on fallthrough when no exchange; ThreadLocal cleared correctly). - JavaFunctionBrokerStreamingTest: 6 signature-introspection cases (InputStream, InputStream subtype, String, raw, non-HTTP, unknown id).
Wires HttpProxyServer + HttpProxyHandler + HttpInvocationCoordinator +
InvocationRequestHandler together with a mocked JavaFunctionBroker and
exercises seven scenarios end-to-end:
1. buffered request / buffered response (sanity for the existing path)
2. buffered request / streaming InputStream response
3. buffered request / streaming IOConsumer (SSE) response
4. streaming request / buffered response (validates the live exchange
is exposed via the per-thread side channel and the body is NOT
folded into the protobuf envelope)
5. full streaming (request + response) (echoes uploaded bytes back)
6. coordinator slot is released after a successful invocation
7. broker exception surfaces as an HTTP 500 to the client
The tests fire real HTTP requests against an embedded HttpServer on an
ephemeral port and drive the simulated gRPC arrival on the test thread,
exercising the rendezvous, body propagation, and response writeback in
their integrated form.
Supporting library-side changes (no behavior impact on the production
path):
- JavaFunctionBroker.HttpInvocationOutcome: constructor is now public
so tests outside the broker package can construct the DTO without
reflection. It was already a plain DTO with public getters.
- RpcHttpRequestDataSource.currentExchange(): public accessor for the
per-thread HttpExchange installed by setCurrentExchange(). Used by
the integration tests to assert that the dispatch layer correctly
installs and clears the side channel around the broker invocation.
- HttpInvocationCoordinator.activeInvocationCount(): widened from
package-private to public so the integration tests (in the handler
package) can assert slot release.
…source
The worker depends on `azure-functions-java-core-library:1.4.0-SNAPSHOT`
during development of cross-repo changes, but ADO CI agents only resolve
from Maven Central + OSS Sonatype snapshots. There was no way to validate
a worker PR against a fork/branch of azure-functions-java-additions
without first publishing the snapshot.
This adds three opt-in pipeline parameters to `public-build.yml`:
- buildAdditionsFromSource (bool, default: false)
- additionsRepoUrl (string, default: official Azure repo)
- additionsBranch (string, default: dev)
When the toggle is true, an "Install azure-functions-java-additions
from source" step runs before the worker's mvn build in every job
(Build, TestWindows, TestLinux, TestDocker). The step delegates to
the existing `installAdditionsLocally.ps1`, which is parameterized to
accept a repo URL + branch and made idempotent for CI re-runs.
When the toggle is false (the default for normal PRs against dev) the
step is excluded at template-expansion time via `${{ if eq(...) }}`,
so there is zero impact on existing builds.
To use: queue the PR pipeline manually from the ADO UI, check
"Build azure-functions-java-additions from source", and optionally
override the repo URL/branch to point at a fork (e.g.
`https://github.com/ahmedmuhsin/azure-functions-java-additions.git`
on `feat/http-response-bodystream`).
GitHub-triggered ADO PR runs can't set pipeline parameters interactively, so flip the defaults to point at the fork branch that contains the matching unpublished library snapshot: buildAdditionsFromSource: true additionsRepoUrl: https://github.com/ahmedmuhsin/azure-functions-java-additions.git additionsBranch: feat/http-response-bodystream REVERT THIS COMMIT before merge — once the companion PR (Azure/azure-functions-java-additions#55) lands and a matching `azure-functions-java-core-library` version is published to Maven Central, the defaults should return to: buildAdditionsFromSource: false additionsRepoUrl: https://github.com/Azure/azure-functions-java-additions.git additionsBranch: dev
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds an embedded HTTP proxy server to the Java worker so HTTP-triggered functions can stream both request and response bodies, instead of being forced through the buffered gRPC
InvocationRequest/InvocationResponsepath.Fixes #781.
Background
Today every HTTP-triggered Java function buffers its entire request body into a single gRPC message before the worker sees it, and buffers its entire response body before the host sends it back. This makes large uploads/downloads and SSE / chunked streaming impossible from Java.
The .NET isolated worker solved this by having the host hand the worker a forwarded HTTP request via a side channel. This PR brings the same capability to the Java worker.
Design
The worker now advertises an
HttpUricapability at startup pointing at an embeddedHttpProxyServer(JDKcom.sun.net.httpserver.HttpServer, ephemeral port, loopback only). When the host detects this capability, it forwards HTTP-trigger requests to that URI in addition to the regular gRPCInvocationRequest. The proxy correlates the HTTP request with the matching gRPC invocation via thex-ms-invocation-idheader and anHttpInvocationCoordinatorslot.Once correlated:
HttpRequestMessage<byte[]>/String): unchanged behavior — body is read from the proxy exchange and placed into the gRPCTypedData, response is written back through the exchange.HttpRequestMessage<InputStream>): the body bridge is skipped; the liveHttpExchangeis stashed in aThreadLocalonRpcHttpRequestDataSourceso the binding can hand the user anInputStreamreading directly from the socket.HttpResponseMessage.Builder.bodyStream(InputStream)/bodyStream(IOConsumer<OutputStream>)): the broker returns anHttpInvocationOutcomethat the proxy handler unwraps and pipes directly to the exchangeOutputStreamwith chunkedTransfer-Encoding.What changed
Worker (this PR)
http/HttpProxyServer+HttpProxyHandler+HttpInvocationCoordinator— embedded proxy infrastructureJavaWorkerClientadvertises theHttpUricapability on startupInvocationRequestHandler.executeProxiedHttp— detects streaming-input signatures viaJavaFunctionBroker.methodHasStreamingHttpBody, installs/clears theThreadLocal<HttpExchange>around the broker call, and skips the body bridgebinding/RpcHttpRequestDataSource— addssetCurrentExchange/currentExchange+ a streaming branch inHTTP_DATA_OPERATIONSforHttpRequestMessage<InputStream>broker/JavaFunctionBroker.HttpInvocationOutcome— DTO carrying both the gRPC return value and the raw streaming-body callbackHttpProxyEndToEndTest(7 tests) exercising buffered/streaming combinations against a realJavaWorkerClient+ proxy serverLibrary (companion PR)
Streaming output requires new API on
HttpResponseMessage.Builder(bodyStream(InputStream)andbodyStream(IOConsumer<OutputStream>)). That sits in azure-functions-java-additions and ships as1.4.0-SNAPSHOT. Companion PR: Azure/azure-functions-java-additions#55.Testing
mvn test)HttpProxyEndToEndTestcovers:InputStreamresponse (assertsTransfer-Encoding: chunked)IOConsumer<OutputStream>response (SSE-style)InputStreamrequest + buffered response (asserts body bridge is skipped and ThreadLocal is cleared)HttpInvocationCoordinatorslot release after successBackward compatibility
HttpRequestMessage<byte[]>/Stringfunctions are unchanged — the proxy detects buffered signatures via reflection and feeds the body through the existing gRPC pipeline.HttpUricapability, the worker falls back to the legacy gRPC-only path (the proxy server just sits idle on loopback).Related