程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

第9篇 rabbitmq WorkPool源码解析

发布于2021-06-12 14:58     阅读(486)     评论(0)     点赞(21)     收藏(3)


消费者消费消息它有一个工作池的概念,所以现在研究一下WorkPool的源码,看一看可以学到什么技巧。

  • 这个实现是一个线程安全的类

  • WorkPool<K,W> K : 客户端类型, W : 工作项目类型

1、成员变量

  • MAX_QUEUE_LENGTH = 1000 队列最大为1000
  • ready = new SetQueue() 准备好客户端
  • inProgress = new HashSet() 正在处理客户端
  • Map<K, VariableLinkedBlockingQueue> = new HashMap 真正的池子,包括注册客户端和它们工作队列
  • unlimited = new HashSet() 不限制队列大小
  • enqueueingCallback 入队回调函数 (使用BiConsumer)

2、构造函数

  • 定义enqueueingCallback 的函数,入队工作项

    • public WorkPool(final int queueingTimeout) {
              if (queueingTimeout > 0) {
                  this.enqueueingCallback = (queue, item) -> {
                      try {
                          boolean offered = queue.offer(item, queueingTimeout, TimeUnit.MILLISECONDS);
                          if (!offered) {
                              throw new WorkPoolFullException("Could not enqueue in work pool after " + queueingTimeout + " ms.");
                          }
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  };
              } else {
                  this.enqueueingCallback = (queue, item) -> {
                      try {
                          queue.put(item);
                      } catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  };
              }
          }
      

3、核心方法

  • com.rabbitmq.client.impl.WorkPool#registerKey 注册客户类型(key)

    • 注册到pool中
  • com.rabbitmq.client.impl.WorkPool#limit 如果unlimited.isEmpty 时那么pool对于value的队列长度是1000

  • com.rabbitmq.client.impl.WorkPool#unlimit 如果unlimited.isNotEmpty 时那么pool对于value的队列长度是Integer.MAX_VALUE 【个人这个用boolean值不是很好,可能是后期扩展需要】

  • com.rabbitmq.client.impl.WorkPool#unregisterKey 注销客户类型(key) remove操作

  • com.rabbitmq.client.impl.WorkPool#unregisterAllKeys 注销所有客户类型(key) clear操作

  • com.rabbitmq.client.impl.WorkPool#nextWorkBlock (获取ready 客户端类型KEY, 对应queue的元素,最理想情况是拿到size的元素)

    • image-20210608205418406
  • com.rabbitmq.client.impl.WorkPool#addWorkItem (添加工作项,pool必须存在对应客户端类型KEY,且添加ready队列中)

    • image-20210608210717906
  • com.rabbitmq.client.impl.WorkPool#finishWorkBlock 判断是否完成工作块

    • key不存在pool中返回false
    • key不存在inProgress,就抛出IllegalStateException异常
    • 如果这个Key对应pool队列还有未处理完元素,那么这个key将会在inProgress, 同时添加到ready队列中 返回true
    • key对应pool队列的元素全部处理完了,那么直接从inProgress移除,返回false

4、SetQueue

  • WorkPool核心应该是ready队列.
  • 不是线程安全
  • 元素不能为null
  • 这个类没有什么特别地方只是具有set和queue一些属性方法,相对于一份数据存了两种数据结构

4.1、成员变量

  • Set members = new HashSet(); 保证任务的唯一性
  • Queue queue = new LinkedList(); 链表队队列

5、VariableLinkedBlockingQueue

  • extends AbstractQueue implements BlockingQueue, java.io.Serializable
  • 其他这个类是克隆 java.util.concurrent.LinkedBlockingQueue
  • 只是增加了setCapacity(int)方法,运行我们取修改队列长度

6、总结

  • 一般我也是可以利用现有集合组合满足要求类,就像SetQueue(同时保证唯一和队列属性)
  • 对应二元没有返回值可以使用BiConsumer函数,其使用者是不需要关系函数具体定义的

学习点

BiConsumer使用示例 (两个数进行数学运算)

  • public class JavaBiConsumer3 {
    
        public static void main(String[] args) {
    
            math(1, 1, (x, y) -> System.out.println(x + y));   // 2
            math(1, 1, (x, y) -> System.out.println(x - y));   // 0
            math(1, 1, (x, y) -> System.out.println(x * y));   // 1
            math(1, 1, (x, y) -> System.out.println(x / y));   // 1
    
        }
    
        static <Integer> void math(Integer a1, Integer a2, BiConsumer<Integer, Integer> c) {
            c.accept(a1, a2);
        }
    
    }
    
  • Map.forEach也有用到BiConsumer

    •  default void forEach(BiConsumer<? super K, ? super V> action) {
              Objects.requireNonNull(action);
              for (Map.Entry<K, V> entry : entrySet()) {
                  K k;
                  V v;
                  try {
                      k = entry.getKey();
                      v = entry.getValue();
                  } catch(IllegalStateException ise) {
                      // this usually means the entry is no longer in the map.
                      throw new ConcurrentModificationException(ise);
                  }
                  action.accept(k, v);
              }
          }
      
  • 下次使用两个对象进行相关操作可以使用BiConsumer

结尾

  • 感谢大家的耐心阅读,如有建议请私信或评论留言。
  • 如有收获,劳烦支持,关注、点赞、评论、收藏均可,博主会经常更新,与大家共同进步
  • 既然rabbitmq用到LinkedBlockingQueue,下一篇研究一下 LinkedBlockingQueue

参考

[Java 8 BiConsumer Examples](

原文链接:https://blog.csdn.net/m0_37355951/article/details/117717560



所属网站分类: 技术文章 > 博客

作者:我爱编程

链接:http://www.javaheidong.com/blog/article/222357/07395e5b54b3f84c6496/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

21 0
收藏该文
已收藏

评论内容:(最多支持255个字符)