基于 Azure Functions 与 Weaviate 构建语义化日志分析的可观测性管道


调试线上问题时,最先求助的总是日志。但随着系统变得复杂,传统的基于关键词的日志搜索,比如在 Kibana 或 Loki 里用 level:error AND "transaction failed" 这样的查询,越来越像是在大海捞针。最近一次排障经历彻底暴露了它的局限性:一个上游服务的偶发性超时,引发了下游多个服务的连锁反应,日志里充满了各种不同的错误信息——"downstream service timeout", "context deadline exceeded", "failed to process message due to upstream unavailability"。它们描述的是同一个根本原因,但因为缺乏统一的错误码或关键词,我花了数小时才将这些散落的线索拼凑起来。

那一刻,我意识到我们需要一种能理解“语义”的日志查询方式。我想要的不是搜索精确的字符串,而是搜索“意图”。比如,当我搜索“支付失败导致的用户登录异常”时,系统应该能聪明地找出上述所有相关的日志,即使日志文本里根本没有“支付”或“登录”这些词。这自然地引向了向量搜索和嵌入模型。

这个想法催生了一个内部项目:构建一个语义化的日志分析管道。目标很明确:将现有的日志流通过一个处理管道,转换成向量,存入一个专门的数据库,并最终在我们已经广泛使用的 Grafana 中提供自然语言查询入口。

架构构想与技术选型

初步的架构草图很简单:应用日志通过现有的日志收集管道(如 Fluentd)汇入 Azure Event Hubs,一个 Azure Function 负责消费这些日志,将其转换并写入 Weaviate 向量数据库,最后通过一个定制的 Grafana 数据源插件来提供查询界面。

graph TD
    subgraph "应用集群"
        A[App Service 1] --> B{Log Collector};
        C[App Service 2] --> B;
    end
    subgraph "日志管道"
        B --> D[Azure Event Hubs];
    end
    subgraph "处理与存储"
        D -- Trigger --> E[Azure Function: Log Processor];
        E -- Generate Embeddings & Write --> F[Weaviate Cluster];
    end
    subgraph "查询与可视化"
        G[Grafana] --> H{Custom Weaviate Datasource};
        H -- GraphQL Query --> F;
    end

1. Azure Functions: 选用它的理由非常务实。首先,日志流量通常是突发性的,使用 Serverless 的消费模型(Consumption Plan)能完美匹配这种负载,只在有日志需要处理时付费,成本效益极高。其次,它与 Azure 生态(特别是 Event Hubs)的集成是开箱即用的,能省去大量基础设置的麻烦。在真实项目中,运维成本和开发效率是关键考量。

2. Weaviate: 向量数据库的选择很多,但我最终选择了 Weaviate。关键在于它的 text2vec-* 模块。它能内置一个嵌入模型(例如,通过 text2vec-huggingface 模块),这意味着我的 Azure Function 无需自己去调用 OpenAI 或其他 embedding API。数据写入时,Weaviate 会自动将指定的文本字段向量化。这极大地简化了处理逻辑,降低了架构的复杂性和潜在的故障点。同时,它的 GraphQL API 设计精良,支持混合搜索(向量搜索 + 标量过滤),这对于日志场景至关重要——我们总是需要按时间范围、服务名或日志级别进行过滤。

3. Grafana: 团队已经深度使用 Grafana 进行指标监控和告警。引入一个全新的查询前端会增加工程师的学习成本和心智负担。因此,最优解是扩展 Grafana 的能力,让它直接支持对 Weaviate 的查询。这意味着需要开发一个自定义的数据源插件。虽然有额外的工作量,但为了维护统一的可观测性平台,这个投入是值得的。

核心实现:日志处理 Function

Azure Function 的核心职责是:从 Event Hubs 批量拉取日志,进行清洗和结构化,然后批量写入 Weaviate。这里的关键是性能和容错。

local.settings.json 文件需要包含 Weaviate 的连接信息和 Event Hub 的连接字符串。

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "EventHubConnectionString": "Endpoint=sb://your-namespace.servicebus.windows.net/;...",
    "EventHubName": "logs",
    "WeaviateEndpoint": "http://your-weaviate-instance:8080",
    "WeaviateApiKey": "YOUR_API_KEY_IF_NEEDED"
  }
}

Function 的代码使用 C# 和 .NET 隔离工作进程模型编写。

// WeaviateLogIngestionFunction.cs

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Azure.Messaging.EventHubs;
using System.Text;
using System.Text.Json;
using WeaviateNET;
using WeaviateNET.Query;

public class LogEntry
{
    public Guid Id { get; set; }
    public string ServiceName { get; set; }
    public string Level { get; set; }
    public string Message { get; set; }
    public DateTime Timestamp { get; set; }
    public Dictionary<string, string> Metadata { get; set; }
}

public class WeaviateLogIngestionFunction
{
    private readonly WeaviateDB _weaviateDb;
    private readonly ILogger<WeaviateLogIngestionFunction> _logger;
    private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

    public WeaviateLogIngestionFunction(ILogger<WeaviateLogIngestionFunction> logger)
    {
        _logger = logger;
        var endpoint = Environment.GetEnvironmentVariable("WeaviateEndpoint") ?? throw new InvalidOperationException("WeaviateEndpoint not set.");
        // 在生产环境中,配置应更健壮,例如通过 IOptions<T> 注入
        var config = new WeaviateConfig(endpoint);
        _weaviateDb = new WeaviateDB(config);
    }

    [Function(nameof(WeaviateLogIngestionFunction))]
    public async Task Run(
        [EventHubTrigger("%EventHubName%", Connection = "EventHubConnectionString", ConsumerGroup = "$Default")] EventData[] events)
    {
        if (events == null || events.Length == 0)
        {
            return;
        }

        _logger.LogInformation("Processing a batch of {count} events.", events.Length);

        var weaviateObjects = new List<WeaviateObject<LogEntry>>();

        foreach (var eventData in events)
        {
            try
            {
                var jsonBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
                var log = JsonSerializer.Deserialize<LogEntry>(jsonBody, _jsonOptions);

                if (log == null || string.IsNullOrWhiteSpace(log.Message))
                {
                    _logger.LogWarning("Skipping invalid or empty log message.");
                    continue;
                }

                // Weaviate 会自动生成 UUID,但为了可追溯,我们也可以自己指定
                var weaviateObj = WeaviateObject.Create(log, WeaviateClass.Create<LogEntry>());
                weaviateObj.id = Guid.NewGuid().ToString();
                
                weaviateObjects.Add(weaviateObj);
            }
            catch (JsonException jsonEx)
            {
                _logger.LogError(jsonEx, "Failed to deserialize log event. Body: {body}", Encoding.UTF8.GetString(eventData.EventBody.ToArray()));
                // 考虑将坏消息推送到死信队列
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error occurred while processing an event.");
            }
        }

        if (weaviateObjects.Count > 0)
        {
            try
            {
                // 关键点:使用批量写入以获得最佳性能
                var batchResponse = await _weaviateDb.Schema.BatchCreateObjects(weaviateObjects);
                
                // 在真实项目中,必须仔细检查 batchResponse 中的错误
                if (batchResponse.Any(r => r.result.errors != null))
                {
                    var failedCount = batchResponse.Count(r => r.result.errors != null);
                    _logger.LogError("Batch insertion to Weaviate completed with {failedCount} failures out of {totalCount}.", failedCount, weaviateObjects.Count);
                    // 此处应添加更详细的错误日志记录逻辑
                }
                else
                {
                    _logger.LogInformation("Successfully inserted {count} log entries into Weaviate.", weaviateObjects.Count);
                }
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Failed to execute batch insertion to Weaviate. All {count} logs in this batch may be lost.", weaviateObjects.Count);
                // 抛出异常,让 Function App 根据配置进行重试
                throw;
            }
        }
    }
}

这里的坑在于:

  1. 批量处理: 绝对不能逐条写入 Weaviate,网络开销会非常大。Azure Function 的 Event Hubs 触发器天生就是批量的,我们必须利用这一点,组装一个对象列表后调用 BatchCreateObjects
  2. 错误处理: 在一个批次中,部分日志可能格式错误,部分可能因为 Weaviate 的瞬时问题写入失败。代码必须能处理部分失败,记录下有问题的日志,同时确保成功的日志被正确写入,而不是因为单条日志的失败导致整个批次重试。一个健壮的实现会把处理失败的消息发送到死信队列,供后续分析。
  3. 配置与依赖注入: 生产级的 Function 代码应当使用依赖注入来管理 WeaviateDB 客户端的生命周期,配置信息也应该通过 IOptions 模式注入,而不是直接读环境变量。上面的示例为了简化,直接在构造函数中初始化。

