OpenTelemetry 深度定制:跨服务追踪的实战技巧

opentelemetry · 浏览次数 : 0

小编点评

本文主要讨论了如何在分布式追踪系统中通过自定义Baggage来实现上游信息的传递。通过对OpenTelemetry的分布式追踪功能的深入研究,提出了一种在gRPC和HTTP请求中添加上下文信息的方法,并展示了如何将这些信息整合到现有的分布式追踪系统中。 1. **需求背景**: 本文首先介绍了在分布式系统中进行请求追踪的需求,特别是在复杂的调用链中,需要清晰地了解每个子span的上游调用来源。 2. **解决方案提出**: 为了解决这个问题,提出了在每个子span中加入两个属性来标识其父调用来源的方案。这些属性可以通过自定义的Baggage对象存储,并在SpanProcessor中读取。 3. **实现细节**: 文章详细描述了如何在gRPC和HTTP请求中实现Baggage的写入和读取。对于gRPC,通过在GrpcServerContextCustomizer中写入Baggage,并在SpanProcessor中读取;对于HTTP,通过在HttpServerRouteBuilder中实现ContextCustomizer接口。 4. **实验验证**: 为了验证方案的可行性,作者启动了三个demo应用,并配置了相应的OpenTelemetry配置。通过这种方式,可以实现在分布式追踪系统中清晰地显示每个请求的上游调用来源。 总的来说,本文提出了一种在分布式追踪系统中通过自定义Baggage来实现上游信息传递的有效方法。这种方法不仅有助于理解复杂的调用链,而且对于监控和分析分布式系统的性能和行为具有重要意义。

正文

背景

在上一篇《从 Dapper 到 OpenTelemetry:分布式追踪的演进之旅》中在最后提到在做一些 Trace 的定制开发。

到现在差不多算是完成了,可以和大家分享一下。

我们的需求是这样的:


假设现在有三个服务:ServiceA、ServiceB、ServiceC

ServiceA 对外提供了一个 http 接口 request,在这个接口会调用 ServiceBorder 订单接口创建订单,同时 serviceB 调用 serviceC 的 pay 接口。


整个调用关系如上图所示。

默认情况下 span 中的 attribute 会记录当前 span 的一些信息,比如:

这些都是当前一些当前 span 内置的信息,比如当前 gRPC 接口的一些基本数据:服务名、ip、端口等信息。

但这里并没有上游的一些信息,虽然我们可以通过 Jaeger 的树状图得知上游是哪个应用调用过来的,但是一旦某个 span 下有多个子 span 的调用,就没办法很直观知道这个子 span 的上游是由谁发起的调用。

比如如下这个链路:

当一个调用链非常长,同时也非常复杂时,没办法第一时间知道某一个 span 的上游到底是谁发起的,需要手动一层层的去折叠,或者全靠眼睛去找。

预期效果

为此我们希望的效果是可以通过给每一个子 span 中加入两个 attribute,来标明它的父调用来源。

比如在 serviceB 中的所有 span 中都会加上两个标签:来源是 serviceA,同时是 serviceA 的 request 接口发起的请求。

而在 serviceC 中同样可以知道来源是 serviceB 的 Order 接口发起的调用。

我启动了三个 demo 应用,分别是 create1,create2,create3.

create1 中会提供一个 request 接口,在这里面调用 create2 的 create2 接口,create2 的接口里接着调用 create3 的 create3 接口。

create1:

    @RequestMapping("/request")  
    public String request(@RequestParam String name) {  
       HelloRequest request = HelloRequest.newBuilder()  
             .setName(name)  
             .build();  
       log.info("request: {}", request);  
       String message = myServiceStub.create2(request).getMessage();  
       Executors.newFixedThreadPool(1).execute(() -> {  
          myServiceStub.create2(request).getMessage();  
       });       return message;  
    }

create2:

@Override  
public void create2(HelloRequest request, StreamObserver<HelloReply> responseObserver) {  
    HelloReply reply = HelloReply.newBuilder()  
            .setMessage("Create2 ==> " + request.getName())  
            .build();  
    log.info("Create2: {}", reply.getMessage());  
    myMethod(request.getName());  
    myServiceStub.create3(request);
    responseObserver.onNext(reply);  
    responseObserver.onCompleted();  
}

create3:

