44import com .rabbitmq .client .ConnectionFactory ;
55
66import java .time .Duration ;
7+ import java .util .LinkedList ;
78import java .util .UUID ;
89import java .util .concurrent .ConcurrentNavigableMap ;
910import java .util .concurrent .ConcurrentSkipListMap ;
@@ -21,10 +22,16 @@ static Connection createConnection() throws Exception {
2122 return cf .newConnection ();
2223 }
2324
25+ static final int MAX_OUTSTANDING = 1000 ; // Confirmation window
26+ static final int THROTTLING_PERCENTAGE = 50 ; // Start throttling at 50% capacity
27+ static final int MAX_DELAY_MS = 1000 ; // Maximum delay in milliseconds
28+
2429 public static void main (String [] args ) throws Exception {
2530 publishMessagesIndividually ();
2631 publishMessagesInBatch ();
2732 handlePublishConfirmsAsynchronously ();
33+ handlePublishConfirmsWithWindow ();
34+ handlePublishConfirmsWithAdaptiveThrottling ();
2835 }
2936
3037 static void publishMessagesIndividually () throws Exception {
@@ -125,6 +132,123 @@ static void handlePublishConfirmsAsynchronously() throws Exception {
125132 }
126133 }
127134
135+ static void handlePublishConfirmsWithWindow () throws Exception {
136+ try (Connection connection = createConnection ()) {
137+ Channel ch = connection .createChannel ();
138+
139+ String queue = UUID .randomUUID ().toString ();
140+ ch .queueDeclare (queue , false , false , true , null );
141+ ch .confirmSelect ();
142+
143+ ConcurrentNavigableMap <Long , String > outstandingConfirms = new ConcurrentSkipListMap <>();
144+
145+ ConfirmCallback cleanOutstandingConfirms = (sequenceNumber , multiple ) -> {
146+ if (multiple ) {
147+ outstandingConfirms .headMap (sequenceNumber , true ).clear ();
148+ } else {
149+ outstandingConfirms .remove (sequenceNumber );
150+ }
151+ synchronized (outstandingConfirms ) {
152+ outstandingConfirms .notifyAll ();
153+ }
154+ };
155+
156+ ch .addConfirmListener (cleanOutstandingConfirms , (sequenceNumber , multiple ) -> {
157+ System .err .format ("Message nacked. Sequence: %d, multiple: %b%n" , sequenceNumber , multiple );
158+ cleanOutstandingConfirms .handle (sequenceNumber , multiple );
159+ });
160+
161+ long start = System .nanoTime ();
162+ for (int i = 0 ; i < MESSAGE_COUNT ; i ++) {
163+ // Wait if window is full
164+ synchronized (outstandingConfirms ) {
165+ while (outstandingConfirms .size () >= MAX_OUTSTANDING ) {
166+ outstandingConfirms .wait ();
167+ }
168+ }
169+
170+ String body = String .valueOf (i );
171+ outstandingConfirms .put (ch .getNextPublishSeqNo (), body );
172+ ch .basicPublish ("" , queue , null , body .getBytes ());
173+ }
174+
175+ // Wait for remaining confirmations
176+ synchronized (outstandingConfirms ) {
177+ while (!outstandingConfirms .isEmpty ()) {
178+ outstandingConfirms .wait ();
179+ }
180+ }
181+
182+ long end = System .nanoTime ();
183+ System .out .format ("Published %,d messages with confirmation window in %,d ms%n" ,
184+ MESSAGE_COUNT , Duration .ofNanos (end - start ).toMillis ());
185+ }
186+ }
187+
188+ static void handlePublishConfirmsWithAdaptiveThrottling () throws Exception {
189+ try (Connection connection = createConnection ()) {
190+ Channel ch = connection .createChannel ();
191+
192+ String queue = UUID .randomUUID ().toString ();
193+ ch .queueDeclare (queue , false , false , true , null );
194+ ch .confirmSelect ();
195+
196+ LinkedList <Long > outstandingConfirms = new LinkedList <>();
197+ int throttlingThreshold = MAX_OUTSTANDING * THROTTLING_PERCENTAGE / 100 ;
198+
199+ ConfirmCallback cleanOutstandingConfirms = (sequenceNumber , multiple ) -> {
200+ synchronized (outstandingConfirms ) {
201+ if (multiple ) {
202+ outstandingConfirms .removeIf (seqNo -> seqNo <= sequenceNumber );
203+ } else {
204+ outstandingConfirms .removeFirstOccurrence (sequenceNumber );
205+ }
206+ outstandingConfirms .notifyAll ();
207+ }
208+ };
209+
210+ ch .addConfirmListener (cleanOutstandingConfirms , (sequenceNumber , multiple ) -> {
211+ System .err .format ("Message nacked. Sequence: %d, multiple: %b%n" , sequenceNumber , multiple );
212+ cleanOutstandingConfirms .handle (sequenceNumber , multiple );
213+ });
214+
215+ long start = System .nanoTime ();
216+ for (int i = 0 ; i < MESSAGE_COUNT ; i ++) {
217+ String body = String .valueOf (i );
218+
219+ synchronized (outstandingConfirms ) {
220+ while (outstandingConfirms .size () >= MAX_OUTSTANDING ) {
221+ outstandingConfirms .wait ();
222+ }
223+
224+ int availablePermits = MAX_OUTSTANDING - outstandingConfirms .size ();
225+ if (availablePermits < throttlingThreshold ) {
226+ double percentageUsed = 1.0 - (availablePermits / (double ) MAX_OUTSTANDING );
227+ int delay = (int ) (percentageUsed * MAX_DELAY_MS );
228+ if (delay > 0 ) {
229+ outstandingConfirms .wait (delay );
230+ }
231+ }
232+
233+ long seqNo = ch .getNextPublishSeqNo ();
234+ outstandingConfirms .addLast (seqNo );
235+ }
236+
237+ ch .basicPublish ("" , queue , null , body .getBytes ());
238+ }
239+
240+ synchronized (outstandingConfirms ) {
241+ while (!outstandingConfirms .isEmpty ()) {
242+ outstandingConfirms .wait ();
243+ }
244+ }
245+
246+ long end = System .nanoTime ();
247+ System .out .format ("Published %,d messages with adaptive throttling in %,d ms%n" ,
248+ MESSAGE_COUNT , Duration .ofNanos (end - start ).toMillis ());
249+ }
250+ }
251+
128252 static boolean waitUntil (Duration timeout , BooleanSupplier condition ) throws InterruptedException {
129253 int waited = 0 ;
130254 while (!condition .getAsBoolean () && waited < timeout .toMillis ()) {
0 commit comments