程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

RabbitMQ 总结

发布于2021-05-29 19:52     阅读(592)     评论(0)     点赞(4)     收藏(4)


RabbitMQ 定义

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ 是使用 Erlang 语言来编写的,并且 RabbitMQ 是基于 AMQP 协议的。

AMQP

AMQP(Advanced Message Queuing Protocol),高级消息队列协议。

在这里插入图片描述

RabbitMQ 安装

RabbitMQ 官网

Messaging that just works - RabbitMQ

Docker 安装 RabbitMQ

拉取并运行镜像

使用 Docker 安装最简单,一条语句搞定

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

-d:后台运行

—name:指定容器名称为 rabbitmq

-p:将管理页面端口 15672 和程序通讯端口 5672 映射到主机相同的端口

访问管理页面

然后在浏览器输入 http://[ip]:15672/ ,就可以进入 RabbitMQ 后台管理界面。

在这里插入图片描述

默认的账号密码都是 guest

在这里插入图片描述

RabbitMQ 消息流转过程

在这里插入图片描述

说明:

  • 生产者(Publisher application)发送消息(Message)的时候需要指定交换机(Exchange)和路由键(Rounting Key)
  • 一个交换机(Exchange)下会有多个消息队列(Message Queue),交换机和消息队列之间会绑定路由键
  • 交换机(Exchange)收到消息(Message)后,通过绑定的路由键(Rounting Key)路由到对应的消息队列(Message Queue)中
  • 消费者(Consumer application)监听到消息队列有消息,直接消费

SpringBoot 整合 RabbitMQ

主要有两步:

生产者端

创建生产者项目

目录结构:

在这里插入图片描述

添加 RabbitMQ 依赖

<!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--工具类依赖包-->
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

配置生产者 application.properties

# RabbitMQ 配置
# RabbitMQ 地址
spring.rabbitmq.addresses=192.168.255.8:5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置默认虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000ms

# 项目路径
server.servlet.context-path=/
server.port=8001
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

生产者发送消息

@Component
public class OrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId());

        /**
         * exchange:指定交换机
         * routingKey:指定 routingKey
         * object:指定消息体内同
         * correlationData:消息唯一 ID
         */
        rabbitTemplate.convertAndSend("order-exchange", "order.abcd",
                order, correlationData);
    }
}

创建队列(Queue)

在这里插入图片描述

创建交换机(Exchange)

在这里插入图片描述

Exchange 绑定 Queue

在 Exchange 标签页下,点击刚刚新建的 Exchange, order-exchange

在这里插入图片描述

Bindings 选项卡下,指定这个 order-exchange 和 之前新建的队列 order-queue 进行绑定。

在这里插入图片描述

Routing key 指定路由键,Exchange 就是靠这个键来路由消息到对应的队列中的。

这里填写的值可以是明确的字符串,例如 order.abcd ,也可以用通配符 order.* ,匹配诸如 order.aorder.b 等的 Key, * 只能匹配第一层,如果要匹配 [order.xxx.xxx](http://order.xxx.xxx) 这样多层的话,需要用 #

最后点击 Bind 进行绑定。

在这里插入图片描述

测试生产者发送消息

@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private OrderSender orderSender;

    @Test
    public void testSend1() {
        Order order = new Order();
        order.setId("202104160000001");
        order.setName("测试订单1");
        order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());

        orderSender.send(order);
    }
}

验证消息发送

在这里插入图片描述

在 RabbitMQ 后台管理页面的 Queues 标签下,可以看到,发送的消息已经成功传递到 order-queue 队列当中了,并且是准备状态,准备被消费者消费。

消费者端

创建消费者项目

目录结构:

在这里插入图片描述

配置消费者 apprication.properties

# RabbitMQ 配置
# RabbitMQ 地址
spring.rabbitmq.addresses=192.168.255.8:5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置默认虚拟主机
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000ms

