基于MongoDB与ClickHouse的CQRS架构在Azure PWA项目中的落地实践


我们的团队在一个迭代周期(Sprint)的评审会上遇到了一个棘手的性能瓶颈。我们正在开发一个面向内部运营团队的PWA(Progressive Web App),它需要处理高频的业务操作录入,同时又要提供复杂的实时数据分析仪表盘。最初的架构很简单:一个React构建的PWA,后端是.NET API,数据库统一使用Azure Cosmos DB for MongoDB。在项目初期,一切看起来都很好。但随着数据量的增长和分析维度的复杂化,问题暴露无遗:一个复杂的聚合查询能轻易地拖慢整个MongoDB实例,导致操作录入接口的响应时间(RT)飙升,甚至超时。

Scrum的迭代模式放大了这个问题。每个Sprint,产品负责人都会根据用户反馈提出新的、更复杂的分析需求。我们尝试通过加索引、读写分离到只读副本等传统方式来优化,但效果甚微。为运营分析创建的复杂索引拖慢了写入性能,而只读副本在处理真正临时的、多变的分析查询时,依然力不从心。我们意识到,问题的根源在于将两种截然不同的工作负载——事务处理(OLTP)和分析处理(OLAP)——强行绑定在了同一个技术栈上。

方案A:垂直深化MongoDB生态

第一个摆上台面的方案是继续在MongoDB的技术栈内寻求解决方案。这通常是阻力最小的路径。

  • 构想:

    1. 更强的硬件: 升级Azure Cosmos DB的RU(Request Units),简单粗暴。
    2. 专用分析节点: 在MongoDB Atlas(虽然我们用的是Cosmos DB,但可以借鉴其思路)中有专门的Analytics Nodes,它们使用不同的硬件配置来隔离OLAP工作负载。我们可以尝试在Cosmos DB中通过标签和节点亲和性来模拟类似的环境。
    3. 物化视图 (Materialized Views): 对常用的分析查询模式,预先计算结果并存为新的集合。PWA的仪表盘直接查询这些预计算好的集合。
  • 优势分析:

    • 技术栈统一,团队学习成本低。
    • 无需引入新的数据存储技术,运维复杂度相对可控。
    • 对于固定的查询模式,物化视图能提供极佳的性能。
  • 劣势分析:

    • 成本失控: 单纯增加RU是一种昂贵的做法,且无法从根本上解决OLTP和OLAP的冲突。
    • 分析灵活性差: 物化视图的策略严重依赖于“预知”查询模式。在Scrum快节奏迭代下,运营团队的需求变化极快,我们不可能为每一种临时分析都创建一个物化视图。这种方案僵化且被动。
    • 混合负载的本质矛盾: MongoDB的B-Tree索引和面向文档的存储结构,天生就不是为了大规模、多维度的即席聚合查询而设计的。即便使用专门的分析节点,其底层引擎的局限性依然存在。一个运营人员在仪表盘上拖拽维度、改变时间范围的操作,就可能发起一个穿透所有优化的慢查询。

这个方案很快被否决。它像是在给一艘货轮安装F1赛车的引擎,希望它能跑得快,却忽略了船体本身的设计目标。

方案B:引入CQRS与专用分析引擎ClickHouse

第二个方案则更为激进:从架构层面彻底分离读写模型,即命令查询职责分离(CQRS - Command Query Responsibility Segregation)。

  • 构想:
    1. 命令端 (Command Side): 所有的写入操作(CUD - Create, Update, Delete)都由独立的API处理。数据持久化依然使用MongoDB。MongoDB非常适合作为领域模型的写库,其灵活的文档模型能很好地适应业务在Scrum迭代中的演进。
    2. 查询端 (Query Side): 所有的读取操作,特别是复杂的分析查询,都由另一组独立的API处理。这组API的数据源是一个专门为OLAP设计的数据库。
    3. 数据同步: 命令端的数据变更需要近乎实时地同步到查询端。这需要一个可靠的事件管道。
    4. 技术选型:
      • 写库: 继续使用Azure Cosmos DB for MongoDB。
      • 读库: 选用ClickHouse。它是一个列式存储数据库,为OLAP场景设计,聚合查询性能极其出色。我们可以在Azure上使用虚拟机或容器自行部署。
      • 事件管道: Azure Event Hubs。它是一个高吞吐量的事件流服务,非常适合解耦命令端和查询端。
      • 同步处理器: Azure Functions。使用事件中心触发器,消费事件并将其写入ClickHouse。
