发布于2021-05-29 19:52 阅读(724) 评论(0) 点赞(4) 收藏(4)
RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ 是使用 Erlang 语言来编写的,并且 RabbitMQ 是基于 AMQP 协议的。
AMQP(Advanced Message Queuing Protocol),高级消息队列协议。
RabbitMQ 官网
Messaging that just works - RabbitMQ
拉取并运行镜像
使用 Docker 安装最简单,一条语句搞定
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
-d:后台运行
—name:指定容器名称为 rabbitmq
-p:将管理页面端口 15672 和程序通讯端口 5672 映射到主机相同的端口
访问管理页面
然后在浏览器输入 http://[ip]:15672/
,就可以进入 RabbitMQ 后台管理界面。
默认的账号密码都是 guest
说明:
主要有两步:
创建生产者项目
目录结构:
添加 RabbitMQ 依赖
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--工具类依赖包-->
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
配置生产者 application.properties
# RabbitMQ 配置
# RabbitMQ 地址
spring.rabbitmq.addresses=192.168.255.8:5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置默认虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000ms
# 项目路径
server.servlet.context-path=/
server.port=8001
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
生产者发送消息
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(Order order) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(order.getMessageId());
/**
* exchange:指定交换机
* routingKey:指定 routingKey
* object:指定消息体内同
* correlationData:消息唯一 ID
*/
rabbitTemplate.convertAndSend("order-exchange", "order.abcd",
order, correlationData);
}
}
创建队列(Queue)
创建交换机(Exchange)
Exchange 绑定 Queue
在 Exchange 标签页下,点击刚刚新建的 Exchange, order-exchange
在 Bindings
选项卡下,指定这个 order-exchange
和 之前新建的队列 order-queue
进行绑定。
Routing key
指定路由键,Exchange 就是靠这个键来路由消息到对应的队列中的。
这里填写的值可以是明确的字符串,例如 order.abcd
,也可以用通配符 order.*
,匹配诸如 order.a
、 order.b
等的 Key, *
只能匹配第一层,如果要匹配 [order.xxx.xxx](http://order.xxx.xxx)
这样多层的话,需要用 #
。
最后点击 Bind
进行绑定。
测试生产者发送消息
@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {
@Test
void contextLoads() {
}
@Autowired
private OrderSender orderSender;
@Test
public void testSend1() {
Order order = new Order();
order.setId("202104160000001");
order.setName("测试订单1");
order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
orderSender.send(order);
}
}
验证消息发送
在 RabbitMQ 后台管理页面的 Queues
标签下,可以看到,发送的消息已经成功传递到 order-queue
队列当中了,并且是准备状态,准备被消费者消费。
创建消费者项目
目录结构:
配置消费者 apprication.properties
# RabbitMQ 配置
# RabbitMQ 地址
spring.rabbitmq.addresses=192.168.255.8:5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置默认虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000ms
# 消费者配置
# 最大并发数
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
# 签收模式,手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 用于限流,同一时间只消费 1 条
spring.rabbitmq.listener.simple.prefetch=1
# 项目路径
server.servlet.context-path=/
server.port=8002
消费者消费信息
@Component
public class OrderReceiver {
/**
*
* @param order @Payload 指定接收的消息体为 Order
* @param headers 请求头
* @param channel application.properties 中配置的 spring.rabbitmq.listener.simple.acknowledge-mode
* 为 manual 手动签收,所以这里需要用到 Channel
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue", durable = "true"),
exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
key = "order.*"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers,
Channel channel) throws IOException {
System.out.println("---------收到消息,开始消费---------");
System.out.println("订单 ID:" + order.getId());
// 获取交付标签
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收模式,需要主动调用 channel 的 basicAck 方法,告知
// RabbitMQ 确认收到了消息
channel.basicAck(deliveryTag, false);
}
}
@RabbitListener
注解用来指定消费者的这个方法监听的是哪个队列中的消息。
其中, @QueueBinding
指定 Queue 和 Exchange 的绑定关系,
value
属性指定绑定的队列;
exchange
属性指定交换机 Exchange;
key
属性指定 RountingKey。
如果指定的这些队列(Queue)、交换机(Exchange)还有它们之间的绑定关系在 RabbitMQ 后台还没有创建的话,@RabbitListener
会帮我们自动创建好。
@RabbitHandler
标注的方法表示,这个方法是监听到队列消息后的处理方法。
测试消费者消费信息
消费者 Application 启动
生产者执行发送消息测试方法
消费者端看到信息打印,说明已经成功消费消息
测试消费者端遇到的问题
Caused by: java.lang.ClassNotFoundException: com.myj.springbootrabbitmqproducer.entity.Order
这个错误的原因是生产者端发送的实体类 Order
和消费者端的实体类 Order
包路径不一致所致。
解决:只需要将两个实体类的包路径改为一致即可。
Caused by: java.io.InvalidClassException: com.myj.springbootrabbitmq.entity.Order; local class incompatible: stream classdesc serialVersionUID = -8605719115300010363, local class serialVersionUID = -4799427397249076144
这个错误是因为生产者端的实体类的 serialVersionUID
和消费者端的不一致导致。
解决:只需要保证两个实体类的 serialVersionUID
一致即可。
说明:
小结下就是,通过生产者监听 MQ Broker 的回调函数,和定时任务的重试来保证消息的成功投递。
原文链接:https://blog.csdn.net/qq_23412263/article/details/117038302
作者:你看我可爱不
链接:http://www.javaheidong.com/blog/article/207255/48b1488104427edf6ec5/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!