@Override  
public void create3(HelloRequest request, StreamObserver<HelloReply> responseObserver) {  
    HelloReply reply = HelloReply.newBuilder()  
            .setMessage("Create3 ==> " + request.getName())  
            .build();  
    log.info("Create3: {}", reply.getMessage());  
    myMethod(request.getName());  
    responseObserver.onNext(reply);  
    responseObserver.onCompleted();  
}
java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.javaagent.extensions=otel-extensions-custom-context-1.0-SNAPSHOT.jar \
-Dotel.traces.exporter=otlp \
-Dotel.logs.exporter=none \
-Dotel.service.name=create2 \
-Dotel.exporter.otlp.protocol=grpc \
-Dotel.propagators=tracecontext,baggage,demo \
-Dotel.exporter.otlp.endpoint=http://127.0.0.1:5317 \
      -jar target/demo-0.0.1-SNAPSHOT.jar --spring.application.name=create2 --server.port=9191 --grpc.server.port=9292 --grpc.client.myService.address=static://127.0.0.1:9393

只是每个应用都需要使用我这边单独打的 agent 包以及一个 extension(tel-extensions-custom-context-1.0-SNAPSHOT.jar) 才能生效。

最终的效果如下:

Baggage

在讲具体的实现之前需要先了解几个 Trace 中的概念,在这里主要用到的是一个称为 Baggage 的对象。

在之前的文章中其实提到过它的原理以及使用场景:
从 Dapper 到 OpenTelemetry:分布式追踪的演进之旅

Baggage 的中文翻译是:包裹📦;简单来说就是我们可以通过自定义 baggage 可以将我们想要的数据存放在其中,这样再整个 Trace 的任意一个 Span 中都可以读取到。

@RequestMapping("/request")  
public String request(@RequestParam String name) {  
	// 写入
    Baggage.current().toBuilder().  
          put("request.name", name).build()  
          .storeInContext(Context.current()).makeCurrent();
}         

// 获取
String value = Baggage.current().getEntryValue("request.name");  
log.info("request.name: {}", value);

理解了这个之后,我们要实现的将上游的信息传递到下游就可以通过这个组件实现了。

只需要在上游创建 span 时将它自身数据写入到 Baggage 中,再到下游 span 取出来写入到 attribute 中即可。

ContextCustomizer

这里的关键就是在哪里写入这个 Baggage,因为对第三方组件的 Instrumentation 的实现都是在 opentelemetry-java-instrumentation项目中。

javaagent.jar 包也是通过该项目打包出来的。

所以在该项目的 io.opentelemetry.instrumentation.api.instrumenter.Instrumenter#doStart 这个函数中我们发现一段逻辑:


这个函数是在创建一个 span 的时候调用的,通常这个创建函数是在这些第三方库的拦截器中创建的。


比如这是在 grpc 的拦截器中调用。

// context customizers run before span start, so that they can have access to the parent span  
// context, and so that their additions to the context will be visible to span processors  
for (ContextCustomizer<? super REQUEST> contextCustomizer : contextCustomizers) {  
  context = contextCustomizer.onStart(context, request, attributes);  
}

ContextCustomizer 是一个接口只提供了一个函数:

public interface ContextCustomizer<REQUEST> {  
  
  /** Allows to customize the operation {@link Context}. */  
  Context onStart(Context parentContext, REQUEST request, Attributes startAttributes);  
}
  • Context 是上下文信息,可以在自定义的 ContextCustomizer 继续往上下文中追加信息。
  • REQUEST 是一个泛型:一般是当前第三方组件的请求信息:
    • 比如是 HTTP 时,这个 request 就是 HTTP 的请求信息。
    • 而如果是 gRPC ,则是 gRPC 的请求信息。
    • 其他的请求类型同理。
  • startAttributes 则是预先写入的一些属性,比如在上图中看到的一些 rpc.service/rpc.method等字段。
// context customizers run before span start, so that they can have access to the parent span  
// context, and so that their additions to the context will be visible to span processors

从这个接口的调用注释可以看出:
这个自定义的 context 会在 span 开始之前调用,所以在这里是可以访问到当前创建的 span 的父 context,同时在这里的 context 中新增的数据可以在 SpanProcessor 访问到。

SpanProcessor

而 SpanProcessor 又是一个非常的重要的组件,我们接着刚才的 contextCustomizer 处往后跟踪代码。

