程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

java(0)

标签  

暂无标签

日期归档  

2023-06(2)

第4篇、rabbitmq客户端ConnectionFactory(源码解析)

发布于2021-06-08 11:22     阅读(654)     评论(0)     点赞(17)     收藏(3)


这篇文章主要熟悉一下rabbitmq客户端的核心类ConnectionFactory,主要会从几个方面入手,成员变量整理和核心方法整理,得出它们核心能力,做一些实践验证

1、成员变量整理

  • fielddefault(默认)含义
    virtualHost/虚拟主机概念(隔离消息能力)
    port-1端口号(默认端口号非ssl为5672,ssl为5671)
    hostlocalhost主机地址
    requestedChannelMax2047最大通道数,2047 因为它是服务器端的 2048 减去通道 0,每个连接用于协商和错误通信
    requestedFrameMax0默认是0,表示请求帧大小没有任何限制
    requestedHeartbeat60默认心跳检测间隔是60s
    connectionTimeout60000默认连接超时是60s
    handshakeTimeout10000默认握手超时是10s
    shutdownTimeout10000默认关闭超时是10s,如果是0则无限等待
    _clientProperties默认rabbitmq客户端自带属性,客户端能力
    socketFactorynullsocket工厂类(jdk自带的)
    saslConfig简单验证和安全层配置默认机制为PLAIN
    sharedExecutor共享执行器ExecutorService (jdk自带)
    threadFactory线程工厂类new DefaultThreadFactory()(jdk自带)
    shutdownExecutor关闭执行器ExecutorService (jdk自带)
    heartbeatExecutor心跳执行器ScheduledExecutorService
    socketConfsocket配置默认是非Nagle算法,也就是无延迟,数据包立即发送(有人就发车,不需要坐满),这里使用jdk1.8 函数特性
    exceptionHandler异常处理器默认异常处理
    credentialsProvider默认username=guest, password=guest验证用户名和密码
    automaticRecoverytrue是否自动恢复(由于各种原因断开了)
    topologyRecoverytrue拓扑恢复
    topologyRecoveryExecutornull拓扑恢复执行器
    networkRecoveryInterval5000单位是毫秒,就是5s
    recoveryDelayHandlernull恢复延迟处理器
    metricsCollectorNoOpMetricsCollectorrabbitmq消息监控收集器
    niofalse是否为nio
    frameHandlerFactorynull帧处理器工厂类(socket处理)
    nioParamsnew NioParamsnio一些默认参数
    sslContextFactorynullssl上下文工厂类
    channelRpcTimeout10min默认通道超时时间为10分钟
    channelShouldCheckRpcResponseTypefalse是否需要检查返回rpc调用结果类型
    errorOnWriteListenernull监听发生io写异常,并传播异常
    workPoolTimeout-1工作线程池排队超时时间,默认没有超时时间
    topologyRecoveryFilternulltopology恢复过滤
    connectionRecoveryTriggeringConditionnull条件自动恢复连接(Predicate)
    topologyRecoveryRetryHandlernull重试处理拓扑恢复处理器
    trafficListenerTrafficListener.NO_OP命令流量监控,一般用于debug
  • 以上是所有成员变量,我们以时间轴进行整理,主要分一下几块(如图所示)

    1. 连接和握手相关成员变量

    2. 发送数据和处理数据成员变量

    3. 异常逻辑处理和监听成员变量

    4. 发生异常之后进行恢复和恢复策略成员变量

    5. 关闭连接成员变量

    6. 心跳检测成员变量

    7. 指标收集器(收集客户与服务端交互整个过程)

  • image-20210604111434256

2、核心方法

  • com.rabbitmq.client.ConnectionFactory#portOrDefault (获取端口号)

    1. 自定义优先级最高
    2. 非ssl默认端口是5672
    3. ssl默认端口是5671
  • com.rabbitmq.client.ConnectionFactory#setCredentialsProvider (设置凭证)

    1. 用户名和密码都是封装到这个类中
    2. 可以自定义实现
  • com.rabbitmq.client.ConnectionFactory#setUri(java.net.URI) (解析uri)

    1. 一般都是加载配置文件时候可能用到
    2. amqp是非ssl方式,amqps是ssl方式
    3. 解析内容是协议头,host,port,username,password, vhost
  • com.rabbitmq.client.ConnectionFactory#isSSL (判断是否为ssl)

    1. SocketFactory 是否为SSLSocketFactory或sslContextFactory不为空
  • com.rabbitmq.client.ConnectionFactory#useSslProtocol() (使用ssl进行连接)

    1. 默认协议是TLSv1
    2. 后面还需要仔细研究一下ssl (TODO)
  • com.rabbitmq.client.ConnectionFactory#createFrameHandlerFactory (创建数据帧处理工厂类)

    1. 核心socket处理(比较重要需要仔细看一下)
  • com.rabbitmq.client.ConnectionFactory#newConnection(创建连接对象)

    1. 多个重载方法(最终方法入参executor,addressResolver, clientProvideName)
  • com.rabbitmq.client.ConnectionFactory#load (加载连接rabbit属性)

    1. 从配置文件中加载

