一个诡异的 Pulsar InterruptedException 异常

一个,诡异,pulsar,interruptedexception,异常 · 浏览次数 : 876

小编点评

**问题的原因** * Pulsar 的发送消息线程被中断,因为没有使用该线程进行判断。 * 线程中断是需要调用 Thread.currentThread().interrupt() API的,但是业务代码中没有使用该 API。 * 线程中断导致了 Semaphore 源码中判断的线程中断标记被设置,导致 Pulsar 内部判断了,最终抛出了 InterruptedException 异常。 **解决方案** * 在业务代码中使用 Thread.currentThread().interrupt() API进行判断。 * 避免在业务代码中使用 Thread.currentThread().interrupt() API。 * 使用内置的 BlockQueue 或其他方法来实现线程间通信。

正文

背景


今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。

和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

前置排查

拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;

这样可以初步排除是 Pulsar 服务端的问题。

接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。

Pulsar 源码排查

既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。

首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter 所以第一步还得排查这个 starter 是否有影响。

通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) 
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) 
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。

这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。

我这里直接将分支切换到 branch-2.8

从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91

看起来是内部异步发送消息的时候抛了异常。

接着往下看到这里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at


看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。

semaphore.get().acquire();

不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。


为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。

我们点开java.util.concurrent.Semaphore#acquire()的源码,

    /**
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }    

通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。

定位问题

所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。

我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。

于是我在 pulsar-client 这个模块中搜索了相关代码:

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。

既然 Pulsar 自己没有做,那就只可能是业务做的了?

于是我在业务代码中搜索了一下:

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。

大概的伪代码如下:

        List.of(1, 2, 3).stream().map(e -> {
                    return CompletableFuture.supplyAsync(() -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                        return e;
                    });
                }
        ).collect(Collectors.toList()).forEach(f -> {
            try {
                Integer integer = f.get();
                log.info("====" + integer);
                if (integer==3){
                    TimeUnit.SECONDS.sleep(10);
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
	   MessageId send = producer.newMessage().value(msg.getBytes()).send();

执行这段代码可以完全复现同样的堆栈。

幸好中断这里还打得有日志:


通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。

因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。

总结

所以归根结底还是这里的代码不合理导致的,首先是自己中断了线程但也没使用,从而导致有被其他基础库使用的可能,所以会造成了一些不可预知的后果。

再一个是不建议在业务代码中使用 Thread.currentThread().interrupt(); 这类代码,第一眼根本不知道是要干啥,也不易维护。

其实本质上线程中断也是线程间通信的一种手段,有这类需求完全可以换为内置的 BlockQueue 这类函数来实现。

与一个诡异的 Pulsar InterruptedException 异常相似的内容:

一个诡异的 Pulsar InterruptedException 异常

背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。 和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

一个诡异的Python小问题

一个诡异的Python小问题 问题 来自一个基础偏弱的学员,但不知道如何去解释 >>> '' is not None :1: SyntaxWarning: "is not" with a literal. Did you mean "!="? True # 注意结果是会出现的,其实不影

一个难忘的json反序列化问题

前言 最近我在做知识星球中的商品秒杀系统,昨天遇到了一个诡异的json反序列化问题,感觉挺有意思的,现在拿出来跟大家一起分享一下,希望对你会有所帮助。 案发现场 我最近在做知识星球中的商品秒杀系统,写了一个filter,获取用户请求的header中获取JWT的token信息。 然后根据token信息

记一次难忘的json反序列化问题排查经历

前言 最近我在做知识星球中的商品秒杀系统,昨天遇到了一个诡异的json反序列化问题,感觉挺有意思的,现在拿出来跟大家一起分享一下,希望对你会有所帮助。 案发现场 我最近在做知识星球中的商品秒杀系统,写了一个filter,获取用户请求的header中获取JWT的token信息。 然后根据token信息

Harbor镜像仓库的导出与整理之二

Harbor镜像仓库的导出与整理之二 背景 前几天参照大神的blog进行了一下harbor的镜像列表的获取与下载. 当时发现一个很诡异的问题. 实际上镜像仓库里面的镜像很多. 但是导出和列表里面的却很少. 经过查询发现harbor的v2.0的API里面存在一些限制. 每次api的返回结果最多是 10

[转帖]NUMA导致的Oracle性能问题

https://www.cnblogs.com/realcp1018/p/6903721.html 背景简介: Oracle版本:11.2.0.4 OS 版本:OEL5.8 在一次Oracle的Dataguard正常switchover过程中,遇到了一个极其诡异的问题,一条主业务的SQL语句在新主库

记录荒废了三年的四年.net开发的第一次面试

对象 身在成都小微企业,前两天面试深圳老牌金蝶公司。对我这个荒废了三年光影的人来说,怎一个跨度之大了得?作为人我生第一次面试的,整个面试过程,只能用诡异来形容这次感受。而结尾也是迷迷糊糊中草草收场。 不是很好的开局 我我毕业就进了国企。毕业前,在我想象中,他是一个伟光正的形象。所以我抱着人生值得,未

[转帖]JVM内存非典型术语介绍(shallow/retained/rss/reserved/committed)

https://www.jianshu.com/p/871d6bb3a32d 背景 ​ 在服务器性能优化内存这一项时,有一些现象很诡异。如top显示的RES很大,但是实际jvm堆内存占用很小,同时使用nmt发现committed更大。所以决定写这篇wiki大概介绍一下 JVM中如何计算一个对象的实际

Maven依赖冲突解决总结

转载请注明出处: 1.Jar包冲突的通常表现 Jar包冲突往往是很诡异的事情,也很难排查,但也会有一些共性的表现。 抛出java.lang.ClassNotFoundException:典型异常,主要是依赖中没有该类。导致原因有两方面:第一,的确没有引入该类;第二,由于Jar包冲突,Maven仲裁机

服务器神秘挂起:一场惊心动魄的内核探案

2024年6月17日,我们的运维团队突然收到了一连串的告警。监控大屏上,代表着不同 Sealos 可用区的绿点中,零星地闪烁起了一两个红点。 “奇怪,怎么有几台服务器突然 hang 住了?” 值班的小辉皱起了眉头。 这次故障的诡异之处在于它的随机性。并非所有节点都受到影响,而是在不同可用区中,时不时