概要

Queueトリガーを使用してメッセージを他のキューにコピーする方法について記載する。

Queueトリガーを利用する際の注意点

Queueトリガーを利用する場合、以下の点に注意する必要がある。

  • Queueトリガーで Functions を起動した場合、正常終了、異常終了に関わらず元キューの対象メッセージは自動的に削除される。
  • Queueトリガーのエラー時にはリトライが行われるが、全てのリトライに失敗した場合は有害キューに出力される。(元キューには残らない
     ※有害キューの名前は {元キュー名}-poison
  • Queueトリガーで関数を起動をする場合は、キューメッセージは BASE64エンコードされている必要がある
     ※BASE64エンコードされていない場合はエラーになる。(関数の処理まで到達しない)

構築イメージ

上記の注意点を踏まえて、下図の通り構築する。

元キューにメッセージが追加されたら Queue トリガーで 関数アプリを起動して 他の2つのキューにメッセージをコピーする。

image1.png

何らかの要因で関数アプリが動作していなかった場合を考慮して、Timerトリガーで定期的に元キューを監視しておく。
こちらは元キューに残っている全てのメッセージを取得して、外の2つのキューにコピーする。(ただし当サンプルでは最大32メッセージまで)

proc_image2.png

Queue トリガーでエラーになったメッセージは元キューには残らず、有害キューに出力されるので、有害キューを定期的に処理する関数も用意しておく。
尚、有害キューの名前は {元キュー名}-poison となる。

proc_image3.png

目次

作成するファイル

.
├─ 0_env.sh
├─ 1_resources.sh
├─ 2_put_message.sh
├─ X1_local_start.sh
├─ X2_local_message_put.sh
├─ X3_local_message_peek.sh
├─ X4_local_message_clear.sh
├─ functions
│   ├─ server.go
│   ├─ host.json
│   └─ local.settings.json
│   ├─ RelayQueue
│   │   └─ function.json
│   ├─ RelayQueueAll
│   │   └─ function.json
│   ├─ RelayPoisonQueue
│   │   └─ function.json
 
... リソース名の定義など
... リソース作成用のシェル
... キューへのメッセージ出力用のシェル
... (ローカル確認用) 関数アプリ起動用シェル
... (ローカル確認用) ストレージエミュレータへのメッセージ出力用シェル
... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージの確認用シェル
... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージをクリアするシェル
 
... 関数ソース(Goによるカスタムハンドラー)
... 関数アプリの定義ファイル
... ローカル実行用の設定ファイル
 
... 元キューにメッセージが出力されたタイミングで起動する関数の定義(Queueトリガー)
 
... 元キューに残っているメッセージを定期的に処理する為の関数の定義(Timerトリガー)
 
... 有害キューに溜まっているメッセージを定期的に処理する為の関数の定義(Timerトリガー)

関数ソース(functions配下)

host.json

{
    "version": "2.0",
    "httpWorker": {
        "description": {
            "defaultExecutablePath": "server.exe"
        }   
    },  
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:10",
            "visibilityTimeout" : "00:00:00",
            "batchSize": 16, 
            "maxDequeueCount": 5,
            "newBatchThreshold": 8
        }   
    },  
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    }   
}

local.settings.json

{
    "IsEncrypted": false,
    "Values": {
      "AzureWebJobsStorage": "UseDevelopmentStorage=true",
      "ACCOUNT_NAME": "devstoreaccount1",
      "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
      "BASE_QUEUE": "my-base-queue",
      "RELAY_QUEUE1": "my-queue1",
      "RELAY_QUEUE2": "my-queue2"
    }   
}

※アカウントキーはローカルエミュレータ(Azurite)用のもの。

server.go

package main

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"
    "github.com/Azure/azure-storage-queue-go/azqueue"
)

type ReturnValue struct {
    Data string
}
type InvokeResponse struct {
    Outputs     map[string]interface{}
    Logs        []string
    ReturnValue interface{}
}

type InvokeRequest struct {
    Data     map[string]interface{}
    Metadata map[string]interface{}
}

func printDebug(format string, params ...interface{}){
    log.SetOutput(os.Stdout)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[DEBUG] %s\n", msg)
}

