The search overhaul had been sitting in the backlog for two quarters; it was finally our turn. The core problem was a classic one: a large monolith backed by PostgreSQL, whose built-in full-text search was slow and primitive and could not keep up with the operations team's growing expectations for the search experience (typo tolerance, faceted search, and so on). The technology choice was settled quickly: use Algolia for high-performance search.
The real challenge was not integrating Algolia but synchronizing the data. One thorny requirement from the operations team: they need to adjust the shape of the data indexed into Algolia frequently, for example merging the first_name and last_name fields, adding tags dynamically based on product status, or formatting prices for display. They do not want every small tweak to depend on an engineering release. A traditional nightly batch sync has far too much latency, while a naive database-trigger approach couples with the core business logic and is a maintenance nightmare.
What we need is a near-real-time, decoupled data synchronization pipeline that lets the business side configure the transformation logic dynamically.
Initial Idea and Architectural Trade-offs
The first instinct was to use the database's logical replication, that is, Change Data Capture (CDC). It lets us consume every database change as an event stream, fully decoupled from the main application. Debezium is the go-to tool in this space: it decodes PostgreSQL WAL (Write-Ahead Log) changes into structured events and pushes them into Kafka.
On the consuming side, a standalone service pulls the change events from Kafka and writes them to Algolia. At this point the skeleton of the architecture is clear.
graph TD
  A[PostgreSQL] -- WAL --> B[Debezium Connector];
  B -- CDC Events --> C[Apache Kafka Topic];
  C --> D[Sync Service];
  D -- Transformed Data --> E[Algolia API];
  subgraph Kafka Connect
    B
  end
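For orientation, this is roughly what a single Debezium change event will look like for the products table we create below, with the JSON converter and schemas disabled (the values are illustrative and the source block is abbreviated). The before/after/op envelope is exactly what our transformation scripts will receive later:
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Mechanical Keyboard",
    "price_cents": 49900,
    "stock_quantity": 8,
    "is_active": true,
    "updated_at": "2024-01-01T00:00:00Z"
  },
  "source": { "connector": "postgresql", "table": "products" },
  "op": "c",
  "ts_ms": 1704067200000
}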
But the key piece, the "dynamic transformation logic", was still unsolved. There were a few options:
- Hard-coding: bake the transformation logic into the sync service. Simplest, but it completely fails the dynamic-configuration requirement.
- Configuration rules: design a JSON-based rule engine. Limited flexibility; anything complex (conditionals, string manipulation) becomes hopelessly bloated.
- An embedded scripting language: Lua, for example. The learning curve and the ecosystem are a concern.
In the end we landed on a bolder idea: why not let the operations team write the transformation logic in JavaScript directly? JS is expressive enough, and both frontend and operations colleagues are reasonably familiar with it. The catch is that we cannot restart the sync service every time a script changes; the service has to load and execute these scripts dynamically. Going one step further, to stay aligned with the frontend stack, we want to support ESNext syntax.
This is where Babel comes in. Inside the Node.js sync service we can load a user-provided ESNext script as a string, use Babel's API to transpile it to ES5 on the fly, and execute the result in a secure sandbox.
It sounds a little crazy, but it is entirely feasible from an engineering standpoint. It pushes the flexibility of the data transformation to the extreme, at the cost of having to get security and performance right.
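A minimal sketch of that idea, assuming the user script is already available as a string (the real engine, with caching and sandboxing, follows below):
// sketch.js - illustrative only; error handling and hardening are omitted.
const Babel = require('@babel/core');
const { NodeVM } = require('vm2');

// An ESNext snippet, as the operations team might write it.
const userScript = `
  const transform = ({ after }) => ({ objectID: after.id, record: { name: after.name } });
  module.exports = { transform };
`;

// 1. Transpile the ESNext source to ES5 with Babel's programmatic API.
const { code } = Babel.transformSync(userScript, {
  presets: ['@babel/preset-env'],
  sourceType: 'script',
});

// 2. Execute the transpiled code in a sandbox and pick up its exports.
const { transform } = new NodeVM({ require: { external: false } }).run(code);

// 3. Apply it to a fake Debezium-style payload.
console.log(transform({ after: { id: 42, name: 'demo' } }));
// -> { objectID: 42, record: { name: 'demo' } }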
Setting Up the Data Source: Postgres, Debezium, and Kafka
First we need a complete local environment that mimics production. Docker Compose is the most direct way to get one.
docker-compose.yml:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  postgres:
    image: debezium/postgres:14
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=app_db
    volumes:
      - ./pg_data:/var/lib/postgresql/data
  connect:
    image: debezium/connect:2.1
    container_name: connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
Once everything is up, prepare a table in PostgreSQL and enable logical replication for it.
-- Connect to the postgres container and run psql
-- \c app_db user
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price_cents INT NOT NULL,
stock_quantity INT NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Debezium needs this publication
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
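Before registering the connector it is worth confirming the logical-replication prerequisites. The debezium/postgres image ships with wal_level=logical preconfigured; a few sanity queries:
-- Should return 'logical' (vanilla postgres images default to 'replica').
SHOW wal_level;

-- Once the connector is registered, its replication slot shows up here.
SELECT slot_name, plugin, active FROM pg_replication_slots;

-- The publication created above.
SELECT pubname, puballtables FROM pg_publication;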
Next, register the Debezium PostgreSQL connector through the Kafka Connect REST API. Note that Debezium 2.x uses topic.prefix to namespace topics; the older database.server.name option was removed.
register-connector.sh:
#!/bin/bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "app_db",
"database.server.name": "pg-server",
"table.include.list": "public.products",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"slot.name": "debezium_slot_products",
"snapshot.mode": "initial"
}
}'
After running this script, Debezium first takes a snapshot of the products table and then starts streaming live changes from the WAL. Every change is published to the Kafka topic named pg-server.public.products.
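Two quick ways to confirm the pipeline is alive, assuming the container names from the Compose file above: ask Kafka Connect for the connector status, and tail the topic with the console consumer bundled in the cp-kafka image.
# Connector and task should both report state RUNNING.
curl -s localhost:8083/connectors/inventory-connector/status

# Watch raw change events while inserting/updating rows in products.
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic pg-server.public.products \
  --from-beginning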
The Core: A Babel-Driven Dynamic Transformation Service
This is the heart of the system. We will build the consumer service with Node.js and kafkajs.
Project structure:
sync-service/
├── src/
│ ├── consumer.js # Kafka consumer logic
│ ├── transformation.js # Babel + VM2 transformation engine
│ └── algoliaWriter.js # Algolia client wrapper
├── scripts/
│ └── product_transformer.js # User-defined transformation script (example)
├── package.json
└── ...
Install the dependencies first:
npm install kafkajs algoliasearch @babel/core @babel/preset-env vm2 pino
The User-Defined Transformation Script
This is the surface we expose to the operations team. They write ESNext scripts like the one below and store them in a configuration service or a dedicated database table.
scripts/product_transformer.js:
// This script is executed in a sandboxed environment.
// It receives a `payload` object from Debezium and must return an object.
// The returned object has two keys:
// - objectID: The unique ID for the Algolia record.
// - record: The data to be indexed, or `null` to delete the record.
function transform(payload) {
// Debezium payload structure: { before: {...}, after: {...}, op: 'c'|'u'|'d'|'r' }
const { op, after, before } = payload;
if (op === 'd') {
// Handle deletion
return {
objectID: before.id,
record: null, // Returning null signifies deletion in Algolia
};
}
// For create ('c'), update ('u'), or read from snapshot ('r')
const source = after;
// A common mistake is not checking for active records.
// We only want to index active products.
if (!source.is_active) {
// If an existing product becomes inactive, we should delete it from the index.
return {
objectID: source.id,
record: null,
};
}
// The core transformation logic
const record = {
name: source.name,
description: source.description || '', // Ensure no null values
// A nice feature: combining fields for better searchability
search_meta: `${source.name} ${source.description || ''}`.trim(),
// Formatting price from cents to a float
price: parseFloat((source.price_cents / 100).toFixed(2)),
// Generating dynamic tags based on stock level
tags: [],
stock: source.stock_quantity,
// ISO 8601 format for Algolia's timestamp filters
last_updated: new Date(source.updated_at).toISOString(),
};
if (source.stock_quantity === 0) {
record.tags.push('out_of_stock');
} else if (source.stock_quantity < 10) {
record.tags.push('low_stock');
}
return {
objectID: source.id,
record: record,
};
}
// The sandbox requires us to export the main function.
module.exports = { transform };
The robustness of this script is critical. It has to handle every op type and code defensively against null values.
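One habit worth building in early: let script authors dry-run their transform against canned Debezium payloads before publishing. A hypothetical helper (not part of the pipeline itself) can be as small as this:
// scripts/try_transformer.js - run with: node scripts/try_transformer.js
const { transform } = require('./product_transformer');

// A made-up Debezium 'update' payload for a low-stock product.
const fakeUpdate = {
  op: 'u',
  before: null,
  after: {
    id: 7,
    name: 'USB-C Hub',
    description: null,
    price_cents: 2599,
    stock_quantity: 3,
    is_active: true,
    updated_at: '2024-06-01T10:00:00Z',
  },
};

console.log(JSON.stringify(transform(fakeUpdate), null, 2));
// Expect price 25.99, a 'low_stock' tag, and description coerced to ''.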
The Dynamic Transformation Engine: transformation.js
This is where Babel and vm2 come together. vm2 provides a secure sandbox that keeps user scripts from accessing the file system or executing malicious code.
// src/transformation.js
const { NodeVM } = require('vm2');
const Babel = require('@babel/core');
const pino = require('pino');
const logger = pino({ level: 'info' });
class TransformationEngine {
constructor() {
// In a real project, the script content would be fetched from a database
// or a configuration service. Here we simulate it.
// We also cache the transpiled script for performance.
this.scriptCache = new Map();
}
// Simulates fetching a script for a given table/topic
async getScript(topic) {
// For this example, we always return the same script.
// In production, you'd have logic to map topic to a script.
const fs = require('fs');
const path = require('path');
return fs.readFileSync(path.join(__dirname, '../scripts/product_transformer.js'), 'utf-8');
}
transpile(scriptContent) {
try {
const result = Babel.transformSync(scriptContent, {
presets: ['@babel/preset-env'],
// Avoid creating a big `require` polyfill for our simple use case.
sourceType: 'script'
});
return result.code;
} catch (error) {
logger.error({ err: error }, 'Babel transpilation failed.');
throw new Error(`Babel transpilation failed: ${error.message}`);
}
}
async run(topic, payload) {
let compiledScript = this.scriptCache.get(topic);
if (!compiledScript) {
try {
const scriptContent = await this.getScript(topic);
compiledScript = this.transpile(scriptContent);
this.scriptCache.set(topic, compiledScript);
logger.info(`Successfully compiled and cached script for topic: ${topic}`);
} catch (err) {
// If compilation fails, we can't process messages for this topic.
logger.error({ err, topic }, 'Failed to load or transpile script.');
// We throw to let the consumer handle the error (e.g., move to DLQ).
throw err;
}
}
const vm = new NodeVM({
console: 'inherit',
sandbox: {},
require: {
external: false, // Disallow any 'require' in the script
},
timeout: 1000, // Protect against infinite loops
});
try {
// The transpiled code will be in a simple, self-contained script format.
// We execute it and get the exports.
const scriptModule = vm.run(compiledScript);
// A common pitfall is the script not exporting the transform function correctly.
if (typeof scriptModule.transform !== 'function') {
throw new Error('Script must export a "transform" function.');
}
return scriptModule.transform(payload);
} catch (error) {
logger.error({ err: error, payload }, 'Sandbox execution failed.');
throw new Error(`Sandbox execution failed: ${error.message}`);
}
}
}
module.exports = new TransformationEngine();
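One thing the engine above glosses over: the cache never expires, so an edited script would only be picked up after a restart, which defeats the original goal. A minimal sketch of how to close that gap, assuming the script store can report a version or hash alongside the source (the class and method names here are hypothetical):
// Sketch only: hot-reloading the cached transpiled script.
class ReloadableCache {
  constructor() {
    this.entries = new Map(); // topic -> { version, code }
  }

  get(topic, version) {
    const entry = this.entries.get(topic);
    // Miss if nothing is cached or the stored script has changed version.
    return entry && entry.version === version ? entry.code : null;
  }

  set(topic, version, code) {
    this.entries.set(topic, { version, code });
  }

  // Called by an admin endpoint or a pub/sub notification when a script is republished.
  invalidate(topic) {
    this.entries.delete(topic);
  }
}

module.exports = { ReloadableCache };
In run(), the engine would pass the current script version into get() and fall back to fetching and re-transpiling on a miss; a small admin endpoint or a notification from the script store can call invalidate() whenever a script is republished.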
The Algolia Writer: algoliaWriter.js
This module is responsible for communicating with the Algolia API. To keep the example readable it performs single-object operations; batching for higher throughput is sketched after the code.
// src/algoliaWriter.js
const algoliasearch = require('algoliasearch');
const pino = require('pino');
const logger = pino({ level: 'info' });
class AlgoliaWriter {
constructor(appId, apiKey, indexName) {
if (!appId || !apiKey || !indexName) {
throw new Error('Algolia credentials and index name are required.');
}
const client = algoliasearch(appId, apiKey);
this.index = client.initIndex(indexName);
logger.info(`Algolia writer initialized for index: ${indexName}`);
}
async upsert(record) {
if (!record || !record.objectID) {
logger.warn({ record }, 'Invalid record for upsert, missing objectID.');
return;
}
try {
await this.index.saveObject(record, { autoGenerateObjectIDIfNotExist: false });
} catch (error) {
logger.error({ err: error, objectID: record.objectID }, 'Failed to save object to Algolia.');
throw error; // Propagate error for retry logic
}
}
async remove(objectID) {
if (!objectID) {
logger.warn('Invalid objectID for removal.');
return;
}
try {
await this.index.deleteObject(objectID);
} catch (error) {
logger.error({ err: error, objectID }, 'Failed to delete object from Algolia.');
throw error;
}
}
// In a high-throughput system, you'd implement batching here.
// For simplicity, we are doing single operations.
async process(transformedData) {
const { objectID, record } = transformedData;
if (record) {
// The record has data, so we create or update it.
await this.upsert({ ...record, objectID });
} else {
// The record is null, signifying a deletion.
await this.remove(objectID);
}
}
}
// Remember to use environment variables for credentials in production.
module.exports = new AlgoliaWriter(
process.env.ALGOLIA_APP_ID,
process.env.ALGOLIA_API_KEY,
'products_index'
);
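As the comment in process() hints, one network round trip per message becomes the bottleneck at higher volumes. A sketch of what batching could look like with the algoliasearch v4 batch methods saveObjects and deleteObjects, using an arbitrary size-based flush threshold:
// Sketch: accumulate transformed results and flush them to Algolia in batches.
class BatchingAlgoliaWriter {
  constructor(index, maxBatchSize = 500) {
    this.index = index;          // an initialized algoliasearch index
    this.maxBatchSize = maxBatchSize;
    this.upserts = [];           // records to save
    this.deletions = [];         // objectIDs to delete
  }

  add({ objectID, record }) {
    if (record) {
      this.upserts.push({ ...record, objectID });
    } else {
      this.deletions.push(objectID);
    }
    if (this.upserts.length + this.deletions.length >= this.maxBatchSize) {
      return this.flush();
    }
    return Promise.resolve();
  }

  async flush() {
    const upserts = this.upserts.splice(0);
    const deletions = this.deletions.splice(0);
    if (upserts.length) await this.index.saveObjects(upserts);
    if (deletions.length) await this.index.deleteObjects(deletions);
  }
}

module.exports = { BatchingAlgoliaWriter };
On the consumer side this pairs naturally with kafkajs's eachBatch handler, flushing once per fetched batch before offsets are committed.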
The Kafka Consumer: consumer.js
Finally, wire all the pieces together.
// src/consumer.js
const { Kafka } = require('kafkajs');
const pino = require('pino');
const transformationEngine = require('./transformation');
const algoliaWriter = require('./algoliaWriter');
const logger = pino({ level: 'info' });
const kafka = new Kafka({
clientId: 'algolia-sync-service',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'algolia-sync-group' });
const topic = 'pg-server.public.products';
const run = async () => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
logger.info(`Consumer subscribed to topic: ${topic}`);
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
let payload;
try {
// Debezium deletions are followed by a tombstone message whose value is null
// (used for log compaction). The delete event itself arrives separately, so
// tombstones can be skipped safely.
if (!message.value) {
  logger.warn({ key: message.key ? message.key.toString() : null }, 'Received tombstone message, skipping.');
  return;
}
const messageValue = message.value.toString();
// With value.converter.schemas.enable=false the Debezium envelope is the
// top-level JSON object; with schemas enabled it is nested under 'payload'.
const parsed = JSON.parse(messageValue);
payload = parsed.payload || parsed;
} catch (err) {
logger.error({ err, value: message.value.toString() }, 'Failed to parse Kafka message.');
// Here you would move the message to a Dead Letter Queue (DLQ).
return;
}
try {
const transformedData = await transformationEngine.run(topic, payload);
// The transformation script can decide to skip a message by returning nothing.
if (!transformedData || !transformedData.objectID) {
logger.info({ payload }, 'Transformation script decided to skip this message.');
return;
}
await algoliaWriter.process(transformedData);
logger.info(`Successfully processed and synced objectID: ${transformedData.objectID}`);
} catch (err) {
logger.error({ err, payload }, 'Failed to process message.');
// Again, this is where robust DLQ and retry logic is critical.
// For now, we just log and continue. In production, this could lead to data loss.
}
},
});
};
run().catch(e => logger.error({ err: e }, 'Consumer encountered a fatal error'));
// Graceful shutdown
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];
errorTypes.forEach(type => {
process.on(type, async e => {
try {
logger.fatal(e, `process.on ${type}`);
await consumer.disconnect();
process.exit(1);
} catch (_) {
process.exit(1);
}
});
});
signalTraps.forEach(type => {
process.once(type, async () => {
try {
await consumer.disconnect();
} finally {
process.kill(process.pid, type);
}
});
});
Before running the service, make sure the Algolia environment variables are set.
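For example (placeholder credentials; the index name is hard-coded as products_index in algoliaWriter.js above):
export ALGOLIA_APP_ID=YourApplicationID
export ALGOLIA_API_KEY=YourAdminAPIKey   # must have write access to the index
node src/consumer.js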
Limitations and Future Iterations
Powerful as this architecture is, it is not without drawbacks. In a real project there are still plenty of gaps to fill.
- Sandbox security and resource control: vm2 provides decent isolation, but a hostile transformation script (say, burning CPU with while(true)) calls for stronger protection. The timeout is a basic defense; finer-grained CPU and memory limits will likely require exploring other sandbox technologies or running scripts in separate, resource-constrained worker processes.
- Script management and versioning: user scripts cannot simply live on the file system. We need an admin UI that lets the operations team edit, test, and publish scripts, with version history for every script so rollbacks are quick.
- Error handling and observability: the current error handling just logs. A production-grade system needs a proper Dead Letter Queue (DLQ): any message that fails parsing, transformation, or indexing should be moved to the DLQ for later analysis and retry (see the sketch after this list). We also need detailed metrics, such as end-to-end processing latency, script execution time, and Algolia API error rates.
- Database schema changes: this is the hard one. If a column is added to or removed from the products table, existing transformation scripts may break. A robust system should parse schema-change events out of the Debezium stream, notify script maintainers, and possibly auto-adapt or disable stale scripts.
- Performance: running Babel for every message is not free. Caching helps, but there is still a hit at startup and whenever a script is updated. For very high throughput, a pre-compilation step makes sense: compile and cache the script when the user saves it, so the consumer loads already-compiled code.
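A minimal sketch of the DLQ idea mentioned above, assuming a dedicated topic (algolia-sync.dlq is an arbitrary name) and the kafkajs producer:
// Sketch: publish messages that failed parsing/transformation/indexing to a DLQ topic.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'algolia-sync-dlq', brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function sendToDlq(originalMessage, error) {
  // A real implementation would connect once at startup instead of per call.
  await producer.connect();
  await producer.send({
    topic: 'algolia-sync.dlq', // hypothetical DLQ topic name
    messages: [
      {
        key: originalMessage.key,
        value: originalMessage.value,
        headers: {
          'x-error-message': String(error.message),
          'x-failed-at': new Date().toISOString(),
        },
      },
    ],
  });
}

module.exports = { sendToDlq };
The consumer's catch blocks would call sendToDlq(message, err) instead of only logging, and a separate tool can replay the DLQ once the offending script or record is fixed.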
Despite these challenges, the core idea of this approach, dynamically executing trusted ESNext scripts via Babel, offers a very attractive path for decoupling business logic from the underlying data pipeline. It hands flexibility to the people who need it most while keeping the backend architecture clean and stable.