Skip to content

MQTT Connector

MQTT connector 用于订阅 MQTT broker 中的主题消息,并持续读入 Datalayers source。

配置项

配置项类型默认值必选说明
connectorSTRINGYes固定为 mqtt
brokerSTRINGYesMQTT broker 地址,格式为 [scheme://]host:port;未写 scheme 时默认 tcp
topicSTRINGYes要订阅的 topic
qosSTRING0No服务质量等级,支持 012
client_idSTRINGdatalayers-<job_id>-consumerNoMQTT client ID
keep_aliveSTRING60sNokeep alive 时间,采用 duration 格式,例如 60s
connect_timeoutSTRING10sNo建立连接的超时时间
versionSTRINGNoMQTT 协议版本,支持 3.1.15.0
usernameSTRINGNo认证用户名
passwordSTRINGNo认证密码
caSTRINGNoTLS CA 证书
certSTRINGNoTLS 客户端证书
keySTRINGNoTLS 客户端私钥

broker 支持以下 scheme:

  • tcp
  • ssl
  • ws
  • wss

Metadata 字段

MQTT source 当前支持以下 metadata key:

metadata key类型说明
topicSTRING当前消息实际所在的 topic

Format 相关配置请参考 Formats

示例:读取 MQTT JSON 消息

1. 启动 MQTT broker

bash
docker run -d --name emqx-enterprise -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx-enterprise:6.1.1

2. 创建 sink table、source 和 pipeline

sql
CREATE DATABASE stream_demo_mqtt;
USE stream_demo_mqtt;

CREATE TABLE sink_t (
  ts TIMESTAMP(9),
  sid STRING,
  value FLOAT64,
  TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;

CREATE SOURCE src_mqtt (
  ts TIMESTAMP(9),
  sid STRING,
  value FLOAT64
) WITH (
  connector='mqtt',
  broker='tcp://127.0.0.1:1883',
  topic='topic/stream_demo',
  qos='1',
  keep_alive='60s',
  connect_timeout='10s',
  format='json'
);

CREATE PIPELINE p_mqtt
SINK TO sink_t
AS
SELECT ts, sid, value
FROM src_mqtt
WHERE value >= 2.0;

3. 发布测试消息

如果环境里有 mosquitto_pub,可以直接发送:

bash
mosquitto_pub -h 127.0.0.1 -p 1883 -t topic/stream_demo -m '{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}'
mosquitto_pub -h 127.0.0.1 -p 1883 -t topic/stream_demo -m '{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}'
mosquitto_pub -h 127.0.0.1 -p 1883 -t topic/stream_demo -m '{"ts":"2025-01-01T00:00:03Z","sid":"sid-3","value":3.0}'

4. 查询结果

sql
SELECT ts, sid, value FROM sink_t ORDER BY ts;

预期仅看到 value >= 2.0 的两行。

注意事项

  • 当前 MQTT 仅支持作为 source,不支持作为 sink
  • 建议先确认 topic 中消息 schema 与 source 列定义一致