InfluxDB Cloudへデータを送信&可視化してみた
こんにちは。CX事業本部Delivery部のakkyです。
IoTセンサーのデータベースとして、AWS IoT SiteWiseやAzure Cosmos DBを使う方法をご紹介してきました。Developers.IOにはAmazon Timestreamなどを使う方法も紹介されていますが、ほとんどが3大クラウドサービスが提供するDBでした。
今回は、InfluxDB Cloudへデータを送信し、可視化してみましたのでご紹介します。
InfluxDB
InfluxDBはInfluxData社が開発しているオープンソースの時系列データベースです。サーバはセルフホスティングするか、フルマネージドのInfluxDB Cloudというサービスがあります。
データの書き込みには、HTTPエンドポイントのほか、MQTTや各種サービスのメトリクスの取得に対応しています。クエリにはFluxまたはInfluxQLというクエリ言語を使います。Fluxは多機能で、ある程度の計算はこれだけでできてしまいます。 クエリは独自言語となりますが、各種プログラミング言語で使えるライブラリが用意されています。
可視化ツールは組み込みのものが用意されていますが、Grafanaを使うこともできます。もちろんAmazon Managed Grafanaも対応しています。
今回はInfluxDB Cloudの無料プランを使ってみました。
データ構造
InfluxDBでは、NoSQLのようなスキーマレスのほか、スキーマを強制することができます。センサやサーバのメトリクスを入れるユースケースではスキーマありで使うことが多いかもしれません。
バケットの中に各測定値の行が入る構造はRDBと似ていますが、各行のデータとして、必須の測定名、インデックスの対象になるタグ、インデックス対象外のフィールド、そして必須の時刻データがあるという形式がInflexDBの特徴です。
測定名にはデータの種類を説明する名前、タグにはデータの取得元などを識別するための値、フィールドには測定値を入れるのが想定した使い方のようです。 詳しくは公式ドキュメントを参照してください。
Pythonからの書き込み
Pine A64上のPythonで動作確認したコードから、InfluxDBの部分を抜き出しました。 Pythonのinfluxdbライブラリが必要です。
from datetime import datetime import time from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.exceptions import InfluxDBError influxdb_url = "https://XXXXXXX.aws.cloud2.influxdata.com" token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" org = "[email protected]" bucket = "BucketName" points = [] BATCH_CNT = 6 class BatchingCallback(object): def success(self, conf, data: str): #print(f"Written batch: {conf}, data: {data}") pass def error(self, conf, data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf, data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") def func(): frequency = 50.0 voltage = 100.0 point = Point("ac") \ .tag("place", "home") \ .field("frequency", frequency) \ .field("voltage", voltage) \ .time(datetime.utcnow(), WritePrecision.NS) points.append(point) if len(points) >= BATCH_CNT: with InfluxDBClient(url=influxdb_url, token=token, org=org) as influxdb_client: callback = BatchingCallback() with influxdb_client.write_api(success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as write_api: write_api.write(bucket, org, points) points.clear()
Pythonでは、測定値はPointというクラスで生成します。今回は周波数と電圧を測定するセンサーを例としたので、前述のとおり、measurement(コンストラクタの引数)にはac、データを識別するための値をtagとしてplaceにセットし、測定値をfieldとしてfrequencyとvoltageをセットしました。 HTTPエンドポイントに送信するので、いくつかのデータをまとめてwrite_api.write()するコードになっています。
データの可視化
InfluxDB Cloudへログインし、Data Explorerを開きます。FROMでBucketを選び、MEASUREMENTから測定値を選択し、そのほかのタグで絞り込んでからSUBMITをクリックするとグラフが表示されます。Grafanaのように表示範囲も選択できます。