MQTT on ConoHa Cloud

ConoHaにMQTT Mosquittoのテンプレートが追加された。このテンプレート、通常のMQTT 1883だけじゃなくWebSocketも11883で使える。つまり、ブラウザとMQTTを使う端末との接合点としても使える。端末とネットだけじゃなく、ブラウザ(WebSocket)というのも管理的な所やUI、使い勝手的な所では必要になってくると思われ。

まず使ってみる

サーバ追加でMosquittoテンプレを選んで起動するだけ。1.3円/hourで全然十分、サクッと試してみるだけだし。
mosquitto

そして追加したら、今度は端末側。Particle Photonを使うことに。Photonに温度センサ、照度センサを付けた状態がこんな感じ。ソースは下に書いてあるから、コピペすれば使える。もちろん、Photon以外にRaspberry Pi、Arduino Ethernetシールドとか、普通にMQTTが話せる端末だったら何でもOK。
IMG_0889

そしたら、今度はWeb側。素敵な方がMQTT/WebSocketを公開されていて、それを使わせて頂くことに。Addressにさっき追加したMosquittoサーバのIPとWebSocket用のポート11883を指定して…こんな感じで。


ConnectしてSubscribeすると、温度と照度のセンサーデータが流れてくるのと、PublishもLEDの色を指定すると端末側の色が変わる。とっても簡単、MosquittoテンプレートにWebSocket付きなのもあって、サーバを追加してあげるとすぐWeb上で確認出来る。

 

Elastic/Kibanaと一緒に使ってみる

ちょっとクラウド的な真面目な構成を考えると次のような感じの図になる。これはセンサーデータをクラウド側でストア、可視化したり分析、そしてWebから端末コントロールをWebSocketで行うような時の構成。恐らく、多くのネットワークに繋がる端末が出てきたとき、このような構成になると考えられる。間にVIPが入って冗長化とかは抜いているけど、その辺も込みでこんな感じに。


pic
ここで重要なパーツになるのがFluentdになる。MQTTを一旦、Fluentdで受けてバックエンドに流すことになる。
これは必ずしも多くのデバイスがMQTTを話す訳では無いという経験から。例えば、JSON、HTTP(GET)、バイナリ直接だったり、UDP(CoAP)という事もあり得る。「プロトコルを変えたいです」->「ファームの入れ替えが。。。」というのは全然ある事で、システム側にProtocol Translationを担う時が必ずやってくると思われ。そこで、実績もプラグインも豊富にあって機能追加も楽だし冗長化も簡単なfluentdは必然的に使った方が良い。多種のデバイス連携やオレオレプロトコルが入り込んで来ることはあり得るからね。

Elastic/Kibana

データの可視化でElatic/Kibanaを使う。センサーからのデータを検索、グラフ化して見るための物で、うまく動くとこんな感じでほぼリアルタイムで照度と温度がグラフで見れる。

kiba2

以下は、Elastic/Kibanaのインストール。MQTTを入れたサーバでも良いし、別にサーバを立ててもどこでもOK。ただし環境はUbuntuを想定している。

# es install
wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
echo "deb http://packages.elastic.co/elasticsearch/2.x/debian stable main" | sudo tee -a /etc/apt/sources.list.d/elasticsearch-2.x.list
sudo apt-get update && sudo apt-get install elasticsearch

# センサーデータ用 mapping追加
curl -XPUT http://localhost:9200/_template/sensor-template_1 -d '
{
    "template": "sensor-log-*",
    "mappings": {
      "_default_": {
        "properties": {
          "temp": {
            "type": "float",
            "index": "analyzed"
          },
          "lux": {
            "type": "float",
            "index": "analyzed"
          }
        }
      }
    }
}
'
# kibanaダウンロード&起動
wget https://download.elastic.co/kibana/kibana/kibana-4.4.2-linux-x64.tar.gz
tar -zxvf kibana-4.4.2-linux-x64.tar.gz
kibana-4.4.2-linux-x64/bin/kibana 