func printInfo(format string, params ...interface{}){
    log.SetOutput(os.Stdout)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[INFO] %s\n", msg)
}

func printError(format string, params ...interface{}){
    log.SetOutput(os.Stderr)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[ERROR] %s\n", msg)
    log.SetOutput(os.Stdout)
}

func init(){
    log.SetOutput(os.Stdout)
    log.SetFlags(0)
}

func isLocalAccount(accountName string) bool {
    return accountName == "devstoreaccount1"
}

func getQueueUrlFormat(accountName string) string {
    queueUrlFormat := "https://%s.queue.core.windows.net/%s"
    if isLocalAccount(accountName) {
        queueUrlFormat = "http://127.0.0.1:10001/%s/%s"
    }
    return queueUrlFormat
}


/**
 * 環境変数の取得.
 */
func getEnv(envName string, defaultValue string) string {
    value, exists := os.LookupEnv(envName)
    if exists {
        return value
    } else {
        return defaultValue
    }
}

/**
 * キューアカウント情報の取得
 */
func getRelayQueueInfo() (string, string, string, []string) {
    accountName := getEnv("ACCOUNT_NAME", "")
    accountKey  := getEnv("ACCOUNT_KEY" , "")
    queueName0  := getEnv("BASE_QUEUE" , "")
    queueName1  := getEnv("RELAY_QUEUE1" , "")
    queueName2  := getEnv("RELAY_QUEUE2" , "")
    return accountName, accountKey, queueName0, []string{queueName1, queueName2}
}

/**
 * 過剰なエスケープを除去する.
 */
func removeExcessEscape(queueItem string) string {
    queueItem = strings.TrimRight(queueItem, "\"")
    queueItem = strings.TrimLeft(queueItem, "\"")
    queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1)
    queueItem = strings.Replace(queueItem, "\\n", "\n", -1)
    queueItem = strings.Replace(queueItem, "\\\"", "\"", -1)
    queueItem = strings.Replace(queueItem, "\\\\\"", "", -1)
    return queueItem
}

/**
 * 指定されたキューにメッセージを出力する.
 */
func relayQueueMessage(accountName string, accountKey string, queueName string, queueItem string){

    queueUrlFormat := getQueueUrlFormat(accountName)
    u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName))
    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        log.Fatal(err)
    }
    queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))

    ctx := context.TODO()
    msgUrl := queueUrl.NewMessagesURL()
    _, err = msgUrl.Enqueue(ctx, queueItem, 0, 0)
    if err != nil {
        log.Fatal("Error Enqueue: ", err)
        printInfo("Error relay queue. (%s), error: %v", queueName, err)
    } else {
        printInfo("Success relay queue. (%s)", queueName)
    }
}


/**
 * キューメッセージのコピー.
 */
func relayQueueHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayQueueHandler] START")

    defer func(){
        err := recover()
        if err != nil {
            printError("ERROR relayQueueHandler: %v", err)
            panic(err)
        } else {
            printInfo("SUCCESS relayQueueHandler")
        }
    }()

    outputs := make(map[string]interface{})

    var invokeReq InvokeRequest
    d := json.NewDecoder(r.Body)
    decodeErr := d.Decode(&invokeReq)
    if decodeErr != nil {
        http.Error(w, decodeErr.Error(), http.StatusBadRequest)
        return
    }

    // 何回目の試行か
    dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"])

    printInfo("[relayQueueHandler] dequeueCount: %v", dequeueCount)
    printDebug("[relayQueueHandler] invokeReq: %v", invokeReq)
    printDebug("[relayQueueHandler] queue metadata: %v", invokeReq.Metadata)

    // テスト用に一定の確率でエラーにする
    rand.Seed(time.Now().UnixNano())
    if rand.Intn(100) >= 50 || dequeueCount != "1" {
        panic("Random Error!")
    }

    // キューメッセージを取得(BASE64デコードされた状態で取得される)
    queueItem := invokeReq.Data["queueItem"].(string)

    // 過剰エスケープを除去
    decodedItem := removeExcessEscape(queueItem)
    printInfo("[relayQueueHandler] queue message(Value): %v", decodedItem)

    // キューメッセージをコピー(自動的にBASE64エンコードされる)
    outputs["relayqueue1"] = decodedItem
    outputs["relayqueue2"] = decodedItem

    // 正常終了
    invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{} }
    js, err := json.Marshal(invokeResponse)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.Write(js)
    //http.Error(w, "process relayQueueHandler.", http.StatusInternalServerError)

    printInfo("[relayQueueHandler] END")
}


/**
 * 元キューに溜まっているメッセージをすべて処理する.
 */
func relayQueueAllHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayQueueAllHandler] START")

    // キューのアカウント情報等を取得
    accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo()

    // メッセージのコピーを実行
    retryRelayQueue(accountName, accountKey, baseQueue, queueNames)

    printInfo("[relayQueueAllHandler] END")

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte("{\"message\": \"relayQueue\"}"))
}


/**
 * 有害キューに溜まっているメッセージをすべて処理する.
 */
func relayPoisonQueueHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayPoisonQueueHandler] START")

    // キューのアカウント情報等を取得
    accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo()
    poisonQueue := fmt.Sprintf("%s-poison", baseQueue)

    // メッセージのコピーを実行
    retryRelayQueue(accountName, accountKey, poisonQueue, queueNames)

    printInfo("[relayPoisonQueueHandler] END")

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte("{\"message\": \"relayQueue\"}"))
}


/**
 * キューに溜まっているメッセージをリレー先キューにコピーする.
 */
func retryRelayQueue(accountName string, accountKey string, targetQueue string, queueNames []string) {

    printInfo("[retryRelayQueue] START")

    // キューURLの取得
    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err))
    }
    queueUrlFormat := getQueueUrlFormat(accountName)
    u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, targetQueue))
    queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))

    // メッセージ一覧を取得
    queueCtx := context.TODO()
    maxMessages := int32(32)               // 最大件数
    visibilityTimeout := time.Second * 10  // 可視性タイムアウト
    msgUrl := queueUrl.NewMessagesURL()
    dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout)
    if err != nil {
        panic(err)
    } else {

        // 残っているメッセージを全て処理
        for i := int32(0); i < dequeueResp.NumMessages(); i++ {

            // リレー用のキューにコピー
            msg := dequeueResp.Message(i)

            // メッセージをいったんBASE64デコード
            // (元メッセージがエンコードされていないケースも想定してデコード&エンコードし直しておく)
            planTextMessage := ""
            messageBytes, err := base64.StdEncoding.DecodeString(msg.Text)
            if err != nil {
                planTextMessage = msg.Text
            } else {
                planTextMessage = removeExcessEscape(string(messageBytes))
            }
            printDebug("queueMessage: %s", msg.Text)
            printDebug("textMessage : %s", planTextMessage)

            // メッセージを再度BASE64エンコード
            encodedMessage := base64.StdEncoding.EncodeToString([]byte(planTextMessage))

            // メッセージ(JSON)をさらに構造体に変換する場合
            //var queueData map[string]interface{}
            //decodeErr := json.Unmarshal([]byte(planTextMessage), &queueData)
            //if decodeErr != nil {
            //    printError("json decode error: %v",decodeErr.Error())
            //    panic(decodeErr)
            //}
            //printDebug("%v", queueData)
            //message := queueData["message"].(string)
            //printDebug("copy to %v, queueItem: %s, message: %s", queueNames, queueItem, message)

            // メッセージを対象のキューにコピー
            for _, queueName := range queueNames {
                printDebug("copy to %s, queueItem: %s", queueName, planTextMessage)
                relayQueueMessage(accountName, accountKey, queueName, encodedMessage)
            }

            // 元キューからメッセージを削除
            msgIdUrl := msgUrl.NewMessageIDURL(msg.ID)
            _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt)
            if err != nil {
                printError("Error delete message. %s, %v", planTextMessage, err)
            } else {
                printInfo("Success delete message %s", planTextMessage)
            }
        }
    }

    printInfo("[retryRelayQueue] END")
}

/**
 * メイン処理.
 */