context = contextCustomizer.onStart(context, request, attributes);
	--->Span span = spanBuilder.setParent(context).startSpan();
			--->io.opentelemetry.sdk.trace.SdkSpanBuilder#startSpan
				--->io.opentelemetry.sdk.trace.SdkSpan#startSpan
					--->spanProcessor.onStart(parentContext, span);

可以看到 spanProcessor.onStart 这个函数会在 contextCustomizer 之后调用。


/**  
 * SpanProcessor is the interface {@link SdkTracer} uses to allow synchronous hooks for when a  
 * {@code Span} is started or when a {@code Span} is ended.  
 */
 
//==========================================================

/**  
 * Called when a {@link io.opentelemetry.api.trace.Span} is started, if the {@link  
 * Span#isRecording()} returns true.  
 * * <p>This method is called synchronously on the execution thread, should not throw or block the  
 * execution thread. * * @param parentContext the parent {@code Context} of the span that just started.  
 * @param span the {@code Span} that just started.  
 */void onStart(Context parentContext, ReadWriteSpan span);

从注释中可以知道 SpanProcessor 是作为一个 span 的生命周期中的关键节点的 hook 函数。

在这些函数中我们可以自定义一些 span 的数据,比如在 onStart 还可以往 span 中写入一些自定义的 attribute。

这也是我们这次会用到的一个接口,我们的方案是:

在 gRPC 构建 Instrument 时自定义一个 GrpcServerContextCustomizer ,在这个自定义的 ContextCustomizer 中写入一个 Baggage

然后在 io.opentelemetry.sdk.trace.SpanProcessor#onStart 接口中取出这个 Baggage 写入到当前 span 的 attribute 中。

这样我们就可以看到之前提到的那些数据上游信息了。

为 gRPC 添加上下文

先来看看如何为 gRPC 添加 Baggage

我们先自定义一个 GrpcServerContextCustomizer 实现类:

public class GrpcServerContextCustomizer implements ContextCustomizer<GrpcRequest> {  
  private final String currentServiceName;  
  
  private static final String PARENT_RPC_KEY = "parent_rpc";  
  private static final String CURRENT_RPC_KEY = "current_rpc";  
  
  private static final String CURRENT_HTTP_URL_PATH = "current_http_url_path";  
  
  public GrpcServerContextCustomizer(String serviceName) {  
    this.currentServiceName = serviceName;  
  }  
  @Override  
  public Context onStart(Context parentContext, GrpcRequest grpcRequest,  
      Attributes startAttributeds) {  
    BaggageBuilder builder = Baggage.fromContext(parentContext).toBuilder();  
  
    String currentRpc = Baggage.fromContext(parentContext).getEntryValue(CURRENT_RPC_KEY);  
    String fullMethodName = startAttributeds.get(AttributeKey.stringKey("rpc.method"));  
    String rpcService = startAttributeds.get(AttributeKey.stringKey("rpc.service"));  
    // call from grpc  
    String method = rpcService + ":" + fullMethodName;  
    String baggageInfo = getBaggageInfo(currentServiceName, method);  
  
    String httpUrlPath = Baggage.fromContext(parentContext).getEntryValue(CURRENT_HTTP_URL_PATH);  
    if (!StringUtils.isNullOrEmpty(httpUrlPath)) {  
      // call from http  
      // currentRpc = currentRpc;  currentRpc = create1|GET:/request      // clear current_http_url_path      builder.put(CURRENT_HTTP_URL_PATH, "");  
    }  
    Baggage baggage = builder  
        .put(PARENT_RPC_KEY, currentRpc)  
        .put(CURRENT_RPC_KEY, baggageInfo)  
        .build();  
    return parentContext.with(baggage);  
  
  }  
  private static String getBaggageInfo(String serviceName, String method) {  
    if (StringUtils.isNullOrEmpty(serviceName)) {  
      return "";  
    }    return serviceName + "|" + method;  
  }  
}

从这个代码中可以看出,我们需要先从上下文中获取 CURRENT_RPC_KEY ,从而得知当前的 span 是不是 root span。

所以我们其实是把当前的 span 信息作为一个 PARENT_RPC_KEY 写入到 Baggage 中。

这样在 SpanProcessor 中便可以直接取出 PARENT_RPC_KEY 作为上游的信息写入 span 的 attribute 中。

    @Override  
    public void onStart(Context parentContext, ReadWriteSpan span) {
        String parentRpc = Baggage.fromContext(parentContext).getEntryValue("parent_rpc");  
        if (!StringUtils.isNullOrEmpty(parentRpc)) {  
            String[] split = parentRpc.split("\\|");  
            span.setAttribute("parent_rpc", parentRpc);  
            span.setAttribute("parent_service_name", split[0]);  
            span.setAttribute("parent_service_method", split[1]); 
        }  
    }

需要注意的是,这里的 Baggage 需要使用 Baggage.fromContext(parentContext) 才能拿到刚才写入 Baggage 信息。

之后我们找到构建 gRPCServerInstrumenterBuilder 的地方,写入我们刚才自定义的 GrpcServerContextCustomizer 即可。

.addContextCustomizer(new GrpcServerContextCustomizer(serviceName))

这里我们选择写入到是 serverInstrumenterBuilder 而不是clientInstrumenterBuilder,因为在服务端的入口就知道是从哪个接口进来的请求。

为 spring boot 的 http 接口添加上下文

如果只存在 gRPC 调用时只添加 gRPC 的上下文也够用了,但是我们也不排除由外部接口是通过 HTTP 访问进来的,然后再调用内部的 gRPC 接口;这也是非常常见的架构模式。

所以我们还需要在 HTTP 中增加 ContextCustomizer 将自身的数据写入到 Baggage 中。

好在 HttpServerRouteBuilder 自身是实现了 ContextCustomizer 接口的,我们只需要往里面写入 Baggage 数据即可。

public ContextCustomizer<REQUEST> build() {  
  Set<String> knownMethods = new HashSet<>(this.knownMethods);  
  return (context, request, startAttributes) -> {  
    if (HttpRouteState.fromContextOrNull(context) != null) {  
      return context;  
    }    String method = getter.getHttpRequestMethod(request);  
    if (method == null || !knownMethods.contains(method)) {  
      method = "HTTP";  
    }    String urlPath = getter.getUrlPath(request);  
    String methodPath = method + ":" + urlPath;  
  
    String currentRpc = Baggage.fromContext(context).getEntryValue(CURRENT_RPC_KEY);  
    String baggageInfo = getBaggageInfo(serviceName, methodPath);  
    Baggage baggage = Baggage.fromContext(context).toBuilder()  
        .put(PARENT_RPC_KEY, currentRpc)  
        .put(CURRENT_RPC_KEY, baggageInfo)  
        .put(CURRENT_HTTP_URL_PATH, methodPath)  
        .build();   
    return context.with(HttpRouteState.create(method, null, 0))  
        .with(baggage);  
  };}

这里新增了 CURRENT_HTTP_URL_PATH 用于标记当前的请求来源是 HTTP,在 grpc 的 ContextCustomizer 解析时会判断这个值是否为空。

String httpUrlPath = Baggage.fromContext(parentContext).getEntryValue(CURRENT_HTTP_URL_PATH);  
if (!StringUtils.isNullOrEmpty(httpUrlPath)) {  
  // call from http  
  // currentRpc = currentRpc;  currentRpc = create1|GET:/request  // clear current_http_url_path  builder.put(CURRENT_HTTP_URL_PATH, "");  
}

这样就可以在 grpc 的下游接口拿到入口的 HTTP 接口数据了。


当然也有可能是在 grpc 接口中调用 HTTP 的接口的场景,只是我们的业务中没有这种情况,所以就没有适配这类的场景。

总结

ContextCustomizer 接口没有提供对应的扩展,但是 SpanProcessor 是提供了扩展接口的。

原本是想尽量别维护自己的 javaagent,但也好在 OpenTelemetry 是提供的接口,所以也并不会去修改原本的代码。

所以我们还是需要创建一个 extensions 的项目在实现 SpanProcessor,这个在之前的 《实战:如何编写一个 OpenTelemetry Extensions》有详细讲到。

所以最后的应用启动方式如下:

java -javaagent:opentelemetry-javaagent-2.4.0-SNAPSHOT.jar \
-Dotel.javaagent.extensions=otel-extensions-custom-context-1.0-SNAPSHOT.jar \

需要使用我们手动打包的 javaagent 以及一个自定义扩展包。

打包方式:

 ./gradlew assemble

opentelemetry-java-instrumentation 项目比较大,所以打包过程可能比较久。

因为这其实是一些定制需求,所以就没提交到上游,感兴趣的可以自行合并代码测试。

最后可以这个分支中查看到修改的部分:
https://github.com/crossoverJie/opentelemetry-java-instrumentation/compare/main...add-grpc-context

与OpenTelemetry 深度定制:跨服务追踪的实战技巧相似的内容:

OpenTelemetry 深度定制:跨服务追踪的实战技巧

背景 在上一篇《从 Dapper 到 OpenTelemetry:分布式追踪的演进之旅》中在最后提到在做一些 Trace 的定制开发。 到现在差不多算是完成了,可以和大家分享一下。 我们的需求是这样的: 假设现在有三个服务:ServiceA、ServiceB、ServiceC ServiceA 对外

OpenTelemetry 实践指南:历史、架构与基本概念

背景 之前陆续写过一些和 OpenTelemetry 相关的文章: 实战:如何优雅的从 Skywalking 切换到 OpenTelemetry 实战:如何编写一个 OpenTelemetry Extensions 从一个 JDK21+OpenTelemetry 不兼容的问题讲起 这些内容的前提是最

OpenTelemetry agent 对 Spring Boot 应用的影响:一次 SPI 失效的案例

背景 前段时间公司领导让我排查一个关于在 JDK21 环境中使用 Spring Boot 配合一个 JDK18 新增的一个 SPI(java.net.spi.InetAddressResolverProvider) 不生效的问题。 但这个不生效的前置条件有点多: JDK 的版本得在 18+ Spri

.NET程序对接 OpenTelemetry logs

OpenTelemetry 简介 OpenTelemetry 是一个由 CNCF(Cloud Native Computing Foundation)托管的开源项目,旨在为观察性(Observability)提供一套全面的工具,包括度量(Metrics)、日志(Logs)和追踪(Traces)。它的

OpenTelemetry agent 对 Spring Boot 应用的影响:一次 SPI 失效的

背景 前段时间公司领导让我排查一个关于在 JDK21 环境中使用 Spring Boot 配合一个 JDK18 新增的一个 SPI(java.net.spi.InetAddressResolverProvider) 不生效的问题。 但这个不生效的前置条件有点多: JDK 的版本得在 18+ Spri

AgileConfig-1.9.4 发布,支持 OpenTelemetry

Hello 大家好,最新版的 AgileConfig 1.9.4 发布了。现在它可以通过 OpenTelemetry 对外提供 logs,traces,metrics 三个维度的数据。用户可以自由选择支持 otlp 协议的工具来进行查询与分析。比如 Seq,loki,prometheus, graf

.NET 使用 OpenTelemetry metrics 监控应用程序指标

上一次我们讲了 OpenTelemetry Logs 与 OpenTelemetry Traces。今天继续来说说 OpenTelemetry Metrics。 随着现代应用程序的复杂性不断增加,对于性能监控和故障排除的需求也日益迫切。在 .NET 生态系统中,OpenTelemetry Metri

.NET 中使用 OpenTelemetry Traces 追踪应用程序

上一次我们讲了 OpenTelemetry Logs。今天继续来说说 OpenTelemetry Traces。 在今天的微服务和云原生环境中,理解和监控系统的行为变得越来越重要。在当下我们实现一个功能可能需要调用了 N 个方法,涉及到 N 个服务。方法之间的调用如蜘蛛网一样。分布式追踪这个时候就至

使用 OpenTelemetry 构建 .NET 应用可观测性(1):什么是可观测性

[TOC] # 什么是系统的可观测性(Observability) 对软件行业来说,可观测性(Observability)是一个舶来词,出自控制论(Control Theory)。 **可观测性是系统的一个属性**,它是指系统的状态能否被观测,也就是说,系统的状态能否被监控、收集、分析、查询、可视化

使用 OpenTelemetry 构建 .NET 应用可观测性(2):OpenTelemetry 项目简介

[TOC] # 前世今生 ## OpenTracing OpenTracing 项目启动于 2016 年,旨在提供一套分布式追踪标准,以便开发人员可以更轻松地实现分布式追踪。 OpenTracing 定义了一套 Tracing 模型,以及一套 API,用于在应用程序中创建和管理这些数据模型。 下面是