目次

概要

準備

キューの作成

template_normal.yml

AWSTemplateFormatVersion: "2010-09-09"
Description: sample sqs 
Resources:
  SampleQueue01:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SampleQueue01"              # キュー名(FIFOキューにする場合は末尾は .fifo にする必要がある)
Outputs:
  SampleQueue01:
    Export:
      Name: SampleQueue01Name
    Value: !GetAtt SampleQueue01.QueueName

template_fifo.yml

AWSTemplateFormatVersion: "2010-09-09"
Description: sample sqs 
Resources:
  SampleFifoQueue01:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: "SampleQueue01.fifo"         # キュー名(FIFOキューにする場合は末尾は .fifo にする必要がある)
      FifoQueue: true                         # FIFOキューにする場合はtrue
      #ReceiveMessageWaitTimeSeconds: 0       # 待機期間 ※ デフォルト: 0(1 〜 20) ※1以上を指定するとデフォルトでロングポーリングとなる
Outputs:
  SampleFifoQueue01:
    Export:
      Name: SampleFifoQueue01Name
    Value: !GetAtt SampleFifoQueue01.QueueName

実装

メッセージ送信処理

sqs_send_messages.py

import argparse
import boto3
import uuid
import re


def main(queue_name, count):
    """ 
    メッセージ送信.

    Args:
        queue_name (str): キュー名
        count (int): 送信数
    """
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    # メッセージ作成
    entries = [{'Id': str(uuid.uuid1()), 'MessageBody': f'Sample Message {i}!'} for i in range(count)]

    # fifoキューの時はパラメータ追加
    if re.compile(r'.*\.fifo$').match(queue_name):
        entries = [{**x, 'MessageGroupId': 'sample', 'MessageDeduplicationId': str(uuid.uuid4())} for x in entries]

    # メッセージ送信
    response = queue.send_messages(Entries=entries)
    if 'Successful' in response:
        print('sent message count: {}'.format(len(response['Successful'])))
    else:
        print('error!')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', type=str, default="SampleQueue01", help='キュー名')
    parser.add_argument('--count', type=int, default=10, help='送信メッセージ数')
    args = parser.parse_args()
    main(args.queue, args.count)

メッセージ受信処理

sqs_receive_messages.py

mport argparse
import boto3
from datetime import datetime
import uuid


def log(text):
    print('{} {}'.format(datetime.now().strftime('%H:%M:%S.%f'), text))


def main(queue_name, waitSec=0):
    """ 
    メッセージ受信.

    Args:
        queue_name (str): キュー名
        waitSec (int): 待機時間(秒)
    """

    log(f'receive start (wait:{waitSec})')

    queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue_name)

    # キューにメッセージがなくなるまで受信
    process_count = 0 
    received_count = 0 
    while True:
        process_count += 1
        log(f'  ## times: {process_count}')
        messages = queue.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=10,
            WaitTimeSeconds=waitSec
        )   
        for m in messages:
            log(f'  {m.body}')
            m.delete()
        log('  ## received count: {} ##'.format(len(messages)))
        received_count += len(messages)
        if len(messages) == 0:
            break

    log(f'all received count: {received_count}')
    log('receive end')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--queue', type=str, default="SampleQueue01", help='キュー名')
    parser.add_argument('--wait', type=int, default=0, help='ロングポーリングの待ち時間(秒)')
    args = parser.parse_args()
    main(args.queue, args.wait)

動作確認

標準キューからの受信 (WaitTimeSeconds=0)

# メッセージ送信
python3 sqs_send_messages.py --queue SampleQueue01
sent message count: 10

