简体中文
Appearance
简体中文
Appearance
本文通过一个最小可运行示例,演示 Kafka -> Source -> Pipeline -> Sink Table 的完整链路。
dlsql 命令行工具先拉取 Kafka 镜像:
docker pull confluentinc/cp-kafka:7.7.1使用单节点 KRaft 模式启动一个本地 Kafka 容器:
docker run -d --name dl-kafka \
-p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093' \
-e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://127.0.0.1:9092' \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP='CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT' \
-e KAFKA_CONTROLLER_QUORUM_VOTERS='1@127.0.0.1:9093' \
-e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
-e KAFKA_INTER_BROKER_LISTENER_NAME='PLAINTEXT' \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e CLUSTER_ID='MkU3OEVBNTcwNTJENDM2Qk' \
confluentinc/cp-kafka:7.7.1创建测试 topic:
docker exec -it dl-kafka kafka-topics \
--bootstrap-server 127.0.0.1:9092 \
--create \
--topic topic_stream_demo \
--partitions 1 \
--replication-factor 1./target/bin/dlsql -h 127.0.0.1 -P 8360 -u admin -p publicCREATE DATABASE stream_demo;
USE stream_demo;
CREATE TABLE sink_t (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;这里的 sink_t 是 pipeline 的写入目标。当前版本要求 sink 必须为 TimeSeries 表。
创建 Kafka source:
CREATE SOURCE src_kafka (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
topic='topic_stream_demo',
offset='earliest',
format='json'
);创建 pipeline,仅保留 value >= 2.0 的事件:
CREATE PIPELINE p_kafka
SINK TO sink_t
AS
SELECT ts, sid, value
FROM src_kafka
WHERE value >= 2.0;在另一个终端启动 producer,然后输入测试数据。
docker exec -it dl-kafka kafka-console-producer \
--bootstrap-server 127.0.0.1:9092 \
--topic topic_stream_demo输入以下三行 JSON:
{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}
{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}
{"ts":"2025-01-01T00:00:03Z","sid":"sid-3","value":3.0}按 Ctrl-D 结束。
回到 dlsql,执行:
SELECT ts, sid, value FROM sink_t ORDER BY ts;预期结果仅包含两行,即 value >= 2.0 的记录。
查看 source、pipeline 和重建 SQL:
# 查看当前数据库下的所有 source
SHOW SOURCES;
# 查看当前数据库下的所有 pipeline
SHOW PIPELINES;
# 查看指定 source 的定义 SQL
SHOW CREATE SOURCE src_kafka;
# 查看指定 pipeline 的定义 SQL
SHOW CREATE PIPELINE p_kafka;停止和重启 pipeline:
# 停止一个运行中的 pipeline
ALTER PIPELINE p_kafka STOP;
# 重启一个 pipeline
ALTER PIPELINE p_kafka RESTART;# 删除 pipeline
DROP PIPELINE p_kafka;
# 删除 source
DROP SOURCE src_kafka;
# 删除 sink table
DROP TABLE sink_t;
# 删除数据库
DROP DATABASE stream_demo;docker rm -f dl-kafka