Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 139 additions & 98 deletions sentry/src/main/java/io/sentry/SentryExecutorService.java
Original file line number Diff line number Diff line change
@@ -1,166 +1,207 @@
package io.sentry;

import io.sentry.util.AutoClosableReentrantLock;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Custom {@link ISentryExecutorService} backed by a single daemon worker thread and a {@link
* PriorityQueue} pre-allocated to {@link #INITIAL_QUEUE_CAPACITY}.
*
* <p>Because the backing array is sized at construction time, no array resize ever occurs during
* normal SDK operation, and {@link #prewarm()} is a no-op.
*/
@ApiStatus.Internal
public final class SentryExecutorService implements ISentryExecutorService {

/**
* ScheduledThreadPoolExecutor grows work queue by 50% each time. With the initial capacity of 16
* it will have to resize 4 times to reach 40, which is a decent middle-ground for prewarming.
* This will prevent from growing in unexpected areas of the SDK.
* Pre-allocated initial capacity for the task queue. Sized to comfortably exceed the maximum
* number of tasks the SDK queues concurrently so the backing array never needs to grow at
* runtime.
*/
private static final int INITIAL_QUEUE_SIZE = 40;
static final int INITIAL_QUEUE_CAPACITY = 64;

/**
* By default, the work queue is unbounded so it can grow as much as the memory allows. We want to
* limit it by 271 which would be x8 times growth from the default initial capacity.
* Hard limit on the number of pending tasks. Tasks submitted beyond this limit are silently
* dropped and a cancelled {@link Future} is returned.
*/
private static final int MAX_QUEUE_SIZE = 271;

private final @NotNull ScheduledThreadPoolExecutor executorService;
private final @NotNull AutoClosableReentrantLock lock = new AutoClosableReentrantLock();

@SuppressWarnings("UnnecessaryLambda")
private final @NotNull Runnable dummyRunnable = () -> {};

private final @NotNull PriorityQueue<ScheduledTask<?>> queue;
private final @NotNull Object lock = new Object();
private final @NotNull Thread workerThread;
private final @Nullable SentryOptions options;
private volatile boolean closed = false;

@TestOnly
SentryExecutorService(
final @NotNull ScheduledThreadPoolExecutor executorService,
final @Nullable SentryOptions options) {
this.executorService = executorService;
this.options = options;
public SentryExecutorService() {
this(null);
}

public SentryExecutorService(final @Nullable SentryOptions options) {
this(new ScheduledThreadPoolExecutor(1, new SentryExecutorServiceThreadFactory()), options);
}

public SentryExecutorService() {
this(new ScheduledThreadPoolExecutor(1, new SentryExecutorServiceThreadFactory()), null);
this.options = options;
this.queue = new PriorityQueue<>(INITIAL_QUEUE_CAPACITY);
this.workerThread = new Thread(this::loop, "SentryExecutorService");
this.workerThread.setDaemon(true);
this.workerThread.start();
}

private boolean isQueueAvailable() {
// If limit is reached, purge cancelled tasks from the queue
if (executorService.getQueue().size() >= MAX_QUEUE_SIZE) {
executorService.purge();
private void loop() {
while (!closed) {
ScheduledTask<?> task = null;
synchronized (lock) {
while (!closed) {
final ScheduledTask<?> head = queue.peek();
if (head == null) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
} else {
final long delayNs = head.triggerTimeNs - System.nanoTime();
if (delayNs <= 0L) {
task = queue.poll();
break;
} else {
// Sleep until the task is due, or until a new earlier task wakes us.
final long delayMs = Math.max(1L, delayNs / 1_000_000L);
try {
lock.wait(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
}
// Execute outside the lock so producers are not blocked during task execution.
if (task != null && !task.isCancelled()) {
task.run();
}
}
// Check limit again after purge
return executorService.getQueue().size() < MAX_QUEUE_SIZE;
}

@Override
public @NotNull Future<?> submit(final @NotNull Runnable runnable)
throws RejectedExecutionException {
if (isQueueAvailable()) {
return executorService.submit(runnable);
}
if (options != null) {
options
.getLogger()
.log(SentryLevel.WARNING, "Task " + runnable + " rejected from " + executorService);
}
return new CancelledFuture<>();
return submit(Executors.callable(runnable, (Void) null));
}

@Override
public @NotNull <T> Future<T> submit(final @NotNull Callable<T> callable)
throws RejectedExecutionException {
if (isQueueAvailable()) {
return executorService.submit(callable);
}
if (options != null) {
options
.getLogger()
.log(SentryLevel.WARNING, "Task " + callable + " rejected from " + executorService);
synchronized (lock) {
if (closed) {
return new CancelledFuture<>();
}
if (queue.size() >= MAX_QUEUE_SIZE) {
// Purge cancelled tasks before declaring the queue full.
queue.removeIf(ScheduledTask::isCancelled);
}
if (queue.size() >= MAX_QUEUE_SIZE) {
if (options != null) {
options
.getLogger()
.log(
SentryLevel.WARNING,
"Task " + callable + " rejected from SentryExecutorService: queue full");
}
return new CancelledFuture<>();
}
final ScheduledTask<T> task = new ScheduledTask<>(callable, System.nanoTime());
queue.offer(task);
lock.notifyAll();
return task;
}
return new CancelledFuture<>();
}

@Override
public @NotNull Future<?> schedule(final @NotNull Runnable runnable, final long delayMillis)
throws RejectedExecutionException {
return executorService.schedule(runnable, delayMillis, TimeUnit.MILLISECONDS);
synchronized (lock) {
if (closed) {
return new CancelledFuture<>();
}
final ScheduledTask<?> task =
new ScheduledTask<>(
Executors.callable(runnable, (Void) null),
System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis));
queue.offer(task);
// Wake the worker only if this task is now the earliest — avoids spurious wakeups.
final ScheduledTask<?> head = queue.peek();
if (head == task) {
lock.notifyAll();
}
return task;
}
}

@Override
public void close(final long timeoutMillis) {
try (final @NotNull ISentryLifecycleToken ignored = lock.acquire()) {
if (!executorService.isShutdown()) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
synchronized (lock) {
if (closed) {
return;
}
closed = true;
queue.clear();
lock.notifyAll();
}
try {
workerThread.join(timeoutMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (workerThread.isAlive()) {
workerThread.interrupt();
}
}

@Override
public boolean isClosed() {
try (final @NotNull ISentryLifecycleToken ignored = lock.acquire()) {
return executorService.isShutdown();
}
return closed;
}

@SuppressWarnings({"FutureReturnValueIgnored"})
/**
* No-op. The task queue is pre-allocated at construction time ({@link #INITIAL_QUEUE_CAPACITY}
* slots), so no warm-up is required.
*
* @deprecated Pre-allocation makes this unnecessary. Will be removed in a future release.
*/
@Override
public void prewarm() {
try {
executorService.submit(
() -> {
try {
// schedule a bunch of dummy runnables in the future that will never execute to
// trigger
// queue growth and then purge the queue
for (int i = 0; i < INITIAL_QUEUE_SIZE; i++) {
final Future<?> future =
executorService.schedule(dummyRunnable, 365L, TimeUnit.DAYS);
future.cancel(true);
}
executorService.purge();
} catch (RejectedExecutionException ignored) {
// ignore
}
});
} catch (RejectedExecutionException e) {
if (options != null) {
options
.getLogger()
.log(SentryLevel.WARNING, "Prewarm task rejected from " + executorService, e);
}
}
}
@Deprecated
public void prewarm() {}

private static final class SentryExecutorServiceThreadFactory implements ThreadFactory {
private int cnt;
// ---- internals ----

private static final class ScheduledTask<T> extends FutureTask<T>
implements Comparable<ScheduledTask<?>> {

/** Absolute trigger time in nanoseconds ({@link System#nanoTime()}). */
final long triggerTimeNs;

ScheduledTask(final @NotNull Callable<T> callable, final long triggerTimeNs) {
super(callable);
this.triggerTimeNs = triggerTimeNs;
}

@Override
public @NotNull Thread newThread(final @NotNull Runnable r) {
final Thread ret = new Thread(r, "SentryExecutorServiceThreadFactory-" + cnt++);
ret.setDaemon(true);
return ret;
public int compareTo(final @NotNull ScheduledTask<?> other) {
return Long.compare(this.triggerTimeNs, other.triggerTimeNs);
}
}

private static final class CancelledFuture<T> implements Future<T> {

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return true;
Expand Down
Loading