graph TD
    subgraph PWA
        A[操作界面] --> B{命令API};
        C[分析仪表盘] --> D{查询API};
    end

    subgraph Azure Cloud
        B -- CUD 操作 --> E[MongoDB];
        E -- 发布事件 --> F[Azure Event Hubs];
        F -- 触发 --> G[Azure Function: 数据同步器];
        G -- 写入扁平化数据 --> H[ClickHouse];
        D -- OLAP 查询 --> H;
    end

    style E fill:#4DB33D,stroke:#333,stroke-width:2px
    style H fill:#FFDD00,stroke:#333,stroke-width:2px
    style F fill:#0078D4,stroke:#333,stroke-width:2px
    style G fill:#0078D4,stroke:#333,stroke-width:2px
  • 优势分析:

    • 最佳工具做最佳的事: MongoDB处理其擅长的事务性写入,ClickHouse处理其擅长的分析查询。性能和资源利用都达到最优。
    • 完全隔离: 查询端的任何复杂查询,哪怕是全表扫描,都不会影响到命令端的性能。这为PWA提供了稳定、低延迟的写入体验。
    • 独立扩展: 我们可以根据读写负载的不同,在Azure上独立扩展API服务、MongoDB的RU或ClickHouse集群的节点,成本控制更精细。
    • 优化的数据模型: 查询端的数据可以被“压平”或反规范化,设计成最适合ClickHouse查询的宽表模式,无需遵循命令端的领域模型。
  • 劣势分析:

    • 架构复杂度上升: 引入了Event Hubs、Azure Functions和ClickHouse,运维和监控的复杂度显著增加。
    • 最终一致性: 数据从MongoDB同步到ClickHouse存在毫秒到秒级的延迟。PWA的仪表盘显示的数据是非严格实时的。这是一个必须与产品负责人明确沟通的权衡点。在我们的场景中,运营分析对秒级延迟完全可以接受。
    • 开发心智负担: 开发者需要同时理解命令模型和查询模型,并处理好两者之间的数据同步逻辑。

最终选择:拥抱CQRS的复杂性

经过团队讨论和原型验证,我们最终选择了方案B。虽然它更复杂,但它从根本上解决了问题,并且为未来的功能扩展提供了坚实的架构基础。Scrum的核心是拥抱变化,而一个能够独立演进读写两端的架构,正是对这一理念的最佳技术诠释。我们接受了最终一致性所带来的微小代价,以换取整个系统的高性能和高可扩展性。

核心实现细节

以下是这个架构中几个关键部分的代码实现。我们使用.NET 6作为后端技术栈。

1. 命令端:写入MongoDB并发布事件

命令处理器负责接收命令,执行业务逻辑,更新MongoDB,最后将领域事件发布到Azure Event Hubs。

// MediatR Command Handler
public class CreateOperationCommandHandler : IRequestHandler<CreateOperationCommand, Guid>
{
    private readonly IMongoCollection<Operation> _operations;
    private readonly EventHubProducerClient _eventHubClient;
    private readonly ILogger<CreateOperationCommandHandler> _logger;

    // 依赖注入
    public CreateOperationCommandHandler(IMongoDatabase database, EventHubProducerClient eventHubClient, ILogger<CreateOperationCommandHandler> logger)
    {
        _operations = database.GetCollection<Operation>("operations");
        _eventHubClient = eventHubClient;
        _logger = logger;
    }

    public async Task<Guid> Handle(CreateOperationCommand request, CancellationToken cancellationToken)
    {
        // 业务验证
        if (request.Amount <= 0)
        {
            throw new ArgumentException("Amount must be positive.");
        }

        var operation = new Operation
        {
            Id = Guid.NewGuid(),
            OperatorId = request.OperatorId,
            Amount = request.Amount,
            Timestamp = DateTime.UtcNow,
            Metadata = request.Metadata
        };

        try
        {
            // 步骤1: 写入MongoDB
            await _operations.InsertOneAsync(operation, null, cancellationToken);

            // 步骤2: 创建领域事件
            var operationCreatedEvent = new OperationCreatedEvent
            {
                EventId = Guid.NewGuid(),
                OperationId = operation.Id,
                Timestamp = operation.Timestamp,
                // ... 包含事件所需的所有数据
            };

            // 步骤3: 发布到Event Hubs
            await PublishEventAsync(operationCreatedEvent, cancellationToken);
            
            _logger.LogInformation("Operation {OperationId} created and event published.", operation.Id);

            return operation.Id;
        }
        catch (MongoWriteException ex)
        {
            _logger.LogError(ex, "Failed to write operation to MongoDB.");
            // 这里的错误处理很关键,如果写入失败,就不应该发布事件。
            // 考虑引入更复杂的事务机制,例如事务发件箱模式(Transactional Outbox)
            // 来保证写入和事件发布的原子性。但在当前场景,我们简化处理。
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An unexpected error occurred during operation creation.");
            throw;
        }
    }

