Building a Cross-Platform Unified Structured Logging System with Kotlin Multiplatform and Fluentd


Our team's mobile and web application logging had descended into chaos. The iOS team used OSLog, with logs scattered across device consoles; the web frontend team's log formats varied wildly and went straight to a bare-bones Node.js receiver; the Android team had its own standard. Whenever we needed to trace a user action that started with a web login, triggered backend services, and finally surfaced feedback in the iOS app, the investigation became a disaster. Inconsistent log formats, unsynchronized timestamps, and missing key context (such as session_id and trace_id) made correlating the data nearly impossible.

The root cause of this chaos was the lack of a unified log contract and a cross-platform implementation of it. Each client picked whatever was closest at hand in its own stack, creating data silos.

Option A: Platform-Native Toolchains + Server-Side Aggregation

This is the most straightforward approach: each client keeps using its familiar stack.

  • iOS: use OSLog or SwiftyBeaver to write logs to local files, then upload them to a server (or send them directly to Fluentd) via a sidecar process or an in-app scheduled task.
  • Web: wrap logging with a library such as winston or pino and ship logs to a dedicated logging API over HTTP.
  • Fluentd: deploy multiple source plugins and configure separate parsing and transformation rules (filter) for each platform's differently formatted log stream.

Pros:

  • Low learning curve; no engineer has to pick up a new stack.
  • Each platform can use the most mature logging tools in its own ecosystem.

Cons:

  • Strong schema consistency cannot be guaranteed. This is fatal. You can write a logging spec document, but without code-level enforcement, format "drift" is all but inevitable: one team adds a new field to the payload, another forgets to, and parsing breaks downstream in the data pipeline.
  • Very high maintenance cost. The Fluentd configuration balloons into a tangle of per-source grok parsing, field renaming, and type conversion. Every small upstream format change can force a Fluentd config change and redeploy.
  • Cross-platform context propagation is hard. Identifiers like trace_id, which must flow from the web to the native app and on to the backend, are very difficult to inject and report consistently across siloed logging systems.

Option B: A Unified Kotlin Multiplatform Logging SDK + A Single Fluentd Pipeline

The core idea is to promote the logging spec from a document to code, enforced by a shared KMP module.

  • Core: develop a shared Kotlin Multiplatform (KMP) module called UnifiedLogger that:
    1. Defines a strict log data structure (the LogEvent data class).
    2. Serializes logs (kotlinx.serialization).
    3. Encapsulates network transmission (Ktor HTTP client).
    4. Exposes a platform-agnostic logging API (log.info(...), log.error(...)).
  • Client integration:
    • iOS: the KMP module compiles to a native XCFramework, which Swift calls like any ordinary native library.
    • Web/JS: the KMP module compiles to a JavaScript module that React or Vue projects import directly.
  • Fluentd: a single in_http input plugin suffices. Because every client sends exactly the same format, the Fluentd configuration becomes dramatically simpler, focusing on enrichment, routing, and output instead of complex parsing and transformation.

Pros:

  • The schema is guaranteed by code. The LogEvent data class is the single source of truth; any log that does not conform fails at compile time, eliminating format drift entirely.
  • Low maintenance cost. The core logging logic lives in one place, and the Fluentd configuration stays clean and stable.
  • Cross-platform context comes naturally. session_id, user_id, and similar fields can be injected uniformly during UnifiedLogger's initialization or in its shared methods.
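As a sketch of that last point, a context map can be captured once at construction, after which every event carries it. This uses the UnifiedLogger API defined later in this post; the trace_id value and the idea of receiving it from the web (e.g. via a deep link) are illustrative assumptions:

```kotlin
// Hypothetical wiring: context captured once, stamped on every event.
// `traceIdFromWeb` would arrive e.g. via a deep link or a login response.
val traceIdFromWeb = "trace-abc-123"

val logger = UnifiedLogger(
    endpoint = "http://your-fluentd-collector:9880/kmp.log",
    serviceName = "ios-app",
    initialContext = mapOf(
        "session_id" to "sess-001",
        "trace_id" to traceIdFromWeb
    )
)

// Every event logged below automatically shares the same trace_id
// in its `context` field, so web and native logs can be joined on it.
logger.info(eventName = "login_continued", message = "Handed off from web")
```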

Cons:

  • Higher upfront investment. Designing and building the shared KMP module and its CI/CD pipeline takes real effort.
  • A new stack. Team members (frontend and iOS engineers in particular) need a working knowledge of KMP.

For a system that values long-term maintainability and data quality, Option B wins decisively. A one-time upfront investment buys years of data consistency and low maintenance cost, and in real projects data reliability always comes first. We chose Option B.

Core Implementation Overview

Below are the implementation details of the whole architecture, from the KMP module to the Swift integration to the Fluentd configuration. The diagram uses Mermaid syntax:

graph TD
    subgraph Client Apps
        A[iOS App / Swift] --> B{UnifiedLogger.xcframework};
        C[Web App / React] --> D{unified-logger.js};
    end

    subgraph KMP Shared Module
        B --> E[KMP Core Logic];
        D --> E;
    end
    
    E -- JSON over HTTP --> F[Fluentd Collector];

    subgraph Logging Pipeline
        F -- Forward Protocol --> G[Fluentd Aggregator];
        G --> H[Elasticsearch / OpenSearch];
        G --> I[Archive Storage S3];
    end

    subgraph Observability
        J[Turbopack-powered Dashboard] -- Query --> H;
    end

1. The Kotlin Multiplatform Logging Core Module

The project is laid out as follows:

unified-logger/
├── build.gradle.kts
└── src/
    ├── commonMain/
    │   └── kotlin/
    │       └── com/mycompany/logger/
    │           ├── UnifiedLogger.kt
    │           ├── LogEvent.kt
    │           └── HttpClient.kt
    ├── iosMain/
    │   └── kotlin/
    │       └── com/mycompany/logger/Platform.kt
    └── jsMain/
        └── kotlin/
            └── com/mycompany/logger/Platform.kt

build.gradle.kts (key parts):

plugins {
    kotlin("multiplatform")
    kotlin("native.cocoapods") // For iOS integration
    kotlin("plugin.serialization") version "1.9.21"
}

kotlin {
    // Target for modern iPhones and simulators
    iosX64()
    iosArm64()
    iosSimulatorArm64()
    
    // JS target for web applications
    // JS target for web applications
    js(IR) {
        browser()
        // Produce an importable JS library (rather than a standalone executable)
        binaries.library()
    }

    cocoapods {
        summary = "Unified Logger SDK"
        homepage = "..."
        version = "1.0.0"
        ios.deploymentTarget = "13.0"
        framework {
            baseName = "UnifiedLogger"
            isStatic = false
        }
    }

    sourceSets {
        val commonMain by getting {
            dependencies {
                implementation("io.ktor:ktor-client-core:2.3.6")
                implementation("io.ktor:ktor-client-content-negotiation:2.3.6")
                implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.6")
                implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
                implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0")
            }
        }
        val iosMain by creating {
            dependsOn(commonMain)
            dependencies {
                implementation("io.ktor:ktor-client-darwin:2.3.6")
            }
        }
        val jsMain by getting {
            dependencies {
                implementation("io.ktor:ktor-client-js:2.3.6")
            }
        }
        // Wire up actual targets to the intermediate source set
        val iosX64Main by getting { dependsOn(iosMain) }
        val iosArm64Main by getting { dependsOn(iosMain) }
        val iosSimulatorArm64Main by getting { dependsOn(iosMain) }
    }
}

commonMain/kotlin/LogEvent.kt:
This is the heart of the log schema, enforced by code.

package com.mycompany.logger

import kotlinx.serialization.Serializable
import kotlinx.datetime.Instant

@Serializable
enum class LogLevel {
    DEBUG, INFO, WARN, ERROR, FATAL
}

@Serializable
data class LogEvent(
    val timestamp: Instant,
    val level: LogLevel,
    val service: String, // e.g., "ios-app", "webapp-checkout"
    val eventName: String, // e.g., "user_login_success"
    val message: String,
    val context: Map<String, String>, // session_id, user_id, device_id etc.
    val payload: Map<String, String>  // event-specific data
)
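Serialized with kotlinx.serialization (Instant uses its default ISO-8601 string representation), a single LogEvent travels on the wire roughly as follows; the field values are illustrative:

```json
{
  "timestamp": "2023-11-20T10:30:00.123Z",
  "level": "INFO",
  "service": "ios-app",
  "eventName": "user_login_success",
  "message": "User successfully logged in via email.",
  "context": { "session_id": "XYZ-789", "user_id": "u-456" },
  "payload": { "method": "email", "duration_ms": "150" }
}
```

The client sends a JSON array of such objects per batch, which is exactly what the Fluentd collector receives.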

commonMain/kotlin/UnifiedLogger.kt:
The public API exposed to every platform.

package com.mycompany.logger

import io.ktor.client.*
import io.ktor.client.request.*
import io.ktor.http.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock

// A simple in-memory queue for batching and resilience
private const val BATCH_SIZE = 20
private const val FLUSH_INTERVAL_MS = 15000L

class UnifiedLogger(
    private val endpoint: String,
    private val serviceName: String,
    private val initialContext: Map<String, String>
) {
    private val httpClient: HttpClient = createHttpClient()
    private val logQueue = mutableListOf<LogEvent>()
    // `synchronized` is JVM-only, so a coroutine Mutex guards the queue
    // across all KMP targets (Native, JS)
    private val queueMutex = Mutex()
    private val coroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())

    init {
        // Start a background job to periodically flush the log queue
        coroutineScope.launch {
            while (isActive) {
                delay(FLUSH_INTERVAL_MS)
                flushQueue()
            }
        }
    }

    fun info(eventName: String, message: String, payload: Map<String, String> = emptyMap()) {
        log(LogLevel.INFO, eventName, message, payload)
    }

    fun error(eventName: String, message: String, payload: Map<String, String> = emptyMap()) {
        log(LogLevel.ERROR, eventName, message, payload)
    }

    private fun log(level: LogLevel, eventName: String, message: String, payload: Map<String, String>) {
        val event = LogEvent(
            timestamp = Clock.System.now(),
            level = level,
            service = serviceName,
            eventName = eventName,
            message = message,
            context = initialContext,
            payload = payload
        )

        coroutineScope.launch {
            val shouldFlush = queueMutex.withLock {
                logQueue.add(event)
                logQueue.size >= BATCH_SIZE
            }
            if (shouldFlush) flushQueue()
        }
    }

    private suspend fun flushQueue() {
        val batch = queueMutex.withLock {
            if (logQueue.isEmpty()) return
            val toSend = ArrayList(logQueue)
            logQueue.clear()
            toSend
        }

        try {
            // Fluentd's in_http accepts a JSON array of events in one request
            httpClient.post(endpoint) {
                contentType(ContentType.Application.Json)
                setBody(batch)
            }
        } catch (e: Exception) {
            // In production, failed batches should be requeued or persisted to disk
            println("Failed to send log batch: ${e.message}")
        }
    }
}
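UnifiedLogger calls createHttpClient(), which the project layout places in commonMain's HttpClient.kt. That file is not shown above; a minimal sketch of what it would contain is the expect declaration, with each platform supplying the engine via an actual implementation:

```kotlin
// commonMain/kotlin/com/mycompany/logger/HttpClient.kt (sketch)
package com.mycompany.logger

import io.ktor.client.*

// Each target provides an `actual` implementation that picks the
// platform-appropriate Ktor engine (Darwin on iOS, Js in the browser)
// and installs JSON content negotiation so setBody() can serialize
// List<LogEvent> automatically.
expect fun createHttpClient(): HttpClient
```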

iosMain/kotlin/Platform.kt:
The iOS-specific actual implementation.

package com.mycompany.logger

import io.ktor.client.*
import io.ktor.client.engine.darwin.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.serialization.json.Json

// `actual` factory function for creating the HTTP client on iOS,
// matching the `expect` declaration in commonMain
actual fun createHttpClient(): HttpClient {
    return HttpClient(Darwin) {
        // iOS specific configurations
        engine {
            configureRequest {
                setAllowsCellularAccess(true)
            }
        }
        // Shared JSON (de)serialization setup
        install(ContentNegotiation) {
            json(Json { ignoreUnknownKeys = true })
        }
    }
}

The jsMain implementation is analogous; only the engine changes to Js.
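For completeness, a minimal jsMain/kotlin/Platform.kt sketch mirroring the iOS version:

```kotlin
package com.mycompany.logger

import io.ktor.client.*
import io.ktor.client.engine.js.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.serialization.json.Json

// `actual` factory for the browser target, using Ktor's Js engine
// (backed by the browser's fetch API)
actual fun createHttpClient(): HttpClient {
    return HttpClient(Js) {
        install(ContentNegotiation) {
            json(Json { ignoreUnknownKeys = true })
        }
    }
}
```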

2. Swift/iOS Integration

Building the KMP module produces a UnifiedLogger.xcframework to drag into the Xcode project. (The exact Gradle task depends on your setup: older KMP templates used ./gradlew packForXcode, while projects using the CocoaPods plugin typically integrate via pod install or a dedicated XCFramework assembly task.)

Create a thin Swift facade to simplify calls.
Logger.swift:

import Foundation
import UIKit // for UIDevice
import UnifiedLogger // The name of the framework from KMP

class Logger {
    static let shared = Logger()

    private let logger: UnifiedLogger.UnifiedLogger

    private init() {
        // These values would typically come from your app's configuration
        let endpoint = "http://your-fluentd-collector:9880/kmp.log"
        let serviceName = "ios-app"
        
        // Gather platform-specific context once
        let initialContext = [
            "device_id": UIDevice.current.identifierForVendor?.uuidString ?? "unknown",
            "os_version": UIDevice.current.systemVersion,
            "app_version": Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "unknown"
        ]

        self.logger = UnifiedLogger.UnifiedLogger(
            endpoint: endpoint,
            serviceName: serviceName,
            initialContext: initialContext
        )
    }

    func info(eventName: String, message: String, payload: [String: String] = [:]) {
        logger.info(eventName: eventName, message: message, payload: payload)
    }

    func error(eventName: String, message: String, payload: [String: String] = [:]) {
        logger.error(eventName: eventName, message: message, payload: payload)
    }
}

// Usage in the app
// Logger.shared.info(
//     eventName: "screen_viewed", 
//     message: "User viewed the settings screen",
//     payload: ["screen_name": "Settings"]
// )

3. End-to-End Fluentd Configuration

We run two Fluentd tiers: an internet-facing Collector that receives client logs, and an internal Aggregator that processes and forwards them.

Collector fluent.conf:

# Collector Node Configuration
# Listens for logs from KMP clients

<source>
  @type http
  port 9880
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
  
  # The tag is constructed from the path.
  # A POST to /kmp.log will result in tag `kmp.log`
  add_http_headers true
</source>

# The body of each KMP client request is a JSON array of log events
# sent with Content-Type: application/json. in_http treats a top-level
# JSON array as a batch and emits each element as a separate event,
# so no extra splitting filter is required here.

# Add geo-location info based on client IP
# (requires fluent-plugin-geoip; placeholders follow its v2 syntax,
# where the bracketed name is the lookup key)
<filter kmp.log>
  @type geoip
  geoip_lookup_keys HTTP_X_FORWARDED_FOR
  <record>
    city ${city.names.en["HTTP_X_FORWARDED_FOR"]}
    country_code ${country.iso_code["HTTP_X_FORWARDED_FOR"]}
  </record>
</filter>

# Enrich records with collector metadata
<filter kmp.log>
    @type record_transformer
    <record>
        collector_node "#{Socket.gethostname}"
    </record>
</filter>

# Forward to the internal aggregator layer
# Use secure_forward for production environments
<match kmp.log>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  
  <server>
    name aggregator1
    host fluentd-aggregator-1.internal
    port 24224
    weight 60
  </server>
  <server>
    name aggregator2
    host fluentd-aggregator-2.internal
    port 24224
    weight 40
  </server>

  <buffer>
    @type file
    path /var/log/fluentd/buffer/forward-kmp
    flush_mode interval
    flush_interval 5s
    flush_thread_count 2
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_interval 300s
    retry_forever true
    chunk_limit_size 2M
    total_limit_size 1G
    # This is crucial: if the aggregator is down, logs are buffered to disk
    # instead of being dropped.
  </buffer>
</match>

Aggregator fluent.conf:

# Aggregator Node Configuration
# Receives logs from collectors and routes to destinations

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# Match logs tagged with kmp.log and send them to multiple outputs
<match kmp.log>
    @type copy
    <store>
        @type elasticsearch
        host "elasticsearch.internal"
        port 9200
        logstash_format true
        logstash_prefix "kmp-logs"
        logstash_dateformat "%Y.%m.%d"
        # Note: index_name is ignored when logstash_format is enabled,
        # and type_name is obsolete on Elasticsearch 7+ / OpenSearch.
        
        # Buffer configuration is critical for resilience
        <buffer>
            @type file
            path /var/log/fluentd/buffer/es-kmp
            flush_interval 10s
            chunk_limit_size 10M
            total_limit_size 10G
            retry_forever true
        </buffer>
    </store>
    <store>
        @type s3
        aws_key_id "YOUR_AWS_KEY"
        aws_sec_key "YOUR_AWS_SECRET"
        s3_bucket "my-company-log-archive"
        s3_region "us-east-1"
        path "logs/kmp/%Y/%m/%d/"
        
        <buffer>
            @type file
            path /var/log/fluentd/buffer/s3-kmp
            timekey 3600 # 1 hour per file
            timekey_wait 10m
            chunk_limit_size 256M
        </buffer>
    </store>
</match>

4. A Turbopack-Powered Dashboard

A fast, responsive log viewer for internal developers matters, and when building internal tools, iteration speed is everything. That is why we chose Turbopack: written in Rust, its incremental compilation engine delivers dev-server startup and hot updates orders of magnitude faster than Webpack, which dramatically improves the development experience.

next.config.js:

/** @type {import('next').NextConfig} */
const nextConfig = {
  // Turbopack is activated via the CLI flag: `next dev --turbo`
  // No specific config is needed here to enable it.
  reactStrictMode: true,
};

module.exports = nextConfig;

Start the dev server: pnpm next dev --turbo

app/page.tsx:
A simple component that renders structured logs fetched from the backend (mocked here).

// app/page.tsx
import React from 'react';

// This would come from an API call to a service that queries Elasticsearch
async function getLogData() {
  // Mock data representing what the API would return
  return [
    {
      _source: {
        timestamp: "2023-11-20T10:30:00.123Z",
        level: "INFO",
        service: "ios-app",
        eventName: "user_login_success",
        message: "User successfully logged in via email.",
        context: { device_id: "ABC-123", user_id: "u-456" },
        payload: { method: "email", duration_ms: "150" },
        city: "San Francisco",
      },
    },
    {
      _source: {
        timestamp: "2023-11-20T10:31:05.456Z",
        level: "ERROR",
        service: "webapp-checkout",
        eventName: "payment_failed",
        message: "Credit card authorization failed.",
        context: { session_id: "XYZ-789", user_id: "u-456" },
        payload: { processor: "stripe", error_code: "card_declined" },
        city: "New York",
      },
    },
  ];
}

export default async function LogViewerPage() {
  const logs = await getLogData();

  return (
    <div className="p-8 font-sans">
      <h1 className="text-2xl font-bold mb-4">Unified Log Viewer</h1>
      <div className="overflow-x-auto">
        <table className="min-w-full bg-white border border-gray-300">
          <thead className="bg-gray-100">
            <tr>
              <th className="py-2 px-4 border-b">Timestamp</th>
              <th className="py-2 px-4 border-b">Service</th>
              <th className="py-2 px-4 border-b">Event Name</th>
              <th className="py-2 px-4 border-b">Message</th>
              <th className="py-2 px-4 border-b">Context</th>
              <th className="py-2 px-4 border-b">Payload</th>
            </tr>
          </thead>
          <tbody>
            {logs.map((log, index) => (
              <tr key={index} className="hover:bg-gray-50">
                <td className="py-2 px-4 border-b text-sm text-gray-600">{log._source.timestamp}</td>
                <td className="py-2 px-4 border-b text-sm"><span className="bg-blue-100 text-blue-800 px-2 py-1 rounded-full">{log._source.service}</span></td>
                <td className="py-2 px-4 border-b text-sm font-mono">{log._source.eventName}</td>
                <td className="py-2 px-4 border-b text-sm">{log._source.message}</td>
                <td className="py-2 px-4 border-b text-sm font-mono whitespace-pre-wrap">{JSON.stringify(log._source.context, null, 2)}</td>
                <td className="py-2 px-4 border-b text-sm font-mono whitespace-pre-wrap">{JSON.stringify(log._source.payload, null, 2)}</td>
              </tr>
            ))}
          </tbody>
        </table>
      </div>
    </div>
  );
}

Extensibility and Limitations

The architecture's core strength is the "contract as code" principle built around the shared KMP module. It extends easily to Android, desktop JVM, or a Node.js backend: just add the corresponding target to the KMP project, with almost no change to the Fluentd configuration. The pipeline can also be enriched further, for example with fluent-plugin-prometheus to expose metrics on the log stream, or fluent-plugin-rewrite-tag-filter for finer-grained dynamic routing based on log content.

That said, the current design has several limitations:

  1. Offline handling: the current KMP module uses a simple in-memory queue. Logs produced while offline, or still unsent when the app is killed after a failed send, are lost. A production-grade SDK needs a persistent queue, for example backed by SQLite or the file system on mobile, with resend once the network recovers.
  2. Network overhead: for very high-volume apps, especially on mobile, frequent HTTP requests cost users data and battery. Possible optimizations include more aggressive batching, compressed transport (e.g. gzip), or even a more efficient protocol such as UDP-based transport or gRPC.
  3. KMP ecosystem and build complexity: KMP is powerful, but its ecosystem is still maturing. Managing multiplatform build configuration and dependencies, and handling subtle per-platform differences, raises the skill bar for the team. A common mistake is cramming too much logic into commonMain instead of using the expect/actual mechanism for platform-specific concerns.
  4. Turbopack maturity: its dev-mode speed advantage is significant, but it is still in beta. Production builds may need to stay on Next.js's default Webpack builder until Turbopack is fully stable; its role in this stack is "faster development", not "the only production build option".