# メッセージ受信
python3 sqs_receive_messages.py --queue SampleQueue01
06:18:41.907223 receive start (wait:0)
06:18:42.161742   ## times: 1
06:18:42.219472   Sample Message 6!
06:18:42.263471   Sample Message 7!
06:18:42.305184   ## received count: 2 ##
06:18:42.305253   ## times: 2
06:18:42.338077   Sample Message 8!
06:18:42.378476   Sample Message 9!
06:18:42.413713   ## received count: 2 ##
06:18:42.413786   ## times: 3
06:18:42.455132   Sample Message 0!
06:18:42.491965   Sample Message 1!
06:18:42.534380   ## received count: 2 ##
06:18:42.534448   ## times: 4
06:18:42.571035   Sample Message 4!
06:18:42.606354   Sample Message 5!
06:18:42.645746   ## received count: 2 ##
06:18:42.645813   ## times: 5
06:18:42.694219   Sample Message 2!
06:18:42.731699   Sample Message 3!
06:18:42.766721   ## received count: 2 ##
06:18:42.766790   ## times: 6
06:18:42.850375   ## received count: 0 ##
06:18:42.850447 all received count: 10
06:18:42.850537 receive end

標準キューからの受信 (WaitTimeSeconds=5)

# メッセージ送信
python3 sqs_send_messages.py --queue SampleQueue01
sent message count: 10

# メッセージ受信
python3 sqs_receive_messages.py --queue SampleQueue01 --wait 5
06:19:29.850615 receive start (wait:5)
06:19:30.060734   ## times: 1
06:19:30.117383   Sample Message 0!
06:19:30.153267   Sample Message 1!
06:19:30.186305   ## received count: 2 ##
06:19:30.186363   ## times: 2
06:19:30.245380   Sample Message 2!
06:19:30.275071   Sample Message 3!
06:19:30.303633   ## received count: 2 ##
06:19:30.303690   ## times: 3
06:19:30.383249   Sample Message 6!
06:19:30.418233   Sample Message 7!
06:19:30.448917   ## received count: 2 ##
06:19:30.448974   ## times: 4
06:19:30.523434   Sample Message 4!
06:19:30.574284   Sample Message 5!
06:19:30.617360   ## received count: 2 ##
06:19:30.617428   ## times: 5
06:19:30.686887   Sample Message 8!
06:19:30.716161   Sample Message 9!
06:19:30.745607   ## received count: 2 ##
06:19:30.745666   ## times: 6
06:19:35.784671   ## received count: 0 ##
06:19:35.784741 all received count: 10
06:19:35.784770 receive end

FIFOキューからの受信(WaitTimeSeconds=0)

# メッセージ送信
python3 sqs_send_messages.py --queue SampleQueue01.fifo
sent message count: 10

# メッセージ受信
python3 sqs_receive_messages.py --queue SampleQueue01.fifo
06:20:41.883319 receive start (wait:0)
06:20:42.145858   ## times: 1
06:20:42.203745   Sample Message 0!
06:20:42.244354   Sample Message 1!
06:20:42.285375   Sample Message 2!
06:20:42.326636   Sample Message 3!
06:20:42.368955   Sample Message 4!
06:20:42.409999   Sample Message 5!
06:20:42.450055   Sample Message 6!
06:20:42.485745   Sample Message 7!
06:20:42.539294   Sample Message 8!
06:20:42.579761   Sample Message 9!
06:20:42.621650   ## received count: 10 ##
06:20:42.621708   ## times: 2
06:20:42.708396   ## received count: 0 ##
06:20:42.708447 all received count: 10
06:20:42.708466 receive end

FIFOキューからの受信(WaitTimeSeconds=5)

# メッセージ送信
python3 sqs_send_messages.py --queue SampleQueue01.fifo
sent message count: 10

# メッセージ受信
python3 sqs_receive_messages.py --queue SampleQueue01.fifo --wait 5
06:24:40.971442 receive start (wait:5)
06:24:41.231637   ## times: 1
06:24:41.306999   Sample Message 0!
06:24:41.375247   Sample Message 1!
06:24:41.427477   Sample Message 2!
06:24:41.475459   Sample Message 3!
06:24:41.533345   Sample Message 4!
06:24:41.582722   Sample Message 5!
06:24:41.640533   Sample Message 6!
06:24:41.719576   Sample Message 7!
06:24:41.761849   Sample Message 8!
06:24:41.805343   Sample Message 9!
06:24:41.854775   ## received count: 10 ##
06:24:41.854845   ## times: 2
06:24:46.978988   ## received count: 0 ##
06:24:46.979064 all received count: 10
06:24:46.979091 receive end

トップ   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS