Skip to content

Commit 0a97dfe

Browse files
Ruby: adopt Bunny 3.1.0, port tutorial 7 (publisher confirms)
Now that Bunny `3.x.` provides publisher confirm tracking and backpressure just like .NET, Swift 6 clients do.
1 parent 2244a2c commit 0a97dfe

4 files changed

Lines changed: 123 additions & 9 deletions

File tree

ruby/Gemfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
source 'https://rubygems.org'
22

3-
ruby ">= 2.5.0"
3+
ruby ">= 3.1.0"
44

5-
gem "bunny", ">= 2.14.2", " < 3.0"
5+
gem "bunny", "~> 3.1.0"

ruby/Gemfile.lock

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
GEM
22
remote: https://rubygems.org/
33
specs:
4-
amq-protocol (2.3.0)
5-
bunny (2.14.2)
6-
amq-protocol (~> 2.3, >= 2.3.0)
4+
amq-protocol (2.7.0)
5+
bunny (3.1.0)
6+
amq-protocol (~> 2.7)
7+
logger (~> 1, >= 1.7)
8+
sorted_set (~> 1, >= 1.0.2)
9+
logger (1.7.0)
10+
rbtree (0.4.6)
11+
sorted_set (1.1.0)
12+
rbtree
713

814
PLATFORMS
915
ruby
1016

1117
DEPENDENCIES
12-
bunny (>= 2.14.2, < 3.0)
18+
bunny (~> 3.1.0)
1319

1420
RUBY VERSION
15-
ruby 2.5.1p57
21+
ruby 3.1.0
1622

1723
BUNDLED WITH
18-
1.17.2
24+
2.5.23

ruby/README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Here you can find Ruby code examples from
55

66
## Requirements
77

8-
These tutorials require Ruby 2.2 or later, [Bundler](https://bundler.io/) and [Bunny](http://rubybunny.info) to be installed.
8+
These tutorials require Ruby 3.1.0 or later, [Bundler](https://bundler.io/) and [Bunny](http://rubybunny.info) to be installed.
99

1010
To install Bunny with Bundler, do
1111

@@ -58,4 +58,10 @@ bundle exec ruby rpc_server.rb
5858
bundle exec ruby rpc_client.rb
5959
```
6060

61+
[Tutorial seven: Publisher Confirms](https://www.rabbitmq.com/tutorials/tutorial-seven-ruby.html)
62+
63+
``` sh
64+
bundle exec ruby publisher_confirms.rb
65+
```
66+
6167
To learn more, see [Bunny documentation](http://rubybunny.info).

ruby/publisher_confirms.rb

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#!/usr/bin/env ruby
2+
require 'bunny'
3+
4+
MESSAGE_COUNT = 50_000
5+
6+
def publish_messages_individually(connection)
7+
channel = connection.create_channel
8+
queue = channel.queue('', exclusive: true)
9+
10+
channel.confirm_select(tracking: true)
11+
12+
start_time = Time.now
13+
MESSAGE_COUNT.times do |i|
14+
body = i.to_s
15+
channel.basic_publish(body, '', queue.name)
16+
end
17+
18+
# Wait for any remaining confirmations
19+
channel.wait_for_confirms
20+
end_time = Time.now
21+
22+
puts "Published #{MESSAGE_COUNT} messages individually in #{((end_time - start_time) * 1000).to_i} ms"
23+
ensure
24+
channel.close if channel && channel.open?
25+
end
26+
27+
def publish_messages_in_batch(connection)
28+
channel = connection.create_channel
29+
queue = channel.queue('', exclusive: true)
30+
31+
channel.confirm_select(tracking: true)
32+
33+
batch_size = 1000
34+
start_time = Time.now
35+
36+
(0...MESSAGE_COUNT).each_slice(batch_size) do |batch|
37+
messages = batch.map { |i| i.to_s }
38+
channel.basic_publish_batch(messages, '', queue.name)
39+
end
40+
41+
# Wait for any remaining confirmations
42+
channel.wait_for_confirms
43+
end_time = Time.now
44+
45+
puts "Published #{MESSAGE_COUNT} messages in batch in #{((end_time - start_time) * 1000).to_i} ms"
46+
ensure
47+
channel.close if channel && channel.open?
48+
end
49+
50+
def handle_publish_confirms_asynchronously(connection)
51+
channel = connection.create_channel
52+
queue = channel.queue('', exclusive: true)
53+
54+
outstanding_confirms = {}
55+
# A mutex is necessary because the confirm callbacks are executed in a separate thread
56+
confirms_mutex = Mutex.new
57+
58+
channel.confirm_select do |delivery_tag, multiple, nack|
59+
confirms_mutex.synchronize do
60+
if multiple
61+
outstanding_confirms.reject! { |k, _| k <= delivery_tag }
62+
else
63+
outstanding_confirms.delete(delivery_tag)
64+
end
65+
end
66+
if nack
67+
puts "Message with delivery tag #{delivery_tag} was nacked!"
68+
end
69+
end
70+
71+
start_time = Time.now
72+
MESSAGE_COUNT.times do |i|
73+
body = i.to_s
74+
seq_no = channel.next_publish_seq_no
75+
confirms_mutex.synchronize do
76+
outstanding_confirms[seq_no] = body
77+
end
78+
channel.basic_publish(body, '', queue.name)
79+
end
80+
81+
# Wait for any remaining confirmations
82+
channel.wait_for_confirms
83+
end_time = Time.now
84+
85+
puts "Published #{MESSAGE_COUNT} messages and handled confirms asynchronously in #{((end_time - start_time) * 1000).to_i} ms"
86+
ensure
87+
channel.close if channel && channel.open?
88+
end
89+
90+
begin
91+
connection = Bunny.new
92+
connection.start
93+
94+
publish_messages_individually(connection)
95+
publish_messages_in_batch(connection)
96+
handle_publish_confirms_asynchronously(connection)
97+
rescue Interrupt => _
98+
connection.close
99+
exit(0)
100+
ensure
101+
connection.close if connection
102+
end

0 commit comments

Comments
 (0)