redis 源码分析:Jedis 哨兵模式连接原理

redis,源码,分析,jedis,哨兵,模式,连接,原理 · 浏览次数 : 7

小编点评

```java // ...省略其他代码 ... // 获取master节点地址 HostAndPort masterAddr = initSentinels(sentinels, masterName); // 初始化 Jedis 客户端 Jedis jedis = null; try { jedis = new Jedis(jedisSocketFactory, clientConfig); } catch (JedisException je) { logger.debug("Error while initializing Jedis client.", je); throw je; } // ...省略其他代码 ... // 设置host和port jedis.setHostAndPort(masterAddr.getHost(), masterAddr.getPort()); // ...省略其他代码 ... // 建立jedis 连接 Connection connection = new Connection(jedisSocketFactory, clientConfig); // ...省略其他代码 ... // 关闭jedis 连接 try { if (connection != null) { connection.close(); } } catch (Exception e) { // ...处理异常 ... } ``` 这个代码首先获取master节点地址,然后初始化 Jedis 客户端。最后,设置host和port,建立jedis 连接。

正文

1. 可以从单元测试开始入手

查看类JedisSentinelPool
  private static final String MASTER_NAME = "mymaster";

  protected static final HostAndPort sentinel1 = HostAndPorts.getSentinelServers().get(1);
  protected static final HostAndPort sentinel2 = HostAndPorts.getSentinelServers().get(3);
  
  @Before
  public void setUp() throws Exception {
    sentinels.clear();

    sentinels.add(sentinel1.toString());
    sentinels.add(sentinel2.toString());
  }

  @Test
  public void repeatedSentinelPoolInitialization() {

    for (int i = 0; i < 20; ++i) {
      GenericObjectPoolConfig<Jedis> config = new GenericObjectPoolConfig<>();

      JedisSentinelPool pool = new JedisSentinelPool(MASTER_NAME, sentinels, config, 1000,
          "foobared", 2);
      pool.getResource().close();
      pool.destroy();
    }
  }

可以看到首先是创建了sentinel 的HostAndPort 对象,然后创建了连接池

2. 查看 JedisSentinelPool 构造器,正式进入源码

  public JedisSentinelPool(String masterName, Set<String> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, int timeout, final String password,
      final int database) {
    this(masterName, sentinels, poolConfig, timeout, timeout, null, password, database);
  }
  
  ...
    public JedisSentinelPool(String masterName, Set<String> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig,
      final int connectionTimeout, final int soTimeout, final int infiniteSoTimeout,
      final String user, final String password, final int database, final String clientName,
      final int sentinelConnectionTimeout, final int sentinelSoTimeout, final String sentinelUser,
      final String sentinelPassword, final String sentinelClientName) {
    this(masterName, parseHostAndPorts(sentinels), poolConfig,
        DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
            .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
            .user(user).password(password).database(database).clientName(clientName).build(),
        DefaultJedisClientConfig.builder().connectionTimeoutMillis(sentinelConnectionTimeout)
            .socketTimeoutMillis(sentinelSoTimeout).user(sentinelUser).password(sentinelPassword)
            .clientName(sentinelClientName).build()
    );
  }
  
  ...
    public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig masterClientConfig,
      final JedisClientConfig sentinelClientConfig) {
    this(masterName, sentinels, poolConfig, new JedisFactory(masterClientConfig), sentinelClientConfig);
  }
  
    public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisFactory factory,
      final JedisClientConfig sentinelClientConfig) {
    super(poolConfig, factory);

    this.factory = factory;
    this.sentinelClientConfig = sentinelClientConfig;

    HostAndPort master = initSentinels(sentinels, masterName);
    initMaster(master);
  }

这里执行了两个重要方法
initSentinelsinitMaster

