[[AWSメモ]] >
* AWS Kinesis Client Libraryでコンシューマ開発 [#h560efd9]
#setlinebreak(on);

#contents
-- ドキュメント
--- https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html
-- サンプル
--- https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis

** AWS SDK for Java [#vb569f55]
#html(<div style="padding-left:10px;">)
いちおう以下にあるが、今回は Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い
http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip
#html(</div>)

** AWS Toolkit for Eclipse のインストール [#l0365225]
#html(<div style="padding-left:10px;">)
※ Eclipse 4.4 (Luna) 以上が必要
+[ヘルプ] > [新しいソフトウェアをインストール] を開く
+[機能するソフトウェア] に https://aws.amazon.com/eclipse と入力
+以下のリストから必要な「AWS コア管理ツール」およびその他のオプション項目を選択
+[次へ] をクリック
#html(</div>)

** 準備(AWS CLI) [#ce59cb7c]
#html(<div style="padding-left:10px">)

*** Kinesisストリームの作成 [#f5d2897c]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis create-stream --stream-name myFirstStream --shard-count 1
}}
#html(</div>)

#html(</div>)
&br;

** コンシューマの作成 [#m1ed665f]
#html(<div style="padding-left:10px">)

*** Mavenプロジェクトの作成 [#ubbe6a07]
#html(<div style="padding-left:10px">)

サンプルは以下にあるが、なぜか Antベースなので、ここではMavenで作成し直す。
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis

やる事は、EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。

pom.xml
#myhtml2(){{
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>KclSample</groupId>
  <artifactId>KclSample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>amazon-kinesis-client</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.2</version>
    </dependency>
  </dependencies>
</project>
}}
#html(</div>)

*** サンプルソースをプロジェクトにコピー [#td216f77]
#html(<div style="padding-left:10px">)
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
#html(</div>)

*** サンプルの一部変更 [#z40684bd]
#html(<div style="padding-left:10px">)

サンプルは少し古いKCLのバージョン用のものになっているのと、ローカルでワーカーを起動したいので一部変更する。
また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2

AmazonKinesisApplicationSample.java
#mycode2(){{
    public static void main(String[] args) throws Exception {
          ・
          ・
        KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
                        SAMPLE_APPLICATION_STREAM_NAME,
                        credentialsProvider,
                        workerId)
        .withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM)
        .withKinesisEndpoint("https://kinesis.ap-northeast-1.amazonaws.com");  // 使用するリージョンのエンドポイント
/*     LocalStackで試したかったが動かなかった・・(未調査)
        .withKinesisEndpoint("http://localhost:4568/")
        .withDynamoDBEndpoint("http://localhost:4569/");
*/
          ・
          ・
}}

AmazonKinesisApplicationSampleRecordProcessor.java
#mycode2(){{
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
↓
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
}}

あとは AmazonKinesisApplicationSampleRecordProcessor を弄っていく。

#html(</div>)


#html(</div>)

** 動作確認 [#d2c9a952]
#html(<div style="padding-left:10px">)

*** ワーカーの起動 [#l60f011e]
#html(<div style="padding-left:10px">)
AmazonKinesisApplicationSample.java を Eclipseから実行する。
注意点としては、デフォルトの aws プロファイルが使用されるので、事前にaws configureしておく事。
(別のプロファイルを使用する場合は、プロパティファイルで指定する。)
#html(</div>)

*** Kinesisへのレコード出力 [#dc9e0ff9]
#html(<div style="padding-left:10px">)

testdata.json
#mycode2(){{
{
  "var1":  "ABCDEFG",
  "timeStamp": "2018-02-12 18:50:05",
  "data": { "data1": "11111", "data2": "22222" }
}
}}

AWS CLI からKinesisにレコード出力
#myterm2(){{
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json
}}
#html(</div>)


#html(</div>)

** 後片付け [#e439ff14]
#html(<div style="padding-left:10px">)

*** Kinesisストリームの削除 [#tb880751]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis delete-stream --stream-name myFirstStream
}}
#html(</div>)

#html(</div>)

トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS