AWSメモ > AWS IotでPub/Subしてみる †
概要 †AWS IoT SDK を使用してMQTT接続するクライアントを書く。 AWSサービス側の設定 †参考 ポリシーの作成 †マネージメントコンソール から、以下の通りポリシーを作成する。
証明書の作成 †
※ルートCA証明のダウンロード先 モノの登録 †マネージメントコンソール の「単一のモノを作成する」からモノを登録する。 証明書にモノにアタッチ †マネージメントコンソール から対象の証明書に「モノをアタッチ」する。 エンドポイントの確認 †マネージメントコンソールの設定 からエンドポイントを確認しておく。 デバイス側の環境構築 †いくつか方法はあるが、ここでは AWSIoTPythonSDK を使用した。 GitHub mkdir test_iot && test_iot python3 -m venv venv source venv/bin/activate pip install boto3 pip install AWSIoTPythonSDK デバイス側の処理作成 †https://github.com/aws/aws-iot-device-sdk-python/blob/master/samples/basicPubSub/basicPubSub.py を参考に任意のトピックをサブスクライブするクライアントを作成する。 client.py # coding: utf-8 from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient import logging import time import argparse import json # MQTTの設定情報 host = 設定画面で確認したエンドポイント port = 8883 rootCAPath = ルートCA証明書ファイルPATH certificatePath = モノの証明書ファイルPATH privateKeyPath = プライベートキーのファイルPATH clientId = 'sample-thing1' # Thing Name と同じである事が推奨されている topic = 'topic/SampleTopic1' # サブスクライブするTopic # topic = 'topic/#' # トピックの前方一致でサブスクライブする事も可能 # メッセージ受信時のコールバック def customCallback(client, userdata, message): print("Received a new message: ") print(message.payload) print("from topic: ") print(message.topic) print("--------------\n\n") # ログ設定 logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # MQTTクライアントの初期化/設定 myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) myAWSIoTMQTTClient.configureEndpoint(host, port) myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec # 接続 myAWSIoTMQTTClient.connect() # サブスクライブ myAWSIoTMQTTClient.subscribe(topic, 1, customCallback) while True: time.sleep(1) テスト用のPublish処理を作成 †test_publish.py # coding: utf-8 import boto3 import json from datetime import datetime def main(): client = boto3.client('iot-data') now = datetime.now() payload = json.dumps({ 'message': f'Hello AWS Iot! {now}' }).encode() response = client.publish( topic='topic/SampleTopic1', qos=0, payload=payload ) print(response) if __name__ == '__main__': main() 実行 †デバイス側のクライアントを起動 python client.py テストメッセージをPublish python test_publish.py 結果(デバイス側のクライアント) 2018-01-01 14:26:39,574 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [message] event Received a new message: b'{"message": "Hello AWS Iot! 2018-01-01 14:26:39.287980"}' from topic: topic/SampleTopic1 -------------- 補足 †ワイルドカードを使用したトピックのサブスクライブ †ワイルドカードを使用してトピックの前方一致や部分一致でサブスクライブする事が可能。 参考 |