简体中文
Appearance
简体中文
Appearance
当前版本的流计算能力以在线清洗和简单过滤为主,暂不支持复杂窗口聚合、状态管理和事件时间处理等功能。
设备持续上报遥测数据,只保留超过阈值的异常事件,并写入内部时序表供后续查询和告警使用。
CREATE TABLE sink_device_alerts (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;CREATE SOURCE src_device_kafka (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
topic='topic_device_metrics',
offset='earliest',
format='json'
);CREATE PIPELINE p_device_alerts
SINK TO sink_device_alerts
AS
SELECT ts, sid, value
FROM src_device_kafka
WHERE value >= 80.0;工业设备或边缘网关通过 MQTT 上报 JSON 消息,只保留满足业务条件的数据写入数据库。
CREATE SOURCE src_factory_mqtt (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='mqtt',
broker='127.0.0.1:1883',
topic='factory/line1/sensor',
qos='1',
format='json'
);CREATE TABLE sink_factory_events (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;
CREATE PIPELINE p_factory_events
SINK TO sink_factory_events
AS
SELECT ts, sid, value
FROM src_factory_mqtt
WHERE value >= 2.0;某个第三方系统仅提供 HTTP API,没有消息队列。此时可以按固定周期轮询接口,并将返回结果持续写入数据库。
CREATE SOURCE src_http_once (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/once',
method='GET',
poll='once',
format='csv'
);CREATE SOURCE src_http_poll (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/poll?ts=${now_ts}',
method='GET',
poll='interval(200)',
format='csv'
);CREATE TABLE sink_http_poll (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64,
TIMESTAMP KEY(ts)
) ENGINE=TimeSeries
PARTITION BY HASH(sid) PARTITIONS 1;
CREATE PIPELINE p_http_poll
SINK TO sink_http_poll
AS
SELECT ts, sid, value
FROM src_http_poll
WHERE value >= 201.0;