MQTT on ConoHa Cloud

ConoHaにMQTT Mosquittoのテンプレートが追加された。このテンプレート、通常のMQTT 1883だけじゃなくWebSocketも11883で使える。つまり、ブラウザとMQTTを使う端末との接合点としても使える。端末とネットだけじゃなく、ブラウザ(WebSocket)というのも管理的な所やUI、使い勝手的な所では必要になってくると思われ。

まず使ってみる

サーバ追加でMosquittoテンプレを選んで起動するだけ。1.3円/hourで全然十分、サクッと試してみるだけだし。
mosquitto

そして追加したら、今度は端末側。Particle Photonを使うことに。Photonに温度センサ、照度センサを付けた状態がこんな感じ。ソースは下に書いてあるから、コピペすれば使える。もちろん、Photon以外にRaspberry Pi、Arduino Ethernetシールドとか、普通にMQTTが話せる端末だったら何でもOK。

そしたら、今度はWeb側。素敵な方がMQTT/WebSocketを公開されていて、それを使わせて頂くことに。Addressにさっき追加したMosquittoサーバのIPとWebSocket用のポート11883を指定して…こんな感じで。


ConnectしてSubscribeすると、温度と照度のセンサーデータが流れてくるのと、PublishもLEDの色を指定すると端末側の色が変わる。とっても簡単、MosquittoテンプレートにWebSocket付きなのもあって、サーバを追加してあげるとすぐWeb上で確認出来る。

 

Elastic/Kibanaと一緒に使ってみる

ちょっとクラウド的な真面目な構成を考えると次のような感じの図になる。これはセンサーデータをクラウド側でストア、可視化したり分析、そしてWebから端末コントロールをWebSocketで行うような時の構成。恐らく、多くのネットワークに繋がる端末が出てきたとき、このような構成になると考えられる。間にVIPが入って冗長化とかは抜いているけど、その辺も込みでこんな感じに。


ここで重要なパーツになるのがFluentdになる。MQTTを一旦、Fluentdで受けてバックエンドに流すことになる。
これは必ずしも多くのデバイスがMQTTを話す訳では無いという経験から。例えば、JSON、HTTP(GET)、バイナリ直接だったり、UDP(CoAP)という事もあり得る。「プロトコルを変えたいです」->「ファームの入れ替えが。。。」というのは全然ある事で、システム側にProtocol Translationを担う時が必ずやってくると思われ。そこで、実績もプラグインも豊富にあって機能追加も楽だし冗長化も簡単なfluentdは必然的に使った方が良い。多種のデバイス連携やオレオレプロトコルが入り込んで来ることはあり得るからね。

Elastic/Kibana

データの可視化でElatic/Kibanaを使う。センサーからのデータを検索、グラフ化して見るための物で、うまく動くとこんな感じでほぼリアルタイムで照度と温度がグラフで見れる。

以下は、Elastic/Kibanaのインストール。MQTTを入れたサーバでも良いし、別にサーバを立ててもどこでもOK。ただし環境はUbuntuを想定している。

# es install
wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb http://packages.elastic.co/elasticsearch/2.x/debian stable main" | sudo tee -a /etc/apt/sources.list.d/elasticsearch-2.x.list
sudo apt-get update && sudo apt-get install elasticsearch

# センサーデータ用 mapping追加
curl -XPUT http://localhost:9200/_template/sensor-template_1 -d '
{
    "template": "sensor-log-*",
    "mappings": {
      "_default_": {
        "properties": {
          "temp": {
            "type": "float",
            "index": "analyzed"
          },
          "lux": {
            "type": "float",
            "index": "analyzed"
          }
        }
      }
    }
}
'
# kibanaダウンロード&起動
wget https://download.elastic.co/kibana/kibana/kibana-4.4.2-linux-x64.tar.gz
tar -zxvf kibana-4.4.2-linux-x64.tar.gz
kibana-4.4.2-linux-x64/bin/kibana 

Fluentd

次はFluentd。ここではMosquittoからデータを取得して、Elasticにメッセージをルーティングする感じになる。Mosquittoへの接続と、Elasticへのデータ送信部分を設定する。ここでもサーバはUbuntuを想定している。

# fluentd 
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
/opt/td-agent/embedded/bin/gem install fluent-plugin-mqtt-io