func main() {
    httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT")
    if exists {
        printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort)
    }
    mux := http.NewServeMux()
    mux.HandleFunc("/RelayQueue", relayQueueHandler)
    mux.HandleFunc("/RelayQueueAll", relayQueueAllHandler)
    mux.HandleFunc("/RelayPoisonQueue", relayPoisonQueueHandler)
    log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort)
    log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}

RelayQueue/function.json

{
  "bindings": [
    {   
      "name": "queueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "my-base-queue",
      "connection": "AzureWebJobsStorage"
    },  
    {   
      "name": "relayqueue1",
      "type": "queue",
      "direction": "out",
      "queueName": "my-queue1",
      "connection": "AzureWebJobsStorage"    
    },  
    {   
      "name": "relayqueue2",
      "type": "queue",
      "direction": "out",
      "queueName": "my-queue2",
      "connection": "AzureWebJobsStorage"    
    }   
  ]
}

元キューの定期処理用の定義。ここでは 12時間間隔とした。

RelayQueueAll/function.json

{
  "bindings": [
    {   
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 */12 * * *",
      "useMonitor": true
    }   
  ]
}

有害キューの定期処理用の定義。ここではテスト用に1分間隔とした。

RelayPoisonQueue/function.json

{
  "bindings": [
    {   
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 * * * * *",
      "useMonitor": true
    }   
  ]
}

デプロイ/動作確認用シェル

0_env.sh

#!/bin/bash

# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=xxxxxxxxxx

# サブスクリプションID (Application Insight を使用しない場合は空でも可)
subscriptionId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

# リージョン
region=japanwest
insightsRegion=japaneast # 2020/7時点では Appplication Insights で西日本(japanwest)は使用できない

# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup

# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS

# Storageキュー名
storageQueue0=my-base-queue
storageQueue1=my-queue1
storageQueue2=my-queue2

# 関数アプリ名
funcAppName1=${PREFIX}RelayFunc
funcAppName2=${PREFIX}Queue1Func

# 使用するFunctionsのバージョン
funcVersion=2

# 関数アプリのプラン名
funcPlanName=${PREFIX}plan
#funcPlanSku=EP1
funcPlanSku=B1

# Application insights
insightsName=${PREFIX}Insights
insightsDays=30
insightsSetting=""
if [ "$subscriptionId" != "" ]; then
  insightsSetting="--app-insights $insightsName"
fi

1_resources.sh

#!/bin/bash

source ./0_env.sh

# リソースの作成
if [ "$1" == "--create" ]; then

    #-------------------------------
    # リソースグループの作成
    #-------------------------------
    echo az group create
    az group create \
      --name $resourceGroup \
      --location $region

    #-------------------------------
    # ストレージアカウントの作成
    #-------------------------------
    echo az storage account create
    az storage account create \
      --name $storageAccountName \
      --location $region \
      --resource-group $resourceGroup \
      --sku $storageSku

    # ストレージアカウントキーを取得
    storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`

    #-------------------------------
    # Storageキューの作成
    # ※-poisonキューはエラー時に自動で作成されるが、先に作成しておいても問題ない。
    #-------------------------------
    echo "az storage queue create"
    az storage queue create --name $storageQueue0 --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name ${storageQueue0}-poison --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name $storageQueue1 --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name $storageQueue2 --account-name $storageAccountName --account-key $storageAccountKey

    #-------------------------------
    # Application Insights 拡張が利用できない場合は追加インストール
    #-------------------------------
    x=`az monitor app-insights --help 2>&1`
    if [ "$?" != "0" ]; then
      az extension add -n application-insights
    fi

    #-------------------------------
    # Application Insights コンポーネント作成
    #-------------------------------
    echo az monitor app-insights component create
    if [ "$subscriptionId" != "" ]; then
      az monitor app-insights component create \
          --app $insightsName \
          --location $insightsRegion \
          --resource-group $resourceGroup \
          --query-access Enabled \
          --retention-time $insightsDays \
          --subscription $subscriptionId
    fi

    #-------------------------------
    # キューリレー用の関数
    #-------------------------------

    # 関数プランの作成
    echo az functionapp plan create
    az functionapp plan create \
      --name $funcPlanName \
      --resource-group $resourceGroup \
      --location $region \
      --sku $funcPlanSku

    # 関数アプリの作成
    echo az functionapp create
    az functionapp create \
      --name $funcAppName1 \
      --storage-account $storageAccountName \
      --plan $funcPlanName \
      --resource-group $resourceGroup \
      --functions-version $funcVersion $insightsSetting

    # 関数アプリの環境変数の設定
    echo "az functionapp config appsettings"
    az functionapp config appsettings set \
        --name $funcAppName1 \
        --resource-group $resourceGroup \
        --settings "ACCOUNT_NAME=$storageAccountName" \
                   "ACCOUNT_KEY=$storageAccountKey" \
                   "BASE_QUEUE=$storageQueue0" \
                   "RELAY_QUEUE1=$storageQueue1" \
                   "RELAY_QUEUE2=$storageQueue2"

    # 関数アプリのデプロイ

    # すぐにデプロイするとエラー(Timeout)になる場合がある為、少し待つ
    echo "sleep 10 seconds..."
    sleep 10

    cd functions
    exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
    rm -rf $exefile
    GOOS=windows GOARCH=amd64 go build -o $exefile
    zip -r ../functions.zip *
    cd ../
    az functionapp deployment source config-zip -g $resourceGroup -n $funcAppName1 --src functions.zip
    rm -rf functions.zip
fi

# リソースの削除
if [ "$1" == "--delete" ]; then
    az group delete --name $resourceGroup
fi

2_put_message.sh

#!/bin/bash

source ./0_env.sh

date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`
base64_message=`echo $message | base64`