Weaviate 的 Schema 设计

Weaviate 的 Schema 定义了数据如何存储、索引和向量化。这是决定我们语义搜索效果好坏的核心。

{
  "class": "LogEntry",
  "description": "Represents a single log entry from an application",
  "vectorizer": "text2vec-huggingface",
  "moduleConfig": {
    "text2vec-huggingface": {
      "model": "sentence-transformers/all-MiniLM-L6-v2",
      "options": {
        "waitForModel": true
      }
    }
  },
  "properties": [
    {
      "name": "message",
      "dataType": ["text"],
      "description": "The log message content to be vectorized."
    },
    {
      "name": "serviceName",
      "dataType": ["string"],
      "description": "Name of the service that generated the log.",
      "tokenization": "keyword"
    },
    {
      "name": "level",
      "dataType": ["string"],
      "description": "Log level (e.g., INFO, WARN, ERROR).",
      "tokenization": "field"
    },
    {
      "name": "timestamp",
      "dataType": ["date"],
      "description": "Timestamp of the log entry."
    },
    {
      "name": "metadata",
      "dataType": ["object"],
      "description": "Additional key-value metadata.",
      "nestedProperties": [
        {
          "name": "key",
          "dataType": ["string"]
        },
        {
          "name": "value",
          "dataType": ["string"]
        }
      ]
    }
  ],
  "vectorIndexConfig": {
    "skip": false,
    "efConstruction": 128,
    "maxConnections": 16,
    "ef": -1,
    "hnsw": {
      "cleanupIntervalSeconds": 300
    }
  }
}

设计考量:

  • Vectorizer: text2vec-huggingfacesentence-transformers/all-MiniLM-L6-v2 模型的组合是一个性能和效果均衡的选择。它在本地运行,避免了 API 调用的延迟和成本。
  • Properties: 只有 message 字段需要被向量化。serviceNamelevel 等字段被定义为 string 类型,并设置了 tokenization 方式,这让它们可以用于高效的元数据过滤(where子句)。timestamp 必须是 date 类型,以便进行时间范围过滤。
  • Vector Index Config: HNSW 索引的参数 efConstructionmaxConnections 对构建索引时的性能和查询时的召回率有直接影响。这些值需要根据实际的数据量和查询需求进行调优。一个常见的错误是忽略这些参数,使用默认值,这可能导致在数据量大时查询性能下降。

定制 Grafana 数据源

这是整个方案中最具挑战性但也是价值最高的一环。我们需要创建一个 Grafana 数据源插件,前端负责提供查询输入框,后端负责将用户的自然语言查询转换为 Weaviate 的 GraphQL nearText 查询。

Grafana 插件的后端部分通常用 Go 编写。核心逻辑在 QueryData 方法中。

// pkg/weaviate/weaviate.go

package weaviate

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"net/http"
	"bytes"
	"io/ioutil"
	"time"
)

// ... 其他结构体定义 ...

type QueryModel struct {
	QueryText string `json:"queryText"` // 用户输入的自然语言查询
}

func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	response := backend.NewQueryDataResponse()

	for _, q := range req.Queries {
		res := d.query(ctx, req.PluginContext, q)
		response.Responses[q.RefID] = res
	}

	return response, nil
}

