程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2023-06(1)

SpringBoot整合ActiveMQ

发布于2021-01-01 15:54     阅读(792)     评论(0)     点赞(28)     收藏(1)


一、前言

相信看到这篇文章,大家也应该知道ActiveMQ是一个消息中间件。主要特点就是异步处理,用来减少响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时同步返回结果的操作作为消息放入消息队列。由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。

二、理解ActiveMQ

怎样理解ActiveMQ呢?ActiveMQ有两种消息传递模型,一种是点对点模型,一种是发布-订阅模型。我个人比较喜欢拿微信作为对象来代入进行理解。点对点模型,就好比我给你发送微信信息,一对一,我发送给你信息之后,你在忙的时候,就肯定没有理我(额,除非~这种关系应该不存在),等你闲下来的时候,再看我给你的信息。发布-订阅模型,就好比微信公众号,一对多,只要有人订阅了我的公众号,如果我发送了一条推文,那么凡是订阅了我公众号的人,就都会收到我发的这条推文。

好了,废话不多说,下面直接用SpringBoot来整合ActiveMQ。

三、队列整合

1、新建一个maven工程,引入SpringBoot和ActiveMQ依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.4.1</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.chen</groupId>
  12. <artifactId>chen</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <packaging>war</packaging>
  15. <name>Springboot_mq</name>
  16. <description>Demo project for Spring Boot</description>
  17. <properties>
  18. <java.version>1.8</java.version>
  19. </properties>
  20. <dependencies>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-web</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-tomcat</artifactId>
  28. <scope>provided</scope>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-test</artifactId>
  33. <scope>test</scope>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-activemq</artifactId>
  38. <version>2.1.5.RELEASE</version>
  39. </dependency>
  40. </dependencies>
  41. <build>
  42. <plugins>
  43. <plugin>
  44. <groupId>org.springframework.boot</groupId>
  45. <artifactId>spring-boot-maven-plugin</artifactId>
  46. </plugin>
  47. </plugins>
  48. </build>
  49. </project>

2、application.yml 核心配置

  1. server:
  2. port: 8086
  3. spring:
  4. activemq:
  5. broker-url: tcp://192.168.203.129:61616
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: false # false代表队列,true代表主题
  10. queue: queue01 # 自定义命名队列

3、配置队列bean,相当于Spring容器的bean标签,注意@EnableJms注解,开启JMS的适配规则。

  1. package com.chen.config;
  2. import org.apache.activemq.command.ActiveMQQueue;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.jms.annotation.EnableJms;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @EnableJms // 开启JMS适配
  9. public class ConfigBean {
  10. @Value("${queue}")
  11. private String myQueue; // 注入配置文件中的queue
  12. @Bean
  13. public ActiveMQQueue queue() {
  14. return new ActiveMQQueue(myQueue);
  15. }
  16. }

4、生产者发送消息

  1. package com.chen.produce;
  2. import java.util.UUID;
  3. import javax.jms.Queue;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.jms.core.JmsMessagingTemplate;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Queue_Produce {
  10. @Autowired
  11. private JmsMessagingTemplate jmsMessagingTemplate;
  12. @Autowired
  13. private Queue queue;
  14. // 调用一次一个信息发出
  15. public void produceMessage() {
  16. jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
  17. }
  18. }

5、消费者接收消息,我这里直接使用@JmsListener注解进行监听队列,底层是由MQ的监听器实现的,还有另外一种接收消息的方法,就是MQ的receive()方法。

  1. package com.chen.consummer;
  2. import javax.jms.TextMessage;
  3. import org.springframework.jms.annotation.JmsListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class Queue_consummer {
  7. @JmsListener(destination = "${queue}") // 注解监听
  8. public void receive(TextMessage textMessage) throws Exception{
  9. System.out.println("消费者收到消息:"+textMessage.getText());
  10. }
  11. }

