简体中文
Appearance
简体中文
Appearance
Format 用于将 connector 读取的消息解析为 source 的列结构。
| Format | 适用 connector | 说明 |
|---|---|---|
| JSON | Kafka、MQTT、HTTP | 适合结构化事件消息 |
| CSV | Kafka、MQTT、HTTP | 适合简单表格型文本或按行输入 |
| Parquet | HTTP | 适合一次性抓取或轮询返回的完整 Parquet 文件 |
| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
format | STRING | 无 | Yes | 指定消息格式,当前支持 json 和 csv |
bad_data | STRING | drop | No | 坏数据处理策略,支持 drop 或 fail |
说明:
bad_data 仅对 source 生效json 和 csv 默认按逐行方式解码,适合 newline-delimited JSON 和按行 CSVparquet 按整个 payload 解码为一份完整 Parquet 文件{"ts":"2025-01-01T00:00:01Z","sid":"sid-1","value":1.0}
{"ts":"2025-01-01T00:00:02Z","sid":"sid-2","value":2.0}| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
unstructured | BOOL | false | No | 是否允许更宽松的 JSON 解码,默认按 schema 严格解析 |
CREATE SOURCE src_json (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='kafka',
brokers='127.0.0.1:9092',
topic='topic_json_demo',
format='json',
unstructured='false',
bad_data='fail'
);2025-01-01T00:00:03Z,sid-once,101
2025-01-01T00:00:04Z,sid-poll-1,201| 配置项 | 类型 | 默认值 | 必选 | 说明 |
|---|---|---|---|---|
has_header | BOOL | false | No | 是否将首行作为表头 |
delimiter | STRING | , | No | 单字节分隔符 |
CREATE SOURCE src_csv (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/poll',
method='GET',
poll='interval(1000)',
format='csv',
has_header='false',
delimiter=','
);CREATE SOURCE src_parquet (
ts TIMESTAMP(9),
sid STRING,
value FLOAT64
) WITH (
connector='http',
endpoint='http://127.0.0.1:18080/export.parquet',
method='GET',
poll='once',
format='parquet',
bad_data='fail'
);| 格式 | 推荐场景 | 不足 |
|---|---|---|
| JSON | 结构化事件、MQTT / Kafka 消息 | 文本体积通常更大 |
| CSV | 简单行式数据、HTTP 接口文本返回 | 字段可读性和扩展性较弱 |
| Parquet | HTTP 返回的列式文件、批量抓取 | 当前仅支持 HTTP connector |
当上游消息偶尔出现坏行、但更关注链路持续可用时,可以使用 bad_data='drop',让系统跳过当前坏批次。
当希望格式错误立即暴露、便于定位上游数据问题时,可以使用 bad_data='fail'。