Skip to content

Datalayers 集成 EMQX 数据写入指南

EMQX 是一款高度可扩展、高性能的开源 MQTT 消息代理(Message Broker),用于处理海量物联网设备的消息通信。它是基于 Erlang/OTP 语言开发的,具有卓越的并发处理能力和稳定性。EMQX 主要应用于物联网(IoT)领域,帮助设备通过 MQTT 协议进行消息发布和订阅,广泛用于智慧城市、智能制造、车联网、能源管理等领域。

本文介绍如何通过 EMQX 规则引擎将数据存储到 Datalayers 中。

前置条件

  1. 已安装 Datalayers,版本要求:>= 2.1.8。
  2. 已安装 EMQX 企业版,版本要求:>= 5.8。

配置步骤

配置 Datalayers

向 Datalayers 中写入数据,需提前创建 databasetable,可使用 Datalayers 命令行工具 dlsql 来进行创建。

sql
> create database demo
Query OK, 0 rows affected. (0.001 sec)

> use demo
Database changed to `demo`

> CREATE TABLE `sensor_info` (
  time TIMESTAMP(9) NOT NULL,
  sid STRING NOT NULL,
  temp DOUBLE,
  timestamp key(time))
  PARTITION BY HASH (sid) PARTITIONS 1 
  ENGINE=TimeSeries
Query OK, 0 rows affected. (0.034 sec)

配置 EMQX

创建连接器

  • 登录 EMQX 管理控制台
  • 左侧菜单导航至 集成 > 连接器
  • 点击 "创建",在搜索框中输入 Datalayers,在搜索结果中选择 Datalayers, 点击下一步
  • 填写 Datalayers 连接信息,填写完成后可点击测试连接,确保连接成功,如下图:

demo

注意:服务器通讯地址应填写 Datalayers HTTP 地址

  • 点击创建,保存资源配置

创建规则

  • 在左侧菜单导航 集成 > 规则 页面,在左侧 SQL 编辑器中填写相应的 SQL 规则,点击右侧 创建。如下图:

    create emqx rule

  • 在创建规则页面填写相应连接器信息、数据模板。如下图:

    create emqx rule写语句的编辑器中,写入相应的语句模板。

  • 在右下角点击创建,以保存动作

  • 在创建规则页面,点击保存

完成以上步骤后,EMQX 规则引擎就会将匹配的 MQTT 消息写入 Datalayers。

数据写入示例

通过以上配置后,向 EMQX 发布消息(topic 以 t/ 开头)时,相应数据会自动保存到 Datalayers 中。以下使用 EMQX Dashboard 的“问题分析 > WebSocket 客户端”进行演示。

  • 在右边连接配置中,确保相应配置信息正常,点击连接,确保成功连接到 MQTT Broker

  • 在订阅栏中,主题框中输入 t/# 进行通配订阅,方便后续观察

  • 在发布窗口中,主题框输入 t/1,Payload 框中输入 { "sid": "1", "temp":25.6 },点击发布后可看到 pub 与 sub 消息,如下:

    emqx pub sub

此时说明消息已经成功发布到 MQTT Broker。

  • 使用 Datalayers dlsql 命令行工具查看写入的数据
sql
demo> select * from sensor_info;
+-------------------------------+-----+------+
| time                          | sid | temp |
+-------------------------------+-----+------+
| 2024-09-12T20:26:58.804+08:00 | 1   | 25.6 |
| 2024-09-12T20:29:16.010+08:00 | 1   | 25.7 |
+-------------------------------+-----+------+
2 rows in set (0.001 sec)

更多配置方式参考