2020年9月14日月曜日

Docker で MQTT -> Fluentd -> MySQL する

やること

+-----------------------+  +-------------------+  +-------------------------+  +-------+
| HiveMQ                |->| EMQ X             |->| Fluentd                 |->| MySQL |
| (MQTT パブリッシャー) |  | (MQTT ブローカー) |  | (MQTT サブスクライバー) |  |       |
+-----------------------+  +-------------------+  +-------------------------+  +-------+

前提

とりあえず動かす

MySQL

テーブル定義を作って ./docker-entrypoint-initdb.d にマウント

設定ファイル作成

mkdir -p ./conf/mysql/initdb.d

./conf/mysql/initdb.d/Message.sql

DROP SCHEMA IF EXISTS demo;
CREATE SCHEMA demo;
USE demo;

DROP TABLE IF EXISTS message;

CREATE TABLE message
(
  id           INT(10),
  node_name    VARCHAR(40),
  user_name    VARCHAR(40),
  message      VARCHAR(40)
);

起動

docker run -it --rm --name mysql -e "MYSQL_ROOT_PASSWORD=password" -p "3306:3306" -v "$(pwd)/conf/mysql/initdb.d:/docker-entrypoint-initdb.d" mysql:8.0.21 --default-authentication-plugin=mysql_native_password

EMQ X

docker run で立ち上げるだけ。

docker run -it --rm --name emqx -p 1883:1883 -p 8083:8083 emqx/emqx:4.2.0
  • 1883: MQTT TCP port
  • 8083: MQTT WS port

Fluentd

  1. 使用するプラグインをインストールした Docker イメージを作成
  2. プラグイン定義を作って /fluentd/etc/fluent.conf にマウント

Dockerfile を作成

mkdir dockerfile/fluentd

./dockerfile/fluend/Dockerfile

FROM fluent/fluentd:v1.11.2-1.0

LABEL maintainer "mikoto2000 <mikoto2000@gmail.com>"
LABEL version="1.0.0"
LABEL description "fluentd v1.11.2-1.0, fluent-plugin-mqtt, fluent-plugin-mysql."

USER root

RUN apk add alpine-sdk ruby-dev mariadb-dev

RUN fluent-gem install fluent-plugin-mqtt-io fluent-plugin-mysql

USER fluent

fluentd 設定ファイルを作成

mkdir -p ./conf/fluentd/td-agent

./conf/fluentd/td-agent/td-agent.conf

<source>
  @type mqtt
  host host.docker.internal
  port 1883
  topic test/#
  <parse>
    @type json
  </parse>
</source>

<filter test.**>
  @type record_transformer
  <record>
    node_name ${tag_parts[1]}
  </record>
</filter>

<match test.one>
  @type mysql_bulk
  host host.docker.internal
  port 3306
  database demo
  username root
  password password
  column_names id,node_name,user_name,message
  table message
  flush_interval 10s
</match>

イメージビルド

docker build -t mikoto2000/fluentd:mqtt-mysql ./dockerfile/fluentd

起動

docker run -it --rm --name fluentd -v "$(pwd)/conf/fluentd/td-agent/td-agent.conf:/fluentd/etc/fluent.conf" mikoto2000/fluentd:mqtt-mysql

接続確認

Topic を Publish

  1. http://www.hivemq.com/demos/websocket-client に接続
  2. Connection を設定し、 Connect ボタン押下
    • Host: localhost
    • Port: 8083
  3. Publish を設定し、 Publish ボタン押下
    • Topic: test/one
    • Message: {"id":1, "user_name": "test", "message":"testing"}

MySQL テーブル確認

docker execmysql コンテナに接続してテーブルを確認。

> docker exec -it mysql mysql -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 11
Server version: 8.0.21 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use demo;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from message;
+------+-----------+-----------+---------+
| id   | node_name | user_name | message |
+------+-----------+-----------+---------+
|    1 | one       | test      | testing |
+------+-----------+-----------+---------+
1 row in set (0.00 sec)

参考資料

0 件のコメント:

コメントを投稿