前言
代码实现为go语言版本,rabbitmq的guest用户只能本地登陆,远程需要单独创建用户
项目初始化
首先初始化一个go mod项目
1
2
3
| mkdir rabbitmq_study
cd rabbitmq_study
go mod init rabbitmq_study
|
安装RabbitMQ的Go语言客户端库:
1
| go get github.com/rabbitmq/amqp091-go
|
代码实现
连接RabbitMQ服务器
1
2
3
4
5
| conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
|
创建一个通道
1
2
3
4
5
| ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
|
声明一个持久化的队列
1
2
3
4
5
6
7
8
9
10
11
| queue, err := ch.QueueDeclare(
"simple_queue", // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否独占,排它
false, // 是否不等待,丢弃结果
nil, // 其他参数
)
if err != nil {
log.Fatal(err)
}
|
发送消息
普通消息模型的生产者和消费者为点对点模式,两者交互只需要一个消息队列,不需要交换机和路由键系列的绑定
1
2
3
4
5
6
7
8
9
10
11
12
| err = ch.PublishWithContext(ctx,
"", // 交换机名,普通模式没有交换机
queue.Name, // 路由键名,普通模式没有路由键,此处为队列名
false, // 是否强制,失败需要通知
false, // 是否立即发送给能够接收消息的消费者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
})
if err != nil {
log.Fatal(err)
}
|
获取一个消息接收器
1
2
3
4
5
6
7
8
9
10
11
12
| msgs, err := ch.Consume(
"simple_queue", // 队列名
"", // 消费者标识
true, // 自动应答
false, // 是否独占,排它
false, // 是否只消费远程消息
false, // 是否不等待,丢弃结果
nil, // 其他参数
)
if err != nil {
log.Fatal(err)
}
|
不断读取消息
1
2
3
4
5
6
| for msg := range msgs {
// 处理消息
fmt.Println(string(msg.Body))
// 如果消息接收器不是自动应答,就需要手动ack,第二个参数表示是否批量应答
// ch.Ack(msg.DeliveryTag, false)
}
|
完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
| package main
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"time"
)
func init() {
log.SetFlags(log.Flags() | log.Llongfile)
}
func main() {
// 连接RabbitMQ服务器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
defer ch.Close()
// 声明一个持久化的队列
queue, err := ch.QueueDeclare(
"simple_queue", // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否独占,排它
false, // 是否不等待,丢弃结果
nil, // 其他参数
)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 发送消息
err = ch.PublishWithContext(ctx,
"", // 交换机名,普通模式没有交换机
queue.Name, // 路由键名,普通模式没有路由键,此处为队列名
false, // 是否强制,失败需要通知
false, // 是否立即发送给能够接收消息的消费者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
})
if err != nil {
log.Fatal(err)
}
// 获取一个消息接收器
msgs, err := ch.Consume(
"simple_queue", // 队列名
"", // 消费者标识
true, // 自动应答
false, // 是否独占,排它
false, // 是否只消费远程消息
false, // 是否不等待,丢弃结果
nil, // 其他参数
)
if err != nil {
log.Fatal(err)
}
// 不断读取消息
for msg := range msgs {
// 处理消息
fmt.Println(string(msg.Body))
// 如果消息接收器不是自动应答,就需要手动ack,第二个参数表示是否批量应答
// ch.Ack(msg.DeliveryTag, false)
}
}
|