# 消费者配置
# 最大并发数
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
# 签收模式,手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 用于限流,同一时间只消费 1 条
spring.rabbitmq.listener.simple.prefetch=1

# 项目路径
server.servlet.context-path=/
server.port=8002

消费者消费信息

@Component
public class OrderReceiver {

    /**
     *
     * @param order @Payload 指定接收的消息体为 Order
     * @param headers 请求头
     * @param channel application.properties 中配置的 spring.rabbitmq.listener.simple.acknowledge-mode
     *                为 manual 手动签收,所以这里需要用到 Channel
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue", durable = "true"),
            exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
            key = "order.*"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> headers,
                               Channel channel) throws IOException {
        System.out.println("---------收到消息,开始消费---------");
        System.out.println("订单 ID:" + order.getId());

        // 获取交付标签
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手动签收模式,需要主动调用 channel 的 basicAck 方法,告知
        // RabbitMQ 确认收到了消息
        channel.basicAck(deliveryTag, false);

    }
}

@RabbitListener 注解用来指定消费者的这个方法监听的是哪个队列中的消息。

其中, @QueueBinding 指定 Queue 和 Exchange 的绑定关系,

value 属性指定绑定的队列;

exchange 属性指定交换机 Exchange;

key 属性指定 RountingKey。

如果指定的这些队列(Queue)、交换机(Exchange)还有它们之间的绑定关系在 RabbitMQ 后台还没有创建的话,@RabbitListener 会帮我们自动创建好。

@RabbitHandler 标注的方法表示,这个方法是监听到队列消息后的处理方法。

测试消费者消费信息

消费者 Application 启动

在这里插入图片描述

生产者执行发送消息测试方法

消费者端看到信息打印,说明已经成功消费消息

在这里插入图片描述

测试消费者端遇到的问题

  • 实体类找不到

Caused by: java.lang.ClassNotFoundException: com.myj.springbootrabbitmqproducer.entity.Order

这个错误的原因是生产者端发送的实体类 Order 和消费者端的实体类 Order 包路径不一致所致。

解决:只需要将两个实体类的包路径改为一致即可。

  • 序列化失败

Caused by: java.io.InvalidClassException: com.myj.springbootrabbitmq.entity.Order; local class incompatible: stream classdesc serialVersionUID = -8605719115300010363, local class serialVersionUID = -4799427397249076144

这个错误是因为生产者端的实体类的 serialVersionUID 和消费者端的不一致导致。

解决:只需要保证两个实体类的 serialVersionUID 一致即可。

消息可靠性投递

在这里插入图片描述

说明:

  • 发送者(Sender)发送消息之前,先将业务数据和消息日志做入库处理,消息状态标识为发送中(status=0)(Step1)
  • 再将消息发送给 MQ Broker**(Step2)**
  • MQ Broker 接收到消息会回调一个 Confirm Callback ,通过在生产力定义的 Confirm Listener 监听 MQ Broker 回调的标示,判断消息是否成功发送**(Step3)**
  • 如果回调成功标示的话,更新消息日志里的消息记录状态为成功发送(status=1)(Step4)
  • 对于消息发送到 MQ Broker 失败或者因为网络原因没有接收到 MQ Broker 的回调的,定义定时器任务,定时将状态为发送中(status=0)的消息查询返回**(Step5)**
  • 定时任务对这些状态为发送中的消息进行发送重试**(Step6)**
  • 重试一定次数后仍然无法成功发送,则将消息状态置为失败(status=2)(Step7)

小结下就是,通过生产者监听 MQ Broker 的回调函数,和定时任务的重试来保证消息的成功投递。

原文链接:https://blog.csdn.net/qq_23412263/article/details/117038302



所属网站分类: 技术文章 > 博客

作者:你看我可爱不

链接:http://www.javaheidong.com/blog/article/207255/48b1488104427edf6ec5/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

4 0
收藏该文
已收藏

评论内容:(最多支持255个字符)