Building an Asynchronous Saga-Pattern Distributed Transaction Coordinator with gRPC and Cloud Pub/Sub


Once the order, payment, and inventory services are split apart, a simple checkout operation spans three independent databases. Traditional local ACID transactions are useless here. A failure at any step, say the payment succeeds but the inventory deduction fails, leaves the data inconsistent, and that is intolerable in production.

The core problem is maintaining eventual consistency for a business flow in a distributed environment. We ruled out strong-consistency approaches such as two-phase commit (2PC): they depend too heavily on the coordinator, form an obvious performance bottleneck, and offer poor availability under network partitions. We chose the Saga pattern instead, which guarantees eventual consistency through a series of local transactions, each paired with a compensating operation.

We will build an event-driven, orchestration-based Saga coordinator. Its job is to drive the transaction flow and, when any step fails, to execute the reverse compensating operations precisely.

The technology stack is as follows:

  • Go: high-concurrency performance and a strong standard library, well suited to building network services.
  • gRPC: synchronous inter-service calls (command execution) with strongly typed Protobuf contracts; better performance than a RESTful API.
  • Google Cloud Pub/Sub: asynchronous inter-service communication (event notifications) with at-least-once delivery and automatic retries, an ideal fit for Saga event propagation.
  • Docker: containerized deployment for environment consistency and service isolation.

Visualizing the Saga Flow

Before diving into the code, we must define the Saga's execution flow precisely, including the happy path and every potential compensation path. Take an e-commerce checkout flow as the example:

graph TD
    subgraph Saga Flow
        A[Start] --> B(Create Order);
        B -- Order Created --> C(Process Payment);
        C -- Payment Succeeded --> D(Deduct Inventory);
        D -- Inventory Deducted --> E[End: Success];
    end

    subgraph Compensation Path
        D -- Inventory Failed --> F(Cancel Payment);
        C -- Payment Failed --> G(Cancel Order);
        F -- Payment Canceled --> G;
        G -- Order Canceled --> H[End: Failed];
    end

    style E fill:#9f9,stroke:#333,stroke-width:2px
    style H fill:#f99,stroke:#333,stroke-width:2px

This diagram is our implementation blueprint. The orchestrator acts as the driving engine for this state machine.
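
Before writing the orchestrator, it can help to see the diagram expressed as data. Below is a minimal sketch of the same flow as a transition table; the sagaStep type and the field layout are illustrative, not part of the implementation that follows:

package main

// sagaStep pairs a forward action with its compensating action.
// This mirrors the diagram: every node on the success path has a
// corresponding node on the compensation path.
type sagaStep struct {
	Name       string // forward operation, e.g. "CreateOrder"
	Compensate string // compensating operation, e.g. "CancelOrder"
}

// orderSagaDefinition encodes the flow above as an ordered list.
// If step i fails, the orchestrator runs Compensate for steps
// i-1 .. 0 in reverse order.
var orderSagaDefinition = []sagaStep{
	{Name: "CreateOrder", Compensate: "CancelOrder"},
	{Name: "ProcessPayment", Compensate: "RefundPayment"},
	{Name: "DeductInventory", Compensate: "CompensateInventory"},
}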

Service Interface Definitions: Protobuf as the Foundation of Communication

First, we define a clear Protobuf interface for every service participating in the Saga. These contracts define not only the RPCs but also, precisely, each step's "local transaction" and "compensating transaction".

Under a shared protos directory, we create the following files:

protos/order/order.proto

syntax = "proto3";

package order;

option go_package = "github.com/your-repo/saga/gen/order";

service OrderService {
  // Forward operation: create an order
  rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
  // Compensating operation: cancel the order
  rpc CancelOrder(CancelOrderRequest) returns (CancelOrderResponse);
}

message CreateOrderRequest {
  string saga_id = 1; // Must carry the Saga ID for idempotency control
  string user_id = 2;
  string item_id = 3;
  int32 quantity = 4;
}

message CreateOrderResponse {
  string order_id = 1;
}

message CancelOrderRequest {
  string saga_id = 1;
  string order_id = 2;
}

message CancelOrderResponse {}

protos/payment/payment.proto

syntax = "proto3";

package payment;

option go_package = "github.com/your-repo/saga/gen/payment";

service PaymentService {
  // Forward operation: process the payment
  rpc ProcessPayment(ProcessPaymentRequest) returns (ProcessPaymentResponse);
  // Compensating operation: refund the payment
  rpc RefundPayment(RefundPaymentRequest) returns (RefundPaymentResponse);
}

message ProcessPaymentRequest {
  string saga_id = 1;
  string order_id = 2;
  float amount = 3; // note: real systems should prefer integer minor units (e.g., cents) for money
}

message ProcessPaymentResponse {
  string transaction_id = 1;
}

message RefundPaymentRequest {
  string saga_id = 1;
  string transaction_id = 2;
}

message RefundPaymentResponse {}

protos/inventory/inventory.proto

syntax = "proto3";

package inventory;

option go_package = "github.com/your-repo/saga/gen/inventory";

service InventoryService {
  // Forward operation: deduct inventory
  rpc DeductInventory(DeductInventoryRequest) returns (DeductInventoryResponse);
  // Compensating operation: restore inventory
  rpc CompensateInventory(CompensateInventoryRequest) returns (CompensateInventoryResponse);
}

message DeductInventoryRequest {
  string saga_id = 1;
  string order_id = 2;
  string item_id = 3;
  int32 quantity = 4;
}

message DeductInventoryResponse {}

message CompensateInventoryRequest {
  string saga_id = 1;
  string order_id = 2;
}

message CompensateInventoryResponse {}

Note that every request carries saga_id. This is the key to idempotency: because Pub/Sub delivers at-least-once, messages may be redelivered, and every service must be able to recognize and ignore duplicate requests.
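
In production, the duplicate check is usually a unique-keyed insert in the service's own database rather than an in-memory set. Here is a minimal sketch, assuming a PostgreSQL table named processed_sagas; the table and the handledBefore helper are illustrative, not part of the sample services below:

package main

import (
	"context"
	"database/sql"
)

// handledBefore atomically records saga_id and reports whether it was
// already present. It relies on a table like:
//
//	CREATE TABLE processed_sagas (saga_id TEXT PRIMARY KEY);
func handledBefore(ctx context.Context, db *sql.DB, sagaID string) (bool, error) {
	res, err := db.ExecContext(ctx,
		`INSERT INTO processed_sagas (saga_id) VALUES ($1)
		 ON CONFLICT (saga_id) DO NOTHING`, sagaID)
	if err != nil {
		return false, err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return false, err
	}
	// Zero rows affected means saga_id was already recorded: this is a
	// duplicate delivery and the business logic must be skipped.
	return n == 0, nil
}

A check like this would replace the in-memory sync.Map guard used in the sample services later in this article.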

The Core: Designing and Implementing the Saga Orchestrator

The orchestrator is the brain of the architecture. It contains no business logic itself; it only sequences the flow, issues gRPC commands, listens for Pub/Sub events, and runs compensation on failure.

orchestrator/main.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/google/uuid"
	// Generated gRPC clients
	orderpb "github.com/your-repo/saga/gen/order"
	paymentpb "github.com/your-repo/saga/gen/payment"
	inventorypb "github.com/your-repo/saga/gen/inventory"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// SagaState defines the saga's current state.
type SagaState string

const (
	StateStarted           SagaState = "STARTED"
	StateOrderCreated      SagaState = "ORDER_CREATED"
	StatePaymentProcessed  SagaState = "PAYMENT_PROCESSED"
	StateInventoryDeducted SagaState = "INVENTORY_DEDUCTED"
	StateCompleted         SagaState = "COMPLETED"
	StateFailed            SagaState = "FAILED"
)

// Saga represents a distributed transaction instance.
// In a real project this state must be persisted to Redis or a database;
// we use in-memory storage here only for simplicity.
type Saga struct {
	ID        string
	State     SagaState
	Payload   map[string]interface{}
	History   []SagaState
	mu        sync.Mutex
	CreatedAt time.Time
}

// Orchestrator manages sagas.
type Orchestrator struct {
	sagas           sync.Map // In-memory store: map[string]*Saga
	orderClient     orderpb.OrderServiceClient
	paymentClient   paymentpb.PaymentServiceClient
	inventoryClient inventorypb.InventoryServiceClient
	pubsubClient    *pubsub.Client
	projectID       string
}

// Event received from Pub/Sub
type SagaEvent struct {
	SagaID  string                 `json:"saga_id"`
	Source  string                 `json:"source"` // e.g., "order_service"
	Status  string                 `json:"status"` // "SUCCESS" or "FAILURE"
	Payload map[string]interface{} `json:"payload"`
}


func NewOrchestrator() *Orchestrator {
	// gRPC and Pub/Sub client initialization.
	// In production, handle connection errors with retries instead of exiting.
	// gRPC connections
	orderConn, err := grpc.Dial("order-service:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect to order service: %v", err)
	}
	paymentConn, err := grpc.Dial("payment-service:50052", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect to payment service: %v", err)
	}
	inventoryConn, err := grpc.Dial("inventory-service:50053", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect to inventory service: %v", err)
	}
    
	// Pub/Sub client
	ctx := context.Background()
	projectID := "your-gcp-project-id"
	psClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("Failed to create pubsub client: %v", err)
	}

	return &Orchestrator{
		orderClient:     orderpb.NewOrderServiceClient(orderConn),
		paymentClient:   paymentpb.NewPaymentServiceClient(paymentConn),
		inventoryClient: inventorypb.NewInventoryServiceClient(inventoryConn),
		pubsubClient:    psClient,
		projectID:       projectID,
	}
}

func (o *Orchestrator) StartNewSaga(ctx context.Context, payload map[string]interface{}) {
	sagaID := uuid.New().String()
	saga := &Saga{
		ID:        sagaID,
		State:     StateStarted,
		Payload:   payload,
		History:   []SagaState{StateStarted},
		CreatedAt: time.Now(),
	}
	o.sagas.Store(sagaID, saga)
	log.Printf("Saga %s started", sagaID)
	o.executeNextStep(ctx, saga)
}

func (o *Orchestrator) executeNextStep(ctx context.Context, saga *Saga) {
	saga.mu.Lock()
	defer saga.mu.Unlock()

	// A simplified state machine; a real project would use richer decision logic.
	switch saga.State {
	case StateStarted:
		log.Printf("Saga %s: Executing CreateOrder", saga.ID)
		// ... extract parameters from the payload
		_, err := o.orderClient.CreateOrder(ctx, &orderpb.CreateOrderRequest{
			SagaId:   saga.ID,
			UserId:   "user-123", // Example data
			ItemId:   "item-abc",
			Quantity: 1,
		})
		if err != nil {
			log.Printf("Saga %s: CreateOrder failed: %v", saga.ID, err)
			o.failSaga(ctx, saga)
		}
		// Wait for the Pub/Sub event from OrderService to drive the next step

	case StateOrderCreated:
		log.Printf("Saga %s: Executing ProcessPayment", saga.ID)
		orderID := saga.Payload["order_id"].(string)
		_, err := o.paymentClient.ProcessPayment(ctx, &paymentpb.ProcessPaymentRequest{
			SagaId:  saga.ID,
			OrderId: orderID,
			Amount:  99.99,
		})
		if err != nil {
			log.Printf("Saga %s: ProcessPayment failed: %v", saga.ID, err)
			o.failSaga(ctx, saga)
		}

	case StatePaymentProcessed:
		log.Printf("Saga %s: Executing DeductInventory", saga.ID)
		orderID := saga.Payload["order_id"].(string)
		_, err := o.inventoryClient.DeductInventory(ctx, &inventorypb.DeductInventoryRequest{
			SagaId:   saga.ID,
			OrderId:  orderID,
			ItemId:   "item-abc",
			Quantity: 1,
		})
		if err != nil {
			log.Printf("Saga %s: DeductInventory failed: %v", saga.ID, err)
			o.failSaga(ctx, saga)
		}

	case StateInventoryDeducted:
		log.Printf("Saga %s: COMPLETED SUCCESSFULLY", saga.ID)
		saga.State = StateCompleted
		o.sagas.Store(saga.ID, saga)
	}
}

// failSaga starts the compensation flow.
// Note: callers are expected to hold saga.mu.
func (o *Orchestrator) failSaga(ctx context.Context, saga *Saga) {
	saga.State = StateFailed
	log.Printf("Saga %s: Starting compensation flow", saga.ID)

	// Compensate completed steps in reverse order
	for i := len(saga.History) - 1; i >= 0; i-- {
		step := saga.History[i]
		switch step {
		case StatePaymentProcessed:
			log.Printf("Saga %s: Compensating payment", saga.ID)
			transactionID := saga.Payload["transaction_id"].(string)
			_, err := o.paymentClient.RefundPayment(ctx, &paymentpb.RefundPaymentRequest{
				SagaId: saga.ID,
				TransactionId: transactionID,
			})
			if err != nil {
				// The trap here: the compensation itself can also fail.
				// A production-grade system needs retries, alerting, and
				// even a manual-intervention process.
				log.Printf("CRITICAL: Saga %s: FAILED TO COMPENSATE PAYMENT: %v", saga.ID, err)
			}

		case StateOrderCreated:
			log.Printf("Saga %s: Compensating order", saga.ID)
			orderID := saga.Payload["order_id"].(string)
			_, err := o.orderClient.CancelOrder(ctx, &orderpb.CancelOrderRequest{
				SagaId: saga.ID,
				OrderId: orderID,
			})
			if err != nil {
				log.Printf("CRITICAL: Saga %s: FAILED TO COMPENSATE ORDER: %v", saga.ID, err)
			}
		}
	}
	o.sagas.Store(saga.ID, saga)
}

func (o *Orchestrator) ListenForSagaEvents() {
	subID := "saga-orchestrator-sub" // a single subscription receiving events from every service
	sub := o.pubsubClient.Subscription(subID)

	log.Println("Orchestrator starts listening for events...")
	err := sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
		// Ack immediately, success or not, to avoid redelivery storms.
		// Note: acking before processing trades reliability for simplicity;
		// a crash between here and the state update loses the event.
		msg.Ack()

		var event SagaEvent
		if err := json.Unmarshal(msg.Data, &event); err != nil {
			log.Printf("Failed to unmarshal event: %v", err)
			return
		}

		log.Printf("Received event for Saga %s from %s with status %s", event.SagaID, event.Source, event.Status)

		val, ok := o.sagas.Load(event.SagaID)
		if !ok {
			log.Printf("Saga %s not found", event.SagaID)
			return
		}
		saga := val.(*Saga)

		saga.mu.Lock()
		// Merge the event payload into the saga payload.
		for k, v := range event.Payload {
			saga.Payload[k] = v
		}

		if event.Status != "SUCCESS" {
			// failSaga expects saga.mu to be held by the caller.
			o.failSaga(ctx, saga)
			saga.mu.Unlock()
			return
		}

		var nextState SagaState
		switch saga.State {
		case StateStarted:
			nextState = StateOrderCreated
		case StateOrderCreated:
			nextState = StatePaymentProcessed
		case StatePaymentProcessed:
			nextState = StateInventoryDeducted
		default:
			// Unexpected event for the current state; ignore it.
			saga.mu.Unlock()
			return
		}

		saga.State = nextState
		saga.History = append(saga.History, nextState)
		saga.mu.Unlock()

		o.sagas.Store(saga.ID, saga)

		o.executeNextStep(ctx, saga)
	})

	if err != nil {
		log.Fatalf("Pub/Sub Receive error: %v", err)
	}
}

func main() {
	orchestrator := NewOrchestrator()
	go orchestrator.ListenForSagaEvents()

	// Simulate a trigger that starts a saga.
	// In a real application, this would be an API endpoint.
	time.Sleep(5 * time.Second) // Wait for the listener to start
	log.Println("--- Triggering a new Saga ---")
	orchestrator.StartNewSaga(context.Background(), make(map[string]interface{}))

	select {} // Keep the application running
}

Key points in the orchestrator implementation:

  1. State persistence: the example uses sync.Map as an in-memory store, which is unacceptable in production, because an orchestrator restart loses every in-flight saga. Saga instances must be persisted in Redis, PostgreSQL, or a similar store; a sketch follows this list.
  2. Event-driven flow: the orchestrator issues commands over gRPC, then waits for participant services to report back through Pub/Sub events. This asynchronous model decouples the orchestrator from the participants and improves resilience.
  3. Compensation logic: the failSaga function is the heart of the Saga pattern. It must execute compensations in the reverse of the business order. A common mistake is compensation logic that lacks its own error handling and retries.
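
As a sketch of point 1, the in-memory sync.Map could sit behind a small persistence interface backed by PostgreSQL. The schema, interface, and names below are illustrative assumptions, not part of the implementation above:

package main

import (
	"context"
	"database/sql"
	"encoding/json"
)

// SagaStore persists saga state so in-flight sagas survive an
// orchestrator restart.
type SagaStore interface {
	Save(ctx context.Context, s *Saga) error
}

// pgSagaStore assumes a table like:
//
//	CREATE TABLE sagas (id TEXT PRIMARY KEY, state TEXT, payload JSONB);
type pgSagaStore struct{ db *sql.DB }

func (p *pgSagaStore) Save(ctx context.Context, s *Saga) error {
	payload, err := json.Marshal(s.Payload)
	if err != nil {
		return err
	}
	// Upsert so a Save after every state transition is safe to repeat.
	_, err = p.db.ExecContext(ctx,
		`INSERT INTO sagas (id, state, payload) VALUES ($1, $2, $3)
		 ON CONFLICT (id) DO UPDATE SET state = $2, payload = $3`,
		s.ID, string(s.State), payload)
	return err
}

Each call site that currently does o.sagas.Store would also call Save, and startup recovery would reload every saga that is not in a terminal state.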

Implementing the Participant Services

Each microservice implements its gRPC interface and publishes an event to Pub/Sub once its business logic completes.

payment-service/main.go (using the payment service as the example)

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	paymentpb "github.com/your-repo/saga/gen/payment"
	"google.golang.org/grpc"
)

// SagaEvent is the message published to Pub/Sub
type SagaEvent struct {
	SagaID  string                 `json:"saga_id"`
	Source  string                 `json:"source"`
	Status  string                 `json:"status"`
	Payload map[string]interface{} `json:"payload"`
}

type server struct {
	paymentpb.UnimplementedPaymentServiceServer
	processedSagas sync.Map // For idempotency
	pubsubClient   *pubsub.Client
	pubsubTopic    *pubsub.Topic
}

func newServer() *server {
	ctx := context.Background()
	projectID := "your-gcp-project-id"
	topicID := "saga-events"

	psClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("Failed to create pubsub client: %v", err)
	}

	topic := psClient.Topic(topicID)
	exists, err := topic.Exists(ctx)
	if err != nil {
		log.Fatalf("Failed to check topic existence: %v", err)
	}
	if !exists {
		topic, err = psClient.CreateTopic(ctx, topicID)
		if err != nil {
			log.Fatalf("Failed to create topic: %v", err)
		}
	}

	return &server{
		pubsubClient: psClient,
		pubsubTopic:  topic,
	}
}

// ProcessPayment simulates payment processing; a real implementation would
// connect to a payment gateway.
func (s *server) ProcessPayment(ctx context.Context, req *paymentpb.ProcessPaymentRequest) (*paymentpb.ProcessPaymentResponse, error) {
	log.Printf("Received ProcessPayment request for Saga %s, Order %s", req.SagaId, req.OrderId)

	// 1. Idempotency check
	if _, loaded := s.processedSagas.LoadOrStore(req.SagaId, true); loaded {
		log.Printf("Saga %s already processed for payment. Ignoring.", req.SagaId)
		// Even for a duplicate request, return success and re-publish the event,
		// because the orchestrator may never have received the previous one.
		// The key point is that the business logic must not run twice.
		transactionID := "existing-tx-id-for-" + req.SagaId // Fetch from DB in a real app
		s.publishEvent(ctx, req.SagaId, "SUCCESS", map[string]interface{}{"transaction_id": transactionID})
		return &paymentpb.ProcessPaymentResponse{TransactionId: transactionID}, nil
	}

	// 2. Business Logic
	time.Sleep(1 * time.Second) // Simulate processing delay
	transactionID := fmt.Sprintf("tx_%d", time.Now().UnixNano())

	// 3. Publish Event
	payload := map[string]interface{}{"transaction_id": transactionID}
	s.publishEvent(ctx, req.SagaId, "SUCCESS", payload)
	
	log.Printf("Payment for Saga %s successful. Transaction ID: %s", req.SagaId, transactionID)
	return &paymentpb.ProcessPaymentResponse{TransactionId: transactionID}, nil
}

func (s *server) RefundPayment(ctx context.Context, req *paymentpb.RefundPaymentRequest) (*paymentpb.RefundPaymentResponse, error) {
	log.Printf("Received RefundPayment (compensation) for Saga %s, Transaction %s", req.SagaId, req.TransactionId)
	
	// Idempotency and business logic for refund...
	// ...

	log.Printf("Refund for Saga %s successful.", req.SagaId)
	return &paymentpb.RefundPaymentResponse{}, nil
}

func (s *server) publishEvent(ctx context.Context, sagaID, status string, payload map[string]interface{}) {
	event := SagaEvent{
		SagaID:  sagaID,
		Source:  "payment_service",
		Status:  status,
		Payload: payload,
	}
	data, err := json.Marshal(event)
	if err != nil {
		log.Printf("Failed to marshal event for saga %s: %v", sagaID, err)
		return
	}
	
	result := s.pubsubTopic.Publish(ctx, &pubsub.Message{Data: data})
	
	// Block until the result is returned and log server-generated message ID.
	msgID, err := result.Get(ctx)
	if err != nil {
		log.Printf("Failed to publish event for saga %s: %v", sagaID, err)
		return
	}
	log.Printf("Published event for saga %s, message ID: %s", sagaID, msgID)
}

func main() {
	lis, err := net.Listen("tcp", ":50052")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	paymentpb.RegisterPaymentServiceServer(s, newServer())
	log.Println("Payment service listening at :50052")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Key points for the participant services:

  1. Idempotency: the sync.Map is again a simplification. In production, a database table usually records processed saga_id values, with the check-and-insert performed inside the business transaction to guarantee atomicity.
  2. Atomicity: the business operation (such as the database update) and the event publish must be atomic. A common failure mode is updating the database and then crashing before the event is published, leaving the saga stuck. The solution is the Transactional Outbox pattern: write the event into the local database within the same transaction, then have a separate process poll that table and reliably forward events to Pub/Sub. A sketch follows this list.
  3. Compensation endpoints: compensating operations must be reliable, and idempotent as well.
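
Here is a minimal sketch of the outbox write for point 2, assuming illustrative payments and outbox tables; a separate relay process (not shown) would poll outbox and publish unpublished rows to Pub/Sub:

package main

import (
	"context"
	"database/sql"
)

// processPaymentTx performs the business write and the outbox insert in one
// local transaction: either both are committed or neither is.
func processPaymentTx(ctx context.Context, db *sql.DB, sagaID, orderID string, eventData []byte) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// 1. Business write (illustrative schema).
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO payments (saga_id, order_id, status) VALUES ($1, $2, 'PAID')`,
		sagaID, orderID); err != nil {
		return err
	}

	// 2. Event write into the outbox, in the SAME transaction.
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO outbox (saga_id, payload, published) VALUES ($1, $2, false)`,
		sagaID, eventData); err != nil {
		return err
	}

	return tx.Commit()
}

The relay marks a row as published only after Pub/Sub acknowledges it, which preserves the at-least-once guarantee end to end.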

Containerization and Deployment

Docker and Docker Compose make it easy to spin up the whole system locally.

docker-compose.yml

version: '3.8'

services:
  orchestrator:
    build: ./orchestrator
    ports:
      - "8080:8080" # If it had an API
    environment:
      - GOOGLE_CLOUD_PROJECT=your-gcp-project-id
      # Use gcloud auth application-default login for local credentials
    volumes:
      - ~/.config/gcloud:/root/.config/gcloud
    depends_on:
      - order-service
      - payment-service
      - inventory-service

  order-service:
    build: ./order-service
    ports:
      - "50051:50051"
    environment:
      - GOOGLE_CLOUD_PROJECT=your-gcp-project-id
    volumes:
      - ~/.config/gcloud:/root/.config/gcloud

  payment-service:
    build: ./payment-service
    ports:
      - "50052:50052"
    environment:
      - GOOGLE_CLOUD_PROJECT=your-gcp-project-id
    volumes:
      - ~/.config/gcloud:/root/.config/gcloud

  inventory-service:
    build: ./inventory-service
    ports:
      - "50053:50053"
    environment:
      - GOOGLE_CLOUD_PROJECT=your-gcp-project-id
    volumes:
      - ~/.config/gcloud:/root/.config/gcloud

Each service directory needs a simple Dockerfile:

# Dockerfile for each Go service
FROM golang:1.20-alpine

WORKDIR /app

# Copy protos and gen folders if they are in a shared directory
COPY . .

RUN go mod tidy
RUN go build -o /app/server .

# Change the exposed port for each service
EXPOSE 50051

CMD [ "/app/server" ]

Limitations and Future Iterations

This implementation demonstrates the core ideas of the Saga pattern, but it is still some distance from a truly robust production system. The main limitations:

  1. Orchestrator state persistence: as noted, the in-memory saga store is a single point of failure. Migrating it to a highly available database or KV store (Redis, etcd) is the first priority; doing so introduces new complexity such as concurrency control and distributed locking.
  2. Observability: when a saga fails, root-causing the problem across multiple services and asynchronous messages is hard. Distributed tracing (e.g., OpenTelemetry) is a must, with saga_id carried as part of the trace context so the full call chain is visible in Jaeger or Zipkin.
  3. Compensation failures: the current implementation merely logs them. A production-grade system needs a complete retry and alerting mechanism, plus a dead-letter queue and a manual-intervention process for compensations that cannot recover automatically; see the sketch after this list.
  4. Saga definition and execution: the state-machine logic is hard-coded into the orchestrator. A more advanced implementation defines saga flows in a DSL or a visual editor and has the orchestrator parse and execute them dynamically, decoupling business flows from the engine itself.
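
As a sketch of point 3, each compensation call could at least be wrapped in bounded exponential-backoff retries before escalating; the retryCompensation helper below is illustrative, not part of the orchestrator above:

package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// retryCompensation runs fn with bounded exponential backoff. If every
// attempt fails, the caller should escalate (alert, dead-letter queue,
// manual intervention) instead of silently dropping the compensation.
func retryCompensation(ctx context.Context, attempts int, fn func(context.Context) error) error {
	backoff := 200 * time.Millisecond
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(ctx); err == nil {
			return nil
		}
		log.Printf("compensation attempt %d/%d failed: %v", i+1, attempts, err)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		backoff *= 2
	}
	return fmt.Errorf("compensation failed after %d attempts: %w", attempts, err)
}

In failSaga, the RefundPayment call would then be wrapped as retryCompensation(ctx, 5, func(ctx context.Context) error { _, err := o.paymentClient.RefundPayment(ctx, req); return err }), with the final error routed to alerting.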