6、接下来利用Junit进行一个单元测试,直接运行单元方法,调用生产者进行发送消息

  1. package com.chen;
  2. import javax.annotation.Resource;
  3. import org.junit.Test;
  4. import org.junit.runner.RunWith;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  7. import org.springframework.test.context.web.WebAppConfiguration;
  8. import com.chen.produce.Queue_Produce;
  9. @SpringBootTest(classes = SpringbootMqApplication.class)
  10. @RunWith(SpringJUnit4ClassRunner.class)
  11. @WebAppConfiguration
  12. public class TestActiveMQ {
  13. @Resource
  14. private Queue_Produce queue_produce;
  15. @Test
  16. public void testSend() throws Exception {
  17. queue_produce.produceMessage();
  18. }
  19. }

可以在ActiveMQ图形化界面看到,生产者生产了一条消息,被消费者消费了一条消息

7、增加定时投递功能,利用Spring自带的@Scheduled注解实现定时任务,每隔一段时间生产一个消息,注意使用这个注解,需要在主启动类开启这个功能,不然无效启用

  1. package com.chen.produce;
  2. import java.util.UUID;
  3. import javax.jms.Queue;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.jms.core.JmsMessagingTemplate;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Queue_Produce {
  10. @Autowired
  11. private JmsMessagingTemplate jmsMessagingTemplate;
  12. @Autowired
  13. private Queue queue;
  14. // 调用一次一个信息发出
  15. public void produceMessage() {
  16. jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
  17. }
  18. // 带定时投递的业务方法
  19. @Scheduled(fixedDelay = 3000) // 每3秒自动调用
  20. public void produceMessageScheduled() {
  21. jmsMessagingTemplate.convertAndSend(queue, "这是一条定时投递的消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
  22. System.out.println("投递完成");
  23. }
  24. }
  1. package com.chen;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.scheduling.annotation.EnableScheduling;
  5. @SpringBootApplication
  6. @EnableScheduling // 开启对定时任务的支持
  7. public class SpringbootMqApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(SpringbootMqApplication.class, args);
  10. }
  11. }

四、整合主题

1、重新新建一个maven项目,依赖和第三整合队列一样,配置application.yml,注意需要将false改为true,代表使用的是主题

  1. server:
  2. port: 8088
  3. spring:
  4. activemq:
  5. broker-url: tcp://192.168.203.129:61616
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: true
  10. topic: topic01

2、配置主题bean

  1. package com.chen.config;
  2. import javax.jms.Topic;
  3. import org.apache.activemq.command.ActiveMQTopic;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class ConfigBean {
  9. @Value("${topic}")
  10. private String topicName;
  11. @Bean
  12. public Topic topic() {
  13. return new ActiveMQTopic(topicName);
  14. }
  15. }

3、同样是生产者

  1. package com.chen.produce;
  2. import java.util.UUID;
  3. import javax.jms.Topic;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.jms.core.JmsMessagingTemplate;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Topic_Produce {
  10. @Autowired
  11. private JmsMessagingTemplate jmsMessagingTemplate;
  12. @Autowired
  13. private Topic topic;
  14. @Scheduled(fixedDelay = 3000)
  15. public void produceTopic() {
  16. jmsMessagingTemplate.convertAndSend(topic, "这是一条主题消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
  17. }
  18. }

4、消费者

  1. package com.chen.consummer;
  2. import javax.jms.TextMessage;
  3. import org.springframework.jms.annotation.JmsListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class Topic_Consummer {
  7. @JmsListener(destination = "${topic}")
  8. public void receive(TextMessage textMessage) throws Exception{
  9. System.out.println("消费者接收订阅的主题消息:"+textMessage.getText());
  10. }
  11. }

5、因为我这里使用了定时任务,所以直接启用主启动类就行,会自动调用生产者每隔三秒生产一条消息

  1. package com.chen;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.scheduling.annotation.EnableScheduling;
  5. @SpringBootApplication
  6. @EnableScheduling
  7. public class SpringbootMqTopicApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(SpringbootMqTopicApplication.class, args);
  10. }
  11. }



所属网站分类: 技术文章 > 博客

作者:lovejava

链接:http://www.javaheidong.com/blog/article/45746/c35fcd45a177310fc301/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

28 0
收藏该文
已收藏

评论内容:(最多支持255个字符)