Registering Data Published to AWS IoT in DynamoDB

Overview

Use a rule action of an AWS IoT topic rule to store data published to AWS IoT in a DynamoDB table.

Creating the DB and the IoT rule action

Here, all resources are created together as a single stack.

template.yml

    AWSTemplateFormatVersion: "2010-09-09"
    Description: "Iot Rule action sample"
    Resources:

      # Table
      SampleIotDataTable:
        Type: AWS::DynamoDB::Table
        Properties:
          TableName: SampleIotData
          AttributeDefinitions:
            - AttributeName: topic
              AttributeType: S
            - AttributeName: skey
              AttributeType: S
          KeySchema:
            - AttributeName: topic
              KeyType: HASH
            - AttributeName: skey
              KeyType: RANGE
          ProvisionedThroughput:
            ReadCapacityUnits: 1
            WriteCapacityUnits: 1

      # IAM role used to put items into the table
      SampleIotDataTablePutItemRole:
        Type: AWS::IAM::Role
        Properties:
          AssumeRolePolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Principal:
                  Service:
                    - "iot.amazonaws.com"
                Action:
                  - "sts:AssumeRole"
          Policies:
            - PolicyName: "SampleIotDataTablePutItemPolicy"
              PolicyDocument:
                Version: "2012-10-17"
                Statement:
                  - Effect: "Allow"
                    Action: "dynamodb:PutItem"
                    Resource: !GetAtt SampleIotDataTable.Arn

      # IoT topic rule
      SampleIotRuleAction:
        Type: AWS::IoT::TopicRule
        Properties:
          RuleName: "ToDynamoDB"
          TopicRulePayload:
            Actions:
              - DynamoDBv2:
                  RoleArn: !GetAtt SampleIotDataTablePutItemRole.Arn
                  PutItem:
                    TableName: !Ref SampleIotDataTable
            AwsIotSqlVersion: "2016-03-23"
            Description: String
            RuleDisabled: false
            Sql: >-
              SELECT
              topic() AS topic,
              concat(parse_time("yyyy-MM-dd'T'HH:mm:ss.SSS'#'", timestamp()), traceid()) AS skey,
              principal() AS principal,
              traceid() AS traceid,
              encode(*, 'base64') AS data,
              parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSz", timestamp()) AS ts
              FROM 'topic/#'

Creating the stack from the AWS CLI

    # Validate, create, and wait until creation completes
    aws cloudformation validate-template --template-body file://template.yml \
      && aws cloudformation create-stack --stack-name SampleIotRuleAction --template-body file://template.yml --capabilities CAPABILITY_IAM \
      && aws cloudformation wait stack-create-complete --stack-name SampleIotRuleAction

Alternatively:

    # Create an S3 bucket (bucket names must be globally unique, so append
    # something like your account ID; replace ACCOUNT_ID below)
    aws s3api create-bucket --create-bucket-configuration '{"LocationConstraint": "ap-northeast-1"}' --bucket my-cloudformation-templates-ACCOUNT_ID

    # Validate, package, and deploy
    aws cloudformation validate-template --template-body file://template.yml \
      && aws cloudformation package --template-file template.yml --s3-bucket my-cloudformation-templates-ACCOUNT_ID --output-template-file packaged-template.yml \
      && aws cloudformation deploy --template-file packaged-template.yml --stack-name SampleIotRuleAction --capabilities CAPABILITY_IAM

Creating a script to verify the behavior

test_publish.py

    # coding: utf-8
    import base64
    import json
    import time
    from datetime import datetime

    import boto3


    def publish():
        """Publish a message to AWS IoT."""
        client = boto3.client('iot-data')
        now = datetime.now()
        payload = json.dumps({
            'message': f'Hello AWS Iot! {now}'
        }).encode()
        response = client.publish(
            topic='topic/SampleTopic1',
            qos=0,
            payload=payload
        )
        print('-- publish result --')
        print(response)


    def scan_table():
        """Check the data stored in the table."""
        dynamodb = boto3.resource('dynamodb')
        res = dynamodb.Table('SampleIotData').scan()
        items = res.get('Items') or []
        print('-- items --')
        # Add the base64-decoded payload to each item for readability
        items = [
            {**x, 'decoded_data': json.loads(base64.b64decode(x['data']).decode())}
            for x in items
        ]
        print(json.dumps(items, indent=4))


    def main():
        # Publish the data
        publish()
        # Wait a moment for the rule action to write the item
        time.sleep(3)
        # Check the data in the table
        scan_table()


    if __name__ == '__main__':
        main()

Verifying the behavior

Install boto3

    python3 -m venv venv
    source venv/bin/activate
    pip install boto3

Run

    python test_publish.py

Result

    -- publish result --
    {'ResponseMetadata': {'RequestId': '9c75d82c-de11-8ee7-ed65-7930fc29c580', 'HTTPStatusCode': 200, 'HTTPHeaders': ... }, 'RetryAttempts': 0}}
    -- items --
    [
        {
            "ts": "2018-01-01T12:34:56.123UTC",
            "principal": "AIDAJHJTUCF474FK4DJSI",
            "topic": "topic/SampleTopic1",
            "traceid": "1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
            "data": "eyJtZXNzYWdlIjogIkhlbGxvIEFXUyBJb3QhIDIwMTgtMDEtMDEgMTI6NDc6MzkuMzI3NTQ1In0=",
            "skey": "2018-01-01T12:34:56.123#1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
            "decoded_data": {
                "message": "Hello AWS Iot! 2018-01-01 12:47:39.327545"
            }
        }
    ]
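
The stored item packs two things into string columns: `skey` joins the publish timestamp and the trace ID with `#`, and `data` holds the original payload as base64. The following is a minimal offline sketch of how a consumer might unpack one item, using the values from the result shown in this page. The helper name `parse_item` and the shape of its return value are assumptions made for illustration; they are not part of the original script or of any AWS API.

```python
import base64
import json
from datetime import datetime


def parse_item(item):
    """Unpack one SampleIotData item (hypothetical helper, for illustration)."""
    # skey has the form "<timestamp>#<traceid>", as built by concat() in the rule SQL.
    ts_text, _, trace_id = item["skey"].partition("#")
    published_at = datetime.strptime(ts_text, "%Y-%m-%dT%H:%M:%S.%f")
    # data is the original message payload, base64-encoded by encode(*, 'base64').
    payload = json.loads(base64.b64decode(item["data"]).decode())
    return {
        "topic": item["topic"],
        "published_at": published_at,
        "trace_id": trace_id,
        "payload": payload,
    }


# Values copied from the sample result in this page.
sample_item = {
    "topic": "topic/SampleTopic1",
    "skey": "2018-01-01T12:34:56.123#1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
    "data": "eyJtZXNzYWdlIjogIkhlbGxvIEFXUyBJb3QhIDIwMTgtMDEtMDEgMTI6NDc6MzkuMzI3NTQ1In0=",
}

parsed = parse_item(sample_item)
print(parsed["published_at"].isoformat(), parsed["trace_id"])
print(parsed["payload"]["message"])
```

Because `skey` begins with a fixed-width timestamp, items under the same `topic` partition key sort chronologically, which is presumably why the timestamp comes first in the sort key.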