[[AWSメモ]] >
* AWS IotにPublishされたデータをDynamoDBに登録する [#ef135803]
#setlinebreak(on);

#contents
-- 関連
--- [[AWS IotでPub/Subしてみる]]
-- 参考
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-iot-topicrule.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-topicrulepayload.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-iot-topicrule-action.html
--- https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/cfn-using-cli.html

** 概要 [#v046ab75]
#html(<div style="padding-left: 10px;">)

Iot トピックルールのルールアクションを使用して、AWS Iot に publish されたデータを DynamoDB に登録する。

- バイナリデータも考慮して、DBには base64エンコードされたデータを登録する
- Lambda等は介さずに、ルールアクションに "DynamoDBv2" を使用して直接DBに登録する

#html(</div>)

** DB 及び Iotルールアクションの作成 [#s30900c6]
#html(<div style="padding-left: 10px;">)

ここではスタックとして一括で作成する。

template.yml
#mycode2(){{
AWSTemplateFormatVersion: "2010-09-09"
Description: "Iot Rule action sample"

Resources:
  # テーブル
  SampleIotDataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: SampleIotData
      AttributeDefinitions:
        - AttributeName: topic
          AttributeType: S
        - AttributeName: skey
          AttributeType: S
      KeySchema:
        - AttributeName: topic
          KeyType: HASH
        - AttributeName: skey
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
  # テーブルに登録する為のIAMロール
  SampleIotDataTablePutItemRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iot.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "SampleIotDataTablePutItemPolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "dynamoDB:PutItem"
                Resource: !GetAtt SampleIotDataTable.Arn
  # Iotトピックルール
  SampleIotRuleAction:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: "ToDynamoDB"
      TopicRulePayload:
        Actions:
          - DynamoDBv2:
              RoleArn: !GetAtt SampleIotDataTablePutItemRole.Arn
              PutItem:
                TableName: !Ref SampleIotDataTable
        AwsIotSqlVersion: "2016-03-23"
        Description: String
        RuleDisabled: false
        Sql: >-
          SELECT
          topic() AS topic,
          concat(parse_time("yyyy-MM-dd'T'HH:mm:ss.SSS'#'", timestamp()), traceid()) AS skey
          principal() AS principal,
          traceid() AS traceid,
          encode(*, 'base64') AS data,
          parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSz", timestamp()) AS ts
          FROM 'topic/#'
}}

AWS CLI から一括作成

#myterm2(){{
# 検証&作成&作成完了まで待つ
aws cloudformation validate-template --template-body file://template.yml \
  && aws cloudformation create-stack --stack-name SampleIotRuleAction --template-body file://template.yml --capabilities CAPABILITY_IAM \
  && aws cloudformation wait stack-create-complete --stack-name SampleIotRuleAction
}}

もしくは

#myterm2(){{
# S3バケットの作成(バケット名は世界で唯一である必要がある為、末尾にアカウントID等を付与しておく)
aws s3api create-bucket --create-bucket-configuration '{"LocationConstraint": "ap-northeast-1"}' --bucket my-cloudformation-templates-アカウントID

# 検証&パッケージング&デプロイ
aws cloudformation validate-template --template-body file://template.yml \
  && aws cloudformation package --template-file template.yml --s3-bucket my-cloudformation-templates-アカウントID --output-template-file packaged-template.yml \
  && aws cloudformation deploy --template-file packaged-template.yml --stack-name SampleIotRuleAction --capabilities CAPABILITY_IAM
}}

#html(</div>)

** 動作確認用の処理作成 [#w5f703d6]
#html(<div style="padding-left: 10px;">)

test_publish.py
#mycode2(){{
# coding: utf-8

import base64
import boto3
import json
from datetime import datetime
import time

def publish():
    """Iotにpublishする."""
    client = boto3.client('iot-data')
    now = datetime.now()
    payload = json.dumps({
        'message': f'Hello AWS Iot! {now}'
    }).encode()
    response = client.publish(
        topic='topic/SampleTopic1',
        qos=0,
        payload=payload
    )   
    print('-- publish result --')
    print(response)


def scan_table():
    """DBのデータを確認."""
    dynamodb = boto3.resource('dynamodb')
    res = dynamodb.Table('SampleIotData').scan()
    items = res.get('Items') if res.get('Items') else []
    print('-- items --')
    items = [
        {**x, **{'decoded_data': json.loads(base64.b64decode(x['data']).decode())&#125;&#125;
        for x in items
    ]   
    print(json.dumps(items, indent=4))


def main():
    # データをpublish
    publish()

    # 少し待つ
    time.sleep(3)

    # DBのデータを確認
    scan_table()


if __name__ == '__main__':
    main()
}}
#html(</div>)

** 動作確認 [#jb8fd2c3]
#html(<div style="padding-left: 10px;">)

boto3インストール
#myterm2(){{
python3 -m venv venv
source venv/bin/activate
pip install boto3
}}

実行
#myterm2(){{
python test_publish.py
}}

結果
#myterm2(){{
-- publish result --
{'ResponseMetadata': {'RequestId': '9c75d82c-de11-8ee7-ed65-7930fc29c580', 'HTTPStatusCode': 200, 'HTTPHeaders':  ... }, 'RetryAttempts': 0&#125;&#125;
-- items --
[
    {
        "ts": "2018-01-01T12:34:56.123UTC",
        "principal": "AIDAJHJTUCF474FK4DJSI",
        "topic": "topic/SampleTopic1",
        "traceid": "1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
        "data": "eyJtZXNzYWdlIjogIkhlbGxvIEFXUyBJb3QhIDIwMTgtMDEtMDEgMTI6NDc6MzkuMzI3NTQ1In0=",
        "skey": "2018-01-01T12:34:56.123#1dc64f4a-c4f5-0419-ebb9-5e0cd283ca84",
        "decoded_data": {
            "message": "Hello AWS Iot! 2018-01-01 12:34:56.123456"
            "message": "Hello AWS Iot! 2018-01-01 12:47:39.327545"
        }
    }
]
}}

#html(</div>)

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS