From 8aef66e2c325caafb25a8bc806d0e4346c1f9c1f Mon Sep 17 00:00:00 2001 From: SJM526 Date: Fri, 12 Jun 2026 01:46:05 +0800 Subject: [PATCH] Add asynchronous kernel execution API --- src/main/java/com/aparapi/Execution.java | 91 +++++++++++++++++++ src/main/java/com/aparapi/Kernel.java | 74 +++++++++++++++ .../com/aparapi/runtime/ExecutionTest.java | 85 +++++++++++++++++ 3 files changed, 250 insertions(+) create mode 100644 src/main/java/com/aparapi/Execution.java create mode 100644 src/test/java/com/aparapi/runtime/ExecutionTest.java diff --git a/src/main/java/com/aparapi/Execution.java b/src/main/java/com/aparapi/Execution.java new file mode 100644 index 00000000..0531f3d0 --- /dev/null +++ b/src/main/java/com/aparapi/Execution.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aparapi; + +/** + * Handle returned by asynchronous kernel executions. + */ +public final class Execution { + private final Thread thread; + private volatile Throwable failure; + + private Execution(final Runnable task, String threadName) { + thread = new Thread(new Runnable() { + @Override + public void run() { + try { + task.run(); + } catch (Throwable t) { + failure = t; + } + } + }, threadName); + } + + static Execution start(final Runnable task, String threadName) { + Execution execution = new Execution(task, threadName); + execution.thread.start(); + return execution; + } + + /** + * Wait until the asynchronous kernel execution has completed. + * + * @return this execution handle + */ + public Execution waitUntilFinished() { + boolean interrupted = false; + while (true) { + try { + thread.join(); + break; + } catch (InterruptedException e) { + interrupted = true; + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + throwFailureIfAny(); + return this; + } + + /** + * @return true if the asynchronous execution has completed + */ + public boolean isFinished() { + return !thread.isAlive(); + } + + /** + * @return true if the asynchronous execution completed with an exception or error + */ + public boolean hasFailed() { + return failure != null; + } + + private void throwFailureIfAny() { + if (failure instanceof RuntimeException) { + throw (RuntimeException) failure; + } + if (failure instanceof Error) { + throw (Error) failure; + } + if (failure != null) { + throw new RuntimeException(failure); + } + } +} diff --git a/src/main/java/com/aparapi/Kernel.java b/src/main/java/com/aparapi/Kernel.java index 4b9686db..cefbda0e 100644 --- a/src/main/java/com/aparapi/Kernel.java +++ b/src/main/java/com/aparapi/Kernel.java @@ -2804,6 +2804,20 @@ public synchronized Kernel execute(Range _range) { return (execute(_range, 1)); } + /** + * Start asynchronous execution of _range kernels. + *

+ * This method returns immediately with an {@link Execution} handle. Call + * {@link Execution#waitUntilFinished()} to wait for completion and rethrow any exception + * raised during execution. + * + * @param _range The number of Kernels that we would like to initiate. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final Range _range) { + return executeAsync(_range, 1); + } + @Override @SuppressWarnings("deprecation") public String toString() { @@ -2839,6 +2853,16 @@ public synchronized Kernel execute(int _range) { return (execute(createRange(_range), 1)); } + /** + * Start asynchronous execution of _range kernels. + * + * @param _range The number of Kernels that we would like to initiate. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final int _range) { + return executeAsync(createRange(_range), 1); + } + @SuppressWarnings("deprecation") protected Range createRange(int _range) { if (executionMode.equals(EXECUTION_MODE.AUTO)) { @@ -2864,6 +2888,17 @@ public synchronized Kernel execute(Range _range, int _passes) { return (execute("run", _range, _passes)); } + /** + * Start asynchronous execution of _passes iterations of _range kernels. + * + * @param _range The number of Kernels that we would like to initiate. + * @param _passes The number of passes to make. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final Range _range, final int _passes) { + return executeAsync("run", _range, _passes); + } + /** * Start execution of _passes iterations over the _range of kernels. *

@@ -2879,6 +2914,17 @@ public synchronized Kernel execute(int _range, int _passes) { return (execute(createRange(_range), _passes)); } + /** + * Start asynchronous execution of _passes iterations of _range kernels. + * + * @param _range The number of Kernels that we would like to initiate. + * @param _passes The number of passes to make. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final int _range, final int _passes) { + return executeAsync(createRange(_range), _passes); + } + /** * Start execution of globalSize kernels for the given entrypoint. *

@@ -2893,6 +2939,17 @@ public synchronized Kernel execute(String _entrypoint, Range _range) { return (execute(_entrypoint, _range, 1)); } + /** + * Start asynchronous execution of globalSize kernels for the given entrypoint. + * + * @param _entrypoint is the name of the method we wish to use as the entrypoint to the kernel + * @param _range The number of Kernels that we would like to initiate. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final String _entrypoint, final Range _range) { + return executeAsync(_entrypoint, _range, 1); + } + /** * Start execution of globalSize kernels for the given entrypoint. *

@@ -2907,6 +2964,23 @@ public synchronized Kernel execute(String _entrypoint, Range _range, int _passes return prepareKernelRunner().execute(_entrypoint, _range, _passes); } + /** + * Start asynchronous execution of globalSize kernels for the given entrypoint. + * + * @param _entrypoint is the name of the method we wish to use as the entrypoint to the kernel + * @param _range The number of Kernels that we would like to initiate. + * @param _passes The number of passes to make. + * @return an execution handle for the asynchronous kernel execution + */ + public Execution executeAsync(final String _entrypoint, final Range _range, final int _passes) { + return Execution.start(new Runnable() { + @Override + public void run() { + execute(_entrypoint, _range, _passes); + } + }, Reflection.getSimpleName(getClass()) + "-aparapi-execution"); + } + /** * Force pre-compilation of the kernel for a given device, without executing it. * diff --git a/src/test/java/com/aparapi/runtime/ExecutionTest.java b/src/test/java/com/aparapi/runtime/ExecutionTest.java new file mode 100644 index 00000000..31ca13d5 --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ExecutionTest.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aparapi.runtime; + +import com.aparapi.Execution; +import com.aparapi.Kernel; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ExecutionTest { + + @Test + public void executeAsyncReturnsBeforeKernelCompletesAndWaitsForResults() { + final int size = 8; + final int[] values = new int[size]; + final AtomicBoolean releaseKernel = new AtomicBoolean(false); + final AtomicInteger started = new AtomicInteger(0); + Kernel kernel = new Kernel() { + @Override + public void run() { + started.incrementAndGet(); + while (!releaseKernel.get()) { + Thread.yield(); + } + values[getGlobalId()] = getGlobalId() + 1; + } + }; + kernel.setExecutionMode(Kernel.EXECUTION_MODE.SEQ); + + Execution execution = kernel.executeAsync(size); + while (started.get() == 0) { + Thread.yield(); + } + + assertFalse(execution.isFinished()); + releaseKernel.set(true); + execution.waitUntilFinished(); + + assertTrue(execution.isFinished()); + assertFalse(execution.hasFailed()); + assertArrayEquals(new int[] {1, 2, 3, 4, 5, 6, 7, 8}, values); + } + + @Test + public void waitUntilFinishedRethrowsRuntimeExceptionFromKernel() { + Kernel kernel = new Kernel() { + @Override + public void run() { + throw new IllegalStateException("expected failure"); + } + }; + kernel.setExecutionMode(Kernel.EXECUTION_MODE.SEQ); + + Execution execution = kernel.executeAsync(1); + + try { + execution.waitUntilFinished(); + fail("Expected waitUntilFinished to rethrow the kernel failure"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("expected failure")); + } + assertTrue(execution.isFinished()); + assertTrue(execution.hasFailed()); + } +}