目次

概要

・AWS SQSの標準キュー 及び FIFOキューで受信時の挙動を確認する。
・送信 及び 受信処理は boto3 の 高レベルAPI を使用する。

結果

先に調査、及び 検証結果を書いてしまう。

  • 送信
    • バッチリクエストを使用した場合でも一度に10件までしか送信できない
    • メッセージあたりのサイズは256KBまで。
       ※ 大きめのメッセージを送る場合は gzip & base64 する等してサイズを小さくする等の工夫が必要。
       ※ ドキュメント には256KBを超えるサイズのメッセージを送信する場合は Java 用 Amazon SQS 拡張クライアントライブラリ)を使えと書いてある。
  • 受信
    • バッチリクエストを使用しても一度に最大10件までしか受信できない
    • 標準キューの場合で、キューに溜まっているメッセージが少ない場合は全てを受信できない事がある。
       ※検証時の感覚では、キューに残っているメッセージが10件未満の場合は顕著。(2件程しか取得されない事が多い)
       ※標準キューの場合でキューに残っているメッセージが少ない場合は、確認用のリトライが必要。
        なので、標準キューの場合はロングポーリングはあまり意味がないかもドキュメント にもしれっとFIFOで作るように書いてある。)
       ※FIFOキューの場合は、溜まっているメッセージを全て受信できそう。
  • 料金
  • 制限

準備

キューの作成

参考
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html

関連
AWS CloudFormationメモ#サンプル/SQS

template_normal.yml

AWSTemplateFormatVersion: "2010-09-09"
Description: sample sqs 
Resources:
  SampleQueue01:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SampleQueue01"              # キュー名(FIFOキューにする場合は末尾は .fifo にする必要がある)
Outputs:
  SampleQueue01:
    Export:
      Name: SampleQueue01Name
    Value: !GetAtt SampleQueue01.QueueName

template_fifo.yml

AWSTemplateFormatVersion: "2010-09-09"
Description: sample sqs 
Resources:
  SampleFifoQueue01:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SampleQueue01.fifo"         # キュー名(FIFOキューにする場合は末尾は .fifo にする必要がある)
      FifoQueue: true                         # FIFOキューにする場合はtrue
      #ReceiveMessageWaitTimeSeconds: 0       # 待機期間 ※ デフォルト: 0(1 〜 20) ※1以上を指定するとデフォルトでロングポーリングとなる
Outputs:
  SampleFifoQueue01:
    Export:
      Name: SampleFifoQueue01Name
    Value: !GetAtt SampleFifoQueue01.QueueName

キュー作成

# 標準キュー作成
aws cloudformation create-stack --stack-name SampleNormalQueue01Stack --template-body file://template_normal.yml --capabilities CAPABILITY_IAM \
  && aws cloudformation wait stack-create-complete --stack-name SampleNormalQueue01Stack

# FIFOキュー作成
aws cloudformation create-stack --stack-name SampleFifoQueue01Stack --template-body file://template_fifo.yml --capabilities CAPABILITY_IAM \
  && aws cloudformation wait stack-create-complete --stack-name SampleFifoQueue01Stack

実装

メッセージ送信処理

sqs_send_messages.py

mport argparse
import boto3
from datetime import datetime
import math
import uuid
import re


def log(text):
    print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text))


def main(queue_name, count):
    """ 
    メッセージ送信.

    Args:
        queue_name (str): キュー名
        count (int): 送信数
    """
    log(f"SEND START (count:{count})")

    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    # バッチリクエストの最大件数(10件)
    batch_max_count = 10

    # メッセージ作成、送信(バッチリクエストの最大件数分ずつ送信)
    all_sent_count = 0 
    times = math.ceil(count / batch_max_count)
    for i in range(times):

        start = batch_max_count * i 
        end   = batch_max_count * (i + 1)
        if end > count:
            end = count

        # メッセージ作成
        entries = [{'Id': str(uuid.uuid1()), 'MessageBody': f'Sample Message {no+1}!'} for no in range(start, end)]

        # fifoキューの時はパラメータ追加
        if re.compile(r'.*\.fifo$').match(queue_name):
            entries = [{**x, 'MessageGroupId': 'sample', 'MessageDeduplicationId': str(uuid.uuid4())} for x in entries]

        # メッセージ送信
        response = queue.send_messages(Entries=entries)
        if 'Successful' in response:
            all_sent_count += len(response['Successful'])
            log('## {} times, sent count: {}'.format(i+1, len(response['Successful'])))
        else:
            log('error!')

    log(f"All Sent Count: {all_sent_count}")
    log("SEND END")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', type=str, default="SampleQueue01", help='キュー名')
    parser.add_argument('--count', type=int, default=10, help='送信メッセージ数')
    args = parser.parse_args()
    main(args.queue, args.count)