func (d *Datasource) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) backend.DataResponse {
	var qm QueryModel
	err := json.Unmarshal(query.JSON, &qm)
	if err != nil {
		return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("json unmarshal error: %v", err))
	}

	if qm.QueryText == "" {
		// 返回空 Frame,避免前端报错
		return backend.DataResponse{
			Frames: []*data.Frame{
				data.NewFrame("response"),
			},
		}
	}

	// 核心:构建 GraphQL 查询
	graphqlQuery := fmt.Sprintf(`{
		Get {
			LogEntry(
				nearText: {
					concepts: ["%s"]
				},
				where: {
					operator: And,
					operands: [
						{ path: ["timestamp"], operator: GreaterThanEqual, valueDate: "%s" },
						{ path: ["timestamp"], operator: LessThanEqual, valueDate: "%s" }
					]
				},
				limit: 1000,
				sort: { path: ["timestamp"], order: desc }
			) {
				message
				level
				serviceName
				timestamp
				_additional {
					distance
				}
			}
		}
	}`, escapeString(qm.QueryText), query.TimeRange.From.Format(time.RFC3339), query.TimeRange.To.Format(time.RFC3339))

	// 向 Weaviate 发送请求
	// ... (此处省略了 httpClient 的创建和配置代码) ...
	
	reqBody, _ := json.Marshal(map[string]string{"query": graphqlQuery})
	httpReq, _ := http.NewRequestWithContext(ctx, "POST", d.settings.Endpoint+"/v1/graphql", bytes.NewBuffer(reqBody))
	// ... (设置 Headers, API Key 等) ...

	httpResp, err := d.httpClient.Do(httpReq)
	// ... (处理 httpResp 的错误) ...
	
	defer httpResp.Body.Close()
	body, _ := ioutil.ReadAll(httpResp.Body)

	// 将 Weaviate 的响应转换为 Grafana 的 DataFrame
	frame, err := parseGraphQLResponseToFrame(body)
	if err != nil {
		return backend.ErrDataResponse(backend.StatusInternalError, fmt.Sprintf("failed to parse response: %v", err))
	}

	return backend.DataResponse{
		Frames: []*data.Frame{frame},
	}
}

func parseGraphQLResponseToFrame(body []byte) (*data.Frame, error) {
    // ...
    // 此处是关键的转换逻辑。需要解析 JSON 响应,
    // 创建 data.Frame,并为每一列(timestamp, level, serviceName, message)
    // 添加相应类型的 Field。
    // 时间戳字段必须是 time.Time 类型,日志内容是 string 类型。
    // ...
	frame := data.NewFrame("logs")
	frame.SetMeta(&data.FrameMeta{PreferredVisualization: data.VisTypeLogs})

	// ... 伪代码:解析 body, 循环添加数据 ...
	// for _, log := range parsedResponse.Data.Get.LogEntry {
	//    frame.AppendRow(log.timestamp, log.level, log.serviceName, log.message)
	// }
	
	return frame, nil
}

开发这个插件的难点在于:

  1. GraphQL 查询构建: 需要动态地将 Grafana 的时间范围选择器 (query.TimeRange) 转换为 Weaviate GraphQL where 子句中的 timestamp 过滤条件。
  2. 数据帧转换: Weaviate 返回的是 JSON,Grafana 的 Logs Panel 需要一个特定结构的 data.Frame 对象。时间戳、日志级别、内容等字段必须被正确地映射到 data.Frame 的列中,并且字段类型要正确(特别是时间字段)。frame.SetMeta 这行很关键,它告诉 Grafana 这个数据帧最适合用日志面板来展示。
  3. 前端交互: 插件的前端部分(使用 React 和 TypeScript)需要提供一个简洁的输入框,并将用户的输入作为 queryText 发送到后端。这部分相对直接,但需要熟悉 Grafana 的插件开发工具链。

最终,我们在 Grafana 中得到了一个全新的 Dashboard Panel。它只有一个简单的文本输入框,工程师可以在里面输入像 “database connection pool exhausted after new version rollout” 这样的自然语言,面板下方就会实时地展示出与这个语义最相关的日志条目,按时间排序,并用颜色高亮不同的日志级别。这彻底改变了我们排查复杂问题的方式。

局限性与未来展望

这个方案并非银弹。首先,成本是一个重要考量。对海量日志进行向量化和存储,会带来显著的计算和存储开销,远高于传统的文本索引。我们需要实现精细的日志采样或只对高价值(如 ERRORWARN 级别)的日志进行向量化,以控制成本。

其次,搜索质量严重依赖于所选的嵌入模型。通用的模型可能无法很好地理解我们业务领域内的特定术语和缩写。未来的一个迭代方向是,利用我们自己积累的日志数据,微调一个领域专用的嵌入模型,以期获得更高的搜索相关性。

最后,当前的实现是“事后分析”,无法用于实时告警。如何基于日志的“语义漂移”或“异常语义模式”来构建告警系统,是一个更具挑战性但价值也更高的课题。或许可以结合流处理引擎(如 Flink),对日志向量进行实时聚类分析,当出现新的、异常的日志簇时触发告警。这为可观测性的未来打开了新的想象空间。


  目录