diff --git a/eng/ci/public-build.yml b/eng/ci/public-build.yml index 26b643c..12d2880 100644 --- a/eng/ci/public-build.yml +++ b/eng/ci/public-build.yml @@ -17,6 +17,32 @@ pr: include: - dev +# Pipeline-level parameters surfaced in the ADO "Run pipeline" dialog so we can +# build a custom branch of azure-functions-java-additions (e.g. a fork) before +# the worker's mvn build. Defaults to a no-op so normal PRs continue to resolve +# the library from Maven Central. +# +# TEMPORARY: defaults are flipped to build the fork branch +# (ahmedmuhsin/azure-functions-java-additions @ feat/http-response-bodystream) +# so GitHub-triggered PR CI for #877 picks up the unpublished +# `azure-functions-java-core-library:1.4.0-SNAPSHOT`. Revert to the upstream +# defaults (buildAdditionsFromSource=false, Azure repo, dev branch) once the +# companion PR (Azure/azure-functions-java-additions#55) is merged and a +# matching library version is published to Maven Central. +parameters: + - name: buildAdditionsFromSource + displayName: 'Build azure-functions-java-additions from source (instead of resolving from Maven Central)' + type: boolean + default: true + - name: additionsRepoUrl + displayName: 'Git URL for azure-functions-java-additions (used only when buildAdditionsFromSource is true)' + type: string + default: 'https://github.com/ahmedmuhsin/azure-functions-java-additions.git' + - name: additionsBranch + displayName: 'Branch of azure-functions-java-additions to build (used only when buildAdditionsFromSource is true)' + type: string + default: 'feat/http-response-bodystream' + resources: repositories: - repository: 1es @@ -51,6 +77,10 @@ extends: - stage: Build jobs: - template: /eng/ci/templates/jobs/build.yml@self + parameters: + buildAdditionsFromSource: ${{ parameters.buildAdditionsFromSource }} + additionsRepoUrl: ${{ parameters.additionsRepoUrl }} + additionsBranch: ${{ parameters.additionsBranch }} - stage: TestWindows dependsOn: [] @@ -58,6 +88,9 @@ extends: - template: /eng/ci/templates/jobs/run-emulated-tests-windows.yml@self parameters: poolName: 1es-pool-azfunc-public + buildAdditionsFromSource: ${{ parameters.buildAdditionsFromSource }} + additionsRepoUrl: ${{ parameters.additionsRepoUrl }} + additionsBranch: ${{ parameters.additionsBranch }} - stage: TestLinux dependsOn: [] @@ -65,10 +98,16 @@ extends: - template: /eng/ci/templates/jobs/run-emulated-tests-linux.yml@self parameters: poolName: 1es-pool-azfunc-public + buildAdditionsFromSource: ${{ parameters.buildAdditionsFromSource }} + additionsRepoUrl: ${{ parameters.additionsRepoUrl }} + additionsBranch: ${{ parameters.additionsBranch }} - stage: TestDocker dependsOn: [] jobs: - template: /eng/ci/templates/jobs/run-docker-tests-linux.yml@self parameters: - poolName: 1es-pool-azfunc-public \ No newline at end of file + poolName: 1es-pool-azfunc-public + buildAdditionsFromSource: ${{ parameters.buildAdditionsFromSource }} + additionsRepoUrl: ${{ parameters.additionsRepoUrl }} + additionsBranch: ${{ parameters.additionsBranch }} \ No newline at end of file diff --git a/eng/ci/templates/jobs/build.yml b/eng/ci/templates/jobs/build.yml index 9d22951..3b88652 100644 --- a/eng/ci/templates/jobs/build.yml +++ b/eng/ci/templates/jobs/build.yml @@ -1,3 +1,14 @@ +parameters: + - name: buildAdditionsFromSource + type: boolean + default: false + - name: additionsRepoUrl + type: string + default: 'https://github.com/Azure/azure-functions-java-additions.git' + - name: additionsBranch + type: string + default: 'dev' + jobs: - job: "Build" displayName: 'Build java worker' @@ -14,6 +25,10 @@ jobs: - pwsh: | java -version displayName: 'Check default java version' + - ${{ if eq(parameters.buildAdditionsFromSource, true) }}: + - pwsh: | + ./installAdditionsLocally.ps1 -AdditionsRepoUrl '${{ parameters.additionsRepoUrl }}' -AdditionsBranch '${{ parameters.additionsBranch }}' + displayName: 'Install azure-functions-java-additions from source' - pwsh: | mvn clean package displayName: 'Build java worker' \ No newline at end of file diff --git a/eng/ci/templates/jobs/run-docker-tests-linux.yml b/eng/ci/templates/jobs/run-docker-tests-linux.yml index 70c4564..08f1389 100644 --- a/eng/ci/templates/jobs/run-docker-tests-linux.yml +++ b/eng/ci/templates/jobs/run-docker-tests-linux.yml @@ -2,6 +2,15 @@ parameters: - name: poolName type: string default: '' + - name: buildAdditionsFromSource + type: boolean + default: false + - name: additionsRepoUrl + type: string + default: 'https://github.com/Azure/azure-functions-java-additions.git' + - name: additionsBranch + type: string + default: 'dev' jobs: - job: "TestDocker" @@ -51,6 +60,11 @@ jobs: pip install -e dockertests/azure-functions-test-kit displayName: 'Install Python dependencies' + - ${{ if eq(parameters.buildAdditionsFromSource, true) }}: + - pwsh: | + ./installAdditionsLocally.ps1 -AdditionsRepoUrl '${{ parameters.additionsRepoUrl }}' -AdditionsBranch '${{ parameters.additionsBranch }}' + displayName: 'Install azure-functions-java-additions from source' + - pwsh: | ./package-pipeline.ps1 -outputDir 'java-worker' -skipNuget displayName: 'Package Java worker' diff --git a/eng/ci/templates/jobs/run-emulated-tests-linux.yml b/eng/ci/templates/jobs/run-emulated-tests-linux.yml index 96b5ed2..8d7fd3e 100644 --- a/eng/ci/templates/jobs/run-emulated-tests-linux.yml +++ b/eng/ci/templates/jobs/run-emulated-tests-linux.yml @@ -2,6 +2,15 @@ parameters: - name: poolName type: string default: '' + - name: buildAdditionsFromSource + type: boolean + default: false + - name: additionsRepoUrl + type: string + default: 'https://github.com/Azure/azure-functions-java-additions.git' + - name: additionsBranch + type: string + default: 'dev' jobs: - job: "TestLinux" @@ -83,6 +92,10 @@ jobs: docker compose -f emulatedtests/utils/docker-compose.yml pull docker compose -f emulatedtests/utils/docker-compose.yml up -d displayName: 'Install Azurite and Start Emulators' + - ${{ if eq(parameters.buildAdditionsFromSource, true) }}: + - pwsh: | + ./installAdditionsLocally.ps1 -AdditionsRepoUrl '${{ parameters.additionsRepoUrl }}' -AdditionsBranch '${{ parameters.additionsBranch }}' + displayName: 'Install azure-functions-java-additions from source' - pwsh: | if ("$(isTag)"){ $buildNumber="$(Build.SourceBranchName)" diff --git a/eng/ci/templates/jobs/run-emulated-tests-windows.yml b/eng/ci/templates/jobs/run-emulated-tests-windows.yml index f2a87f0..6260f12 100644 --- a/eng/ci/templates/jobs/run-emulated-tests-windows.yml +++ b/eng/ci/templates/jobs/run-emulated-tests-windows.yml @@ -2,6 +2,15 @@ parameters: - name: poolName type: string default: '' + - name: buildAdditionsFromSource + type: boolean + default: false + - name: additionsRepoUrl + type: string + default: 'https://github.com/Azure/azure-functions-java-additions.git' + - name: additionsBranch + type: string + default: 'dev' jobs: - job: "TestWindows" @@ -78,6 +87,10 @@ jobs: mkdir azurite azurite --silent --location azurite --debug azurite\debug.log & displayName: 'Install and Run Azurite' + - ${{ if eq(parameters.buildAdditionsFromSource, true) }}: + - pwsh: | + ./installAdditionsLocally.ps1 -AdditionsRepoUrl '${{ parameters.additionsRepoUrl }}' -AdditionsBranch '${{ parameters.additionsBranch }}' + displayName: 'Install azure-functions-java-additions from source' - pwsh: | if ("$(isTag)"){ $buildNumber="$(Build.SourceBranchName)" diff --git a/installAdditionsLocally.ps1 b/installAdditionsLocally.ps1 index 9546919..31f90a7 100644 --- a/installAdditionsLocally.ps1 +++ b/installAdditionsLocally.ps1 @@ -1,28 +1,76 @@ -# Variables for first repository -$repoUrl1 = 'https://github.com/Azure/azure-functions-java-additions.git' -$branchName1 = 'dev' -$repoName1 = 'azure-functions-java-additions' - -# Clone the first repository -git clone $repoUrl1 - -# Change directory to the cloned repository -Set-Location $repoName1 - -# Checkout the desired branch -git checkout $branchName1 - -# Detect OS and execute build accordingly -if ($IsWindows) { - # Run the batch script (mvnBuild.bat) - & "..\mvnBuildAdditions.bat" -} else { - # Extract and explicitly invoke the mvn command from mvnBuild.bat - $mvnCommand = Get-Content "../mvnBuildAdditions.bat" | Where-Object { $_ -match '^mvn\s+' } - if ($null -ne $mvnCommand) { - # Execute the extracted mvn command explicitly as a single line - bash -c "$mvnCommand" - } else { - Write-Error "No mvn command found in mvnBuild.bat." +[CmdletBinding()] +param( + [string]$AdditionsRepoUrl = 'https://github.com/Azure/azure-functions-java-additions.git', + [string]$AdditionsBranch = 'dev', + [bool]$SkipTests = $true +) + +$ErrorActionPreference = 'Stop' + +$repoName = 'azure-functions-java-additions' +$workerRoot = $PSScriptRoot +$cloneDir = Join-Path $workerRoot $repoName +$mvnBuildScript = Join-Path $workerRoot 'mvnBuildAdditions.bat' + +# CI bootstrap only needs artifacts installed into the local Maven cache. +# Skipping tests avoids JDK-matrix-specific test compilation failures in additions. +$skipTestArgs = if ($SkipTests) { ' -Dmaven.test.skip=true' } else { '' } + +Write-Host "Installing $repoName from $AdditionsRepoUrl (branch: $AdditionsBranch)" +Write-Host "Clone destination: $cloneDir" + +# Make the clone idempotent for re-runs. +if (Test-Path $cloneDir) { + Write-Host "Removing existing $cloneDir" + Remove-Item -Path $cloneDir -Recurse -Force +} + +Push-Location $workerRoot +try { + git clone --branch $AdditionsBranch --single-branch $AdditionsRepoUrl + if ($LASTEXITCODE -ne 0) { throw "git clone failed for $AdditionsRepoUrl ($AdditionsBranch)" } + + Push-Location $repoName + try { + # spotbugs-maven-plugin:3.1.6 bundles groovy-3.0.0-alpha-3 which crashes at class-load + # time on JDK 17+ (ExceptionInInitializerError in org.codehaus.groovy.vmplugin.v7.Java7). + # This happens before Maven can check -Dspotbugs.skip=true, so the skip flag is useless. + # Work around by building additions with Java 8 when JAVA_HOME_8_X64 is available (always + # set on ADO agents by the JavaToolInstaller pre-step). Falls back to current JAVA_HOME + # on developer machines that don't have that variable set. + $savedJavaHome = $env:JAVA_HOME + $savedPath = $env:PATH + $java8Home = $env:JAVA_HOME_8_X64 + if ($java8Home -and (Test-Path $java8Home)) { + Write-Host "Temporarily using Java 8 (JAVA_HOME_8_X64=$java8Home) for additions install" + Write-Host " (avoids spotbugs-maven-plugin:3.1.6 Groovy incompatibility on JDK 17+)" + $env:JAVA_HOME = $java8Home + $env:PATH = (Join-Path $java8Home 'bin') + [System.IO.Path]::PathSeparator + $env:PATH + } else { + Write-Host "JAVA_HOME_8_X64 not set; using current JAVA_HOME: $env:JAVA_HOME" + } + + try { + # Extract the Maven command from mvnBuildAdditions.bat so we can append extra flags. + $mvnCommand = Get-Content $mvnBuildScript | Where-Object { $_ -match '^mvn\s+' } + if ($null -eq $mvnCommand) { + throw "No mvn command found in $mvnBuildScript" + } + + if ($IsWindows) { + & cmd.exe /c "$mvnCommand$skipTestArgs" + } else { + bash -c "$mvnCommand$skipTestArgs" + } + if ($LASTEXITCODE -ne 0) { throw "additions maven command failed" } + } finally { + # Restore JAVA_HOME/PATH regardless of success or failure. + $env:JAVA_HOME = $savedJavaHome + $env:PATH = $savedPath + } + } finally { + Pop-Location } +} finally { + Pop-Location } diff --git a/pom.xml b/pom.xml index cf3253d..178f9ef 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ UTF-8 1.8 1.8 - 1.3.0 + 1.4.0-SNAPSHOT 1.1.0 1.0.2 2.2.0 diff --git a/src/main/java/com/microsoft/azure/functions/worker/Constants.java b/src/main/java/com/microsoft/azure/functions/worker/Constants.java index 87b2834..2369b35 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/Constants.java +++ b/src/main/java/com/microsoft/azure/functions/worker/Constants.java @@ -18,4 +18,11 @@ private Constants(){} public static final String JAVA_ENABLE_OPENTELEMETRY = "JAVA_ENABLE_OPENTELEMETRY"; public static final String JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY = "JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY"; public static final String JAVA_ENABLE_SDK_TYPES = "JAVA_ENABLE_SDK_TYPES"; + /** + * If set to "true" (case-insensitive), the worker will NOT start the + * embedded HTTP proxy server and will NOT advertise the {@code HttpUri} + * capability. Useful as an escape hatch if the proxy path causes problems. + * Default: unset (proxy enabled). + */ + public static final String FUNCTIONS_JAVA_DISABLE_HTTP_PROXY = "FUNCTIONS_JAVA_DISABLE_HTTP_PROXY"; } diff --git a/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java b/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java index 478102b..fca6bf7 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java +++ b/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java @@ -12,6 +12,9 @@ import com.microsoft.azure.functions.worker.broker.*; import com.microsoft.azure.functions.worker.handler.*; +import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator; +import com.microsoft.azure.functions.worker.http.HttpProxyServer; +import com.microsoft.azure.functions.worker.http.ProxyConfig; import com.microsoft.azure.functions.worker.reflect.*; import com.microsoft.azure.functions.rpc.messages.*; @@ -36,19 +39,28 @@ public JavaWorkerClient(IApplication app) { this.peer = new AtomicReference<>(null); this.handlerSuppliers = new HashMap<>(); this.classPathProvider = new FactoryClassLoader().createClassLoaderProvider(); - + this.httpInvocationCoordinator = new HttpInvocationCoordinator(); + this.httpProxyServer = httpProxyEnabled() ? new HttpProxyServer(ProxyConfig.defaults()) : null; + this.addHandlers(); } + private static boolean httpProxyEnabled() { + String value = System.getenv(Constants.FUNCTIONS_JAVA_DISABLE_HTTP_PROXY); + return !Boolean.parseBoolean(value); + } + @PostConstruct private void addHandlers() { JavaFunctionBroker broker = new JavaFunctionBroker(classPathProvider); - - this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST, () -> new WorkerInitRequestHandler(broker)); + + this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST, + () -> new WorkerInitRequestHandler(broker, this.httpProxyServer, this.httpInvocationCoordinator)); this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_WARMUP_REQUEST, WorkerWarmupHandler::new); this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_ENVIRONMENT_RELOAD_REQUEST, () -> new FunctionEnvironmentReloadRequestHandler(broker)); this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_LOAD_REQUEST, () -> new FunctionLoadRequestHandler(broker)); - this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, () -> new InvocationRequestHandler(broker)); + this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, + () -> new InvocationRequestHandler(broker, this.httpInvocationCoordinator)); this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_STATUS_REQUEST, WorkerStatusRequestHandler::new); this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_TERMINATE, WorkerTerminateRequestHandler::new); } @@ -68,6 +80,15 @@ void logToHost(LogRecord record, String invocationId) { @Override public void close() throws Exception { + // Stop accepting HTTP proxy requests before tearing down the gRPC peer + // so in-flight HTTP handlers can drain on completion futures cleanly. + if (this.httpProxyServer != null) { + try { + this.httpProxyServer.close(); + } catch (Exception ex) { + logger.log(Level.WARNING, "Failed to close HTTP proxy server cleanly", ex); + } + } this.peer.get().close(); this.peer.set(null); this.channel.shutdownNow(); @@ -143,6 +164,8 @@ private synchronized void send(String requestId, MessageHandler marshaller private final AtomicReference peer; private final Map>> handlerSuppliers; private final ClassLoaderProvider classPathProvider; + private final HttpInvocationCoordinator httpInvocationCoordinator; + private final HttpProxyServer httpProxyServer; /** * @param functionsUri Host endpoint URI, or null for legacy startup args that only provide host and port. diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java b/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java index 1ccd91c..300006b 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java @@ -125,6 +125,31 @@ public Optional getDataTargetTypedValue(String name) throws Exception }); } + /** + * Returns the raw, unserialized response body of the HTTP output target, or + * {@code null} if no HTTP output target is registered or its body is null. + * + *

