构建基于 Caddy、DynamoDB 与 Tekton 的无服务器 WebSocket 广播架构


有状态的 WebSocket 连接管理与无状态、可水平扩展的计算实例,这两者在架构设计上存在着天然的矛盾。一个典型的 WebSocket 服务器通常在内存中维护一个连接池,用于跟踪所有活跃的客户端。这种模式简单直接,但在一个实例随时可能被创建或销毁的无服务器(Serverless)或云原生环境中,内存中的状态会随着实例的终止而丢失,导致单点故障和扩展性瓶颈。我们的挑战是:构建一个高可用、可水平扩展的 WebSocket 服务,其连接状态管理必须与计算实例的生命周期完全解耦。

定义问题与架构权衡

目标是设计一个能够向数万乃至数十万个并发 WebSocket 客户端广播消息的系统。核心要求如下:

  1. 无状态应用层:应用实例可以随时被替换或扩展,不应持有任何关键连接状态。
  2. 持久化连接管理:所有活跃的连接信息必须持久化存储,独立于任何单个计算实例。
  3. 高可用广播:任何授权的服务都能够触发一个广播事件,该事件必须可靠地分发给所有当前连接的客户端。
  4. 自动化部署:整个服务的构建、测试、打包和部署流程必须是自动化的。

方案A:完全托管的云厂商方案 (例如 AWS API Gateway + Lambda)

这是一个常见的捷径。使用 AWS API Gateway 的 WebSocket API,它可以直接与 Lambda 函数集成。连接、断开和消息事件会触发不同的 Lambda。API Gateway 内部维护连接状态,并提供一个管理 API (@connections),允许后端的任何服务通过 Connection ID 向客户端发送消息。

  • 优势:
    • 快速实现:免去了大量基础设施的管理工作。
    • 完全无服务器:按量付费,自动扩展。
    • 与生态系统深度集成:与 IAM、DynamoDB 等无缝协作。
  • 劣势:
    • 厂商锁定: 整个实时通信层被 AWS 深度绑定。迁移成本极高。
    • 成本不可预测: 在大规模广播场景下,按消息和连接时长计费的模式可能会迅速膨胀,成本难以精确控制。
    • 控制力与透明度低: 对底层网络、路由和超时机制的控制力有限。

方案B:平台无关的自建核心架构

该方案采用一系列开源和云中立的技术栈,构建一个可部署在任何 Kubernetes 集群上的解决方案。

  • Caddy: 作为边缘入口,处理 TLS 终止、请求路由和 WebSocket 代理。其自动 HTTPS 和简洁的配置是巨大优势。

  • Go 应用实例: 运行在 Kubernetes 上的 Go 应用处理 WebSocket 协议握手和消息读写。应用本身是无状态的。

  • DynamoDB: 作为外部的、高可用的连接状态存储。选择 DynamoDB 是因为它是一个完全托管的 NoSQL 数据库,具备极低的延迟和按需扩展能力,完美契合连接状态管理的需求。

  • Tekton: 作为 Kubernetes 原生的 CI/CD 引擎,负责从代码提交到镜像构建和部署的整个自动化流程。

  • 优势:

    • 平台无关性: 核心逻辑不依赖特定云厂商,可以轻松迁移。
    • 成本可控: 主要成本在于 Kubernetes 计算资源和 DynamoDB 的读写容量,模型更清晰,更易于优化。
    • 完全控制: 对整个技术栈拥有完全的控制权和可见性。
  • 劣势:

    • 更高的运维复杂度: 需要维护 Kubernetes 集群、Caddy 配置和 Tekton 流水线。
    • 初始开发成本更高: 需要自行实现连接管理逻辑和广播机制。

在真实项目中,长期的灵活性和成本控制往往比初期的开发速度更重要。因此,我们选择方案B。它虽然更复杂,但提供了一个坚实、可控且不会被单一供应商绑架的架构基础。

核心实现概览

整个系统的交互流程可以用下面的图表来描述。

sequenceDiagram
    participant Client
    participant Caddy
    participant GoApp as Go WebSocket Pod (K8s)
    participant DynamoDB
    participant BroadcastAPI as Broadcast API Endpoint

    Client->>+Caddy: Upgrade to WebSocket
    Caddy->>+GoApp: Forward WebSocket Request
    GoApp->>+DynamoDB: SAVE Connection(connId)
    GoApp-->>-Client: WebSocket Connection Established
    
    Note over Client, GoApp: Client sends/receives messages...

    BroadcastAPI->>+GoApp: POST /broadcast (message)
    GoApp->>+DynamoDB: SCAN all connections
    DynamoDB-->>-GoApp: List of all connId
    
    Note over GoApp: This is the architectural challenge.
Pod A receives the broadcast request,
but connections are held by Pods A, B, C... GoApp-->>BroadcastAPI: Ack (Broadcast initiated) Client--xGoApp: Connection Closed GoApp->>-DynamoDB: DELETE Connection(connId)

