Pulsar负载均衡原理及优化

pulsar,负载,均衡,原理,优化 · 浏览次数 : 226

小编点评

## Pulsar 负载均衡问题分析 **问题原因:** * 在升级 Pulsar 版本后,最后一个节点始终没有流量,导致负载不均衡。 * 升级前的每个 broker 的负载值都是 10,重启 broker2 时,它所绑定的 bundle 被 broker0/1 接管,导致broker0一直没有流量。 * 由于原有逻辑走完之后也没有获取需要需要卸载的 bundle,同时也存在一个负载极低的 broker 时(emptyBundle),再触发一次 bundle 查询,导致多个节点都尝试访问同一个 broker。 **优化方案:** 1. **筛选负载最高的节点和 bundle:** 根据 broker 所绑定的数量排序,选择一个数量最多的 broker 的 第一个 bundle 进行卸载。 2. **人工二次确认:** 当升级后出现负载不均衡时人工触发一个逻辑:系统根据各个节点的负载情况计算出一个负载最高的节点和 bundle 在页面上展示,人工二次确认是否要卸载,确认无误后进行卸载。 **优化后的效果:** * 解决了最后一个节点始终没有流量的问题。 * 避免了多个节点尝试访问同一个 broker 的问题。 * 提高了系统的性能。 **总结:** 该问题是 Pulsar 负载均衡优化的重要问题,通过筛选负载最高的节点和 bundle 进行卸载,以及人工二次确认,可以解决问题并提高系统的性能。

正文

前言

前段时间我们在升级 Pulsar 版本的时候发现升级后最后一个节点始终没有流量。

虽然对业务使用没有任何影响,但负载不均会导致资源的浪费。

和同事沟通后得知之前的升级也会出现这样的情况,最终还是人工调用 Pulsar 的 admin API 完成的负载均衡。

这个问题我尝试在 Google 和 Pulsar 社区都没有找到类似的,不知道是大家都没碰到还是很少升级集群。

我之前所在的公司就是一个版本走到黑😂

Pulsar 负载均衡原理

当一个集群可以水平扩展后负载均衡就显得非常重要,根本目的是为了让每个提供服务的节点都能均匀的处理请求,不然扩容就没有意义了。

在分析这个问题的原因之前我们先看看 Pulsar 负载均衡的实现方案。

# Enable load balancer
loadBalancerEnabled=true

我们可以通过这个 broker 的这个配置来控制负载均衡器的开关,默认是打开。

但具体使用哪个实现类来作为负载均衡器也可以在配置文件中指定:

# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl

默认使用的是 ModularLoadManagerImpl

    static LoadManager create(final PulsarService pulsar) {
        try {
            final ServiceConfiguration conf = pulsar.getConfiguration();
            // Assume there is a constructor with one argument of PulsarService.
            final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(),
                    Thread.currentThread().getContextClassLoader());
            if (loadManagerInstance instanceof LoadManager) {
                final LoadManager casted = (LoadManager) loadManagerInstance;
                casted.initialize(pulsar);
                return casted;
            } else if (loadManagerInstance instanceof ModularLoadManager) {
                final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
                casted.initialize(pulsar);
                return casted;
            }
        } catch (Exception e) {
            LOG.warn("Error when trying to create load manager: ", e);
        }
        // If we failed to create a load manager, default to SimpleLoadManagerImpl.
        return new SimpleLoadManagerImpl(pulsar);
    }

broker 启动时会从配置文件中读取配置进行加载,如果加载失败会使用 SimpleLoadManagerImpl 作为兜底策略。

当 broker 是一个集群时,只有 leader 节点的 broker 才会执行负载均衡器的逻辑。

Leader 选举是通过 Zookeeper 实现的。

默然情况下成为 Leader 节点的 broker 会每分钟读取各个 broker 的数据来判断是否有节点负载过高需要做重平衡。

而是否重平衡的判断依据是由 org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy 接口提供的,它其实只有一个函数:

