使用 Elixir Phoenix 构建连接 Swift 原生客户端与 LiveView 仪表盘的实时 MLOps 控制平面


技术痛点

在维护一个中等规模的 MLOps 平台时,我们面临一个日益尖锐的矛盾:模型训练和数据处理任务是长周期的、异步的,但开发人员和运维团队对任务状态的感知却要求是实时的。传统的方案,无论是让前端轮询 API,还是依赖批处理日志分析,都存在明显的延迟和资源浪费。当一个关键的训练任务失败时,我们希望在秒级内获得反馈,而不是在几分钟后通过刷新页面或查看日志聚合系统才发现。

团队的需求是分裂的:平台管理员需要一个全局的 Web 仪表盘来监控所有正在运行的任务、资源利用率和系统健康状况;而算法工程师则更偏爱能够无缝集成到他们 macOS 开发环境中的原生工具,最好能提供系统级通知,让他们在任务完成或失败时立即得到提醒,而无需时刻关注浏览器标签页。

这就构成了一个具体的技术挑战:如何构建一个统一的后端,既能为 Web 端提供丰富的实时交互体验,又能为原生桌面客户端提供稳定、低延迟的状态同步和推送能力?这个后端必须能够高效地管理成百上千个并发任务的状态,并以极低的开销将状态变更实时广播给所有订阅的客户端,无论它们是浏览器还是原生应用。

初步构想与技术选型

问题的核心在于实时通信和状态管理。我们需要一个能够充当“实时消息中枢”的后端。经过评估,我们排除了基于轮询的 RESTful 架构,因为它无法满足实时性要求且效率低下。WebSocket 是显而易见的选择,但这仅仅是传输协议。真正的挑战在于服务端如何优雅地处理成千上万的持久连接,并对后端业务状态(例如一个 MLOps 任务的生命周期)进行建模。

这正是 Elixir 和其背后的 OTP(开放电信平台)的闪光点。

  1. 后端: Elixir 与 Phoenix Framework

    • 原因: Elixir 构建在 BEAM 虚拟机之上,其轻量级进程模型使得处理大量并发连接的成本极低。一个 Elixir 应用可以轻松维持数十万甚至上百万个 WebSocket 连接。OTP 提供的 GenServer 行为模式是为长生命周期、有状态的实体(比如一个 MLOps 任务)建模的完美工具。每个任务都可以映射为一个独立的 GenServer 进程,自我管理其状态,完全隔离,并且由 Supervisor 监督,具备极强的容错能力。Phoenix Framework 内置的 Channels 功能,为 WebSocket 通信提供了强大的抽象,可以轻松实现基于主题的发布/订阅模型。
  2. Web 仪表盘: Phoenix LiveView

    • 原因: 既然后端选择了 Phoenix,使用 LiveView 构建管理员仪表盘就成了自然而然的选择。LiveView 允许我们完全在 Elixir 中构建丰富的、实时的用户界面,无需编写一行 JavaScript。它通过一个持久化的 WebSocket 连接工作,服务器端的状态变更能够被自动地、高效地推送到前端并差异化更新 DOM。这对于需要实时展示任务列表、进度条和日志流的 MLOps 仪表盘来说,简直是绝配。这里的 “UI 组件库” 概念,被 LiveView 以一种服务端渲染的方式重新诠释了。
  3. 原生客户端: Swift

    • 原因: 我们的算法团队主要使用 macOS。提供一个原生的 Swift 应用能带来浏览器无法比拟的体验:更快的响应速度、更低的资源占用、与操作系统的深度集成(如 Dock 图标的徽章、系统通知)。Swift 需要一个可靠的 WebSocket 客户端库来与 Phoenix Channels 对话。这部分存在挑战,因为我们需要在 Swift 中手动实现 Phoenix Channel 协议的部分逻辑,但这部分工作是一次性的,带来的用户体验提升是巨大的。

整体架构如下:

