AWSメモ >

AWS Kinesis Client Libraryでコンシューマ開発

AWS SDK for Java

いちおう以下にあるが、今回は Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い
http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip

AWS Toolkit for Eclipse のインストール

※ Eclipse 4.4 (Luna) 以上が必要

  1. [ヘルプ] > [新しいソフトウェアをインストール] を開く
  2. [機能するソフトウェア] に https://aws.amazon.com/eclipse と入力
  3. 以下のリストから必要な「AWS コア管理ツール」およびその他のオプション項目を選択
  4. [次へ] をクリック

準備(AWS CLI)

Kinesisストリームの作成

aws kinesis create-stream --stream-name myFirstStream --shard-count 1

※サンプルのストリーム名から変える場合は、AmazonKinesisApplicationSample.java で定義されているストリーム名も書き換える。


コンシューマの作成

Maven(Eclipse使用)の場合

サンプルは以下にあるが、なぜか Antベースなので、ここではMavenで作成し直す。
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis

やる事は、EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>KclSample</groupId>
  <artifactId>KclSample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.2</version>
    </dependency>
  </dependencies>
</project>

Gradleプロジェクトの場合

# プロジェクトフォルダを作成
mkdir KclSample && cd KclSample

# 初期化
gradle init --type java-application

build.gradle

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'AmazonKinesisApplicationSample'

dependencies {
    compile 'com.google.guava:guava:23.0'
    compile 'com.amazonaws:amazon-kinesis-client:1.9.0'
    compile 'com.google.code.gson:gson:2.8.2'
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'

    testCompile 'junit:junit:4.12'
}

repositories {
    jcenter()
}

jar {
    manifest {
        attributes 'Main-Class': mainClassName
    }   
} 

サンプルソースをプロジェクトにコピー

https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis にあるサンプルから必要なファイルを取得する。

# src, test フォルダをキレイにしておく
rm -rf src/main/java/*
rm -rf src/test/java/*

# 必要なサンプルソースをダウンロード
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationRecordProcessorFactory.java \
    -O src/main/java/AmazonKinesisApplicationRecordProcessorFactory.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSample.java \
    -O src/main/java/AmazonKinesisApplicationSample.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java \
    -O src/main/java/AmazonKinesisApplicationSampleRecordProcessor.java

サンプルソースの修正

サンプルは少し古いKCLのバージョン用のものになっているのと、ローカルでワーカーを起動したいので一部変更する。
また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2


AmazonKinesisApplicationSample.java の修正

     public static void main(String[] args) throws Exception {
          .
          .
        kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
        kinesisClientLibConfiguration.withRegionName("ap-northeast-1");      //  東京リージョンを指定

          .
          .

    public static void deleteResources() {
        // Delete the stream
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion("us-west-2")                            //  東京リージョンに変更
            .build();

        System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
                SAMPLE_APPLICATION_STREAM_NAME);
        try {
            kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME);
        } catch (ResourceNotFoundException ex) {
            // The stream doesn't exist.
        }   

        // Delete the table
        AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
            .withCredentials(credentialsProvider)
            .withRegion("us-west-2")                           //  東京リージョンに変更
            .build();
        System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
                SAMPLE_APPLICATION_NAME);
        try {
            dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME);
        } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
            // The table doesn't exist.
        }   
    }   
</pre>

AmazonKinesisApplicationSampleRecordProcessor.java の修正

import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
↓
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;

AmazonKinesisApplicationSampleRecordProcessor.java の修正

    private void processSingleRecord(Record record) {
        // TODO Add your own record processing logic here

        String data = null;
        try {
            // For this app, we interpret the payload as UTF-8 chars.
            data = decoder.decode(record.getData()).toString();
            // Assume this record came from AmazonKinesisSample and log its age.
            /* コメントアウト
            //long recordCreateTime = new Long(data.substring("testData-".length()));
            //long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime;

            LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data + ", Created "
                    + ageOfRecordInMillis + " milliseconds ago.");
            */
            LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", Received data: " + data);   // 追加

        } catch (NumberFormatException e) {
            LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
        } catch (CharacterCodingException e) {
            LOG.error("Malformed data: " + data, e); 
        }   
    }   

まずはサンプルソースほぼそのままで動作確認

ワーカーを起動

gradle run

AWS CLI Kinesis へのレコード出力

aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "test123"

※ログに "Received data: test123" が出力されればOK。

もう少し修正してみる

あとは AmazonKinesisApplicationSampleRecordProcessor.java を弄っていく。
サンプルでは各レコードの処理は processSingleRecord メソッドで行われるようになっているので、このメソッドをいろいろ弄って動作確認。

とりあえず、受信したデータを DynamoDB にそのまま登録するように処理を追加してみる。
AmazonKinesisApplicationSampleRecordProcessor.java

aaa

動作確認

ワーカーの起動

AmazonKinesisApplicationSample.java を実行する。
注意点としては、デフォルトの aws プロファイルが使用されるので、事前にaws configureしておく事。
(別のプロファイルを使用する場合は、プロパティファイルで指定する。)

gradle run

Kinesisへのレコード出力

testdata.json

{
  "var1":  "ABCDEFG",
  "timeStamp": "2018-02-12 18:50:05",
  "data": { "data1": "11111", "data2": "22222" }
}

AWS CLI からKinesisにレコード出力

aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json

後片付け

Kinesisストリームの削除

aws kinesis delete-stream --stream-name myFirstStream

トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS