golangでセンサーデータをMQTTで受けてloggerする

Go その2 Advent Calendar 2015の記事です。

golangで複数のセンサーからのデータをMQTTで受けてfluentdを使ってロギング、elastic/kibana, splunkで可視化します。これは実際にET展のマクニカさんのMpressionによるIoTのPoC(Proof of Concept)にて展示してきた内容の詳細です。実際の展示中の内容はこちらです。会場と世界各国のマクニカさんのオフィスにセンサーを置いて頂き、色んなところからモニタリングしています。

構成
以下の図のような構成です。色んなセンサーを使っています。MQTTを使えるセンサーがあるので、MQTTサーバとしてはmosquittoを使っています。
article_header_library_113613_pic02__1

送信データフォーマット
センサーからのデータは基本的には、温度、湿度、照度を扱っています。他にも色々取れるのですが(加速度、地磁気、音声, IR…etc)基本データとして3個を扱う事にします。MQTTで各センサーデバイスからデータを送信するのですが、その時のデータフォーマットは以下にしています。(一部抜粋です)

DeviceId   string : センサデバイスに振られているID
DeviceType string : TI/BCM/Kivo...etc センサーのタイプ
Temp       string : 温度
Lux        string : 照度
Humidity   string : 湿度

デバイスIDはシステム側で管理して、存在するものだけをloggingしたり、検索時の引き当てで入れておいた方が良いでしょう。センサーのタイプはシステム側でIDから引ければ問題無しです。温度、照度、湿度はそのままですね。

サーバ側
こちらはMQTTで受け側です。センサデータを受けたらfluentdに流しています。fluentdで直接受けて流してもOKですが、ここではgolangで受けてfluentdに投げてみましょう。fluentd, mqttのIPとか設定すれば動くです。

package main

import (
    "encoding/json"
    "flag"
    "log"
    "fmt"
    "time"

    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "github.com/t-k/fluent-logger-golang/fluent"
)

// 受信するjsonデータ
type SensorData struct {
    DeviceId   string `json:"DeviceId"`
    DeviceType string `json:"DeviceType"`
    Temp       string `json:"Temp"`
    Lux        string `json:"Lux"`
    Humidity   string `json:"Humidity"`
}

// デフォルトハンドラ(Subscribe時にトピック毎に指定も可能)
var defaultMTTTHandler MQTT.MessageHandler = func(client *MQTT.Client, message MQTT.Message) {
    fmt.Printf("Received MQTT message on topic: %s\n", message.Topic())
    fmt.Printf("Message: %s\n", message.Payload())

    sensorData := SensorData{}
    json.Unmarshal([]byte(message.Payload()), &sensorData)

    logger, err := fluent.New(fluent.Config{FluentPort: 24224, FluentHost: "host.name"})
    if err != nil {
        fmt.Println(err)
    }
    defer logger.Close()

    tag := "sensor.data"
    t := time.Now()

    // ちょっと加工用
    data := map[string]string{
        "temperature":      sensorData.Temp,
        "lux":              sensorData.Lux,
        "humidity":         sensorData.Humidity,
        "device-type":    sensorData.DeviceType,
        "device-id": sensorData.DeviceId,
    }
    logger.PostWithTime(tag, t, data)
    log.Println("Send to fluentd")
}

func main() {
    opts := MQTT.NewClientOptions().AddBroker("tcp://host.name:1883").SetClientID("sensorlogger")
    opts.SetDefaultPublishHandler(defaultMTTTHandler)

    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
       panic(token.Error())
    }

    qos := flag.Int("qos", 1, "The QoS to subscribe to messages at")
    if token := client.Subscribe("/sensor/put", byte(*qos), nil); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    for {
        time.Sleep(1 * time.Second)
    }
    client.Disconnect(250)
}

センサー側(TI/RPi2)
こちらはセンサー側のRaspberry PI、TI(nodejs)側。SensorTagとBLEで通信して、データを取得したらMQTTで投げているだけですね。

var SensorTag = require('sensortag');
var mqtt    = require('mqtt');
var request = require('request-json');
var mqttclient  = mqtt.connect('mqtt://mqtt server');

SensorTag.discoverByAddress("sensor tag BTのMacアドレス", function(tag) {
    tag.on('disconnect', function() {
        console.log('disconnected!');
        process.exit(0);
    });

    function connectAndSetUpMe() {
        console.log('connectAndSetUp');
        tag.connectAndSetUp(enableIrTempMe);
    }

    function enableIrTempMe() {
        console.log('enableSensor');
        tag.enableHumidity(notifyMe);
        tag.enableLuxometer(notifyLux);
    }

    function notifyLux() {
    }

    function notifyMe() {
        tag.notifyHumidity(listenForHumidity);

        tag.setHumidityPeriod(1000, function(error) {
            if (error != null)
                console.log(error);
        });
    }

    function listenForHumidity() {
        tag.on('humidityChange', function(temperature, humidity) {
            tag.readLuxometer(function(error, lux) {
                var data = {
                    DeviceId: "unique device id",
                    DeviceType: "TI SensorTag",
                        Temp: temperature.toFixed(1),
                         Lux: lux.toFixed(1),
                    Humidity: humidity.toFixed(1)
                };
                mqttclient.publish('/sensor/put', JSON.stringify(data, null, "\t"), {qos : 1});
            });
        });
    }
    connectAndSetUpMe();
});

結果
実際の可視化の様子はこちらです。
elastic/kibana
sp5555

splunk
sp222

Pub/Subのライブラリは色々あって、なぜにGolangを使ったか?というと、とっかかり易くて便利というのもあるけど、センサーデータはストリームで恐ろしい勢いで機械的にデータが飛んできたり、IPは変えられるけどエンドポイントは固定のパスとかあったりする可能性を考えて、並列部分を簡単に書けて、endpointづつ小さいAPIでサクサク作れるgolangで良いかなと。


コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

このサイトはスパムを低減するために Akismet を使っています。コメントデータの処理方法の詳細はこちらをご覧ください