程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2023-06(3)

RabbitMQ(二)JavaClient SpringBoot集成 Work queues

发布于2020-11-19 20:27     阅读(898)     评论(0)     点赞(30)     收藏(4)


Java Client

package com.fang.java_client.work_queue_2;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @Author Mr. Sun.
 * @Date 2020-11-12 16:44
 *
 * 这种模型默认平均分配
 *  解决办法就是把默认确认机制关掉
 */
public class Provider {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 申明队列 这些true false 什么的点进去看,有英文注释!
        channel.queueDeclare("worker", true, false, false, null);
        // 生产消息
        for (int i=0;i<100;i++) {
            channel.basicPublish("", "worker", null, ("hello work queue"+i).getBytes());
        }
        // 关闭资源
        RabbitMQUtils.closeConnectAndChannel(channel,connection);

    }
}

// 两个消费者
package com.fang.java_client.work_queue_2;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;

/**
 * @Author Mr. Sun.
 * @Date 2020-11-12 17:35
 */
public class Consumer_1 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("worker", true, false, false, null);

        // 告诉通道一次只消费一个
        // 解决能者多劳!
        channel.basicQos(1);


        // 默认确认机制 autoAck
        channel.basicConsume("worker", false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1" + new String(body));
                Thread.sleep(500);
                // 手动确认
                //@param multiple 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
-------------------------------------------------------------------
package com.fang.java_client.work_queue_2;

import com.fang.java_client.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;

import java.io.IOException;

/**
 * @Author Mr. Sun.
 * @Date 2020-11-12 17:35
 */
public class Consumer_2 {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // 告诉通道一次只消费一个
        channel.basicQos(1);

        channel.queueDeclare("worker", true, false, false, null);
        channel.basicConsume("worker", false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2" + new String(body));
                Thread.sleep(1000);
                // 手动确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

SpringBoot

	@Test// worke queues 模型
    void test_work_queue() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "worker 模型"+i);
        }
    }



	// worker 模型默认公平模型~
    @RabbitListener(queuesToDeclare=@Queue(value = "work"))
    private void receiveMsgWork1(String msg){
        System.out.println("worker_1:"+msg);
    }

    @RabbitListener(queuesToDeclare=@Queue(value = "work"))
    private void receiveMsgWork2(String msg){
        System.out.println("worker_2:"+msg);
    }
 


所属网站分类: 技术文章 > 博客

作者:天使之恋

链接:http://www.javaheidong.com/blog/article/986/68f8e03d0ada45a58de2/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

30 0
收藏该文
已收藏

评论内容:(最多支持255个字符)