如何实现简单的分布式链路功能?

如何,实现,简单,分布式,链路,功能 · 浏览次数 : 12

小编点评

```java @Component public class TraceIdFeignInterceptor implements RequestInterceptor { @Override public void apply(RequestTemplate requestTemplate) { // spring的上下文对象 ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if (requestAttributes != null) { // 前面的过滤器已经获取并设置了 traceId,这里就可以直接获取了 requestTemplate.header(TraceIdFilter.TRACE_ID, TraceIdHelper.getTraceId()); } } } // service消费者 @Component public class TraceIdServiceImpl { @Override public void setAttachment(String attachment, TraceId traceId) { // dubbo 提供的 setAttachment 方法,将 traceId 传递下去 Dubbo.invoke(attachment, traceId); } } ``` ```java @Component public class TransmittableThreadLocal { private static ThreadLocal traceIdLocal; public static String getTraceId() { return traceIdLocal.get(); } public static void setTraceId(String traceId) { traceIdLocal.set(traceId); } } ``` ```java // 多线程时如何继续传递 traceId 的工具类 TraceIdHelper @Component public class ThreadPoolTaskExecutor { private static ThreadPoolExecutor threadPoolExecutor; public ThreadPoolTaskExecutor() { // 创建线程池 threadPoolExecutor = new ThreadPoolExecutor(4); } public void submit(String task) { // 创建线程并提交任务 threadPoolExecutor.submit(task, new Runnable() { @Override public void run() { // 处理任务 // ... } }); } } ```

正文

为什么需要链路跟踪

为什么需要链路跟踪?微服务环境下,服务之间相互调用,可能存在 A->B->C->D->C 这种复杂的服务交互,那么需要一种方法可以将一次请求链路完整记录下来,否则排查问题不好下手、请求日志也无法完整串起来。

如何实现链路跟踪

假设我们从用户请求接口开始,每次请求需要有唯一的请求 id (我们暂且标记为 traceId)来标识这次请求,然后当接口收到请求后,调用后续服务或者 mq 时,能将该 traceId 一直传递下去,并且 log 日志中可以打印出来,这样就实现了简单的链路功能。

如何为每次请求生成一个唯一的 requestId

微服务环境下,用户请求一般优先经过网关,网关再将请求转发到各个服务。微服务网关多种多样,比如 Nginx、Zuul、Spring Cloud Gateway、Kong、Traefik 等,假设存在这样一条链路,用户请求 -> nginx -> zuul -> service-a、service-b 等(这里我们使用 Eureka 作为服务注册中心,使用 Feign 来实现微服务之间相互调用、使用 Zuul 作为服务前置网关),调用大致如下:

这样的话,从 nginx 请求开始,我们需要标识本次请求的 traceId,然后可以将 traceId 一直传递到 service 服务层,那么基于这样一个链路的话,我们怎么设计一个链路工具呢?

Nginx

nginx 从 1.11.0 版本就开始内置了变量 $request_id,其原理就是生成一串 32 位的随机字符串,虽然不能比拟 uuid,但是重复的概率也很小,可以视为 uuid 来使用。用户每次请求会生成一个 $request_id,可以作为我们的 traceId。

设置的话首先设置 nginx 日志格式,支持 $request_id

log_format access '$remote_addr $request_time $body_bytes_sent $http_user_agent $request $status $request_id'

nginx 常用的内置变量及其含义如下:

  • $remote_addr:客户端地址,如:172.16.11.1
  • $remote_user:客户端用户名称
  • $time_local:访问时间和时区,20/Dec/2022:10:47:58 +0800
  • $request:请求的URI和HTTP协议,"GET / HTTP/1.1"
  • $status:HTTP请求状态,304
  • $body_bytes_sent:发送给客户端文件内容大小
  • $request_time:整个请求的总时间
  • $request_id:当前请求的id

其次要在 nginx 转发请求时,增加 traceId 的 header:

