- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2018-09-07T13:27:36+00:00","","")
[[AWSメモ]] >
* AWS Kinesis Client Libraryでコンシューマ開発 [#h560efd9]
#setlinebreak(on);
#contents
-- ドキュメント
--- https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html
-- サンプル
--- https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
** AWS SDK for Java [#vb569f55]
#html(<div style="padding-left:10px;">)
いちおう以下にあるが、今回は Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い
http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip
#html(</div>)
** AWS Toolkit for Eclipse のインストール [#l0365225]
#html(<div style="padding-left:10px;">)
※ Eclipse 4.4 (Luna) 以上が必要
+[ヘルプ] > [新しいソフトウェアをインストール] を開く
+[機能するソフトウェア] に https://aws.amazon.com/eclipse と入力
+以下のリストから必要な「AWS コア管理ツール」およびその他のオプション項目を選択
+[次へ] をクリック
#html(</div>)
** 準備(AWS CLI) [#ce59cb7c]
#html(<div style="padding-left:10px">)
*** Kinesisストリームの作成 [#f5d2897c]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis create-stream --stream-name myFirstStream --shard-count 1
}}
※サンプルのストリーム名から変える場合は、AmazonKinesisApplicationSample.java で定義されているストリーム名も書き換える。
#html(</div>)
#html(</div>)
&br;
** コンシューマの作成 [#m1ed665f]
#html(<div style="padding-left:10px">)
*** Mavenプロジェクトの作成 [#ubbe6a07]
*** Maven(Eclipse使用)の場合 [#ubbe6a07]
#html(<div style="padding-left:10px">)
サンプルは以下にあるが、なぜか Antベースなので、ここではMavenで作成し直す。
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
やる事は、EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。
pom.xml
#myhtml2(){{
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>KclSample</groupId>
<artifactId>KclSample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
</project>
}}
#html(</div>)
*** Gradleプロジェクトの場合[#ubbe6a07]
#html(<div style="padding-left:10px">)
#myterm2(){{
# プロジェクトフォルダを作成
mkdir KclSample && cd KclSample
# 初期化
gradle init --type java-application
}}
build.gradle
#mycode2(){{
plugins {
id 'java'
id 'application'
}
mainClassName = 'AmazonKinesisApplicationSample'
dependencies {
compile 'com.google.guava:guava:23.0'
compile 'com.amazonaws:amazon-kinesis-client:1.9.0'
compile 'com.google.code.gson:gson:2.8.2'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'
testCompile 'junit:junit:4.12'
}
repositories {
jcenter()
}
jar {
manifest {
attributes 'Main-Class': mainClassName
}
}
}}
#html(</div>)
*** サンプルソースをプロジェクトにコピー [#td216f77]
#html(<div style="padding-left:10px">)
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis にあるサンプルから必要なファイルを取得する。
#myterm2(){{
# src, test フォルダをキレイにしておく
rm -rf src/main/java/*
rm -rf src/test/java/*
# 必要なサンプルソースをダウンロード
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationRecordProcessorFactory.java \
-O src/main/java/AmazonKinesisApplicationRecordProcessorFactory.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSample.java \
-O src/main/java/AmazonKinesisApplicationSample.java
wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java \
-O src/main/java/AmazonKinesisApplicationSampleRecordProcessor.java
}}
#html(</div>)
*** サンプルの一部変更 [#z40684bd]
*** サンプルソースの修正 [#z40684bd]
#html(<div style="padding-left:10px">)
サンプルは少し古いKCLのバージョン用のものになっているのと、ローカルでワーカーを起動したいので一部変更する。
また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2
AmazonKinesisApplicationSample.java
#html(<hr>)
AmazonKinesisApplicationSample.java の修正
#mycode2(){{
public static void main(String[] args) throws Exception {
・
・
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
credentialsProvider,
workerId)
.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM)
.withKinesisEndpoint("https://kinesis.ap-northeast-1.amazonaws.com"); // 使用するリージョンのエンドポイント
/* LocalStackで試したかったが動かなかった・・(未調査)
.withKinesisEndpoint("http://localhost:4568/")
.withDynamoDBEndpoint("http://localhost:4569/");
*/
・
・
public static void main(String[] args) throws Exception {
.
.
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("ap-northeast-1"); // 東京リージョンを指定
.
.
public static void deleteResources() {
// Delete the stream
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2") // 東京リージョンに変更
.build();
System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
SAMPLE_APPLICATION_STREAM_NAME);
try {
kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME);
} catch (ResourceNotFoundException ex) {
// The stream doesn't exist.
}
// Delete the table
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2") // 東京リージョンに変更
.build();
System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
SAMPLE_APPLICATION_NAME);
try {
dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME);
} catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
// The table doesn't exist.
}
}
</pre>
}}
AmazonKinesisApplicationSampleRecordProcessor.java
#html(<hr>)
AmazonKinesisApplicationSampleRecordProcessor.java の修正
#mycode2(){{
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
↓
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
}}
あとは AmazonKinesisApplicationSampleRecordProcessor を弄っていく。
AmazonKinesisApplicationSampleRecordProcessor.java の修正
#mycode2(){{
private void processSingleRecord(Record record) {
// TODO Add your own record processing logic here
String data = null;
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
// Assume this record came from AmazonKinesisSample and log its age.
/* コメントアウト
//long recordCreateTime = new Long(data.substring("testData-".length()));
//long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime;
LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data + ", Created "
+ ageOfRecordInMillis + " milliseconds ago.");
*/
LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", Received data: " + data); // 追加
} catch (NumberFormatException e) {
LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
} catch (CharacterCodingException e) {
LOG.error("Malformed data: " + data, e);
}
}
}}
#html(</div>)
#html(</div>)
** まずはサンプルソースほぼそのままで動作確認 [#v83f28da]
#html(<div style="padding-left:10px">)
*** ワーカーを起動 [#i83f7a38]
#html(<div style="padding-left:10px">)
#myterm2(){{
gradle run
}}
#html(</div>)
*** AWS CLI Kinesis へのレコード出力 [#dc9e0ff9]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "test123"
}}
#html(</div>)
※ログに "Received data: test123" が出力されればOK。
#html(</div>)
** もう少し修正してみる [#kcfa0f46]
#html(<div style="padding-left:10px">)
あとは AmazonKinesisApplicationSampleRecordProcessor.java を弄っていく。
サンプルでは各レコードの処理は processSingleRecord メソッドで行われるようになっているので、このメソッドをいろいろ弄って動作確認。
とりあえず、受信したデータを DynamoDB にそのまま登録するように処理を追加してみる。
AmazonKinesisApplicationSampleRecordProcessor.java
#mycode2(){{
aaa
}}
#html(</div>)
** 動作確認 [#d2c9a952]
#html(<div style="padding-left:10px">)
*** ワーカーの起動 [#l60f011e]
#html(<div style="padding-left:10px">)
AmazonKinesisApplicationSample.java を Eclipseから実行する。
AmazonKinesisApplicationSample.java を実行する。
注意点としては、デフォルトの aws プロファイルが使用されるので、事前にaws configureしておく事。
(別のプロファイルを使用する場合は、プロパティファイルで指定する。)
#myterm2(){{
gradle run
}}
#html(</div>)
*** Kinesisへのレコード出力 [#dc9e0ff9]
#html(<div style="padding-left:10px">)
testdata.json
#mycode2(){{
{
"var1": "ABCDEFG",
"timeStamp": "2018-02-12 18:50:05",
"data": { "data1": "11111", "data2": "22222" }
}
}}
AWS CLI からKinesisにレコード出力
#myterm2(){{
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json
}}
#html(</div>)
#html(</div>)
** 後片付け [#e439ff14]
#html(<div style="padding-left:10px">)
*** Kinesisストリームの削除 [#tb880751]
#html(<div style="padding-left:10px">)
#myterm2(){{
aws kinesis delete-stream --stream-name myFirstStream
}}
#html(</div>)
#html(</div>)