    private async Task PublishEventAsync(OperationCreatedEvent eventData, CancellationToken cancellationToken)
    {
        var eventBody = JsonSerializer.Serialize(eventData);
        var eventBatch = await _eventHubClient.CreateBatchAsync(cancellationToken);

        if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventBody))))
        {
            // 如果事件太大,无法添加到批次中
            _logger.LogError("Event {EventId} is too large for the batch.", eventData.EventId);
            throw new InvalidOperationException("Event is too large.");
        }
        
        // 发送批次
        await _eventHubClient.SendAsync(eventBatch, cancellationToken);
    }
}

// MongoDB Document Model
public class Operation
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public Guid Id { get; set; }
    public string OperatorId { get; set; }
    public decimal Amount { get; set; }
    public DateTime Timestamp { get; set; }
    public Dictionary<string, object> Metadata { get; set; } // 灵活的元数据
}

代码解析:

  • 原子性问题: 上述代码中,InsertOneAsyncPublishEventAsync 并不是一个原子操作。如果数据库写入成功但事件发布失败,就会导致数据不一致。在真实项目中,我们会采用事务发件箱模式:将事件和业务数据在同一个MongoDB事务中写入到一个outbox集合,然后由一个后台进程轮询outbox集合,保证事件一定会被发布出去。
  • 依赖注入: 所有外部依赖(MongoDB客户端,Event Hub客户端,Logger)都通过构造函数注入,便于单元测试。
  • 错误处理: 包含了针对数据库写入和事件发布的具体错误日志。

2. ClickHouse表结构设计

查询端的数据模型是为分析而优化的宽表。

-- ClickHouse DDL for the read model
-- 我们选择在Azure VM上部署ClickHouse集群
CREATE TABLE default.operations_view (
    -- 主键和分区键
    `OperationId` UUID,
    `Timestamp` DateTime,
    `EventDate` Date,

    -- 维度字段
    `OperatorId` String,
    `OperatorName` String, -- 反规范化数据
    `Department` String,   -- 反规范化数据
    
    -- 指标字段
    `Amount` Decimal(18, 2),

    -- 从MongoDB Metadata中提取的关键K-V对,扁平化处理
    `Metadata_Source` String,
    `Metadata_ProductType` String,

    -- 事件处理时间,用于追踪延迟
    `ProcessedAt` DateTime DEFAULT now()

) ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (EventDate, OperatorId, Timestamp);

设计解析:

  • MergeTree 引擎: 这是ClickHouse最核心的表引擎,专为高性能数据写入和查询而设计。
  • 分区键 (PARTITION BY): 我们按月对数据进行分区。这对于涉及时间范围的查询能极大地减少需要扫描的数据量。
  • 主键/排序键 (ORDER BY): 定义了数据在磁盘上的物理排序。将常用的过滤字段(如OperatorId)放在排序键中,可以极大地加速查询。
  • 反规范化: OperatorNameDepartment 这些数据在MongoDB中可能存储在另一个集合里。在同步过程中,我们会一并查询出来,写入到ClickHouse宽表中,避免查询时进行昂贵的JOIN操作。

3. 数据同步器:Azure Function

这个函数由Event Hubs触发,负责将数据写入ClickHouse。

// Azure Function with Event Hub Trigger
public class SyncOperationToClickHouse
{
    private readonly ClickHouseConnection _clickHouseConnection;
    private readonly ILogger<SyncOperationToClickHouse> _logger;

    public SyncOperationToClickHouse(ClickHouseConnection clickHouseConnection, ILogger<SyncOperationToClickHouse> logger)
    {
        _clickHouseConnection = clickHouseConnection;
        _logger = logger;
    }

    [FunctionName("SyncOperationToClickHouse")]
    public async Task Run([EventHubTrigger("operations-events", Connection = "EventHubConnectionString")] EventData[] events)
    {
        if (events.Length == 0) return;

        var operationsForInsert = new List<OperationView>();

        foreach (var eventData in events)
        {
            try
            {
                var eventBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
                var operationEvent = JsonSerializer.Deserialize<OperationCreatedEvent>(eventBody);
                
                // 数据转换和丰富逻辑
                // 在真实场景中,这里可能会调用其他服务获取OperatorName等信息
                var operationView = new OperationView
                {
                    OperationId = operationEvent.OperationId,
                    Timestamp = operationEvent.Timestamp,
                    EventDate = operationEvent.Timestamp.Date,
                    OperatorId = operationEvent.OperatorId,
                    OperatorName = "Mock Name", // Placeholder
                    Department = "Mock Dept",   // Placeholder
                    Amount = operationEvent.Amount,
                    Metadata_Source = operationEvent.Metadata.TryGetValue("source", out var src) ? src.ToString() : "N/A",
                    Metadata_ProductType = operationEvent.Metadata.TryGetValue("productType", out var pt) ? pt.ToString() : "N/A"
                };
                operationsForInsert.Add(operationView);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Failed to deserialize event. Body: {Body}", Encoding.UTF8.GetString(eventData.EventBody.ToArray()));
                // 考虑推送到死信队列
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error occurred processing an event.");
                // 同样,需要有重试和死信队列策略
            }
        }

        if (operationsForInsert.Any())
        {
            try
            {
                // 使用ClickHouse.AdoNet.BulkCopy进行高效批量插入
                using var bulkCopy = new ClickHouseBulkCopy(_clickHouseConnection)
                {
                    DestinationTableName = "default.operations_view",
                    BatchSize = 1000 // 配置合适的批次大小
                };
                
                await bulkCopy.WriteToServerAsync(operationsForInsert);
                _logger.LogInformation("{Count} records successfully synced to ClickHouse.", operationsForInsert.Count);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to bulk insert records into ClickHouse.");
                // 如果批量插入失败,整个批次的事件都需要重试,
                // Azure Function的重试策略在这里非常重要。
                throw; // 抛出异常以触发函数的重试机制
            }
        }
    }
}

// ClickHouse Read Model DTO
public class OperationView 
{
    public Guid OperationId { get; set; }
    public DateTime Timestamp { get; set; }
    public DateTime EventDate { get; set; }
    public string OperatorId { get; set; }
    public string OperatorName { get; set; }
    public string Department { get; set; }
    public decimal Amount { get; set; }
    public string Metadata_Source { get; set; }
    public string Metadata_ProductType { get; set; }
}

实现要点:

  • 批量处理: Azure Function可以一次性接收一批事件。代码中将所有事件处理后,使用ClickHouseBulkCopy进行一次性的批量写入,这是保证高吞吐量的关键。直接单条INSERT会带来巨大的性能灾难。
  • 健壮性: 必须处理反序列化失败、数据转换失败和写入ClickHouse失败等各种情况。配置Azure Function的重试策略和死信队列是生产环境的必备操作,确保数据不会丢失。
  • 幂等性: 同步逻辑需要考虑幂等性。如果一个事件被重复处理,系统不应该插入重复数据。可以在ClickHouse表上设置唯一约束(虽然ClickHouse的约束是异步检查的),或者在写入前进行SELECT检查。更可靠的方式是使用ReplacingMergeTree引擎,它可以根据排序键自动处理重复数据。

架构的扩展性与局限性

这套CQRS架构并非银弹。它最大的局限性在于引入了最终一致性。业务方和产品团队必须理解并接受,用户刚提交的操作不会在下一毫秒就出现在分析报表中。对于需要强一致性读的场景(例如,检查用户账户余额),查询请求必须路由到命令端的MongoDB,而不是查询端的ClickHouse。

另一个挑战是运维复杂度。我们现在需要监控MongoDB、Event Hubs、Azure Function以及ClickHouse集群。任何一个环节出现问题,都会导致数据同步延迟或中断。一套完善的可观测性体系(Logging, Metrics, Tracing)是支撑这套架构稳定运行的基石。

未来的优化路径是清晰的。我们可以引入Schema Registry来管理事件的版本,确保生产者和消费者的兼容性。对于数据回溯的需求,可以利用Event Hubs的事件持久化能力,重新消费所有历史事件来重建ClickHouse中的读模型。对于更复杂的实时计算,甚至可以在数据同步器(Azure Function)和ClickHouse之间再加入一个流处理引擎如Azure Stream Analytics或Apache Flink,进行更复杂的事件转换和聚合。


  目录