#author("2018-09-09T09:10:29+00:00","","") [[AWSメモ]] > * AWS Kinesis Client Libraryでコンシューマ開発 [#h560efd9] #setlinebreak(on); #contents -- ドキュメント --- https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html -- サンプル --- https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis ** AWS SDK for Java [#vb569f55] #html(<div style="padding-left:10px;">) いちおう以下にあるが、今回は Gradle または Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip #html(</div>) ** AWS Toolkit for Eclipse のインストール [#l0365225] #html(<div style="padding-left:10px;">) ※ Eclipseで開発しない場合は不要。 ※ Eclipse 4.4 (Luna) 以上が必要 +[ヘルプ] > [新しいソフトウェアをインストール] を開く +[機能するソフトウェア] に https://aws.amazon.com/eclipse と入力 +以下のリストから必要な「AWS コア管理ツール」およびその他のオプション項目を選択 +[次へ] をクリック #html(</div>) ** 準備(AWS CLI) [#ce59cb7c] #html(<div style="padding-left:10px">) *** Kinesisストリームの作成 [#f5d2897c] #html(<div style="padding-left:10px">) #myterm2(){{ aws kinesis create-stream --stream-name myFirstStream --shard-count 1 }} ※サンプルのストリーム名から変える場合は、AmazonKinesisApplicationSample.java で定義されているストリーム名を書き換える。(後述参照) #html(</div>) #html(</div>) &br; ** プロジェクトの作成 [#p6c5829b] #html(<div style="padding-left:10px">) サンプルは以下にあるが、なぜか Antベースなので、ここではGradle または Mavenで作成し直す。 https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis *** Gradleプロジェクトの場合[#ubbe6a07] #html(<div style="padding-left:10px">) #myterm2(){{ # プロジェクトフォルダを作成 mkdir KclSample && cd KclSample # 初期化 gradle init --type java-application }} build.gradle の編集 #mycode2(){{ plugins { id 'java' id 'application' } mainClassName = 'AmazonKinesisApplicationSample' dependencies { compile 'com.google.guava:guava:23.0' compile 'com.amazonaws:amazon-kinesis-client:1.9.0' compile 'com.google.code.gson:gson:2.8.2' compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0' testCompile 'junit:junit:4.12' } repositories { jcenter() } jar { manifest { attributes 'Main-Class': mainClassName } } }} #html(</div>) *** Maven(Eclipse使用)の場合 [#ubbe6a07] #html(<div style="padding-left:10px">) EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。 pom.xml #myhtml2(){{ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>KclSample</groupId> <artifactId>KclSample</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> </dependencies> </project> }} #html(</div>) #html(</div>) ** コンシューマの作成 [#o0bbdfbb] #html(<div style="padding-left:10px">) *** サンプルソースをプロジェクトにコピー [#td216f77] #html(<div style="padding-left:10px">) https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis にあるサンプルから必要なファイルを取得する。 #myterm2(){{ # src, test フォルダをキレイにしておく rm -rf src/main/java/* rm -rf src/test/java/* # 必要なサンプルソースをダウンロード wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationRecordProcessorFactory.java \ -O src/main/java/AmazonKinesisApplicationRecordProcessorFactory.java wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSample.java \ -O src/main/java/AmazonKinesisApplicationSample.java wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java \ -O src/main/java/AmazonKinesisApplicationSampleRecordProcessor.java }} #html(</div>) *** サンプルソースの修正 [#z40684bd] #html(<div style="padding-left:10px">) サンプルは少し古いKCLのバージョン用のものになっているので一部変更する。 また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。 https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2 #html(<hr>) AmazonKinesisApplicationSample.java の修正 #mycode2(){{ . . public static final String SAMPLE_APPLICATION_STREAM_NAME = "myFirstStream"; // 別のストリーム名にする場合は変える . . public static void main(String[] args) throws Exception { . . kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM); kinesisClientLibConfiguration.withRegionName("ap-northeast-1"); // [追加] 東京リージョンを指定 . . public static void deleteResources() { // Delete the stream AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion("ap-northeast-1") // [変更] 東京リージョンに変更 .build(); System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n", SAMPLE_APPLICATION_STREAM_NAME); try { kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME); } catch (ResourceNotFoundException ex) { // The stream doesn't exist. } // Delete the table AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion("ap-northeast-1") // [変更] 東京リージョンに変更 .build(); System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n", SAMPLE_APPLICATION_NAME); try { dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME); } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) { // The table doesn't exist. } } </pre> }} &br; AmazonKinesisApplicationSampleRecordProcessor.java の修正 #mycode2(){{ . . //import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; // [コメントアウト] import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; // [追加] 古いバージョンとパッケージが変わってるらしい import com.google.gson.Gson; // [追加] import java.util.Map; // [追加] . . // テキスト文字列 または Json文字列 を受け取ってログ出力するように変更。 private void processSingleRecord(Record record) { String data = null; try { data = decoder.decode(record.getData()).toString(); try { Map receiveJson = new Gson().fromJson(data, Map.class); LOG.info("##### received json : " + receiveJson + " #####"); } catch (Exception e) { LOG.info("##### Received string: " + data + " #####"); } } catch (NumberFormatException e) { LOG.info("Record does not match sample record format. Ignoring record with data; " + data); } catch (CharacterCodingException e) { LOG.error("Malformed data: " + data, e); } } }} #html(</div>) #html(</div>) ** 動作確認 [#v83f28da] #html(<div style="padding-left:10px">) *** ワーカーを起動 [#i83f7a38] #html(<div style="padding-left:10px">) #myterm2(){{ gradle run }} #html(</div>) *** Kinesis にテストデータを出力 [#dc9e0ff9] #html(<div style="padding-left:10px">) #myterm2(){{ aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "test123" aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "{\"var1\": \"test123\"}" }} ※受信データがログ出力されているか確認する。 #html(</div>) #html(</div>) ** おまけ [#kcfa0f46] #html(<div style="padding-left:10px">) Kinesis自体の動作確認はこんな所だが、AWS Java SDK に慣れる意味で、もう少しだけ弄ってみる。 ここでは、受信したデータを DynamoDB にそのまま登録するように処理を追加してみる。 *** テーブルの作成 [#jffb4773] #html(<div style="padding-left:10px">) ddl/KinesisReceivedData.json #mycode2(){{ { "TableName": "KinesisReceivedData", "AttributeDefinitions": [ { "AttributeName": "id", "AttributeType": "S" } ], "KeySchema": [ { "AttributeName": "id", "KeyType": "HASH" } ], "ProvisionedThroughput": { "WriteCapacityUnits": 1, "ReadCapacityUnits": 1 } } }} AWS CLI でテーブル作成 #myterm2(){{ aws dynamodb create-table --cli-input-json file://ddl/KinesisReceivedData.json }} #html(</div>) *** AmazonKinesisApplicationSampleRecordProcessor.java の修正 [#yda0efae] #html(<div style="padding-left:10px">) AWS Java SDK を使用して、先ほど作成したテーブルに受信データを登録するように処理を変更する。 ※参考: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html ※関連: [[AWS Java SDKでDynamoDBのCRUDを書いてみる]] AmazonKinesisApplicationSampleRecordProcessor.java #mycode2(){{ . . import com.google.gson.Gson; import java.time.format.DateTimeFormatter; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.UUID; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.PutItemResult; . . public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor { . . private AmazonDynamoDB amazonDynamoDB = AmazonDynamoDBClientBuilder.standard().build(); // DynamoDBアクセス用のクライアント . . private void processSingleRecord(Record record) { String data = null; try { data = decoder.decode(record.getData()).toString(); try { Map receiveJson = new Gson().fromJson(data, Map.class); LOG.info("##### received json : " + receiveJson + " #####"); } catch (Exception e) { LOG.info("##### Received string: " + data + " #####"); } String tableName = "KinesisReceivedData"; String id = UUID.randomUUID().toString(); String datetime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS")); Map<String,AttributeValue> item = new HashMap<String,AttributeValue>(); item.put("id" , new AttributeValue().withS(id)); item.put("datetime", new AttributeValue().withS(datetime)); item.put("data" , new AttributeValue().withS(data)); PutItemResult result = amazonDynamoDB.putItem(new PutItemRequest(tableName, item)); System.out.println(result); } catch (NumberFormatException e) { LOG.info("Record does not match sample record format. Ignoring record with data; " + data); } catch (CharacterCodingException e) { LOG.error("Malformed data: " + data, e); } } }} #html(</div>) *** ワーカーの起動 [#l60f011e] #html(<div style="padding-left:10px">) AmazonKinesisApplicationSample.java を実行する。 #myterm2(){{ gradle run }} #html(</div>) *** Kinesis にテストデータを出力 [#dc9e0ff9] #html(<div style="padding-left:10px">) testdata.json #mycode2(){{ { "var1": "ABCDEFG", "timeStamp": "2018-02-12 18:50:05", "data": { "data1": "11111", "data2": "22222" } } }} AWS CLI からKinesisにレコード出力 #myterm2(){{ aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json }} ※受信データがログ出力されているか確認する。 #html(</div>) *** テーブルにデータが登録されているか確認 [#t06bbce6] #html(<div style="padding-left:10px">) #myterm2(){{ aws dynamodb scan --table-name KinesisReceivedData }} #html(※ <a href="?AWS%20CLI%A5%B3%A5%DE%A5%F3%A5%C9%A4%CE%A5%E1%A5%E2" target="_blank">AWS CLIコマンドのメモ</a>) #html(</div>) #html(</div>) ** 後片付け [#e439ff14] #html(<div style="padding-left:10px">) *** Kinesisストリームの削除 [#tb880751] #html(<div style="padding-left:10px">) #myterm2(){{ aws kinesis delete-stream --stream-name myFirstStream }} #html(</div>) #html(</div>)