发布于2021-01-01 15:54 阅读(792) 评论(0) 点赞(28) 收藏(1)
相信看到这篇文章,大家也应该知道ActiveMQ是一个消息中间件。主要特点就是异步处理,用来减少响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时同步返回结果的操作作为消息放入消息队列。由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
怎样理解ActiveMQ呢?ActiveMQ有两种消息传递模型,一种是点对点模型,一种是发布-订阅模型。我个人比较喜欢拿微信作为对象来代入进行理解。点对点模型,就好比我给你发送微信信息,一对一,我发送给你信息之后,你在忙的时候,就肯定没有理我(额,除非~这种关系应该不存在),等你闲下来的时候,再看我给你的信息。发布-订阅模型,就好比微信公众号,一对多,只要有人订阅了我的公众号,如果我发送了一条推文,那么凡是订阅了我公众号的人,就都会收到我发的这条推文。
好了,废话不多说,下面直接用SpringBoot来整合ActiveMQ。
1、新建一个maven工程,引入SpringBoot和ActiveMQ依赖
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.4.1</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.chen</groupId>
- <artifactId>chen</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>war</packaging>
- <name>Springboot_mq</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- <version>2.1.5.RELEASE</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>
2、application.yml 核心配置
- server:
- port: 8086
-
- spring:
- activemq:
- broker-url: tcp://192.168.203.129:61616
- user: admin
- password: admin
- jms:
- pub-sub-domain: false # false代表队列,true代表主题
-
- queue: queue01 # 自定义命名队列
3、配置队列bean,相当于Spring容器的bean标签,注意@EnableJms注解,开启JMS的适配规则。
- package com.chen.config;
-
- import org.apache.activemq.command.ActiveMQQueue;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.jms.annotation.EnableJms;
- import org.springframework.stereotype.Component;
-
- @Component
- @EnableJms // 开启JMS适配
- public class ConfigBean {
-
- @Value("${queue}")
- private String myQueue; // 注入配置文件中的queue
-
- @Bean
- public ActiveMQQueue queue() {
- return new ActiveMQQueue(myQueue);
- }
-
- }
4、生产者发送消息
- package com.chen.produce;
-
- import java.util.UUID;
-
- import javax.jms.Queue;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Queue_Produce {
-
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
-
- @Autowired
- private Queue queue;
-
- // 调用一次一个信息发出
- public void produceMessage() {
- jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
- }
-
- }
5、消费者接收消息,我这里直接使用@JmsListener注解进行监听队列,底层是由MQ的监听器实现的,还有另外一种接收消息的方法,就是MQ的receive()方法。
- package com.chen.consummer;
-
- import javax.jms.TextMessage;
-
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Queue_consummer {
-
- @JmsListener(destination = "${queue}") // 注解监听
- public void receive(TextMessage textMessage) throws Exception{
- System.out.println("消费者收到消息:"+textMessage.getText());
- }
-
- }
6、接下来利用Junit进行一个单元测试,直接运行单元方法,调用生产者进行发送消息
- package com.chen;
-
- import javax.annotation.Resource;
-
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
- import org.springframework.test.context.web.WebAppConfiguration;
-
- import com.chen.produce.Queue_Produce;
-
- @SpringBootTest(classes = SpringbootMqApplication.class)
- @RunWith(SpringJUnit4ClassRunner.class)
- @WebAppConfiguration
- public class TestActiveMQ {
-
- @Resource
- private Queue_Produce queue_produce;
-
- @Test
- public void testSend() throws Exception {
- queue_produce.produceMessage();
- }
- }
可以在ActiveMQ图形化界面看到,生产者生产了一条消息,被消费者消费了一条消息
7、增加定时投递功能,利用Spring自带的@Scheduled注解实现定时任务,每隔一段时间生产一个消息,注意使用这个注解,需要在主启动类开启这个功能,不然无效启用
- package com.chen.produce;
-
- import java.util.UUID;
-
- import javax.jms.Queue;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Queue_Produce {
-
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
-
- @Autowired
- private Queue queue;
-
- // 调用一次一个信息发出
- public void produceMessage() {
- jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
- }
-
- // 带定时投递的业务方法
- @Scheduled(fixedDelay = 3000) // 每3秒自动调用
- public void produceMessageScheduled() {
- jmsMessagingTemplate.convertAndSend(queue, "这是一条定时投递的消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
- System.out.println("投递完成");
- }
-
- }
- package com.chen;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.scheduling.annotation.EnableScheduling;
-
- @SpringBootApplication
- @EnableScheduling // 开启对定时任务的支持
- public class SpringbootMqApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(SpringbootMqApplication.class, args);
- }
-
- }
1、重新新建一个maven项目,依赖和第三整合队列一样,配置application.yml,注意需要将false改为true,代表使用的是主题
- server:
- port: 8088
-
- spring:
- activemq:
- broker-url: tcp://192.168.203.129:61616
- user: admin
- password: admin
- jms:
- pub-sub-domain: true
-
- topic: topic01
2、配置主题bean
- package com.chen.config;
-
- import javax.jms.Topic;
-
- import org.apache.activemq.command.ActiveMQTopic;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
-
- @Component
- public class ConfigBean {
-
- @Value("${topic}")
- private String topicName;
-
- @Bean
- public Topic topic() {
- return new ActiveMQTopic(topicName);
- }
-
- }
3、同样是生产者
- package com.chen.produce;
-
- import java.util.UUID;
-
- import javax.jms.Topic;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Topic_Produce {
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
- @Autowired
- private Topic topic;
-
- @Scheduled(fixedDelay = 3000)
- public void produceTopic() {
- jmsMessagingTemplate.convertAndSend(topic, "这是一条主题消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
- }
- }
4、消费者
- package com.chen.consummer;
-
- import javax.jms.TextMessage;
-
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class Topic_Consummer {
-
- @JmsListener(destination = "${topic}")
- public void receive(TextMessage textMessage) throws Exception{
- System.out.println("消费者接收订阅的主题消息:"+textMessage.getText());
- }
-
- }
5、因为我这里使用了定时任务,所以直接启用主启动类就行,会自动调用生产者每隔三秒生产一条消息
- package com.chen;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.scheduling.annotation.EnableScheduling;
-
- @SpringBootApplication
- @EnableScheduling
- public class SpringbootMqTopicApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(SpringbootMqTopicApplication.class, args);
- }
-
- }
作者:lovejava
链接:http://www.javaheidong.com/blog/article/45746/c35fcd45a177310fc301/
来源:java黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 java黑洞网 All Rights Reserved 版权所有,并保留所有权利。京ICP备18063182号-2
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!