Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ dependencies {
implementation("io.grpc", "grpc-protobuf", Versions.gRPCVersion)
implementation("io.grpc", "grpc-stub", Versions.gRPCVersion)
implementation("javax.annotation", "javax.annotation-api", "1.3.2")

// Test deps — pinned to versions still compatible with the Java 8 source target.
testImplementation("org.junit.jupiter:junit-jupiter:5.10.5")
testImplementation("org.mockito:mockito-core:4.11.0")
testImplementation("org.awaitility:awaitility:4.2.2")
testImplementation("io.netty", "netty-transport", Versions.nettyVersion)
testImplementation("io.netty", "netty-codec", Versions.nettyVersion)
testImplementation("com.squareup.okhttp3:mockwebserver:4.9.3")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

tasks.test {
useJUnitPlatform()
}

// present on all platforms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
packetHandlers.getPacketHandlers(msg.getClass())) {

Object res = consumer.apply(ctx, msg, toServer);
if (!res.equals(msg)) {
if (res != msg) {
packet = res;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
packetHandlers.getPacketHandlers(msg.getClass())) {

Object res = consumer.apply(ctx, msg, toServer);
if (!res.equals(msg)) {
if (res != msg) {
packet = res;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
import lombok.Getter;

Expand Down Expand Up @@ -65,10 +64,12 @@ public void shutdown() {
}
}

// Both registries are mutated from Netty I/O threads (per-channel) and iterated
// concurrently, so they must be thread-safe.
@Getter(AccessLevel.PROTECTED)
private final Set<Channel> injectedClients = new HashSet<>();
private final Set<Channel> injectedClients = ConcurrentHashMap.newKeySet();

private final Map<Class<?>, InjectorAddon> addons = new HashMap<>();
private final Map<Class<?>, InjectorAddon> addons = new ConcurrentHashMap<>();

protected boolean addInjectedClient(Channel channel) {
return injectedClients.add(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
*/
@RequiredArgsConstructor
public final class LocalSession {
private static final int CONNECTION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
private static final int CONNECTION_TIMEOUT = (int) Duration.ofSeconds(10).toMillis();

private static DefaultEventLoopGroup DEFAULT_EVENT_LOOP_GROUP;
private static EventLoopGroup PLATFORM_EVENT_LOOP_GROUP; // Platform-specific event loop group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,47 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
class TunnelHandler implements Handler {
private final ConnectLogger logger;
private final Channel downstreamServerConn; // local server connection

// Coalesces flushes across an EventLoop tick: one flush() per batch of
// onReceive calls instead of one per packet. The CAS lives inside the
// write task so the flush is always enqueued after the write that needs
// it — scheduling the CAS outside the EventLoop races, because a later
// write can be enqueued behind an already-scheduled flush.
private final AtomicBoolean flushScheduled = new AtomicBoolean(false);

@Override
public void onReceive(byte[] data) {
// TunnelService -> local session server -> downstream server
ByteBuf buf = Unpooled.wrappedBuffer(data);
downstreamServerConn.writeAndFlush(buf);
// TunnelService -> local session server -> downstream server.
// Allocate the ByteBuf inside the lambda so it isn't leaked if execute()
// rejects (event loop shutting down during proxy stop).
Channel ch = downstreamServerConn;
EventLoop el = ch.eventLoop();
try {
el.execute(() -> {
ch.write(Unpooled.wrappedBuffer(data), ch.voidPromise());
if (flushScheduled.compareAndSet(false, true)) {
try {
el.execute(() -> {
flushScheduled.set(false);
ch.flush();
});
} catch (RejectedExecutionException ignored) {
flushScheduled.set(false);
}
}
});
} catch (RejectedExecutionException ignored) {
// Event loop is shutting down; the channel is going away anyway.
}
}

@Override
Expand All @@ -63,7 +92,20 @@ public void onError(Throwable t) {

@Override
public void onClose() {
// disconnect from downstream server
downstreamServerConn.close();
// Flush before closing: deferred writes from onReceive() may still be
// sitting in the channel's outbound buffer with the flush scheduled as
// a separate EventLoop task, so closing without a final flush can drop
// the last payload.
Channel ch = downstreamServerConn;
try {
ch.eventLoop().execute(() -> {
ch.flush();
ch.close();
});
} catch (RejectedExecutionException ignored) {
// Event loop already shut down: close directly. Netty's close is
// thread-safe and a no-op on an already-closed channel.
ch.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@
import com.minekube.connect.api.packet.PacketHandlers;
import com.minekube.connect.api.util.TriFunction;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import lombok.AllArgsConstructor;
import lombok.Getter;

public final class PacketHandlersImpl implements PacketHandlers {
private final Map<PacketHandler, List<HandlerEntry>> handlers = new HashMap<>();
private final Set<TriFunction<ChannelHandlerContext, Object, Boolean, Object>> globalPacketHandlers = new HashSet<>();
private final Map<Class<?>, Set<TriFunction<ChannelHandlerContext, Object, Boolean, Object>>> packetHandlers = new HashMap<>();
// CopyOnWriteArraySet for the per-class fanout: reads happen on every packet
// (hot path, must be lock-free); writes only on register/deregister.
private final Map<PacketHandler, List<HandlerEntry>> handlers = new ConcurrentHashMap<>();
private final Set<TriFunction<ChannelHandlerContext, Object, Boolean, Object>> globalPacketHandlers =
new CopyOnWriteArraySet<>();
private final Map<Class<?>, Set<TriFunction<ChannelHandlerContext, Object, Boolean, Object>>> packetHandlers =
new ConcurrentHashMap<>();

@Override
public void register(
Expand All @@ -55,10 +59,10 @@ public void register(
return;
}

handlers.computeIfAbsent(handler, $ -> new ArrayList<>())
handlers.computeIfAbsent(handler, $ -> new CopyOnWriteArrayList<>())
.add(new HandlerEntry(packetClass, consumer));

packetHandlers.computeIfAbsent(packetClass, $ -> new HashSet<>(globalPacketHandlers))
packetHandlers.computeIfAbsent(packetClass, $ -> new CopyOnWriteArraySet<>(globalPacketHandlers))
.add(consumer);
}

Expand All @@ -70,7 +74,7 @@ public void registerAll(PacketHandler handler) {

TriFunction<ChannelHandlerContext, Object, Boolean, Object> packetHandler = handler::handle;

handlers.computeIfAbsent(handler, $ -> new ArrayList<>())
handlers.computeIfAbsent(handler, $ -> new CopyOnWriteArrayList<>())
.add(new HandlerEntry(null, packetHandler));

globalPacketHandlers.add(packetHandler);
Expand All @@ -88,13 +92,19 @@ public void deregister(PacketHandler handler) {
List<HandlerEntry> values = handlers.remove(handler);
if (values != null) {
for (HandlerEntry value : values) {
Set<?> handlers = packetHandlers.get(value.getPacket());

if (handlers != null) {
handlers.removeIf(o -> o.equals(value.getHandler()));
if (handlers.isEmpty()) {
packetHandlers.remove(value.getPacket());
}
// registerAll() stores HandlerEntry with packetClass == null.
// ConcurrentHashMap rejects null keys, so skip the per-class
// lookup for global handlers (the old HashMap returned null
// silently for the same case).
Class<?> packetClass = value.getPacket();
if (packetClass != null) {
// computeIfPresent atomically removes the entry only if it's
// still empty after our removal, so a concurrent register()
// that re-populates the set in between isn't dropped.
packetHandlers.computeIfPresent(packetClass, (k, set) -> {
set.removeIf(o -> o.equals(value.getHandler()));
return set.isEmpty() ? null : set;
});
}

globalPacketHandlers.remove(value.getHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class CommandUtil {

public @NonNull Collection<String> getOnlineUsernames() {
Collection<String> usernames = new ArrayList<>();
getOnlinePlayers().forEach(this::getUsernameFromSource);
getOnlinePlayers().forEach(player -> usernames.add(getUsernameFromSource(player)));
return usernames;
}

Expand Down
Loading
Loading