Fluentd

次はFluentd。ここではMosquittoからデータを取得して、Elasticにメッセージをルーティングする感じになる。Mosquittoへの接続と、Elasticへのデータ送信部分を設定する。ここでもサーバはUbuntuを想定している。

# fluentd 
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
/opt/td-agent/embedded/bin/gem install fluent-plugin-mqtt-io

# 設定 /etc/td-agent/td-agent.conf に追加
<source>
 type mqtt
 # MosquittoサーバのIPを指定
 host XXX.XXX.XXX.XXX
 port 1883
 format json
 @label @MQTT_OUT
</source>

<label @MQTT_OUT>
 type copy
 <match sensor.**> 
 @type elasticsearch
 # ElasticのIP(localhostならそれでOK)を指定
 host localhost
 port 9200

 type_name sensor-log
 logstash_format true
 logstash_prefix sensor-log
 logstash_dateformat %Y%m

 buffer_type memory
 buffer_chunk_limit 10m
 buffer_queue_limit 10
 flush_interval 1s
 retry_limit 16
 retry_wait 1s
 </match>
</label>

# fluentdを再起動
/etc/init.d/td-agent restart

端末からデータを投げる

ここではParticle Photonを使う。別にPhotonじゃなくても、Raspbeerry PI、Arduino、iPhone/Android…別に何でも良いんだけど、手元にあったというだけの理由で。まぁ、あとParticle上でMQTT作ったりするというのもあったり。コードはこんな感じで、Arduinoとほぼ同じように書ける。
MosquittoへはMQTT(json形式のメッセージ)で送信する。

ちなみにParticleのWeb IDEのビルド画面はこんな感じで、Web上(インターネット側)から端末にプログラムをFlashする。現時点でPhoton上のMQTTは1600アプリ、CoAPはたったの11だけ使われている。この差はLibraryで提供している側としては、わりと泣けるというかUDPの方が軽いのに…というのは常々思っていたりする。まぁ、好きな方をつかえば良い。
particle

#include "MQTT/MQTT.h"

void callback(char* topic, byte* payload, unsigned int length);
MQTT client("XXX.XXX.XXX.XXX", 1883, callback);

// Subscribeしたメッセージのコールバック
void callback(char* topic, byte* payload, unsigned int length) {
    char p[length + 1];
    memcpy(p, payload, length);
    p[length] = NULL;
    String message(p);

    if (message.equals("RED"))    
        RGB.color(255, 0, 0);
    else if (message.equals("GREEN"))    
        RGB.color(0, 255, 0);
    else if (message.equals("BLUE"))    
        RGB.color(0, 0, 255);
    else    
        RGB.color(255, 255, 255);
    delay(1000);
}


void setup() {
    RGB.control(true);
    
    // connect to the server
    client.connect("sparkclient");

    // publish/subscribe
    if (client.isConnected()) {
        client.subscribe("led/");
    }
}

void loop() {
    // MQTT用
    if (client.isConnected())
        client.loop();
    
    // 温度取得
    int tempValue = 0;
    for (int i = 0; i < 100; i++) {
        tempValue += analogRead(0);
    }
    tempValue = tempValue / 100;
    float v = (float(tempValue)  / 1024) * 3.3;
    float temp = (v - 0.424) / 0.08725;
    
    // 照度取得
    int luxValue = 0;
    for (int i = 0; i < 100; i++) {
        luxValue += analogRead(1);
    }
    luxValue = luxValue / 100;
    
    // MQTTにjson形式で配信
    client.publish("sensor/", String("{\"temp\":\"") + String(temp) + String("\", \"lux\" : \"") + String(luxValue) + String("\"}"));

    delay(1000);
}

以上で全部おわり。端末が無くても手動でmosquitto_pub/subコマンドで、直接mosquittoに接続して確認できる。

# センサーデータの送信
mosquitto_pub -h XXX.XXX.XXX.XXX -t sensor/ -m '{"temp" : "22.0303", "lux" : "703" }'