3、总结

3.1、socketConf (成员变量)

  • 点开看一下发现是自定义的函数,也就jdk1.8引入的

    • @FunctionalInterface
      public interface SocketConfigurator {
      
          /**
           * Provides a hook to insert custom configuration of the sockets
           * used to connect to an AMQP server before they connect.
           */
          void configure(Socket socket) throws IOException;
      
          /**
           * Returns a composed configurator that performs, in sequence, this
           * operation followed by the {@code after} operation.
           *
           * @param after the operation to perform after this operation
           * @return a composed configurator that performs in sequence this
           * operation followed by the {@code after} operation
           * @throws NullPointerException if {@code after} is null
           */
          default SocketConfigurator andThen(SocketConfigurator after) {
              Objects.requireNonNull(after);
              return t -> {
                  configure(t);
                  after.configure(t);
              };
          }
      }
      
      
  • 总结

    1. 使用@FunctionalInterface来标识这个函数接口,且函数接口只能自定义一个处理接口,这里就configure方法
    2. andThen组合函数方法使用
  • 研究一下他们是怎么用的,我们是否可以模仿学习一波

    • SocketConfigurator有一个配套SocketConfigurators抽象类,里面有Builder构造器设计模式

    • 看一下使用例子,链式编程,看起来很舒服,这个是我们模仿典范实例代码

      •             this.socketConf = SocketConfigurators.builder().defaultConfigurator().enableHostnameVerification().build();
        
        
    • 看一下定义SocketConfigurator函数实例,就是设置一个属性不延迟通信

      •     public static final SocketConfigurator DISABLE_NAGLE_ALGORITHM = socket -> socket.setTcpNoDelay(true);
        
        
      • 这样做不需要定义一个方法,一行搞定,而且还可以利用函数的andThen,组合函数

    • 写一个例子,支持支付方式(现在有现金、优惠券,微信支付)

      • 看一下核心类,其实和SocketConfigurators基本类似

      • package com.jack.function.arithmetic;
        
        
        /**
         * 是否支持一些支付方式
         * @author liangchen
         * @date 2021/6/4
         */
        public abstract class PayConfigs {
        
            // 使用优惠券
            public static final PayConfig USE_COUPON = pay ->  pay.setUseCoupon(true);
        
            // 使用现金
            public static final PayConfig USE_CASH = pay -> pay.setCash(true);
        
            // 使用微信
            public static final PayConfig USE_WEIXIN = pay -> pay.setWeixin(true);
        
        
        
            public static Builder  builder(){
               return new Builder();
            }
        
            public static class Builder {
        
                // 定义空
                private PayConfig configurator = pay -> {
        
                };
        
        
                public Builder useCoupon(){
                    configurator = configurator.andThen(USE_COUPON);
                    return this;
                }
        
        
                public Builder useCash(){
                    configurator = configurator.andThen(USE_CASH);
                    return this;
                }
        
                public Builder useWeixin(){
                    configurator = configurator.andThen(USE_WEIXIN);
                    return this;
                }
        
                public PayConfig build() {
                    return configurator;
                }
            }
        
        }
        
        
    • 使用的main方法

      • /**
         * @author liangchen
         * @date 2021/6/4
         */
        public class TestFunction {
            public static void main(String[] args) {
                // 具体我们定义支持cash,coupon支付
                PayConfig build = PayConfigs.builder().useCash().useCoupon().build();
                SupportPayMethod supportPayMethod = new SupportPayMethod();
                build.execute(supportPayMethod);
                System.out.println(supportPayMethod);
            }
        }
        
    • 输出结果

      • SupportPayMethod{useCoupon=true, cash=true, weixin=false}
        
    • 详情代码参考github示例代码, 小伙伴你们尝试在工作使用这种方式。

###3.1.1、socketConf学习总结

  • 直接调用方法不是很好吗?为啥使用函数编程? 函数比较简洁,自定义非常灵活,可读性比较强,职责单一且逻辑是内聚的。

结尾

  • 感谢大家的耐心阅读,如有建议请私信或评论留言。
  • 如有收获,劳烦支持,关注、点赞、评论、收藏均可,博主会经常更新,与大家共同进步
  • 下一篇就是分析rabbitmq客户端ConnectionFactory使用ThreadFactory相关源码


所属网站分类: 技术文章 > 博客

作者:java王侯

链接:http://www.javaheidong.com/blog/article/219478/6f974e320df41f331d99/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

17 0
收藏该文
已收藏

评论内容:(最多支持255个字符)