The Kafka connector continuously reads messages from a Kafka topic into a Datalayers source.
| Option | Type | Default | Required | Description |
|---|---|---|---|---|
| connector | STRING | None | Yes | Must be `kafka` |
| brokers | STRING | None | Yes | Comma-separated list of Kafka brokers, each in `host:port` form |
| topic | STRING | None | Yes | Topic to consume |
| offset | STRING | latest | No | Starting position: `earliest`, `latest`, or `at(<timestamp>)` |
| group_id | STRING | None | No | Consumer group ID |
| username | STRING | None | No | SASL username |
| password | STRING | None | No | SASL password |
For format-related options, see Formats.
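Because `brokers` and `offset` are plain strings, a typo only shows up when `CREATE SOURCE` runs. The following is a hypothetical pre-flight check in POSIX shell that applies the formats listed in the table: a comma-separated list of `host:port` entries, and one of `earliest`, `latest`, or `at(<timestamp>)`. The names `check_brokers` and `check_offset` are illustrative only and not part of Datalayers. It assumes host names contain only letters, digits, dots, and hyphens (IPv6 literals are not handled), and it does not validate the timestamp inside `at(...)`, whose format the table does not specify.

```shell
#!/bin/sh
# Hypothetical pre-flight checks for Kafka connector options; not a Datalayers API.

# check_brokers LIST
# Succeeds if LIST is a comma-separated list of host:port entries.
# Assumptions: hosts use only letters, digits, '.' and '-'; ports are 1..65535.
check_brokers() {
  nl='
'
  # Reject multi-line input, which grep would otherwise test line by line.
  case $1 in *"$nl"*) return 1 ;; esac
  printf '%s\n' "$1" |
    grep -Eq '^[A-Za-z0-9.-]+:[0-9]{1,5}(,[A-Za-z0-9.-]+:[0-9]{1,5})*$' ||
    return 1
  # The regex allows up to 5 digits, so check the numeric range separately.
  for port in $(printf '%s\n' "$1" | tr ',' '\n' | sed 's/.*://'); do
    if [ "$port" -lt 1 ] || [ "$port" -gt 65535 ]; then
      return 1
    fi
  done
  return 0
}

# check_offset VALUE
# Succeeds for earliest, latest, or at(<something>); the timestamp inside
# at(...) is not validated because its format is not specified here.
check_offset() {
  case $1 in
    earliest | latest) return 0 ;;
    'at('?*')') return 0 ;;
    *) return 1 ;;
  esac
}
```

For example, `check_brokers "127.0.0.1:9092" && check_offset earliest && echo ok` prints `ok` for the values used in the example below, while `check_brokers "127.0.0.1"` fails because the port is missing.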
Pull the Kafka image:

```shell
docker pull confluentinc/cp-kafka:7.7.1
```

Start a single-node Kafka container in KRaft mode (one process acting as both broker and controller):

```shell
docker run -d --name dl-kafka \
  -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093' \
  -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://127.0.0.1:9092' \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP='CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT' \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS='1@127.0.0.1:9093' \
  -e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
  -e KAFKA_INTER_BROKER_LISTENER_NAME='PLAINTEXT' \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e CLUSTER_ID='MkU3OEVBNTcwNTJENDM2Qk' \
  confluentinc/cp-kafka:7.7.1
```

Create the topic:
```shell
docker exec -it dl-kafka kafka-topics \
  --bootstrap-server 127.0.0.1:9092 \
  --create \
  --topic topic_stream_demo \
  --partitions 1 \
  --replication-factor 1
```

In Datalayers, create a database and a sink table, then a Kafka source and a pipeline that writes rows with `value >= 2.0` from the source into the sink table:

```sql
CREATE DATABASE stream_demo;
USE stream_demo;

CREATE TABLE sink_t (
  ts TIMESTAMP(9) NOT NULL,
  sid STRING NOT NULL,
  value FLOAT64,
  TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_kafka (
  ts TIMESTAMP(9) NOT NULL,
  sid STRING NOT NULL,
  value FLOAT64
) WITH (
  connector='kafka',
  brokers='127.0.0.1:9092',
  topic='topic_stream_demo',
  offset='earliest',
  format='json'
);

CREATE PIPELINE p_kafka
SINK TO sink_t
AS
SELECT ts, sid, value
FROM src_kafka
WHERE value >= 2.0;
```

Start a console producer:

```shell
docker exec -it dl-kafka kafka-console-producer \
  --bootstrap-server 127.0.0.1:9092 \
  --topic topic_stream_demo
```

Then enter the following messages, one per line:

```json
{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}
{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}
{"ts":"2025-01-01T00:00:03Z","sid":"sid-3","value":3.0}
```

Query the sink table:

```sql
SELECT ts, sid, value FROM sink_t ORDER BY ts;
```

Only the two rows with `value >= 2.0` (`sid-2` and `sid-3`) should appear.
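Typing the messages by hand works for a one-off test. For repeatable runs, the sketch below (all function names are hypothetical) prints the same three JSON lines, pipes them into `kafka-console-producer` using `docker exec -i` instead of `-it` (a TTY is not needed when stdin is a pipe), and previews locally which records the pipeline's `WHERE value >= 2.0` should keep. The `awk` preview only handles flat, single-line JSON with a numeric `value` field like these samples; it is not a JSON parser and says nothing about how Datalayers itself evaluates the filter.

```shell
#!/bin/sh
# demo_messages: print the three sample records, one JSON object per line.
demo_messages() {
  printf '%s\n' \
    '{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}' \
    '{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}' \
    '{"ts":"2025-01-01T00:00:03Z","sid":"sid-3","value":3.0}'
}

# produce_demo: send the sample records to the demo topic non-interactively.
# -i keeps stdin attached; -t is omitted because stdin is a pipe, not a terminal.
produce_demo() {
  demo_messages | docker exec -i dl-kafka kafka-console-producer \
    --bootstrap-server 127.0.0.1:9092 \
    --topic topic_stream_demo
}

# preview_filter: print records whose "value" field is >= 2.0.
# Assumption: flat JSON with a numeric "value" field, as in demo_messages.
preview_filter() {
  awk 'match($0, /"value":[-0-9.eE+]+/) {
         v = substr($0, RSTART + 8, RLENGTH - 8)  # skip the 8 chars of "value":
         if (v + 0 >= 2.0) print
       }'
}
```

With the container and pipeline from the steps above in place, run `produce_demo` and then the `SELECT` on `sink_t`. Running `demo_messages | preview_filter` prints the `sid-2` and `sid-3` records, matching the expected query result.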
- … `brokers`, `CREATE SOURCE` fails immediately.
- If the source is still referenced by a pipeline, `DROP SOURCE` fails.