Used by the HTTP proxy path to recover streaming bodies + * ({@link java.io.InputStream}, {@code HttpResponseMessage.IOConsumer}) that + * cannot be represented in a protobuf {@code TypedData} and must instead be + * written directly to the {@code HttpExchange} response stream.

+ */ + public Object getHttpResponseRawBody() { + if (this.promotedTargets == null) { + return null; + } + Map promoted = this.targets.get(this.promotedTargets); + if (promoted == null) { + return null; + } + for (DataTarget target : promoted.values()) { + if (target instanceof RpcHttpDataTarget) { + return ((RpcHttpDataTarget) target).getBody(); + } + } + return null; + } + public Optional getOrAddDataTarget(UUID outputId, String name, Type target, boolean ignoreDefinition) { DataTarget output = null; if (this.isDataTargetValid(name, target)) { diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java index 16b45f1..17bd7e9 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java @@ -1,9 +1,11 @@ package com.microsoft.azure.functions.worker.binding; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; import com.microsoft.azure.functions.HttpStatus; import com.microsoft.azure.functions.HttpStatusType; import com.microsoft.azure.functions.rpc.messages.RpcHttp; @@ -37,14 +39,31 @@ public static TypedData.Builder toRpcHttpData(RpcHttpDataTarget response) throws if (response != null) { RpcHttp.Builder httpBuilder = RpcHttp.newBuilder().setStatusCode(Integer.toString(response.getStatusCode())); response.headers.forEach(httpBuilder::putHeaders); - RpcUnspecifiedDataTarget bodyTarget = new RpcUnspecifiedDataTarget(); - bodyTarget.setValue(response.getBody()); - bodyTarget.computeFromValue().ifPresent(httpBuilder::setBody); + Object body = response.getBody(); + if (isStreamingBody(body)) { + // Streaming bodies (InputStream / IOConsumer) cannot be serialized into a + // protobuf TypedData; they are written directly to the HTTP response by the + // HTTP proxy path. Leave the RpcHttp body unset so downstream code sees an + // empty envelope but can still read status + headers. + } else { + RpcUnspecifiedDataTarget bodyTarget = new RpcUnspecifiedDataTarget(); + bodyTarget.setValue(body); + bodyTarget.computeFromValue().ifPresent(httpBuilder::setBody); + } dataBuilder.setHttp(httpBuilder); } return dataBuilder; } + /** + * Returns {@code true} if {@code body} is a streaming response body type that + * should bypass protobuf serialization and be written directly to the HTTP + * response by the worker's HTTP proxy. + */ + static boolean isStreamingBody(Object body) { + return body instanceof InputStream || body instanceof IOConsumer; + } + private static final DataOperations HTTP_TARGET_OPERATIONS = new DataOperations<>(); static { HTTP_TARGET_OPERATIONS.addTargetOperation(HttpResponseMessage.class, v -> toRpcHttpData((RpcHttpDataTarget) v)); diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpRequestDataSource.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpRequestDataSource.java index 056c9fb..0caebff 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpRequestDataSource.java +++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpRequestDataSource.java @@ -1,5 +1,6 @@ package com.microsoft.azure.functions.worker.binding; +import java.io.InputStream; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.net.URI; @@ -16,14 +17,53 @@ import com.microsoft.azure.functions.HttpStatusType; import com.microsoft.azure.functions.HttpResponseMessage.Builder; import com.microsoft.azure.functions.rpc.messages.RpcHttp; +import com.sun.net.httpserver.HttpExchange; public final class RpcHttpRequestDataSource extends DataSource { + + /** + * Per-thread captured HttpExchange used by the HTTP proxy dispatch path to + * expose the live request body as an {@link InputStream} to functions that + * declare {@code HttpRequestMessage}. Set by + * {@code InvocationRequestHandler.executeProxiedHttp} before the broker + * invocation and cleared in its {@code finally} block. + */ + private static final ThreadLocal CURRENT_EXCHANGE = new ThreadLocal<>(); + + /** + * Installs (or clears, when {@code exchange} is {@code null}) the per-thread + * {@code HttpExchange} that subsequently-constructed instances will use as + * the live request-body source for streaming-input parameters. + */ + public static void setCurrentExchange(HttpExchange exchange) { + if (exchange == null) { + CURRENT_EXCHANGE.remove(); + } else { + CURRENT_EXCHANGE.set(exchange); + } + } + + /** + * Returns the per-thread {@code HttpExchange} currently installed by + * {@link #setCurrentExchange(HttpExchange)}, or {@code null} if none. + * Exposed primarily for integration tests that need to assert the dispatch + * layer correctly installs and clears the side channel around the broker + * invocation. + */ + public static HttpExchange currentExchange() { + return CURRENT_EXCHANGE.get(); + } + public RpcHttpRequestDataSource(String name, RpcHttp value) { super(name, null, HTTP_DATA_OPERATIONS); this.httpPayload = value; this.bodyDataSource = BindingDataStore.rpcSourceFromTypedData(null, this.httpPayload.getBody()); this.fields = Arrays.asList(this.httpPayload.getHeadersMap(), this.httpPayload.getQueryMap(), this.httpPayload.getParamsMap()); + // Snapshot the per-thread exchange (if any) at construction time so it + // is available later when the HTTP_DATA_OPERATIONS lambda resolves the + // body type and decides whether to stream from the live request. + this.capturedExchange = CURRENT_EXCHANGE.get(); this.setValue(this); } @@ -76,12 +116,23 @@ public Builder createResponseBuilder(HttpStatus status) { private final RpcHttp httpPayload; private final DataSource bodyDataSource; private final List> fields; + private final HttpExchange capturedExchange; private static final DataOperations HTTP_DATA_OPERATIONS = new DataOperations<>(); static { HTTP_DATA_OPERATIONS.addGenericOperation(HttpRequestMessage.class, (v, t) -> { Map, Type> typeArgs = TypeUtils.getTypeArguments(t, HttpRequestMessage.class); Type actualType = typeArgs.size() > 0 ? typeArgs.values().iterator().next() : Object.class; + // Streaming-input path: when the user declares + // HttpRequestMessage (or any InputStream subtype) and + // we have a captured HttpExchange (i.e. running under the HTTP + // proxy), hand the live request body to the function instead of + // going through the buffered bodyDataSource. + if (v.capturedExchange != null + && actualType instanceof Class + && InputStream.class.isAssignableFrom((Class) actualType)) { + return new HttpRequestMessageImpl(v, v.capturedExchange.getRequestBody()); + } BindingData bodyData = v.bodyDataSource.computeByType(actualType).orElseThrow(ClassCastException::new); return new HttpRequestMessageImpl(v, bodyData.getValue()); }); diff --git a/src/main/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBroker.java b/src/main/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBroker.java index 3f6037d..ff09dc3 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBroker.java +++ b/src/main/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBroker.java @@ -3,10 +3,14 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.net.URL; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.microsoft.azure.functions.HttpRequestMessage; import com.microsoft.azure.functions.cache.CacheKey; import com.microsoft.azure.functions.internal.spi.middleware.Middleware; import com.microsoft.azure.functions.rpc.messages.*; @@ -197,6 +201,61 @@ public Optional invokeMethod(String id, InvocationRequest request, Li return executionContextDataSource.getDataStore().getDataTargetTypedValue(BindingDataStore.RETURN_NAME); } + /** + * Result returned by {@link #invokeMethodForHttpProxy(String, InvocationRequest, List)}. + * Exposes both the protobuf return value (for the gRPC reply) and the raw + * (unserialized) HTTP response body so the HTTP proxy path can stream + * {@code InputStream} / {@code HttpResponseMessage.IOConsumer} bodies directly + * to the {@code HttpExchange} response stream without first buffering them + * through a protobuf {@code TypedData}. + */ + public static final class HttpInvocationOutcome { + private final Optional returnValue; + private final Object rawHttpResponseBody; + + public HttpInvocationOutcome(Optional returnValue, Object rawHttpResponseBody) { + this.returnValue = returnValue; + this.rawHttpResponseBody = rawHttpResponseBody; + } + + public Optional getReturnValue() { + return returnValue; + } + + /** + * The raw response body object set by the user function (e.g. the + * {@code InputStream} or {@code IOConsumer} passed to + * {@code HttpResponseMessage.Builder.bodyStream(...)}), or {@code null} + * if no HTTP response was produced or the body was already serialized. + */ + public Object getRawHttpResponseBody() { + return rawHttpResponseBody; + } + } + + /** + * Variant of {@link #invokeMethod(String, InvocationRequest, List)} for the + * HTTP proxy dispatch path that, in addition to the protobuf reply, exposes + * the unserialized HTTP response body so streaming bodies can be written + * directly to the HTTP response. + */ + public HttpInvocationOutcome invokeMethodForHttpProxy(String id, InvocationRequest request, List outputs) + throws Exception { + ExecutionContextDataSource executionContextDataSource = buildExecutionContext(id, request); + + if (isJavaSdkTypesEnabled()) { + this.functionFactories.get(id).create().doNext(executionContextDataSource); + } else { + this.invocationChainFactory.create().doNext(executionContextDataSource); + } + + BindingDataStore dataStore = executionContextDataSource.getDataStore(); + Object rawHttpResponseBody = dataStore.getHttpResponseRawBody(); + outputs.addAll(dataStore.getOutputParameterBindings(true)); + Optional returnValue = dataStore.getDataTargetTypedValue(BindingDataStore.RETURN_NAME); + return new HttpInvocationOutcome(returnValue, rawHttpResponseBody); + } + private ExecutionContextDataSource buildExecutionContext(String id, InvocationRequest request) throws NoSuchMethodException { ImmutablePair methodEntry = this.methods.get(id); @@ -231,6 +290,40 @@ public Optional getMethodName(String id) { return Optional.ofNullable(this.methods.get(id)).map(entry -> entry.left); } + /** + * Returns true when the function with the given id declares an + * {@link HttpRequestMessage} parameter whose body type argument is + * {@link InputStream} (or any subtype). The HTTP proxy dispatch path uses + * this to decide whether to skip the buffered body read and instead expose + * the live HTTP request body as an {@code InputStream}. + */ + public boolean methodHasStreamingHttpBody(String id) { + ImmutablePair entry = this.methods.get(id); + if (entry == null) { + return false; + } + MethodBindInfo mbi = entry.right.getCandidate(); + for (ParamBindInfo p : mbi.getParams()) { + Type t = p.getType(); + if (!(t instanceof ParameterizedType)) { + continue; + } + ParameterizedType pt = (ParameterizedType) t; + if (pt.getRawType() != HttpRequestMessage.class) { + continue; + } + Type[] args = pt.getActualTypeArguments(); + if (args.length == 0) { + continue; + } + if (args[0] instanceof Class + && InputStream.class.isAssignableFrom((Class) args[0])) { + return true; + } + } + return false; + } + // TODO the scope should be package private for testability. Modify the package name as same as main package public Map getTriggerMetadataMap(InvocationRequest request) { String name =""; diff --git a/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java b/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java index 434a573..e5a1622 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java +++ b/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java @@ -1,20 +1,34 @@ package com.microsoft.azure.functions.worker.handler; +import java.io.InputStream; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.logging.*; +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; import com.microsoft.azure.functions.worker.*; +import com.microsoft.azure.functions.worker.binding.RpcHttpRequestDataSource; import com.microsoft.azure.functions.worker.broker.*; +import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.HttpInvocationOutcome; +import com.microsoft.azure.functions.worker.http.HttpBodyBridge; +import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator; +import com.microsoft.azure.functions.worker.http.HttpInvocationSlot; import com.microsoft.azure.functions.rpc.messages.*; +import com.sun.net.httpserver.HttpExchange; public class InvocationRequestHandler extends MessageHandler { public InvocationRequestHandler(JavaFunctionBroker broker) { + this(broker, null); + } + + public InvocationRequestHandler(JavaFunctionBroker broker, HttpInvocationCoordinator httpInvocationCoordinator) { super(StreamingMessage::getInvocationRequest, InvocationResponse::newBuilder, InvocationResponse.Builder::setResult, StreamingMessage.Builder::setInvocationResponse); assert broker != null; this.broker = broker; + this.httpInvocationCoordinator = httpInvocationCoordinator; this.invocationLogger = super.getLogger(); } @@ -29,7 +43,15 @@ String execute(InvocationRequest request, InvocationResponse.Builder response) t this.invocationLogger = WorkerLogManager.getInvocationLogger(invocationId); response.setInvocationId(invocationId); - + + // For HTTP-triggered invocations dispatched via the HttpUri capability, the + // gRPC request carries trigger metadata but an empty body. We rendezvous + // with the HTTP arrival via the coordinator, fold the body bytes back into + // the request, and write the response to the held HttpExchange. + if (httpInvocationCoordinator != null && hasHttpInput(request)) { + return executeProxiedHttp(request, response, functionId, invocationId); + } + List outputBindings = new ArrayList<>(); this.broker.invokeMethod(functionId, request, outputBindings).ifPresent(response::setReturnValue); response.addAllOutputData(outputBindings); @@ -38,6 +60,108 @@ String execute(InvocationRequest request, InvocationResponse.Builder response) t this.broker.getMethodName(functionId).orElse("UNKNOWN"), invocationId); } - private JavaFunctionBroker broker; + private String executeProxiedHttp(InvocationRequest request, + InvocationResponse.Builder response, + String functionId, + String invocationId) throws Exception { + HttpInvocationSlot slot = httpInvocationCoordinator.registerGrpcArrival(request); + HttpExchange exchange = null; + boolean streamingInput = false; + try { + try { + exchange = slot.httpArrival().get(); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + throw asException(cause); + } + // When the user function declares HttpRequestMessage, + // skip the eager body read so the live request stream remains + // available, and install the exchange on the per-thread side + // channel that RpcHttpRequestDataSource consults at construction + // time. + streamingInput = broker.methodHasStreamingHttpBody(functionId); + InvocationRequest enriched = streamingInput + ? request + : HttpBodyBridge.enrichRequestWithBody(request, exchange); + List outputBindings = new ArrayList<>(); + HttpInvocationOutcome outcome; + if (streamingInput) { + RpcHttpRequestDataSource.setCurrentExchange(exchange); + } + try { + outcome = this.broker.invokeMethodForHttpProxy(functionId, enriched, outputBindings); + } finally { + if (streamingInput) { + RpcHttpRequestDataSource.setCurrentExchange(null); + } + } + outcome.getReturnValue().ifPresent(response::setReturnValue); + response.addAllOutputData(outputBindings); + RpcHttp httpEnvelope = extractHttpResponse(response, outputBindings); + writeHttpResponse(exchange, httpEnvelope, outcome.getRawHttpResponseBody()); + httpInvocationCoordinator.releaseInvocation(invocationId); + return String.format("Function \"%s\" (Id: %s) invoked by Java Worker (HTTP proxy)", + this.broker.getMethodName(functionId).orElse("UNKNOWN"), invocationId); + } catch (Throwable t) { + httpInvocationCoordinator.failInvocation(invocationId, t); + throw asException(t); + } + } + + /** + * Writes the HTTP response to the {@code HttpExchange}. If {@code rawBody} + * is a streaming body ({@link InputStream} or + * {@link IOConsumer}{@code }), bypasses the buffered protobuf + * body and streams directly. Otherwise falls back to the buffered path. + */ + @SuppressWarnings("unchecked") + private static void writeHttpResponse(HttpExchange exchange, RpcHttp envelope, Object rawBody) throws Exception { + if (rawBody instanceof InputStream) { + HttpBodyBridge.writeStreamingResponse(exchange, envelope, (InputStream) rawBody); + return; + } + if (rawBody instanceof IOConsumer) { + HttpBodyBridge.writeStreamingResponse(exchange, envelope, (IOConsumer) rawBody); + return; + } + HttpBodyBridge.writeRpcHttpResponse(exchange, envelope); + } + + private static boolean hasHttpInput(InvocationRequest request) { + for (ParameterBinding binding : request.getInputDataList()) { + if (binding.getData().hasHttp()) { + return true; + } + } + return false; + } + + private static RpcHttp extractHttpResponse(InvocationResponse.Builder response, + List outputBindings) { + if (response.hasReturnValue() && response.getReturnValue().hasHttp()) { + return response.getReturnValue().getHttp(); + } + for (ParameterBinding binding : outputBindings) { + if (binding.getData().hasHttp()) { + return binding.getData().getHttp(); + } + } + // No HTTP response binding produced; respond with an empty 200 so the + // host doesn't see a hung connection. + return RpcHttp.newBuilder().setStatusCode("200").build(); + } + + private static Exception asException(Throwable t) { + if (t instanceof Exception) { + return (Exception) t; + } + if (t instanceof Error) { + throw (Error) t; + } + return new RuntimeException(t); + } + + private final JavaFunctionBroker broker; + private final HttpInvocationCoordinator httpInvocationCoordinator; private Logger invocationLogger; } diff --git a/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java b/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java index 618833a..6efff72 100644 --- a/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java +++ b/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java @@ -3,7 +3,11 @@ import com.microsoft.azure.functions.worker.*; import com.microsoft.azure.functions.rpc.messages.*; import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker; +import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator; +import com.microsoft.azure.functions.worker.http.HttpProxyHandler; +import com.microsoft.azure.functions.worker.http.HttpProxyServer; +import java.io.IOException; import java.util.logging.Level; import static com.microsoft.azure.functions.worker.Constants.JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY; @@ -11,11 +15,19 @@ public class WorkerInitRequestHandler extends MessageHandler { public WorkerInitRequestHandler(JavaFunctionBroker broker) { + this(broker, null, null); + } + + public WorkerInitRequestHandler(JavaFunctionBroker broker, + HttpProxyServer httpProxyServer, + HttpInvocationCoordinator httpInvocationCoordinator) { super(StreamingMessage::getWorkerInitRequest, WorkerInitResponse::newBuilder, WorkerInitResponse.Builder::setResult, StreamingMessage.Builder::setWorkerInitResponse); this.broker = broker; + this.httpProxyServer = httpProxyServer; + this.httpInvocationCoordinator = httpInvocationCoordinator; } @Override @@ -30,6 +42,8 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) { response.putCapabilities("HandlesWorkerTerminateMessage", "HandlesWorkerTerminateMessage"); response.putCapabilities("HandlesWorkerWarmupMessage", "HandlesWorkerWarmupMessage"); + advertiseHttpProxy(response); + if (Boolean.parseBoolean(System.getenv(JAVA_ENABLE_OPENTELEMETRY)) || Boolean.parseBoolean(System.getenv(JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY))) { response.putCapabilities("WorkerOpenTelemetryEnabled", "true"); @@ -41,6 +55,23 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) { return "Worker initialized"; } + private void advertiseHttpProxy(WorkerInitResponse.Builder response) { + if (httpProxyServer == null || httpInvocationCoordinator == null) { + return; + } + try { + String uri = httpProxyServer.start(new HttpProxyHandler(httpInvocationCoordinator)); + response.putCapabilities("HttpUri", uri); + response.putCapabilities("RequiresRouteParameters", "true"); + WorkerLogManager.getSystemLogger().log(Level.INFO, + "Java worker HTTP proxy listening on " + uri); + } catch (IOException ex) { + // Fall back to gRPC-only path: simply do not advertise HttpUri. + WorkerLogManager.getSystemLogger().log(Level.WARNING, + "Failed to start HTTP proxy server; continuing without HttpUri capability", ex); + } + } + private WorkerMetadata.Builder composeWorkerMetadata(){ WorkerMetadata.Builder workerMetadataBuilder = WorkerMetadata.newBuilder(); workerMetadataBuilder.setRuntimeName("java"); @@ -51,4 +82,6 @@ private WorkerMetadata.Builder composeWorkerMetadata(){ } private final JavaFunctionBroker broker; + private final HttpProxyServer httpProxyServer; + private final HttpInvocationCoordinator httpInvocationCoordinator; } diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java new file mode 100644 index 0000000..af6a726 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java @@ -0,0 +1,278 @@ +package com.microsoft.azure.functions.worker.http; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import com.google.protobuf.ByteString; +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.microsoft.azure.functions.rpc.messages.ParameterBinding; +import com.microsoft.azure.functions.rpc.messages.RpcHttp; +import com.microsoft.azure.functions.rpc.messages.TypedData; +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; + +/** + * Bridges between the JDK {@link HttpExchange} surface and the Functions + * worker's protobuf-based binding plumbing. + * + *

For HTTP-proxied invocations, the Functions host sends the request body + * and headers over HTTP (to the worker's embedded proxy server) and the trigger + * metadata over gRPC (in an {@link InvocationRequest} whose HTTP input has an + * empty body). This class:

+ *
    + *
  • Reads the body off the {@code HttpExchange} and folds it into the + * {@code RpcHttp} payload that downstream binding code expects.
  • + *
  • Writes the {@code RpcHttp} response produced by the user function back + * to the {@code HttpExchange}.
  • + *
+ * + *

Body classification mirrors the host's {@code PopulateBody} logic and the + * existing worker behavior for in-process bodies: {@code application/json} → + * {@code TypedData.json}; {@code text/*} and form-encoded → {@code TypedData.string}; + * everything else (including absent {@code Content-Type}) → {@code TypedData.bytes}.

+ */ +public final class HttpBodyBridge { + private static final String CONTENT_TYPE_HEADER = "Content-Type"; + private static final int READ_CHUNK = 8192; + + private HttpBodyBridge() { + } + + /** + * Returns a copy of {@code request} with the body of its HTTP input replaced + * by the body read from {@code exchange}. If no input parameter holds an + * {@code RpcHttp} payload, {@code request} is returned unchanged. + * + *

The body is read eagerly into memory. Streaming support (InputStream + * as a parameter type) is layered on in a later commit and bypasses this + * eager read.

+ */ + public static InvocationRequest enrichRequestWithBody(InvocationRequest request, HttpExchange exchange) + throws IOException { + InvocationRequest.Builder builder = request.toBuilder(); + List inputs = request.getInputDataList(); + boolean enriched = false; + byte[] body = null; + for (int i = 0; i < inputs.size(); i++) { + ParameterBinding input = inputs.get(i); + if (!input.getData().hasHttp()) { + continue; + } + if (body == null) { + body = readBody(exchange); + } + RpcHttp.Builder httpBuilder = input.getData().getHttp().toBuilder(); + httpBuilder.setBody(buildBodyTypedData(body, contentType(exchange.getRequestHeaders()))); + TypedData.Builder dataBuilder = input.getData().toBuilder().setHttp(httpBuilder); + ParameterBinding patched = input.toBuilder().setData(dataBuilder).build(); + builder.setInputData(i, patched); + enriched = true; + } + return enriched ? builder.build() : request; + } + + /** + * Writes an {@link RpcHttp} response (status + headers + body) to the + * given {@link HttpExchange}. The exchange is left open for the caller + * to close. + */ + public static void writeRpcHttpResponse(HttpExchange exchange, RpcHttp response) throws IOException { + int status = parseStatus(response.getStatusCode()); + for (Map.Entry header : response.getHeadersMap().entrySet()) { + exchange.getResponseHeaders().add(header.getKey(), header.getValue()); + } + byte[] bodyBytes = extractBodyBytes(response.getBody(), response.getHeadersMap()); + if (bodyBytes.length == 0) { + // -1 == no response body; the response body stream does not need to be opened. + exchange.sendResponseHeaders(status, -1); + } else { + exchange.sendResponseHeaders(status, bodyBytes.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(bodyBytes); + } + } + } + + /** + * Streams the body of an HTTP response from an {@link InputStream} directly + * to the {@link HttpExchange} response stream using chunked transfer + * encoding (when the upstream client supports it). The {@code envelope} + * supplies status and headers; its body, if any, is ignored. + * + *

The supplied {@code body} stream is closed by this method regardless + * of outcome. The exchange is left open for the caller to close.

+ */ + public static void writeStreamingResponse(HttpExchange exchange, RpcHttp envelope, InputStream body) + throws IOException { + int status = parseStatus(envelope.getStatusCode()); + applyResponseHeaders(exchange, envelope); + // length=0 ⇒ chunked transfer-encoding (or close-delimited for HTTP/1.0). + exchange.sendResponseHeaders(status, 0); + try (InputStream in = body; OutputStream os = exchange.getResponseBody()) { + byte[] chunk = new byte[READ_CHUNK]; + int n; + while ((n = in.read(chunk)) != -1) { + os.write(chunk, 0, n); + } + } + } + + /** + * Streams the body of an HTTP response by invoking the {@code writer} + * callback with the {@link HttpExchange} response stream. The {@code envelope} + * supplies status and headers; its body, if any, is ignored. + * + *

The response stream is opened (and chunked encoding selected) before + * {@code writer} is invoked; it is flushed and closed when {@code writer} + * returns. Any {@link IOException} thrown by {@code writer} propagates to + * the caller. The exchange is left open for the caller to close.

+ */ + public static void writeStreamingResponse(HttpExchange exchange, RpcHttp envelope, IOConsumer writer) + throws IOException { + int status = parseStatus(envelope.getStatusCode()); + applyResponseHeaders(exchange, envelope); + exchange.sendResponseHeaders(status, 0); + try (OutputStream os = exchange.getResponseBody()) { + writer.accept(os); + os.flush(); + } + } + + private static void applyResponseHeaders(HttpExchange exchange, RpcHttp envelope) { + for (Map.Entry header : envelope.getHeadersMap().entrySet()) { + exchange.getResponseHeaders().add(header.getKey(), header.getValue()); + } + } + + /** + * Writes a plain text error response to the exchange. Used by the proxy + * handler when it cannot wire up an invocation (missing header, lost + * coordinator, etc.). + */ + public static void writeErrorResponse(HttpExchange exchange, int status, String message) throws IOException { + byte[] body = (message == null ? "" : message).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8"); + exchange.sendResponseHeaders(status, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + } + + static TypedData buildBodyTypedData(byte[] bytes, String contentType) { + if (contentType != null) { + String normalized = contentType.toLowerCase(Locale.ROOT); + if (normalized.startsWith("application/json")) { + return TypedData.newBuilder() + .setJson(new String(bytes, charsetFromContentType(contentType))) + .build(); + } + if (normalized.startsWith("text/") + || normalized.startsWith("application/x-www-form-urlencoded") + || normalized.startsWith("application/xml") + || normalized.startsWith("application/javascript")) { + return TypedData.newBuilder() + .setString(new String(bytes, charsetFromContentType(contentType))) + .build(); + } + } + return TypedData.newBuilder() + .setBytes(ByteString.copyFrom(bytes)) + .build(); + } + + private static byte[] readBody(HttpExchange exchange) throws IOException { + try (InputStream in = exchange.getRequestBody()) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] chunk = new byte[READ_CHUNK]; + int n; + while ((n = in.read(chunk)) != -1) { + out.write(chunk, 0, n); + } + return out.toByteArray(); + } + } + + private static String contentType(Headers headers) { + return headers.getFirst(CONTENT_TYPE_HEADER); + } + + private static Charset charsetFromContentType(String contentType) { + if (contentType == null) { + return StandardCharsets.UTF_8; + } + int idx = contentType.toLowerCase(Locale.ROOT).indexOf("charset="); + if (idx < 0) { + return StandardCharsets.UTF_8; + } + String charset = contentType.substring(idx + "charset=".length()).trim(); + int semi = charset.indexOf(';'); + if (semi >= 0) { + charset = charset.substring(0, semi).trim(); + } + // strip enclosing quotes + if (charset.length() >= 2 + && charset.charAt(0) == '"' + && charset.charAt(charset.length() - 1) == '"') { + charset = charset.substring(1, charset.length() - 1); + } + try { + return Charset.forName(charset); + } catch (RuntimeException ex) { + return StandardCharsets.UTF_8; + } + } + + private static int parseStatus(String statusCode) { + if (statusCode == null || statusCode.isEmpty()) { + return 200; + } + try { + int status = Integer.parseInt(statusCode); + if (status < 100 || status > 599) { + return 500; + } + return status; + } catch (NumberFormatException ex) { + return 500; + } + } + + private static byte[] extractBodyBytes(TypedData body, Map headers) { + if (body == null) { + return new byte[0]; + } + switch (body.getDataCase()) { + case BYTES: + return body.getBytes().toByteArray(); + case STRING: + return body.getString().getBytes(charsetFromContentType(headerLookup(headers, "Content-Type"))); + case JSON: + return body.getJson().getBytes(StandardCharsets.UTF_8); + case DATA_NOT_SET: + return new byte[0]; + default: + // Unsupported body shapes are coerced to their string form so we never drop the response. + return body.toString().getBytes(StandardCharsets.UTF_8); + } + } + + private static String headerLookup(Map headers, String key) { + if (headers == null) { + return null; + } + for (Map.Entry entry : headers.entrySet()) { + if (key.equalsIgnoreCase(entry.getKey())) { + return entry.getValue(); + } + } + return null; + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java new file mode 100644 index 0000000..2fc312f --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java @@ -0,0 +1,106 @@ +package com.microsoft.azure.functions.worker.http; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.sun.net.httpserver.HttpExchange; + +/** + * Synchronizes the two halves of an HTTP-proxied invocation by invocation id. + * + *

The Functions host dispatches each HTTP-triggered invocation as two + * messages that arrive on independent channels:

+ *
    + *
  1. An HTTP request forwarded to the worker's embedded proxy server + * (delivered to a worker thread inside the JDK HttpServer pool).
  2. + *
  3. A gRPC {@code InvocationRequest} carrying trigger metadata and + * the matching {@code invocationId}.
  4. + *
+ * + *

This coordinator owns the per-invocation slot, exposes rendezvous methods + * that block until the other half arrives, and releases the slot when the + * invocation completes. Slot lookup and creation are atomic so the two halves + * can race without losing one another.

+ * + *

The coordinator does not impose timeouts: the host owns end-to-end + * timeout enforcement, and per-invocation hangs are observable via the worker's + * existing health and watchdog telemetry.

+ */ +public final class HttpInvocationCoordinator { + private final ConcurrentMap slots = new ConcurrentHashMap<>(); + + /** + * Registers the arrival of an HTTP request for the given invocation. + * Returns the slot so the HTTP handler can await + * {@link HttpInvocationSlot#completion()}. + * + * @throws IllegalStateException if HTTP arrival was already registered for this id + */ + public HttpInvocationSlot registerHttpArrival(String invocationId, HttpExchange exchange) { + Objects.requireNonNull(invocationId, "invocationId"); + Objects.requireNonNull(exchange, "exchange"); + HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new); + if (!slot.httpArrival().complete(exchange)) { + throw new IllegalStateException( + "HTTP arrival already registered for invocation " + invocationId); + } + return slot; + } + + /** + * Registers the arrival of a gRPC InvocationRequest for the given invocation. + * Returns the slot so the gRPC dispatcher can await + * {@link HttpInvocationSlot#httpArrival()}. + * + * @throws IllegalStateException if gRPC arrival was already registered for this id + */ + public HttpInvocationSlot registerGrpcArrival(InvocationRequest request) { + Objects.requireNonNull(request, "request"); + String invocationId = request.getInvocationId(); + HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new); + if (!slot.grpcArrival().complete(request)) { + throw new IllegalStateException( + "gRPC arrival already registered for invocation " + invocationId); + } + return slot; + } + + /** + * Marks the invocation as complete and removes its slot. Idempotent. Any + * outstanding rendezvous futures are cancelled to unblock callers. + */ + public void releaseInvocation(String invocationId) { + Objects.requireNonNull(invocationId, "invocationId"); + HttpInvocationSlot slot = slots.remove(invocationId); + if (slot == null) { + return; + } + slot.httpArrival().cancel(false); + slot.grpcArrival().cancel(false); + slot.completion().complete(null); + } + + /** + * Fails the invocation slot with the given throwable. Used when the worker + * decides to abort an in-flight invocation (e.g., HTTP handler exception + * before the user function runs). The slot is removed after failure. + */ + public void failInvocation(String invocationId, Throwable cause) { + Objects.requireNonNull(invocationId, "invocationId"); + Objects.requireNonNull(cause, "cause"); + HttpInvocationSlot slot = slots.remove(invocationId); + if (slot == null) { + return; + } + slot.httpArrival().completeExceptionally(cause); + slot.grpcArrival().completeExceptionally(cause); + slot.completion().completeExceptionally(cause); + } + + /** Visible for tests. */ + public int activeInvocationCount() { + return slots.size(); + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java new file mode 100644 index 0000000..6f4c77c --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java @@ -0,0 +1,66 @@ +package com.microsoft.azure.functions.worker.http; + +import java.util.concurrent.CompletableFuture; + +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.sun.net.httpserver.HttpExchange; + +/** + * Holds the rendezvous state for a single in-flight HTTP-proxied invocation. + * + *

The Functions host delivers an invocation along two independent paths:

+ *
    + *
  • An HTTP request forwarded to the worker's proxy server, carrying the + * request body and headers.
  • + *
  • A gRPC {@code InvocationRequest} carrying trigger metadata, route + * parameters, and the {@code invocationId} used to correlate the two.
  • + *
+ * + *

Either side may arrive first. The slot exposes futures that the HTTP + * handler thread and the gRPC dispatcher thread wait on. The {@link #completion} + * future is signaled once the invocation has fully responded, allowing the HTTP + * handler to return from {@code handle()} so the server can close the exchange.

+ * + *

The class is mutable from the coordinator's perspective only; consumers + * see immutable {@link CompletableFuture} handles and use them to await + * rendezvous and completion.

+ */ +public final class HttpInvocationSlot { + private final String invocationId; + private final CompletableFuture httpArrival = new CompletableFuture<>(); + private final CompletableFuture grpcArrival = new CompletableFuture<>(); + private final CompletableFuture completion = new CompletableFuture<>(); + + HttpInvocationSlot(String invocationId) { + this.invocationId = invocationId; + } + + public String getInvocationId() { + return invocationId; + } + + /** + * Future that resolves when the HTTP request for this invocation arrives. + * Consumed by the gRPC dispatcher thread. + */ + public CompletableFuture httpArrival() { + return httpArrival; + } + + /** + * Future that resolves when the gRPC {@code InvocationRequest} for this + * invocation arrives. Consumed by the HTTP handler thread. + */ + public CompletableFuture grpcArrival() { + return grpcArrival; + } + + /** + * Future that resolves when the invocation has fully completed (response + * written to HTTP, output bindings collected for the gRPC response). + * The HTTP handler thread waits on this before returning from {@code handle()}. + */ + public CompletableFuture completion() { + return completion; + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java new file mode 100644 index 0000000..2b74841 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java @@ -0,0 +1,101 @@ +package com.microsoft.azure.functions.worker.http; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.microsoft.azure.functions.worker.WorkerLogManager; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +/** + * Handler attached to the worker's embedded HTTP proxy server. + * + *

Receives HTTP requests forwarded by the Functions host (via the HttpUri + * capability) and parks them on the {@link HttpInvocationCoordinator} until + * the gRPC dispatcher picks them up. The actual invocation runs on the gRPC + * dispatch thread, which reads the request body and writes the response back + * to the same {@link HttpExchange}. This handler simply:

+ *
    + *
  1. Extracts {@code x-ms-invocation-id} from the request headers.
  2. + *
  3. Registers the HTTP arrival with the coordinator.
  4. + *
  5. Blocks on the slot's {@code completion} future so the exchange stays + * open until the gRPC side finishes writing the response.
  6. + *
  7. Returns from {@code handle()}, letting the JDK HttpServer close the + * exchange.
  8. + *
+ * + *

Missing header or unexpected failures are converted into appropriate HTTP + * error responses so the host always gets a closed connection.

+ */ +public final class HttpProxyHandler implements HttpHandler { + /** Header set by {@code DefaultHttpProxyService} on the host side. */ + public static final String INVOCATION_ID_HEADER = "x-ms-invocation-id"; + + private static final Logger LOGGER = WorkerLogManager.getSystemLogger(); + + private final HttpInvocationCoordinator coordinator; + + public HttpProxyHandler(HttpInvocationCoordinator coordinator) { + this.coordinator = Objects.requireNonNull(coordinator, "coordinator"); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + String invocationId = exchange.getRequestHeaders().getFirst(INVOCATION_ID_HEADER); + if (invocationId == null || invocationId.isEmpty()) { + LOGGER.warning("HTTP proxy request missing " + INVOCATION_ID_HEADER + " header"); + try { + HttpBodyBridge.writeErrorResponse(exchange, 400, + "Missing required header: " + INVOCATION_ID_HEADER); + } finally { + exchange.close(); + } + return; + } + + HttpInvocationSlot slot; + try { + slot = coordinator.registerHttpArrival(invocationId, exchange); + } catch (IllegalStateException ex) { + LOGGER.log(Level.WARNING, "Duplicate HTTP arrival for invocation " + invocationId, ex); + try { + HttpBodyBridge.writeErrorResponse(exchange, 409, + "Duplicate HTTP arrival for invocation " + invocationId); + } finally { + exchange.close(); + } + return; + } + + try { + // Block until the gRPC dispatcher signals invocation completion. + // The dispatcher is responsible for writing the response to this + // exchange; we simply hold the connection open in the meantime. + slot.completion().get(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + tryWriteError(exchange, 503, "Worker interrupted while waiting for invocation"); + } catch (CancellationException ex) { + // Coordinator cancelled the futures via releaseInvocation(); + // the gRPC side has already written (or chosen not to write) the response. + } catch (ExecutionException ex) { + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + LOGGER.log(Level.WARNING, "Invocation " + invocationId + " failed before responding", cause); + tryWriteError(exchange, 500, "Invocation failed: " + cause.getMessage()); + } finally { + exchange.close(); + } + } + + private static void tryWriteError(HttpExchange exchange, int status, String message) { + try { + HttpBodyBridge.writeErrorResponse(exchange, status, message); + } catch (IOException ioe) { + LOGGER.log(Level.FINE, "Unable to write error response (response likely already started)", ioe); + } + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java new file mode 100644 index 0000000..c1971bb --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java @@ -0,0 +1,126 @@ +package com.microsoft.azure.functions.worker.http; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; + +import com.microsoft.azure.functions.worker.WorkerLogManager; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +/** + * Embedded HTTP proxy server used to receive HTTP-triggered invocations + * directly from the Functions host (HttpUri capability). + * + *

Backed by {@link com.sun.net.httpserver.HttpServer}, a JDK built-in + * since Java 6, so the worker takes on no new runtime dependencies.

+ * + *

The server binds to the loopback address on an ephemeral port and is + * started by {@link #start(HttpHandler)} with a single root handler. + * Worker threads come from a cached executor that mirrors the gRPC dispatch + * pool: unbounded growth, named for diagnostics, 15 s drain on shutdown. + * Capping concurrency is left to the platform, matching the Go, Python, and + * .NET isolated workers.

+ */ +public final class HttpProxyServer implements AutoCloseable { + private static final long EXECUTOR_SHUTDOWN_SECONDS = 15L; + private static final long SERVER_STOP_SECONDS = 5L; + + private final ProxyConfig config; + private final AtomicBoolean started = new AtomicBoolean(false); + + private HttpServer server; + private ExecutorService executor; + private String boundUri; + + public HttpProxyServer(ProxyConfig config) { + this.config = Objects.requireNonNull(config, "config"); + } + + /** + * Binds the server, attaches {@code rootHandler} to {@code "/"}, and starts + * serving requests. Returns the absolute {@code http://host:port} URI that + * should be advertised to the Functions host via the {@code HttpUri} + * capability. + * + * @throws IllegalStateException if start has already been called + * @throws IOException if the server cannot bind + */ + public synchronized String start(HttpHandler rootHandler) throws IOException { + Objects.requireNonNull(rootHandler, "rootHandler"); + if (!started.compareAndSet(false, true)) { + throw new IllegalStateException("HttpProxyServer already started"); + } + InetSocketAddress bindAddress = new InetSocketAddress( + config.getBindAddress(), config.getBindPort()); + // Backlog 0 → JDK default. + this.server = HttpServer.create(bindAddress, 0); + this.executor = Executors.newCachedThreadPool(new ProxyThreadFactory()); + this.server.setExecutor(this.executor); + this.server.createContext("/", rootHandler); + this.server.start(); + InetSocketAddress actual = this.server.getAddress(); + this.boundUri = "http://" + actual.getHostString() + ":" + actual.getPort(); + WorkerLogManager.getSystemLogger().log(Level.INFO, + "HTTP proxy server bound to " + boundUri); + return boundUri; + } + + /** + * Returns the URI the server is listening on, or {@code null} if the + * server has not been started. + */ + public String getBoundUri() { + return boundUri; + } + + @Override + public synchronized void close() { + if (!started.compareAndSet(true, false)) { + return; + } + if (server != null) { + try { + // Allow in-flight requests up to SERVER_STOP_SECONDS to drain. + server.stop((int) SERVER_STOP_SECONDS); + } catch (RuntimeException ex) { + WorkerLogManager.getSystemLogger().log(Level.WARNING, + "Error stopping HTTP proxy server", ex); + } + server = null; + } + if (executor != null) { + executor.shutdown(); + try { + if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_SECONDS, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ex) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + executor = null; + } + boundUri = null; + } + + /** + * Thread factory that names worker threads for diagnostics. Daemon threads + * so they do not block JVM shutdown if the server is not explicitly closed. + */ + private static final class ProxyThreadFactory implements java.util.concurrent.ThreadFactory { + private final java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "functions-http-proxy-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java new file mode 100644 index 0000000..78f31d6 --- /dev/null +++ b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java @@ -0,0 +1,43 @@ +package com.microsoft.azure.functions.worker.http; + +import java.util.Objects; + +/** + * Configuration for the embedded HTTP proxy server used to receive HTTP-triggered + * invocations directly from the Functions host (HttpUri capability). + * + *

The configuration deliberately does not impose request body size limits or + * per-request timeouts. The Functions front-end (nginx) enforces an upstream + * ceiling, and per-worker overload is managed by the platform — matching the + * behavior of the Go, Python, and .NET isolated workers.

+ */ +public final class ProxyConfig { + /** Loopback bind address. Other workers also bind to 127.0.0.1 only. */ + public static final String DEFAULT_BIND_ADDRESS = "127.0.0.1"; + + /** Ephemeral port. The OS picks an unused port at bind time. */ + public static final int DEFAULT_BIND_PORT = 0; + + private final String bindAddress; + private final int bindPort; + + public ProxyConfig(String bindAddress, int bindPort) { + this.bindAddress = Objects.requireNonNull(bindAddress, "bindAddress"); + if (bindPort < 0 || bindPort > 65535) { + throw new IllegalArgumentException("bindPort out of range: " + bindPort); + } + this.bindPort = bindPort; + } + + public static ProxyConfig defaults() { + return new ProxyConfig(DEFAULT_BIND_ADDRESS, DEFAULT_BIND_PORT); + } + + public String getBindAddress() { + return bindAddress; + } + + public int getBindPort() { + return bindPort; + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java new file mode 100644 index 0000000..305f9da --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java @@ -0,0 +1,96 @@ +package com.microsoft.azure.functions.worker.binding; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.UUID; + +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpResponseMessage.Builder; +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; + +import org.junit.jupiter.api.Test; + +public class BindingDataStoreTest { + + @Test + public void getHttpResponseRawBodyReturnsNullWhenNoTargetsPromoted() { + BindingDataStore store = new BindingDataStore(); + assertNull(store.getHttpResponseRawBody()); + } + + @Test + public void getHttpResponseRawBodyReturnsNullWhenPromotedUuidHasNoTargets() { + BindingDataStore store = new BindingDataStore(); + // Promote a UUID that was never populated. + store.promoteDataTargets(UUID.randomUUID()); + assertNull(store.getHttpResponseRawBody()); + } + + @Test + public void getHttpResponseRawBodyReturnsNullWhenHttpTargetHasNoBody() { + BindingDataStore store = registerHttpReturnTarget(builder -> { /* no body */ }); + assertNull(store.getHttpResponseRawBody()); + } + + @Test + public void getHttpResponseRawBodyReturnsInputStreamBody() { + InputStream stream = new ByteArrayInputStream(new byte[]{1, 2, 3}); + BindingDataStore store = registerHttpReturnTarget(builder -> builder.body(stream)); + Object raw = store.getHttpResponseRawBody(); + assertSame(stream, raw); + assertTrue(raw instanceof InputStream); + } + + @Test + public void getHttpResponseRawBodyReturnsIOConsumerBody() { + IOConsumer writer = out -> out.write(0); + BindingDataStore store = registerHttpReturnTarget(builder -> builder.body(writer)); + Object raw = store.getHttpResponseRawBody(); + assertSame(writer, raw); + assertTrue(raw instanceof IOConsumer); + } + + @Test + public void getHttpResponseRawBodyReturnsStringBody() { + BindingDataStore store = registerHttpReturnTarget(builder -> builder.body("hello")); + assertEquals("hello", store.getHttpResponseRawBody()); + } + + /** + * Helper that sets up a store with a single promoted HTTP output target on + * the {@code $return} binding, then invokes {@code configure} on the + * underlying {@link HttpResponseMessage.Builder}. + */ + @FunctionalInterface + private interface BuilderConfigurator { + void apply(Builder builder); + } + + private static BindingDataStore registerHttpReturnTarget(BuilderConfigurator configure) { + BindingDataStore store = new BindingDataStore(); + // getOrAddDataTarget internally consults `definitions` via isDefinitionOutput + // before short-circuiting on ignoreDefinition; install an empty map so that + // lookup returns Optional.empty() instead of throwing NPE. + store.setBindingDefinitions(new HashMap<>()); + UUID outputId = UUID.randomUUID(); + // ignoreDefinition=true bypasses the binding-definition check, which is + // adequate for unit-testing the data-store accessor in isolation. + BindingData data = store.getOrAddDataTarget( + outputId, BindingDataStore.RETURN_NAME, HttpResponseMessage.class, true).orElseThrow( + () -> new AssertionError("Expected getOrAddDataTarget to create an HTTP target")); + // RpcHttpDataTarget sets its own DataTarget value to `this` in its + // constructor, so the BindingData value is the Builder itself. + Object value = data.getValue(); + assertTrue(value instanceof Builder, + "Expected RpcHttpDataTarget value to be an HttpResponseMessage.Builder, got " + value); + configure.apply((Builder) value); + store.promoteDataTargets(outputId); + return store; + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java new file mode 100644 index 0000000..b58b95f --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java @@ -0,0 +1,91 @@ +package com.microsoft.azure.functions.worker.binding; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; +import com.microsoft.azure.functions.HttpStatus; +import com.microsoft.azure.functions.rpc.messages.RpcHttp; +import com.microsoft.azure.functions.rpc.messages.TypedData; + +import org.junit.jupiter.api.Test; + +public class RpcHttpDataTargetTest { + + @Test + public void isStreamingBodyDetectsInputStream() { + InputStream stream = new ByteArrayInputStream(new byte[]{1, 2, 3}); + assertTrue(RpcHttpDataTarget.isStreamingBody(stream)); + } + + @Test + public void isStreamingBodyDetectsIOConsumer() { + IOConsumer writer = out -> out.write(1); + assertTrue(RpcHttpDataTarget.isStreamingBody(writer)); + } + + @Test + public void isStreamingBodyRejectsNull() { + assertFalse(RpcHttpDataTarget.isStreamingBody(null)); + } + + @Test + public void isStreamingBodyRejectsString() { + assertFalse(RpcHttpDataTarget.isStreamingBody("hello")); + } + + @Test + public void isStreamingBodyRejectsByteArray() { + assertFalse(RpcHttpDataTarget.isStreamingBody(new byte[]{1, 2, 3})); + } + + @Test + public void toRpcHttpDataSkipsBodyForInputStream() throws Exception { + RpcHttpDataTarget target = new RpcHttpDataTarget(); + target.status(HttpStatus.OK) + .header("Content-Type", "text/event-stream") + .body(new ByteArrayInputStream("ignored".getBytes(StandardCharsets.UTF_8))); + + TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target); + RpcHttp http = builder.getHttp(); + + assertEquals("200", http.getStatusCode()); + assertEquals("text/event-stream", http.getHeadersOrDefault("Content-Type", null)); + // Streaming bodies are NOT serialized into the protobuf envelope; the field is unset. + assertFalse(http.hasBody(), "InputStream body should not be serialized into RpcHttp.body"); + } + + @Test + public void toRpcHttpDataSkipsBodyForIOConsumer() throws Exception { + RpcHttpDataTarget target = new RpcHttpDataTarget(); + target.status(HttpStatus.ACCEPTED) + .header("X-Trace", "abc") + .body((IOConsumer) (out -> out.write(0))); + + TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target); + RpcHttp http = builder.getHttp(); + + assertEquals("202", http.getStatusCode()); + assertEquals("abc", http.getHeadersOrDefault("X-Trace", null)); + assertFalse(http.hasBody(), "IOConsumer body should not be serialized into RpcHttp.body"); + } + + @Test + public void toRpcHttpDataSerializesStringBody() throws Exception { + RpcHttpDataTarget target = new RpcHttpDataTarget(); + target.status(HttpStatus.OK) + .body("hello"); + + TypedData.Builder builder = RpcHttpDataTarget.toRpcHttpData(target); + RpcHttp http = builder.getHttp(); + + assertEquals("200", http.getStatusCode()); + assertTrue(http.hasBody(), "Non-streaming body should be serialized"); + assertEquals("hello", http.getBody().getString()); + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java index 813f4fa..1c655ce 100644 --- a/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java +++ b/src/test/java/com/microsoft/azure/functions/worker/binding/tests/RpcHttpRequestDataSourceTest.java @@ -1,6 +1,8 @@ package com.microsoft.azure.functions.worker.binding.tests; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.lang.invoke.WrongMethodTypeException; import java.lang.reflect.Method; import java.lang.reflect.Parameter; @@ -10,9 +12,14 @@ import com.microsoft.azure.functions.rpc.messages.RpcHttp; import com.microsoft.azure.functions.rpc.messages.TypedData; import com.microsoft.azure.functions.worker.binding.*; +import com.sun.net.httpserver.HttpExchange; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RpcHttpRequestDataSourceTest { @@ -26,6 +33,9 @@ public void HttpRequestIntBody(HttpRequestMessage request) { public void HttpRequestBinaryBody(HttpRequestMessage request) { } + public void HttpRequestStreamBody(HttpRequestMessage request) { + } + public static RpcHttp getTestRpcHttp(Object inputBody) throws Exception { TypedData.Builder dataBuilder = TypedData.newBuilder(); RpcHttp.Builder httpBuilder = RpcHttp.newBuilder() @@ -108,4 +118,76 @@ private Method getFunctionMethod(String methodName) { return functionMethod; } + @Test + public void rpcHttpDataSource_To_HttpRequestMessage_StreamBody_returnsLiveExchangeStream() throws Exception { + Method method = getFunctionMethod("HttpRequestStreamBody"); + Parameter[] parameters = method.getParameters(); + String sourceKey = "testRpcHttp"; + + byte[] payload = "live-stream-body".getBytes(); + InputStream liveStream = new ByteArrayInputStream(payload); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getRequestBody()).thenReturn(liveStream); + + RpcHttp input = getTestRpcHttp(new byte[0]); + RpcHttpRequestDataSource.setCurrentExchange(exchange); + try { + RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource(sourceKey, input); + Optional bindingData = rpcHttp.computeByName(sourceKey, + parameters[0].getParameterizedType()); + BindingData arg = bindingData.orElseThrow(WrongMethodTypeException::new); + HttpRequestMessage requestMsg = (HttpRequestMessage) arg.getValue(); + assertNotNull(requestMsg.getBody()); + assertSame(liveStream, requestMsg.getBody(), + "Streaming-input path must hand back the exchange's live request body unmodified"); + } finally { + RpcHttpRequestDataSource.setCurrentExchange(null); + } + } + + @Test + public void rpcHttpDataSource_To_HttpRequestMessage_StreamBody_withNoCapturedExchange_fallsThrough() + throws Exception { + Method method = getFunctionMethod("HttpRequestStreamBody"); + Parameter[] parameters = method.getParameters(); + String sourceKey = "testRpcHttp"; + + // No exchange installed on the ThreadLocal: the streaming-input branch + // must not trigger and we should fall through to the existing + // bodyDataSource path. With a byte[] body and InputStream target type, no + // converter is registered for InputStream, so the existing pipeline raises + // a ClassCastException -- the same behavior as before this feature was + // added. The point of this test is to confirm we have NOT silently + // swallowed the failure. + RpcHttp input = getTestRpcHttp("ignored-body".getBytes()); + RpcHttpRequestDataSource.setCurrentExchange(null); + RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource(sourceKey, input); + org.junit.jupiter.api.Assertions.assertThrows( + ClassCastException.class, + () -> rpcHttp.computeByName(sourceKey, parameters[0].getParameterizedType()), + "Without a captured exchange, InputStream resolution must fall through and fail as it did before the feature"); + } + + @Test + public void setCurrentExchange_null_clearsThreadLocal() throws Exception { + HttpExchange exchange = mock(HttpExchange.class); + RpcHttpRequestDataSource.setCurrentExchange(exchange); + // Cleared before construction: the data source must NOT capture the prior + // exchange. + RpcHttpRequestDataSource.setCurrentExchange(null); + + Method method = getFunctionMethod("HttpRequestStreamBody"); + Parameter[] parameters = method.getParameters(); + RpcHttp input = getTestRpcHttp(new byte[0]); + RpcHttpRequestDataSource rpcHttp = new RpcHttpRequestDataSource("k", input); + // Same expectation as the no-exchange-ever case: streaming branch doesn't + // fire, fallthrough fails. If we had leaked the previously-installed + // exchange, mockito would have returned a real (empty default) stream and + // the call would succeed instead of throwing. + org.junit.jupiter.api.Assertions.assertThrows( + ClassCastException.class, + () -> rpcHttp.computeByName("k", parameters[0].getParameterizedType()), + "After clearing the ThreadLocal, the prior exchange must not leak into newly-constructed data sources"); + } + } diff --git a/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java b/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java new file mode 100644 index 0000000..090e2d8 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/broker/JavaFunctionBrokerStreamingTest.java @@ -0,0 +1,119 @@ +package com.microsoft.azure.functions.worker.broker; + +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.worker.reflect.DefaultClassLoaderProvider; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.jupiter.api.Test; + +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link JavaFunctionBroker#methodHasStreamingHttpBody(String)}. + * The HTTP proxy dispatch path uses this signature pre-check to decide whether + * to skip the buffered request body read and expose the live HTTP exchange + * input stream to user functions that declare + * {@code HttpRequestMessage}. + */ +public class JavaFunctionBrokerStreamingTest { + + // Test signatures spanning the supported and unsupported shapes. + public void streamingFn(HttpRequestMessage req) {} + public void streamingFnSubtype(HttpRequestMessage req) {} + public void stringFn(HttpRequestMessage req) {} + public void rawFn(@SuppressWarnings("rawtypes") HttpRequestMessage req) {} + public void noHttpFn(String s, int i) {} + + @Test + public void methodHasStreamingHttpBody_inputStreamParam_returnsTrue() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + registerMethod(broker, "id-stream", "streamingFn"); + assertTrue(broker.methodHasStreamingHttpBody("id-stream")); + } + + @Test + public void methodHasStreamingHttpBody_inputStreamSubtypeParam_returnsTrue() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + registerMethod(broker, "id-substream", "streamingFnSubtype"); + assertTrue(broker.methodHasStreamingHttpBody("id-substream")); + } + + @Test + public void methodHasStreamingHttpBody_stringParam_returnsFalse() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + registerMethod(broker, "id-string", "stringFn"); + assertFalse(broker.methodHasStreamingHttpBody("id-string")); + } + + @Test + public void methodHasStreamingHttpBody_rawHttpRequestMessage_returnsFalse() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + registerMethod(broker, "id-raw", "rawFn"); + assertFalse(broker.methodHasStreamingHttpBody("id-raw")); + } + + @Test + public void methodHasStreamingHttpBody_noHttpParam_returnsFalse() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + registerMethod(broker, "id-nohttp", "noHttpFn"); + assertFalse(broker.methodHasStreamingHttpBody("id-nohttp")); + } + + @Test + public void methodHasStreamingHttpBody_unknownId_returnsFalse() throws Exception { + JavaFunctionBroker broker = new JavaFunctionBroker(new DefaultClassLoaderProvider()); + assertFalse(broker.methodHasStreamingHttpBody("never-registered")); + } + + /** + * Resolves the named test method on this class, builds a real + * {@link MethodBindInfo} for it, wraps a mocked {@link FunctionDefinition} + * around it, and reflectively inserts the entry into the broker's private + * {@code methods} map so the public {@code methodHasStreamingHttpBody} + * lookup can find it without going through the full descriptor / + * classloader pipeline. + */ + private void registerMethod(JavaFunctionBroker broker, String id, String methodName) throws Exception { + Method method = null; + for (Method m : JavaFunctionBrokerStreamingTest.class.getMethods()) { + if (m.getName().equals(methodName)) { + method = m; + break; + } + } + if (method == null) { + throw new IllegalArgumentException("Test method not found: " + methodName); + } + MethodBindInfo mbi = new MethodBindInfo(method); + + FunctionDefinition functionDefinition = mock(FunctionDefinition.class); + when(functionDefinition.getCandidate()).thenReturn(mbi); + + @SuppressWarnings("unchecked") + Map> methods = + (Map>) getField(broker, "methods"); + methods.put(id, ImmutablePair.of(methodName, functionDefinition)); + + // Touch a live stream so the unused-import / classloading paths are + // exercised; protects the assertion that the broker logic depends only + // on the param type, not on any runtime body. + try (InputStream ignored = new ByteArrayInputStream(new byte[0])) { + // no-op + } + } + + private static Object getField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.get(target); + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java b/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java new file mode 100644 index 0000000..aec2368 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/handler/HttpProxyEndToEndTest.java @@ -0,0 +1,465 @@ +package com.microsoft.azure.functions.worker.handler; + +import com.google.protobuf.ByteString; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer; +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.microsoft.azure.functions.rpc.messages.InvocationResponse; +import com.microsoft.azure.functions.rpc.messages.ParameterBinding; +import com.microsoft.azure.functions.rpc.messages.RpcHttp; +import com.microsoft.azure.functions.rpc.messages.StreamingMessage; +import com.microsoft.azure.functions.rpc.messages.TypedData; +import com.microsoft.azure.functions.worker.WorkerLogManager; +import com.microsoft.azure.functions.worker.binding.RpcHttpRequestDataSource; +import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker; +import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.HttpInvocationOutcome; +import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator; +import com.microsoft.azure.functions.worker.http.HttpProxyHandler; +import com.microsoft.azure.functions.worker.http.HttpProxyServer; +import com.microsoft.azure.functions.worker.http.ProxyConfig; +import com.sun.net.httpserver.HttpExchange; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * End-to-end integration tests for the HTTP proxy dispatch path. These wire + * the real {@link HttpProxyServer}, {@link HttpProxyHandler}, and + * {@link HttpInvocationCoordinator} together with a real + * {@link InvocationRequestHandler} backed by a mocked {@link JavaFunctionBroker}. + * + *

Each test sends an actual HTTP request through the embedded proxy on one + * thread while simulating the gRPC {@code InvocationRequest} arrival on the + * test thread, exercising the rendezvous and validating the full pipeline: + * HTTP arrival, body propagation (buffered or streamed), broker invocation, + * and response writeback (buffered or streamed).

+ * + *

The mocked broker captures the {@link InvocationRequest} it sees and the + * thread-local exchange that was active during {@code invokeMethodForHttpProxy} + * so the tests can assert that the streaming pre-check and the body-read skip + * behaved correctly.

+ */ +public class HttpProxyEndToEndTest { + + private static final String INVOCATION_ID = "e2e-invocation-1"; + private static final String FUNCTION_ID = "e2e-function-1"; + private static final long HTTP_AWAIT_MS = 5_000; + + private HttpInvocationCoordinator coordinator; + private HttpProxyServer server; + private String proxyUri; + private JavaFunctionBroker brokerMock; + private InvocationRequestHandler handler; + private MockedStatic workerLogManagerMock; + + @BeforeEach + public void setUp() throws Exception { + // WorkerLogManager.getInvocationLogger calls addHandlers which asserts + // the singleton has been initialized with a JavaWorkerClient (only true + // when the worker is running under the host). Mock the static façade + // and forward everything else to the real implementation so system/host + // loggers continue to work for diagnostic output during tests. + workerLogManagerMock = Mockito.mockStatic(WorkerLogManager.class, Answers.CALLS_REAL_METHODS); + workerLogManagerMock.when(() -> WorkerLogManager.getInvocationLogger(anyString())) + .thenReturn(Logger.getAnonymousLogger()); + + coordinator = new HttpInvocationCoordinator(); + server = new HttpProxyServer(ProxyConfig.defaults()); + proxyUri = server.start(new HttpProxyHandler(coordinator)); + brokerMock = mock(JavaFunctionBroker.class); + when(brokerMock.getMethodName(anyString())).thenReturn(Optional.of("TestFn")); + handler = new InvocationRequestHandler(brokerMock, coordinator); + } + + @AfterEach + public void tearDown() { + if (server != null) { + server.close(); + } + if (workerLogManagerMock != null) { + workerLogManagerMock.close(); + } + } + + // -------- 1. Buffered request, buffered response (existing path) -------- + @Test + public void bufferedRequest_bufferedResponse_roundTrips() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false); + + AtomicReference seenRequest = new AtomicReference<>(); + AtomicReference seenExchange = new AtomicReference<>(); + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + seenRequest.set(inv.getArgument(1)); + seenExchange.set(RpcHttpRequestDataSource.currentExchange()); + RpcHttp respHttp = RpcHttp.newBuilder() + .setStatusCode("200") + .putHeaders("Content-Type", "text/plain") + .setBody(TypedData.newBuilder().setString("pong").build()) + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build(); + return new HttpInvocationOutcome(Optional.of(returnValue), null); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "POST", "/api/echo", "text/plain", "ping".getBytes(StandardCharsets.UTF_8)); + + awaitHttpArrival(); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + + assertEquals(200, resp.statusCode); + assertEquals("pong", new String(resp.body, StandardCharsets.UTF_8)); + assertEquals("text/plain", resp.contentType); + + // The buffered path must have folded the HTTP body into the RpcHttp envelope. + // For text/* content types the bridge stores the body as a string, not bytes + // (mirrors host PopulateBody behavior). + TypedData httpInput = seenRequest.get().getInputDataList().get(0).getData(); + assertEquals("ping", httpInput.getHttp().getBody().getString()); + // For non-streaming requests the per-thread exchange must NOT be set. + assertNull(seenExchange.get(), "Buffered-input invocations must not install a thread-local exchange"); + } + + // -------- 2. Buffered request, streaming response (commit #5 path) -------- + @Test + public void bufferedRequest_streamingInputStreamResponse_streamsBackUnbuffered() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false); + + byte[] payload = repeat("stream-chunk-", 5000); // ~65KB to force multi-chunk write + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + RpcHttp envelope = RpcHttp.newBuilder() + .setStatusCode("200") + .putHeaders("Content-Type", "application/octet-stream") + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build(); + return new HttpInvocationOutcome(Optional.of(returnValue), new ByteArrayInputStream(payload)); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "GET", "/api/download", null, new byte[0]); + + awaitHttpArrival(); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + + assertEquals(200, resp.statusCode); + assertEquals("application/octet-stream", resp.contentType); + assertEquals(payload.length, resp.body.length); + assertTrue(java.util.Arrays.equals(payload, resp.body), "Streamed body must match payload byte-for-byte"); + // Chunked transfer encoding (server-side) is used when the body length is + // unknown. HttpURLConnection transparently dechunks; we can verify + // either header or fall-through behavior via the chunked flag. + assertEquals("chunked", resp.transferEncoding, + "Streaming-output path must use chunked transfer-encoding (no Content-Length)"); + } + + // -------- 3. Buffered request, streaming response via IOConsumer -------- + @Test + public void bufferedRequest_streamingIOConsumerResponse_streamsBack() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false); + + IOConsumer writer = out -> { + for (int i = 0; i < 100; i++) { + out.write(("event: tick\ndata: " + i + "\n\n").getBytes(StandardCharsets.UTF_8)); + } + }; + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + RpcHttp envelope = RpcHttp.newBuilder() + .setStatusCode("200") + .putHeaders("Content-Type", "text/event-stream") + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build(); + return new HttpInvocationOutcome(Optional.of(returnValue), writer); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "GET", "/api/sse", null, new byte[0]); + + awaitHttpArrival(); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + + assertEquals(200, resp.statusCode); + assertEquals("text/event-stream", resp.contentType); + String body = new String(resp.body, StandardCharsets.UTF_8); + assertTrue(body.startsWith("event: tick\ndata: 0\n\n")); + assertTrue(body.endsWith("event: tick\ndata: 99\n\n")); + } + + // -------- 4. Streaming request, buffered response (commit #6 path) -------- + @Test + public void streamingRequest_bufferedResponse_skipsBodyReadAndExposesLiveStream() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(true); + + byte[] uploadPayload = repeat("upload-frag-", 4096); // ~50KB + AtomicReference consumedByBroker = new AtomicReference<>(); + AtomicReference seenRequest = new AtomicReference<>(); + AtomicReference seenExchange = new AtomicReference<>(); + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + seenRequest.set(inv.getArgument(1)); + HttpExchange exch = RpcHttpRequestDataSource.currentExchange(); + seenExchange.set(exch); + // Consume the live request body to validate the streaming path actually wired through. + if (exch != null) { + byte[] consumed = readAll(exch.getRequestBody()); + consumedByBroker.set(consumed); + } + RpcHttp respHttp = RpcHttp.newBuilder() + .setStatusCode("201") + .putHeaders("Content-Type", "text/plain") + .setBody(TypedData.newBuilder().setString("ok").build()) + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build(); + return new HttpInvocationOutcome(Optional.of(returnValue), null); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "POST", "/api/upload", "application/octet-stream", uploadPayload); + + awaitHttpArrival(); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + + assertEquals(201, resp.statusCode); + assertEquals("ok", new String(resp.body, StandardCharsets.UTF_8)); + + // Critical: the streaming-input path must NOT have populated the protobuf body. + TypedData httpInput = seenRequest.get().getInputDataList().get(0).getData(); + assertEquals(0, httpInput.getHttp().getBody().getBytes().size(), + "Streaming-input request must leave the protobuf body untouched (empty)"); + assertNotNull(seenExchange.get(), "Streaming-input invocation must install the thread-local exchange"); + // The bytes the broker read off the live exchange must equal what the client sent. + assertNotNull(consumedByBroker.get()); + assertTrue(java.util.Arrays.equals(uploadPayload, consumedByBroker.get()), + "Bytes read from the live exchange must equal the bytes the client sent"); + // After the invocation completes, the thread-local must be cleared so the + // worker thread can be safely reused. + assertNull(RpcHttpRequestDataSource.currentExchange(), + "InvocationRequestHandler must clear the thread-local exchange in its finally block"); + } + + // -------- 5. Full streaming: streaming request + streaming response -------- + @Test + public void streamingRequest_streamingResponse_endToEnd() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(true); + + byte[] uploadPayload = repeat("full-stream-", 1024); + AtomicReference echoedBack = new AtomicReference<>(); + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + HttpExchange exch = RpcHttpRequestDataSource.currentExchange(); + byte[] consumed = readAll(exch.getRequestBody()); + echoedBack.set(consumed); + RpcHttp envelope = RpcHttp.newBuilder() + .setStatusCode("200") + .putHeaders("Content-Type", "application/octet-stream") + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(envelope).build(); + // Echo what we read back to the client as a streaming response. + return new HttpInvocationOutcome(Optional.of(returnValue), new ByteArrayInputStream(consumed)); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "POST", "/api/echo-stream", "application/octet-stream", uploadPayload); + + awaitHttpArrival(); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + + assertEquals(200, resp.statusCode); + assertTrue(java.util.Arrays.equals(uploadPayload, echoedBack.get()), + "Bytes broker read off live stream must equal client's uploaded payload"); + assertTrue(java.util.Arrays.equals(uploadPayload, resp.body), + "Streaming response body must equal what the broker echoed"); + } + + // -------- 6. Coordinator slot is released after success -------- + @Test + public void coordinatorSlotReleasedAfterSuccessfulInvocation() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false); + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenAnswer((InvocationOnMock inv) -> { + RpcHttp respHttp = RpcHttp.newBuilder() + .setStatusCode("204") + .build(); + TypedData returnValue = TypedData.newBuilder().setHttp(respHttp).build(); + return new HttpInvocationOutcome(Optional.of(returnValue), null); + }); + + CompletableFuture futureResp = sendHttpRequestAsync( + "DELETE", "/api/item", null, new byte[0]); + + awaitHttpArrival(); + assertEquals(1, coordinator.activeInvocationCount()); + runGrpcArrival(); + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + assertEquals(204, resp.statusCode); + + // The slot must be released so the coordinator can serve subsequent invocations. + assertEquals(0, coordinator.activeInvocationCount(), + "Coordinator must release the slot after a successful invocation"); + } + + // -------- 7. Broker exception surfaces as a 500 on the HTTP side -------- + @Test + public void brokerExceptionPropagatesAsHttp500() throws Exception { + when(brokerMock.methodHasStreamingHttpBody(anyString())).thenReturn(false); + when(brokerMock.invokeMethodForHttpProxy(anyString(), any(), any())) + .thenThrow(new RuntimeException("user function blew up")); + + CompletableFuture futureResp = sendHttpRequestAsync( + "GET", "/api/boom", null, new byte[0]); + + awaitHttpArrival(); + // The gRPC-side execute throws; handle() catches it. We invoke directly so + // the caller (this test) doesn't need to deal with gRPC marshalling. + try { + runGrpcArrival(); + } catch (Exception ignored) { + // Expected: the broker's RuntimeException propagates out of execute(). + } + + HttpClientResult resp = futureResp.get(HTTP_AWAIT_MS, TimeUnit.MILLISECONDS); + assertEquals(500, resp.statusCode); + assertTrue(new String(resp.body, StandardCharsets.UTF_8).contains("user function blew up"), + "500 body must surface the underlying failure message"); + } + + // -------- Helpers -------- + + private void awaitHttpArrival() throws InterruptedException { + long deadline = System.currentTimeMillis() + HTTP_AWAIT_MS; + while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + assertEquals(1, coordinator.activeInvocationCount(), + "HTTP arrival should have registered with the coordinator by now"); + } + + private void runGrpcArrival() throws Exception { + InvocationRequest request = buildInvocationRequest(); + InvocationResponse.Builder response = InvocationResponse.newBuilder(); + handler.execute(request, response); + } + + private static InvocationRequest buildInvocationRequest() { + // Trigger metadata only; body is empty (the host's HTTP proxy contract). + RpcHttp httpEnvelope = RpcHttp.newBuilder() + .setMethod("POST") + .setUrl("http://localhost/api/test") + .build(); + TypedData inputData = TypedData.newBuilder().setHttp(httpEnvelope).build(); + ParameterBinding binding = ParameterBinding.newBuilder() + .setName("req") + .setData(inputData) + .build(); + return InvocationRequest.newBuilder() + .setInvocationId(INVOCATION_ID) + .setFunctionId(FUNCTION_ID) + .addInputData(binding) + .build(); + } + + private CompletableFuture sendHttpRequestAsync( + String method, String path, String contentType, byte[] body) { + return CompletableFuture.supplyAsync(() -> { + try { + HttpURLConnection conn = (HttpURLConnection) URI.create(proxyUri + path).toURL().openConnection(); + conn.setRequestMethod(method); + conn.setRequestProperty(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID); + if (contentType != null) { + conn.setRequestProperty("Content-Type", contentType); + } + if (body != null && body.length > 0) { + conn.setDoOutput(true); + conn.setFixedLengthStreamingMode(body.length); + try (OutputStream os = conn.getOutputStream()) { + os.write(body); + } + } + conn.connect(); + int status = conn.getResponseCode(); + String returnedContentType = conn.getHeaderField("Content-Type"); + String transferEncoding = conn.getHeaderField("Transfer-Encoding"); + InputStream in = status >= 200 && status < 400 ? conn.getInputStream() : conn.getErrorStream(); + byte[] respBody = in != null ? readAll(in) : new byte[0]; + conn.disconnect(); + return new HttpClientResult(status, returnedContentType, transferEncoding, respBody); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + private static byte[] readAll(InputStream in) throws IOException { + java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream(); + byte[] chunk = new byte[4096]; + int n; + while ((n = in.read(chunk)) > 0) { + buf.write(chunk, 0, n); + } + return buf.toByteArray(); + } + + private static byte[] repeat(String fragment, int times) { + StringBuilder sb = new StringBuilder(fragment.length() * times); + for (int i = 0; i < times; i++) { + sb.append(fragment); + } + return sb.toString().getBytes(StandardCharsets.UTF_8); + } + + /** Snapshot of the response observable by the HTTP client. */ + private static final class HttpClientResult { + final int statusCode; + final String contentType; + final String transferEncoding; + final byte[] body; + + HttpClientResult(int statusCode, String contentType, String transferEncoding, byte[] body) { + this.statusCode = statusCode; + this.contentType = contentType; + this.transferEncoding = transferEncoding; + this.body = body; + } + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java new file mode 100644 index 0000000..5463d26 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java @@ -0,0 +1,330 @@ +package com.microsoft.azure.functions.worker.http; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import com.google.protobuf.ByteString; +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.microsoft.azure.functions.rpc.messages.ParameterBinding; +import com.microsoft.azure.functions.rpc.messages.RpcHttp; +import com.microsoft.azure.functions.rpc.messages.TypedData; +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; + +import org.junit.jupiter.api.Test; + +public class HttpBodyBridgeTest { + + @Test + public void buildBodyTypedDataJsonContentType() { + TypedData data = HttpBodyBridge.buildBodyTypedData( + "{\"k\":1}".getBytes(StandardCharsets.UTF_8), "application/json"); + assertEquals(TypedData.DataCase.JSON, data.getDataCase()); + assertEquals("{\"k\":1}", data.getJson()); + } + + @Test + public void buildBodyTypedDataJsonWithCharsetSuffix() { + TypedData data = HttpBodyBridge.buildBodyTypedData( + "{}".getBytes(StandardCharsets.UTF_8), "application/json; charset=utf-8"); + assertEquals(TypedData.DataCase.JSON, data.getDataCase()); + } + + @Test + public void buildBodyTypedDataTextContentType() { + TypedData data = HttpBodyBridge.buildBodyTypedData( + "hello".getBytes(StandardCharsets.UTF_8), "text/plain"); + assertEquals(TypedData.DataCase.STRING, data.getDataCase()); + assertEquals("hello", data.getString()); + } + + @Test + public void buildBodyTypedDataFormEncoded() { + TypedData data = HttpBodyBridge.buildBodyTypedData( + "a=1&b=2".getBytes(StandardCharsets.UTF_8), "application/x-www-form-urlencoded"); + assertEquals(TypedData.DataCase.STRING, data.getDataCase()); + } + + @Test + public void buildBodyTypedDataBinaryWhenNoContentType() { + byte[] bytes = new byte[]{1, 2, 3}; + TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, null); + assertEquals(TypedData.DataCase.BYTES, data.getDataCase()); + assertArrayEquals(bytes, data.getBytes().toByteArray()); + } + + @Test + public void buildBodyTypedDataBinaryWhenOctetStream() { + byte[] bytes = "binary".getBytes(StandardCharsets.UTF_8); + TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "application/octet-stream"); + assertEquals(TypedData.DataCase.BYTES, data.getDataCase()); + } + + @Test + public void buildBodyTypedDataRespectsCharset() { + byte[] bytes = "héllo".getBytes(StandardCharsets.ISO_8859_1); + TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "text/plain; charset=ISO-8859-1"); + assertEquals(TypedData.DataCase.STRING, data.getDataCase()); + assertEquals("héllo", data.getString()); + } + + @Test + public void enrichRequestWithBodyReplacesHttpBody() throws Exception { + InvocationRequest original = InvocationRequest.newBuilder() + .setInvocationId("inv-1") + .addInputData(ParameterBinding.newBuilder() + .setName("req") + .setData(TypedData.newBuilder() + .setHttp(RpcHttp.newBuilder().setMethod("POST").setUrl("http://localhost/api/x")))) + .build(); + HttpExchange exchange = mockExchangeWithBody("payload".getBytes(StandardCharsets.UTF_8), "text/plain"); + + InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange); + assertNotSame(original, enriched); + TypedData body = enriched.getInputData(0).getData().getHttp().getBody(); + assertEquals(TypedData.DataCase.STRING, body.getDataCase()); + assertEquals("payload", body.getString()); + // Method/url preserved. + assertEquals("POST", enriched.getInputData(0).getData().getHttp().getMethod()); + } + + @Test + public void enrichRequestWithBodyReturnsSameWhenNoHttpInput() throws Exception { + InvocationRequest original = InvocationRequest.newBuilder() + .setInvocationId("inv-1") + .addInputData(ParameterBinding.newBuilder() + .setName("queueItem") + .setData(TypedData.newBuilder().setString("hello"))) + .build(); + HttpExchange exchange = mock(HttpExchange.class); + InvocationRequest result = HttpBodyBridge.enrichRequestWithBody(original, exchange); + assertSame(original, result, "Non-HTTP requests should not be modified"); + } + + @Test + public void enrichRequestReadsChunkedBodyLargerThanBuffer() throws Exception { + // Simulate transfer-encoding: chunked by providing a body larger than the read buffer. + byte[] big = new byte[20_000]; + for (int i = 0; i < big.length; i++) { + big[i] = (byte) (i & 0xff); + } + InvocationRequest original = InvocationRequest.newBuilder() + .setInvocationId("inv-big") + .addInputData(ParameterBinding.newBuilder().setName("req") + .setData(TypedData.newBuilder().setHttp(RpcHttp.newBuilder().setMethod("POST")))) + .build(); + HttpExchange exchange = mockExchangeWithBody(big, "application/octet-stream"); + + InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange); + TypedData body = enriched.getInputData(0).getData().getHttp().getBody(); + assertEquals(TypedData.DataCase.BYTES, body.getDataCase()); + assertArrayEquals(big, body.getBytes().toByteArray()); + } + + @Test + public void writeRpcHttpResponseWritesStatusHeadersAndBody() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + Headers responseHeaders = new Headers(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(responseHeaders); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp response = RpcHttp.newBuilder() + .setStatusCode("201") + .putHeaders("Content-Type", "application/json") + .putHeaders("X-Custom", "value") + .setBody(TypedData.newBuilder().setJson("{\"ok\":true}")) + .build(); + + HttpBodyBridge.writeRpcHttpResponse(exchange, response); + + byte[] expected = "{\"ok\":true}".getBytes(StandardCharsets.UTF_8); + verify(exchange).sendResponseHeaders(201, expected.length); + assertArrayEquals(expected, captured.toByteArray()); + assertEquals("application/json", responseHeaders.getFirst("Content-Type")); + assertEquals("value", responseHeaders.getFirst("X-Custom")); + } + + @Test + public void writeRpcHttpResponseHandlesEmptyBody() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp response = RpcHttp.newBuilder().setStatusCode("204").build(); + HttpBodyBridge.writeRpcHttpResponse(exchange, response); + + verify(exchange).sendResponseHeaders(204, -1); + assertEquals(0, captured.size()); + } + + @Test + public void writeRpcHttpResponseHandlesBytesBody() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + byte[] payload = new byte[]{0x01, 0x02, 0x03}; + RpcHttp response = RpcHttp.newBuilder() + .setStatusCode("200") + .setBody(TypedData.newBuilder().setBytes(ByteString.copyFrom(payload))) + .build(); + HttpBodyBridge.writeRpcHttpResponse(exchange, response); + + verify(exchange).sendResponseHeaders(200, payload.length); + assertArrayEquals(payload, captured.toByteArray()); + } + + @Test + public void writeRpcHttpResponseDefaultsInvalidStatusTo500() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp response = RpcHttp.newBuilder().setStatusCode("not-a-number").build(); + HttpBodyBridge.writeRpcHttpResponse(exchange, response); + + verify(exchange).sendResponseHeaders(500, -1); + } + + @Test + public void writeErrorResponseWritesPlainText() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + Headers headers = new Headers(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(headers); + when(exchange.getResponseBody()).thenReturn(captured); + + HttpBodyBridge.writeErrorResponse(exchange, 418, "I'm a teapot"); + + byte[] expected = "I'm a teapot".getBytes(StandardCharsets.UTF_8); + verify(exchange).sendResponseHeaders(418, expected.length); + assertArrayEquals(expected, captured.toByteArray()); + assertTrue(headers.getFirst("Content-Type").startsWith("text/plain")); + } + + @Test + public void writeStreamingResponseFromInputStreamUsesChunkedEncoding() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + Headers responseHeaders = new Headers(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(responseHeaders); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp envelope = RpcHttp.newBuilder() + .setStatusCode("200") + .putHeaders("Content-Type", "text/event-stream") + .putHeaders("Cache-Control", "no-cache") + .build(); + byte[] payload = "data: one\n\ndata: two\n\n".getBytes(StandardCharsets.UTF_8); + HttpBodyBridge.writeStreamingResponse(exchange, envelope, new ByteArrayInputStream(payload)); + + // length=0 selects chunked transfer-encoding (or close-delimited for HTTP/1.0). + verify(exchange).sendResponseHeaders(200, 0); + assertArrayEquals(payload, captured.toByteArray()); + assertEquals("text/event-stream", responseHeaders.getFirst("Content-Type")); + assertEquals("no-cache", responseHeaders.getFirst("Cache-Control")); + } + + @Test + public void writeStreamingResponseFromInputStreamHandlesLargePayload() throws Exception { + // Larger than the 8KB read chunk; verifies the copy loop iterates correctly. + byte[] big = new byte[32_768]; + for (int i = 0; i < big.length; i++) { + big[i] = (byte) (i & 0xff); + } + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp envelope = RpcHttp.newBuilder().setStatusCode("200").build(); + HttpBodyBridge.writeStreamingResponse(exchange, envelope, new ByteArrayInputStream(big)); + + verify(exchange).sendResponseHeaders(200, 0); + assertArrayEquals(big, captured.toByteArray()); + } + + @Test + public void writeStreamingResponseFromInputStreamClosesSource() throws Exception { + boolean[] closed = new boolean[]{false}; + ByteArrayInputStream backing = new ByteArrayInputStream("x".getBytes(StandardCharsets.UTF_8)); + java.io.InputStream tracking = new java.io.FilterInputStream(backing) { + @Override + public void close() throws IOException { + closed[0] = true; + super.close(); + } + }; + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(new ByteArrayOutputStream()); + + HttpBodyBridge.writeStreamingResponse( + exchange, RpcHttp.newBuilder().setStatusCode("200").build(), tracking); + + assertTrue(closed[0], "Streaming InputStream body should be closed"); + } + + @Test + public void writeStreamingResponseFromIOConsumerInvokesWriterAndFlushes() throws Exception { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + Headers responseHeaders = new Headers(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(responseHeaders); + when(exchange.getResponseBody()).thenReturn(captured); + + RpcHttp envelope = RpcHttp.newBuilder() + .setStatusCode("202") + .putHeaders("X-Trace", "abc") + .build(); + HttpBodyBridge.writeStreamingResponse(exchange, envelope, out -> { + out.write("chunk-1\n".getBytes(StandardCharsets.UTF_8)); + out.write("chunk-2\n".getBytes(StandardCharsets.UTF_8)); + }); + + verify(exchange).sendResponseHeaders(202, 0); + assertEquals("chunk-1\nchunk-2\n", captured.toString("UTF-8")); + assertEquals("abc", responseHeaders.getFirst("X-Trace")); + } + + @Test + public void writeStreamingResponseFromIOConsumerPropagatesIOException() { + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(new ByteArrayOutputStream()); + + IOException expected = new IOException("writer-failed"); + IOException thrown = org.junit.jupiter.api.Assertions.assertThrows(IOException.class, () -> + HttpBodyBridge.writeStreamingResponse( + exchange, + RpcHttp.newBuilder().setStatusCode("200").build(), + out -> { throw expected; })); + assertSame(expected, thrown); + } + + private static HttpExchange mockExchangeWithBody(byte[] body, String contentType) throws IOException { + HttpExchange exchange = mock(HttpExchange.class); + Headers headers = new Headers(); + if (contentType != null) { + headers.add("Content-Type", contentType); + } + when(exchange.getRequestHeaders()).thenReturn(headers); + when(exchange.getRequestBody()).thenReturn(new ByteArrayInputStream(body)); + return exchange; + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java new file mode 100644 index 0000000..4fd35d3 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java @@ -0,0 +1,149 @@ +package com.microsoft.azure.functions.worker.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.microsoft.azure.functions.rpc.messages.InvocationRequest; +import com.sun.net.httpserver.HttpExchange; + +import org.junit.jupiter.api.Test; + +public class HttpInvocationCoordinatorTest { + + private static final String INVOCATION_ID = "abc-123"; + + @Test + public void httpArrivesBeforeGrpc() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build(); + + HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange); + assertFalse(httpSlot.grpcArrival().isDone(), "gRPC future should still be pending before gRPC arrival"); + + HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request); + assertSame(httpSlot, grpcSlot, "both registrations should yield the same slot"); + assertTrue(grpcSlot.httpArrival().isDone(), "HTTP future should already be resolved once gRPC arrives"); + assertSame(exchange, grpcSlot.httpArrival().get(1, TimeUnit.SECONDS)); + assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS)); + } + + @Test + public void grpcArrivesBeforeHttp() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build(); + + HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request); + assertFalse(grpcSlot.httpArrival().isDone(), "HTTP future should still be pending before HTTP arrival"); + + HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange); + assertSame(grpcSlot, httpSlot, "both registrations should yield the same slot"); + assertTrue(httpSlot.grpcArrival().isDone(), "gRPC future should already be resolved once HTTP arrives"); + assertSame(exchange, httpSlot.httpArrival().get(1, TimeUnit.SECONDS)); + assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS)); + } + + @Test + public void releaseInvocationRemovesSlot() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + coordinator.registerHttpArrival(INVOCATION_ID, exchange); + assertEquals(1, coordinator.activeInvocationCount()); + + coordinator.releaseInvocation(INVOCATION_ID); + assertEquals(0, coordinator.activeInvocationCount()); + } + + @Test + public void releaseInvocationIsIdempotent() { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + // Releasing an unknown invocation does not throw. + coordinator.releaseInvocation("unknown"); + coordinator.releaseInvocation("unknown"); + } + + @Test + public void failInvocationPropagatesToFutures() { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange); + + IOException cause = new IOException("boom"); + coordinator.failInvocation(INVOCATION_ID, cause); + + ExecutionException ex = assertThrows(ExecutionException.class, + () -> slot.grpcArrival().get(1, TimeUnit.SECONDS)); + assertSame(cause, ex.getCause()); + ExecutionException completionEx = assertThrows(ExecutionException.class, + () -> slot.completion().get(1, TimeUnit.SECONDS)); + assertSame(cause, completionEx.getCause()); + assertEquals(0, coordinator.activeInvocationCount()); + } + + @Test + public void duplicateHttpArrivalThrows() { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + coordinator.registerHttpArrival(INVOCATION_ID, exchange); + assertThrows(IllegalStateException.class, + () -> coordinator.registerHttpArrival(INVOCATION_ID, exchange)); + } + + @Test + public void duplicateGrpcArrivalThrows() { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build(); + coordinator.registerGrpcArrival(request); + assertThrows(IllegalStateException.class, + () -> coordinator.registerGrpcArrival(request)); + } + + @Test + public void independentInvocationsDoNotInterfere() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchangeA = mock(HttpExchange.class); + HttpExchange exchangeB = mock(HttpExchange.class); + InvocationRequest reqA = InvocationRequest.newBuilder().setInvocationId("a").build(); + InvocationRequest reqB = InvocationRequest.newBuilder().setInvocationId("b").build(); + + HttpInvocationSlot slotA = coordinator.registerHttpArrival("a", exchangeA); + HttpInvocationSlot slotB = coordinator.registerHttpArrival("b", exchangeB); + // Resolve only A; B must still be pending. + coordinator.registerGrpcArrival(reqA); + assertTrue(slotA.grpcArrival().isDone()); + assertFalse(slotB.grpcArrival().isDone()); + + coordinator.registerGrpcArrival(reqB); + assertSame(reqA, slotA.grpcArrival().get(1, TimeUnit.SECONDS)); + assertSame(reqB, slotB.grpcArrival().get(1, TimeUnit.SECONDS)); + } + + @Test + public void grpcFutureRemainsPendingUntilHttpArrives() { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build(); + HttpInvocationSlot slot = coordinator.registerGrpcArrival(request); + // No HTTP arrival; future must time out. + assertThrows(TimeoutException.class, () -> slot.httpArrival().get(50, TimeUnit.MILLISECONDS)); + } + + @Test + public void completionFutureResolvesOnRelease() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpExchange exchange = mock(HttpExchange.class); + HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange); + assertFalse(slot.completion().isDone()); + coordinator.releaseInvocation(INVOCATION_ID); + slot.completion().get(1, TimeUnit.SECONDS); // resolves without throwing + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java new file mode 100644 index 0000000..42e9dd6 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java @@ -0,0 +1,129 @@ +package com.microsoft.azure.functions.worker.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; + +import org.junit.jupiter.api.Test; + +public class HttpProxyHandlerTest { + + private static final String INVOCATION_ID = "abc-123"; + + @Test + public void rejectsRequestWithoutInvocationIdHeader() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpProxyHandler handler = new HttpProxyHandler(coordinator); + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + when(exchange.getRequestHeaders()).thenReturn(new Headers()); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + handler.handle(exchange); + + verify(exchange).sendResponseHeaders(400, captured.size()); + verify(exchange).close(); + assertEquals(0, coordinator.activeInvocationCount()); + } + + @Test + public void registersHttpArrivalAndWaitsForCompletion() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpProxyHandler handler = new HttpProxyHandler(coordinator); + HttpExchange exchange = mock(HttpExchange.class); + Headers requestHeaders = new Headers(); + requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID); + when(exchange.getRequestHeaders()).thenReturn(requestHeaders); + + AtomicReference handlerError = new AtomicReference<>(); + CompletableFuture handlerDone = CompletableFuture.runAsync(() -> { + try { + handler.handle(exchange); + } catch (Throwable t) { + handlerError.set(t); + } + }); + + // Wait for the handler to register HTTP arrival. + long deadline = System.currentTimeMillis() + 1000; + while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + assertEquals(1, coordinator.activeInvocationCount()); + + // Simulate the gRPC side finishing the invocation. + coordinator.releaseInvocation(INVOCATION_ID); + + handlerDone.get(); + assertEquals(null, handlerError.get()); + verify(exchange).close(); + // The handler must NOT have written any error response - the gRPC side owns the body. + verify(exchange, never()).sendResponseHeaders(anyInt(), anyLong()); + } + + @Test + public void respondsWith500WhenInvocationFails() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpProxyHandler handler = new HttpProxyHandler(coordinator); + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange exchange = mock(HttpExchange.class); + Headers requestHeaders = new Headers(); + requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID); + when(exchange.getRequestHeaders()).thenReturn(requestHeaders); + when(exchange.getResponseHeaders()).thenReturn(new Headers()); + when(exchange.getResponseBody()).thenReturn(captured); + + CompletableFuture handlerDone = CompletableFuture.runAsync(() -> { + try { + handler.handle(exchange); + } catch (Exception ignored) { + } + }); + + long deadline = System.currentTimeMillis() + 1000; + while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(10); + } + coordinator.failInvocation(INVOCATION_ID, new RuntimeException("user fn crashed")); + + handlerDone.get(); + verify(exchange).sendResponseHeaders(500, captured.size()); + verify(exchange).close(); + assertTrue(new String(captured.toByteArray()).contains("user fn crashed")); + } + + @Test + public void duplicateRegistrationReturns409() throws Exception { + HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator(); + HttpProxyHandler handler = new HttpProxyHandler(coordinator); + // Pre-register HTTP arrival to force a duplicate on the next handle() call. + HttpExchange first = mock(HttpExchange.class); + coordinator.registerHttpArrival(INVOCATION_ID, first); + + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + HttpExchange second = mock(HttpExchange.class); + Headers headers = new Headers(); + headers.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID); + when(second.getRequestHeaders()).thenReturn(headers); + when(second.getResponseHeaders()).thenReturn(new Headers()); + when(second.getResponseBody()).thenReturn(captured); + + handler.handle(second); + + verify(second).sendResponseHeaders(409, captured.size()); + verify(second).close(); + } +} diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java new file mode 100644 index 0000000..f9f04a3 --- /dev/null +++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java @@ -0,0 +1,115 @@ +package com.microsoft.azure.functions.worker.http; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicReference; + +import com.sun.net.httpserver.HttpHandler; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class HttpProxyServerTest { + + private HttpProxyServer server; + + @AfterEach + public void tearDown() { + if (server != null) { + server.close(); + server = null; + } + } + + @Test + public void startBindsToEphemeralPortAndReturnsUri() throws Exception { + server = new HttpProxyServer(ProxyConfig.defaults()); + String uri = server.start(noOpHandler()); + assertNotNull(uri); + assertTrue(uri.startsWith("http://127.0.0.1:"), "Expected loopback URI, got " + uri); + URI parsed = URI.create(uri); + assertTrue(parsed.getPort() > 0, "Expected a real port number, got " + parsed.getPort()); + assertEquals(uri, server.getBoundUri()); + } + + @Test + public void getBoundUriReturnsNullBeforeStart() { + server = new HttpProxyServer(ProxyConfig.defaults()); + assertNull(server.getBoundUri()); + } + + @Test + public void closeBeforeStartIsNoop() { + server = new HttpProxyServer(ProxyConfig.defaults()); + server.close(); + assertNull(server.getBoundUri()); + } + + @Test + public void doubleStartThrows() throws Exception { + server = new HttpProxyServer(ProxyConfig.defaults()); + server.start(noOpHandler()); + assertThrows(IllegalStateException.class, () -> server.start(noOpHandler())); + } + + @Test + public void routesIncomingRequestToHandler() throws Exception { + server = new HttpProxyServer(ProxyConfig.defaults()); + AtomicReference seenPath = new AtomicReference<>(); + AtomicReference seenHeader = new AtomicReference<>(); + String uri = server.start(exchange -> { + seenPath.set(exchange.getRequestURI().getPath()); + seenHeader.set(exchange.getRequestHeaders().getFirst("x-ms-invocation-id")); + byte[] body = "hello".getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + }); + + HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/some/route").toURL().openConnection(); + conn.setRequestProperty("x-ms-invocation-id", "test-123"); + conn.connect(); + try { + assertEquals(200, conn.getResponseCode()); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) { + assertEquals("hello", reader.readLine()); + } + } finally { + conn.disconnect(); + } + assertEquals("/some/route", seenPath.get()); + assertEquals("test-123", seenHeader.get()); + } + + @Test + public void closeStopsAcceptingConnections() throws Exception { + server = new HttpProxyServer(ProxyConfig.defaults()); + String uri = server.start(noOpHandler()); + server.close(); + server = null; + HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/").toURL().openConnection(); + conn.setConnectTimeout(500); + conn.setReadTimeout(500); + // After close, the next connect attempt must fail (connection refused). + assertThrows(Exception.class, conn::connect); + } + + private static HttpHandler noOpHandler() { + return exchange -> { + exchange.sendResponseHeaders(204, -1); + exchange.close(); + }; + } +}