团队接手了一个混杂技术栈的系统。核心数据处理模块是一个稳定运行多年的Python服务,基于Flask框架,我们称之为DataProcessor
。最近,为了应对高并发查询场景,我们引入了一个新的Kotlin服务,QueryGateway
,它使用Ktor框架构建。两个服务间的通信选型是个棘手的问题,由于历史原因,公司内部服务治理体系与一个C++构建的tRPC网关深度绑定,所有新服务都要求通过tRPC协议进行内部通信。
最初的集成很顺利,直到第一次线上故障。一个请求通过QueryGateway
,调用了DataProcessor
,最终在Python服务深处抛出了一个异常。排查过程极为痛苦,我们只能在QueryGateway
的日志里找到请求入口,然后在DataProcessor
成千上万的日志条目中根据时间戳和参数去猜测哪一条是对应的。两个服务像是独立的黑盒,一次完整的业务调用链被tRPC硬生生切断,可观测性几乎为零。
这个痛点促使我们必须建立一套跨语言、跨服务的统一可观测性链路。目标很明确:任何一个请求,无论它经过多少个服务,都必须拥有一个唯一的trace_id
,并且能够将整个调用过程串联成一个完整的火焰图。
构想与技术选型决策
初步构想是利用分布式追踪系统。行业标准无疑是OpenTelemetry (OTel),它的好处在于厂商中立和广泛的社区支持,提供了统一的API、SDK和数据协议(OTLP)。即使我们目前部署在阿里云上,使用OTel也能避免被单一云厂商的探针或SDK锁定。
我们的技术栈是:
- 云平台: 阿里云。因此,最终的追踪数据和日志需要无缝对接到阿里云可观测性产品,如链路追踪(ARMS)和日志服务(SLS)。
- 服务: Kotlin (Ktor) 和 Python (Flask)。OTel对这两种语言都有一流的支持。
- 通信: tRPC。这是真正的挑战所在。
tRPC是一个高性能的RPC框架,但它的生态不像gRPC或REST那样成熟,尤其是在与OpenTelemetry集成的自动化配置方面。OTel的自动插桩库通常能很好地支持主流的HTTP客户端和gRPC框架,但对于tRPC,我们找不到现成的解决方案。这意味着我们必须手动实现上下文的跨进程传播。
上下文传播是分布式追踪的核心。当服务A调用服务B时,服务A必须将当前的追踪上下文(主要是trace_id
和parent_span_id
)注入到请求中。服务B则需要从请求中提取这个上下文,并基于它创建新的子Span。这样,调用链才得以延续。
OTel为此定义了W3C Trace Context
标准,通常通过HTTP Header(如traceparent
)来传递。在tRPC中,没有HTTP Header的概念,但它提供了trans_info
或类似的元数据(metadata)机制,允许在请求中附加自定义的键值对。我们的核心任务,就是编写tRPC的客户端拦截器(ClientInterceptor)和服务器拦截器(ServerInterceptor),利用这个元数据机制来手动注入和提取W3C追踪上下文。
最终的架构设计如下:
sequenceDiagram participant Client as 外部请求 participant KotlinSvc as QueryGateway (Kotlin) participant PythonSvc as DataProcessor (Python) participant Collector as OTel Collector participant AlibabaCloud as 阿里云 (ARMS/SLS) Client->>+KotlinSvc: 发起API请求 Note over KotlinSvc: 启动新的Trace, 生成TraceContext KotlinSvc->>+KotlinSvc: [tRPC Client Interceptor]
将TraceContext注入tRPC元数据 KotlinSvc->>+PythonSvc: 发起tRPC调用 Note over PythonSvc: [tRPC Server Interceptor]
从tRPC元数据中提取TraceContext PythonSvc->>+PythonSvc: 处理业务逻辑, 创建子Span PythonSvc-->>-KotlinSvc: tRPC响应 KotlinSvc-->>-Client: API响应 par KotlinSvc-->>Collector: 异步导出Spans (OTLP) and PythonSvc-->>Collector: 异步导出Spans (OTLP) end Collector-->>AlibabaCloud: 批量导出到ARMS/SLS
步骤化实现:构建跨语言tRPC追踪链路
第一步:环境准备与OpenTelemetry Collector配置
在真实项目中,我们会将OTel Collector作为DaemonSet或Sidecar部署在Kubernetes集群中。为了演示,我们使用一个简单的docker-compose.yml
。
# docker-compose.yml
version: '3.8'
services:
# OTel Collector 负责接收、处理并导出遥测数据
otel-collector:
image: otel/opentelemetry-collector-contrib:0.87.0
container_name: otel-collector
command: ["--config=/etc/otelcol-contrib/config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
networks:
- observability-net
# Kotlin服务 (QueryGateway)
# ... 此处省略具体服务定义
# Python服务 (DataProcessor)
# ... 此处省略具体服务定义
networks:
observability-net:
driver: bridge
关键在于otel-collector-config.yaml
的配置。我们将接收OTLP协议的数据,然后通过阿里云官方提供的aliyun_sls
导出器将数据发送到指定的日志服务(SLS)项目。
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
http:
processors:
batch:
exporters:
# 配置阿里云SLS导出器
# 这是将数据送往阿里云可观测性平台的核心
aliyun_sls:
# 阿里云地域接入点, 例如: cn-hangzhou.log.aliyuncs.com
endpoint: "${ALIYUN_SLS_ENDPOINT}"
# 阿里云访问凭证, 生产环境建议使用RAM角色或STS临时凭证
access_key_id: "${ALIYUN_ACCESS_KEY_ID}"
access_key_secret: "${ALIYUN_ACCESS_KEY_SECRET}"
# Traces数据对应的SLS配置
traces:
project: "your-sls-project-name" # SLS项目名
logstore: "your-traces-logstore-name" # 用于存储Trace数据的Logstore
# 可选: 自定义字段映射等
# Logs数据对应的SLS配置
logs:
project: "your-sls-project-name"
logstore: "your-logs-logstore-name"
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [aliyun_sls]
logs:
receivers: [otlp]
processors: [batch]
exporters: [aliyun_sls]
这里的环境变量ALIYUN_SLS_ENDPOINT
等需要在启动容器时注入。这是连接我们自建可观测性管道和云厂商后台的桥梁。
第二步:Kotlin服务(调用方)的tRPC客户端拦截器
在QueryGateway
服务中,我们需要配置OTel SDK,并实现一个ClientInterceptor
。
首先,在build.gradle.kts
中添加依赖:
// build.gradle.kts
dependencies {
// OpenTelemetry API & SDK
implementation("io.opentelemetry:opentelemetry-api:1.31.0")
implementation("io.opentelemetry:opentelemetry-sdk:1.31.0")
// OTLP Exporter
implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.31.0")
// SDK自动配置扩展
implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.31.0")
// tRPC Kotlin依赖
// implementation("com.tencent.trpc:trpc-kotlin-spring-boot-starter:...")
}
接下来,配置OpenTelemetry的全局实例。在生产级的Spring Boot应用中,这通常通过一个@Configuration
类来完成。
// src/main/kotlin/com/example/observability/OtelConfiguration.kt
package com.example.observability
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.concurrent.TimeUnit
@Configuration
class OtelConfiguration {
@Bean
fun openTelemetry(): OpenTelemetry {
val resource = Resource.getDefault().merge(
Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "query-gateway-kotlin"))
)
val spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint("http://otel-collector:4317") // 指向OTel Collector
.setTimeout(2, TimeUnit.SECONDS)
.build()
val spanProcessor = BatchSpanProcessor.builder(spanExporter)
.setScheduleDelay(100, TimeUnit.MILLISECONDS)
.build()
val tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(resource)
.build()
return OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.buildAndRegisterGlobal() // 注册为全局实例,方便各处调用
}
}
最核心的部分是tRPC的ClientInterceptor
。我们需要创建一个类,实现tRPC框架提供的拦截器接口。
// src/main/kotlin/com/example/trpc/OtelClientInterceptor.kt
package com.example.trpc
import com.tencent.trpc.core.extension.Extension
import com.tencent.trpc.core.filter.spi.Filter
import com.tencent.trpc.core.rpc.Invoker
import com.tencent.trpc.core.rpc.Request
import com.tencent.trpc.core.rpc.Response
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter
import java.util.concurrent.CompletionStage
// 假设tRPC的元数据是一个MutableMap<String, Any>
// 这里的TextMapSetter是关键,它定义了如何将Context写入载体(carrier)
private val setter = TextMapSetter<MutableMap<String, Any>> { carrier, key, value ->
// W3C Trace Context规范要求header key是小写的
carrier?.set(key.lowercase(), value)
}
@Extension(name = "otelClientInterceptor")
class OtelClientInterceptor(private val openTelemetry: OpenTelemetry) : Filter {
override fun filter(invoker: Invoker<*>, request: Request): CompletionStage<Response> {
val tracer = openTelemetry.getTracer("trpc-kotlin-client")
val parentContext = Context.current()
// 1. 创建一个新的Span, 类型为CLIENT
val span = tracer.spanBuilder("${request.rpcMethodName} send")
.setParent(parentContext)
.setSpanKind(SpanKind.CLIENT)
.startSpan()
// 2. 将Span放入当前上下文,并注入到tRPC请求的元数据中
try {
span.makeCurrent().use { scope ->
// 获取trans_info,如果不存在则创建
// tRPC框架的具体API可能不同,这里是示意
val attachments = request.attachments ?: mutableMapOf()
if (request.attachments == null) {
request.attachments = attachments
}
// 这是魔法发生的地方: 将当前Trace Context注入到attachments中
openTelemetry.propagators.textMapPropagator.inject(
Context.current(),
attachments,
setter
)
// 设置span的属性,对于RPC调用非常重要
span.setAttribute("rpc.system", "trpc")
span.setAttribute("rpc.service", request.rpcServiceName)
span.setAttribute("rpc.method", request.rpcMethodName)
// 3. 执行真正的RPC调用
return invoker.invoke(request).whenComplete { response, throwable ->
// 4. 在调用完成后,根据结果结束Span
if (throwable != null) {
span.setStatus(StatusCode.ERROR, throwable.message)
span.recordException(throwable)
} else {
if (response.exception != null) {
span.setStatus(StatusCode.ERROR, response.exception.message)
span.recordException(response.exception)
} else {
span.setStatus(StatusCode.OK)
}
}
span.end()
}
}
} catch (e: Exception) {
// 确保即使在注入或调用前发生异常,span也能被正确关闭
span.setStatus(StatusCode.ERROR, "Failed to send tRPC request")
span.recordException(e)
span.end()
throw e
}
}
}
这个拦截器会在每次tRPC调用前执行。它创建了一个CLIENT
类型的Span,然后使用openTelemetry.propagators.textMapPropagator
将追踪上下文(如traceparent
头)序列化并存入tRPC请求的attachments
中。调用结束后,无论成功失败,它都会正确地关闭Span。
第三步:Python服务(被调方)的tRPC服务器拦截器
现在轮到DataProcessor
服务了。我们需要做与Kotlin服务相反的操作:提取追踪上下文,并基于它创建一个SERVER
类型的Span。
首先是Python环境的依赖:
# requirements.txt
flask
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
# tRPC Python依赖
# trpc-py
接着,在Python应用启动时配置OTel SDK。
# app/observability.py
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
def configure_tracer():
"""配置全局的TracerProvider"""
# 这里的endpoint指向OTel Collector
otel_collector_endpoint = os.getenv("OTEL_COLLECTOR_ENDPOINT", "otel-collector:4317")
resource = Resource(attributes={
SERVICE_NAME: "data-processor-python"
})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otel_collector_endpoint, insecure=True))
provider.add_span_processor(processor)
# 设置为全局的TracerProvider
trace.set_tracer_provider(provider)
然后是核心的tRPC ServerInterceptor
。我们需要一个TextMapGetter
来告诉OTel如何从tRPC的元数据中读取上下文。
# app/trpc_interceptor.py
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.context import attach, detach
from opentelemetry.trace.propagation import get_current_span
from opentelemetry.trace import SpanKind, StatusCode
from opentelemetry.context.context import Context
# TextMapGetter定义了如何从载体(carrier)中读取值
# tRPC元数据在Python中通常表现为字典
class TrpcMetadataGetter:
def get(self, carrier: dict, key: str) -> list[str] | None:
return carrier.get(key, None)
def keys(self, carrier: dict) -> list[str]:
return list(carrier.keys())
# 全局实例化getter
getter = TrpcMetadataGetter()
class OtelServerInterceptor:
"""
一个tRPC服务器拦截器,用于处理OpenTelemetry的上下文传播。
在真实项目中,此类需要继承tRPC Python框架提供的拦截器基类。
"""
def __init__(self):
self.tracer = trace.get_tracer(__name__)
def intercept(self, handler, request_context):
"""
拦截tRPC请求的核心方法。
Args:
handler: 原始的RPC处理函数
request_context: 包含请求元数据等的上下文对象
"""
# 1. 从tRPC元数据中提取上游传递的上下文
# request_context.trans_info 假设是存储元数据的地方
metadata = request_context.trans_info or {}
# 使用OTel的API从metadata中提取context
parent_context = extract(carrier=metadata, getter=getter)
# 2. 基于提取的上下文创建新的SERVER Span
span_name = f"{request_context.rpc_name} receive"
with self.tracer.start_as_current_span(
span_name,
context=parent_context,
kind=SpanKind.SERVER
) as span:
# 丰富span的属性
span.set_attribute("rpc.system", "trpc")
span.set_attribute("rpc.service", request_context.service_name)
span.set_attribute("rpc.method", request_context.method_name)
# 在一个常见的错误是,忘记检查span是否有效
if not span.is_recording():
# 如果因为采样等原因span未被记录,直接调用业务逻辑以减少开销
return handler(request_context)
try:
# 3. 执行真正的业务逻辑
result = handler(request_context)
span.set_status(StatusCode.OK)
return result
except Exception as e:
# 4. 如果业务逻辑异常,记录异常并设置span状态
span.record_exception(e)
span.set_status(StatusCode.ERROR, str(e))
raise e
这个拦截器的逻辑与客户端的正好相反。它首先使用extract
方法,通过我们定义的TrpcMetadataGetter
从请求的元数据中解析出traceparent
等信息,构建出父Context
。然后,它创建一个SERVER
类型的Span,并确保这个新Span的父级是来自Kotlin服务的那个Span。这样,调用链就完美地连接起来了。
验证成果
当所有组件都部署并运行后,我们发起一个请求到QueryGateway
。
- Kotlin服务收到请求,创建一个Trace,其根Span为
A
。 - 在调用Python服务之前,
OtelClientInterceptor
创建了一个子SpanB
(kind=CLIENT),并将trace_id
和span_id(B)
注入到tRPC元数据中。 - Python服务收到tRPC请求,
OtelServerInterceptor
从元数据中提取上下文,创建了另一个子SpanC
(kind=SERVER)。此时,span C
的父Span正是span B
。 - 两个服务都将各自的Span数据异步发送到OTel Collector。
- Collector处理后,通过
aliyun_sls
导出器发送到阿里云。
登录阿里云链路追踪(ARMS)控制台,我们可以看到一条完整的调用链。火焰图清晰地展示了请求从query-gateway-kotlin
服务流向data-processor-python
服务,每个阶段的耗时一目了然。点击任何一个Span,都可以看到详细的属性,如RPC方法名、服务名等。当DataProcessor
服务中出现错误时,对应的Span会标记为红色,并且异常信息会被记录下来。曾经需要数小时才能定位的问题,现在几分钟内就能找到根源。
当前方案的局限性与未来迭代路径
这套方案解决了我们最紧迫的跨语言追踪问题,但在真实生产环境中,它并非终点。
首先,元数据开销问题。我们在每个tRPC请求中都添加了traceparent
和tracestate
元数据。对于性能极其敏感的服务,这种微小的字符串处理开销在海量请求下可能会累积。未来的优化可以考虑在tRPC协议层面支持二进制格式的上下文传播,或者实施更精细的尾部采样策略,只对感兴趣的(如错误的或慢的)请求进行完整追踪,减少正常请求的开销。
其次,上下文传播的健壮性。目前的实现依赖于tRPC框架的拦截器机制。如果服务内部存在复杂的异步处理逻辑(例如Python的asyncio
或Kotlin的协程),必须确保Context
在不同的线程或协程之间能够被正确传递。这通常需要使用语言或框架特定的库(如opentelemetry-instrumentation-asyncio
)来辅助,否则追踪链会在异步边界处断裂。
再者,可观测性的广度不足。我们目前只解决了Tracing。一个完整的可观测性平台还应包括Metrics和Logs。下一步的迭代计划是将trace_id
和span_id
自动注入到所有结构化日志中,实现日志与链路的关联。同时,可以从Span数据中聚合出关键业务指标(如请求延迟的P99分位数、错误率),并将其导出到阿里云监控(CMS)或Prometheus,建立起更全面的监控告警体系。
最后,维护成本。手动编写和维护拦截器意味着当tRPC框架或OTel SDK升级时,我们可能需要同步更新代码。理想的未来是,tRPC社区能够提供官方的OpenTelemetry Instrumentation库,将这些集成工作标准化,从而让业务开发者可以完全专注于业务逻辑本身。