Skip to content

Commit b3fc942

Browse files
committed
Add Go AMQP 1.0 tutorial port in go-amqp
Introduce runnable examples aligned with the existing Go amqp091 tutorials, using rabbitmq-amqp-go-client, plus README and test-tutorials.sh smoke script. Made-with: Cursor
1 parent d8dddec commit b3fc942

19 files changed

Lines changed: 1370 additions & 0 deletions

go-amqp/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pkg/*
2+
src/*

go-amqp/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Go code for RabbitMQ tutorials (AMQP 1.0)
2+
3+
4+
Here you can find Go code examples from [RabbitMQ tutorials](https://www.rabbitmq.com/getstarted.html), using the AMQP 1.0 client.
5+
6+
7+
## Requirements
8+
9+
These examples use the [`rabbitmq-amqp-go-client`](https://github.com/rabbitmq/rabbitmq-amqp-go-client) library for RabbitMQ 4.x. Get it with
10+
11+
go get github.com/rabbitmq/rabbitmq-amqp-go-client
12+
13+
A RabbitMQ node must be running on `localhost` with the default port (`5672`) and credentials (`guest` / `guest`).
14+
15+
16+
## Code
17+
18+
Run each example from this directory with `go run`:
19+
20+
[Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-go.html):
21+
22+
go run send.go
23+
go run receive.go
24+
25+
[Tutorial two: Work Queues](https://www.rabbitmq.com/tutorials/tutorial-two-go.html):
26+
27+
go run new_task.go hello world
28+
go run worker.go
29+
30+
[Tutorial three: Publish/Subscribe](https://www.rabbitmq.com/tutorials/tutorial-three-go.html)
31+
32+
go run receive_logs.go
33+
go run emit_log.go hello world
34+
35+
[Tutorial four: Routing](https://www.rabbitmq.com/tutorials/tutorial-four-go.html)
36+
37+
go run receive_logs_direct.go info warn
38+
go run emit_log_direct.go warn "a warning"
39+
40+
[Tutorial five: Topics](https://www.rabbitmq.com/tutorials/tutorial-five-go.html)
41+
42+
go run receive_logs_topic.go "kern.*" "*.critical"
43+
go run emit_log_topic.go kern.critical "A critical kernel error"
44+
45+
[Tutorial six: RPC](https://www.rabbitmq.com/tutorials/tutorial-six-go.html)
46+
47+
go run rpc_server.go
48+
go run rpc_client.go 10
49+
50+
[Publisher confirms](https://www.rabbitmq.com/tutorials/tutorial-seven-java.html) (AMQP 1.0 publish outcomes)
51+
52+
go run publisher_confirms.go
53+
54+
[AMQP 1.0 Direct Reply-To RPC](https://www.rabbitmq.com/docs/next/direct-reply-to)
55+
56+
go run rpc_amqp10.go
57+
58+
To learn more, see the [package documentation](https://pkg.go.dev/github.com/rabbitmq/rabbitmq-amqp-go-client) and [AMQP in RabbitMQ](https://www.rabbitmq.com/docs/amqp).

go-amqp/emit_log.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strings"
8+
9+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
10+
)
11+
12+
const brokerURI = "amqp://guest:guest@localhost:5672/"
13+
14+
func main() {
15+
ctx := context.Background()
16+
env := rmq.NewEnvironment(brokerURI, nil)
17+
conn, err := env.NewConnection(ctx)
18+
if err != nil {
19+
log.Panicf("Failed to connect to RabbitMQ: %v", err)
20+
}
21+
defer func() {
22+
_ = env.CloseConnections(context.Background())
23+
}()
24+
25+
_, err = conn.Management().DeclareExchange(ctx, &rmq.FanOutExchangeSpecification{Name: "logs"})
26+
if err != nil {
27+
log.Panicf("Failed to declare an exchange: %v", err)
28+
}
29+
30+
publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{Exchange: "logs", Key: ""}, nil)
31+
if err != nil {
32+
log.Panicf("Failed to create publisher: %v", err)
33+
}
34+
defer func() { _ = publisher.Close(context.Background()) }()
35+
36+
body := bodyFrom(os.Args)
37+
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
38+
if err != nil {
39+
log.Panicf("Failed to publish a message: %v", err)
40+
}
41+
switch res.Outcome.(type) {
42+
case *rmq.StateAccepted:
43+
default:
44+
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
45+
}
46+
log.Printf(" [x] Sent %s", body)
47+
}
48+
49+
func bodyFrom(args []string) string {
50+
var s string
51+
if (len(args) < 2) || args[1] == "" {
52+
s = "hello"
53+
} else {
54+
s = strings.Join(args[1:], " ")
55+
}
56+
return s
57+
}

go-amqp/emit_log_direct.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strings"
8+
9+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
10+
)
11+
12+
const brokerURI = "amqp://guest:guest@localhost:5672/"
13+
14+
func main() {
15+
ctx := context.Background()
16+
env := rmq.NewEnvironment(brokerURI, nil)
17+
conn, err := env.NewConnection(ctx)
18+
if err != nil {
19+
log.Panicf("Failed to connect to RabbitMQ: %v", err)
20+
}
21+
defer func() {
22+
_ = env.CloseConnections(context.Background())
23+
}()
24+
25+
_, err = conn.Management().DeclareExchange(ctx, &rmq.DirectExchangeSpecification{Name: "logs_direct"})
26+
if err != nil {
27+
log.Panicf("Failed to declare an exchange: %v", err)
28+
}
29+
30+
publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{
31+
Exchange: "logs_direct",
32+
Key: severityFrom(os.Args),
33+
}, nil)
34+
if err != nil {
35+
log.Panicf("Failed to create publisher: %v", err)
36+
}
37+
defer func() { _ = publisher.Close(context.Background()) }()
38+
39+
body := bodyFrom(os.Args)
40+
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
41+
if err != nil {
42+
log.Panicf("Failed to publish a message: %v", err)
43+
}
44+
switch res.Outcome.(type) {
45+
case *rmq.StateAccepted:
46+
default:
47+
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
48+
}
49+
log.Printf(" [x] Sent %s", body)
50+
}
51+
52+
func bodyFrom(args []string) string {
53+
var s string
54+
if (len(args) < 3) || args[2] == "" {
55+
s = "hello"
56+
} else {
57+
s = strings.Join(args[2:], " ")
58+
}
59+
return s
60+
}
61+
62+
func severityFrom(args []string) string {
63+
var s string
64+
if (len(args) < 2) || args[1] == "" {
65+
s = "info"
66+
} else {
67+
s = args[1]
68+
}
69+
return s
70+
}

go-amqp/emit_log_topic.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strings"
8+
9+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
10+
)
11+
12+
const brokerURI = "amqp://guest:guest@localhost:5672/"
13+
14+
func main() {
15+
ctx := context.Background()
16+
env := rmq.NewEnvironment(brokerURI, nil)
17+
conn, err := env.NewConnection(ctx)
18+
if err != nil {
19+
log.Panicf("Failed to connect to RabbitMQ: %v", err)
20+
}
21+
defer func() {
22+
_ = env.CloseConnections(context.Background())
23+
}()
24+
25+
_, err = conn.Management().DeclareExchange(ctx, &rmq.TopicExchangeSpecification{Name: "logs_topic"})
26+
if err != nil {
27+
log.Panicf("Failed to declare an exchange: %v", err)
28+
}
29+
30+
publisher, err := conn.NewPublisher(ctx, &rmq.ExchangeAddress{
31+
Exchange: "logs_topic",
32+
Key: severityFrom(os.Args),
33+
}, nil)
34+
if err != nil {
35+
log.Panicf("Failed to create publisher: %v", err)
36+
}
37+
defer func() { _ = publisher.Close(context.Background()) }()
38+
39+
body := bodyFrom(os.Args)
40+
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
41+
if err != nil {
42+
log.Panicf("Failed to publish a message: %v", err)
43+
}
44+
switch res.Outcome.(type) {
45+
case *rmq.StateAccepted:
46+
default:
47+
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
48+
}
49+
log.Printf(" [x] Sent %s", body)
50+
}
51+
52+
func bodyFrom(args []string) string {
53+
var s string
54+
if (len(args) < 3) || args[2] == "" {
55+
s = "hello"
56+
} else {
57+
s = strings.Join(args[2:], " ")
58+
}
59+
return s
60+
}
61+
62+
func severityFrom(args []string) string {
63+
var s string
64+
if (len(args) < 2) || args[1] == "" {
65+
s = "anonymous.info"
66+
} else {
67+
s = args[1]
68+
}
69+
return s
70+
}

go-amqp/go.mod

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module github.com/rabbitmq/rabbitmq-tutorials/go-amqp
2+
3+
go 1.24.0
4+
5+
require (
6+
github.com/Azure/go-amqp v1.5.1
7+
github.com/rabbitmq/rabbitmq-amqp-go-client v0.7.0
8+
)
9+
10+
require (
11+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
12+
github.com/google/uuid v1.6.0 // indirect
13+
github.com/gorilla/websocket v1.5.3 // indirect
14+
go.opentelemetry.io/otel v1.40.0 // indirect
15+
go.opentelemetry.io/otel/metric v1.40.0 // indirect
16+
)

go-amqp/go.sum

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
github.com/Azure/go-amqp v1.5.1 h1:WyiPTz2C3zVvDL7RLAqwWdeoYhMtX62MZzQoP09fzsU=
2+
github.com/Azure/go-amqp v1.5.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
3+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
4+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
5+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
6+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7+
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
8+
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
9+
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
10+
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
11+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
12+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
13+
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
14+
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
15+
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
16+
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
17+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
18+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
19+
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8=
20+
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
21+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
22+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
23+
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
24+
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
25+
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
26+
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
27+
github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY=
28+
github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o=
29+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
30+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
31+
github.com/rabbitmq/rabbitmq-amqp-go-client v0.7.0 h1:mQnrLwgZbYwf6CdRJ2l2kTf8cbn9j4AyBJM9tXnwebg=
32+
github.com/rabbitmq/rabbitmq-amqp-go-client v0.7.0/go.mod h1:u2HAJ0fUFyayNcIvTGCTlpmdPKHrKO9Xl81FbiEzYO4=
33+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
34+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
35+
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
36+
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
37+
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
38+
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
39+
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
40+
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
41+
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
42+
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
43+
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
44+
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
45+
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
46+
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
47+
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
48+
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
49+
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
50+
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
51+
golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
52+
golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
53+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
54+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

go-amqp/new_task.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"strings"
8+
9+
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
10+
)
11+
12+
const brokerURI = "amqp://guest:guest@localhost:5672/"
13+
14+
func main() {
15+
ctx := context.Background()
16+
env := rmq.NewEnvironment(brokerURI, nil)
17+
conn, err := env.NewConnection(ctx)
18+
if err != nil {
19+
log.Panicf("Failed to connect to RabbitMQ: %v", err)
20+
}
21+
defer func() {
22+
_ = env.CloseConnections(context.Background())
23+
}()
24+
25+
_, err = conn.Management().DeclareQueue(ctx, &rmq.QuorumQueueSpecification{Name: "task_queue"})
26+
if err != nil {
27+
log.Panicf("Failed to declare a queue: %v", err)
28+
}
29+
30+
publisher, err := conn.NewPublisher(ctx, &rmq.QueueAddress{Queue: "task_queue"}, nil)
31+
if err != nil {
32+
log.Panicf("Failed to create publisher: %v", err)
33+
}
34+
defer func() { _ = publisher.Close(context.Background()) }()
35+
36+
body := bodyFrom(os.Args)
37+
res, err := publisher.Publish(ctx, rmq.NewMessage([]byte(body)))
38+
if err != nil {
39+
log.Panicf("Failed to publish a message: %v", err)
40+
}
41+
switch res.Outcome.(type) {
42+
case *rmq.StateAccepted:
43+
default:
44+
log.Panicf("Unexpected publish outcome: %v", res.Outcome)
45+
}
46+
log.Printf(" [x] Sent %s", body)
47+
}
48+
49+
func bodyFrom(args []string) string {
50+
var s string
51+
if (len(args) < 2) || args[1] == "" {
52+
s = "hello"
53+
} else {
54+
s = strings.Join(args[1:], " ")
55+
}
56+
return s
57+
}

0 commit comments

Comments
 (0)