有状态的 WebSocket 连接管理与无状态、可水平扩展的计算实例,这两者在架构设计上存在着天然的矛盾。一个典型的 WebSocket 服务器通常在内存中维护一个连接池,用于跟踪所有活跃的客户端。这种模式简单直接,但在一个实例随时可能被创建或销毁的无服务器(Serverless)或云原生环境中,内存中的状态会随着实例的终止而丢失,导致单点故障和扩展性瓶颈。我们的挑战是:构建一个高可用、可水平扩展的 WebSocket 服务,其连接状态管理必须与计算实例的生命周期完全解耦。
定义问题与架构权衡
目标是设计一个能够向数万乃至数十万个并发 WebSocket 客户端广播消息的系统。核心要求如下:
- 无状态应用层:应用实例可以随时被替换或扩展,不应持有任何关键连接状态。
- 持久化连接管理:所有活跃的连接信息必须持久化存储,独立于任何单个计算实例。
- 高可用广播:任何授权的服务都能够触发一个广播事件,该事件必须可靠地分发给所有当前连接的客户端。
- 自动化部署:整个服务的构建、测试、打包和部署流程必须是自动化的。
方案A:完全托管的云厂商方案 (例如 AWS API Gateway + Lambda)
这是一个常见的捷径。使用 AWS API Gateway 的 WebSocket API,它可以直接与 Lambda 函数集成。连接、断开和消息事件会触发不同的 Lambda。API Gateway 内部维护连接状态,并提供一个管理 API (@connections
),允许后端的任何服务通过 Connection ID 向客户端发送消息。
- 优势:
- 快速实现:免去了大量基础设施的管理工作。
- 完全无服务器:按量付费,自动扩展。
- 与生态系统深度集成:与 IAM、DynamoDB 等无缝协作。
- 劣势:
- 厂商锁定: 整个实时通信层被 AWS 深度绑定。迁移成本极高。
- 成本不可预测: 在大规模广播场景下,按消息和连接时长计费的模式可能会迅速膨胀,成本难以精确控制。
- 控制力与透明度低: 对底层网络、路由和超时机制的控制力有限。
方案B:平台无关的自建核心架构
该方案采用一系列开源和云中立的技术栈,构建一个可部署在任何 Kubernetes 集群上的解决方案。
Caddy: 作为边缘入口,处理 TLS 终止、请求路由和 WebSocket 代理。其自动 HTTPS 和简洁的配置是巨大优势。
Go 应用实例: 运行在 Kubernetes 上的 Go 应用处理 WebSocket 协议握手和消息读写。应用本身是无状态的。
DynamoDB: 作为外部的、高可用的连接状态存储。选择 DynamoDB 是因为它是一个完全托管的 NoSQL 数据库,具备极低的延迟和按需扩展能力,完美契合连接状态管理的需求。
Tekton: 作为 Kubernetes 原生的 CI/CD 引擎,负责从代码提交到镜像构建和部署的整个自动化流程。
优势:
- 平台无关性: 核心逻辑不依赖特定云厂商,可以轻松迁移。
- 成本可控: 主要成本在于 Kubernetes 计算资源和 DynamoDB 的读写容量,模型更清晰,更易于优化。
- 完全控制: 对整个技术栈拥有完全的控制权和可见性。
劣势:
- 更高的运维复杂度: 需要维护 Kubernetes 集群、Caddy 配置和 Tekton 流水线。
- 初始开发成本更高: 需要自行实现连接管理逻辑和广播机制。
在真实项目中,长期的灵活性和成本控制往往比初期的开发速度更重要。因此,我们选择方案B。它虽然更复杂,但提供了一个坚实、可控且不会被单一供应商绑架的架构基础。
核心实现概览
整个系统的交互流程可以用下面的图表来描述。
sequenceDiagram participant Client participant Caddy participant GoApp as Go WebSocket Pod (K8s) participant DynamoDB participant BroadcastAPI as Broadcast API Endpoint Client->>+Caddy: Upgrade to WebSocket Caddy->>+GoApp: Forward WebSocket Request GoApp->>+DynamoDB: SAVE Connection(connId) GoApp-->>-Client: WebSocket Connection Established Note over Client, GoApp: Client sends/receives messages... BroadcastAPI->>+GoApp: POST /broadcast (message) GoApp->>+DynamoDB: SCAN all connections DynamoDB-->>-GoApp: List of all connId Note over GoApp: This is the architectural challenge.
Pod A receives the broadcast request,
but connections are held by Pods A, B, C... GoApp-->>BroadcastAPI: Ack (Broadcast initiated) Client--xGoApp: Connection Closed GoApp->>-DynamoDB: DELETE Connection(connId)
这个流程图暴露了方案 B 的核心挑战:当一个 GoApp
实例(例如 Pod A)接收到广播请求时,它如何将消息发送给由其他实例(Pod B, Pod C)持有的 WebSocket 连接?我们将在实现部分深入探讨这个问题。
1. 边缘入口:Caddy 配置
Caddy 的角色是作为反向代理,将外部的 WebSocket 流量安全地路由到后端的 Kubernetes 服务。Caddyfile
的配置极其简洁。
# Caddyfile
# Replace example.com with your actual domain
ws.example.com {
# Automatic HTTPS for the domain
tls internal
# Reverse proxy WebSocket requests to the Kubernetes service
# The service name 'ws-service' and port '8080' must match your K8s service definition
reverse_proxy /ws/* ws-service:8080 {
# Required for WebSocket proxying
header_up Host {http.request.host}
header_up X-Real-IP {http.request.remote.ip}
header_up X-Forwarded-For {http.request.remote.ip}
header_up X-Forwarded-Proto {http.request.scheme}
}
# An endpoint to trigger broadcast messages
reverse_proxy /api/broadcast ws-service:8080
# Health check endpoint
reverse_proxy /healthz ws-service:8080
# Log configuration
log {
output file /var/log/caddy/access.log {
roll_size 100mb
roll_keep 10
}
level INFO
}
}
这里的关键是 reverse_proxy
指令,它透明地处理了 HTTP Upgrade 请求,建立了 WebSocket 隧道。同时,我们还代理了一个用于广播的 API 端点和一个健康检查端点。
2. 连接状态持久化:DynamoDB 表设计
我们需要一个 DynamoDB 表来存储连接信息。设计非常简单:
- 表名:
WebSocketConnections
- 主键:
connectionId
(String) - 一个在连接建立时生成的唯一标识符(如 UUID)。 - 属性:
-
createdAt
(Number) - 连接建立的时间戳,用于调试和分析。 -
ttl
(Number) - TTL 时间戳。DynamoDB 可以根据这个字段自动删除过期的项目,这是一个非常有效的清理僵尸连接的机制。
-
在 Go 代码中,我们会为每个新连接设置一个合理的 TTL(例如24小时),并在连接保持活跃时定期刷新它。如果一个 Go 实例崩溃,其持有的连接记录最终会因为 TTL 过期而被 DynamoDB 自动清除。
3. 核心应用逻辑:Go WebSocket 服务
这是系统的核心。我们将用 Go 语言实现,因为它在处理高并发网络应用方面表现出色。
项目结构:
.
├── go.mod
├── go.sum
├── main.go
├── api
│ └── handler.go
└── store
└── dynamodb.go
store/dynamodb.go
: 封装所有与 DynamoDB 的交互。
package store
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
const (
tableName = "WebSocketConnections"
ttlInHours = 24
)
// ConnectionItem represents a record in our DynamoDB table.
type ConnectionItem struct {
ConnectionID string `json:"connectionId" dynamodbav:"connectionId"`
CreatedAt int64 `json:"createdAt" dynamodbav:"createdAt"`
TTL int64 `json:"ttl" dynamodbav:"ttl"`
}
// Store manages the connection state in DynamoDB.
type Store struct {
client *dynamodb.Client
}
// NewStore creates a new DynamoDB store client.
func NewStore(ctx context.Context) (*Store, error) {
// Using the SDK's default configuration chain.
// It will look for credentials in env vars, shared credentials file, and IAM role.
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(os.Getenv("AWS_REGION")))
if err != nil {
return nil, fmt.Errorf("unable to load SDK config: %w", err)
}
return &Store{
client: dynamodb.NewFromConfig(cfg),
}, nil
}
// AddConnection saves a new connection ID to DynamoDB.
func (s *Store) AddConnection(ctx context.Context, connID string) error {
now := time.Now()
item := ConnectionItem{
ConnectionID: connID,
CreatedAt: now.Unix(),
TTL: now.Add(ttlInHours * time.Hour).Unix(),
}
av, err := attributevalue.MarshalMap(item)
if err != nil {
return fmt.Errorf("failed to marshal connection item: %w", err)
}
_, err = s.client.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: av,
})
if err != nil {
return fmt.Errorf("failed to put item to DynamoDB: %w", err)
}
log.Printf("Successfully added connection: %s", connID)
return nil
}
// RemoveConnection deletes a connection ID from DynamoDB.
func (s *Store) RemoveConnection(ctx context.Context, connID string) error {
key, err := attributevalue.MarshalMap(map[string]string{"connectionId": connID})
if err != nil {
return fmt.Errorf("failed to marshal key: %w", err)
}
_, err = s.client.DeleteItem(ctx, &dynamodb.DeleteItemInput{
TableName: aws.String(tableName),
Key: key,
})
if err != nil {
// In a production system, you might want to handle ConditionalCheckFailedException
// if the item was already deleted, which is not an error in this context.
return fmt.Errorf("failed to delete item from DynamoDB: %w", err)
}
log.Printf("Successfully removed connection: %s", connID)
return nil
}
// GetAllConnections scans the entire table and returns all connection IDs.
// WARNING: This is a potentially slow and expensive operation for large tables.
// In a production broadcast system, this should be replaced with a proper pub/sub mechanism.
func (s *Store) GetAllConnections(ctx context.Context) ([]string, error) {
var connections []string
paginator := dynamodb.NewScanPaginator(s.client, &dynamodb.ScanInput{
TableName: aws.String(tableName),
ProjectionExpression: aws.String("connectionId"),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to scan DynamoDB page: %w", err)
}
var pageItems []ConnectionItem
err = attributevalue.UnmarshalListOfMaps(page.Items, &pageItems)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal scan page items: %w", err)
}
for _, item := range pageItems {
connections = append(connections, item.ConnectionID)
}
}
return connections, nil
}
关键点: GetAllConnections
函数中的警告注释至关重要。直接扫描 DynamoDB 表来实现广播是反模式的,效率低下且成本高昂。我们在这里实现它,是为了完整地展示一个基本可用的系统,并以此为基础讨论其架构缺陷和改进方向。
api/handler.go
: HTTP 和 WebSocket 处理器。
package api
import (
"context"
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"your-module/store"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// In production, you should implement a proper origin check.
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// We maintain a local map of connections held by this specific pod.
// The key is connectionId, value is the actual websocket connection.
var localConnections = struct {
sync.RWMutex
m map[string]*websocket.Conn
}{m: make(map[string]*websocket.Conn)}
// Handler holds dependencies like the database store.
type Handler struct {
store *store.Store
}
func NewHandler(s *store.Store) *Handler {
return &Handler{store: s}
}
// WsHandler handles the WebSocket connection lifecycle.
func (h *Handler) WsHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade connection: %v", err)
return
}
connID := uuid.New().String()
// 1. Add to local map
localConnections.Lock()
localConnections.m[connID] = conn
localConnections.Unlock()
// 2. Add to persistent store
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := h.store.AddConnection(ctx, connID); err != nil {
log.Printf("Failed to save connection %s to store: %v", connID, err)
conn.Close()
// Also remove from local map if it was added
localConnections.Lock()
delete(localConnections.m, connID)
localConnections.Unlock()
return
}
// This defer ensures cleanup happens on any exit path of this function.
defer func() {
log.Printf("Closing connection: %s", connID)
conn.Close()
// Clean up from local map
localConnections.Lock()
delete(localConnections.m, connID)
localConnections.Unlock()
// Clean up from persistent store
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := h.store.RemoveConnection(ctx, connID); err != nil {
log.Printf("Failed to remove connection %s from store: %v", connID, err)
}
}()
log.Printf("New connection established: %s", connID)
// Keep the connection alive by handling incoming messages.
// A production implementation would have ping/pong handlers.
for {
// We can read messages here, but for a broadcast service, we might just ignore them.
if _, _, err := conn.ReadMessage(); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("Unexpected close error for %s: %v", connID, err)
} else {
log.Printf("Connection %s closed gracefully.", connID)
}
break // Exit the loop to trigger the defer cleanup.
}
}
}
type BroadcastMessage struct {
Message string `json:"message"`
}
// BroadcastHandler receives a message and attempts to send it to all clients.
func (h *Handler) BroadcastHandler(w http.ResponseWriter, r *http.Request) {
var msg BroadcastMessage
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
if msg.Message == "" {
http.Error(w, "Message cannot be empty", http.StatusBadRequest)
return
}
// THIS IS THE ARCHITECTURAL FLAW
// The broadcast logic here is incorrect for a multi-pod setup.
// This pod only knows about its own `localConnections`.
// A proper solution requires a message bus (e.g., Redis Pub/Sub, NATS).
// When a message is published, ALL pods receive it and then broadcast
// to their respective local connections.
log.Println("--- WARNING: Naive broadcast implementation ---")
localConnections.RLock()
defer localConnections.RUnlock()
log.Printf("Broadcasting message to %d local connections", len(localConnections.m))
for id, conn := range localConnections.m {
if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Message)); err != nil {
log.Printf("Failed to write message to connection %s: %v", id, err)
}
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Broadcast initiated to local connections."))
}
代码剖析:
WsHandler
: 这是连接生命周期管理的核心。当一个新连接建立时,它会:- 生成一个唯一的
connID
。 - 将连接对象
*websocket.Conn
存入一个仅存于当前 Pod 的localConnections
map 中。 - 将
connID
存入 DynamoDB。 - 使用
defer
块确保当函数退出(无论是正常关闭还是错误)时,连接会从本地 map 和 DynamoDB 中被清理。
- 生成一个唯一的
-
BroadcastHandler
: 这里的实现故意暴露了架构的弱点。它只遍历了当前 Pod 的localConnections
map。在一个有10个 Pod 的环境中,一个广播请求只会触达10%的客户端。这是用代码来阐明架构挑战的直接方式。
4. 自动化构建与部署:Tekton CI/CD
Tekton 允许我们在 Kubernetes 内部定义和执行 CI/CD 流水线。
tekton/task-build-and-push.yaml
: 定义一个原子任务,用于构建 Go 应用并将其推送到镜像仓库。
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: go-build-push
spec:
params:
- name: imageUrl
description: The URL of the image to push
type: string
- name: imageTag
description: The tag of the image
type: string
default: "latest"
workspaces:
- name: source
description: The workspace containing the source code
steps:
- name: build
image: golang:1.19-alpine
workingDir: $(workspaces.source.path)
script: |
#!/bin/sh
set -e
echo "--- Building Go application ---"
CGO_ENABLED=0 GOOS=linux go build -v -o app .
echo "--- Build complete ---"
- name: build-and-push-image
image: gcr.io/kaniko-project/executor:v1.9.0
# Kaniko does not require a Docker daemon and runs in userspace.
args:
- "--dockerfile=./Dockerfile"
- "--context=$(workspaces.source.path)"
- "--destination=$(params.imageUrl):$(params.imageTag)"
# Assumes a Kubernetes secret `docker-credentials` is mounted for registry auth.
tekton/pipeline-ws-service.yaml
: 将多个任务串联成一个完整的流水线。
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: ws-service-pipeline
spec:
params:
- name: repoUrl
type: string
- name: revision
type: string
default: "main"
- name: imageUrl
type: string
workspaces:
- name: shared-data
tasks:
- name: fetch-source
taskRef:
name: git-clone
workspaces:
- name: output
workspace: shared-data
params:
- name: url
value: $(params.repoUrl)
- name: revision
value: $(params.revision)
- name: build-image
taskRef:
name: go-build-push
runAfter: [ "fetch-source" ]
workspaces:
- name: source
workspace: shared-data
params:
- name: imageUrl
value: $(params.imageUrl)
这条流水线首先从 Git 仓库拉取代码,然后调用我们之前定义的 go-build-push
任务来构建和推送镜像。一个 PipelineRun
资源被创建时,Tekton 会根据这个定义执行整个流程。这为后续使用 GitOps 工具(如 ArgoCD)自动触发部署铺平了道路。
架构的局限性与未来迭代
当前架构成功地将连接状态与计算实例解耦,实现了应用层的无状态化和水平扩展能力。Caddy 提供了可靠的入口,DynamoDB 提供了高可用的状态存储,Tekton 实现了自动化构建。然而,它并非没有缺陷。
最核心的局限性在于广播机制。如代码注释所强调,依赖单个 Pod 接收请求并试图通知所有客户端是行不通的。一个生产级的解决方案必须引入一个中间消息总线(例如 Redis Pub/Sub、NATS 或 AWS SNS)。架构会演变为:
- 广播 API 接收请求后,不再扫描 DynamoDB,而是将消息发布到一个全局的 Topic。
- 所有
GoApp
实例都订阅这个 Topic。 - 当消息到达时,每个实例将消息分发给它本地管理的 WebSocket 连接。
这种发布/订阅模式将广播操作从 O(N) 的数据库扫描和复杂的跨 Pod 通信,转变为一个高效、解耦的事件驱动流程。
其次,对于超大规模的连接(数百万级),对 DynamoDB 进行全表扫描以进行管理或调试的成本会变得很高。在这种情况下,可能需要考虑更复杂的索引策略,或者将连接信息分片到多个表中。
最后,本方案仅涵盖了 CI 部分。一个完整的 GitOps 流程还需要集成 ArgoCD 或 Flux,监听镜像仓库的变化,并自动将新版本的应用部署到 Kubernetes 集群中。