# 設定 /etc/td-agent/td-agent.conf に追加
<source>
 type mqtt
 # MosquittoサーバのIPを指定
 host XXX.XXX.XXX.XXX
 port 1883
 format json
 @label @MQTT_OUT
</source>

<label @MQTT_OUT>
 type copy
 <match sensor.**> 
 @type elasticsearch
 # ElasticのIP(localhostならそれでOK)を指定
 host localhost
 port 9200

 type_name sensor-log
 logstash_format true
 logstash_prefix sensor-log
 logstash_dateformat %Y%m

 buffer_type memory
 buffer_chunk_limit 10m
 buffer_queue_limit 10
 flush_interval 1s
 retry_limit 16
 retry_wait 1s
 </match>
</label>

# fluentdを再起動
/etc/init.d/td-agent restart

端末からデータを投げる

ここではParticle Photonを使う。別にPhotonじゃなくても、Raspbeerry PI、Arduino、iPhone/Android…別に何でも良いんだけど、手元にあったというだけの理由で。まぁ、あとParticle上でMQTT作ったりするというのもあったり。コードはこんな感じで、Arduinoとほぼ同じように書ける。
MosquittoへはMQTT(json形式のメッセージ)で送信する。

ちなみにParticleのWeb IDEのビルド画面はこんな感じで、Web上(インターネット側)から端末にプログラムをFlashする。現時点でPhoton上のMQTTは1600アプリ、CoAPはたったの11だけ使われている。この差はLibraryで提供している側としては、わりと泣けるというかUDPの方が軽いのに…というのは常々思っていたりする。まぁ、好きな方をつかえば良い。

#include "MQTT/MQTT.h"

void callback(char* topic, byte* payload, unsigned int length);
MQTT client("XXX.XXX.XXX.XXX", 1883, callback);

// Subscribeしたメッセージのコールバック
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = NULL;
    String message(p);

    if (message.equals("RED"))    
        RGB.color(255, 0, 0);
    else if (message.equals("GREEN"))    
        RGB.color(0, 255, 0);
    else if (message.equals("BLUE"))    
        RGB.color(0, 0, 255);
    else    
        RGB.color(255, 255, 255);
    delay(1000);
}


void setup() {
    RGB.control(true);
    
    // connect to the server
    client.connect("sparkclient");

    // publish/subscribe
    if (client.isConnected()) {
        client.subscribe("led/");
    }
}

void loop() {
    // MQTT用
    if (client.isConnected())
        client.loop();
    
    // 温度取得
    int tempValue = 0;
    for (int i = 0; i < 100; i++) {
        tempValue += analogRead(0);
    }
    tempValue = tempValue / 100;
    float v = (float(tempValue)  / 1024) * 3.3;
    float temp = (v - 0.424) / 0.08725;
    
    // 照度取得
    int luxValue = 0;
    for (int i = 0; i < 100; i++) {
        luxValue += analogRead(1);
    }
    luxValue = luxValue / 100;
    
    // MQTTにjson形式で配信
    client.publish("sensor/", String("{\"temp\":\"") + String(temp) + String("\", \"lux\" : \"") + String(luxValue) + String("\"}"));

    delay(1000);
}

以上で全部おわり。端末が無くても手動でmosquitto_pub/subコマンドで、直接mosquittoに接続して確認できる。

# センサーデータの送信
mosquitto_pub -h XXX.XXX.XXX.XXX -t sensor/ -m '{"temp" : "22.0303", "lux" : "703" }'

# LEDの色を赤色に変えてみる
mosquitto_pub -h XXX.XXX.XXX.XXX -t led/ -m 'RED'

# 受信して確認してみる
mosquitto_sub -h XXX.XXX.XXX.XXX -t sensor/

ConoHaのmosquittoテンプレは身内なサービス(自分自身GMOな所に居るし)だけど、普通に使って内容を書いてみた。もちろんステマ的なあれじゃないからねw
他にもVIPやTLS/SSLを設定したりといった、冗長化、セキュリティの要素も入ってくる事にはなるんだけど、サクッと使ってまずは試してみてその次に、そういう要素を入れていく事になるかなと。普通にこんな構成で自分は使うと思われ。あと、プロトコル的な互換性の解決にfluentdを使ったりするにしても、サクッとMQTTを使って動確してみたりWebSocketでブラウザからも確認できたりできて、わりと良い感じのよーな気がする。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください