在阿里云上为 Python 与 Kotlin 异构服务实现基于 tRPC 的统一可观测性链路


团队接手了一个混杂技术栈的系统。核心数据处理模块是一个稳定运行多年的Python服务,基于Flask框架,我们称之为DataProcessor。最近,为了应对高并发查询场景,我们引入了一个新的Kotlin服务,QueryGateway,它使用Ktor框架构建。两个服务间的通信选型是个棘手的问题,由于历史原因,公司内部服务治理体系与一个C++构建的tRPC网关深度绑定,所有新服务都要求通过tRPC协议进行内部通信。

最初的集成很顺利,直到第一次线上故障。一个请求通过QueryGateway,调用了DataProcessor,最终在Python服务深处抛出了一个异常。排查过程极为痛苦,我们只能在QueryGateway的日志里找到请求入口,然后在DataProcessor成千上万的日志条目中根据时间戳和参数去猜测哪一条是对应的。两个服务像是独立的黑盒,一次完整的业务调用链被tRPC硬生生切断,可观测性几乎为零。

这个痛点促使我们必须建立一套跨语言、跨服务的统一可观测性链路。目标很明确:任何一个请求,无论它经过多少个服务,都必须拥有一个唯一的trace_id,并且能够将整个调用过程串联成一个完整的火焰图。

构想与技术选型决策

初步构想是利用分布式追踪系统。行业标准无疑是OpenTelemetry (OTel),它的好处在于厂商中立和广泛的社区支持,提供了统一的API、SDK和数据协议(OTLP)。即使我们目前部署在阿里云上,使用OTel也能避免被单一云厂商的探针或SDK锁定。

我们的技术栈是:

  • 云平台: 阿里云。因此,最终的追踪数据和日志需要无缝对接到阿里云可观测性产品,如链路追踪(ARMS)和日志服务(SLS)。
  • 服务: Kotlin (Ktor) 和 Python (Flask)。OTel对这两种语言都有一流的支持。
  • 通信: tRPC。这是真正的挑战所在。

tRPC是一个高性能的RPC框架,但它的生态不像gRPC或REST那样成熟,尤其是在与OpenTelemetry集成的自动化配置方面。OTel的自动插桩库通常能很好地支持主流的HTTP客户端和gRPC框架,但对于tRPC,我们找不到现成的解决方案。这意味着我们必须手动实现上下文的跨进程传播。

上下文传播是分布式追踪的核心。当服务A调用服务B时,服务A必须将当前的追踪上下文(主要是trace_idparent_span_id)注入到请求中。服务B则需要从请求中提取这个上下文,并基于它创建新的子Span。这样,调用链才得以延续。

OTel为此定义了W3C Trace Context标准,通常通过HTTP Header(如traceparent)来传递。在tRPC中,没有HTTP Header的概念,但它提供了trans_info或类似的元数据(metadata)机制,允许在请求中附加自定义的键值对。我们的核心任务,就是编写tRPC的客户端拦截器(ClientInterceptor)和服务器拦截器(ServerInterceptor),利用这个元数据机制来手动注入和提取W3C追踪上下文。

最终的架构设计如下:

sequenceDiagram
    participant Client as 外部请求
    participant KotlinSvc as QueryGateway (Kotlin)
    participant PythonSvc as DataProcessor (Python)
    participant Collector as OTel Collector
    participant AlibabaCloud as 阿里云 (ARMS/SLS)

    Client->>+KotlinSvc: 发起API请求
    Note over KotlinSvc: 启动新的Trace, 生成TraceContext
    KotlinSvc->>+KotlinSvc: [tRPC Client Interceptor]
将TraceContext注入tRPC元数据 KotlinSvc->>+PythonSvc: 发起tRPC调用 Note over PythonSvc: [tRPC Server Interceptor]
从tRPC元数据中提取TraceContext PythonSvc->>+PythonSvc: 处理业务逻辑, 创建子Span PythonSvc-->>-KotlinSvc: tRPC响应 KotlinSvc-->>-Client: API响应 par KotlinSvc-->>Collector: 异步导出Spans (OTLP) and PythonSvc-->>Collector: 异步导出Spans (OTLP) end Collector-->>AlibabaCloud: 批量导出到ARMS/SLS

步骤化实现:构建跨语言tRPC追踪链路

第一步:环境准备与OpenTelemetry Collector配置

在真实项目中,我们会将OTel Collector作为DaemonSet或Sidecar部署在Kubernetes集群中。为了演示,我们使用一个简单的docker-compose.yml

# docker-compose.yml
version: '3.8'
services:
  # OTel Collector 负责接收、处理并导出遥测数据
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.87.0
    container_name: otel-collector
    command: ["--config=/etc/otelcol-contrib/config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
    ports:
      - "4317:4317" # OTLP gRPC receiver
      - "4318:4318" # OTLP HTTP receiver
    networks:
      - observability-net

  # Kotlin服务 (QueryGateway)
  # ... 此处省略具体服务定义

  # Python服务 (DataProcessor)
  # ... 此处省略具体服务定义

networks:
  observability-net:
    driver: bridge

关键在于otel-collector-config.yaml的配置。我们将接收OTLP协议的数据,然后通过阿里云官方提供的aliyun_sls导出器将数据发送到指定的日志服务(SLS)项目。

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:

exporters:
  # 配置阿里云SLS导出器
  # 这是将数据送往阿里云可观测性平台的核心
  aliyun_sls:
    # 阿里云地域接入点, 例如: cn-hangzhou.log.aliyuncs.com
    endpoint: "${ALIYUN_SLS_ENDPOINT}"
    # 阿里云访问凭证, 生产环境建议使用RAM角色或STS临时凭证
    access_key_id: "${ALIYUN_ACCESS_KEY_ID}"
    access_key_secret: "${ALIYUN_ACCESS_KEY_SECRET}"
    # Traces数据对应的SLS配置
    traces:
      project: "your-sls-project-name" # SLS项目名
      logstore: "your-traces-logstore-name" # 用于存储Trace数据的Logstore
      # 可选: 自定义字段映射等
    # Logs数据对应的SLS配置
    logs:
      project: "your-sls-project-name"
      logstore: "your-logs-logstore-name"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [aliyun_sls]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [aliyun_sls]

这里的环境变量ALIYUN_SLS_ENDPOINT等需要在启动容器时注入。这是连接我们自建可观测性管道和云厂商后台的桥梁。

第二步:Kotlin服务(调用方)的tRPC客户端拦截器

QueryGateway服务中,我们需要配置OTel SDK,并实现一个ClientInterceptor

首先,在build.gradle.kts中添加依赖:

// build.gradle.kts
dependencies {
    // OpenTelemetry API & SDK
    implementation("io.opentelemetry:opentelemetry-api:1.31.0")
    implementation("io.opentelemetry:opentelemetry-sdk:1.31.0")

    // OTLP Exporter
    implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.31.0")

    // SDK自动配置扩展
    implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.31.0")
    
    // tRPC Kotlin依赖
    // implementation("com.tencent.trpc:trpc-kotlin-spring-boot-starter:...")
}

接下来,配置OpenTelemetry的全局实例。在生产级的Spring Boot应用中,这通常通过一个@Configuration类来完成。

// src/main/kotlin/com/example/observability/OtelConfiguration.kt
package com.example.observability

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.concurrent.TimeUnit

@Configuration
class OtelConfiguration {

    @Bean
    fun openTelemetry(): OpenTelemetry {
        val resource = Resource.getDefault().merge(
            Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "query-gateway-kotlin"))
        )

        val spanExporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://otel-collector:4317") // 指向OTel Collector
            .setTimeout(2, TimeUnit.SECONDS)
            .build()

        val spanProcessor = BatchSpanProcessor.builder(spanExporter)
            .setScheduleDelay(100, TimeUnit.MILLISECONDS)
            .build()

        val tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(spanProcessor)
            .setResource(resource)
            .build()

        return OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .buildAndRegisterGlobal() // 注册为全局实例,方便各处调用
    }
}

最核心的部分是tRPC的ClientInterceptor。我们需要创建一个类,实现tRPC框架提供的拦截器接口。

// src/main/kotlin/com/example/trpc/OtelClientInterceptor.kt
package com.example.trpc

import com.tencent.trpc.core.extension.Extension
import com.tencent.trpc.core.filter.spi.Filter
import com.tencent.trpc.core.rpc.Invoker
import com.tencent.trpc.core.rpc.Request
import com.tencent.trpc.core.rpc.Response
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter
import java.util.concurrent.CompletionStage

// 假设tRPC的元数据是一个MutableMap<String, Any>
// 这里的TextMapSetter是关键,它定义了如何将Context写入载体(carrier)
private val setter = TextMapSetter<MutableMap<String, Any>> { carrier, key, value ->
    // W3C Trace Context规范要求header key是小写的
    carrier?.set(key.lowercase(), value)
}

@Extension(name = "otelClientInterceptor")
class OtelClientInterceptor(private val openTelemetry: OpenTelemetry) : Filter {