graph TD
    subgraph "客户端"
        A[Swift macOS App] -- WebSocket --> C;
        B[Phoenix LiveView Dashboard] -- WebSocket --> C;
    end

    subgraph "Elixir 后端"
        C(Phoenix Endpoint/Channel) -- PubSub --> D;
        C -- 调用 --> E[Job Supervisor];
        E -- 启动/监督 --> F{MLJob GenServer 1};
        E -- 启动/监督 --> G{MLJob GenServer 2};
        F -- 广播状态 --> D[Phoenix.PubSub];
        G -- 广播状态 --> D[Phoenix.PubSub];
    end

    F -- 执行 --> H[外部 ML 任务];
    G -- 执行 --> I[外部 ML 任务];

    style A fill:#D6EAF8,stroke:#3498DB
    style B fill:#D5F5E3,stroke:#2ECC71

这个架构的核心是 Elixir/OTP,它既是任务生命周期的管理器,也是状态变更的广播中心,而 LiveView 和 Swift 客户端则是这个中心的两个不同“显示器”。

步骤化实现

1. 核心状态管理:MLJob GenServer

我们首先需要为每个 MLOps 任务创建一个状态机。在 Elixir 中,GenServer 是实现这一点的标准方式。这个 GenServer 将负责管理单个任务的整个生命周期,从 pendingrunning,再到 completedfailed

这里的坑在于,我们不能让 GenServer 自己去执行耗时的计算任务,否则会阻塞进程消息邮箱。正确的做法是 GenServer 作为一个协调者,启动一个外部进程或发送一个命令到任务执行系统(如 Kubernetes),然后异步地接收结果。为了简化示例,我们将使用 Process.send_after/3 来模拟一个耗时的异步任务。

lib/ml_ops/job.ex

defmodule MLOps.Job do
  use GenServer
  alias Phoenix.PubSub

  # 客户端API
  def start_link(job_id) do
    GenServer.start_link(__MODULE__, %{job_id: job_id}, name: via_tuple(job_id))
  end

  def get_state(job_id) do
    GenServer.call(via_tuple(job_id), :get_state)
  end

  # 服务端回调
  @impl true
  def init(args) do
    # 初始化状态
    initial_state = %{
      job_id: args.job_id,
      status: :pending,
      progress: 0,
      logs: ["Job #{args.job_id} initialized."],
      result: nil
    }

    # 立即启动任务
    schedule_work()

    {:ok, initial_state}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_info(:work_started, state) do
    new_state =
      state
      |> Map.put(:status, :running)
      |> put_log("Execution started. Simulating training process...")

    broadcast_change(new_state)

    # 模拟10个步骤,每秒一个
    for i <- 1..10 do
      Process.send_after(self(), {:work_progress, i * 10}, i * 1000)
    end
    Process.send_after(self(), :work_finished, 11 * 1000)

    {:noreply, new_state}
  end

  @impl true
  def handle_info({:work_progress, progress}, state) do
    new_state =
      state
      |> Map.put(:progress, progress)
      |> put_log("Progress update: #{progress}%")

    broadcast_change(new_state)
    {:noreply, new_state}
  end

  @impl true
  def handle_info(:work_finished, state) do
    # 模拟任务成功或失败
    result = if :rand.uniform() > 0.3, do: :completed, else: :failed
    final_log = "Job finished with status: #{result}"

    new_state =
      state
      |> Map.put(:status, result)
      |> Map.put(:progress, 100)
      |> Map.put(:result, %{accuracy: :rand.uniform()})
      |> put_log(final_log)

    broadcast_change(new_state)
    {:noreply, new_state}
  end

  # --- 私有函数 ---

  defp schedule_work do
    # 在真实项目中,这里会与外部系统交互
    # 为避免阻塞,我们异步发送消息给自己来启动工作
    Process.send_after(self(), :work_started, 500)
  end

  defp put_log(state, message) do
    timestamp = DateTime.utc_now() |> to_string()
    log_entry = "[#{timestamp}] #{message}"
    update_in(state.logs, &([log_entry | &1]))
  end

  # 广播是核心,它将状态同步到所有客户端
  defp broadcast_change(state) do
    topic = "job:#{state.job_id}"
    # 使用Phoenix的PubSub将状态广播到指定主题
    PubSub.broadcast(MLOps.PubSub, topic, {:job_updated, state})
  end

  defp via_tuple(job_id), do: {:via, Registry, {MLOps.JobRegistry, job_id}}