# LEDの色を赤色に変えてみる
mosquitto_pub -h XXX.XXX.XXX.XXX -t led/ -m 'RED'

# 受信して確認してみる
mosquitto_sub -h XXX.XXX.XXX.XXX -t sensor/

ConoHaのmosquittoテンプレは身内なサービス(自分自身GMOな所に居るし)だけど、普通に使って内容を書いてみた。もちろんステマ的なあれじゃないからねw
他にもVIPやTLS/SSLを設定したりといった、冗長化、セキュリティの要素も入ってくる事にはなるんだけど、サクッと使ってまずは試してみてその次に、そういう要素を入れていく事になるかなと。普通にこんな構成で自分は使うと思われ。あと、プロトコル的な互換性の解決にfluentdを使ったりするにしても、サクッとMQTTを使って動確してみたりWebSocketでブラウザからも確認できたりできて、わりと良い感じのよーな気がする。

iOSのIPv4/v6 DNSの挙動

iOS9からIPv6対応との事で、こんな事が起きるようで。

Apple、iOS 9とOS X 10.11 El Capitan BetaでIPv6を優先し使用するよう「Happy Eyeballs」のアルゴリズムを変更。
IPv4接続で遅延が発生?!

これが本当なのか実験してみた。環境は以下の通り。

pic

closedなLAN宅内環境で、IPv6にはAU光で実際に利用できるGlobal IPv6が/64で降ってきてそれを利用する。Raspberry Pi2にはbind9とiPhoneからアクセス用のNginXを立てて、IPv4/IPv6の名前解決はワイルドカードで行っておく。そして適当に作ったiPhone用のアプリからNginXのサーバにアクセスする際の名前は…

# IPv4はサフィックスにipv4と入れる
10001.ipv4.test.niisato.net
10002.ipv4.test.niisato.net
10003.ipv4.test.niisato.net

# IPv6はサフィックスにipv6と入れる
60001.ipv6.test.niisato.net
60002.ipv6.test.niisato.net
60003.ipv6.test.niisato.net

といった感じでプレフィックスを適当な数字でインクリメントする事で、都度DNSにアクセスを行うようにして、IPv4/v6のアクセスで常に名前解決を行いNginXにアクセスするように出来る。こうする事で、[DNSへの問い合わせ->HTTPアクセス]のループを1000回シーケンシャルにかまして両者の違いが分かるはず…だった。

だった…というのは、実際に試すそれ以前にRaspberry Pi上でDNSのパケットをキャプチャしたらもうそれ以前の問題だった。以下はIPv4へのアクセス時で、tcpdumpの結果をWireSharkで見たもの。

スクリーンショット 2016-03-07 23.22.30

そう…25msの誤差もへったくれもなく、AAAA(IPv6)レコードを常に有線して名前解決が走る。そもそもね…AAAA(IPv6)優先で名前解決をしている以上、25ms分IPv4へのアクセスを遅くするもヘッタクレもなくIPv6があればそっちが早くなるというのは当然の帰結というわけですな。closedなLAN環境だとあれだけど、ベストエフォートなインターネット環境だったら、ネットワーク時間も影響してIPv6の方が早くなるのも理解できる。

ちなみにこれはIPv6の場合、

スクリーンショット 2016-03-07 23.26.55

まだIPv4/v6と名前解決のリクエストは関係ないから、当然こっちでもAAAAが優先して問い合わせが行われる。ぶっちゃけ、これで大きな時間的な差が生まれるとは考えにくい。というのは、こんなオモシロユニークな名前解決を連発するという事は通常は殆どない&キャッシュされるのと、最初の問い合わせで25msの遅延が挿入されると言っても…分からんべ(w)とは思うし。さらに国内のiPhoneでIPv6使ってるぜヒャッハーな人とかって…一般ユースだと無いもん(キャリア対応してないし)。ただ、AAAAが優先されて一発送られているというのは気に止めておいた方が良いと思われ。