简体中文
Appearance
简体中文
Appearance
Format 用于将 connector 读取的消息解析为 source 的列结构。
| Format | 适用 connector | 说明 |
|---|---|---|
| JSON | Kafka、MQTT、HTTP | 适合结构化事件消息 |
| CSV | Kafka、MQTT、HTTP | 适合简单表格型文本或按行输入 |
| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
format | STRING | 无 | Yes | 指定消息格式,当前支持 json 和 csv |
bad_data | STRING | drop | No | 坏数据处理策略,支持 drop 或 fail |
说明:
bad_data 仅对 source 生效{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}
{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
unstructured | BOOL | false | No | 是否允许更宽松的 JSON 解码,默认按 schema 严格解析 |
CREATE SOURCE src_json (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
topic='topic_json_demo',
format='json',
unstructured='false',
bad_data='fail'
);2025-01-01T00:00:03Z,sid-once,101
2025-01-01T00:00:04Z,sid-poll-1,201| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
has_header | BOOL | false | No | 是否将首行作为表头 |
delimiter | STRING | , | No | 单字节分隔符 |
CREATE SOURCE src_csv (
ts TIMESTAMP(9) NOT NULL,
sid STRING NOT NULL,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/poll',
method='GET',
poll='interval(1000)',
format='csv',
has_header='false',
delimiter=','
);| 格式 | 推荐场景 | 不足 |
|---|---|---|
| JSON | 结构化事件、MQTT / Kafka 消息 | 文本体积通常更大 |
| CSV | 简单行式数据、HTTP 接口文本返回 | 字段可读性和扩展性较弱 |
当上游消息偶尔出现坏行、但更关注链路持续可用时,可以使用 bad_data='drop',让系统跳过当前坏批次。
当希望格式错误立即暴露、便于定位上游数据问题时,可以使用 bad_data='fail'。