- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2019-02-01T01:05:23+00:00","","")
[[AWSメモ]] >
* AWS Iot にPublishデータをDBに登録する [#cea88751]
* AWS IotにPublishされたデータをDynamoDBに登録する [#ef135803]
#setlinebreak(on);
#contents
-- 関連
--- [[AWS IotでPub/Subしてみる]]
-- 参考
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-iot-topicrule.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-topicrulepayload.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-action.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/cfn-using-cli.html
#TODO
** 概要 [#v046ab75]
#html(<div style="padding-left: 10px;">)
Iot トピックルールのルールアクションを使用して、AWS Iot に publish されたデータを DynamoDB に登録する。
- バイナリデータも考慮して、DBには base64エンコードされたデータを登録する
- Lambda等は介さずに、ルールアクションに "DynamoDBv2" を使用して直接DBに登録する
#html(</div>)
** DB 及び Iotルールアクションの作成 [#s30900c6]
#html(<div style="padding-left: 10px;">)
ここではスタックとして一括で作成する。
template.yml
#mycode2(){{
AWSTemplateFormatVersion: "2010-09-09"
Description: "Iot Rule action sample"
Resources:
# テーブル
SampleIotDataTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: SampleIotData
AttributeDefinitions:
- AttributeName: topic
AttributeType: S
- AttributeName: skey
AttributeType: S
KeySchema:
- AttributeName: topic
KeyType: HASH
- AttributeName: skey
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
# テーブルに登録する為のIAMロール
SampleIotDataTablePutItemRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "iot.amazonaws.com"
Action:
- "sts:AssumeRole"
Policies:
- PolicyName: "SampleIotDataTablePutItemPolicy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: "dynamoDB:PutItem"
Resource: !GetAtt SampleIotDataTable.Arn
# Iotトピックルール
SampleIotRuleAction:
Type: AWS::IoT::TopicRule
Properties:
RuleName: "ToDynamoDB"
TopicRulePayload:
Actions:
- DynamoDBv2:
RoleArn: !GetAtt SampleIotDataTablePutItemRole.Arn
PutItem:
TableName: !Ref SampleIotDataTable
AwsIotSqlVersion: "2016-03-23"
Description: String
RuleDisabled: false
Sql: >-
SELECT
topic() AS topic,
concat(parse_time("yyyy-MM-dd'T'HH:mm:ss.SSS'#'", timestamp()), traceid()) AS skey
principal() AS principal,
traceid() AS traceid,
encode(*, 'base64') AS data,
parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSz", timestamp()) AS ts
FROM 'topic/#'
}}
AWS CLI から一括作成
#myterm2(){{
# 検証&作成&作成完了まで待つ
aws cloudformation validate-template --template-body file://template.yml \
&& aws cloudformation create-stack --stack-name SampleIotRuleAction --template-body file://template.yml --capabilities CAPABILITY_IAM \
&& aws cloudformation wait stack-create-complete --stack-name SampleIotRuleAction
}}
もしくは
#myterm2(){{
# S3バケットの作成(バケット名は世界で唯一である必要がある為、末尾にアカウントID等を付与しておく)
aws s3api create-bucket --create-bucket-configuration '{"LocationConstraint": "ap-northeast-1"}' --bucket my-cloudformation-templates-アカウントID
# 検証&パッケージング&デプロイ
aws cloudformation validate-template --template-body file://template.yml \
&& aws cloudformation package --template-file template.yml --s3-bucket my-cloudformation-templates-アカウントID --output-template-file packaged-template.yml \
&& aws cloudformation deploy --template-file packaged-template.yml --stack-name SampleIotRuleAction --capabilities CAPABILITY_IAM
}}
#html(</div>)
** 動作確認用の処理作成 [#w5f703d6]
#html(<div style="padding-left: 10px;">)
test_publish.py
#mycode2(){{
# coding: utf-8
import base64
import boto3
import json
from datetime import datetime
import time
def publish():
"""Iotにpublishする."""
client = boto3.client('iot-data')
now = datetime.now()
payload = json.dumps({
'message': f'Hello AWS Iot! {now}'
}).encode()
response = client.publish(
topic='topic/SampleTopic1',
qos=0,
payload=payload
)
print('-- publish result --')
print(response)
def scan_table():
"""DBのデータを確認."""
dynamodb = boto3.resource('dynamodb')
res = dynamodb.Table('SampleIotData').scan()
items = res.get('Items') if res.get('Items') else []
print('-- items --')
items = [
{**x, **{'decoded_data': json.loads(base64.b64decode(x['data']).decode())}}
for x in items
]
print(json.dumps(items, indent=4))
def main():
# データをpublish
publish()
# 少し待つ
time.sleep(3)
# DBのデータを確認
scan_table()
if __name__ == '__main__':
main()
}}
#html(</div>)
** 動作確認 [#jb8fd2c3]
#html(<div style="padding-left: 10px;">)
boto3インストール
#myterm2(){{
python3 -m venv venv
source venv/bin/activate
pip install boto3
}}
実行
#myterm2(){{
python test_publish.py
}}
結果
#myterm2(){{
-- publish result --
{'ResponseMetadata': {'RequestId': '9c75d82c-de11-8ee7-ed65-7930fc29c580', 'HTTPStatusCode': 200, 'HTTPHeaders': ... }, 'RetryAttempts': 0}}
-- items --
[
{
"ts": "2018-01-01T12:34:56.123UTC",
"principal": "AIDAJHJTUCF474FK4DJSI",
"topic": "topic/SampleTopic1",
"traceid": "1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
"data": "eyJtZXNzYWdlIjogIkhlbGxvIEFXUyBJb3QhIDIwMTgtMDEtMDEgMTI6NDc6MzkuMzI3NTQ1In0=",
"skey": "2018-01-01T12:34:56.123#1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
"decoded_data": {
"message": "Hello AWS Iot! 2018-01-01 12:47:39.327545"
}
}
]
}}
#html(</div>)