Koshianを使ってセンサーデータのリアルタイム可視化・分析可能な構成を考えて作ってみたメモ。全体構成はこんな感じ。
– 全体構成
MQTTを使っても大丈夫で、MQTTでもOK。ただ、敢えて使わない。使わない理由は以下にて。
– 可視化の様子(Kibana)
温度のグラフ、device_id(データを送ってきたデバイス毎のID)、接続元IPを使った位置情報を表示している。
– やってみたいこと
1. センサーデータを可視化 -> es/kibana
2. リアルタイムが良い、センサーデータのストリームにも耐性がある -> fluentd
3. 大規模、並列化/スケールする -> es/fluentd/AP(golang)
4. 解析/分析も可能 -> fluentdからhadoopとかに流し込みでも可
– 使うもの
fluentd : ログ集約配送
elasticsearch : 検索用DB
kibana : 可視化
golang : gwとのやりとり用
ConoHA : クラウドのサーバ
KoshianはWeb系の人にはさーっぱり馴染みも無いと思うけど、安くてBTもサクッと使えるという素敵な端末。逆にモノをいじる方には、es(elasticsearch)、fluentd、kibanaとかナニソレ的な感じもするけど、かなり使われている感がする。特に可視化、リアルタイム、スケールするといった所では、導入検討はするはず。
– なぜMQTTを使わないか?
自分がよく使うMQTTは使いません。サーバはUbuntuで立てているから、mosquittoとか使えば簡単なんだけど。その場合、golangの口をMQTTに入れ替えればOK、それでも動くですね。GW(iphone)がpublisherで間にMQTT Brokerが挟まったgolang(subscriber)で取得といった感じで。まぁ簡単に動かせる。
ただ、なぜにMQTTでセンサーデータの配送をしないか?というと、MQTT broker経由でsubscriber側で取得するともちろんpub側のIPが分からない。pub/subでのメッセージ配送物なのでそういうもんだけど。
インターネット上を流れてくるなら発信者のIPは知りたいし、データ発信者(デバイス)の発信者情報(IP)を受け取り手側で処理はしたくなるというか、これみたいに簡単に地図上に表示したくなった時とかでも。まぁ、IPと位置情報は正確には関連性が無いんだけどね。
デバイス/GW側でGPS情報を送信データに付与すれば…というのはそうだけど、別に位置が知りたい訳ではなくIPが知りたい。逆にMQTT BrokerでIPをメッセージに付与…は無いし、Brokerだけで動くならそもそもpub/subの意味は無い。
ちなみに、IPのログとしての意味では、MQTT Broker側ではログとしてID/IP/配送先のID/IPは残しておく必要はあるはず。そうしないと、怪しいメッセージの配送、不正ID・ユーザの混入といった時にかなり厄介なことになるし、追跡手段として残しておくことになるかと。
そこまでしてMQTTか?といったらちょっと別の用途で使った方が好いと思われ。例えば、閉じた(宅内)NWとかバックエンドの配送では良いかなと。セキュリティ的に閉じていれば、その辺のことはある程度すっ飛ばして考えらえるから。
要はインターネット、Globalで利用する時、実際の運用・管理でどーするか?。MQTT Brokerも自分の管理で全て把握できるならそれでも良い気はするけど、この辺の解を持っている人がいたら教えて欲しい。
– td-agentの設定
ここではIPアドレスを追記している。センサーデータの増減には全く影響を受けないから素敵。
# GEOIPで位置情報・国コードの追加 <match debug.temp> type geoip geoip_lookup_key ipaddr <record> lat ${latitude["ipaddr"]} lon ${longitude["ipaddr"]} location_properties '{ "lat" : ${latitude["ipaddr"]}, "lon" : ${longitude["ipaddr"]} }' country ${country_code['ipaddr']} </record> tag store.${tag} </match> # elasticsearchに登録 <match store.debug.temp> type copy <store> type stdout </store> <store> type elasticsearch host localhost port 9200 logstash_format true logstash_prefix temp tag_key @log_name include_tag_key true flush_interval 10s </store> </match>
– golang側
golangはこんな感じ。サクッと書いて、すぐ使えるから好き。スケールさせたかったら並べればOK。
package main import ( "fmt" "log" "net/http" "strings" "time" "github.com/ant0ine/go-json-rest/rest" "github.com/t-k/fluent-logger-golang/fluent" ) // 受信データ用のstruct type SensorData struct { DeviceId string Temp string } func main() { logger, err := fluent.New(fluent.Config{FluentPort: 24224, FluentHost: "ip addr"}) if err != nil { fmt.Println(err) } defer logger.Close() // GW(iphone)からは、RESTで/put/を叩く。 api := rest.NewApi() api.Use(rest.DefaultDevStack...) router, err := rest.MakeRouter( rest.Post("/put/", func(w rest.ResponseWriter, req *rest.Request) { SensorData := SensorData{} err := req.DecodeJsonPayload(&SensorData) log.Println(req.Body) if err != nil { rest.Error(w, err.Error(), http.StatusInternalServerError) return } // このtagはfluentの方と合わせておく tag := "debug.temp" t := time.Now() // 温度, 接続元IP, device-id(端末からくる)を送信 // センサーデータが増えた時はこの辺で追加。ここでは温度のみ。 data := map[string]string{ "temp": SensorData.Temp, "ipaddr": strings.Split(req.RemoteAddr, ":")[0], "device-id": SensorData.DeviceId, } // fluentd行き logger.PostWithTime(tag, t, data) w.WriteJson("OK") }), ) if err != nil { log.Fatal(err) } api.SetApp(router) log.Fatal(http.ListenAndServe(":8080", api.MakeHandler())) }
– その他
es/kibanaの設定方法はググれば書いてある。GW(iphone)からはBLEでKoshianから値を取得して、RESTで叩くだけでOK。とっても簡単。MQTTを使う時は、PodでMQTTKitを入れてすぐ使えるから、それも問題ないでしょう。あと認証をしたくなったらMariaDBでも立てて、そこで管理してあげる感じ。暗号化はTLSで証明書でも突っ込んでおいて。
若干身内感はするけど、ConoHaイケてる。客観視で使ってみて、性能(CPU/SSD)、価格的にもこれは良いと普通に思った。
はずしていたら申し訳ないのですが、GWでIPアドレスをpayloadの中に埋め込むのはだめなのでしょうか。LTEなどの環境下でも、 http://ifconfig.me/ などを使えば外部接続用のIPアドレスを得ることは出来ると思います。
そうですね。自身の外部アドレスを知るのは外のサーバと疎通すれば分かるので、それをGWで埋め込むのはアリとは思いました。ちょっと意味合いはあれですが、STUNとかでも。ただ、実際のところLTEとかでもですが、外部アドレスが変わったりするのを考えると、都度確認するのもちょっと…って感じはしますよね。