end

我们使用 Registry 来通过 job_id 动态查找 GenServer 进程,这比直接使用进程名更灵活。每次状态变更,broadcast_change/1 函数都会通过 Phoenix PubSub 将最新的状态广播到与 job_id 关联的主题上。这是整个实时系统的脉搏。

2. 通信枢纽:Phoenix Channel

接下来,我们定义 Phoenix Channel,作为客户端(无论是 LiveView 还是 Swift)接入系统的网关。

lib/ml_ops_web/channels/job_channel.ex

defmodule MLOpsWeb.JobChannel do
  use Phoenix.Channel
  alias MLOps.Job

  # 客户端尝试加入 "job:<id>" 频道时调用
  def join("job:" <> job_id, _payload, socket) do
    # 在真实项目中,这里应该有认证和授权逻辑
    # 检查用户是否有权限订阅此job_id

    # 启动对应的Job GenServer (如果它还没启动的话)
    # Supervisor会确保一个job_id只有一个进程
    case MLOps.Jobs.start_job(job_id) do
      {:ok, _pid} ->
        # 加入成功后,立即将当前任务状态发送给新加入的客户端
        # 这确保了客户端UI能立即显示正确状态,而不是等待下一次更新
        current_state = Job.get_state(job_id)
        push(socket, "job_updated", current_state)

        {:ok, socket}
      {:error, {:already_started, _pid}} ->
        # 进程已存在,同样是正常情况
        current_state = Job.get_state(job_id)
        push(socket, "job_updated", current_state)
        
        {:ok, socket}
      _ ->
        {:error, %{reason: "failed_to_start_job"}}
    end
  end

  # 客户端断开连接时的清理逻辑 (可选)
  @impl true
  def terminate(reason, socket) do
    # 可以在这里记录日志
    :ok
  end
end

user_socket.ex 中,我们需要定义 channel 路由:
lib/ml_ops_web/channels/user_socket.ex

defmodule MLOpsWeb.UserSocket do
  use Phoenix.Socket

  # ... 其他代码
  channel "job:*", MLOpsWeb.JobChannel

  # ... 其他代码
end

这个 Channel 的设计很关键:当客户端 join 成功时,它会立即拉取一次任务的当前状态并推送给该客户端。这避免了客户端加入后需要等待下一次状态广播才能更新UI的尴尬情况。

3. Web 仪表盘:Phoenix LiveView

现在,为平台管理员创建一个实时仪表盘。我们将创建一个 LiveView 页面,用于显示特定任务的详细信息。

lib/ml_ops_web/live/job_live.ex