    override fun filter(invoker: Invoker<*>, request: Request): CompletionStage<Response> {
        val tracer = openTelemetry.getTracer("trpc-kotlin-client")
        val parentContext = Context.current()

        // 1. 创建一个新的Span, 类型为CLIENT
        val span = tracer.spanBuilder("${request.rpcMethodName} send")
            .setParent(parentContext)
            .setSpanKind(SpanKind.CLIENT)
            .startSpan()

        // 2. 将Span放入当前上下文,并注入到tRPC请求的元数据中
        try {
            span.makeCurrent().use { scope ->
                // 获取trans_info,如果不存在则创建
                // tRPC框架的具体API可能不同,这里是示意
                val attachments = request.attachments ?: mutableMapOf()
                if (request.attachments == null) {
                    request.attachments = attachments
                }

                // 这是魔法发生的地方: 将当前Trace Context注入到attachments中
                openTelemetry.propagators.textMapPropagator.inject(
                    Context.current(),
                    attachments,
                    setter
                )
                
                // 设置span的属性,对于RPC调用非常重要
                span.setAttribute("rpc.system", "trpc")
                span.setAttribute("rpc.service", request.rpcServiceName)
                span.setAttribute("rpc.method", request.rpcMethodName)

                // 3. 执行真正的RPC调用
                return invoker.invoke(request).whenComplete { response, throwable ->
                    // 4. 在调用完成后,根据结果结束Span
                    if (throwable != null) {
                        span.setStatus(StatusCode.ERROR, throwable.message)
                        span.recordException(throwable)
                    } else {
                        if (response.exception != null) {
                            span.setStatus(StatusCode.ERROR, response.exception.message)
                            span.recordException(response.exception)
                        } else {
                            span.setStatus(StatusCode.OK)
                        }
                    }
                    span.end()
                }
            }
        } catch (e: Exception) {
            // 确保即使在注入或调用前发生异常,span也能被正确关闭
            span.setStatus(StatusCode.ERROR, "Failed to send tRPC request")
            span.recordException(e)
            span.end()
            throw e
        }
    }
}

这个拦截器会在每次tRPC调用前执行。它创建了一个CLIENT类型的Span,然后使用openTelemetry.propagators.textMapPropagator将追踪上下文(如traceparent头)序列化并存入tRPC请求的attachments中。调用结束后,无论成功失败,它都会正确地关闭Span。

第三步:Python服务(被调方)的tRPC服务器拦截器

现在轮到DataProcessor服务了。我们需要做与Kotlin服务相反的操作:提取追踪上下文,并基于它创建一个SERVER类型的Span。

首先是Python环境的依赖:

# requirements.txt
flask
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
# tRPC Python依赖
# trpc-py

接着,在Python应用启动时配置OTel SDK。

# app/observability.py
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

def configure_tracer():
    """配置全局的TracerProvider"""
    # 这里的endpoint指向OTel Collector
    otel_collector_endpoint = os.getenv("OTEL_COLLECTOR_ENDPOINT", "otel-collector:4317")
    
    resource = Resource(attributes={
        SERVICE_NAME: "data-processor-python"
    })

    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otel_collector_endpoint, insecure=True))
    provider.add_span_processor(processor)

    # 设置为全局的TracerProvider
    trace.set_tracer_provider(provider)

然后是核心的tRPC ServerInterceptor。我们需要一个TextMapGetter来告诉OTel如何从tRPC的元数据中读取上下文。

# app/trpc_interceptor.py
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.context import attach, detach
from opentelemetry.trace.propagation import get_current_span
from opentelemetry.trace import SpanKind, StatusCode
from opentelemetry.context.context import Context

# TextMapGetter定义了如何从载体(carrier)中读取值
# tRPC元数据在Python中通常表现为字典
class TrpcMetadataGetter:
    def get(self, carrier: dict, key: str) -> list[str] | None:
        return carrier.get(key, None)

    def keys(self, carrier: dict) -> list[str]:
        return list(carrier.keys())

# 全局实例化getter
getter = TrpcMetadataGetter()

