Skip to content

Commit f9114c3

Browse files
Merge pull request #677 from rabbitmq/lukebakken/java-tutorial-confirmation-window
Add example code to Java publisher confirms tutorial
2 parents 0a0d15d + dcd72b3 commit f9114c3

1 file changed

Lines changed: 124 additions & 0 deletions

File tree

java/PublisherConfirms.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.rabbitmq.client.ConnectionFactory;
55

66
import java.time.Duration;
7+
import java.util.LinkedList;
78
import java.util.UUID;
89
import java.util.concurrent.ConcurrentNavigableMap;
910
import java.util.concurrent.ConcurrentSkipListMap;
@@ -21,10 +22,16 @@ static Connection createConnection() throws Exception {
2122
return cf.newConnection();
2223
}
2324

25+
static final int MAX_OUTSTANDING = 1000; // Confirmation window
26+
static final int THROTTLING_PERCENTAGE = 50; // Start throttling at 50% capacity
27+
static final int MAX_DELAY_MS = 1000; // Maximum delay in milliseconds
28+
2429
public static void main(String[] args) throws Exception {
2530
publishMessagesIndividually();
2631
publishMessagesInBatch();
2732
handlePublishConfirmsAsynchronously();
33+
handlePublishConfirmsWithWindow();
34+
handlePublishConfirmsWithAdaptiveThrottling();
2835
}
2936

3037
static void publishMessagesIndividually() throws Exception {
@@ -125,6 +132,123 @@ static void handlePublishConfirmsAsynchronously() throws Exception {
125132
}
126133
}
127134

135+
static void handlePublishConfirmsWithWindow() throws Exception {
136+
try (Connection connection = createConnection()) {
137+
Channel ch = connection.createChannel();
138+
139+
String queue = UUID.randomUUID().toString();
140+
ch.queueDeclare(queue, false, false, true, null);
141+
ch.confirmSelect();
142+
143+
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
144+
145+
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
146+
if (multiple) {
147+
outstandingConfirms.headMap(sequenceNumber, true).clear();
148+
} else {
149+
outstandingConfirms.remove(sequenceNumber);
150+
}
151+
synchronized (outstandingConfirms) {
152+
outstandingConfirms.notifyAll();
153+
}
154+
};
155+
156+
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
157+
System.err.format("Message nacked. Sequence: %d, multiple: %b%n", sequenceNumber, multiple);
158+
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
159+
});
160+
161+
long start = System.nanoTime();
162+
for (int i = 0; i < MESSAGE_COUNT; i++) {
163+
// Wait if window is full
164+
synchronized (outstandingConfirms) {
165+
while (outstandingConfirms.size() >= MAX_OUTSTANDING) {
166+
outstandingConfirms.wait();
167+
}
168+
}
169+
170+
String body = String.valueOf(i);
171+
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
172+
ch.basicPublish("", queue, null, body.getBytes());
173+
}
174+
175+
// Wait for remaining confirmations
176+
synchronized (outstandingConfirms) {
177+
while (!outstandingConfirms.isEmpty()) {
178+
outstandingConfirms.wait();
179+
}
180+
}
181+
182+
long end = System.nanoTime();
183+
System.out.format("Published %,d messages with confirmation window in %,d ms%n",
184+
MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
185+
}
186+
}
187+
188+
static void handlePublishConfirmsWithAdaptiveThrottling() throws Exception {
189+
try (Connection connection = createConnection()) {
190+
Channel ch = connection.createChannel();
191+
192+
String queue = UUID.randomUUID().toString();
193+
ch.queueDeclare(queue, false, false, true, null);
194+
ch.confirmSelect();
195+
196+
LinkedList<Long> outstandingConfirms = new LinkedList<>();
197+
int throttlingThreshold = MAX_OUTSTANDING * THROTTLING_PERCENTAGE / 100;
198+
199+
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
200+
synchronized (outstandingConfirms) {
201+
if (multiple) {
202+
outstandingConfirms.removeIf(seqNo -> seqNo <= sequenceNumber);
203+
} else {
204+
outstandingConfirms.removeFirstOccurrence(sequenceNumber);
205+
}
206+
outstandingConfirms.notifyAll();
207+
}
208+
};
209+
210+
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
211+
System.err.format("Message nacked. Sequence: %d, multiple: %b%n", sequenceNumber, multiple);
212+
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
213+
});
214+
215+
long start = System.nanoTime();
216+
for (int i = 0; i < MESSAGE_COUNT; i++) {
217+
String body = String.valueOf(i);
218+
219+
synchronized (outstandingConfirms) {
220+
while (outstandingConfirms.size() >= MAX_OUTSTANDING) {
221+
outstandingConfirms.wait();
222+
}
223+
224+
int availablePermits = MAX_OUTSTANDING - outstandingConfirms.size();
225+
if (availablePermits < throttlingThreshold) {
226+
double percentageUsed = 1.0 - (availablePermits / (double) MAX_OUTSTANDING);
227+
int delay = (int) (percentageUsed * MAX_DELAY_MS);
228+
if (delay > 0) {
229+
outstandingConfirms.wait(delay);
230+
}
231+
}
232+
233+
long seqNo = ch.getNextPublishSeqNo();
234+
outstandingConfirms.addLast(seqNo);
235+
}
236+
237+
ch.basicPublish("", queue, null, body.getBytes());
238+
}
239+
240+
synchronized (outstandingConfirms) {
241+
while (!outstandingConfirms.isEmpty()) {
242+
outstandingConfirms.wait();
243+
}
244+
}
245+
246+
long end = System.nanoTime();
247+
System.out.format("Published %,d messages with adaptive throttling in %,d ms%n",
248+
MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
249+
}
250+
}
251+
128252
static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
129253
int waited = 0;
130254
while (!condition.getAsBoolean() && waited < timeout.toMillis()) {

0 commit comments

Comments
 (0)