location / {
  proxy_set_header traceId $request_id;
}

Zuul

traceId 经过 nginx 转发到 zuul 之后,zuul 路由转发时存在 header 丢失的问题,我们可以自定义一个 zuul 的前置过滤器了,在过滤器中再将 header 再传递下去,代码比较简单:

@Component
public class TraceIdPreFilter extends ZuulFilter {
    private static final String TRACE_ID = "traceId";

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 0;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public Object run() {
        RequestContext requestContext = RequestContext.getCurrentContext();
        HttpServletRequest request = requestContext.getRequest();
        requestContext.addZuulRequestHeader(TRACE_ID, request.getHeader(TRACE_ID));
        return null;
    }
}

Service 服务层

service 服务层要做几件事:

  • 接收暂存 zuul 转发过来的 traceId
  • 日志文件配置,日志支持输出 traceId
  • service 接收到 traceId 后,再调用其他服务时,需要将 traceId 继续传递下去

首先我们来看下代码层面,如何接收暂存 zuul 转发过来的 traceId,我们需要使用到过滤器和 MDC(放入MDC中的 key 可以在日志中输出)。我们创建一个过滤器,用来接收 zuul 转发过来的 traceId,并且将 traceId 设置到 MDC 以便我们的日志文件可以输出 traceId:

public class TraceIdFilter implements Filter {
    private static final String TRACE_ID = "traceId";
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {
        try {
            HttpServletRequest httpRequest = (HttpServletRequest) request;
            String traceId = httpRequest.getHeader(TRACE_ID);
            TraceIdHelper.setTraceId(traceId);
            filterChain.doFilter(request, response);
        } finally {
            // 清除MDC的traceId值,确保当次请求不会影响其他请求
            TraceIdHelper.clearTraceId();
        }
    }

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void destroy() {
    }
}

@UtilityClass
public class TraceIdHelper {
    public static final String TRACE_ID = "traceId";
    private static final ThreadLocal<String> TRACE_ID_THREAD_LOCAL = new ThreadLocal<>();

    /**
     * 设置traceId,为空时初始化一个
     * @param traceId
     */
    public void setTraceId(String traceId) {
        if (StringUtils.isBlank(traceId)) {
            traceId = UUID.randomUUID().toString();
        }
        TRACE_ID_THREAD_LOCAL.set(traceId);
        MDC.put(TRACE_ID, traceId);
    }

    /**
     * 清除traceId
     */
    public void clearTraceId() {
        TRACE_ID_THREAD_LOCAL.remove();
        MDC.remove(TRACE_ID);
    }

    /**
     * 获取traceId
     * @return
     */
    public String getTraceId() {
        return TRACE_ID_THREAD_LOCAL.get();
    }
}

过滤器注册:

@Configuration
public class TraceIdConfig {
    @Bean
    public FilterRegistrationBean<TraceIdFilter> loggingFilter() {
        FilterRegistrationBean<TraceIdFilter> registrationBean = new FilterRegistrationBean<>();
        registrationBean.setFilter(new TraceIdFilter());
        // 设置过滤的URL模式
        registrationBean.addUrlPatterns("/*");
        return registrationBean;
    }
}

