- ソリューションアーキテクト
- プロジェクトマネージャー
- テクニカルオペレーション
- Other occupations (4)
- Development
- Business
- Other
技術3課の森です。
春の陽気が来たかと思うと雪が降ったりと難しい季節。
新社会人や入学の季節ということで、引っ越しがピークになってきたのではないでしょうか。
引っ越しと言えば、データを送り出すということで、今回は、MQTTでAWS IoTにPublishしたデータをKinesis Streamsに流してみることをしてみました。
アジェンダ
今回はこのようなものを作っていきます。
- AWS IoTの作成
- Amazon Kinesis Streamsの作成
- IAMロールの作成
- MQTTクライアントの作成
MQTTクライアントからQoS1でデータを送信し、AWS IoTで受けた後、アクションでKinesis Streamsへ流すような感じです。
環境構築
必要なAWSリソースの作成を行います。赤字にしたところを作っていきます。
- AWS IoTの作成
- Amazon Kinesis Streamsの作成
- IAMロールの作成
- MQTTクライアントの作成
AWS IoT - Thingの作成
1.まずはマネージメントコンソールから「IoT Core」を選択し、「管理」をクリックします。
2.「モノ」の「作成」ボタンをクリックします。
3.「単一のモノを作成する」ボタンをクリックします。
4.「名前」を入力して、モノのタイプを作成する必要があるので、「タイプの作成」ボタンをクリックします。
5.「名前」を入力して、「モノのタイプの作成」ボタンをクリックします。(今回モノのタイプは特に設定しません。)
6.「モノの作成」画面で、「モノのタイプ」が先程作成したものになっていることを確認します。
7.それ以外の項目には何も入れずに「次へ」をクリックします。
8.AWS IoTと通信するための証明書の作成をします。今回はさっと作るため、「1-Click 証明書作成(推奨)」の「証明書の作成」ボタンをクリックします。
9.後で、プログラムでも利用する証明書のダウンロードを行います。「このモノの証明書」「パブリックキー」「プライベートキー」「AWS IoTのルートCA」をダウンロードし、「有効化」ボタンをクリックした後、「ポリシーのアタッチ」ボタンをクリックします。
10.作成した証明書をポリシーにアタッチします。今回は既に作成されている「PubSubToAnyTopic」を利用します。「PubSubToAnyTopic」を選択し、「モノの登録」ボタンをクリックします。
11.「モノ」に作成した「DeviceForBlog」が作成されていることを確認します。
12.ルールの作成を行います。左側のフレームの「ACT」をクリックし、右側の「作成」ボタンをクリックします。
13.ルールの「名前」を入力します。
14.プログラム的にデータは作成していますが、今回は作成するデータを全て表示するようにするため、「属性」に情報を入力し、「トピックフィルター」にも情報を入力します。「条件」は特に指定なしで全てを表示します。
15.AWS IoTで受けた後にデータを流すことを今回は行うため、アクション指定が必要になります。「アクションの追加」ボタンをクリックします。
16.Kinesis Streamsを利用するため、「Amazon Kinesisストリームにメッセージを送信する」を選択し、「アクションの設定」ボタンをクリックします。
17.現時点では、Kinesis Streamsを作成していないので、「新しいリソースを作成する」ボタンをクリックします。
18.新しいタブが表示され、Amazon Kinesisの画面が表示されます。その中で、「Kinesis ストリームの作成」画面に遷移していますので、情報を入力していきます。
「Kinesis ストリームの名前」を入力し、今回は複数のシャードを利用したいため、「シャード数」を「2」にして、「Kinesis ストリームの作成」ボタンをクリックします。
19.作成したKinesis Streamsが作成されていることを確認します。
20.AWS IoTのアクションの設定画面に戻り、ストリーム名の更新ボタンをクリックした後、先程作成した「ストリーム名」を選択し、「パーティションキー」(今回は、ユニークになるように「newuuid()」関数を利用しています)を入力し、Kinesisストリームに設定するロールを新たに作成するために、「新しいロールの作成」をクリックします。
21.「IAMロール名」を入力するフィールドが出てくるので、名前を入力して、「新しいロールの作成」ボタンをクリックします。
22.先程入力した「IAMロール名」を選択し、「アクションの追加」ボタンをクリックします。
23.作成したKinesis Streamsになっていることを確認し、「ルールの作成」ボタンをクリックします。
24.作成したルールが表示されていることを確認します。
MQTTクライアント作成
MQTTクライアントを作成していきます。Python3.6を使ってコードの作成をしていきます。といっても、既に作成したものです。 mqtt-pub_01.pyをサンプルとして記載します。mqtt-pub_01.pyとmqtt-pub_02.pyの違いはGZファイルの名前だけです。 異なるテキストを用意し、GZIPで圧縮して下さい。
#!/usr/bin/python
# -*- coding: utf-8 -*-
#mqtt-pub_01.py
import paho.mqtt.client as mqtt
import ssl
import time
import json
import base64
from datetime import datetime
# settings
deviceplace = 'device_00'
roomname = 'room_00'
# AWS IoT settings
## マネージメントコンソール→AWS IoT→設定→カスタムエンドポイント にあるエンドポイント名をコピー
host = '<<endpoint>>.iot.ap-northeast-1.amazonaws.com' # AWS IoT Endpoint
## ポート番号は以下、固定
port = 8883 # port
## AWS IoTで作成した証明書
cacert = './cert/rootCA.pem' # root ca
clientCert = './cert/<<id>>-certificate.pem.crt' # certificate
clientKey = './cert/<<id>>-private.pem.key' # private key
## AWS IoTで利用するトピック名
topic = 'deviceforblog/%s' % roomname # topic
counter = 0
def on_connect(client, userdata, flags, respons_code):
print('Connected')
# AWS IoTと接続する関数
def sensing():
while True:
data = {}
data['bindata'] = get_bindata()
data['bindate'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
publish(data)
time.sleep(1)
# バイナリデータの読み込み
def get_bindata():
#create the bin data
##送信するバイナリデータを利用
##mqtt-pub_01.pyの場合はtestdata01.gz
##mqtt-pub_02.pyの場合はtestdata02.gz
bindata = open('testdata01.gz', 'rb').read()
#base64 encoding
bindata_base64 = base64.b64encode(bindata).decode('utf-8')
#return bin data
return bindata_base64
# データをPublish
def publish(data):
data['place'] = deviceplace
global counter
counter = counter + 1
data['counter'] = counter
print("put record:" + str(counter))
#第一引数: AWS IoT トピック
#第二引数: JSONデータ
#第三引数: QoS(今回はQoS1、QoS0の場合は省略可)
client.publish(topic, json.dumps(data, ensure_ascii=False), 1) # publish
#以下、メイン
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
# certifications
client.tls_set(cacert,
certfile=clientCert,
keyfile=clientKey,
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
# callback
client.on_connect = on_connect
# port, keepalive
client.connect(host, port=port, keepalive=60)
client.loop_start()
sensing()
フォルダ構成
AWSIoT2Kinesis/
├── cert
│ ├── cd74dc100b-certificate.pem.crt
│ ├── cd74dc100b-private.pem.key
│ ├── cd74dc100b-public.pem.key
│ └── rootCA.pem
├── testdata01.gz
├── testdata02.gz
├── mqtt-pub_01.py
├── mqtt-pub_02.py
└── requirements.txt
プログラムの実行
pythonコマンドを利用して、実行します。 今回は、2つプログラムを実行しますので、ターミナルを2つ立ち上げて実行して下さい。 1つ目のターミナルでは、「python mqtt-pub_01.py」を実行すると以下のようなメッセージが出力されます。
$ python mqtt-pub_01.py
Connected
put record:1
put record:2
2つ目のターミナルでも、「python mqtt-pub_02.py」を実行すると以下のようなメッセージが出力されます。
$ python mqtt-pub_02.py
Connected
put record:1
put record:2
シャードに入った情報を数える
実行した結果を確認していきます。一定の時間(今回は約20分程度)が経過した時、ターミナルで実行しているプログラムをCTRL+Cで止めて、そのときに出力されている「put record:n」の「n」をメモしておきます。今回、mqtt-pub_01.pyは1,300、mqtt-pub_02.pyは1,250と仮定します。
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 00.txt
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 01.txt
これで、シャードに入ってるデータ情報を確認できます。結果はこのような感じになっています。
{
"Records": [
{
"Data": "XXXXXXXXXXRhIjoiSDRzSUNHMzhzVm9BQTJGaVl5NTBlSFFBXXXXXXXXXXVZaTRBSG9EbHR3MEFBQUE9IXXXXXXXXXXX0ZSI6IjIwMTgvMDMvMjUgMTA6Mzk6NTkiLCJwbGFXXXXXXXXXX21fMDAiLCJjb3VudGVyIjoxfQ==",
"PartitionKey": "deviceforblog/room_00/99999999-9999-9999-9999-99999999999",
"ApproximateArrivalTimestamp": 1521942003.734,
"SequenceNumber": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
},
....
],
"NextShardIterator": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"MillisBehindLatest": 9305000
}
レコードの中身で「ApproximateArrivalTimestamp」を使って件数を取得してみます。
$ grep ApproximateArrivalTimestamp 00.txt | wc -l
1278
$grep ApproximateArrivalTimestamp 01.txt | wc -l
1272
今回の結果
実行した結果、このような結果が得られました。
- 2つのシャードに分散されていた
- 送った側のメッセージとKinesis Streamsで受け取ったデータ数(2つのシャード内を確認)がだいたい同じ
次回予告
この後、試してみる内容としてはこのようなことを考えています。
- Kinesis Streamsで受け取ったデータをAWS Lambdaで取って見てみる
- AWS IoTのルールのアクションで権限のないKinesis Streamsへアクセスした場合の動きを見てみる