[[AWSメモ]] > * AWS IotにPublishされたデータをDynamoDBに登録する [#ef135803] #setlinebreak(on); #contents -- 関連 --- [[AWS IotでPub/Subしてみる]] -- 参考 --- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-iot-topicrule.html --- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-topicrulepayload.html --- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-action.html --- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/cfn-using-cli.html ** 概要 [#v046ab75] #html(<div style="padding-left: 10px;">) Iot トピックルールのルールアクションを使用して、AWS Iot に publish されたデータを DynamoDB に登録する。 - バイナリデータも考慮して、DBには base64エンコードされたデータを登録する - Lambda等は介さずに、ルールアクションに "DynamoDBv2" を使用して直接DBに登録する #html(</div>) ** DB 及び Iotルールアクションの作成 [#s30900c6] #html(<div style="padding-left: 10px;">) ここではスタックとして一括で作成する。 template.yml #mycode2(){{ AWSTemplateFormatVersion: "2010-09-09" Description: "Iot Rule action sample" Resources: # テーブル SampleIotDataTable: Type: AWS::DynamoDB::Table Properties: TableName: SampleIotData AttributeDefinitions: - AttributeName: topic AttributeType: S - AttributeName: skey AttributeType: S KeySchema: - AttributeName: topic KeyType: HASH - AttributeName: skey KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 1 WriteCapacityUnits: 1 # テーブルに登録する為のIAMロール SampleIotDataTablePutItemRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "iot.amazonaws.com" Action: - "sts:AssumeRole" Policies: - PolicyName: "SampleIotDataTablePutItemPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "dynamoDB:PutItem" Resource: !GetAtt SampleIotDataTable.Arn # Iotトピックルール SampleIotRuleAction: Type: AWS::IoT::TopicRule Properties: RuleName: "ToDynamoDB" TopicRulePayload: Actions: - DynamoDBv2: RoleArn: !GetAtt SampleIotDataTablePutItemRole.Arn PutItem: TableName: !Ref SampleIotDataTable AwsIotSqlVersion: "2016-03-23" Description: String RuleDisabled: false Sql: >- SELECT topic() AS topic, concat(parse_time("yyyy-MM-dd'T'HH:mm:ss.SSS'#'", timestamp()), traceid()) AS skey principal() AS principal, traceid() AS traceid, encode(*, 'base64') AS data, parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSz", timestamp()) AS ts FROM 'topic/#' }} AWS CLI から一括作成 #myterm2(){{ # 検証&作成&作成完了まで待つ aws cloudformation validate-template --template-body file://template.yml \ && aws cloudformation create-stack --stack-name SampleIotRuleAction --template-body file://template.yml --capabilities CAPABILITY_IAM \ && aws cloudformation wait stack-create-complete --stack-name SampleIotRuleAction }} もしくは #myterm2(){{ # S3バケットの作成(バケット名は世界で唯一である必要がある為、末尾にアカウントID等を付与しておく) aws s3api create-bucket --create-bucket-configuration '{"LocationConstraint": "ap-northeast-1"}' --bucket my-cloudformation-templates-アカウントID # 検証&パッケージング&デプロイ aws cloudformation validate-template --template-body file://template.yml \ && aws cloudformation package --template-file template.yml --s3-bucket my-cloudformation-templates-アカウントID --output-template-file packaged-template.yml \ && aws cloudformation deploy --template-file packaged-template.yml --stack-name SampleIotRuleAction --capabilities CAPABILITY_IAM }} #html(</div>) ** 動作確認用の処理作成 [#w5f703d6] #html(<div style="padding-left: 10px;">) test_publish.py #mycode2(){{ # coding: utf-8 import base64 import boto3 import json from datetime import datetime import time def publish(): """Iotにpublishする.""" client = boto3.client('iot-data') now = datetime.now() payload = json.dumps({ 'message': f'Hello AWS Iot! {now}' }).encode() response = client.publish( topic='topic/SampleTopic1', qos=0, payload=payload ) print('-- publish result --') print(response) def scan_table(): """DBのデータを確認.""" dynamodb = boto3.resource('dynamodb') res = dynamodb.Table('SampleIotData').scan() items = res.get('Items') if res.get('Items') else [] print('-- items --') items = [ {**x, **{'decoded_data': json.loads(base64.b64decode(x['data']).decode())}} for x in items ] print(json.dumps(items, indent=4)) def main(): # データをpublish publish() # 少し待つ time.sleep(3) # DBのデータを確認 scan_table() if __name__ == '__main__': main() }} #html(</div>) ** 動作確認 [#jb8fd2c3] #html(<div style="padding-left: 10px;">) boto3インストール #myterm2(){{ python3 -m venv venv source venv/bin/activate pip install boto3 }} 実行 #myterm2(){{ python test_publish.py }} 結果 #myterm2(){{ -- publish result -- {'ResponseMetadata': {'RequestId': '9c75d82c-de11-8ee7-ed65-7930fc29c580', 'HTTPStatusCode': 200, 'HTTPHeaders': ... }, 'RetryAttempts': 0}} -- items -- [ { "ts": "2018-01-01T12:34:56.123UTC", "principal": "AIDAJHJTUCF474FK4DJSI", "topic": "topic/SampleTopic1", "traceid": "1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84", "data": "eyJtZXNzYWdlIjogIkhlbGxvIEFXUyBJb3QhIDIwMTgtMDEtMDEgMTI6NDc6MzkuMzI3NTQ1In0=", "skey": "2018-01-01T12:34:56.123#1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84", "decoded_data": { "message": "Hello AWS Iot! 2018-01-01 12:34:56.123456" "message": "Hello AWS Iot! 2018-01-01 12:47:39.327545" } } ] }} #html(</div>)