构建基于 CDC 与 Babel 动态转换的 SQL 到 Algolia 实时同步管道


项目的搜索功能改造需求拖了两个季度,终于轮到了。核心问题很典型:一个庞大的单体应用,后端是 PostgreSQL,内置的全文搜索功能又慢又原始,无法满足运营团队对搜索体验(如 typo tolerance、分面搜索)日益增长的需求。技术选型很快敲定,使用 Algolia 来提供高性能的搜索服务。

真正的挑战不在于接入 Algolia,而在于数据同步。运营团队提出的一个棘手需求是:他们需要频繁调整索引到 Algolia 的数据结构,比如合并 first_namelast_name 字段,根据商品状态动态添加 tag,或者格式化价格展示。他们不希望每次微调都依赖工程团队发版。传统的夜间批量同步延迟太高,而简单的数据库触发器方案又会和主业务逻辑耦合,维护性极差。

我们需要的是一个近乎实时的、解耦的、并且允许业务方动态配置转换逻辑的数据同步管道。

初步构想与架构权衡

第一反应是利用数据库的逻辑复制功能,也就是变更数据捕获 (Change Data Capture, CDC)。这能让我们以事件流的方式消费数据库的所有变更,完全与主业务应用解耦。Debezium 是这个领域的首选,它可以将 Postgres 的 WAL (Write-Ahead Log) 变更解析成结构化事件,推送到 Kafka 中。

消费端,一个独立的服务从 Kafka 拉取变更事件,然后写入 Algolia。到这里,架构的骨架基本清晰了。

graph TD
    A[PostgreSQL] -- WAL --> B[Debezium Connector];
    B -- CDC Events --> C[Apache Kafka Topic];
    C --> D[Sync Service];
    D -- Transformed Data --> E[Algolia API];
    subgraph Kafka Connect
        B
    end

但关键的“动态转换逻辑”还没解决。方案有几个:

  1. 硬编码: 在同步服务中写死转换逻辑。最简单,但完全不满足动态配置的需求。
  2. 配置化: 设计一套基于 JSON 的规则引擎。灵活度有限,对于复杂逻辑(如条件判断、字符串处理)会变得异常臃肿。
  3. 嵌入式脚本语言: 比如 Lua。学习曲线和生态是个问题。

最终,我们冒出了一个更大胆的想法:为什么不直接让运营团队用 JavaScript 来写转换逻辑?JS 的表达能力足够强,并且前端和运营的同事也相对熟悉。这里的坑在于,我们不能每次修改脚本都重启同步服务。服务必须能动态加载并执行这些脚本。更进一步,为了跟上前端技术栈,我们希望能支持 ESNext 语法。

这就是 Babel 登场的契机。我们可以在 Node.js 同步服务中,实时加载用户提供的 ESNext 脚本字符串,使用 Babel 的 API 将其动态转换为 ES5 代码,然后在一个安全的沙箱环境中执行。

这个方案听起来有些疯狂,但在工程上是完全可行的。它将数据转换的灵活性提升到了极致,代价是需要处理好安全性和性能。

搭建数据源头:Postgres, Debezium 与 Kafka

首先,我们需要一个完整的本地环境来模拟生产。使用 Docker Compose 是最直接的方式。

docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  postgres:
    image: debezium/postgres:14
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=app_db
    volumes:
      - ./pg_data:/var/lib/postgresql/data

  connect:
    image: debezium/connect:2.1
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

启动后,需要为 PostgreSQL 准备一张表,并开启逻辑复制。

-- Connect to the postgres container and run psql
-- \c app_db user

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price_cents INT NOT NULL,
    stock_quantity INT NOT NULL,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Debezium needs this publication
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

接下来,通过 Kafka Connect 的 REST API 注册 Debezium 的 PostgreSQL 连接器。

register-connector.sh:

#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "app_db",
    "database.server.name": "pg-server",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput",
    "publication.name": "dbz_publication",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "slot.name": "debezium_slot_products",
    "snapshot.mode": "initial"
  }
}'

执行这个脚本后,Debezium 会先对 products 表进行一次快照,然后开始监听 WAL 的实时变更。所有变更都会被发送到名为 pg-server.public.products 的 Kafka topic 中。

核心:Babel 驱动的动态转换服务

这是整个系统的核心。我们将用 Node.js 和 kafkajs 构建消费者服务。

项目结构:

sync-service/
├── src/
│   ├── consumer.js         # Kafka consumer logic
│   ├── transformation.js   # Babel + VM2 transformation engine
│   └── algoliaWriter.js    # Algolia client wrapper
├── scripts/
│   └── product_transformer.js # User-defined transformation script (example)
├── package.json
└── ...

首先安装依赖:

npm install kafkajs algoliasearch @babel/core @babel/preset-env vm2 pino

用户定义的转换脚本

这是我们将暴露给运营团队的配置。他们可以编写这样的 ESNext 脚本,并将其存储在配置中心或者一个专用的数据库表中。

scripts/product_transformer.js:

// This script is executed in a sandboxed environment.
// It receives a `payload` object from Debezium and must return an object.
// The returned object has two keys:
//   - objectID: The unique ID for the Algolia record.
//   - record: The data to be indexed, or `null` to delete the record.

function transform(payload) {
  // Debezium payload structure: { before: {...}, after: {...}, op: 'c'|'u'|'d'|'r' }
  const { op, after, before } = payload;
  
  if (op === 'd') {
    // Handle deletion
    return {
      objectID: before.id,
      record: null, // Returning null signifies deletion in Algolia
    };
  }

  // For create ('c'), update ('u'), or read from snapshot ('r')
  const source = after;
  
  // A common mistake is not checking for active records.
  // We only want to index active products.
  if (!source.is_active) {
    // If an existing product becomes inactive, we should delete it from the index.
    return {
      objectID: source.id,
      record: null,
    };
  }

  // The core transformation logic
  const record = {
    name: source.name,
    description: source.description || '', // Ensure no null values
    // A nice feature: combining fields for better searchability
    search_meta: `${source.name} ${source.description || ''}`.trim(),
    
    // Formatting price from cents to a float
    price: parseFloat((source.price_cents / 100).toFixed(2)),
    
    // Generating dynamic tags based on stock level
    tags: [],
    
    stock: source.stock_quantity,
    
    // ISO 8601 format for Algolia's timestamp filters
    last_updated: new Date(source.updated_at).toISOString(),
  };

  if (source.stock_quantity === 0) {
    record.tags.push('out_of_stock');
  } else if (source.stock_quantity < 10) {
    record.tags.push('low_stock');
  }

  return {
    objectID: source.id,
    record: record,
  };
}

// The sandbox requires us to export the main function.
module.exports = { transform };

这个脚本的健壮性至关重要。它必须处理所有 op 类型,并且对 null 值有防御性编程。

动态转换引擎 transformation.js

这是 Babel 和 vm2 结合的地方。vm2 提供了一个安全的沙箱,防止用户脚本访问文件系统或执行恶意代码。

// src/transformation.js
const { NodeVM } = require('vm2');
const Babel = require('@babel/core');
const pino = require('pino');

const logger = pino({ level: 'info' });

class TransformationEngine {
  constructor() {
    // In a real project, the script content would be fetched from a database
    // or a configuration service. Here we simulate it.
    // We also cache the transpiled script for performance.
    this.scriptCache = new Map();
  }

  // Simulates fetching a script for a given table/topic
  async getScript(topic) {
    // For this example, we always return the same script.
    // In production, you'd have logic to map topic to a script.
    const fs = require('fs');
    const path = require('path');
    return fs.readFileSync(path.join(__dirname, '../scripts/product_transformer.js'), 'utf-8');
  }

  transpile(scriptContent) {
    try {
      const result = Babel.transformSync(scriptContent, {
        presets: ['@babel/preset-env'],
        // Avoid creating a big `require` polyfill for our simple use case.
        sourceType: 'script' 
      });
      return result.code;
    } catch (error) {
      logger.error({ err: error }, 'Babel transpilation failed.');
      throw new Error(`Babel transpilation failed: ${error.message}`);
    }
  }

  async run(topic, payload) {
    let compiledScript = this.scriptCache.get(topic);

    if (!compiledScript) {
      try {
        const scriptContent = await this.getScript(topic);
        compiledScript = this.transpile(scriptContent);
        this.scriptCache.set(topic, compiledScript);
        logger.info(`Successfully compiled and cached script for topic: ${topic}`);
      } catch (err) {
        // If compilation fails, we can't process messages for this topic.
        logger.error({ err, topic }, 'Failed to load or transpile script.');
        // We throw to let the consumer handle the error (e.g., move to DLQ).
        throw err;
      }
    }

    const vm = new NodeVM({
      console: 'inherit',
      sandbox: {},
      require: {
        external: false, // Disallow any 'require' in the script
      },
      timeout: 1000, // Protect against infinite loops
    });

    try {
      // The transpiled code will be in a simple, self-contained script format.
      // We execute it and get the exports.
      const scriptModule = vm.run(compiledScript);
      
      // A common pitfall is the script not exporting the transform function correctly.
      if (typeof scriptModule.transform !== 'function') {
        throw new Error('Script must export a "transform" function.');
      }
      
      return scriptModule.transform(payload);
    } catch (error) {
      logger.error({ err: error, payload }, 'Sandbox execution failed.');
      throw new Error(`Sandbox execution failed: ${error.message}`);
    }
  }
}

module.exports = new TransformationEngine();

Algolia 写入模块 algoliaWriter.js

这个模块负责与 Algolia API 通信,并处理批量写入以提高效率。

// src/algoliaWriter.js
const algoliasearch = require('algoliasearch');
const pino = require('pino');

const logger = pino({ level: 'info' });

class AlgoliaWriter {
  constructor(appId, apiKey, indexName) {
    if (!appId || !apiKey || !indexName) {
      throw new Error('Algolia credentials and index name are required.');
    }
    const client = algoliasearch(appId, apiKey);
    this.index = client.initIndex(indexName);
    logger.info(`Algolia writer initialized for index: ${indexName}`);
  }

  async upsert(record) {
    if (!record || !record.objectID) {
      logger.warn({ record }, 'Invalid record for upsert, missing objectID.');
      return;
    }
    try {
      await this.index.saveObject(record, { autoGenerateObjectIDIfNotExist: false });
    } catch (error) {
      logger.error({ err: error, objectID: record.objectID }, 'Failed to save object to Algolia.');
      throw error; // Propagate error for retry logic
    }
  }

  async remove(objectID) {
    if (!objectID) {
      logger.warn('Invalid objectID for removal.');
      return;
    }
    try {
      await this.index.deleteObject(objectID);
    } catch (error) {
      logger.error({ err: error, objectID }, 'Failed to delete object from Algolia.');
      throw error;
    }
  }
  
  // In a high-throughput system, you'd implement batching here.
  // For simplicity, we are doing single operations.
  async process(transformedData) {
    const { objectID, record } = transformedData;
    if (record) {
      // The record has data, so we create or update it.
      await this.upsert({ ...record, objectID });
    } else {
      // The record is null, signifying a deletion.
      await this.remove(objectID);
    }
  }
}

// Remember to use environment variables for credentials in production.
module.exports = new AlgoliaWriter(
  process.env.ALGOLIA_APP_ID,
  process.env.ALGOLIA_API_KEY,
  'products_index'
);

Kafka 消费者 consumer.js

最后,将所有部分串联起来。

// src/consumer.js
const { Kafka } = require('kafkajs');
const pino =require('pino');
const transformationEngine = require('./transformation');
const algoliaWriter = require('./algoliaWriter');

