我们的团队在一个迭代周期(Sprint)的评审会上遇到了一个棘手的性能瓶颈。我们正在开发一个面向内部运营团队的PWA(Progressive Web App),它需要处理高频的业务操作录入,同时又要提供复杂的实时数据分析仪表盘。最初的架构很简单:一个React构建的PWA,后端是.NET API,数据库统一使用Azure Cosmos DB for MongoDB。在项目初期,一切看起来都很好。但随着数据量的增长和分析维度的复杂化,问题暴露无遗:一个复杂的聚合查询能轻易地拖慢整个MongoDB实例,导致操作录入接口的响应时间(RT)飙升,甚至超时。
Scrum的迭代模式放大了这个问题。每个Sprint,产品负责人都会根据用户反馈提出新的、更复杂的分析需求。我们尝试通过加索引、读写分离到只读副本等传统方式来优化,但效果甚微。为运营分析创建的复杂索引拖慢了写入性能,而只读副本在处理真正临时的、多变的分析查询时,依然力不从心。我们意识到,问题的根源在于将两种截然不同的工作负载——事务处理(OLTP)和分析处理(OLAP)——强行绑定在了同一个技术栈上。
方案A:垂直深化MongoDB生态
第一个摆上台面的方案是继续在MongoDB的技术栈内寻求解决方案。这通常是阻力最小的路径。
构想:
- 更强的硬件: 升级Azure Cosmos DB的RU(Request Units),简单粗暴。
- 专用分析节点: 在MongoDB Atlas(虽然我们用的是Cosmos DB,但可以借鉴其思路)中有专门的Analytics Nodes,它们使用不同的硬件配置来隔离OLAP工作负载。我们可以尝试在Cosmos DB中通过标签和节点亲和性来模拟类似的环境。
- 物化视图 (Materialized Views): 对常用的分析查询模式,预先计算结果并存为新的集合。PWA的仪表盘直接查询这些预计算好的集合。
优势分析:
- 技术栈统一,团队学习成本低。
- 无需引入新的数据存储技术,运维复杂度相对可控。
- 对于固定的查询模式,物化视图能提供极佳的性能。
劣势分析:
- 成本失控: 单纯增加RU是一种昂贵的做法,且无法从根本上解决OLTP和OLAP的冲突。
- 分析灵活性差: 物化视图的策略严重依赖于“预知”查询模式。在Scrum快节奏迭代下,运营团队的需求变化极快,我们不可能为每一种临时分析都创建一个物化视图。这种方案僵化且被动。
- 混合负载的本质矛盾: MongoDB的B-Tree索引和面向文档的存储结构,天生就不是为了大规模、多维度的即席聚合查询而设计的。即便使用专门的分析节点,其底层引擎的局限性依然存在。一个运营人员在仪表盘上拖拽维度、改变时间范围的操作,就可能发起一个穿透所有优化的慢查询。
这个方案很快被否决。它像是在给一艘货轮安装F1赛车的引擎,希望它能跑得快,却忽略了船体本身的设计目标。
方案B:引入CQRS与专用分析引擎ClickHouse
第二个方案则更为激进:从架构层面彻底分离读写模型,即命令查询职责分离(CQRS - Command Query Responsibility Segregation)。
- 构想:
- 命令端 (Command Side): 所有的写入操作(CUD - Create, Update, Delete)都由独立的API处理。数据持久化依然使用MongoDB。MongoDB非常适合作为领域模型的写库,其灵活的文档模型能很好地适应业务在Scrum迭代中的演进。
- 查询端 (Query Side): 所有的读取操作,特别是复杂的分析查询,都由另一组独立的API处理。这组API的数据源是一个专门为OLAP设计的数据库。
- 数据同步: 命令端的数据变更需要近乎实时地同步到查询端。这需要一个可靠的事件管道。
- 技术选型:
- 写库: 继续使用Azure Cosmos DB for MongoDB。
- 读库: 选用ClickHouse。它是一个列式存储数据库,为OLAP场景设计,聚合查询性能极其出色。我们可以在Azure上使用虚拟机或容器自行部署。
- 事件管道: Azure Event Hubs。它是一个高吞吐量的事件流服务,非常适合解耦命令端和查询端。
- 同步处理器: Azure Functions。使用事件中心触发器,消费事件并将其写入ClickHouse。
graph TD subgraph PWA A[操作界面] --> B{命令API}; C[分析仪表盘] --> D{查询API}; end subgraph Azure Cloud B -- CUD 操作 --> E[MongoDB]; E -- 发布事件 --> F[Azure Event Hubs]; F -- 触发 --> G[Azure Function: 数据同步器]; G -- 写入扁平化数据 --> H[ClickHouse]; D -- OLAP 查询 --> H; end style E fill:#4DB33D,stroke:#333,stroke-width:2px style H fill:#FFDD00,stroke:#333,stroke-width:2px style F fill:#0078D4,stroke:#333,stroke-width:2px style G fill:#0078D4,stroke:#333,stroke-width:2px
优势分析:
- 最佳工具做最佳的事: MongoDB处理其擅长的事务性写入,ClickHouse处理其擅长的分析查询。性能和资源利用都达到最优。
- 完全隔离: 查询端的任何复杂查询,哪怕是全表扫描,都不会影响到命令端的性能。这为PWA提供了稳定、低延迟的写入体验。
- 独立扩展: 我们可以根据读写负载的不同,在Azure上独立扩展API服务、MongoDB的RU或ClickHouse集群的节点,成本控制更精细。
- 优化的数据模型: 查询端的数据可以被“压平”或反规范化,设计成最适合ClickHouse查询的宽表模式,无需遵循命令端的领域模型。
劣势分析:
- 架构复杂度上升: 引入了Event Hubs、Azure Functions和ClickHouse,运维和监控的复杂度显著增加。
- 最终一致性: 数据从MongoDB同步到ClickHouse存在毫秒到秒级的延迟。PWA的仪表盘显示的数据是非严格实时的。这是一个必须与产品负责人明确沟通的权衡点。在我们的场景中,运营分析对秒级延迟完全可以接受。
- 开发心智负担: 开发者需要同时理解命令模型和查询模型,并处理好两者之间的数据同步逻辑。
最终选择:拥抱CQRS的复杂性
经过团队讨论和原型验证,我们最终选择了方案B。虽然它更复杂,但它从根本上解决了问题,并且为未来的功能扩展提供了坚实的架构基础。Scrum的核心是拥抱变化,而一个能够独立演进读写两端的架构,正是对这一理念的最佳技术诠释。我们接受了最终一致性所带来的微小代价,以换取整个系统的高性能和高可扩展性。
核心实现细节
以下是这个架构中几个关键部分的代码实现。我们使用.NET 6作为后端技术栈。
1. 命令端:写入MongoDB并发布事件
命令处理器负责接收命令,执行业务逻辑,更新MongoDB,最后将领域事件发布到Azure Event Hubs。
// MediatR Command Handler
public class CreateOperationCommandHandler : IRequestHandler<CreateOperationCommand, Guid>
{
private readonly IMongoCollection<Operation> _operations;
private readonly EventHubProducerClient _eventHubClient;
private readonly ILogger<CreateOperationCommandHandler> _logger;
// 依赖注入
public CreateOperationCommandHandler(IMongoDatabase database, EventHubProducerClient eventHubClient, ILogger<CreateOperationCommandHandler> logger)
{
_operations = database.GetCollection<Operation>("operations");
_eventHubClient = eventHubClient;
_logger = logger;
}
public async Task<Guid> Handle(CreateOperationCommand request, CancellationToken cancellationToken)
{
// 业务验证
if (request.Amount <= 0)
{
throw new ArgumentException("Amount must be positive.");
}
var operation = new Operation
{
Id = Guid.NewGuid(),
OperatorId = request.OperatorId,
Amount = request.Amount,
Timestamp = DateTime.UtcNow,
Metadata = request.Metadata
};
try
{
// 步骤1: 写入MongoDB
await _operations.InsertOneAsync(operation, null, cancellationToken);
// 步骤2: 创建领域事件
var operationCreatedEvent = new OperationCreatedEvent
{
EventId = Guid.NewGuid(),
OperationId = operation.Id,
Timestamp = operation.Timestamp,
// ... 包含事件所需的所有数据
};
// 步骤3: 发布到Event Hubs
await PublishEventAsync(operationCreatedEvent, cancellationToken);
_logger.LogInformation("Operation {OperationId} created and event published.", operation.Id);
return operation.Id;
}
catch (MongoWriteException ex)
{
_logger.LogError(ex, "Failed to write operation to MongoDB.");
// 这里的错误处理很关键,如果写入失败,就不应该发布事件。
// 考虑引入更复杂的事务机制,例如事务发件箱模式(Transactional Outbox)
// 来保证写入和事件发布的原子性。但在当前场景,我们简化处理。
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected error occurred during operation creation.");
throw;
}
}
private async Task PublishEventAsync(OperationCreatedEvent eventData, CancellationToken cancellationToken)
{
var eventBody = JsonSerializer.Serialize(eventData);
var eventBatch = await _eventHubClient.CreateBatchAsync(cancellationToken);
if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventBody))))
{
// 如果事件太大,无法添加到批次中
_logger.LogError("Event {EventId} is too large for the batch.", eventData.EventId);
throw new InvalidOperationException("Event is too large.");
}
// 发送批次
await _eventHubClient.SendAsync(eventBatch, cancellationToken);
}
}
// MongoDB Document Model
public class Operation
{
[BsonId]
[BsonRepresentation(BsonType.String)]
public Guid Id { get; set; }
public string OperatorId { get; set; }
public decimal Amount { get; set; }
public DateTime Timestamp { get; set; }
public Dictionary<string, object> Metadata { get; set; } // 灵活的元数据
}
代码解析:
- 原子性问题: 上述代码中,
InsertOneAsync
和PublishEventAsync
并不是一个原子操作。如果数据库写入成功但事件发布失败,就会导致数据不一致。在真实项目中,我们会采用事务发件箱模式:将事件和业务数据在同一个MongoDB事务中写入到一个outbox
集合,然后由一个后台进程轮询outbox
集合,保证事件一定会被发布出去。 - 依赖注入: 所有外部依赖(MongoDB客户端,Event Hub客户端,Logger)都通过构造函数注入,便于单元测试。
- 错误处理: 包含了针对数据库写入和事件发布的具体错误日志。
2. ClickHouse表结构设计
查询端的数据模型是为分析而优化的宽表。
-- ClickHouse DDL for the read model
-- 我们选择在Azure VM上部署ClickHouse集群
CREATE TABLE default.operations_view (
-- 主键和分区键
`OperationId` UUID,
`Timestamp` DateTime,
`EventDate` Date,
-- 维度字段
`OperatorId` String,
`OperatorName` String, -- 反规范化数据
`Department` String, -- 反规范化数据
-- 指标字段
`Amount` Decimal(18, 2),
-- 从MongoDB Metadata中提取的关键K-V对,扁平化处理
`Metadata_Source` String,
`Metadata_ProductType` String,
-- 事件处理时间,用于追踪延迟
`ProcessedAt` DateTime DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (EventDate, OperatorId, Timestamp);
设计解析:
-
MergeTree
引擎: 这是ClickHouse最核心的表引擎,专为高性能数据写入和查询而设计。 - 分区键 (
PARTITION BY
): 我们按月对数据进行分区。这对于涉及时间范围的查询能极大地减少需要扫描的数据量。 - 主键/排序键 (
ORDER BY
): 定义了数据在磁盘上的物理排序。将常用的过滤字段(如OperatorId
)放在排序键中,可以极大地加速查询。 - 反规范化:
OperatorName
和Department
这些数据在MongoDB中可能存储在另一个集合里。在同步过程中,我们会一并查询出来,写入到ClickHouse宽表中,避免查询时进行昂贵的JOIN
操作。
3. 数据同步器:Azure Function
这个函数由Event Hubs触发,负责将数据写入ClickHouse。
// Azure Function with Event Hub Trigger
public class SyncOperationToClickHouse
{
private readonly ClickHouseConnection _clickHouseConnection;
private readonly ILogger<SyncOperationToClickHouse> _logger;
public SyncOperationToClickHouse(ClickHouseConnection clickHouseConnection, ILogger<SyncOperationToClickHouse> logger)
{
_clickHouseConnection = clickHouseConnection;
_logger = logger;
}
[FunctionName("SyncOperationToClickHouse")]
public async Task Run([EventHubTrigger("operations-events", Connection = "EventHubConnectionString")] EventData[] events)
{
if (events.Length == 0) return;
var operationsForInsert = new List<OperationView>();
foreach (var eventData in events)
{
try
{
var eventBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
var operationEvent = JsonSerializer.Deserialize<OperationCreatedEvent>(eventBody);
// 数据转换和丰富逻辑
// 在真实场景中,这里可能会调用其他服务获取OperatorName等信息
var operationView = new OperationView
{
OperationId = operationEvent.OperationId,
Timestamp = operationEvent.Timestamp,
EventDate = operationEvent.Timestamp.Date,
OperatorId = operationEvent.OperatorId,
OperatorName = "Mock Name", // Placeholder
Department = "Mock Dept", // Placeholder
Amount = operationEvent.Amount,
Metadata_Source = operationEvent.Metadata.TryGetValue("source", out var src) ? src.ToString() : "N/A",
Metadata_ProductType = operationEvent.Metadata.TryGetValue("productType", out var pt) ? pt.ToString() : "N/A"
};
operationsForInsert.Add(operationView);
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialize event. Body: {Body}", Encoding.UTF8.GetString(eventData.EventBody.ToArray()));
// 考虑推送到死信队列
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected error occurred processing an event.");
// 同样,需要有重试和死信队列策略
}
}
if (operationsForInsert.Any())
{
try
{
// 使用ClickHouse.AdoNet.BulkCopy进行高效批量插入
using var bulkCopy = new ClickHouseBulkCopy(_clickHouseConnection)
{
DestinationTableName = "default.operations_view",
BatchSize = 1000 // 配置合适的批次大小
};
await bulkCopy.WriteToServerAsync(operationsForInsert);
_logger.LogInformation("{Count} records successfully synced to ClickHouse.", operationsForInsert.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to bulk insert records into ClickHouse.");
// 如果批量插入失败,整个批次的事件都需要重试,
// Azure Function的重试策略在这里非常重要。
throw; // 抛出异常以触发函数的重试机制
}
}
}
}
// ClickHouse Read Model DTO
public class OperationView
{
public Guid OperationId { get; set; }
public DateTime Timestamp { get; set; }
public DateTime EventDate { get; set; }
public string OperatorId { get; set; }
public string OperatorName { get; set; }
public string Department { get; set; }
public decimal Amount { get; set; }
public string Metadata_Source { get; set; }
public string Metadata_ProductType { get; set; }
}
实现要点:
- 批量处理: Azure Function可以一次性接收一批事件。代码中将所有事件处理后,使用
ClickHouseBulkCopy
进行一次性的批量写入,这是保证高吞吐量的关键。直接单条INSERT
会带来巨大的性能灾难。 - 健壮性: 必须处理反序列化失败、数据转换失败和写入ClickHouse失败等各种情况。配置Azure Function的重试策略和死信队列是生产环境的必备操作,确保数据不会丢失。
- 幂等性: 同步逻辑需要考虑幂等性。如果一个事件被重复处理,系统不应该插入重复数据。可以在ClickHouse表上设置唯一约束(虽然ClickHouse的约束是异步检查的),或者在写入前进行
SELECT
检查。更可靠的方式是使用ReplacingMergeTree
引擎,它可以根据排序键自动处理重复数据。
架构的扩展性与局限性
这套CQRS架构并非银弹。它最大的局限性在于引入了最终一致性。业务方和产品团队必须理解并接受,用户刚提交的操作不会在下一毫秒就出现在分析报表中。对于需要强一致性读的场景(例如,检查用户账户余额),查询请求必须路由到命令端的MongoDB,而不是查询端的ClickHouse。
另一个挑战是运维复杂度。我们现在需要监控MongoDB、Event Hubs、Azure Function以及ClickHouse集群。任何一个环节出现问题,都会导致数据同步延迟或中断。一套完善的可观测性体系(Logging, Metrics, Tracing)是支撑这套架构稳定运行的基石。
未来的优化路径是清晰的。我们可以引入Schema Registry来管理事件的版本,确保生产者和消费者的兼容性。对于数据回溯的需求,可以利用Event Hubs的事件持久化能力,重新消费所有历史事件来重建ClickHouse中的读模型。对于更复杂的实时计算,甚至可以在数据同步器(Azure Function)和ClickHouse之间再加入一个流处理引擎如Azure Stream Analytics或Apache Flink,进行更复杂的事件转换和聚合。