[[AWSメモ]] >
* AWS Kinesis Client Libraryでコンシューマ開発 [#h560efd9]
#setlinebreak(on);

#contents
-- 関連
--- [[AWS Java SDKでDynamoDBのCRUDを書いてみる]]
-- ドキュメント/サンプルなど
--- https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html
--- https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
--- https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html

** 概要 [#t961e653]
#html(<div style="padding-left:10px;">)
AWS Kinesis はストリーミングデータをリアルタイムで収集、処理、分析する為に提供されるサービス。
データは Lambda 等にそのまま流したり、KCL(Kinesis Client Library)を使用して常駐アプリで処理させる事も出来る。
以降、KCLアプリケーションの開発手順を記載する。
#html(</div>)

** KCLを使用したコンシューマ開発 [#vb03e685]
#html(<div style="padding-left:10px;">)
AWS から提供される KCL(Kinesis Client Library)で[[コンシューマ>https://docs.aws.amazon.com/ja_jp/streams/latest/dev/shared-fan-out-consumers.html]] を開発する事が出来る。

コンシューマは基本的に1シャードにつき1コンシューマとなる為、スケールするにはシャードを増やす必要があるが、
[[拡張ファンアウト>https://docs.aws.amazon.com/ja_jp/streams/latest/dev/introduction-to-enhanced-consumers.html]] を使用する事によって、1シャード:Nコンシューマで処理する事もできる。
※ただし、最大5コンシューマの制限がある。(制限の引き上げのリクエストは可能)

各コンシューマへの振り分けは、Kinesisへのデータ出力時に指定するパーティションキーによって決定される。
なので、同じコンシューマに処理させたいデータは同じパーティションキーでKinesisに流す必要がある。
ただ、Kinesisに送信するクライアント側の状況によっても順序等が変わる事があり、レコードが複数回配信されるケースもある為、
1つのコンシューマにデータが偏らないようにする為には、パーティションキーはランダムにしておいて、アプリケーション側で頑張る方向か。
#html(</div>)

** AWS SDK for Java [#vb569f55]
#html(<div style="padding-left:10px;">)
いちおう以下にあるが、今回は Gradle または Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い
http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip
#html(</div>)

** AWS Toolkit for Eclipse のインストール [#l0365225]
#html(<div style="padding-left:10px;">)
※ Eclipseで開発しない場合は不要。
※ Eclipse 4.4 (Luna) 以上が必要
+[ヘルプ] > [新しいソフトウェアをインストール] を開く
+[機能するソフトウェア] に https://aws.amazon.com/eclipse と入力
+以下のリストから必要な「AWS コア管理ツール」およびその他のオプション項目を選択
+[次へ] をクリック
#html(</div>)

** 準備(AWS CLI) [#ce59cb7c]
#html(<div style="padding-left:10px">)

*** Kinesisストリームの作成 [#f5d2897c]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis create-stream --stream-name myFirstStream --shard-count 1
}}
※サンプルのストリーム名から変える場合は、AmazonKinesisApplicationSample.java で定義されているストリーム名を書き換える。(後述参照)
#html(</div>)

#html(</div>)
&br;

** プロジェクトの作成 [#p6c5829b]
#html(<div style="padding-left:10px">)

サンプルは以下にあるが、なぜか Antベースなので、ここではGradle または Mavenで作成し直す。
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis

*** Gradleプロジェクトの場合[#ubbe6a07]
#html(<div style="padding-left:10px">)
#myterm2(){{
# プロジェクトフォルダを作成
mkdir KclSample && cd KclSample

# 初期化
gradle init --type java-application
}}

build.gradle の編集
#mycode2(){{
plugins {
    id 'java'
    id 'application'
}

mainClassName = 'AmazonKinesisApplicationSample'

dependencies {
    compile 'com.google.guava:guava:23.0'
    compile 'com.amazonaws:amazon-kinesis-client:1.9.0'
    compile 'com.google.code.gson:gson:2.8.2'
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'

    testCompile 'junit:junit:4.12'
}

repositories {
    jcenter()
}

jar {
    manifest {
        attributes 'Main-Class': mainClassName
    }   
} 
}}

#html(</div>)

*** Maven(Eclipse使用)の場合 [#ubbe6a07]
#html(<div style="padding-left:10px">)

EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。

pom.xml
#myhtml2(){{
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>KclSample</groupId>
  <artifactId>KclSample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.2</version>
    </dependency>
  </dependencies>
</project>
}}

#html(</div>)
#html(</div>)

** コンシューマの作成 [#o0bbdfbb]
#html(<div style="padding-left:10px">)

*** サンプルソースをプロジェクトにコピー [#td216f77]
#html(<div style="padding-left:10px">)
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis  にあるサンプルから必要なファイルを取得する。

#myterm2(){{
# src, test フォルダをキレイにしておく
rm -rf src/main/java/*
rm -rf src/test/java/*

# 必要なサンプルソースをダウンロード
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationRecordProcessorFactory.java \
    -O src/main/java/AmazonKinesisApplicationRecordProcessorFactory.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSample.java \
    -O src/main/java/AmazonKinesisApplicationSample.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java \
    -O src/main/java/AmazonKinesisApplicationSampleRecordProcessor.java
}}
#html(</div>)

*** サンプルソースの修正 [#z40684bd]
#html(<div style="padding-left:10px">)

サンプルは少し古いKCLのバージョン用のものになっているので一部変更する。
また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2

#html(<hr>)

AmazonKinesisApplicationSample.java の修正
#mycode2(){{
    .
    .

    public static final String SAMPLE_APPLICATION_STREAM_NAME = "myFirstStream";     // 別のストリーム名にする場合は変える

    .
    .

    public static void main(String[] args) throws Exception {
          .
          .
        kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
        kinesisClientLibConfiguration.withRegionName("ap-northeast-1");      // [追加] 東京リージョンを指定

          .
          .

    public static void deleteResources() {
        // Delete the stream
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion("ap-northeast-1")                            // [変更] 東京リージョンに変更
            .build();

        System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
                SAMPLE_APPLICATION_STREAM_NAME);
        try {
            kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME);
        } catch (ResourceNotFoundException ex) {
            // The stream doesn't exist.
        }   

        // Delete the table
        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion("ap-northeast-1")                           // [変更] 東京リージョンに変更
            .build();
        System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
                SAMPLE_APPLICATION_NAME);
        try {
            dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME);
        } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
            // The table doesn't exist.
        }   
    }   
</pre>
}}

&br;

AmazonKinesisApplicationSampleRecordProcessor.java の修正
#mycode2(){{
.
.

//import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;                   // [コメントアウト] 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;              //  [追加] 新しいバージョンだとパッケージが変わっている

import com.google.gson.Gson;  // [追加]
import java.util.Map;  // [追加]

    .
    .

    // テキスト文字列 または Json文字列 を受け取ってログ出力するように変更。
    private void processSingleRecord(Record record) {

        String data = null;
        try {
            data = decoder.decode(record.getData()).toString();
            try {
                Map receiveJson = new Gson().fromJson(data, Map.class);
                LOG.info("##### received json : " + receiveJson + " #####");
            } catch (Exception e) {
                LOG.info("##### Received string: " + data + " #####");
            }   

        } catch (NumberFormatException e) {
            LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
        } catch (CharacterCodingException e) {
            LOG.error("Malformed data: " + data, e); 
        }   
    }   
}}

#html(</div>)
#html(</div>)

** 動作確認 - その1 [#v83f28da]
#html(<div style="padding-left:10px">)

*** ワーカーを起動 [#i83f7a38]
#html(<div style="padding-left:10px">)
#myterm2(){{
gradle run
}}
#html(</div>)

*** Kinesis にテストデータを出力 [#dc9e0ff9]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "test123"
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "{\"var1\": \"test123\"}"
}}
※受信データがログ出力されているか確認する。

#html(</div>)

#html(</div>)


** 動作確認 - その2 [#kcfa0f46]
#html(<div style="padding-left:10px">)

Kinesis自体の動作確認はこんな所だが、AWS Java SDK に慣れる意味で、もう少しだけ弄ってみる。
ここでは、受信したデータを DynamoDB にそのまま登録するように処理を追加してみる。

*** テーブルの作成 [#jffb4773]
#html(<div style="padding-left:10px">)
ddl/KinesisReceivedData.json
#mycode2(){{
{
    "TableName": "KinesisReceivedData",
    "AttributeDefinitions": [
        { "AttributeName": "id", "AttributeType": "S" }
    ],  
    "KeySchema": [
        { "AttributeName": "id", "KeyType": "HASH" }
    ],  
    "ProvisionedThroughput": { "WriteCapacityUnits": 1, "ReadCapacityUnits": 1 }
}
}}

AWS CLI でテーブル作成
#myterm2(){{
aws dynamodb create-table --cli-input-json file://ddl/KinesisReceivedData.json
}}

#html(</div>)


*** AmazonKinesisApplicationSampleRecordProcessor.java の修正 [#yda0efae]
#html(<div style="padding-left:10px">)

AWS Java SDK を使用して、先ほど作成したテーブルに受信データを登録するように処理を変更する。
※参考: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html
※関連: [[AWS Java SDKでDynamoDBのCRUDを書いてみる]]

AmazonKinesisApplicationSampleRecordProcessor.java
#mycode2(){{
.
.

import com.google.gson.Gson;

import java.time.format.DateTimeFormatter;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.PutItemResult;

.
.
public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor {

    .
    .
    private AmazonDynamoDB amazonDynamoDB = AmazonDynamoDBClientBuilder.standard().build();                  // DynamoDBアクセス用のクライアント
    .
    .

    private void processSingleRecord(Record record) {

        String data = null;
        try {
            data = decoder.decode(record.getData()).toString();
            try {
                Map receiveJson = new Gson().fromJson(data, Map.class);
                LOG.info("##### received json : " + receiveJson + " #####");
            } catch (Exception e) {
                LOG.info("##### Received string: " + data + " #####");
            }   

            String tableName = "KinesisReceivedData";

            String id = UUID.randomUUID().toString();
            String datetime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS"));

            Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
            item.put("id"      ,   new AttributeValue().withS(id));
            item.put("datetime",   new AttributeValue().withS(datetime));
            item.put("data"    ,   new AttributeValue().withS(data));

            PutItemResult result = amazonDynamoDB.putItem(new PutItemRequest(tableName, item));
            System.out.println(result);

        } catch (NumberFormatException e) {
            LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
        } catch (CharacterCodingException e) {
            LOG.error("Malformed data: " + data, e); 
        }   
    }   
}}
#html(</div>)

*** ワーカーの起動 [#l60f011e]
#html(<div style="padding-left:10px">)
AmazonKinesisApplicationSample.java を実行する。
#myterm2(){{
gradle run
}}
#html(</div>)


*** Kinesis にテストデータを出力 [#dc9e0ff9]
#html(<div style="padding-left:10px">)

testdata.json
#mycode2(){{
{
  "var1":  "ABCDEFG",
  "timeStamp": "2018-02-12 18:50:05",
  "data": { "data1": "11111", "data2": "22222" }
}
}}

AWS CLI からKinesisにレコード出力
#myterm2(){{
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json
}}
※受信データがログ出力されているか確認する。
#html(</div>)

*** テーブルにデータが登録されているか確認 [#t06bbce6]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws dynamodb scan --table-name KinesisReceivedData
}}
#html(※ <a href="?AWS%20CLI%A5%B3%A5%DE%A5%F3%A5%C9%A4%CE%A5%E1%A5%E2" target="_blank">AWS CLIコマンドのメモ</a>)
#html(</div>)

#html(</div>)


#html(</div>)

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS