AWSメモ >
いちおう以下にあるが、今回は Mavenで依存関係を解決するので、個別にダウンロードはしなくて良い
http://sdk-for-java.amazonwebservices.com/latest/aws-java-sdk.zip
※ Eclipse 4.4 (Luna) 以上が必要
aws kinesis create-stream --stream-name myFirstStream --shard-count 1
※サンプルのストリーム名から変える場合は、AmazonKinesisApplicationSample.java で定義されているストリーム名も書き換える。
サンプルは以下にあるが、なぜか Antベースなので、ここではMavenで作成し直す。
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis
やる事は、EclipseからMavenプロジェクトを作成し、amazon-kinesis-clientとgsonを依存ライブラリとして追記するだけ。
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>KclSample</groupId> <artifactId>KclSample</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> </dependencies> </project>
# プロジェクトフォルダを作成 mkdir KclSample && cd KclSample # 初期化 gradle init --type java-application
build.gradle
plugins { id 'java' id 'application' } mainClassName = 'AmazonKinesisApplicationSample' dependencies { compile 'com.google.guava:guava:23.0' compile 'com.amazonaws:amazon-kinesis-client:1.9.0' compile 'com.google.code.gson:gson:2.8.2' compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0' testCompile 'junit:junit:4.12' } repositories { jcenter() } jar { manifest { attributes 'Main-Class': mainClassName } }
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis にあるサンプルから必要なファイルを取得する。
# src, test フォルダをキレイにしておく rm -rf src/main/java/* rm -rf src/test/java/* # 必要なサンプルソースをダウンロード wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationRecordProcessorFactory.java \ -O src/main/java/AmazonKinesisApplicationRecordProcessorFactory.java wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSample.java \ -O src/main/java/AmazonKinesisApplicationSample.java wget https://raw.githubusercontent.com/aws/aws-sdk-java/master/src/samples/AmazonKinesis/AmazonKinesisApplicationSampleRecordProcessor.java \ -O src/main/java/AmazonKinesisApplicationSampleRecordProcessor.java
サンプルは少し古いKCLのバージョン用のものになっているのと、ローカルでワーカーを起動したいので一部変更する。
また、IRecordProcessor バージョン1用のサンプルなので、バージョン2用を試したい場合は、必要に応じて書き換える。
https://docs.aws.amazon.com/ja_jp/streams/latest/dev/kinesis-record-processor-implementation-app-java.html#kcl-java-interface-v2
AmazonKinesisApplicationSample.java の修正
public static void main(String[] args) throws Exception { . . kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM); kinesisClientLibConfiguration.withRegionName("ap-northeast-1"); // 東京リージョンを指定 . . public static void deleteResources() { // Delete the stream AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion("us-west-2") // 東京リージョンに変更 .build(); System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n", SAMPLE_APPLICATION_STREAM_NAME); try { kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME); } catch (ResourceNotFoundException ex) { // The stream doesn't exist. } // Delete the table AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion("us-west-2") // 東京リージョンに変更 .build(); System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n", SAMPLE_APPLICATION_NAME); try { dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME); } catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) { // The table doesn't exist. } } </pre>
AmazonKinesisApplicationSampleRecordProcessor.java の修正
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; ↓ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
AmazonKinesisApplicationSampleRecordProcessor.java の修正
private void processSingleRecord(Record record) { // TODO Add your own record processing logic here String data = null; try { // For this app, we interpret the payload as UTF-8 chars. data = decoder.decode(record.getData()).toString(); // Assume this record came from AmazonKinesisSample and log its age. /* コメントアウト //long recordCreateTime = new Long(data.substring("testData-".length())); //long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime; LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data + ", Created " + ageOfRecordInMillis + " milliseconds ago."); */ LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", Received data: " + data); // 追加 } catch (NumberFormatException e) { LOG.info("Record does not match sample record format. Ignoring record with data; " + data); } catch (CharacterCodingException e) { LOG.error("Malformed data: " + data, e); } }
gradle run
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data "test123"
※ログに "Received data: test123" が出力されればOK。
あとは AmazonKinesisApplicationSampleRecordProcessor.java を弄っていく。
サンプルでは各レコードの処理は processSingleRecord メソッドで行われるようになっているので、このメソッドをいろいろ弄って動作確認。
とりあえず、受信したデータを DynamoDB にそのまま登録するように処理を追加してみる。
AmazonKinesisApplicationSampleRecordProcessor.java
aaa
AmazonKinesisApplicationSample.java を実行する。
注意点としては、デフォルトの aws プロファイルが使用されるので、事前にaws configureしておく事。
(別のプロファイルを使用する場合は、プロパティファイルで指定する。)
gradle run
testdata.json
{ "var1": "ABCDEFG", "timeStamp": "2018-02-12 18:50:05", "data": { "data1": "11111", "data2": "22222" } }
AWS CLI からKinesisにレコード出力
aws kinesis put-record --stream-name myFirstStream --partition-key 123 --data file://testdata.json
aws kinesis delete-stream --stream-name myFirstStream