概要

Azure のストレージキューのデータを読み書きする実装について確認する。

尚、GoでAzureのBlobファイルを読み書き にも記載したが、azure-sdk-for-go の将来的な機能廃止に備え、ここでは azure-storage-queue-go を使用して実装する。

 目次

メッセージの取り出し方

Dequeueによるメッセージ取り出し

  • Dequeue による取り出しは可視性タイムアウトを指定して取り出す事ができ、この時間中は他の処理はメッセージを取り出し/参照する事はできない。
  • Dequeue で取り出したメッセージは削除する事が可能。
    ※ただし可視性タイムアウト経過後に、既に別の処理によってDequeue されている時は削除出来ない。(削除時のキーとなる PopReceipt が変わる為)

Dequeueによるメッセージ取り出しイメージ

azure_queue_image_dequeue.png

Peek によるメッセージ取り出し

  • Peek による取り出し時は可視性タイムアウトの指定はなく、他の処理でも同じメッセージを取り出し/参照する事ができる。
  • Peek で取り出したメッセージに対する削除操作は出来ない。

Peekによるメッセージ取り出しイメージ

azure_queue_image_peek.png

実装サンプル

package main

import (
    "context"
    "fmt"
    "log"
    "net/url"
    "os"
    "time"

    "github.com/Azure/azure-storage-queue-go/azqueue"
)

/**
 * 環境変数の取得.
 */
func getEnv(envName string, defaultValue string) string {

    value, exists := os.LookupEnv(envName)
    if exists {
        return value
    } else {
        return defaultValue
    }
}

/**
 * アカウント情報の取得
 */
func getQueueInfo() (string, string, string) {
    // デフォルトはいったんローカルエミュレータ用のアカウント名、キー
    // https://docs.microsoft.com/ja-jp/azure/storage/common/storage-use-emulator#connect-to-the-emulator-account-using-the-well-known-account-name-and-key
    accountName := getEnv("ACCOUNT_NAME" , "devstoreaccount1")
    accountKey  := getEnv("ACCOUNT_KEY"  , "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")
    queueName   := getEnv("ACCOUNT_QUEUE", "webjobs-blobtrigger-poison")
    return accountName, accountKey, queueName
}

func getMessageCount(ctx context.Context, queueUrl azqueue.QueueURL) int32 {

    props, err := queueUrl.GetProperties(ctx)
    if err != nil {
      errorType := err.(azqueue.StorageError).ServiceCode()
      if (errorType == azqueue.ServiceCodeQueueNotFound) {
        log.Print("Queue does not exist")
      } else {
        log.Fatal("get queue properties error.", err)
      }
    }
    count := props.ApproximateMessagesCount()

    return count
}

func main(){

    accountName, accountKey, queueName := getQueueInfo()

    // ローカルエミュレータへの接続の時はURLフォーマットを変える
    queueUrlFormat := "https://%s.queue.core.windows.net/%s"
    if accountName == "devstoreaccount1" {
        queueUrlFormat = "http://127.0.0.1:10001/%s/%s"
    }
    u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName))

    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        log.Fatal(err)
    }
    queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
    log.Printf("%#v", queueUrl)

    ctx := context.TODO()
    msgUrl := queueUrl.NewMessagesURL()

    // メッセージの追加
    for i := 0; i < 10; i++ {
        messageText := fmt.Sprintf("Sample Message%d %v", i, time.Now().Format(time.RFC3339))
        _, err := msgUrl.Enqueue(ctx, messageText, 0, 0)
        if err != nil {
            log.Fatal("Error Enqueue: ", err)
        }
    }

    // 属性から件数を取得
    count := getMessageCount(ctx, queueUrl)
    log.Printf("message count(prop): %d", count)

    // メッセージ一覧の取得
    maxMessages := int32(32)               // 最大件数
    visibilityTimeout := time.Second * 10  // 可視化タイムアウト

    dequeueResp, err := msgUrl.Dequeue(ctx, maxMessages, visibilityTimeout)
    if err != nil {
        log.Fatal("Error dequeue message: ", err)
    } else {
        log.Printf("message count(dequeue): %v", dequeueResp.NumMessages())
        for i := int32(0); i < dequeueResp.NumMessages(); i++ {
            // 偶数番目のメッセージを削除
            if i % 2 == 0 {
                msg := dequeueResp.Message(i)
                msgIdUrl := msgUrl.NewMessageIDURL(msg.ID)
                _, err = msgIdUrl.Delete(ctx, msg.PopReceipt)
                if err != nil {
                    log.Fatal("Error delete message: ", err)
                } else {
                    log.Print("Success delete message: ", msg.Text)
                }
            }
        }
    }

    // 属性から件数を取得 (属性から得られる件数は可視化タイムアウトに影響されない)
    count = getMessageCount(ctx, queueUrl)
    log.Printf("message count(prop): %d", count)

    // 処理直後のメッセージ一覧を取得
    peekResponse, err := msgUrl.Peek(ctx, 32)
    if err != nil {
        log.Fatal("Error peeking queue messages: ", err)
    } else {
        log.Printf("message count(peek): %v", peekResponse.NumMessages())
        for i := int32(0); i < peekResponse.NumMessages(); i++ {
            msg := peekResponse.Message(i)
            log.Printf("message(%d): %s", i, string(msg.Text))
        }
    }

    log.Printf("sleep %v\n", visibilityTimeout)
    time.Sleep(visibilityTimeout)

    // 可視化タイムアウトの秒数が経過後に再度メッセージ一覧を取得
    peekResponse, err = msgUrl.Peek(ctx, 32)
    if err != nil {
        log.Fatal("Error peeking queue messages: ", err)
    } else {
        log.Printf("message count(peek): %v", peekResponse.NumMessages())
        for i := int32(0); i < peekResponse.NumMessages(); i++ {
            msg := peekResponse.Message(i)
            log.Printf("message(%d): %s", i, string(msg.Text))
        }
    }
}

添付ファイル: fileazure_queue_image_dequeue.png 210件 [詳細] fileazure_queue_image_peek.png 225件 [詳細]

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-09-18 (金) 11:19:55 (1309d)