completion() {
+ return completion;
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java
new file mode 100644
index 0000000..2b74841
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java
@@ -0,0 +1,101 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.microsoft.azure.functions.worker.WorkerLogManager;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+/**
+ * Handler attached to the worker's embedded HTTP proxy server.
+ *
+ * Receives HTTP requests forwarded by the Functions host (via the HttpUri
+ * capability) and parks them on the {@link HttpInvocationCoordinator} until
+ * the gRPC dispatcher picks them up. The actual invocation runs on the gRPC
+ * dispatch thread, which reads the request body and writes the response back
+ * to the same {@link HttpExchange}. This handler simply:
+ *
+ * - Extracts {@code x-ms-invocation-id} from the request headers.
+ * - Registers the HTTP arrival with the coordinator.
+ * - Blocks on the slot's {@code completion} future so the exchange stays
+ * open until the gRPC side finishes writing the response.
+ * - Returns from {@code handle()}, letting the JDK HttpServer close the
+ * exchange.
+ *
+ *
+ * Missing header or unexpected failures are converted into appropriate HTTP
+ * error responses so the host always gets a closed connection.
+ */
+public final class HttpProxyHandler implements HttpHandler {
+ /** Header set by {@code DefaultHttpProxyService} on the host side. */
+ public static final String INVOCATION_ID_HEADER = "x-ms-invocation-id";
+
+ private static final Logger LOGGER = WorkerLogManager.getSystemLogger();
+
+ private final HttpInvocationCoordinator coordinator;
+
+ public HttpProxyHandler(HttpInvocationCoordinator coordinator) {
+ this.coordinator = Objects.requireNonNull(coordinator, "coordinator");
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ String invocationId = exchange.getRequestHeaders().getFirst(INVOCATION_ID_HEADER);
+ if (invocationId == null || invocationId.isEmpty()) {
+ LOGGER.warning("HTTP proxy request missing " + INVOCATION_ID_HEADER + " header");
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, 400,
+ "Missing required header: " + INVOCATION_ID_HEADER);
+ } finally {
+ exchange.close();
+ }
+ return;
+ }
+
+ HttpInvocationSlot slot;
+ try {
+ slot = coordinator.registerHttpArrival(invocationId, exchange);
+ } catch (IllegalStateException ex) {
+ LOGGER.log(Level.WARNING, "Duplicate HTTP arrival for invocation " + invocationId, ex);
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, 409,
+ "Duplicate HTTP arrival for invocation " + invocationId);
+ } finally {
+ exchange.close();
+ }
+ return;
+ }
+
+ try {
+ // Block until the gRPC dispatcher signals invocation completion.
+ // The dispatcher is responsible for writing the response to this
+ // exchange; we simply hold the connection open in the meantime.
+ slot.completion().get();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ tryWriteError(exchange, 503, "Worker interrupted while waiting for invocation");
+ } catch (CancellationException ex) {
+ // Coordinator cancelled the futures via releaseInvocation();
+ // the gRPC side has already written (or chosen not to write) the response.
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+ LOGGER.log(Level.WARNING, "Invocation " + invocationId + " failed before responding", cause);
+ tryWriteError(exchange, 500, "Invocation failed: " + cause.getMessage());
+ } finally {
+ exchange.close();
+ }
+ }
+
+ private static void tryWriteError(HttpExchange exchange, int status, String message) {
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, status, message);
+ } catch (IOException ioe) {
+ LOGGER.log(Level.FINE, "Unable to write error response (response likely already started)", ioe);
+ }
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java
new file mode 100644
index 0000000..c1971bb
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java
@@ -0,0 +1,126 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+
+import com.microsoft.azure.functions.worker.WorkerLogManager;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Embedded HTTP proxy server used to receive HTTP-triggered invocations
+ * directly from the Functions host (HttpUri capability).
+ *
+ * Backed by {@link com.sun.net.httpserver.HttpServer}, a JDK built-in
+ * since Java 6, so the worker takes on no new runtime dependencies.
+ *
+ * The server binds to the loopback address on an ephemeral port and is
+ * started by {@link #start(HttpHandler)} with a single root handler.
+ * Worker threads come from a cached executor that mirrors the gRPC dispatch
+ * pool: unbounded growth, named for diagnostics, 15 s drain on shutdown.
+ * Capping concurrency is left to the platform, matching the Go, Python, and
+ * .NET isolated workers.
+ */
+public final class HttpProxyServer implements AutoCloseable {
+ private static final long EXECUTOR_SHUTDOWN_SECONDS = 15L;
+ private static final long SERVER_STOP_SECONDS = 5L;
+
+ private final ProxyConfig config;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private HttpServer server;
+ private ExecutorService executor;
+ private String boundUri;
+
+ public HttpProxyServer(ProxyConfig config) {
+ this.config = Objects.requireNonNull(config, "config");
+ }
+
+ /**
+ * Binds the server, attaches {@code rootHandler} to {@code "/"}, and starts
+ * serving requests. Returns the absolute {@code http://host:port} URI that
+ * should be advertised to the Functions host via the {@code HttpUri}
+ * capability.
+ *
+ * @throws IllegalStateException if start has already been called
+ * @throws IOException if the server cannot bind
+ */
+ public synchronized String start(HttpHandler rootHandler) throws IOException {
+ Objects.requireNonNull(rootHandler, "rootHandler");
+ if (!started.compareAndSet(false, true)) {
+ throw new IllegalStateException("HttpProxyServer already started");
+ }
+ InetSocketAddress bindAddress = new InetSocketAddress(
+ config.getBindAddress(), config.getBindPort());
+ // Backlog 0 → JDK default.
+ this.server = HttpServer.create(bindAddress, 0);
+ this.executor = Executors.newCachedThreadPool(new ProxyThreadFactory());
+ this.server.setExecutor(this.executor);
+ this.server.createContext("/", rootHandler);
+ this.server.start();
+ InetSocketAddress actual = this.server.getAddress();
+ this.boundUri = "http://" + actual.getHostString() + ":" + actual.getPort();
+ WorkerLogManager.getSystemLogger().log(Level.INFO,
+ "HTTP proxy server bound to " + boundUri);
+ return boundUri;
+ }
+
+ /**
+ * Returns the URI the server is listening on, or {@code null} if the
+ * server has not been started.
+ */
+ public String getBoundUri() {
+ return boundUri;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!started.compareAndSet(true, false)) {
+ return;
+ }
+ if (server != null) {
+ try {
+ // Allow in-flight requests up to SERVER_STOP_SECONDS to drain.
+ server.stop((int) SERVER_STOP_SECONDS);
+ } catch (RuntimeException ex) {
+ WorkerLogManager.getSystemLogger().log(Level.WARNING,
+ "Error stopping HTTP proxy server", ex);
+ }
+ server = null;
+ }
+ if (executor != null) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_SECONDS, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException ex) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ executor = null;
+ }
+ boundUri = null;
+ }
+
+ /**
+ * Thread factory that names worker threads for diagnostics. Daemon threads
+ * so they do not block JVM shutdown if the server is not explicitly closed.
+ */
+ private static final class ProxyThreadFactory implements java.util.concurrent.ThreadFactory {
+ private final java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "functions-http-proxy-" + counter.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java
new file mode 100644
index 0000000..78f31d6
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java
@@ -0,0 +1,43 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.util.Objects;
+
+/**
+ * Configuration for the embedded HTTP proxy server used to receive HTTP-triggered
+ * invocations directly from the Functions host (HttpUri capability).
+ *
+ * The configuration deliberately does not impose request body size limits or
+ * per-request timeouts. The Functions front-end (nginx) enforces an upstream
+ * ceiling, and per-worker overload is managed by the platform — matching the
+ * behavior of the Go, Python, and .NET isolated workers.
+ */
+public final class ProxyConfig {
+ /** Loopback bind address. Other workers also bind to 127.0.0.1 only. */
+ public static final String DEFAULT_BIND_ADDRESS = "127.0.0.1";
+
+ /** Ephemeral port. The OS picks an unused port at bind time. */
+ public static final int DEFAULT_BIND_PORT = 0;
+
+ private final String bindAddress;
+ private final int bindPort;
+
+ public ProxyConfig(String bindAddress, int bindPort) {
+ this.bindAddress = Objects.requireNonNull(bindAddress, "bindAddress");
+ if (bindPort < 0 || bindPort > 65535) {
+ throw new IllegalArgumentException("bindPort out of range: " + bindPort);
+ }
+ this.bindPort = bindPort;
+ }
+
+ public static ProxyConfig defaults() {
+ return new ProxyConfig(DEFAULT_BIND_ADDRESS, DEFAULT_BIND_PORT);
+ }
+
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public int getBindPort() {
+ return bindPort;
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java
new file mode 100644
index 0000000..305f9da
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java
@@ -0,0 +1,96 @@
+package com.microsoft.azure.functions.worker.binding;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.UUID;
+
+import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.HttpResponseMessage.Builder;
+import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer;
+
+import org.junit.jupiter.api.Test;
+
+public class BindingDataStoreTest {
+
+ @Test
+ public void getHttpResponseRawBodyReturnsNullWhenNoTargetsPromoted() {
+ BindingDataStore store = new BindingDataStore();
+ assertNull(store.getHttpResponseRawBody());
+ }
+
+ @Test
+ public void getHttpResponseRawBodyReturnsNullWhenPromotedUuidHasNoTargets() {
+ BindingDataStore store = new BindingDataStore();
+ // Promote a UUID that was never populated.
+ store.promoteDataTargets(UUID.randomUUID());
+ assertNull(store.getHttpResponseRawBody());
+ }
+
+ @Test
+ public void getHttpResponseRawBodyReturnsNullWhenHttpTargetHasNoBody() {
+ BindingDataStore store = registerHttpReturnTarget(builder -> { /* no body */ });
+ assertNull(store.getHttpResponseRawBody());
+ }
+
+ @Test
+ public void getHttpResponseRawBodyReturnsInputStreamBody() {
+ InputStream stream = new ByteArrayInputStream(new byte[]{1, 2, 3});
+ BindingDataStore store = registerHttpReturnTarget(builder -> builder.body(stream));
+ Object raw = store.getHttpResponseRawBody();
+ assertSame(stream, raw);
+ assertTrue(raw instanceof InputStream);
+ }
+
+ @Test
+ public void getHttpResponseRawBodyReturnsIOConsumerBody() {
+ IOConsumer writer = out -> out.write(0);
+ BindingDataStore store = registerHttpReturnTarget(builder -> builder.body(writer));
+ Object raw = store.getHttpResponseRawBody();
+ assertSame(writer, raw);
+ assertTrue(raw instanceof IOConsumer);
+ }
+
+ @Test
+ public void getHttpResponseRawBodyReturnsStringBody() {
+ BindingDataStore store = registerHttpReturnTarget(builder -> builder.body("hello"));
+ assertEquals("hello", store.getHttpResponseRawBody());
+ }
+
+ /**
+ * Helper that sets up a store with a single promoted HTTP output target on
+ * the {@code $return} binding, then invokes {@code configure} on the
+ * underlying {@link HttpResponseMessage.Builder}.
+ */
+ @FunctionalInterface
+ private interface BuilderConfigurator {
+ void apply(Builder builder);
+ }
+
+ private static BindingDataStore registerHttpReturnTarget(BuilderConfigurator configure) {
+ BindingDataStore store = new BindingDataStore();
+ // getOrAddDataTarget internally consults `definitions` via isDefinitionOutput
+ // before short-circuiting on ignoreDefinition; install an empty map so that
+ // lookup returns Optional.empty() instead of throwing NPE.
+ store.setBindingDefinitions(new HashMap<>());
+ UUID outputId = UUID.randomUUID();
+ // ignoreDefinition=true bypasses the binding-definition check, which is
+ // adequate for unit-testing the data-store accessor in isolation.
+ BindingData data = store.getOrAddDataTarget(
+ outputId, BindingDataStore.RETURN_NAME, HttpResponseMessage.class, true).orElseThrow(
+ () -> new AssertionError("Expected getOrAddDataTarget to create an HTTP target"));
+ // RpcHttpDataTarget sets its own DataTarget value to `this` in its
+ // constructor, so the BindingData value is the Builder itself.
+ Object value = data.getValue();
+ assertTrue(value instanceof Builder,
+ "Expected RpcHttpDataTarget value to be an HttpResponseMessage.Builder, got " + value);
+ configure.apply((Builder) value);
+ store.promoteDataTargets(outputId);
+ return store;
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java
new file mode 100644
index 0000000..b58b95f
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java
@@ -0,0 +1,91 @@
+package com.microsoft.azure.functions.worker.binding;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer;
+import com.microsoft.azure.functions.HttpStatus;
+import com.microsoft.azure.functions.rpc.messages.RpcHttp;
+import com.microsoft.azure.functions.rpc.messages.TypedData;
+
+import org.junit.jupiter.api.Test;
+
+public class RpcHttpDataTargetTest {
+
+ @Test
+ public void isStreamingBodyDetectsInputStream() {
+ InputStream stream = new ByteArrayInputStream(new byte[]{1, 2, 3});
+ assertTrue(RpcHttpDataTarget.isStreamingBody(stream));
+ }
+
+ @Test
+ public void isStreamingBodyDetectsIOConsumer() {
+ IOConsumer writer = out -> out.write(1);
+ assertTrue(RpcHttpDataTarget.isStreamingBody(writer));
+ }
+
+ @Test
+ public void isStreamingBodyRejectsNull() {
+ assertFalse(RpcHttpDataTarget.isStreamingBody(null));
+ }
+
+ @Test
+ public void isStreamingBodyRejectsString() {
+ assertFalse(RpcHttpDataTarget.isStreamingBody("hello"));
+ }
+
+ @Test
+ public void isStreamingBodyRejectsByteArray() {
+ assertFalse(RpcHttpDataTarget.isStreamingBody(new byte[]{1, 2, 3}));
+ }
+
+ @Test
+ public void toRpcHttpDataSkipsBodyForInputStream() throws Exception {
+ RpcHttpDataTarget target = new RpcHttpDataTarget();
+ target.status(HttpStatus.OK)
+ .header("Content-Type", "text/event-stream")
+ .body(new ByteArrayInputStream("ignored".getBytes(StandardCharsets.UTF_8)));
+
+ TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target);
+ RpcHttp http = builder.getHttp();
+
+ assertEquals("200", http.getStatusCode());
+ assertEquals("text/event-stream", http.getHeadersOrDefault("Content-Type", null));
+ // Streaming bodies are NOT serialized into the protobuf envelope; the field is unset.
+ assertFalse(http.hasBody(), "InputStream body should not be serialized into RpcHttp.body");
+ }
+
+ @Test
+ public void toRpcHttpDataSkipsBodyForIOConsumer() throws Exception {
+ RpcHttpDataTarget target = new RpcHttpDataTarget();
+ target.status(HttpStatus.ACCEPTED)
+ .header("X-Trace", "abc")
+ .body((IOConsumer) (out -> out.write(0)));
+
+ TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target);
+ RpcHttp http = builder.getHttp();
+
+ assertEquals("202", http.getStatusCode());
+ assertEquals("abc", http.getHeadersOrDefault("X-Trace", null));
+ assertFalse(http.hasBody(), "IOConsumer body should not be serialized into RpcHttp.body");
+ }
+
+ @Test
+ public void toRpcHttpDataSerializesStringBody() throws Exception {
+ RpcHttpDataTarget target = new RpcHttpDataTarget();
+ target.status(HttpStatus.OK)
+ .body("hello");
+
+ TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target);
+ RpcHttp http = builder.getHttp();
+
+ assertEquals("200", http.getStatusCode());
+ assertTrue(http.hasBody(), "Non-streaming body should be serialized");
+ assertEquals("hello", http.getBody().getString());
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java
index 813f4fa..1c655ce 100644
--- a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java
+++ b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java
@@ -1,6 +1,8 @@
package com.microsoft.azure.functions.worker.binding.tests;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.lang.invoke.WrongMethodTypeException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
@@ -10,9 +12,14 @@
import com.microsoft.azure.functions.rpc.messages.RpcHttp;
import com.microsoft.azure.functions.rpc.messages.TypedData;
import com.microsoft.azure.functions.worker.binding.*;
+import com.sun.net.httpserver.HttpExchange;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class RpcHttpRequestDataSourceTest {
@@ -26,6 +33,9 @@ public void HttpRequestIntBody(HttpRequestMessage request) {
public void HttpRequestBinaryBody(HttpRequestMessage request) {
}
+ public void HttpRequestStreamBody(HttpRequestMessage request) {
+ }
+
public static RpcHttp getTestRpcHttp(Object inputBody) throws Exception {
TypedData.Builder dataBuilder = TypedData.newBuilder();
RpcHttp.Builder httpBuilder = RpcHttp.newBuilder()
@@ -108,4 +118,76 @@ private Method getFunctionMethod(String methodName) {
return functionMethod;
}
+ @Test
+ public void rpcHttpDataSource_To_HttpRequestMessage_StreamBody_returnsLiveExchangeStream() throws Exception {
+ Method method = getFunctionMethod("HttpRequestStreamBody");
+ Parameter[] parameters = method.getParameters();
+ String sourceKey = "testRpcHttp";
+
+ byte[] payload = "live-stream-body".getBytes();
+ InputStream liveStream = new ByteArrayInputStream(payload);
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getRequestBody()).thenReturn(liveStream);
+
+ RpcHttp input = getTestRpcHttp(new byte[0]);
+ RpcHttpRequestDataSource.setCurrentExchange(exchange);
+ try {
+ RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource(sourceKey, input);
+ Optional bindingData = rpcHttp.computeByName(sourceKey,
+ parameters[0].getParameterizedType());
+ BindingData arg = bindingData.orElseThrow(WrongMethodTypeException::new);
+ HttpRequestMessage> requestMsg = (HttpRequestMessage>) arg.getValue();
+ assertNotNull(requestMsg.getBody());
+ assertSame(liveStream, requestMsg.getBody(),
+ "Streaming-input path must hand back the exchange's live request body unmodified");
+ } finally {
+ RpcHttpRequestDataSource.setCurrentExchange(null);
+ }
+ }
+
+ @Test
+ public void rpcHttpDataSource_To_HttpRequestMessage_StreamBody_withNoCapturedExchange_fallsThrough()
+ throws Exception {
+ Method method = getFunctionMethod("HttpRequestStreamBody");
+ Parameter[] parameters = method.getParameters();
+ String sourceKey = "testRpcHttp";
+
+ // No exchange installed on the ThreadLocal: the streaming-input branch
+ // must not trigger and we should fall through to the existing
+ // bodyDataSource path. With a byte[] body and InputStream target type, no
+ // converter is registered for InputStream, so the existing pipeline raises
+ // a ClassCastException -- the same behavior as before this feature was
+ // added. The point of this test is to confirm we have NOT silently
+ // swallowed the failure.
+ RpcHttp input = getTestRpcHttp("ignored-body".getBytes());
+ RpcHttpRequestDataSource.setCurrentExchange(null);
+ RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource(sourceKey, input);
+ org.junit.jupiter.api.Assertions.assertThrows(
+ ClassCastException.class,
+ () -> rpcHttp.computeByName(sourceKey, parameters[0].getParameterizedType()),
+ "Without a captured exchange, InputStream resolution must fall through and fail as it did before the feature");
+ }
+
+ @Test
+ public void setCurrentExchange_null_clearsThreadLocal() throws Exception {
+ HttpExchange exchange = mock(HttpExchange.class);
+ RpcHttpRequestDataSource.setCurrentExchange(exchange);
+ // Cleared before construction: the data source must NOT capture the prior
+ // exchange.
+ RpcHttpRequestDataSource.setCurrentExchange(null);
+
+ Method method = getFunctionMethod("HttpRequestStreamBody");
+ Parameter[] parameters = method.getParameters();
+ RpcHttp input = getTestRpcHttp(new byte[0]);
+ RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource("k", input);
+ // Same expectation as the no-exchange-ever case: streaming branch doesn't
+ // fire, fallthrough fails. If we had leaked the previously-installed
+ // exchange, mockito would have returned a real (empty default) stream and
+ // the call would succeed instead of throwing.
+ org.junit.jupiter.api.Assertions.assertThrows(
+ ClassCastException.class,
+ () -> rpcHttp.computeByName("k", parameters[0].getParameterizedType()),
+ "After clearing the ThreadLocal, the prior exchange must not leak into newly-constructed data sources");
+ }
+
}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java b/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java
new file mode 100644
index 0000000..090e2d8
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java
@@ -0,0 +1,119 @@
+package com.microsoft.azure.functions.worker.broker;
+
+import com.microsoft.azure.functions.HttpRequestMessage;
+import com.microsoft.azure.functions.worker.reflect.DefaultClassLoaderProvider;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JavaFunctionBroker#methodHasStreamingHttpBody(String)}.
+ * The HTTP proxy dispatch path uses this signature pre-check to decide whether
+ * to skip the buffered request body read and expose the live HTTP exchange
+ * input stream to user functions that declare
+ * {@code HttpRequestMessage}.
+ */
+public class JavaFunctionBrokerStreamingTest {
+
+ // Test signatures spanning the supported and unsupported shapes.
+ public void streamingFn(HttpRequestMessage req) {}
+ public void streamingFnSubtype(HttpRequestMessage req) {}
+ public void stringFn(HttpRequestMessage req) {}
+ public void rawFn(@SuppressWarnings("rawtypes") HttpRequestMessage req) {}
+ public void noHttpFn(String s, int i) {}
+
+ @Test
+ public void methodHasStreamingHttpBody_inputStreamParam_returnsTrue() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ registerMethod(broker, "id-stream", "streamingFn");
+ assertTrue(broker.methodHasStreamingHttpBody("id-stream"));
+ }
+
+ @Test
+ public void methodHasStreamingHttpBody_inputStreamSubtypeParam_returnsTrue() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ registerMethod(broker, "id-substream", "streamingFnSubtype");
+ assertTrue(broker.methodHasStreamingHttpBody("id-substream"));
+ }
+
+ @Test
+ public void methodHasStreamingHttpBody_stringParam_returnsFalse() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ registerMethod(broker, "id-string", "stringFn");
+ assertFalse(broker.methodHasStreamingHttpBody("id-string"));
+ }
+
+ @Test
+ public void methodHasStreamingHttpBody_rawHttpRequestMessage_returnsFalse() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ registerMethod(broker, "id-raw", "rawFn");
+ assertFalse(broker.methodHasStreamingHttpBody("id-raw"));
+ }
+
+ @Test
+ public void methodHasStreamingHttpBody_noHttpParam_returnsFalse() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ registerMethod(broker, "id-nohttp", "noHttpFn");
+ assertFalse(broker.methodHasStreamingHttpBody("id-nohttp"));
+ }
+
+ @Test
+ public void methodHasStreamingHttpBody_unknownId_returnsFalse() throws Exception {
+ JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider());
+ assertFalse(broker.methodHasStreamingHttpBody("never-registered"));
+ }
+
+ /**
+ * Resolves the named test method on this class, builds a real
+ * {@link MethodBindInfo} for it, wraps a mocked {@link FunctionDefinition}
+ * around it, and reflectively inserts the entry into the broker's private
+ * {@code methods} map so the public {@code methodHasStreamingHttpBody}
+ * lookup can find it without going through the full descriptor /
+ * classloader pipeline.
+ */
+ private void registerMethod(JavaFunctionBroker broker, String id, String methodName) throws Exception {
+ Method method = null;
+ for (Method m : JavaFunctionBrokerStreamingTest.class.getMethods()) {
+ if (m.getName().equals(methodName)) {
+ method = m;
+ break;
+ }
+ }
+ if (method == null) {
+ throw new IllegalArgumentException("Test method not found: " + methodName);
+ }
+ MethodBindInfo mbi = new MethodBindInfo(method);
+
+ FunctionDefinition functionDefinition = mock(FunctionDefinition.class);
+ when(functionDefinition.getCandidate()).thenReturn(mbi);
+
+ @SuppressWarnings("unchecked")
+ Map> methods =
+ (Map>) getField(broker, "methods");
+ methods.put(id, ImmutablePair.of(methodName, functionDefinition));
+
+ // Touch a live stream so the unused-import / classloading paths are
+ // exercised; protects the assertion that the broker logic depends only
+ // on the param type, not on any runtime body.
+ try (InputStream ignored = new ByteArrayInputStream(new byte[0])) {
+ // no-op
+ }
+ }
+
+ private static Object getField(Object target, String name) throws Exception {
+ Field f = target.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(target);
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java b/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java
new file mode 100644
index 0000000..aec2368
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java
@@ -0,0 +1,465 @@
+package com.microsoft.azure.functions.worker.handler;
+
+import com.google.protobuf.ByteString;
+import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer;
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.microsoft.azure.functions.rpc.messages.InvocationResponse;
+import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
+import com.microsoft.azure.functions.rpc.messages.RpcHttp;
+import com.microsoft.azure.functions.rpc.messages.StreamingMessage;
+import com.microsoft.azure.functions.rpc.messages.TypedData;
+import com.microsoft.azure.functions.worker.WorkerLogManager;
+import com.microsoft.azure.functions.worker.binding.RpcHttpRequestDataSource;
+import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker;
+import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.HttpInvocationOutcome;
+import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator;
+import com.microsoft.azure.functions.worker.http.HttpProxyHandler;
+import com.microsoft.azure.functions.worker.http.HttpProxyServer;
+import com.microsoft.azure.functions.worker.http.ProxyConfig;
+import com.sun.net.httpserver.HttpExchange;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Answers;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * End-to-end integration tests for the HTTP proxy dispatch path. These wire
+ * the real {@link HttpProxyServer}, {@link HttpProxyHandler}, and
+ * {@link HttpInvocationCoordinator} together with a real
+ * {@link InvocationRequestHandler} backed by a mocked {@link JavaFunctionBroker}.
+ *
+ * Each test sends an actual HTTP request through the embedded proxy on one
+ * thread while simulating the gRPC {@code InvocationRequest} arrival on the
+ * test thread, exercising the rendezvous and validating the full pipeline:
+ * HTTP arrival, body propagation (buffered or streamed), broker invocation,
+ * and response writeback (buffered or streamed).
+ *
+ * The mocked broker captures the {@link InvocationRequest} it sees and the
+ * thread-local exchange that was active during {@code invokeMethodForHttpProxy}
+ * so the tests can assert that the streaming pre-check and the body-read skip
+ * behaved correctly.
+ */
+public class HttpProxyEndToEndTest {
+
+ private static final String INVOCATION_ID = "e2e-invocation-1";
+ private static final String FUNCTION_ID = "e2e-function-1";
+ private static final long HTTP_AWAIT_MS = 5_000;
+
+ private HttpInvocationCoordinator coordinator;
+ private HttpProxyServer server;
+ private String proxyUri;
+ private JavaFunctionBroker brokerMock;
+ private InvocationRequestHandler handler;
+ private MockedStatic workerLogManagerMock;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // WorkerLogManager.getInvocationLogger calls addHandlers which asserts
+ // the singleton has been initialized with a JavaWorkerClient (only true
+ // when the worker is running under the host). Mock the static façade
+ // and forward everything else to the real implementation so system/host
+ // loggers continue to work for diagnostic output during tests.
+ workerLogManagerMock = Mockito.mockStatic(WorkerLogManager.class, Answers.CALLS_REAL_METHODS);
+ workerLogManagerMock.when(() -> WorkerLogManager.getInvocationLogger(anyString()))
+ .thenReturn(Logger.getAnonymousLogger());
+
+ coordinator = new HttpInvocationCoordinator();
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ proxyUri = server.start(new HttpProxyHandler(coordinator));
+ brokerMock = mock(JavaFunctionBroker.class);
+ when(brokerMock.getMethodName(anyString())).thenReturn(Optional.of("TestFn"));
+ handler = new InvocationRequestHandler(brokerMock, coordinator);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (server != null) {
+ server.close();
+ }
+ if (workerLogManagerMock != null) {
+ workerLogManagerMock.close();
+ }
+ }
+
+ // -------- 1. Buffered request, buffered response (existing path) --------
+ @Test
+ public void bufferedRequest_bufferedResponse_roundTrips() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false);
+
+ AtomicReference seenRequest = new AtomicReference<>();
+ AtomicReference seenExchange = new AtomicReference<>();
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ seenRequest.set(inv.getArgument(1));
+ seenExchange.set(RpcHttpRequestDataSource.currentExchange());
+ RpcHttp respHttp = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .putHeaders("Content-Type", "text/plain")
+ .setBody(TypedData.newBuilder().setString("pong").build())
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build();
+ return new HttpInvocationOutcome(Optional.of(returnValue), null);
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "POST", "/api/echo", "text/plain", "ping".getBytes(StandardCharsets.UTF_8));
+
+ awaitHttpArrival();
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(200, resp.statusCode);
+ assertEquals("pong", new String(resp.body, StandardCharsets.UTF_8));
+ assertEquals("text/plain", resp.contentType);
+
+ // The buffered path must have folded the HTTP body into the RpcHttp envelope.
+ // For text/* content types the bridge stores the body as a string, not bytes
+ // (mirrors host PopulateBody behavior).
+ TypedData httpInput = seenRequest.get().getInputDataList().get(0).getData();
+ assertEquals("ping", httpInput.getHttp().getBody().getString());
+ // For non-streaming requests the per-thread exchange must NOT be set.
+ assertNull(seenExchange.get(), "Buffered-input invocations must not install a thread-local exchange");
+ }
+
+ // -------- 2. Buffered request, streaming response (commit #5 path) --------
+ @Test
+ public void bufferedRequest_streamingInputStreamResponse_streamsBackUnbuffered() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false);
+
+ byte[] payload = repeat("stream-chunk-", 5000); // ~65KB to force multi-chunk write
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ RpcHttp envelope = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .putHeaders("Content-Type", "application/octet-stream")
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build();
+ return new HttpInvocationOutcome(Optional.of(returnValue), new ByteArrayInputStream(payload));
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "GET", "/api/download", null, new byte[0]);
+
+ awaitHttpArrival();
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(200, resp.statusCode);
+ assertEquals("application/octet-stream", resp.contentType);
+ assertEquals(payload.length, resp.body.length);
+ assertTrue(java.util.Arrays.equals(payload, resp.body), "Streamed body must match payload byte-for-byte");
+ // Chunked transfer encoding (server-side) is used when the body length is
+ // unknown. HttpURLConnection transparently dechunks; we can verify
+ // either header or fall-through behavior via the chunked flag.
+ assertEquals("chunked", resp.transferEncoding,
+ "Streaming-output path must use chunked transfer-encoding (no Content-Length)");
+ }
+
+ // -------- 3. Buffered request, streaming response via IOConsumer --------
+ @Test
+ public void bufferedRequest_streamingIOConsumerResponse_streamsBack() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false);
+
+ IOConsumer writer = out -> {
+ for (int i = 0; i < 100; i++) {
+ out.write(("event: tick\ndata: " + i + "\n\n").getBytes(StandardCharsets.UTF_8));
+ }
+ };
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ RpcHttp envelope = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .putHeaders("Content-Type", "text/event-stream")
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build();
+ return new HttpInvocationOutcome(Optional.of(returnValue), writer);
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "GET", "/api/sse", null, new byte[0]);
+
+ awaitHttpArrival();
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(200, resp.statusCode);
+ assertEquals("text/event-stream", resp.contentType);
+ String body = new String(resp.body, StandardCharsets.UTF_8);
+ assertTrue(body.startsWith("event: tick\ndata: 0\n\n"));
+ assertTrue(body.endsWith("event: tick\ndata: 99\n\n"));
+ }
+
+ // -------- 4. Streaming request, buffered response (commit #6 path) --------
+ @Test
+ public void streamingRequest_bufferedResponse_skipsBodyReadAndExposesLiveStream() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(true);
+
+ byte[] uploadPayload = repeat("upload-frag-", 4096); // ~50KB
+ AtomicReference consumedByBroker = new AtomicReference<>();
+ AtomicReference seenRequest = new AtomicReference<>();
+ AtomicReference seenExchange = new AtomicReference<>();
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ seenRequest.set(inv.getArgument(1));
+ HttpExchange exch = RpcHttpRequestDataSource.currentExchange();
+ seenExchange.set(exch);
+ // Consume the live request body to validate the streaming path actually wired through.
+ if (exch != null) {
+ byte[] consumed = readAll(exch.getRequestBody());
+ consumedByBroker.set(consumed);
+ }
+ RpcHttp respHttp = RpcHttp.newBuilder()
+ .setStatusCode("201")
+ .putHeaders("Content-Type", "text/plain")
+ .setBody(TypedData.newBuilder().setString("ok").build())
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build();
+ return new HttpInvocationOutcome(Optional.of(returnValue), null);
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "POST", "/api/upload", "application/octet-stream", uploadPayload);
+
+ awaitHttpArrival();
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(201, resp.statusCode);
+ assertEquals("ok", new String(resp.body, StandardCharsets.UTF_8));
+
+ // Critical: the streaming-input path must NOT have populated the protobuf body.
+ TypedData httpInput = seenRequest.get().getInputDataList().get(0).getData();
+ assertEquals(0, httpInput.getHttp().getBody().getBytes().size(),
+ "Streaming-input request must leave the protobuf body untouched (empty)");
+ assertNotNull(seenExchange.get(), "Streaming-input invocation must install the thread-local exchange");
+ // The bytes the broker read off the live exchange must equal what the client sent.
+ assertNotNull(consumedByBroker.get());
+ assertTrue(java.util.Arrays.equals(uploadPayload, consumedByBroker.get()),
+ "Bytes read from the live exchange must equal the bytes the client sent");
+ // After the invocation completes, the thread-local must be cleared so the
+ // worker thread can be safely reused.
+ assertNull(RpcHttpRequestDataSource.currentExchange(),
+ "InvocationRequestHandler must clear the thread-local exchange in its finally block");
+ }
+
+ // -------- 5. Full streaming: streaming request + streaming response --------
+ @Test
+ public void streamingRequest_streamingResponse_endToEnd() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(true);
+
+ byte[] uploadPayload = repeat("full-stream-", 1024);
+ AtomicReference echoedBack = new AtomicReference<>();
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ HttpExchange exch = RpcHttpRequestDataSource.currentExchange();
+ byte[] consumed = readAll(exch.getRequestBody());
+ echoedBack.set(consumed);
+ RpcHttp envelope = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .putHeaders("Content-Type", "application/octet-stream")
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build();
+ // Echo what we read back to the client as a streaming response.
+ return new HttpInvocationOutcome(Optional.of(returnValue), new ByteArrayInputStream(consumed));
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "POST", "/api/echo-stream", "application/octet-stream", uploadPayload);
+
+ awaitHttpArrival();
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+
+ assertEquals(200, resp.statusCode);
+ assertTrue(java.util.Arrays.equals(uploadPayload, echoedBack.get()),
+ "Bytes broker read off live stream must equal client's uploaded payload");
+ assertTrue(java.util.Arrays.equals(uploadPayload, resp.body),
+ "Streaming response body must equal what the broker echoed");
+ }
+
+ // -------- 6. Coordinator slot is released after success --------
+ @Test
+ public void coordinatorSlotReleasedAfterSuccessfulInvocation() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false);
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenAnswer((InvocationOnMock inv) -> {
+ RpcHttp respHttp = RpcHttp.newBuilder()
+ .setStatusCode("204")
+ .build();
+ TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build();
+ return new HttpInvocationOutcome(Optional.of(returnValue), null);
+ });
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "DELETE", "/api/item", null, new byte[0]);
+
+ awaitHttpArrival();
+ assertEquals(1, coordinator.activeInvocationCount());
+ runGrpcArrival();
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+ assertEquals(204, resp.statusCode);
+
+ // The slot must be released so the coordinator can serve subsequent invocations.
+ assertEquals(0, coordinator.activeInvocationCount(),
+ "Coordinator must release the slot after a successful invocation");
+ }
+
+ // -------- 7. Broker exception surfaces as a 500 on the HTTP side --------
+ @Test
+ public void brokerExceptionPropagatesAsHttp500() throws Exception {
+ when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false);
+ when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any()))
+ .thenThrow(new RuntimeException("user function blew up"));
+
+ CompletableFuture futureResp = sendHttpRequestAsync(
+ "GET", "/api/boom", null, new byte[0]);
+
+ awaitHttpArrival();
+ // The gRPC-side execute throws; handle() catches it. We invoke directly so
+ // the caller (this test) doesn't need to deal with gRPC marshalling.
+ try {
+ runGrpcArrival();
+ } catch (Exception ignored) {
+ // Expected: the broker's RuntimeException propagates out of execute().
+ }
+
+ HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS);
+ assertEquals(500, resp.statusCode);
+ assertTrue(new String(resp.body, StandardCharsets.UTF_8).contains("user function blew up"),
+ "500 body must surface the underlying failure message");
+ }
+
+ // -------- Helpers --------
+
+ private void awaitHttpArrival() throws InterruptedException {
+ long deadline = System.currentTimeMillis() + HTTP_AWAIT_MS;
+ while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+ assertEquals(1, coordinator.activeInvocationCount(),
+ "HTTP arrival should have registered with the coordinator by now");
+ }
+
+ private void runGrpcArrival() throws Exception {
+ InvocationRequest request = buildInvocationRequest();
+ InvocationResponse.Builder response = InvocationResponse.newBuilder();
+ handler.execute(request, response);
+ }
+
+ private static InvocationRequest buildInvocationRequest() {
+ // Trigger metadata only; body is empty (the host's HTTP proxy contract).
+ RpcHttp httpEnvelope = RpcHttp.newBuilder()
+ .setMethod("POST")
+ .setUrl("http://localhost/api/test")
+ .build();
+ TypedData inputData = TypedData.newBuilder().setHttp(httpEnvelope).build();
+ ParameterBinding binding = ParameterBinding.newBuilder()
+ .setName("req")
+ .setData(inputData)
+ .build();
+ return InvocationRequest.newBuilder()
+ .setInvocationId(INVOCATION_ID)
+ .setFunctionId(FUNCTION_ID)
+ .addInputData(binding)
+ .build();
+ }
+
+ private CompletableFuture sendHttpRequestAsync(
+ String method, String path, String contentType, byte[] body) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ HttpURLConnection conn = (HttpURLConnection) URI.create(proxyUri + path).toURL().openConnection();
+ conn.setRequestMethod(method);
+ conn.setRequestProperty(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ if (contentType != null) {
+ conn.setRequestProperty("Content-Type", contentType);
+ }
+ if (body != null && body.length > 0) {
+ conn.setDoOutput(true);
+ conn.setFixedLengthStreamingMode(body.length);
+ try (OutputStream os = conn.getOutputStream()) {
+ os.write(body);
+ }
+ }
+ conn.connect();
+ int status = conn.getResponseCode();
+ String returnedContentType = conn.getHeaderField("Content-Type");
+ String transferEncoding = conn.getHeaderField("Transfer-Encoding");
+ InputStream in = status >= 200 && status < 400 ? conn.getInputStream() : conn.getErrorStream();
+ byte[] respBody = in != null ? readAll(in) : new byte[0];
+ conn.disconnect();
+ return new HttpClientResult(status, returnedContentType, transferEncoding, respBody);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+
+ private static byte[] readAll(InputStream in) throws IOException {
+ java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
+ byte[] chunk = new byte[4096];
+ int n;
+ while ((n = in.read(chunk)) > 0) {
+ buf.write(chunk, 0, n);
+ }
+ return buf.toByteArray();
+ }
+
+ private static byte[] repeat(String fragment, int times) {
+ StringBuilder sb = new StringBuilder(fragment.length() * times);
+ for (int i = 0; i < times; i++) {
+ sb.append(fragment);
+ }
+ return sb.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ /** Snapshot of the response observable by the HTTP client. */
+ private static final class HttpClientResult {
+ final int statusCode;
+ final String contentType;
+ final String transferEncoding;
+ final byte[] body;
+
+ HttpClientResult(int statusCode, String contentType, String transferEncoding, byte[] body) {
+ this.statusCode = statusCode;
+ this.contentType = contentType;
+ this.transferEncoding = transferEncoding;
+ this.body = body;
+ }
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java
new file mode 100644
index 0000000..5463d26
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java
@@ -0,0 +1,330 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.google.protobuf.ByteString;
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
+import com.microsoft.azure.functions.rpc.messages.RpcHttp;
+import com.microsoft.azure.functions.rpc.messages.TypedData;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpBodyBridgeTest {
+
+ @Test
+ public void buildBodyTypedDataJsonContentType() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "{\"k\":1}".getBytes(StandardCharsets.UTF_8), "application/json");
+ assertEquals(TypedData.DataCase.JSON, data.getDataCase());
+ assertEquals("{\"k\":1}", data.getJson());
+ }
+
+ @Test
+ public void buildBodyTypedDataJsonWithCharsetSuffix() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "{}".getBytes(StandardCharsets.UTF_8), "application/json; charset=utf-8");
+ assertEquals(TypedData.DataCase.JSON, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataTextContentType() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "hello".getBytes(StandardCharsets.UTF_8), "text/plain");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ assertEquals("hello", data.getString());
+ }
+
+ @Test
+ public void buildBodyTypedDataFormEncoded() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "a=1&b=2".getBytes(StandardCharsets.UTF_8), "application/x-www-form-urlencoded");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataBinaryWhenNoContentType() {
+ byte[] bytes = new byte[]{1, 2, 3};
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, null);
+ assertEquals(TypedData.DataCase.BYTES, data.getDataCase());
+ assertArrayEquals(bytes, data.getBytes().toByteArray());
+ }
+
+ @Test
+ public void buildBodyTypedDataBinaryWhenOctetStream() {
+ byte[] bytes = "binary".getBytes(StandardCharsets.UTF_8);
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "application/octet-stream");
+ assertEquals(TypedData.DataCase.BYTES, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataRespectsCharset() {
+ byte[] bytes = "héllo".getBytes(StandardCharsets.ISO_8859_1);
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "text/plain; charset=ISO-8859-1");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ assertEquals("héllo", data.getString());
+ }
+
+ @Test
+ public void enrichRequestWithBodyReplacesHttpBody() throws Exception {
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-1")
+ .addInputData(ParameterBinding.newBuilder()
+ .setName("req")
+ .setData(TypedData.newBuilder()
+ .setHttp(RpcHttp.newBuilder().setMethod("POST").setUrl("http://localhost/api/x"))))
+ .build();
+ HttpExchange exchange = mockExchangeWithBody("payload".getBytes(StandardCharsets.UTF_8), "text/plain");
+
+ InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ assertNotSame(original, enriched);
+ TypedData body = enriched.getInputData(0).getData().getHttp().getBody();
+ assertEquals(TypedData.DataCase.STRING, body.getDataCase());
+ assertEquals("payload", body.getString());
+ // Method/url preserved.
+ assertEquals("POST", enriched.getInputData(0).getData().getHttp().getMethod());
+ }
+
+ @Test
+ public void enrichRequestWithBodyReturnsSameWhenNoHttpInput() throws Exception {
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-1")
+ .addInputData(ParameterBinding.newBuilder()
+ .setName("queueItem")
+ .setData(TypedData.newBuilder().setString("hello")))
+ .build();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest result = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ assertSame(original, result, "Non-HTTP requests should not be modified");
+ }
+
+ @Test
+ public void enrichRequestReadsChunkedBodyLargerThanBuffer() throws Exception {
+ // Simulate transfer-encoding: chunked by providing a body larger than the read buffer.
+ byte[] big = new byte[20_000];
+ for (int i = 0; i < big.length; i++) {
+ big[i] = (byte) (i & 0xff);
+ }
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-big")
+ .addInputData(ParameterBinding.newBuilder().setName("req")
+ .setData(TypedData.newBuilder().setHttp(RpcHttp.newBuilder().setMethod("POST"))))
+ .build();
+ HttpExchange exchange = mockExchangeWithBody(big, "application/octet-stream");
+
+ InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ TypedData body = enriched.getInputData(0).getData().getHttp().getBody();
+ assertEquals(TypedData.DataCase.BYTES, body.getDataCase());
+ assertArrayEquals(big, body.getBytes().toByteArray());
+ }
+
+ @Test
+ public void writeRpcHttpResponseWritesStatusHeadersAndBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers responseHeaders = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(responseHeaders);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder()
+ .setStatusCode("201")
+ .putHeaders("Content-Type", "application/json")
+ .putHeaders("X-Custom", "value")
+ .setBody(TypedData.newBuilder().setJson("{\"ok\":true}"))
+ .build();
+
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ byte[] expected = "{\"ok\":true}".getBytes(StandardCharsets.UTF_8);
+ verify(exchange).sendResponseHeaders(201, expected.length);
+ assertArrayEquals(expected, captured.toByteArray());
+ assertEquals("application/json", responseHeaders.getFirst("Content-Type"));
+ assertEquals("value", responseHeaders.getFirst("X-Custom"));
+ }
+
+ @Test
+ public void writeRpcHttpResponseHandlesEmptyBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder().setStatusCode("204").build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(204, -1);
+ assertEquals(0, captured.size());
+ }
+
+ @Test
+ public void writeRpcHttpResponseHandlesBytesBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ byte[] payload = new byte[]{0x01, 0x02, 0x03};
+ RpcHttp response = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .setBody(TypedData.newBuilder().setBytes(ByteString.copyFrom(payload)))
+ .build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(200, payload.length);
+ assertArrayEquals(payload, captured.toByteArray());
+ }
+
+ @Test
+ public void writeRpcHttpResponseDefaultsInvalidStatusTo500() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder().setStatusCode("not-a-number").build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(500, -1);
+ }
+
+ @Test
+ public void writeErrorResponseWritesPlainText() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers headers = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(headers);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ HttpBodyBridge.writeErrorResponse(exchange, 418, "I'm a teapot");
+
+ byte[] expected = "I'm a teapot".getBytes(StandardCharsets.UTF_8);
+ verify(exchange).sendResponseHeaders(418, expected.length);
+ assertArrayEquals(expected, captured.toByteArray());
+ assertTrue(headers.getFirst("Content-Type").startsWith("text/plain"));
+ }
+
+ @Test
+ public void writeStreamingResponseFromInputStreamUsesChunkedEncoding() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers responseHeaders = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(responseHeaders);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp envelope = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .putHeaders("Content-Type", "text/event-stream")
+ .putHeaders("Cache-Control", "no-cache")
+ .build();
+ byte[] payload = "data: one\n\ndata: two\n\n".getBytes(StandardCharsets.UTF_8);
+ HttpBodyBridge.writeStreamingResponse(exchange, envelope, new ByteArrayInputStream(payload));
+
+ // length=0 selects chunked transfer-encoding (or close-delimited for HTTP/1.0).
+ verify(exchange).sendResponseHeaders(200, 0);
+ assertArrayEquals(payload, captured.toByteArray());
+ assertEquals("text/event-stream", responseHeaders.getFirst("Content-Type"));
+ assertEquals("no-cache", responseHeaders.getFirst("Cache-Control"));
+ }
+
+ @Test
+ public void writeStreamingResponseFromInputStreamHandlesLargePayload() throws Exception {
+ // Larger than the 8KB read chunk; verifies the copy loop iterates correctly.
+ byte[] big = new byte[32_768];
+ for (int i = 0; i < big.length; i++) {
+ big[i] = (byte) (i & 0xff);
+ }
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp envelope = RpcHttp.newBuilder().setStatusCode("200").build();
+ HttpBodyBridge.writeStreamingResponse(exchange, envelope, new ByteArrayInputStream(big));
+
+ verify(exchange).sendResponseHeaders(200, 0);
+ assertArrayEquals(big, captured.toByteArray());
+ }
+
+ @Test
+ public void writeStreamingResponseFromInputStreamClosesSource() throws Exception {
+ boolean[] closed = new boolean[]{false};
+ ByteArrayInputStream backing = new ByteArrayInputStream("x".getBytes(StandardCharsets.UTF_8));
+ java.io.InputStream tracking = new java.io.FilterInputStream(backing) {
+ @Override
+ public void close() throws IOException {
+ closed[0] = true;
+ super.close();
+ }
+ };
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(new ByteArrayOutputStream());
+
+ HttpBodyBridge.writeStreamingResponse(
+ exchange, RpcHttp.newBuilder().setStatusCode("200").build(), tracking);
+
+ assertTrue(closed[0], "Streaming InputStream body should be closed");
+ }
+
+ @Test
+ public void writeStreamingResponseFromIOConsumerInvokesWriterAndFlushes() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers responseHeaders = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(responseHeaders);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp envelope = RpcHttp.newBuilder()
+ .setStatusCode("202")
+ .putHeaders("X-Trace", "abc")
+ .build();
+ HttpBodyBridge.writeStreamingResponse(exchange, envelope, out -> {
+ out.write("chunk-1\n".getBytes(StandardCharsets.UTF_8));
+ out.write("chunk-2\n".getBytes(StandardCharsets.UTF_8));
+ });
+
+ verify(exchange).sendResponseHeaders(202, 0);
+ assertEquals("chunk-1\nchunk-2\n", captured.toString("UTF-8"));
+ assertEquals("abc", responseHeaders.getFirst("X-Trace"));
+ }
+
+ @Test
+ public void writeStreamingResponseFromIOConsumerPropagatesIOException() {
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(new ByteArrayOutputStream());
+
+ IOException expected = new IOException("writer-failed");
+ IOException thrown = org.junit.jupiter.api.Assertions.assertThrows(IOException.class, () ->
+ HttpBodyBridge.writeStreamingResponse(
+ exchange,
+ RpcHttp.newBuilder().setStatusCode("200").build(),
+ out -> { throw expected; }));
+ assertSame(expected, thrown);
+ }
+
+ private static HttpExchange mockExchangeWithBody(byte[] body, String contentType) throws IOException {
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers headers = new Headers();
+ if (contentType != null) {
+ headers.add("Content-Type", contentType);
+ }
+ when(exchange.getRequestHeaders()).thenReturn(headers);
+ when(exchange.getRequestBody()).thenReturn(new ByteArrayInputStream(body));
+ return exchange;
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
new file mode 100644
index 0000000..4fd35d3
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
@@ -0,0 +1,149 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpInvocationCoordinatorTest {
+
+ private static final String INVOCATION_ID = "abc-123";
+
+ @Test
+ public void httpArrivesBeforeGrpc() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+
+ HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertFalse(httpSlot.grpcArrival().isDone(), "gRPC future should still be pending before gRPC arrival");
+
+ HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request);
+ assertSame(httpSlot, grpcSlot, "both registrations should yield the same slot");
+ assertTrue(grpcSlot.httpArrival().isDone(), "HTTP future should already be resolved once gRPC arrives");
+ assertSame(exchange, grpcSlot.httpArrival().get(1, TimeUnit.SECONDS));
+ assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void grpcArrivesBeforeHttp() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+
+ HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request);
+ assertFalse(grpcSlot.httpArrival().isDone(), "HTTP future should still be pending before HTTP arrival");
+
+ HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertSame(grpcSlot, httpSlot, "both registrations should yield the same slot");
+ assertTrue(httpSlot.grpcArrival().isDone(), "gRPC future should already be resolved once HTTP arrives");
+ assertSame(exchange, httpSlot.httpArrival().get(1, TimeUnit.SECONDS));
+ assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void releaseInvocationRemovesSlot() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertEquals(1, coordinator.activeInvocationCount());
+
+ coordinator.releaseInvocation(INVOCATION_ID);
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void releaseInvocationIsIdempotent() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ // Releasing an unknown invocation does not throw.
+ coordinator.releaseInvocation("unknown");
+ coordinator.releaseInvocation("unknown");
+ }
+
+ @Test
+ public void failInvocationPropagatesToFutures() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+
+ IOException cause = new IOException("boom");
+ coordinator.failInvocation(INVOCATION_ID, cause);
+
+ ExecutionException ex = assertThrows(ExecutionException.class,
+ () -> slot.grpcArrival().get(1, TimeUnit.SECONDS));
+ assertSame(cause, ex.getCause());
+ ExecutionException completionEx = assertThrows(ExecutionException.class,
+ () -> slot.completion().get(1, TimeUnit.SECONDS));
+ assertSame(cause, completionEx.getCause());
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void duplicateHttpArrivalThrows() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertThrows(IllegalStateException.class,
+ () -> coordinator.registerHttpArrival(INVOCATION_ID, exchange));
+ }
+
+ @Test
+ public void duplicateGrpcArrivalThrows() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+ coordinator.registerGrpcArrival(request);
+ assertThrows(IllegalStateException.class,
+ () -> coordinator.registerGrpcArrival(request));
+ }
+
+ @Test
+ public void independentInvocationsDoNotInterfere() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchangeA = mock(HttpExchange.class);
+ HttpExchange exchangeB = mock(HttpExchange.class);
+ InvocationRequest reqA = InvocationRequest.newBuilder().setInvocationId("a").build();
+ InvocationRequest reqB = InvocationRequest.newBuilder().setInvocationId("b").build();
+
+ HttpInvocationSlot slotA = coordinator.registerHttpArrival("a", exchangeA);
+ HttpInvocationSlot slotB = coordinator.registerHttpArrival("b", exchangeB);
+ // Resolve only A; B must still be pending.
+ coordinator.registerGrpcArrival(reqA);
+ assertTrue(slotA.grpcArrival().isDone());
+ assertFalse(slotB.grpcArrival().isDone());
+
+ coordinator.registerGrpcArrival(reqB);
+ assertSame(reqA, slotA.grpcArrival().get(1, TimeUnit.SECONDS));
+ assertSame(reqB, slotB.grpcArrival().get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void grpcFutureRemainsPendingUntilHttpArrives() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+ HttpInvocationSlot slot = coordinator.registerGrpcArrival(request);
+ // No HTTP arrival; future must time out.
+ assertThrows(TimeoutException.class, () -> slot.httpArrival().get(50, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void completionFutureResolvesOnRelease() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertFalse(slot.completion().isDone());
+ coordinator.releaseInvocation(INVOCATION_ID);
+ slot.completion().get(1, TimeUnit.SECONDS); // resolves without throwing
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java
new file mode 100644
index 0000000..42e9dd6
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java
@@ -0,0 +1,129 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpProxyHandlerTest {
+
+ private static final String INVOCATION_ID = "abc-123";
+
+ @Test
+ public void rejectsRequestWithoutInvocationIdHeader() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getRequestHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ handler.handle(exchange);
+
+ verify(exchange).sendResponseHeaders(400, captured.size());
+ verify(exchange).close();
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void registersHttpArrivalAndWaitsForCompletion() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers requestHeaders = new Headers();
+ requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(exchange.getRequestHeaders()).thenReturn(requestHeaders);
+
+ AtomicReference handlerError = new AtomicReference<>();
+ CompletableFuture handlerDone = CompletableFuture.runAsync(() -> {
+ try {
+ handler.handle(exchange);
+ } catch (Throwable t) {
+ handlerError.set(t);
+ }
+ });
+
+ // Wait for the handler to register HTTP arrival.
+ long deadline = System.currentTimeMillis() + 1000;
+ while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+ assertEquals(1, coordinator.activeInvocationCount());
+
+ // Simulate the gRPC side finishing the invocation.
+ coordinator.releaseInvocation(INVOCATION_ID);
+
+ handlerDone.get();
+ assertEquals(null, handlerError.get());
+ verify(exchange).close();
+ // The handler must NOT have written any error response - the gRPC side owns the body.
+ verify(exchange, never()).sendResponseHeaders(anyInt(), anyLong());
+ }
+
+ @Test
+ public void respondsWith500WhenInvocationFails() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers requestHeaders = new Headers();
+ requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(exchange.getRequestHeaders()).thenReturn(requestHeaders);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ CompletableFuture handlerDone = CompletableFuture.runAsync(() -> {
+ try {
+ handler.handle(exchange);
+ } catch (Exception ignored) {
+ }
+ });
+
+ long deadline = System.currentTimeMillis() + 1000;
+ while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+ coordinator.failInvocation(INVOCATION_ID, new RuntimeException("user fn crashed"));
+
+ handlerDone.get();
+ verify(exchange).sendResponseHeaders(500, captured.size());
+ verify(exchange).close();
+ assertTrue(new String(captured.toByteArray()).contains("user fn crashed"));
+ }
+
+ @Test
+ public void duplicateRegistrationReturns409() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ // Pre-register HTTP arrival to force a duplicate on the next handle() call.
+ HttpExchange first = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, first);
+
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange second = mock(HttpExchange.class);
+ Headers headers = new Headers();
+ headers.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(second.getRequestHeaders()).thenReturn(headers);
+ when(second.getResponseHeaders()).thenReturn(new Headers());
+ when(second.getResponseBody()).thenReturn(captured);
+
+ handler.handle(second);
+
+ verify(second).sendResponseHeaders(409, captured.size());
+ verify(second).close();
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java
new file mode 100644
index 0000000..f9f04a3
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java
@@ -0,0 +1,115 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sun.net.httpserver.HttpHandler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+public class HttpProxyServerTest {
+
+ private HttpProxyServer server;
+
+ @AfterEach
+ public void tearDown() {
+ if (server != null) {
+ server.close();
+ server = null;
+ }
+ }
+
+ @Test
+ public void startBindsToEphemeralPortAndReturnsUri() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ String uri = server.start(noOpHandler());
+ assertNotNull(uri);
+ assertTrue(uri.startsWith("http://127.0.0.1:"), "Expected loopback URI, got " + uri);
+ URI parsed = URI.create(uri);
+ assertTrue(parsed.getPort() > 0, "Expected a real port number, got " + parsed.getPort());
+ assertEquals(uri, server.getBoundUri());
+ }
+
+ @Test
+ public void getBoundUriReturnsNullBeforeStart() {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ assertNull(server.getBoundUri());
+ }
+
+ @Test
+ public void closeBeforeStartIsNoop() {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ server.close();
+ assertNull(server.getBoundUri());
+ }
+
+ @Test
+ public void doubleStartThrows() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ server.start(noOpHandler());
+ assertThrows(IllegalStateException.class, () -> server.start(noOpHandler()));
+ }
+
+ @Test
+ public void routesIncomingRequestToHandler() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ AtomicReference seenPath = new AtomicReference<>();
+ AtomicReference seenHeader = new AtomicReference<>();
+ String uri = server.start(exchange -> {
+ seenPath.set(exchange.getRequestURI().getPath());
+ seenHeader.set(exchange.getRequestHeaders().getFirst("x-ms-invocation-id"));
+ byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+ exchange.sendResponseHeaders(200, body.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(body);
+ }
+ });
+
+ HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/some/route").toURL().openConnection();
+ conn.setRequestProperty("x-ms-invocation-id", "test-123");
+ conn.connect();
+ try {
+ assertEquals(200, conn.getResponseCode());
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+ assertEquals("hello", reader.readLine());
+ }
+ } finally {
+ conn.disconnect();
+ }
+ assertEquals("/some/route", seenPath.get());
+ assertEquals("test-123", seenHeader.get());
+ }
+
+ @Test
+ public void closeStopsAcceptingConnections() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ String uri = server.start(noOpHandler());
+ server.close();
+ server = null;
+ HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/").toURL().openConnection();
+ conn.setConnectTimeout(500);
+ conn.setReadTimeout(500);
+ // After close, the next connect attempt must fail (connection refused).
+ assertThrows(Exception.class, conn::connect);
+ }
+
+ private static HttpHandler noOpHandler() {
+ return exchange -> {
+ exchange.sendResponseHeaders(204, -1);
+ exchange.close();
+ };
+ }
+}