再来看下我们的日志文件配置(logback.xml):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_PATTERN"
              value="%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%highlight(%-5level)] [%boldYellow(%X{traceId})] [%boldYellow(%thread)] %boldGreen(%logger{36} %F.%L) %msg%n">
    </property>
    <property name="FILE_LOG_PATTERN"
              value="%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%-5level] [%X{traceId}] [%thread] %logger{36} %F.%L %msg%n">
    </property>
    <property name="FILE_PATH" value="/wls/app/applogs/service-a.%d{yyyy-MM-dd}.%i.log" />
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <appender name="FILE"
              class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${FILE_PATH}</fileNamePattern>
            <!-- keep 15 days' worth of history -->
            <maxHistory>15</maxHistory>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <!-- 日志文件的最大大小 -->
                <maxFileSize>10MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
        <encoder>
            <pattern>${FILE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>

    <logger name="com.example.service.controller" level="debug"></logger>
    <root level="info">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="FILE"/>
    </root>
</configuration>

这里 xml 配置文件中配置了 traceId,这样日志中就能看到 traceId 被输出了。

最后的话服务之间调用时,我们需要传递 traceId 到下一个微服务,这就要用到 feign 的拦截器:

@Component
public class TraceIdFeignInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // spring的上下文对象
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (requestAttributes != null) {
            // 前面的过滤器已经获取并设置了 traceId,这里就可以直接获取了
            requestTemplate.header(TraceIdFilter.TRACE_ID, TraceIdHelper.getTraceId());
        }
    }
}

消息队列层

假设 service-a 发送一条 mq 消息后,service-b 消费到了,那么需要将消费链路也串起来怎么做呢?我们以 rocketmq 举例,rocketmq 提供了 UserProperty 可以发送带属性的消息,这样通过 UserProperty 我们便能实现 traceId 的传递。比如消息发送时:

Message msg = new Message("SequenceTopicTest",// topic  
    "TagA",// tag  
    ("Hello RocketMQ " + i).getBytes("utf-8") // body  
);  
msg.putUserProperty("traceId", TraceIdHelper.getTraceId()); //设置 traceId

消息消费时:

String traceId = msgs.get(0).getUserProperty("traceId");
TraceIdHelper.setTraceId(traceId);

dubbo 如何传递 traceId

dubbo 的 spi 机制可以很方便的让我们来实现各种拓展,比如 dubbo 提供的 provider、consumer 过滤器,我们可以分别实现一个 provider、consumer 得到过滤器。

服务消费者那里,我们可以自定义一个 consumer 过滤器,过滤器中先通过 TraceIdHelper.getTraceId() 获取到 traceId 后再通过 dubbo 提供的 setAttachment("traceId", TraceIdHelper.getTraceId()) 将 traceId 传递下去。

同样地,服务提供者那里,我们可以自定义一个 provider 过滤器,首先通过 dubbo 提供的 getAttachment 获取到 traceId,之后再使用封装好的 TraceIdHelper.setTraceId 将 traceId 暂存即可,这里代码就不写了。

多线程时如何继续传递 traceId

我们的工具类 TraceIdHelper 注意看使用的 ThreadLocal 进行的 traceId 暂存,就会存在多线程环境下,子线程取不到 traceId 也就说子线程的日志没法打印出 traceId 的问题,解决思路的话有几种,

  • 可以自定义 ThreadPoolTaskExecutor,线程 run 执行前先将 traceId 设置进去,缺点是比较麻烦
  • 使用阿里提供的开源套件 TransmittableThreadLocal(使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题)

总结

链路工具的实现会用到多个组件,每个组件都需要不同的配置:

  • nginx:配置 $request_id,转发时配置 $request_id header
  • zuul:配置前置过滤器,进行 traceId 向下游透传
  • 服务层:log 日志文件配置,用到 MDC 来打印输出 traceId;用到了过滤器和 Feign 拦截器来实现 traceId 透传
  • mq:消息队列要想实现 traceId 传递,如 rocketmq 需要用到 UserProperty
  • 多线程:多线程时子线程可能会获取不到 traceId,可以自定义 ThreadPoolTaskExecutor 或者 使用阿里提供的开源套件 TransmittableThreadLocal

与如何实现简单的分布式链路功能?相似的内容:

如何实现简单的分布式链路功能?

为什么需要链路跟踪 为什么需要链路跟踪?微服务环境下,服务之间相互调用,可能存在 A->B->C->D->C 这种复杂的服务交互,那么需要一种方法可以将一次请求链路完整记录下来,否则排查问题不好下手、请求日志也无法完整串起来。 如何实现链路跟踪 假设我们从用户请求接口开始,每次请求需要有唯一的请求