const logger = pino({ level: 'info' });

const kafka = new Kafka({
  clientId: 'algolia-sync-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'algolia-sync-group' });
const topic = 'pg-server.public.products';

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });

  logger.info(`Consumer subscribed to topic: ${topic}`);

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      let payload;
      try {
        // Debezium message payload is in the 'value' field.
        const messageValue = message.value.toString();
        // A common error is assuming the message is always valid JSON.
        if (!messageValue) {
            logger.warn({ key: message.key.toString() }, 'Received message with empty value, skipping.');
            return;
        }
        payload = JSON.parse(messageValue).payload;
      } catch (err) {
        logger.error({ err, value: message.value.toString() }, 'Failed to parse Kafka message.');
        // Here you would move the message to a Dead Letter Queue (DLQ).
        return;
      }

      try {
        const transformedData = await transformationEngine.run(topic, payload);
        
        // The transformation script can decide to skip a message by returning nothing.
        if (!transformedData || !transformedData.objectID) {
            logger.info({ payload }, 'Transformation script decided to skip this message.');
            return;
        }

        await algoliaWriter.process(transformedData);
        
        logger.info(`Successfully processed and synced objectID: ${transformedData.objectID}`);

      } catch (err) {
        logger.error({ err, payload }, 'Failed to process message.');
        // Again, this is where robust DLQ and retry logic is critical.
        // For now, we just log and continue. In production, this could lead to data loss.
      }
    },
  });
};

run().catch(e => logger.error({ err: e }, 'Consumer encountered a fatal error'));

// Graceful shutdown
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      logger.fatal(e, `process.on ${type}`);
      await consumer.disconnect();
      process.exit(1);
    } catch (_) {
      process.exit(1);
    }
  });
});

signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await consumer.disconnect();
    } finally {
      process.kill(process.pid, type);
    }
  });
});

运行服务前,确保设置了 Algolia 的环境变量。

局限性与未来迭代路径

这个架构虽然强大,但并非没有缺点。在真实项目中,还有很多坑要填。

  1. 沙箱安全性与资源控制: vm2 提供了不错的隔离,但面对恶意的转换脚本(例如,通过 while(true) 消耗 CPU),还需要更强的保护。timeout 是一个基本防御,但更精细的 CPU 和内存限制可能需要探索其他沙箱技术,或者在独立的、资源受限的 worker 进程中运行脚本。

  2. 脚本管理与版本控制: 用户脚本不能简单地放在文件系统里。需要一个带 UI 的管理后台,允许运营团队编辑、测试和发布脚本。所有脚本都应该有版本历史,以便快速回滚。

  3. 错误处理与可观测性: 当前的错误处理只是简单打印日志。一个生产级的系统必须有完善的 Dead Letter Queue (DLQ) 机制。任何解析、转换或写入失败的消息都应该被移到 DLQ,以便后续分析和重试。此外,需要详细的监控指标,比如消息处理延迟、转换脚本执行时间、Algolia API 错误率等。

  4. 数据库 Schema 变更: 这是一个大难题。如果 products 表添加或删除了一个字段,现有的转换脚本可能会失败。一个健壮的系统需要能从 Debezium 的消息中解析出 Schema 变更事件,并据此通知脚本维护者,甚至尝试自动适配或禁用旧脚本。

  5. 性能: 对于每个消息都执行 Babel 转译的开销不容忽视。虽然有缓存,但在服务启动或脚本更新时仍有性能冲击。对于极高吞吐量的场景,可能需要一个预编译步骤:当用户保存脚本时,后端服务就将其编译并缓存,消费者直接加载已编译的代码。

尽管存在这些挑战,这个方案的核心思想——利用 Babel 动态执行可信的 ESNext 脚本——为业务逻辑与底层数据管道的解耦提供了一条极具吸引力的路径。它将灵活性交给了最需要它的人,同时保持了后端架构的整洁和稳定。


  目录