AWSメモ >

AWS IotでPub/Subしてみる

概要

AWS IoT SDK を使用してMQTT接続するクライアントを書く。

AWSサービス側の設定

参考
AWS IoT への Raspberry Pi の接続 - https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-sdk-setup.html

ポリシーの作成

マネージメントコンソール から、以下の通りポリシーを作成する。

名前sample-iot-policy
アクションiot:*
リソース ARN*
効果許可

証明書の作成

  • マネージメントコンソール から証明書を作成する。
  • 作成したモノの証明書、パブリックキー、プライベートキー、及び ルートCA証明書をダウンロードしておく。
  • 「有効化」しておく。
  • 「ポリシーをアタッチ」で作成しておいたポリシーをアタッチする。

※ルートCA証明のダウンロード先
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/managing-device-certs.html#server-authentication

モノの登録

マネージメントコンソール の「単一のモノを作成する」からモノを登録する。

証明書にモノにアタッチ

マネージメントコンソール から対象の証明書に「モノをアタッチ」する。
※ポリシーをアタッチ済みの場合は不要?

エンドポイントの確認

マネージメントコンソールの設定 からエンドポイントを確認しておく。
※エンドポイントはAWSアカウント毎に1つだけ存在する。

デバイス側の環境構築

いくつか方法はあるが、ここでは AWSIoTPythonSDK を使用した。
尚、テスト用の Publish は マネージメントコンソール から簡単に行う事ができるが、一応 boto3 から Publish する処理を作る為、ここでは boto3 もインストールしておく。
※aws configure は事前にしておく事。

GitHub
https://github.com/aws/aws-iot-device-sdk-python

mkdir test_iot && test_iot
python3 -m venv venv
source venv/bin/activate
pip install boto3
pip install AWSIoTPythonSDK

デバイス側の処理作成

https://github.com/aws/aws-iot-device-sdk-python/blob/master/samples/basicPubSub/basicPubSub.py を参考に任意のトピックをサブスクライブするクライアントを作成する。

client.py

# coding: utf-8

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json

# MQTTの設定情報
host = 設定画面で確認したエンドポイント
port = 8883
rootCAPath = ルートCA証明書ファイルPATH
certificatePath = モノの証明書ファイルPATH
privateKeyPath = プライベートキーのファイルPATH
clientId = 'sample-thing1'  # Thing Name と同じである事が推奨されている
topic = 'topic/SampleTopic1'  # サブスクライブするTopic
# topic = 'topic/#'  # トピックの前方一致でサブスクライブする事も可能

# メッセージ受信時のコールバック
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")

# ログ設定
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# MQTTクライアントの初期化/設定
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) 
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# 接続
myAWSIoTMQTTClient.connect()

# サブスクライブ
myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)

while True:
    time.sleep(1)

テスト用のPublish処理を作成

test_publish.py

# coding: utf-8

import boto3
import json
from datetime import datetime

def main():
    client = boto3.client('iot-data')
    now = datetime.now()
    payload = json.dumps({
        'message': f'Hello AWS Iot! {now}'
    }).encode()
    response = client.publish(
        topic='topic/SampleTopic1',
        qos=0,
        payload=payload
    )   
    print(response)


if __name__ == '__main__':
    main()

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iot-data.html#IoTDataPlane.Client.publish

実行

デバイス側のクライアントを起動

python client.py

テストメッセージをPublish

python test_publish.py

結果(デバイス側のクライアント)

2018-01-01 14:26:39,574 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [message] event
Received a new message: 
b'{"message": "Hello AWS Iot! 2018-01-01 14:26:39.287980"}'
from topic: 
topic/SampleTopic1
--------------

補足

ワイルドカードを使用したトピックのサブスクライブ

ワイルドカードを使用してトピックの前方一致や部分一致でサブスクライブする事が可能。

参考
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/topics.html


トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2019-01-30 (水) 14:29:33 (1911d)