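Overview

・Check how message reception behaves for AWS SQS standard queues and FIFO queues.

Results

The findings from the investigation and verification are written up first.
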
Preparation

Creating the queues

template_normal.yml

    AWSTemplateFormatVersion: "2010-09-09"
    Description: sample sqs
    Resources:
      SampleQueue01:
        Type: AWS::SQS::Queue
        Properties:
          QueueName: "SampleQueue01"  # Queue name (a FIFO queue name must end with .fifo)
    Outputs:
      SampleQueue01:
        Export:
          Name: SampleQueue01Name
        Value: !GetAtt SampleQueue01.QueueName

template_fifo.yml

    AWSTemplateFormatVersion: "2010-09-09"
    Description: sample sqs
    Resources:
      SampleFifoQueue01:
        Type: AWS::SQS::Queue
        Properties:
          QueueName: "SampleQueue01.fifo"  # Queue name (a FIFO queue name must end with .fifo)
          FifoQueue: true                  # Set to true for a FIFO queue
          #ReceiveMessageWaitTimeSeconds: 0  # Wait time in seconds (default: 0, valid range: 0 to 20); 1 or more makes long polling the default
    Outputs:
      SampleFifoQueue01:
        Export:
          Name: SampleFifoQueue01Name
        Value: !GetAtt SampleFifoQueue01.QueueName

Create the queues

    # Create the standard queue
    aws cloudformation create-stack --stack-name SampleNormalQueue01Stack --template-body file://template_normal.yml --capabilities CAPABILITY_IAM \
      && aws cloudformation wait stack-create-complete --stack-name SampleNormalQueue01Stack

    # Create the FIFO queue
    aws cloudformation create-stack --stack-name SampleFifoQueue01Stack --template-body file://template_fifo.yml --capabilities CAPABILITY_IAM \
      && aws cloudformation wait stack-create-complete --stack-name SampleFifoQueue01Stack

Implementation

Message sending

sqs_send_messages.py

    import argparse
    import math
    import uuid
    from datetime import datetime

    import boto3


    def log(text):
        # Prefix every log line with a microsecond-resolution timestamp
        print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text))


    def main(queue_name, count):
        """
        Send messages.

        Args:
            queue_name (str): queue name
            count (int): number of messages to send
        """
        log(f"SEND START (count:{count})")
        sqs = boto3.resource('sqs')
        queue = sqs.get_queue_by_name(QueueName=queue_name)

        # Maximum number of entries in one batch request (10)
        batch_max_count = 10

        # Build and send the messages, one batch of up to batch_max_count at a time
        all_sent_count = 0
        times = math.ceil(count / batch_max_count)
        for i in range(times):
            start = batch_max_count * i
            end = min(batch_max_count * (i + 1), count)

            # Build the messages
            entries = [{'Id': str(uuid.uuid1()), 'MessageBody': f'Sample Message {no+1}!'}
                       for no in range(start, end)]

            # A FIFO queue needs additional parameters
            if queue_name.endswith('.fifo'):
                entries = [{**x, 'MessageGroupId': 'sample', 'MessageDeduplicationId': str(uuid.uuid4())}
                           for x in entries]

            # Send the messages
            response = queue.send_messages(Entries=entries)
            if 'Successful' in response:
                all_sent_count += len(response['Successful'])
                log('## {} times, sent count: {}'.format(i+1, len(response['Successful'])))
            else:
                log('error!')

        log(f"All Sent Count: {all_sent_count}")
        log("SEND END")


    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--queue', type=str, default="SampleQueue01", help='queue name')
        parser.add_argument('--count', type=int, default=10, help='number of messages to send')
        args = parser.parse_args()
        main(args.queue, args.count)

Message receiving

sqs_receive_messages.py

    import argparse
    from datetime import datetime

    import boto3


    def log(text):
        # Prefix every log line with a microsecond-resolution timestamp
        print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text))


    def main(queue_name, waitSec=0):
        """
        Receive messages.

        Args:
            queue_name (str): queue name
            waitSec (int): wait time in seconds
        """
        log(f'RECEIVE START (wait:{waitSec})')
        queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_name)

        # Keep receiving until a request returns no messages
        process_count = 0
        received_count = 0
        while True:
            process_count += 1
            messages = queue.receive_messages(
                MaxNumberOfMessages=10,   # Maximum number of messages to fetch (1-10), default: 1
                VisibilityTimeout=10,     # Visibility timeout in seconds
                WaitTimeSeconds=waitSec   # How long to wait for a message to arrive
            )
            for m in messages:
                #log(f' {m.body}')
                m.delete()
            log(f'## {process_count} times, received count {len(messages)}')
            received_count += len(messages)
            if len(messages) == 0:
                break

        log(f'All Received Count: {received_count}')
        log('RECEIVE END')


    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--queue', type=str, default="SampleQueue01", help='queue name')
        parser.add_argument('--wait', type=int, default=0, help='long polling wait time (seconds)')
        args = parser.parse_args()
        main(args.queue, args.wait)

Test script

test_sqs.py

    import argparse

    import sqs_send_messages as send
    import sqs_receive_messages as receive


    def main(queue, count, wait):
        # Send first, then receive everything back from the same queue
        send.main(queue, count)
        receive.main(queue, wait)


    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('--queue', type=str, default="SampleQueue01", help='queue name')
        parser.add_argument('--count', type=int, default=10, help='number of messages to send')
        parser.add_argument('--wait', type=int, default=0, help='wait time (seconds)')
        args = parser.parse_args()
        main(args.queue, args.count, args.wait)

Verification

Receiving from the standard queue (short polling)

    python3 test_sqs.py --queue SampleQueue01 --count 100 --wait 0

    07:24:35.494487 SEND START (count:100)
    07:24:35.790417 ## 1 times, sent count: 10
    07:24:35.840414 ## 2 times, sent count: 10
    07:24:35.904754 ## 3 times, sent count: 10
    07:24:35.953779 ## 4 times, sent count: 10
    07:24:36.003688 ## 5 times, sent count: 10
    07:24:36.150788 ## 6 times, sent count: 10
    07:24:36.256732 ## 7 times, sent count: 10
    07:24:36.303518 ## 8 times, sent count: 10
    07:24:36.353442 ## 9 times, sent count: 10
    07:24:36.403036 ## 10 times, sent count: 10
    07:24:36.403113 All Sent Count: 100
    07:24:36.403140 SEND END
    07:24:36.406127 RECEIVE START (wait:0)
    07:24:36.976684 ## 1 times, received count 10
    07:24:37.638363 ## 2 times, received count 10
    07:24:38.571013 ## 3 times, received count 10
    07:24:39.829397 ## 4 times, received count 10
    07:24:41.341998 ## 5 times, received count 10
    07:24:42.185147 ## 6 times, received count 10
    07:24:42.582848 ## 7 times, received count 10
    07:24:42.997910 ## 8 times, received count 10
    07:24:43.350380 ## 9 times, received count 8
    07:24:43.626581 ## 10 times, received count 2
    07:24:43.901359 ## 11 times, received count 6
    07:24:44.108748 ## 12 times, received count 4
    07:24:44.197583 ## 13 times, received count 0
    07:24:44.197641 All Received Count: 100
    07:24:44.197663 RECEIVE END

Receiving from the standard queue (long polling)

    python3 test_sqs.py --queue SampleQueue01 --count 100 --wait 5

    07:25:14.042701 SEND START (count:100)
    07:25:14.369250 ## 1 times, sent count: 10
    07:25:14.416372 ## 2 times, sent count: 10
    07:25:14.464309 ## 3 times, sent count: 10
    07:25:14.509652 ## 4 times, sent count: 10
    07:25:14.556672 ## 5 times, sent count: 10
    07:25:14.616482 ## 6 times, sent count: 10
    07:25:14.711425 ## 7 times, sent count: 10
    07:25:14.762155 ## 8 times, sent count: 10
    07:25:14.836459 ## 9 times, sent count: 10
    07:25:14.884720 ## 10 times, sent count: 10
    07:25:14.884769 All Sent Count: 100
    07:25:14.884787 SEND END
    07:25:14.887169 RECEIVE START (wait:5)
    07:25:16.178706 ## 1 times, received count 10
    07:25:17.404264 ## 2 times, received count 10
    07:25:18.126479 ## 3 times, received count 6
    07:25:19.163440 ## 4 times, received count 10
    07:25:19.826154 ## 5 times, received count 8
    07:25:20.071369 ## 6 times, received count 2
    07:25:20.463368 ## 7 times, received count 10
    07:25:20.852562 ## 8 times, received count 10
    07:25:21.255428 ## 9 times, received count 10
    07:25:21.625951 ## 10 times, received count 10
    07:25:22.045919 ## 11 times, received count 6
    07:25:22.589616 ## 12 times, received count 8
    07:25:27.737257 ## 13 times, received count 0
    07:25:27.737329 All Received Count: 100
    07:25:27.737357 RECEIVE END

Receiving from the FIFO queue (short polling)

    python3 test_sqs.py --queue SampleQueue01.fifo --count 100 --wait 0

    07:25:49.184567 SEND START (count:100)
    07:25:49.484980 ## 1 times, sent count: 10
    07:25:49.530587 ## 2 times, sent count: 10
    07:25:49.570438 ## 3 times, sent count: 10
    07:25:49.623459 ## 4 times, sent count: 10
    07:25:49.672266 ## 5 times, sent count: 10
    07:25:49.721425 ## 6 times, sent count: 10
    07:25:49.771065 ## 7 times, sent count: 10
    07:25:49.819264 ## 8 times, sent count: 10
    07:25:49.868450 ## 9 times, sent count: 10
    07:25:49.918310 ## 10 times, sent count: 10
    07:25:49.918378 All Sent Count: 100
    07:25:49.918402 SEND END
    07:25:49.920983 RECEIVE START (wait:0)
    07:25:50.631321 ## 1 times, received count 10
    07:25:51.122588 ## 2 times, received count 10
    07:25:51.636541 ## 3 times, received count 10
    07:25:52.120406 ## 4 times, received count 10
    07:25:52.732186 ## 5 times, received count 10
    07:25:53.219336 ## 6 times, received count 10
    07:25:53.957774 ## 7 times, received count 10
    07:25:54.447207 ## 8 times, received count 10
    07:25:54.906122 ## 9 times, received count 10
    07:25:55.557386 ## 10 times, received count 10
    07:25:55.645313 ## 11 times, received count 0
    07:25:55.645380 All Received Count: 100
    07:25:55.645408 RECEIVE END

Receiving from the FIFO queue (long polling)

    python3 test_sqs.py --queue SampleQueue01.fifo --count 100 --wait 5

    07:26:11.185384 SEND START (count:100)
    07:26:11.482303 ## 1 times, sent count: 10
    07:26:11.529832 ## 2 times, sent count: 10
    07:26:11.591276 ## 3 times, sent count: 10
    07:26:11.639281 ## 4 times, sent count: 10
    07:26:11.690327 ## 5 times, sent count: 10
    07:26:11.744686 ## 6 times, sent count: 10
    07:26:11.793316 ## 7 times, sent count: 10
    07:26:11.846589 ## 8 times, sent count: 10
    07:26:11.898975 ## 9 times, sent count: 10
    07:26:11.949271 ## 10 times, sent count: 10
    07:26:11.949342 All Sent Count: 100
    07:26:11.949371 SEND END
    07:26:11.952469 RECEIVE START (wait:5)
    07:26:12.683371 ## 1 times, received count 10
    07:26:13.235235 ## 2 times, received count 10
    07:26:13.757123 ## 3 times, received count 10
    07:26:14.272151 ## 4 times, received count 10
    07:26:14.918964 ## 5 times, received count 10
    07:26:15.488851 ## 6 times, received count 10
    07:26:16.015367 ## 7 times, received count 10
    07:26:16.518638 ## 8 times, received count 10
    07:26:17.032639 ## 9 times, received count 10
    07:26:17.629011 ## 10 times, received count 10
    07:26:22.823679 ## 11 times, received count 0
    07:26:22.823748 All Received Count: 100
    07:26:22.823776 RECEIVE END
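
The receive script stops at the first empty response. With short polling (WaitTimeSeconds=0), SQS queries only a subset of its servers, so on a standard queue an empty or partial response does not necessarily mean the queue is empty; the standard-queue runs above already show batches of fewer than 10 messages. The following is a minimal sketch of a loop that stops only after several consecutive empty responses. The names `drain`, `receive_fn`, and `max_empty` are hypothetical and not part of boto3; `receive_fn` is assumed to wrap a call such as `queue.receive_messages(...)` and to delete what it receives.

```python
from typing import Callable, List, Sequence


def drain(receive_fn: Callable[[], Sequence], max_empty: int = 3) -> List[int]:
    """Call receive_fn until it returns max_empty empty batches in a row.

    Returns the size of every batch received, including the empty ones.
    """
    if max_empty < 1:
        raise ValueError("max_empty must be at least 1")
    batch_sizes = []
    empty_streak = 0
    while empty_streak < max_empty:
        messages = receive_fn()
        batch_sizes.append(len(messages))
        if messages:
            # Any non-empty batch resets the streak of empty responses
            empty_streak = 0
        else:
            empty_streak += 1
    return batch_sizes


if __name__ == "__main__":
    # Stand-in for queue.receive_messages(...): a scripted sequence of batches
    scripted = iter([["a", "b"], [], ["c"], [], [], []])
    print(drain(lambda: next(scripted), max_empty=3))
```

Taking the receive call as a parameter keeps the stopping rule separate from the AWS call, so it can be exercised without a real queue. With long polling, a single empty response after the wait time is a much stronger signal, and `max_empty=1` reproduces the behavior of the original loop.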