# ストレージアカウントキーを取得
storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`

# キューにメッセージをPUT
az storage message put \
    --account-name $storageAccountName \
    --account-key "$storageAccountKey" \
    --queue-name $storageQueue0 \
    --content "$base64_message"

X1_local_start.sh

#!/bin/bash

source ./0_env.sh

# ローカルのストレージエミュレータ起動
mkdir -p local_azurite
azurite --silent --location `pwd`/local_azurite &

# ストレージキュー作成
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage queue create --name $storageQueue0
az storage queue create --name ${storageQueue0}-poison
az storage queue create --name $storageQueue1
az storage queue create --name $storageQueue2

# 関数アプリの起動
cd ./functions
exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
go build -o $exefile
func start
rm -rf $exefile

export AZURE_STORAGE_CONNECTION_STRING=
cd ../

X2_local_message_put.sh

#!/bin/bash

source ./0_env.sh

date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`

base64_message=`echo $message | base64`
if [ "$1" == "--error" ]; then
  # Base64エンコードせずに出力
  base64_message="$message"
fi

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage message put \
    --queue-name $storageQueue0 \
    --content "$base64_message"

export AZURE_STORAGE_CONNECTION_STRING=

X3_local_message_peek.sh

#!/bin/bash

source ./0_env.sh

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"

echo "-- $storageQueue0 --"
az storage message peek --queue-name $storageQueue0 -o table --num-messages 32
echo ""

echo "-- $storageQueue1 --"
az storage message peek --queue-name $storageQueue1 -o table --num-messages 32
echo ""

echo "-- $storageQueue2 --"
az storage message peek --queue-name $storageQueue2 -o table --num-messages 32
echo ""

echo "-- ${storageQueue0}-poison --"
az storage message peek --queue-name ${storageQueue0}-poison -o table --num-messages 32
echo ""

export AZURE_STORAGE_CONNECTION_STRING=

X4_local_message_clear.sh

#!/bin/bash

source ./0_env.sh

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"

az storage message clear -q $storageQueue0
az storage message clear -q $storageQueue1
az storage message clear -q $storageQueue2
az storage message clear -q ${storageQueue0}-poison

export AZURE_STORAGE_CONNECTION_STRING=

動作確認(ローカル)

ローカルで関数アプリ起動

./X1_local_start.sh

キューにメッセージを出力

./X2_local_message_put.sh

キュー1 及び キュー2の内容をStorageExplorerで確認

azure_queue_trigger_test1.png
azure_queue_trigger_test2.png

エラーになるメッセージをキューに出力してみる

Queueトリガーで処理されるデータはBASE64エンコードされている必要があるらしい

./X2_local_message_put.sh --error

関数アプリのコンソールを確認してみると、エラーになってから、有害キューの処理関数で正常に処理できている模様。

[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=d3932930-eac9-4112-b565-824975810193)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=4b9c8787-c81f-4449-b0d6-80bab35c1579)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 2, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=4b9c8787-c81f-4449-b0d6-80bab35c1579)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=24dab509-29f6-483a-90ed-a9de475b80e3)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 3, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=24dab509-29f6-483a-90ed-a9de475b80e3)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=6f9410d1-67ff-4393-844c-0da005845167)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 4, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=6f9410d1-67ff-4393-844c-0da005845167)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 5, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Message has reached MaxDequeueCount of 5. Moving message to queue 'my-base-queue-poison'.
 :
 :
[2020/11/07 18:44:00] Executing 'Functions.RelayPoisonQueue' (Reason='Timer fired at 2020-11-08T03:44:00.0050040+09:00', Id=2db55918-246c-4447-963f-c93a3185362e)
[2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] START
[2020/11/07 18:44:00] [INFO] [retryRelayQueue] START
[2020/11/07 18:44:00] [DEBUG] queueMessage: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [DEBUG] textMessage : {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [DEBUG] copy to my-queue1, queueItem: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue1)
[2020/11/07 18:44:00] [DEBUG] copy to my-queue2, queueItem: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue2)
[2020/11/07 18:44:00] [INFO] Success delete message {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] [retryRelayQueue] END
[2020/11/07 18:44:00] Executed 'Functions.RelayPoisonQueue' (Succeeded, Id=2db55918-246c-4447-963f-c93a3185362e)
[2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] END

キューの状態を確認してみると有害キューからメッセージがなくなって、リレー先のキューにメッセージがコピーされている事がわかる。

[有害キューが処理される前の状態]

./X3_local_message_peek.sh 
-- my-base-queue --


-- my-queue1 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
3aeb0ea8-8a07-4168-9e0e-9264566bfdc0  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
7b8c9fc3-561d-4bde-a669-d696e0098d2e  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0

-- my-queue2 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
1991cce6-8783-4823-848d-18078700ca8b  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
953147c2-0c83-4af3-b11b-af9b45c02b3b  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0

-- my-base-queue-poison --
MessageId                             Content                             InsertionTime              ExpirationTime             DequeueCount
------------------------------------  ----------------------------------  -------------------------  -------------------------  --------------
61664ff4-5566-4722-a7e9-6b1b337cdd4d  {"message": "2020-11-08 03:43:23"}  2020-11-07T18:43:26+00:00  2020-11-14T18:43:26+00:00  0

[有害キューが処理された後の状態]

-- my-base-queue --


-- my-queue1 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
3aeb0ea8-8a07-4168-9e0e-9264566bfdc0  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
7b8c9fc3-561d-4bde-a669-d696e0098d2e  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0
88a05f28-4db8-4ec3-ba96-6eaf6236caa7  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ==          2020-11-07T18:44:00+00:00  2020-11-14T18:44:00+00:00  0

-- my-queue2 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
1991cce6-8783-4823-848d-18078700ca8b  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
953147c2-0c83-4af3-b11b-af9b45c02b3b  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0
25e2ec62-c380-4517-b31b-685c8ff7b165  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ==          2020-11-07T18:44:00+00:00  2020-11-14T18:44:00+00:00  0

-- my-base-queue-poison --

動作確認(Azure)

デプロイ

./1_resources.sh

キューにメッセージを出力

./2_put_message.sh

キュー1 及び キュー2の内容をStorageExplorerで確認(ローカルと同じなので省略)


添付ファイル: fileproc_image3.png 293件 [詳細] fileproc_image2.png 266件 [詳細] fileimage2.png 299件 [詳細] fileimage1.png 267件 [詳細] fileazure_queue_trigger_test2.png 308件 [詳細] fileazure_queue_trigger_test1.png 274件 [詳細]

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-11-08 (日) 03:28:32 (1404d)