メッセージ受信処理

sqs_receive_messages.py

import argparse
import boto3
from datetime import datetime
import uuid


def log(text):
    print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text))


def main(queue_name, waitSec=0):
    """ 
    メッセージ受信.

    Args:
        queue_name (str): キュー名
        waitSec (int): 待機時間(秒)
    """

    log(f'RECEIVE START (wait:{waitSec})')

    queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_name)

    # キューにメッセージがなくなるまで受信
    process_count = 0 
    received_count = 0 
    while True:
        process_count += 1
        messages = queue.receive_messages(
            MaxNumberOfMessages=10,  # 最大取得数(1-10) ※0デフォルト:1
            VisibilityTimeout=10,    # 可視性タイムアウト
            WaitTimeSeconds=waitSec  # メッセージ取得できるまでの待機時間
        )   
        for m in messages:
            #log(f'  {m.body}')
            m.delete()
        log(f'## {process_count} times, received count {len(messages)}')
        received_count += len(messages)
        if len(messages) == 0:
            break

    log(f'All Received Count: {received_count}')
    log('RECEIVE END')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', type=str, default="SampleQueue01", help='キュー名')
    parser.add_argument('--wait', type=int, default=0, help='ロングポーリングの待ち時間(秒)')
    args = parser.parse_args()
    main(args.queue, args.wait)

テスト用処理

test_sqs.py

import argparse
import sqs_send_messages as send
import sqs_receive_messages as receive

def main(queue, count, wait):
    send.main(queue, count)
    receive.main(queue, wait)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', type=str, default="SampleQueue01", help='キュー名')
    parser.add_argument('--count', type=int, default=10, help='送信メッセージ数')
    parser.add_argument('--wait', type=int, default=0, help='待機時間(秒)')
    args = parser.parse_args()
    main(args.queue, args.count, args.wait)

動作確認

標準キューからの受信 (ショートポーリング)

python3 test_sqs.py --queue SampleQueue01 --count 100 --wait 0
07:24:35.494487 SEND START (count:100)
07:24:35.790417 ## 1 times, sent count: 10
07:24:35.840414 ## 2 times, sent count: 10
07:24:35.904754 ## 3 times, sent count: 10
07:24:35.953779 ## 4 times, sent count: 10
07:24:36.003688 ## 5 times, sent count: 10
07:24:36.150788 ## 6 times, sent count: 10
07:24:36.256732 ## 7 times, sent count: 10
07:24:36.303518 ## 8 times, sent count: 10
07:24:36.353442 ## 9 times, sent count: 10
07:24:36.403036 ## 10 times, sent count: 10
07:24:36.403113 All Sent Count: 100
07:24:36.403140 SEND END
07:24:36.406127 RECEIVE START (wait:0)
07:24:36.976684 ## 1 times, received count 10
07:24:37.638363 ## 2 times, received count 10
07:24:38.571013 ## 3 times, received count 10
07:24:39.829397 ## 4 times, received count 10
07:24:41.341998 ## 5 times, received count 10
07:24:42.185147 ## 6 times, received count 10
07:24:42.582848 ## 7 times, received count 10
07:24:42.997910 ## 8 times, received count 10
07:24:43.350380 ## 9 times, received count 8
07:24:43.626581 ## 10 times, received count 2
07:24:43.901359 ## 11 times, received count 6
07:24:44.108748 ## 12 times, received count 4
07:24:44.197583 ## 13 times, received count 0
07:24:44.197641 All Received Count: 100
07:24:44.197663 RECEIVE END

標準キューからの受信 (ロングポーリング)