defmodule MLOpsWeb.Live.JobLive do
  use MLOpsWeb, :live_view
  alias MLOps.Job
  alias Phoenix.PubSub

  @impl true
  def mount(%{"id" => job_id}, _session, socket) do
    # 订阅与job_id相关的主题
    if connected?(socket), do: PubSub.subscribe(MLOps.PubSub, "job:#{job_id}")

    # 获取初始状态
    initial_state = Job.get_state(job_id)

    {:ok, assign(socket, :job, initial_state)}
  end

  # 核心:处理从PubSub接收到的广播消息
  @impl true
  def handle_info({:job_updated, job_state}, socket) do
    # 将新状态更新到socket的assigns中,LiveView会自动重渲染变化的UI部分
    {:noreply, assign(socket, :job, job_state)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="p-8 font-sans">
      <h1 class="text-3xl font-bold">ML Job Monitor: <span class="font-mono text-blue-600"><%= @job.job_id %></span></h1>
      <div class="mt-4 p-4 border rounded-lg shadow-md bg-gray-50">
        <h2 class="text-xl">Status: <%= render_status(@job.status) %></h2>
        <div class="w-full bg-gray-200 rounded-full h-4 mt-2">
          <div class="bg-green-500 h-4 rounded-full" style={"width: #{@job.progress}%"}></div>
        </div>
        <p class="text-sm text-gray-600 mt-1"><%= @job.progress %>% complete</p>
      </div>

      <div class="mt-6">
        <h3 class="text-lg font-semibold">Execution Logs</h3>
        <div class="mt-2 p-4 h-64 font-mono text-sm bg-black text-white rounded-md overflow-y-auto flex flex-col-reverse">
          <%= for log <- @job.logs do %>
            <p><%= log %></p>
          <% end %>
        </div>
      </div>
    </div>
    """
  end

  defp render_status(:pending), do: ~H|<span class="text-gray-500">Pending</span>|
  defp render_status(:running), do: ~H|<span class="text-yellow-600 animate-pulse">Running...</span>|
  defp render_status(:completed), do: ~H|<span class="text-green-600">Completed</span>|
  defp render_status(:failed), do: ~H|<span class="text-red-600">Failed</span>|
end

这个 LiveView 组件非常简洁。mount/3 负责在组件加载时获取初始数据并订阅 PubSub 主题。handle_info/2 是魔法发生的地方——每当 Job GenServer 广播更新时,这个函数就会被调用,它简单地更新 socketassigns,LiveView 框架会智能地将变更推送到浏览器。render/1 模板中的所有动态部分都会自动更新。这就是 LiveView 作为服务端 UI 组件库的强大之处。

4. 原生客户端:Swift 集成

这是最具挑战性但也是价值最高的部分。我们需要让一个 Swift 应用“说” Phoenix Channel 协议。这个协议本质上是建立在 WebSocket 之上的一套 JSON 消息规范。我们需要手动构造 phx_join, heartbeat 等消息。

我们将使用 Apple 的原生 URLSessionWebSocketTask

PhoenixSocketClient.swift

import Foundation
import Combine

// 定义Phoenix Channel协议的消息结构
struct PhoenixMessage: Codable {
    let topic: String
    let event: String
    let payload: [String: AnyCodable]
    let ref: String?
    
    enum CodingKeys: String, CodingKey {
        case topic, event, payload, ref
    }
}

// 帮助处理异构JSON payload
struct AnyCodable: Codable {
    let value: Any
    // ... 实现 Codable 协议
}

class PhoenixSocketClient: ObservableObject {
    private var webSocketTask: URLSessionWebSocketTask?
    private var heartbeatTimer: Timer?
    private var ref: Int = 0
    
    @Published var jobState: [String: AnyCodable]?
    @Published var isConnected: Bool = false

    private let jobID: String
    private let socketURL: URL

    init(jobID: String) {
        self.jobID = jobID
        // 确保你的 Phoenix endpoint 配置正确
        self.socketURL = URL(string: "ws://localhost:4000/socket/websocket?vsn=2.0.0")!
    }

    func connect() {
        let session = URLSession(configuration: .default)
        webSocketTask = session.webSocketTask(with: socketURL)
        webSocketTask?.resume()
        isConnected = true
        
        receiveMessages()
        joinChannel()
        startHeartbeat()
    }

    func disconnect() {
        heartbeatTimer?.invalidate()
        webSocketTask?.cancel(with: .goingAway, reason: nil)
        isConnected = false
    }

    private func receiveMessages() {
        webSocketTask?.receive { [weak self] result in
            switch result {
            case .failure(let error):
                print("WebSocket receiving error: \(error)")
                self?.isConnected = false
            case .success(let message):
                switch message {
                case .string(let text):
                    // 核心解析逻辑
                    self?.handleIncomingMessage(text)
                case .data(let data):
                    print("Received binary data: \(data)")
                @unknown default:
                    fatalError()
                }
                // 持续监听
                self?.receiveMessages()
            }
        }
    }
    
    private func handleIncomingMessage(_ text: String) {
        guard let data = text.data(using: .utf8),
              let json = try? JSONDecoder().decode(PhoenixMessage.self, from: data) else {
            print("Failed to decode message: \(text)")
            return
        }

        DispatchQueue.main.async {
            switch json.event {
            case "phx_reply":
                // 可以处理 join 成功或失败的回复
                print("Received reply for ref \(json.ref ?? "")")
            case "job_updated":
                // 这就是我们需要的实时状态!
                self.jobState = json.payload
            default:
                print("Received unhandled event: \(json.event)")
            }
        }
    }

    private func sendMessage(topic: String, event: String, payload: [String: AnyCodable] = [:]) {
        self.ref += 1
        let message = PhoenixMessage(topic: topic, event: event, payload: payload, ref: String(ref))
        
        guard let jsonData = try? JSONEncoder().encode(message),
              let jsonString = String(data: jsonData, encoding: .utf8) else {
            return
        }
        
        webSocketTask?.send(.string(jsonString)) { error in
            if let error = error {
                print("WebSocket sending error: \(error)")
            }
        }
    }

    private func joinChannel() {
        let topic = "job:\(jobID)"
        sendMessage(topic: topic, event: "phx_join")
    }

    private func startHeartbeat() {
        heartbeatTimer = Timer.scheduledTimer(withTimeInterval: 30.0, repeats: true) { [weak self] _ in
            self?.sendMessage(topic: "phoenix", event: "heartbeat")
        }
    }
}

在 SwiftUI 视图中,我们可以这样使用它:

JobStatusView.swift

import SwiftUI

struct JobStatusView: View {
    @StateObject private var client = PhoenixSocketClient(jobID: "native-swift-job-001")

    var body: some View {
        VStack(alignment: .leading, spacing: 16) {
            Text("ML Job Monitor (SwiftUI)")
                .font(.largeTitle)

            if client.isConnected {
                if let state = client.jobState {
                    // 解析并显示状态
                    Text("Job ID: \(state["job_id"]?.value as? String ?? "N/A")")
                    Text("Status: \(state["status"]?.value as? String ?? "N/A")")
                    
                    if let progress = state["progress"]?.value as? Int {
                        ProgressView(value: Double(progress), total: 100.0)
                        Text("\(progress)%")
                    }
                    // ... 还可以显示日志等
                } else {
                    Text("Waiting for job state...")
                }
            } else {
                Text("Disconnected")
            }
        }
        .padding()
        .onAppear(perform: client.connect)
        .onDisappear(perform: client.disconnect)
    }
}

这段 Swift 代码是整个架构的点睛之笔。它证明了 Elixir/Phoenix 后端不仅仅是为 Web 生态系统服务的,它的实时能力可以通过标准协议(WebSocket)无缝地扩展到任何客户端技术栈,包括高性能的桌面原生应用。在真实项目中,还可以进一步封装,比如增加对系统通知的调用,当任务状态变为 completedfailed 时,给用户发送一个桌面通知。

遗留问题与未来迭代

这个实现验证了核心架构的可行性,但在投入生产前,还有几个关键点需要完善。

  1. 认证与授权: 目前的 Channel join 是开放的。生产环境必须在 UserSocketconnect/3 回调中集成认证逻辑(例如,验证 JWT),并在 JobChanneljoin/3 中检查该用户是否有权限订阅特定 job_id 的更新。

  2. 状态持久化: GenServer 的状态是存在于内存中的。如果 Elixir 节点重启,所有任务状态都会丢失。必须引入持久层(如 PostgreSQL)。GenServer 的角色应转变为数据库记录在内存中的“代理”或“缓存”。在 init 时从数据库加载状态,在状态变更时,除了广播,还要异步地将变更写入数据库。可以使用 Ecto 来完成这项工作。

  3. 任务执行解耦: Process.send_after 只是一个模拟。一个健壮的 MLOps 平台需要一个真正的任务执行后端。可以将 GenServer 与 Oban 这样的 Elixir 后台作业处理库集成,由 Oban 负责任务的持久化、重试和实际执行(例如通过端口或调用外部脚本)。GenServer 则订阅 Oban 的作业事件来更新自身状态。

  4. 客户端连接健壮性: Swift 客户端中的 WebSocket 连接可能会因为网络问题中断。PhoenixSocketClient 需要实现一套带指数退避的自动重连机制,以确保在网络恢复后能无缝地重新订阅并同步到最新状态。Phoenix Channel 协议本身对重连有很好的支持。

这套架构的真正价值在于其统一性和扩展性。Elixir/OTP 提供了一个极其稳固且高效的实时核心,无论是通过 LiveView 快速构建功能强大的 Web UI,还是服务于对体验要求更高的原生应用,这个核心都能提供一致、可靠、低延迟的数据同步能力。这解决了跨技术栈异构系统在实时场景下的一个核心痛点。


  目录