class OtelServerInterceptor:
    """
    一个tRPC服务器拦截器,用于处理OpenTelemetry的上下文传播。
    在真实项目中,此类需要继承tRPC Python框架提供的拦截器基类。
    """
    def __init__(self):
        self.tracer = trace.get_tracer(__name__)

    def intercept(self, handler, request_context):
        """
        拦截tRPC请求的核心方法。
        
        Args:
            handler: 原始的RPC处理函数
            request_context: 包含请求元数据等的上下文对象
        """
        
        # 1. 从tRPC元数据中提取上游传递的上下文
        # request_context.trans_info 假设是存储元数据的地方
        metadata = request_context.trans_info or {}
        
        # 使用OTel的API从metadata中提取context
        parent_context = extract(carrier=metadata, getter=getter)

        # 2. 基于提取的上下文创建新的SERVER Span
        span_name = f"{request_context.rpc_name} receive"
        
        with self.tracer.start_as_current_span(
            span_name,
            context=parent_context,
            kind=SpanKind.SERVER
        ) as span:
            
            # 丰富span的属性
            span.set_attribute("rpc.system", "trpc")
            span.set_attribute("rpc.service", request_context.service_name)
            span.set_attribute("rpc.method", request_context.method_name)
            
            # 在一个常见的错误是,忘记检查span是否有效
            if not span.is_recording():
                # 如果因为采样等原因span未被记录,直接调用业务逻辑以减少开销
                return handler(request_context)

            try:
                # 3. 执行真正的业务逻辑
                result = handler(request_context)
                span.set_status(StatusCode.OK)
                return result
            except Exception as e:
                # 4. 如果业务逻辑异常,记录异常并设置span状态
                span.record_exception(e)
                span.set_status(StatusCode.ERROR, str(e))
                raise e

这个拦截器的逻辑与客户端的正好相反。它首先使用extract方法,通过我们定义的TrpcMetadataGetter从请求的元数据中解析出traceparent等信息,构建出父Context。然后,它创建一个SERVER类型的Span,并确保这个新Span的父级是来自Kotlin服务的那个Span。这样,调用链就完美地连接起来了。

验证成果

当所有组件都部署并运行后,我们发起一个请求到QueryGateway

  1. Kotlin服务收到请求,创建一个Trace,其根Span为A
  2. 在调用Python服务之前,OtelClientInterceptor创建了一个子Span B (kind=CLIENT),并将trace_idspan_id(B)注入到tRPC元数据中。
  3. Python服务收到tRPC请求,OtelServerInterceptor从元数据中提取上下文,创建了另一个子Span C (kind=SERVER)。此时,span C的父Span正是span B
  4. 两个服务都将各自的Span数据异步发送到OTel Collector。
  5. Collector处理后,通过aliyun_sls导出器发送到阿里云。

登录阿里云链路追踪(ARMS)控制台,我们可以看到一条完整的调用链。火焰图清晰地展示了请求从query-gateway-kotlin服务流向data-processor-python服务,每个阶段的耗时一目了然。点击任何一个Span,都可以看到详细的属性,如RPC方法名、服务名等。当DataProcessor服务中出现错误时,对应的Span会标记为红色,并且异常信息会被记录下来。曾经需要数小时才能定位的问题,现在几分钟内就能找到根源。

当前方案的局限性与未来迭代路径

这套方案解决了我们最紧迫的跨语言追踪问题,但在真实生产环境中,它并非终点。

首先,元数据开销问题。我们在每个tRPC请求中都添加了traceparenttracestate元数据。对于性能极其敏感的服务,这种微小的字符串处理开销在海量请求下可能会累积。未来的优化可以考虑在tRPC协议层面支持二进制格式的上下文传播,或者实施更精细的尾部采样策略,只对感兴趣的(如错误的或慢的)请求进行完整追踪,减少正常请求的开销。

其次,上下文传播的健壮性。目前的实现依赖于tRPC框架的拦截器机制。如果服务内部存在复杂的异步处理逻辑(例如Python的asyncio或Kotlin的协程),必须确保Context在不同的线程或协程之间能够被正确传递。这通常需要使用语言或框架特定的库(如opentelemetry-instrumentation-asyncio)来辅助,否则追踪链会在异步边界处断裂。

再者,可观测性的广度不足。我们目前只解决了Tracing。一个完整的可观测性平台还应包括Metrics和Logs。下一步的迭代计划是将trace_idspan_id自动注入到所有结构化日志中,实现日志与链路的关联。同时,可以从Span数据中聚合出关键业务指标(如请求延迟的P99分位数、错误率),并将其导出到阿里云监控(CMS)或Prometheus,建立起更全面的监控告警体系。

最后,维护成本。手动编写和维护拦截器意味着当tRPC框架或OTel SDK升级时,我们可能需要同步更新代码。理想的未来是,tRPC社区能够提供官方的OpenTelemetry Instrumentation库,将这些集成工作标准化,从而让业务开发者可以完全专注于业务逻辑本身。


  目录