1. initSentinels 负责初始化sentinel ,并获得master的地址
2. 有了master地址,就可以 initMaster 了
  private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    LOG.info("Trying to find master from available Sentinels...");

    for (HostAndPort sentinel : sentinels) {

      LOG.debug("Connecting to Sentinel {}", sentinel);
      //连接sentinel 节点
      try (Jedis jedis = new Jedis(sentinel, sentinelClientConfig)) {

        // 向sentinel发送命令  sentinel get-master-addr-by-name mymaster
        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

        // connected to sentinel...
        sentinelAvailable = true;

        if (masterAddr == null || masterAddr.size() != 2) {
          LOG.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, sentinel);
          continue;
        }

        master = toHostAndPort(masterAddr);
        LOG.debug("Found Redis master at {}", master);
        break;
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        LOG.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", sentinel, e);
      }
    }

    if (master == null) {
      if (sentinelAvailable) {
        // can connect to sentinel, but master name seems to not monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }

    LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

    for (HostAndPort sentinel : sentinels) {

      MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  }
这里最终要的一步就是jedis.sentinelGetMasterAddrByName(masterName);,即向sentinel发送命令

sentinel get-master-addr-by-name mymaster, 用来获取master节点的地址,并将地址返回

然后initMaster(master);
  private void initMaster(HostAndPort master) {
    synchronized (initPoolLock) {
      if (!master.equals(currentHostMaster)) {
        currentHostMaster = master;
        // 这里是容易忽略但非常关键的一步
        factory.setHostAndPort(currentHostMaster);
        // although we clear the pool, we still have to check the returned object in getResource,
        // this call only clears idle instances, not borrowed instances
        super.clear();

        LOG.info("Created JedisSentinelPool to master at {}", master);
      }
    }
  }
在这里对factory的连接地址进行了设置(在之前这里还是空值)

它是由 构造方法中的 new JedisFactory(masterClientConfig) 构造出来,在单元测试中我们得知masterClientConfig里面的属性都是空值

到这里 sentinelpool 就构造完毕了,其实这里还没有初始化出一个连接到master节点的实例,我们继续往后看

3. 单元测试中下一步 getResouce()

  @Override
  public Jedis getResource() {
    while (true) {
      // 关键一步
      Jedis jedis = super.getResource();
      // 这里没啥大用,容易误导
      jedis.setDataSource(this);

      // get a reference because it can change concurrently
      final HostAndPort master = currentHostMaster;
      final HostAndPort connection = jedis.getClient().getHostAndPort();

      if (master.equals(connection)) {
        // connected to the correct master
        return jedis;
      } else {
        returnBrokenResource(jedis);
      }
    }
  }
这里调用 super.getResource(), 父类是Pool, 而Pool 的对象一般是由Factory 构建出来
  public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final GenericObjectPoolConfig<Jedis> poolConfig, final JedisClientConfig masterClientConfig,
      final JedisClientConfig sentinelClientConfig) {
    this(masterName, sentinels, poolConfig, new JedisFactory(masterClientConfig), sentinelClientConfig);
  }

  public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
      final JedisFactory factory, final JedisClientConfig sentinelClientConfig) {
    super(factory);

    this.factory = factory;
    this.sentinelClientConfig = sentinelClientConfig;

    HostAndPort master = initSentinels(sentinels, masterName);
    initMaster(master);
  }

由此可知 factory 是 new JedisFactory(masterClientConfig), 并且由 父类子类都引用到,并且在 initMaster 方法中调用factory.setHostAndPort(currentHostMaster); 更新了master的地址。

而Pool extends GenericObjectPool , 这里GenericObjectPool 来自包 org.apache.commons.pool2
  public T getResource() {
    try {
      return super.borrowObject();
    } catch (JedisException je) {
      throw je;
    } catch (Exception e) {
      throw new JedisException("Could not get a resource from the pool", e);
    }
  }

这里borrowObject 时,实际是调用工厂的方法干活,直接看工厂类JedisFactory

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    Jedis jedis = null;
    try {
      jedis = new Jedis(jedisSocketFactory, clientConfig);
      return new DefaultPooledObject<>(jedis);
    } catch (JedisException je) {
      logger.debug("Error while makeObject", je);
      throw je;
    }
  }
在这里会构建出Jedis对象 ,注意这里的jedisSocketFactory对象,实在构造方法中构造出
  protected JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout,
      final int infiniteSoTimeout, final String clientName, final SSLSocketFactory sslSocketFactory,
      final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) {
    if (!JedisURIHelper.isValid(uri)) {
      throw new InvalidURIException(String.format(
          "Cannot open Redis connection due invalid URI. %s", uri.toString()));
    }
    this.clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
        .socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
        .user(JedisURIHelper.getUser(uri)).password(JedisURIHelper.getPassword(uri))
        .database(JedisURIHelper.getDBIndex(uri)).clientName(clientName)
        .protocol(JedisURIHelper.getRedisProtocol(uri))
        .ssl(JedisURIHelper.isRedisSSLScheme(uri)).sslSocketFactory(sslSocketFactory)
        .sslParameters(sslParameters).hostnameVerifier(hostnameVerifier).build();
    this.jedisSocketFactory = new DefaultJedisSocketFactory(new HostAndPort(uri.getHost(), uri.getPort()), this.clientConfig);
  }

  void setHostAndPort(final HostAndPort hostAndPort) {
    if (!(jedisSocketFactory instanceof DefaultJedisSocketFactory)) {
      throw new IllegalStateException("setHostAndPort method has limited capability.");
    }
    ((DefaultJedisSocketFactory) jedisSocketFactory).updateHostAndPort(hostAndPort);
  }

4. 再看Jedis


  public Jedis(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
    connection = new Connection(jedisSocketFactory, clientConfig);
    RedisProtocol proto = clientConfig.getRedisProtocol();
    if (proto != null) commandObjects.setProtocol(proto);
  }

这里直接构造出Connectrion 对象, 并传入socketFactory

  public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
    this.socketFactory = socketFactory;
    this.soTimeout = clientConfig.getSocketTimeoutMillis();
    this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
    initializeFromClientConfig(clientConfig);
  }

在这个构造方法中执行关键方法initializeFromClientConfig

private void initializeFromClientConfig(final JedisClientConfig config) {
    try {
      connect();
	......
      
  }

connect()

  public void connect() throws JedisConnectionException {
    if (!isConnected()) {
      try {
        socket = socketFactory.createSocket();
        soTimeout = socket.getSoTimeout(); //?

        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());

        broken = false; // unset broken status when connection is (re)initialized

      } catch (JedisConnectionException jce) {

        setBroken();
        throw jce;

      } catch (IOException ioe) {

        setBroken();
        throw new JedisConnectionException("Failed to create input/output stream", ioe);

      } finally {

        if (broken) {
          IOUtils.closeQuietly(socket);
        }
      }
    }
  }

最终通过 soeckFactory 构建出socket,完成对redis 的master节点的连接

与redis 源码分析:Jedis 哨兵模式连接原理相似的内容:

redis 源码分析:Jedis 哨兵模式连接原理

1. 可以从单元测试开始入手 查看类JedisSentinelPool private static final String MASTER_NAME = "mymaster"; protected static final HostAndPort sentinel1 = HostAndPorts.

记一次Redis Cluster Pipeline导致的死锁问题

本文介绍了一次排查Dubbo线程池耗尽问题的过程。通过查看Dubbo线程状态、分析Jedis连接池获取连接的源码、排查死锁条件等方面,最终确认是因为使用了cluster pipeline模式且没有设置超时时间导致死锁问题。

[转帖]Redis 4.0 自动内存碎片整理(Active Defrag)源码分析

阅读本文前建议先阅读此篇博客: Redis源码从哪里读起 Redis 4.0 版本增加了许多不错的新功能,其中自动内存碎片整理功能 activedefrag 肯定是非常诱人的一个,这让 Redis 集群回收内存碎片相比 Redis 3.0 更加优雅,便利。我们升级 Redis 4.0 后直接开启了a

redis7源码分析:redis 启动流程

1. redis 由 server.c 的main函数启动 int main(int argc, char **argv) { ... // 上面的部分为读取配置和启动命令参数解析,看到这一行下面为启动流程 serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is

redis7源码分析:redis 单线程模型解析,一条get命令执行流程

有了下文的梳理后 redis 启动流程 再来解析redis 在单线程模式下解析并处理客户端发来的命令 1. 当 client fd 可读时,会回调readQueryFromClient函数 void readQueryFromClient(connection *conn) { client *c

redis7源码分析:redis 多线程模型解析

多线程模式中,在main函数中会执行InitServerLast void InitServerLast() { bioInit(); // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程 initThreadedIO(); set_jemalloc_bg_thread(s

Redisson 限流器源码分析

Redisson 限流器源码分析 对上篇文章网友评论给出问题进行解答:redis 的key 是否会过期,过期指的限流器 可以先阅读上篇文章: redis + AOP + 自定义注解实现接口限流 - 古渡蓝按 - 博客园 (cnblogs.com) 注解AOP 代码部分提取 // 调用Reids工具类

Redisson 限流器源码分析

Redisson 限流器源码分析 对上篇文章网友评论给出问题进行解答:redis 的key 是否会过期 可以先阅读上篇文章: redis + AOP + 自定义注解实现接口限流 - 古渡蓝按 - 博客园 (cnblogs.com) 注解AOP 代码部分提取 // 调用Reids工具类的rateLim

跳跃表数据结构与算法分析

目前市面上充斥着大量关于跳跃表结构与Redis的源码解析,但是经过长期观察后发现大都只是在停留在代码的表面,而没有系统性地介绍跳跃表的由来以及各种常量的由来。作为一种概率数据结构,理解各种常量的由来可以更好地进行变化并应用到高性能功能开发中。本文没有重复地以对现有优秀实现进行代码分析,而是通过对跳跃表进行了系统性地介绍与形式化分析,并给出了在特定场景下的跳跃表扩展方式,方便读者更好地理解跳跃表数据

[转帖]Redis进阶实践之十三 Redis的Redis-trib.rb脚本文件使用详解

https://www.cnblogs.com/PatrickLiu/p/8484784.html 一、简介 事先说明一下,本篇文章不涉及对redis-trib.rb源代码的分析,只是从使用的角度来阐述一下,对第一次使用的人来说很重要。redis-trib.rb是redis官方推出的管理redis集