public interface LoadSheddingStrategy {

    /**
     * Recommend that all of the returned bundles be unloaded.
     * @return A map from all selected bundles to the brokers on which they reside.
     */
    Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf);
}

根据所有 broker 的负载信息计算出一个需要被 unload 的 broker 以及 bundle。

这里解释下 unload 和 bundle 的概念:

  • bundle 是一批 topic 的抽象,将 bundlebroker 进行关联后客户端才能知道应当连接哪个 broker;而不是直接将 topic 与 broker 绑定,这样才能实现海量 topic 的管理。
  • unload 则是将已经与 broker 绑定的 bundle 手动解绑,从而触发负载均衡器选择一台合适的 broker 重新进行绑定;通常是整个集群负载不均的时候触发。

ThresholdShedder 原理

LoadSheddingStrategy 接口目前有三个实现,这里以官方默认的 ThresholdShedder 为例:

它的实现算法是根据带宽、内存、流量等各个指标的权重算出每个节点的负载值,之后为整个集群计算出一个平均负载值。

# 阈值
loadBalancerBrokerThresholdShedderPercentage=10

当集群中有某个节点的负载值超过平均负载值达到一定程度(可配置的阈值)时,就会触发 unload,以上图为例就会将最左边节点中红色部分的 bundle 卸载掉,然后再重新计算一个合适的 broker 进行绑定。

阈值存在的目的是为了避免频繁的 unload,从而影响客户端的连接。

问题原因

当某些 topic 的流量突然爆增的时候这种负载策略确实可以处理的很好,但在我们集群升级的情况就不一定了。

假设我这里有三个节点:

  • broker0
  • broker1
  • broker2


集群升级时会从 broker2->0 进行镜像替换重启,假设在升级前每个 broker 的负载值都是 10。

  • 重启 broker2 时,它所绑定的 bundle 被 broker0/1 接管。
  • 升级 broker1 时,它所绑定的 bundle 又被 broker0/2 接管。
  • 最后升级 broker0, 它所绑定的 bundle 会被broker1/2 接管。

只要在这之后没有发生流量激增到触发负载的阈值,那么当前的负载情况就会一直保留下去,也就是 broker0 会一直没有流量。

经过我反复测试,现象也确实如此。

./pulsar-perf monitor-brokers --connect-string pulsar-test-zookeeper:2181


通过这个工具也可以查看各个节点的负载情况

优化方案

这种场景是当前 ThresholdShedder 所没有考虑到的,于是我在我们所使用的版本 2.10.3 的基础上做了简单的优化:

  • 当原有逻辑走完之后也没有获取需要需要卸载的 bundle,同时也存在一个负载极低的 broker 时(emptyBundle),再触发一次 bundle 查询。
  • 按照 broker 所绑定的数量排序,选择一个数量最多的 broker 的 第一个 bundle 进行卸载。

修改后打包发布,再走一遍升级流程后整个集群负载就是均衡的了。

但其实这个方案并不严谨,第二步选择的重点是筛选出负载最高的集群中负载最高的 bundle;这里只是简单的根据数量来判断,并不够准确。

正当我准备持续优化时,鬼使神差的我想看看 master 上有人修复这个问题没,结果一看还真有人修复了;只是还没正式发版。

https://github.com/apache/pulsar/pull/17456

整体思路是类似的,只是筛选负载需要卸载 bundle 时是根据 bundle 自身的流量来的,这样会更加精准。

总结

不过看社区的进度等这个优化最终能用还不知道得多久,于是我们就自己参考这个思路在管理台做了类似的功能,当升级后出现负载不均衡时人工触发一个逻辑:

  • 系统根据各个节点的负载情况计算出一个负载最高的节点和 bundle 在页面上展示。
  • 人工二次确认是否要卸载,确认无误后进行卸载。

本质上只是将上述优化的自动负载流程改为人工处理了,经过测试效果是一样的。

Pulsar 整个项目其实非常庞大,有着几十上百个模块,哪怕每次我只改动一行代码准备发布测试时都得经过漫长的编译+ Docker镜像打包+上传私服这些流程,通常需要1~2个小时;但总的来说收获还是很大的,最近也在提一些 issue 和 PR,希望后面能更深入的参与进社区。

与Pulsar负载均衡原理及优化相似的内容:

Pulsar负载均衡原理及优化

前言 前段时间我们在升级 Pulsar 版本的时候发现升级后最后一个节点始终没有流量。 虽然对业务使用没有任何影响,但负载不均会导致资源的浪费。 和同事沟通后得知之前的升级也会出现这样的情况,最终还是人工调用 Pulsar 的 admin API 完成的负载均衡。 这个问题我尝试在 Google 和

载均衡技术全解析:Pulsar 分布式系统的最佳实践

背景 Pulsar 有提供一个查询 Broker 负载的接口: /** * Get load for this broker. * * @return * @throws PulsarAdminException */ LoadManagerReport getLoadReport() throws

深入剖析:如何使用Pulsar和Arthas高效排查消息队列延迟问题

背景 前两天收到业务反馈有一个 topic 的分区消息堆积了: 根据之前的经验来看,要么是业务消费逻辑出现问题导致消费过慢,当然也有小概率是消息队列的 Bug(我们使用的是 pulsar)。 排查 通过排查,发现确实是在一点多的时候消息堆积了(后面是修复之后堆积开始下降)。 于是我在刚才堆积处查看了

对 Pulsar 集群的压测与优化

前言 这段时间在做 MQ(Pulsar)相关的治理工作,其中一个部分内容关于消息队列的升级,比如: 一键创建一个测试集群。 运行一批测试用例,覆盖我们线上使用到的功能,并输出测试报告。 模拟压测,输出测试结果。 本质目的就是想直到新版本升级过程中和升级后对现有业务是否存在影响。 一键创建集群和执行测

从 Pulsar Client 的原理到它的监控面板

![image.png](https://s2.loli.net/2023/08/02/GipDPSlbycQxqFd.png) # 背景 前段时间业务团队偶尔会碰到一些 Pulsar 使用的问题,比如消息阻塞不消费了、生产者消息发送缓慢等各种问题。 虽然我们有个监控页面可以根据 topic 维度查

使用 SQL 的方式查询消息队列数据以及踩坑指南

![Pulsar-sql.png](https://s2.loli.net/2023/08/30/3iz9yqfuSCn18xk.png) # 背景 为了让业务团队可以更好的跟踪自己消息的生产和消费状态,需要一个类似于表格视图的消息列表,用户可以直观的看到发送的消息;同时点击详情后也能查到消息的整个

通过 Pulsar 源码彻底解决重复消费问题

背景 最近真是和 Pulsar 杠上了,业务团队反馈说是线上有个应用消息重复消费。 而且在测试环境是可以稳定复现的,根据经验来看一般能稳定复现的都比较好解决。 定位问题 接着便是定位问题了,根据之前的经验让业务按照这几种情况先排查一下: 通过排查:1,2可以排除了。 没有相关日志 存在异常,但最外层

我是如何从零到成为 Apache 顶级项目的 Committer

最近收到了 Apache Pulsar 和 Apache HertzBeat社区的邀请邮件,成为了这两个项目的 Committer。 一路走来我从最开始的打游击战的闲散人员到如今活跃在各个开源项目里的“老兵”,用现在流行的话来说 Apache 的这两个 Committer 就相当于是拿到了编制,进入

[转帖]对比测试:Apache Pulsar 与 Kafka 在金融场景下的性能分析

https://baijiahao.baidu.com/s?id=1680081990582501220&wfr=spider&for=pc Apache Pulsar 是下一代分布式消息流平台,采用计算存储分层架构,具备多租户、高一致、高性能、百万 topic、数据平滑迁移等诸多优势。越来越多的企

一个诡异的 Pulsar InterruptedException 异常

背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。