面试官:你讲下如何设计支持千万级别的短链?

前言 前几天面试遇到的,感觉比较有趣。第一次面试遇到考架构设计相关的题目,挺新奇的,开始向国外大厂靠拢了,比天天问八股文好太多了,工作5年左右的,问八股文,纯纯的不负责任偷懒行为。 感觉此问题比较有趣,这几天简单的实现了一版本,和大家分享一下具体的细节,也欢迎大家交流讨论, 代码github链接 s

Scaling Memcache at Facebook

Memcached 是一种众所周知的、简单的内存缓存解决方案。本文描述了 Facebook 如何利用 memcached 作为构建块来构造和扩展一个分布式键值存储支持世界上最大的社交网络。 1.Introduction 一个社交网络(FB)的基础架构通常需要以下 允许实时通信(近似,允许一定的延迟)

6步带你用Spring Boot开发出商城高并发秒杀系统

摘要:本博客将介绍如何使用 Spring Boot 实现一个简单的商城秒杀系统,并通过使用 Redis 和 MySQL 来增强其性能和可靠性。 本文分享自华为云社区《Spring Boot实现商城高并发秒杀案例》,作者:林欣。 随着经济的发展和人们消费观念的转变,电子商务逐渐成为人们购物的主要方式之

Serverless冷启动:如何让函数计算更快更强?

摘要:借助Serverless计算,开发者仅需上传业务代码并进行简单的资源配置便可实现服务的快速构建部署,云服务商则按照函数服务调用量和实际资源使用收费,从而帮助用户实现业务的快速交付和低成本运行。 本文分享自华为云社区《Serverless冷启动:如何让函数计算更快更强?》,作者:DevAI 。

如何通过gRPC传输文件

在gRPC中,可以通过将文件分割成多个小块,然后使用流式RPC将这些小块发送到服务器来传输文件。以下是一个简单的示例,展示了如何在gRPC中实现文件传输。 首先,我们需要定义一个服务来处理文件传输。在`.proto`文件中,我们可以定义一个`UploadFile`服务,它接收一个流式的`Chunk`

使用doop识别最近commons text漏洞的污点信息流

本文基于笔者对doop静态程序分析框架源代码和规则学习,并结合对目前漏洞公开技术细节的学习,修改增强doop app only模式下的分析规则后,实现通过doop工具识别commons text rce漏洞(CVE-2022-42889)。内容包含三部分,第一部分简单介绍doop分析框架,第二部分简单介绍commons text漏洞的原理和代码调用栈,第三部分重点介绍如何改造doop app on

[转帖]零信任策略下K8s安全监控最佳实践(K+)

https://developer.aliyun.com/article/1009607?spm=a2c6h.24874632.expert-profile.126.3b0b506fysVD76 简介: 本文重点将围绕监控防护展开,逐层递进地介绍如何在复杂的分布式容器化环境中借助可观测性平台,持续监

[转帖]Redis6通信协议升级至RESP3,一口气看完13种新数据类型

原创:微信公众号 码农参上,欢迎分享,转载请保留出处。 在前面的文章 Redis:我是如何与客户端进行通信的 中,我们介绍过RESP V2版本协议的规范,RESP的全程是Redis Serialization Protocol,基于这个实现简单且解析性能优秀的通信协议,Redis的服务端与客户端可以

Pytorch DistributedDataParallel(DDP)教程二:快速入门实践篇

一、简要回顾DDP 在上一篇文章中,简单介绍了Pytorch分布式训练的一些基础原理和基本概念。简要回顾如下: 1,DDP采用Ring-All-Reduce架构,其核心思想为:所有的GPU设备安排在一个逻辑环中,每个GPU应该有一个左邻和一个右邻,设备从它的左邻居接收数据,并将数据汇总后发送给右邻。