调试线上问题时,最先求助的总是日志。但随着系统变得复杂,传统的基于关键词的日志搜索,比如在 Kibana 或 Loki 里用 level:error AND "transaction failed"
这样的查询,越来越像是在大海捞针。最近一次排障经历彻底暴露了它的局限性:一个上游服务的偶发性超时,引发了下游多个服务的连锁反应,日志里充满了各种不同的错误信息——"downstream service timeout"
, "context deadline exceeded"
, "failed to process message due to upstream unavailability"
。它们描述的是同一个根本原因,但因为缺乏统一的错误码或关键词,我花了数小时才将这些散落的线索拼凑起来。
那一刻,我意识到我们需要一种能理解“语义”的日志查询方式。我想要的不是搜索精确的字符串,而是搜索“意图”。比如,当我搜索“支付失败导致的用户登录异常”时,系统应该能聪明地找出上述所有相关的日志,即使日志文本里根本没有“支付”或“登录”这些词。这自然地引向了向量搜索和嵌入模型。
这个想法催生了一个内部项目:构建一个语义化的日志分析管道。目标很明确:将现有的日志流通过一个处理管道,转换成向量,存入一个专门的数据库,并最终在我们已经广泛使用的 Grafana 中提供自然语言查询入口。
架构构想与技术选型
初步的架构草图很简单:应用日志通过现有的日志收集管道(如 Fluentd)汇入 Azure Event Hubs,一个 Azure Function 负责消费这些日志,将其转换并写入 Weaviate 向量数据库,最后通过一个定制的 Grafana 数据源插件来提供查询界面。
graph TD subgraph "应用集群" A[App Service 1] --> B{Log Collector}; C[App Service 2] --> B; end subgraph "日志管道" B --> D[Azure Event Hubs]; end subgraph "处理与存储" D -- Trigger --> E[Azure Function: Log Processor]; E -- Generate Embeddings & Write --> F[Weaviate Cluster]; end subgraph "查询与可视化" G[Grafana] --> H{Custom Weaviate Datasource}; H -- GraphQL Query --> F; end
1. Azure Functions: 选用它的理由非常务实。首先,日志流量通常是突发性的,使用 Serverless 的消费模型(Consumption Plan)能完美匹配这种负载,只在有日志需要处理时付费,成本效益极高。其次,它与 Azure 生态(特别是 Event Hubs)的集成是开箱即用的,能省去大量基础设置的麻烦。在真实项目中,运维成本和开发效率是关键考量。
2. Weaviate: 向量数据库的选择很多,但我最终选择了 Weaviate。关键在于它的 text2vec-*
模块。它能内置一个嵌入模型(例如,通过 text2vec-huggingface
模块),这意味着我的 Azure Function 无需自己去调用 OpenAI 或其他 embedding API。数据写入时,Weaviate 会自动将指定的文本字段向量化。这极大地简化了处理逻辑,降低了架构的复杂性和潜在的故障点。同时,它的 GraphQL API 设计精良,支持混合搜索(向量搜索 + 标量过滤),这对于日志场景至关重要——我们总是需要按时间范围、服务名或日志级别进行过滤。
3. Grafana: 团队已经深度使用 Grafana 进行指标监控和告警。引入一个全新的查询前端会增加工程师的学习成本和心智负担。因此,最优解是扩展 Grafana 的能力,让它直接支持对 Weaviate 的查询。这意味着需要开发一个自定义的数据源插件。虽然有额外的工作量,但为了维护统一的可观测性平台,这个投入是值得的。
核心实现:日志处理 Function
Azure Function 的核心职责是:从 Event Hubs 批量拉取日志,进行清洗和结构化,然后批量写入 Weaviate。这里的关键是性能和容错。
local.settings.json
文件需要包含 Weaviate 的连接信息和 Event Hub 的连接字符串。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"EventHubConnectionString": "Endpoint=sb://your-namespace.servicebus.windows.net/;...",
"EventHubName": "logs",
"WeaviateEndpoint": "http://your-weaviate-instance:8080",
"WeaviateApiKey": "YOUR_API_KEY_IF_NEEDED"
}
}
Function 的代码使用 C# 和 .NET 隔离工作进程模型编写。
// WeaviateLogIngestionFunction.cs
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Azure.Messaging.EventHubs;
using System.Text;
using System.Text.Json;
using WeaviateNET;
using WeaviateNET.Query;
public class LogEntry
{
public Guid Id { get; set; }
public string ServiceName { get; set; }
public string Level { get; set; }
public string Message { get; set; }
public DateTime Timestamp { get; set; }
public Dictionary<string, string> Metadata { get; set; }
}
public class WeaviateLogIngestionFunction
{
private readonly WeaviateDB _weaviateDb;
private readonly ILogger<WeaviateLogIngestionFunction> _logger;
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
public WeaviateLogIngestionFunction(ILogger<WeaviateLogIngestionFunction> logger)
{
_logger = logger;
var endpoint = Environment.GetEnvironmentVariable("WeaviateEndpoint") ?? throw new InvalidOperationException("WeaviateEndpoint not set.");
// 在生产环境中,配置应更健壮,例如通过 IOptions<T> 注入
var config = new WeaviateConfig(endpoint);
_weaviateDb = new WeaviateDB(config);
}
[Function(nameof(WeaviateLogIngestionFunction))]
public async Task Run(
[EventHubTrigger("%EventHubName%", Connection = "EventHubConnectionString", ConsumerGroup = "$Default")] EventData[] events)
{
if (events == null || events.Length == 0)
{
return;
}
_logger.LogInformation("Processing a batch of {count} events.", events.Length);
var weaviateObjects = new List<WeaviateObject<LogEntry>>();
foreach (var eventData in events)
{
try
{
var jsonBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
var log = JsonSerializer.Deserialize<LogEntry>(jsonBody, _jsonOptions);
if (log == null || string.IsNullOrWhiteSpace(log.Message))
{
_logger.LogWarning("Skipping invalid or empty log message.");
continue;
}
// Weaviate 会自动生成 UUID,但为了可追溯,我们也可以自己指定
var weaviateObj = WeaviateObject.Create(log, WeaviateClass.Create<LogEntry>());
weaviateObj.id = Guid.NewGuid().ToString();
weaviateObjects.Add(weaviateObj);
}
catch (JsonException jsonEx)
{
_logger.LogError(jsonEx, "Failed to deserialize log event. Body: {body}", Encoding.UTF8.GetString(eventData.EventBody.ToArray()));
// 考虑将坏消息推送到死信队列
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected error occurred while processing an event.");
}
}
if (weaviateObjects.Count > 0)
{
try
{
// 关键点:使用批量写入以获得最佳性能
var batchResponse = await _weaviateDb.Schema.BatchCreateObjects(weaviateObjects);
// 在真实项目中,必须仔细检查 batchResponse 中的错误
if (batchResponse.Any(r => r.result.errors != null))
{
var failedCount = batchResponse.Count(r => r.result.errors != null);
_logger.LogError("Batch insertion to Weaviate completed with {failedCount} failures out of {totalCount}.", failedCount, weaviateObjects.Count);
// 此处应添加更详细的错误日志记录逻辑
}
else
{
_logger.LogInformation("Successfully inserted {count} log entries into Weaviate.", weaviateObjects.Count);
}
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Failed to execute batch insertion to Weaviate. All {count} logs in this batch may be lost.", weaviateObjects.Count);
// 抛出异常,让 Function App 根据配置进行重试
throw;
}
}
}
}
这里的坑在于:
- 批量处理: 绝对不能逐条写入 Weaviate,网络开销会非常大。Azure Function 的 Event Hubs 触发器天生就是批量的,我们必须利用这一点,组装一个对象列表后调用
BatchCreateObjects
。 - 错误处理: 在一个批次中,部分日志可能格式错误,部分可能因为 Weaviate 的瞬时问题写入失败。代码必须能处理部分失败,记录下有问题的日志,同时确保成功的日志被正确写入,而不是因为单条日志的失败导致整个批次重试。一个健壮的实现会把处理失败的消息发送到死信队列,供后续分析。
- 配置与依赖注入: 生产级的 Function 代码应当使用依赖注入来管理
WeaviateDB
客户端的生命周期,配置信息也应该通过IOptions
模式注入,而不是直接读环境变量。上面的示例为了简化,直接在构造函数中初始化。
Weaviate 的 Schema 设计
Weaviate 的 Schema 定义了数据如何存储、索引和向量化。这是决定我们语义搜索效果好坏的核心。
{
"class": "LogEntry",
"description": "Represents a single log entry from an application",
"vectorizer": "text2vec-huggingface",
"moduleConfig": {
"text2vec-huggingface": {
"model": "sentence-transformers/all-MiniLM-L6-v2",
"options": {
"waitForModel": true
}
}
},
"properties": [
{
"name": "message",
"dataType": ["text"],
"description": "The log message content to be vectorized."
},
{
"name": "serviceName",
"dataType": ["string"],
"description": "Name of the service that generated the log.",
"tokenization": "keyword"
},
{
"name": "level",
"dataType": ["string"],
"description": "Log level (e.g., INFO, WARN, ERROR).",
"tokenization": "field"
},
{
"name": "timestamp",
"dataType": ["date"],
"description": "Timestamp of the log entry."
},
{
"name": "metadata",
"dataType": ["object"],
"description": "Additional key-value metadata.",
"nestedProperties": [
{
"name": "key",
"dataType": ["string"]
},
{
"name": "value",
"dataType": ["string"]
}
]
}
],
"vectorIndexConfig": {
"skip": false,
"efConstruction": 128,
"maxConnections": 16,
"ef": -1,
"hnsw": {
"cleanupIntervalSeconds": 300
}
}
}
设计考量:
- Vectorizer:
text2vec-huggingface
和sentence-transformers/all-MiniLM-L6-v2
模型的组合是一个性能和效果均衡的选择。它在本地运行,避免了 API 调用的延迟和成本。 - Properties: 只有
message
字段需要被向量化。serviceName
和level
等字段被定义为string
类型,并设置了tokenization
方式,这让它们可以用于高效的元数据过滤(where
子句)。timestamp
必须是date
类型,以便进行时间范围过滤。 - Vector Index Config:
HNSW
索引的参数efConstruction
和maxConnections
对构建索引时的性能和查询时的召回率有直接影响。这些值需要根据实际的数据量和查询需求进行调优。一个常见的错误是忽略这些参数,使用默认值,这可能导致在数据量大时查询性能下降。
定制 Grafana 数据源
这是整个方案中最具挑战性但也是价值最高的一环。我们需要创建一个 Grafana 数据源插件,前端负责提供查询输入框,后端负责将用户的自然语言查询转换为 Weaviate 的 GraphQL nearText
查询。
Grafana 插件的后端部分通常用 Go 编写。核心逻辑在 QueryData
方法中。
// pkg/weaviate/weaviate.go
package weaviate
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"net/http"
"bytes"
"io/ioutil"
"time"
)
// ... 其他结构体定义 ...
type QueryModel struct {
QueryText string `json:"queryText"` // 用户输入的自然语言查询
}
func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
response := backend.NewQueryDataResponse()
for _, q := range req.Queries {
res := d.query(ctx, req.PluginContext, q)
response.Responses[q.RefID] = res
}
return response, nil
}
func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
var qm QueryModel
err := json.Unmarshal(query.JSON, &qm)
if err != nil {
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("json unmarshal error: %v", err))
}
if qm.QueryText == "" {
// 返回空 Frame,避免前端报错
return backend.DataResponse{
Frames: []*data.Frame{
data.NewFrame("response"),
},
}
}
// 核心:构建 GraphQL 查询
graphqlQuery := fmt.Sprintf(`{
Get {
LogEntry(
nearText: {
concepts: ["%s"]
},
where: {
operator: And,
operands: [
{ path: ["timestamp"], operator: GreaterThanEqual, valueDate: "%s" },
{ path: ["timestamp"], operator: LessThanEqual, valueDate: "%s" }
]
},
limit: 1000,
sort: { path: ["timestamp"], order: desc }
) {
message
level
serviceName
timestamp
_additional {
distance
}
}
}
}`, escapeString(qm.QueryText), query.TimeRange.From.Format(time.RFC3339), query.TimeRange.To.Format(time.RFC3339))
// 向 Weaviate 发送请求
// ... (此处省略了 httpClient 的创建和配置代码) ...
reqBody, _ := json.Marshal(map[string]string{"query": graphqlQuery})
httpReq, _ := http.NewRequestWithContext(ctx, "POST", d.settings.Endpoint+"/v1/graphql", bytes.NewBuffer(reqBody))
// ... (设置 Headers, API Key 等) ...
httpResp, err := d.httpClient.Do(httpReq)
// ... (处理 httpResp 的错误) ...
defer httpResp.Body.Close()
body, _ := ioutil.ReadAll(httpResp.Body)
// 将 Weaviate 的响应转换为 Grafana 的 DataFrame
frame, err := parseGraphQLResponseToFrame(body)
if err != nil {
return backend.ErrDataResponse(backend.StatusInternalError, fmt.Sprintf("failed to parse response: %v", err))
}
return backend.DataResponse{
Frames: []*data.Frame{frame},
}
}
func parseGraphQLResponseToFrame(body []byte) (*data.Frame, error) {
// ...
// 此处是关键的转换逻辑。需要解析 JSON 响应,
// 创建 data.Frame,并为每一列(timestamp, level, serviceName, message)
// 添加相应类型的 Field。
// 时间戳字段必须是 time.Time 类型,日志内容是 string 类型。
// ...
frame := data.NewFrame("logs")
frame.SetMeta(&data.FrameMeta{PreferredVisualization: data.VisTypeLogs})
// ... 伪代码:解析 body, 循环添加数据 ...
// for _, log := range parsedResponse.Data.Get.LogEntry {
// frame.AppendRow(log.timestamp, log.level, log.serviceName, log.message)
// }
return frame, nil
}
开发这个插件的难点在于:
- GraphQL 查询构建: 需要动态地将 Grafana 的时间范围选择器 (
query.TimeRange
) 转换为 Weaviate GraphQLwhere
子句中的timestamp
过滤条件。 - 数据帧转换: Weaviate 返回的是 JSON,Grafana 的 Logs Panel 需要一个特定结构的
data.Frame
对象。时间戳、日志级别、内容等字段必须被正确地映射到data.Frame
的列中,并且字段类型要正确(特别是时间字段)。frame.SetMeta
这行很关键,它告诉 Grafana 这个数据帧最适合用日志面板来展示。 - 前端交互: 插件的前端部分(使用 React 和 TypeScript)需要提供一个简洁的输入框,并将用户的输入作为
queryText
发送到后端。这部分相对直接,但需要熟悉 Grafana 的插件开发工具链。
最终,我们在 Grafana 中得到了一个全新的 Dashboard Panel。它只有一个简单的文本输入框,工程师可以在里面输入像 “database connection pool exhausted after new version rollout” 这样的自然语言,面板下方就会实时地展示出与这个语义最相关的日志条目,按时间排序,并用颜色高亮不同的日志级别。这彻底改变了我们排查复杂问题的方式。
局限性与未来展望
这个方案并非银弹。首先,成本是一个重要考量。对海量日志进行向量化和存储,会带来显著的计算和存储开销,远高于传统的文本索引。我们需要实现精细的日志采样或只对高价值(如 ERROR
和 WARN
级别)的日志进行向量化,以控制成本。
其次,搜索质量严重依赖于所选的嵌入模型。通用的模型可能无法很好地理解我们业务领域内的特定术语和缩写。未来的一个迭代方向是,利用我们自己积累的日志数据,微调一个领域专用的嵌入模型,以期获得更高的搜索相关性。
最后,当前的实现是“事后分析”,无法用于实时告警。如何基于日志的“语义漂移”或“异常语义模式”来构建告警系统,是一个更具挑战性但价值也更高的课题。或许可以结合流处理引擎(如 Flink),对日志向量进行实时聚类分析,当出现新的、异常的日志簇时触发告警。这为可观测性的未来打开了新的想象空间。