python3 test_sqs.py --queue SampleQueue01 --count 100 --wait 5
07:25:14.042701 SEND START (count:100)
07:25:14.369250 ## 1 times, sent count: 10
07:25:14.416372 ## 2 times, sent count: 10
07:25:14.464309 ## 3 times, sent count: 10
07:25:14.509652 ## 4 times, sent count: 10
07:25:14.556672 ## 5 times, sent count: 10
07:25:14.616482 ## 6 times, sent count: 10
07:25:14.711425 ## 7 times, sent count: 10
07:25:14.762155 ## 8 times, sent count: 10
07:25:14.836459 ## 9 times, sent count: 10
07:25:14.884720 ## 10 times, sent count: 10
07:25:14.884769 All Sent Count: 100
07:25:14.884787 SEND END
07:25:14.887169 RECEIVE START (wait:5)
07:25:16.178706 ## 1 times, received count 10
07:25:17.404264 ## 2 times, received count 10
07:25:18.126479 ## 3 times, received count 6
07:25:19.163440 ## 4 times, received count 10
07:25:19.826154 ## 5 times, received count 8
07:25:20.071369 ## 6 times, received count 2
07:25:20.463368 ## 7 times, received count 10
07:25:20.852562 ## 8 times, received count 10
07:25:21.255428 ## 9 times, received count 10
07:25:21.625951 ## 10 times, received count 10
07:25:22.045919 ## 11 times, received count 6
07:25:22.589616 ## 12 times, received count 8
07:25:27.737257 ## 13 times, received count 0
07:25:27.737329 All Received Count: 100
07:25:27.737357 RECEIVE END

FIFOキューからの受信(ショートポーリング)

python3 test_sqs.py --queue SampleQueue01.fifo --count 100 --wait 0
07:25:49.184567 SEND START (count:100)
07:25:49.484980 ## 1 times, sent count: 10
07:25:49.530587 ## 2 times, sent count: 10
07:25:49.570438 ## 3 times, sent count: 10
07:25:49.623459 ## 4 times, sent count: 10
07:25:49.672266 ## 5 times, sent count: 10
07:25:49.721425 ## 6 times, sent count: 10
07:25:49.771065 ## 7 times, sent count: 10
07:25:49.819264 ## 8 times, sent count: 10
07:25:49.868450 ## 9 times, sent count: 10
07:25:49.918310 ## 10 times, sent count: 10
07:25:49.918378 All Sent Count: 100
07:25:49.918402 SEND END
07:25:49.920983 RECEIVE START (wait:0)
07:25:50.631321 ## 1 times, received count 10
07:25:51.122588 ## 2 times, received count 10
07:25:51.636541 ## 3 times, received count 10
07:25:52.120406 ## 4 times, received count 10
07:25:52.732186 ## 5 times, received count 10
07:25:53.219336 ## 6 times, received count 10
07:25:53.957774 ## 7 times, received count 10
07:25:54.447207 ## 8 times, received count 10
07:25:54.906122 ## 9 times, received count 10
07:25:55.557386 ## 10 times, received count 10
07:25:55.645313 ## 11 times, received count 0
07:25:55.645380 All Received Count: 100
07:25:55.645408 RECEIVE END

FIFOキューからの受信(ロングポーリング)

python3 test_sqs.py --queue SampleQueue01.fifo --count 100 --wait 5
07:26:11.185384 SEND START (count:100)
07:26:11.482303 ## 1 times, sent count: 10
07:26:11.529832 ## 2 times, sent count: 10
07:26:11.591276 ## 3 times, sent count: 10
07:26:11.639281 ## 4 times, sent count: 10
07:26:11.690327 ## 5 times, sent count: 10
07:26:11.744686 ## 6 times, sent count: 10
07:26:11.793316 ## 7 times, sent count: 10
07:26:11.846589 ## 8 times, sent count: 10
07:26:11.898975 ## 9 times, sent count: 10
07:26:11.949271 ## 10 times, sent count: 10
07:26:11.949342 All Sent Count: 100
07:26:11.949371 SEND END
07:26:11.952469 RECEIVE START (wait:5)
07:26:12.683371 ## 1 times, received count 10
07:26:13.235235 ## 2 times, received count 10
07:26:13.757123 ## 3 times, received count 10
07:26:14.272151 ## 4 times, received count 10
07:26:14.918964 ## 5 times, received count 10
07:26:15.488851 ## 6 times, received count 10
07:26:16.015367 ## 7 times, received count 10
07:26:16.518638 ## 8 times, received count 10
07:26:17.032639 ## 9 times, received count 10
07:26:17.629011 ## 10 times, received count 10
07:26:22.823679 ## 11 times, received count 0
07:26:22.823748 All Received Count: 100
07:26:22.823776 RECEIVE END

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2018-09-16 (日) 02:05:32 (1107d)