Koshianを使ってセンサーデータのリアルタイム可視化・分析可能な構成を考えて作ってみたメモ。全体構成はこんな感じ。
– 全体構成

MQTTを使っても大丈夫で、MQTTでもOK。ただ、敢えて使わない。使わない理由は以下にて。
– 可視化の様子(Kibana)

温度のグラフ、device_id(データを送ってきたデバイス毎のID)、接続元IPを使った位置情報を表示している。
– やってみたいこと
1. センサーデータを可視化 -> es/kibana
2. リアルタイムが良い、センサーデータのストリームにも耐性がある -> fluentd
3. 大規模、並列化/スケールする -> es/fluentd/AP(golang)
4. 解析/分析も可能 -> fluentdからhadoopとかに流し込みでも可
– 使うもの
fluentd : ログ集約配送
elasticsearch : 検索用DB
kibana : 可視化
golang : gwとのやりとり用
ConoHA : クラウドのサーバ
KoshianはWeb系の人にはさーっぱり馴染みも無いと思うけど、安くてBTもサクッと使えるという素敵な端末。逆にモノをいじる方には、es(elasticsearch)、fluentd、kibanaとかナニソレ的な感じもするけど、かなり使われている感がする。特に可視化、リアルタイム、スケールするといった所では、導入検討はするはず。
– なぜMQTTを使わないか?
自分がよく使うMQTTは使いません。サーバはUbuntuで立てているから、mosquittoとか使えば簡単なんだけど。その場合、golangの口をMQTTに入れ替えればOK、それでも動くですね。GW(iphone)がpublisherで間にMQTT Brokerが挟まったgolang(subscriber)で取得といった感じで。まぁ簡単に動かせる。
ただ、なぜにMQTTでセンサーデータの配送をしないか?というと、MQTT broker経由でsubscriber側で取得するともちろんpub側のIPが分からない。pub/subでのメッセージ配送物なのでそういうもんだけど。
インターネット上を流れてくるなら発信者のIPは知りたいし、データ発信者(デバイス)の発信者情報(IP)を受け取り手側で処理はしたくなるというか、これみたいに簡単に地図上に表示したくなった時とかでも。まぁ、IPと位置情報は正確には関連性が無いんだけどね。
デバイス/GW側でGPS情報を送信データに付与すれば…というのはそうだけど、別に位置が知りたい訳ではなくIPが知りたい。逆にMQTT BrokerでIPをメッセージに付与…は無いし、Brokerだけで動くならそもそもpub/subの意味は無い。
ちなみに、IPのログとしての意味では、MQTT Broker側ではログとしてID/IP/配送先のID/IPは残しておく必要はあるはず。そうしないと、怪しいメッセージの配送、不正ID・ユーザの混入といった時にかなり厄介なことになるし、追跡手段として残しておくことになるかと。
そこまでしてMQTTか?といったらちょっと別の用途で使った方が好いと思われ。例えば、閉じた(宅内)NWとかバックエンドの配送では良いかなと。セキュリティ的に閉じていれば、その辺のことはある程度すっ飛ばして考えらえるから。
要はインターネット、Globalで利用する時、実際の運用・管理でどーするか?。MQTT Brokerも自分の管理で全て把握できるならそれでも良い気はするけど、この辺の解を持っている人がいたら教えて欲しい。
– td-agentの設定
ここではIPアドレスを追記している。センサーデータの増減には全く影響を受けないから素敵。
# GEOIPで位置情報・国コードの追加
<match debug.temp>
type geoip
geoip_lookup_key ipaddr
<record>
lat ${latitude["ipaddr"]}
lon ${longitude["ipaddr"]}
location_properties '{ "lat" : ${latitude["ipaddr"]}, "lon" : ${longitude["ipaddr"]} }'
country ${country_code['ipaddr']}
</record>
tag store.${tag}
</match>
# elasticsearchに登録
<match store.debug.temp>
type copy
<store>
type stdout
</store>
<store>
type elasticsearch
host localhost
port 9200
logstash_format true
logstash_prefix temp
tag_key @log_name
include_tag_key true
flush_interval 10s
</store>
</match>
– golang側
golangはこんな感じ。サクッと書いて、すぐ使えるから好き。スケールさせたかったら並べればOK。
package main
import (
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/ant0ine/go-json-rest/rest"
"github.com/t-k/fluent-logger-golang/fluent"
)
// 受信データ用のstruct
type SensorData struct {
DeviceId string
Temp string
}
func main() {
logger, err := fluent.New(fluent.Config{FluentPort: 24224, FluentHost: "ip addr"})
if err != nil {
fmt.Println(err)
}
defer logger.Close()
// GW(iphone)からは、RESTで/put/を叩く。
api := rest.NewApi()
api.Use(rest.DefaultDevStack...)
router, err := rest.MakeRouter(
rest.Post("/put/", func(w rest.ResponseWriter, req *rest.Request) {
SensorData := SensorData{}
err := req.DecodeJsonPayload(&SensorData)
log.Println(req.Body)
if err != nil {
rest.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// このtagはfluentの方と合わせておく
tag := "debug.temp"
t := time.Now()
// 温度, 接続元IP, device-id(端末からくる)を送信
// センサーデータが増えた時はこの辺で追加。ここでは温度のみ。
data := map[string]string{
"temp": SensorData.Temp,
"ipaddr": strings.Split(req.RemoteAddr, ":")[0],
"device-id": SensorData.DeviceId,
}
// fluentd行き
logger.PostWithTime(tag, t, data)
w.WriteJson("OK")
}),
)
if err != nil {
log.Fatal(err)
}
api.SetApp(router)
log.Fatal(http.ListenAndServe(":8080", api.MakeHandler()))
}
– その他
es/kibanaの設定方法はググれば書いてある。GW(iphone)からはBLEでKoshianから値を取得して、RESTで叩くだけでOK。とっても簡単。MQTTを使う時は、PodでMQTTKitを入れてすぐ使えるから、それも問題ないでしょう。あと認証をしたくなったらMariaDBでも立てて、そこで管理してあげる感じ。暗号化はTLSで証明書でも突っ込んでおいて。
若干身内感はするけど、ConoHaイケてる。客観視で使ってみて、性能(CPU/SSD)、価格的にもこれは良いと普通に思った。
はずしていたら申し訳ないのですが、GWでIPアドレスをpayloadの中に埋め込むのはだめなのでしょうか。LTEなどの環境下でも、 http://ifconfig.me/ などを使えば外部接続用のIPアドレスを得ることは出来ると思います。
そうですね。自身の外部アドレスを知るのは外のサーバと疎通すれば分かるので、それをGWで埋め込むのはアリとは思いました。ちょっと意味合いはあれですが、STUNとかでも。ただ、実際のところLTEとかでもですが、外部アドレスが変わったりするのを考えると、都度確認するのもちょっと…って感じはしますよね。