这个流程图暴露了方案 B 的核心挑战:当一个 GoApp 实例(例如 Pod A)接收到广播请求时,它如何将消息发送给由其他实例(Pod B, Pod C)持有的 WebSocket 连接?我们将在实现部分深入探讨这个问题。

1. 边缘入口:Caddy 配置

Caddy 的角色是作为反向代理,将外部的 WebSocket 流量安全地路由到后端的 Kubernetes 服务。Caddyfile 的配置极其简洁。

# Caddyfile
# Replace example.com with your actual domain
ws.example.com {
    # Automatic HTTPS for the domain
    tls internal

    # Reverse proxy WebSocket requests to the Kubernetes service
    # The service name 'ws-service' and port '8080' must match your K8s service definition
    reverse_proxy /ws/* ws-service:8080 {
        # Required for WebSocket proxying
        header_up Host {http.request.host}
        header_up X-Real-IP {http.request.remote.ip}
        header_up X-Forwarded-For {http.request.remote.ip}
        header_up X-Forwarded-Proto {http.request.scheme}
    }

    # An endpoint to trigger broadcast messages
    reverse_proxy /api/broadcast ws-service:8080

    # Health check endpoint
    reverse_proxy /healthz ws-service:8080

    # Log configuration
    log {
        output file /var/log/caddy/access.log {
            roll_size 100mb
            roll_keep 10
        }
        level INFO
    }
}

这里的关键是 reverse_proxy 指令,它透明地处理了 HTTP Upgrade 请求,建立了 WebSocket 隧道。同时,我们还代理了一个用于广播的 API 端点和一个健康检查端点。

2. 连接状态持久化:DynamoDB 表设计

我们需要一个 DynamoDB 表来存储连接信息。设计非常简单:

  • 表名: WebSocketConnections
  • 主键: connectionId (String) - 一个在连接建立时生成的唯一标识符(如 UUID)。
  • 属性:
    • createdAt (Number) - 连接建立的时间戳,用于调试和分析。
    • ttl (Number) - TTL 时间戳。DynamoDB 可以根据这个字段自动删除过期的项目,这是一个非常有效的清理僵尸连接的机制。

在 Go 代码中,我们会为每个新连接设置一个合理的 TTL(例如24小时),并在连接保持活跃时定期刷新它。如果一个 Go 实例崩溃,其持有的连接记录最终会因为 TTL 过期而被 DynamoDB 自动清除。

3. 核心应用逻辑:Go WebSocket 服务

这是系统的核心。我们将用 Go 语言实现,因为它在处理高并发网络应用方面表现出色。

项目结构:

.
├── go.mod
├── go.sum
├── main.go
├── api
│   └── handler.go
└── store
    └── dynamodb.go

store/dynamodb.go: 封装所有与 DynamoDB 的交互。

package store

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

const (
	tableName    = "WebSocketConnections"
	ttlInHours   = 24
)

// ConnectionItem represents a record in our DynamoDB table.
type ConnectionItem struct {
	ConnectionID string `json:"connectionId" dynamodbav:"connectionId"`
	CreatedAt    int64  `json:"createdAt" dynamodbav:"createdAt"`
	TTL          int64  `json:"ttl" dynamodbav:"ttl"`
}

// Store manages the connection state in DynamoDB.
type Store struct {
	client *dynamodb.Client
}

// NewStore creates a new DynamoDB store client.
func NewStore(ctx context.Context) (*Store, error) {
	// Using the SDK's default configuration chain.
	// It will look for credentials in env vars, shared credentials file, and IAM role.
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(os.Getenv("AWS_REGION")))
	if err != nil {
		return nil, fmt.Errorf("unable to load SDK config: %w", err)
	}

	return &Store{
		client: dynamodb.NewFromConfig(cfg),
	}, nil
}

// AddConnection saves a new connection ID to DynamoDB.
func (s *Store) AddConnection(ctx context.Context, connID string) error {
	now := time.Now()
	item := ConnectionItem{
		ConnectionID: connID,
		CreatedAt:    now.Unix(),
		TTL:          now.Add(ttlInHours * time.Hour).Unix(),
	}

	av, err := attributevalue.MarshalMap(item)
	if err != nil {
		return fmt.Errorf("failed to marshal connection item: %w", err)
	}

	_, err = s.client.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: aws.String(tableName),
		Item:      av,
	})
	if err != nil {
		return fmt.Errorf("failed to put item to DynamoDB: %w", err)
	}
	log.Printf("Successfully added connection: %s", connID)
	return nil
}

// RemoveConnection deletes a connection ID from DynamoDB.
func (s *Store) RemoveConnection(ctx context.Context, connID string) error {
	key, err := attributevalue.MarshalMap(map[string]string{"connectionId": connID})
	if err != nil {
		return fmt.Errorf("failed to marshal key: %w", err)
	}

	_, err = s.client.DeleteItem(ctx, &dynamodb.DeleteItemInput{
		TableName: aws.String(tableName),
		Key:       key,
	})
	if err != nil {
		// In a production system, you might want to handle ConditionalCheckFailedException
		// if the item was already deleted, which is not an error in this context.
		return fmt.Errorf("failed to delete item from DynamoDB: %w", err)
	}
	log.Printf("Successfully removed connection: %s", connID)
	return nil
}

// GetAllConnections scans the entire table and returns all connection IDs.
// WARNING: This is a potentially slow and expensive operation for large tables.
// In a production broadcast system, this should be replaced with a proper pub/sub mechanism.
func (s *Store) GetAllConnections(ctx context.Context) ([]string, error) {
	var connections []string
	paginator := dynamodb.NewScanPaginator(s.client, &dynamodb.ScanInput{
		TableName:            aws.String(tableName),
		ProjectionExpression: aws.String("connectionId"),
	})

	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to scan DynamoDB page: %w", err)
		}

		var pageItems []ConnectionItem
		err = attributevalue.UnmarshalListOfMaps(page.Items, &pageItems)
		if err != nil {
			return nil, fmt.Errorf("failed to unmarshal scan page items: %w", err)
		}

		for _, item := range pageItems {
			connections = append(connections, item.ConnectionID)
		}
	}
	return connections, nil
}

关键点: GetAllConnections 函数中的警告注释至关重要。直接扫描 DynamoDB 表来实现广播是反模式的,效率低下且成本高昂。我们在这里实现它,是为了完整地展示一个基本可用的系统,并以此为基础讨论其架构缺陷和改进方向。

api/handler.go: HTTP 和 WebSocket 处理器。

package api

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
	"your-module/store"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// In production, you should implement a proper origin check.
		return true
	},
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// We maintain a local map of connections held by this specific pod.
// The key is connectionId, value is the actual websocket connection.
var localConnections = struct {
	sync.RWMutex
	m map[string]*websocket.Conn
}{m: make(map[string]*websocket.Conn)}

// Handler holds dependencies like the database store.
type Handler struct {
	store *store.Store
}

func NewHandler(s *store.Store) *Handler {
	return &Handler{store: s}
}

// WsHandler handles the WebSocket connection lifecycle.
func (h *Handler) WsHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Failed to upgrade connection: %v", err)
		return
	}

	connID := uuid.New().String()

	// 1. Add to local map
	localConnections.Lock()
	localConnections.m[connID] = conn
	localConnections.Unlock()

	// 2. Add to persistent store
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := h.store.AddConnection(ctx, connID); err != nil {
		log.Printf("Failed to save connection %s to store: %v", connID, err)
		conn.Close()
		// Also remove from local map if it was added
		localConnections.Lock()
		delete(localConnections.m, connID)
		localConnections.Unlock()
		return
	}

	// This defer ensures cleanup happens on any exit path of this function.
	defer func() {
		log.Printf("Closing connection: %s", connID)
		conn.Close()

		// Clean up from local map
		localConnections.Lock()
		delete(localConnections.m, connID)
		localConnections.Unlock()

		// Clean up from persistent store
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := h.store.RemoveConnection(ctx, connID); err != nil {
			log.Printf("Failed to remove connection %s from store: %v", connID, err)
		}
	}()

	log.Printf("New connection established: %s", connID)

	// Keep the connection alive by handling incoming messages.
	// A production implementation would have ping/pong handlers.
	for {
		// We can read messages here, but for a broadcast service, we might just ignore them.
		if _, _, err := conn.ReadMessage(); err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("Unexpected close error for %s: %v", connID, err)
			} else {
				log.Printf("Connection %s closed gracefully.", connID)
			}
			break // Exit the loop to trigger the defer cleanup.
		}
	}
}

type BroadcastMessage struct {
	Message string `json:"message"`
}

// BroadcastHandler receives a message and attempts to send it to all clients.
func (h *Handler) BroadcastHandler(w http.ResponseWriter, r *http.Request) {
	var msg BroadcastMessage
	if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	if msg.Message == "" {
		http.Error(w, "Message cannot be empty", http.StatusBadRequest)
		return
	}
	
	// THIS IS THE ARCHITECTURAL FLAW
	// The broadcast logic here is incorrect for a multi-pod setup.
	// This pod only knows about its own `localConnections`.
	// A proper solution requires a message bus (e.g., Redis Pub/Sub, NATS).
	// When a message is published, ALL pods receive it and then broadcast
	// to their respective local connections.
	
	log.Println("--- WARNING: Naive broadcast implementation ---")
	localConnections.RLock()
	defer localConnections.RUnlock()
	
	log.Printf("Broadcasting message to %d local connections", len(localConnections.m))
	for id, conn := range localConnections.m {
		if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Message)); err != nil {
			log.Printf("Failed to write message to connection %s: %v", id, err)
		}
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("Broadcast initiated to local connections."))
}

代码剖析:

  • WsHandler: 这是连接生命周期管理的核心。当一个新连接建立时,它会:
    1. 生成一个唯一的 connID
    2. 将连接对象 *websocket.Conn 存入一个仅存于当前 PodlocalConnections map 中。
    3. connID 存入 DynamoDB。
    4. 使用 defer 块确保当函数退出(无论是正常关闭还是错误)时,连接会从本地 map 和 DynamoDB 中被清理。
  • BroadcastHandler: 这里的实现故意暴露了架构的弱点。它只遍历了当前 Pod 的 localConnections map。在一个有10个 Pod 的环境中,一个广播请求只会触达10%的客户端。这是用代码来阐明架构挑战的直接方式。

4. 自动化构建与部署:Tekton CI/CD

Tekton 允许我们在 Kubernetes 内部定义和执行 CI/CD 流水线。

tekton/task-build-and-push.yaml: 定义一个原子任务,用于构建 Go 应用并将其推送到镜像仓库。

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: go-build-push
spec:
  params:
    - name: imageUrl
      description: The URL of the image to push
      type: string
    - name: imageTag
      description: The tag of the image
      type: string
      default: "latest"
  workspaces:
    - name: source
      description: The workspace containing the source code
  steps:
    - name: build
      image: golang:1.19-alpine
      workingDir: $(workspaces.source.path)
      script: |
        #!/bin/sh
        set -e
        echo "--- Building Go application ---"
        CGO_ENABLED=0 GOOS=linux go build -v -o app .
        echo "--- Build complete ---"
      
    - name: build-and-push-image
      image: gcr.io/kaniko-project/executor:v1.9.0
      # Kaniko does not require a Docker daemon and runs in userspace.
      args:
        - "--dockerfile=./Dockerfile"
        - "--context=$(workspaces.source.path)"
        - "--destination=$(params.imageUrl):$(params.imageTag)"
      # Assumes a Kubernetes secret `docker-credentials` is mounted for registry auth.

tekton/pipeline-ws-service.yaml: 将多个任务串联成一个完整的流水线。

apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: ws-service-pipeline
spec:
  params:
    - name: repoUrl
      type: string
    - name: revision
      type: string
      default: "main"
    - name: imageUrl
      type: string
  workspaces:
    - name: shared-data
  tasks:
    - name: fetch-source
      taskRef:
        name: git-clone
      workspaces:
        - name: output
          workspace: shared-data
      params:
        - name: url
          value: $(params.repoUrl)
        - name: revision
          value: $(params.revision)

    - name: build-image
      taskRef:
        name: go-build-push
      runAfter: [ "fetch-source" ]
      workspaces:
        - name: source
          workspace: shared-data
      params:
        - name: imageUrl
          value: $(params.imageUrl)

这条流水线首先从 Git 仓库拉取代码,然后调用我们之前定义的 go-build-push 任务来构建和推送镜像。一个 PipelineRun 资源被创建时,Tekton 会根据这个定义执行整个流程。这为后续使用 GitOps 工具(如 ArgoCD)自动触发部署铺平了道路。

架构的局限性与未来迭代

当前架构成功地将连接状态与计算实例解耦,实现了应用层的无状态化和水平扩展能力。Caddy 提供了可靠的入口,DynamoDB 提供了高可用的状态存储,Tekton 实现了自动化构建。然而,它并非没有缺陷。

最核心的局限性在于广播机制。如代码注释所强调,依赖单个 Pod 接收请求并试图通知所有客户端是行不通的。一个生产级的解决方案必须引入一个中间消息总线(例如 Redis Pub/Sub、NATS 或 AWS SNS)。架构会演变为:

  1. 广播 API 接收请求后,不再扫描 DynamoDB,而是将消息发布到一个全局的 Topic。
  2. 所有 GoApp 实例都订阅这个 Topic。
  3. 当消息到达时,每个实例将消息分发给它本地管理的 WebSocket 连接。

这种发布/订阅模式将广播操作从 O(N) 的数据库扫描和复杂的跨 Pod 通信,转变为一个高效、解耦的事件驱动流程。

其次,对于超大规模的连接(数百万级),对 DynamoDB 进行全表扫描以进行管理或调试的成本会变得很高。在这种情况下,可能需要考虑更复杂的索引策略,或者将连接信息分片到多个表中。

最后,本方案仅涵盖了 CI 部分。一个完整的 GitOps 流程还需要集成 ArgoCD 或 Flux,监听镜像仓库的变化,并自动将新版本的应用部署到 Kubernetes 集群中。


  目录