概要 †Queueトリガーを使用してメッセージを他のキューにコピーする方法について記載する。 Queueトリガーを利用する際の注意点 †Queueトリガーを利用する場合、以下の点に注意する必要がある。
構築イメージ †上記の注意点を踏まえて、下図の通り構築する。 元キューにメッセージが追加されたら Queue トリガーで 関数アプリを起動して 他の2つのキューにメッセージをコピーする。 何らかの要因で関数アプリが動作していなかった場合を考慮して、Timerトリガーで定期的に元キューを監視しておく。 Queue トリガーでエラーになったメッセージは元キューには残らず、有害キューに出力されるので、有害キューを定期的に処理する関数も用意しておく。 目次 †作成するファイル †. ├─ 0_env.sh ├─ 1_resources.sh ├─ 2_put_message.sh ├─ X1_local_start.sh ├─ X2_local_message_put.sh ├─ X3_local_message_peek.sh ├─ X4_local_message_clear.sh ├─ functions │ ├─ server.go │ ├─ host.json │ └─ local.settings.json │ ├─ RelayQueue │ │ └─ function.json │ ├─ RelayQueueAll │ │ └─ function.json │ ├─ RelayPoisonQueue │ │ └─ function.json ... リソース名の定義など ... リソース作成用のシェル ... キューへのメッセージ出力用のシェル ... (ローカル確認用) 関数アプリ起動用シェル ... (ローカル確認用) ストレージエミュレータへのメッセージ出力用シェル ... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージの確認用シェル ... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージをクリアするシェル ... 関数ソース(Goによるカスタムハンドラー) ... 関数アプリの定義ファイル ... ローカル実行用の設定ファイル ... 元キューにメッセージが出力されたタイミングで起動する関数の定義(Queueトリガー) ... 元キューに残っているメッセージを定期的に処理する為の関数の定義(Timerトリガー) ... 有害キューに溜まっているメッセージを定期的に処理する為の関数の定義(Timerトリガー) 関数ソース(functions配下) †
host.json { "version": "2.0", "httpWorker": { "description": { "defaultExecutablePath": "server.exe" } }, "extensions": { "queues": { "maxPollingInterval": "00:00:10", "visibilityTimeout" : "00:00:00", "batchSize": 16, "maxDequeueCount": 5, "newBatchThreshold": 8 } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[1.*, 2.0.0)" } } local.settings.json { "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "ACCOUNT_NAME": "devstoreaccount1", "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "BASE_QUEUE": "my-base-queue", "RELAY_QUEUE1": "my-queue1", "RELAY_QUEUE2": "my-queue2" } } ※アカウントキーはローカルエミュレータ(Azurite)用のもの。 server.go package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "os" "strings" "time" "github.com/Azure/azure-storage-queue-go/azqueue" ) type ReturnValue struct { Data string } type InvokeResponse struct { Outputs map[string]interface{} Logs []string ReturnValue interface{} } type InvokeRequest struct { Data map[string]interface{} Metadata map[string]interface{} } func printDebug(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ log.SetOutput(os.Stderr) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) log.SetOutput(os.Stdout) } func init(){ log.SetOutput(os.Stdout) log.SetFlags(0) } func isLocalAccount(accountName string) bool { return accountName == "devstoreaccount1" } func getQueueUrlFormat(accountName string) string { queueUrlFormat := "https://%s.queue.core.windows.net/%s" if isLocalAccount(accountName) { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } return queueUrlFormat } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * キューアカウント情報の取得 */ func getRelayQueueInfo() (string, string, string, []string) { accountName := getEnv("ACCOUNT_NAME", "") accountKey := getEnv("ACCOUNT_KEY" , "") queueName0 := getEnv("BASE_QUEUE" , "") queueName1 := getEnv("RELAY_QUEUE1" , "") queueName2 := getEnv("RELAY_QUEUE2" , "") return accountName, accountKey, queueName0, []string{queueName1, queueName2} } /** * 過剰なエスケープを除去する. */ func removeExcessEscape(queueItem string) string { queueItem = strings.TrimRight(queueItem, "\"") queueItem = strings.TrimLeft(queueItem, "\"") queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\\"", "\"", -1) queueItem = strings.Replace(queueItem, "\\\\\"", "", -1) return queueItem } /** * 指定されたキューにメッセージを出力する. */ func relayQueueMessage(accountName string, accountKey string, queueName string, queueItem string){ queueUrlFormat := getQueueUrlFormat(accountName) u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { log.Fatal(err) } queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) ctx := context.TODO() msgUrl := queueUrl.NewMessagesURL() _, err = msgUrl.Enqueue(ctx, queueItem, 0, 0) if err != nil { log.Fatal("Error Enqueue: ", err) printInfo("Error relay queue. (%s), error: %v", queueName, err) } else { printInfo("Success relay queue. (%s)", queueName) } } /** * キューメッセージのコピー. */ func relayQueueHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayQueueHandler] START") defer func(){ err := recover() if err != nil { printError("ERROR relayQueueHandler: %v", err) panic(err) } else { printInfo("SUCCESS relayQueueHandler") } }() outputs := make(map[string]interface{}) var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } // 何回目の試行か dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"]) printInfo("[relayQueueHandler] dequeueCount: %v", dequeueCount) printDebug("[relayQueueHandler] invokeReq: %v", invokeReq) printDebug("[relayQueueHandler] queue metadata: %v", invokeReq.Metadata) // テスト用に一定の確率でエラーにする rand.Seed(time.Now().UnixNano()) if rand.Intn(100) >= 50 || dequeueCount != "1" { panic("Random Error!") } // キューメッセージを取得(BASE64デコードされた状態で取得される) queueItem := invokeReq.Data["queueItem"].(string) // 過剰エスケープを除去 decodedItem := removeExcessEscape(queueItem) printInfo("[relayQueueHandler] queue message(Value): %v", decodedItem) // キューメッセージをコピー(自動的にBASE64エンコードされる) outputs["relayqueue1"] = decodedItem outputs["relayqueue2"] = decodedItem // 正常終了 invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{} } js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) //http.Error(w, "process relayQueueHandler.", http.StatusInternalServerError) printInfo("[relayQueueHandler] END") } /** * 元キューに溜まっているメッセージをすべて処理する. */ func relayQueueAllHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayQueueAllHandler] START") // キューのアカウント情報等を取得 accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo() // メッセージのコピーを実行 retryRelayQueue(accountName, accountKey, baseQueue, queueNames) printInfo("[relayQueueAllHandler] END") w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"relayQueue\"}")) } /** * 有害キューに溜まっているメッセージをすべて処理する. */ func relayPoisonQueueHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayPoisonQueueHandler] START") // キューのアカウント情報等を取得 accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo() poisonQueue := fmt.Sprintf("%s-poison", baseQueue) // メッセージのコピーを実行 retryRelayQueue(accountName, accountKey, poisonQueue, queueNames) printInfo("[relayPoisonQueueHandler] END") w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"relayQueue\"}")) } /** * キューに溜まっているメッセージをリレー先キューにコピーする. */ func retryRelayQueue(accountName string, accountKey string, targetQueue string, queueNames []string) { printInfo("[retryRelayQueue] START") // キューURLの取得 credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err)) } queueUrlFormat := getQueueUrlFormat(accountName) u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, targetQueue)) queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) // メッセージ一覧を取得 queueCtx := context.TODO() maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視性タイムアウト msgUrl := queueUrl.NewMessagesURL() dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout) if err != nil { panic(err) } else { // 残っているメッセージを全て処理 for i := int32(0); i < dequeueResp.NumMessages(); i++ { // リレー用のキューにコピー msg := dequeueResp.Message(i) // メッセージをいったんBASE64デコード // (元メッセージがエンコードされていないケースも想定してデコード&エンコードし直しておく) planTextMessage := "" messageBytes, err := base64.StdEncoding.DecodeString(msg.Text) if err != nil { planTextMessage = msg.Text } else { planTextMessage = removeExcessEscape(string(messageBytes)) } printDebug("queueMessage: %s", msg.Text) printDebug("textMessage : %s", planTextMessage) // メッセージを再度BASE64エンコード encodedMessage := base64.StdEncoding.EncodeToString([]byte(planTextMessage)) // メッセージ(JSON)をさらに構造体に変換する場合 //var queueData map[string]interface{} //decodeErr := json.Unmarshal([]byte(planTextMessage), &queueData) //if decodeErr != nil { // printError("json decode error: %v",decodeErr.Error()) // panic(decodeErr) //} //printDebug("%v", queueData) //message := queueData["message"].(string) //printDebug("copy to %v, queueItem: %s, message: %s", queueNames, queueItem, message) // メッセージを対象のキューにコピー for _, queueName := range queueNames { printDebug("copy to %s, queueItem: %s", queueName, planTextMessage) relayQueueMessage(accountName, accountKey, queueName, encodedMessage) } // 元キューからメッセージを削除 msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt) if err != nil { printError("Error delete message. %s, %v", planTextMessage, err) } else { printInfo("Success delete message %s", planTextMessage) } } } printInfo("[retryRelayQueue] END") } /** * メイン処理. */ func main() { httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT") if exists { printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort) } mux := http.NewServeMux() mux.HandleFunc("/RelayQueue", relayQueueHandler) mux.HandleFunc("/RelayQueueAll", relayQueueAllHandler) mux.HandleFunc("/RelayPoisonQueue", relayPoisonQueueHandler) log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } RelayQueue/function.json { "bindings": [ { "name": "queueItem", "type": "queueTrigger", "direction": "in", "queueName": "my-base-queue", "connection": "AzureWebJobsStorage" }, { "name": "relayqueue1", "type": "queue", "direction": "out", "queueName": "my-queue1", "connection": "AzureWebJobsStorage" }, { "name": "relayqueue2", "type": "queue", "direction": "out", "queueName": "my-queue2", "connection": "AzureWebJobsStorage" } ] } 元キューの定期処理用の定義。ここでは 12時間間隔とした。 RelayQueueAll/function.json { "bindings": [ { "name": "myTimer", "type": "timerTrigger", "direction": "in", "schedule": "0 0 */12 * * *", "useMonitor": true } ] } 有害キューの定期処理用の定義。ここではテスト用に1分間隔とした。 RelayPoisonQueue/function.json { "bindings": [ { "name": "myTimer", "type": "timerTrigger", "direction": "in", "schedule": "0 * * * * *", "useMonitor": true } ] } デプロイ/動作確認用シェル †
0_env.sh #!/bin/bash # 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける) PREFIX=xxxxxxxxxx # サブスクリプションID (Application Insight を使用しない場合は空でも可) subscriptionId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # リージョン region=japanwest insightsRegion=japaneast # 2020/7時点では Appplication Insights で西日本(japanwest)は使用できない # リソースグループ名 resourceGroup=${PREFIX}ResourceGroup # ストレージアカウント名 storageAccountName=${PREFIX}straccount storageSku=Standard_LRS # Storageキュー名 storageQueue0=my-base-queue storageQueue1=my-queue1 storageQueue2=my-queue2 # 関数アプリ名 funcAppName1=${PREFIX}RelayFunc funcAppName2=${PREFIX}Queue1Func # 使用するFunctionsのバージョン funcVersion=2 # 関数アプリのプラン名 funcPlanName=${PREFIX}plan #funcPlanSku=EP1 funcPlanSku=B1 # Application insights insightsName=${PREFIX}Insights insightsDays=30 insightsSetting="" if [ "$subscriptionId" != "" ]; then insightsSetting="--app-insights $insightsName" fi 1_resources.sh #!/bin/bash source ./0_env.sh # リソースの作成 if [ "$1" == "--create" ]; then #------------------------------- # リソースグループの作成 #------------------------------- echo az group create az group create \ --name $resourceGroup \ --location $region #------------------------------- # ストレージアカウントの作成 #------------------------------- echo az storage account create az storage account create \ --name $storageAccountName \ --location $region \ --resource-group $resourceGroup \ --sku $storageSku # ストレージアカウントキーを取得 storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'` #------------------------------- # Storageキューの作成 # ※-poisonキューはエラー時に自動で作成されるが、先に作成しておいても問題ない。 #------------------------------- echo "az storage queue create" az storage queue create --name $storageQueue0 --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name ${storageQueue0}-poison --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name $storageQueue1 --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name $storageQueue2 --account-name $storageAccountName --account-key $storageAccountKey #------------------------------- # Application Insights 拡張が利用できない場合は追加インストール #------------------------------- x=`az monitor app-insights --help 2>&1` if [ "$?" != "0" ]; then az extension add -n application-insights fi #------------------------------- # Application Insights コンポーネント作成 #------------------------------- echo az monitor app-insights component create if [ "$subscriptionId" != "" ]; then az monitor app-insights component create \ --app $insightsName \ --location $insightsRegion \ --resource-group $resourceGroup \ --query-access Enabled \ --retention-time $insightsDays \ --subscription $subscriptionId fi #------------------------------- # キューリレー用の関数 #------------------------------- # 関数プランの作成 echo az functionapp plan create az functionapp plan create \ --name $funcPlanName \ --resource-group $resourceGroup \ --location $region \ --sku $funcPlanSku # 関数アプリの作成 echo az functionapp create az functionapp create \ --name $funcAppName1 \ --storage-account $storageAccountName \ --plan $funcPlanName \ --resource-group $resourceGroup \ --functions-version $funcVersion $insightsSetting # 関数アプリの環境変数の設定 echo "az functionapp config appsettings" az functionapp config appsettings set \ --name $funcAppName1 \ --resource-group $resourceGroup \ --settings "ACCOUNT_NAME=$storageAccountName" \ "ACCOUNT_KEY=$storageAccountKey" \ "BASE_QUEUE=$storageQueue0" \ "RELAY_QUEUE1=$storageQueue1" \ "RELAY_QUEUE2=$storageQueue2" # 関数アプリのデプロイ # すぐにデプロイするとエラー(Timeout)になる場合がある為、少し待つ echo "sleep 10 seconds..." sleep 10 cd functions exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'` rm -rf $exefile GOOS=windows GOARCH=amd64 go build -o $exefile zip -r ../functions.zip * cd ../ az functionapp deployment source config-zip -g $resourceGroup -n $funcAppName1 --src functions.zip rm -rf functions.zip fi # リソースの削除 if [ "$1" == "--delete" ]; then az group delete --name $resourceGroup fi 2_put_message.sh #!/bin/bash source ./0_env.sh date_text=`date "+%Y-%m-%d %H:%M:%S"` message=`cat << _EOF_ {"message": "$date_text"} _EOF_ ` base64_message=`echo $message | base64` # ストレージアカウントキーを取得 storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'` # キューにメッセージをPUT az storage message put \ --account-name $storageAccountName \ --account-key "$storageAccountKey" \ --queue-name $storageQueue0 \ --content "$base64_message" X1_local_start.sh #!/bin/bash source ./0_env.sh # ローカルのストレージエミュレータ起動 mkdir -p local_azurite azurite --silent --location `pwd`/local_azurite & # ストレージキュー作成 export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage queue create --name $storageQueue0 az storage queue create --name ${storageQueue0}-poison az storage queue create --name $storageQueue1 az storage queue create --name $storageQueue2 # 関数アプリの起動 cd ./functions exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'` go build -o $exefile func start rm -rf $exefile export AZURE_STORAGE_CONNECTION_STRING= cd ../ X2_local_message_put.sh #!/bin/bash source ./0_env.sh date_text=`date "+%Y-%m-%d %H:%M:%S"` message=`cat << _EOF_ {"message": "$date_text"} _EOF_ ` base64_message=`echo $message | base64` if [ "$1" == "--error" ]; then # Base64エンコードせずに出力 base64_message="$message" fi export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage message put \ --queue-name $storageQueue0 \ --content "$base64_message" export AZURE_STORAGE_CONNECTION_STRING= X3_local_message_peek.sh #!/bin/bash source ./0_env.sh export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" echo "-- $storageQueue0 --" az storage message peek --queue-name $storageQueue0 -o table --num-messages 32 echo "" echo "-- $storageQueue1 --" az storage message peek --queue-name $storageQueue1 -o table --num-messages 32 echo "" echo "-- $storageQueue2 --" az storage message peek --queue-name $storageQueue2 -o table --num-messages 32 echo "" echo "-- ${storageQueue0}-poison --" az storage message peek --queue-name ${storageQueue0}-poison -o table --num-messages 32 echo "" export AZURE_STORAGE_CONNECTION_STRING= X4_local_message_clear.sh #!/bin/bash source ./0_env.sh export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage message clear -q $storageQueue0 az storage message clear -q $storageQueue1 az storage message clear -q $storageQueue2 az storage message clear -q ${storageQueue0}-poison export AZURE_STORAGE_CONNECTION_STRING= 動作確認(ローカル) †ローカルで関数アプリ起動 †./X1_local_start.sh キューにメッセージを出力 †./X2_local_message_put.sh キュー1 及び キュー2の内容をStorageExplorerで確認 エラーになるメッセージをキューに出力してみる †Queueトリガーで処理されるデータはBASE64エンコードされている必要があるらしい ./X2_local_message_put.sh --error 関数アプリのコンソールを確認してみると、エラーになってから、有害キューの処理関数で正常に処理できている模様。 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=d3932930-eac9-4112-b565-824975810193) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=4b9c8787-c81f-4449-b0d6-80bab35c1579) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 2, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=4b9c8787-c81f-4449-b0d6-80bab35c1579) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=24dab509-29f6-483a-90ed-a9de475b80e3) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 3, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=24dab509-29f6-483a-90ed-a9de475b80e3) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=6f9410d1-67ff-4393-844c-0da005845167) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 4, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=6f9410d1-67ff-4393-844c-0da005845167) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 5, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Message has reached MaxDequeueCount of 5. Moving message to queue 'my-base-queue-poison'. : : [2020/11/07 18:44:00] Executing 'Functions.RelayPoisonQueue' (Reason='Timer fired at 2020-11-08T03:44:00.0050040+09:00', Id=2db55918-246c-4447-963f-c93a3185362e) [2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] START [2020/11/07 18:44:00] [INFO] [retryRelayQueue] START [2020/11/07 18:44:00] [DEBUG] queueMessage: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [DEBUG] textMessage : {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [DEBUG] copy to my-queue1, queueItem: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue1) [2020/11/07 18:44:00] [DEBUG] copy to my-queue2, queueItem: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue2) [2020/11/07 18:44:00] [INFO] Success delete message {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] [retryRelayQueue] END [2020/11/07 18:44:00] Executed 'Functions.RelayPoisonQueue' (Succeeded, Id=2db55918-246c-4447-963f-c93a3185362e) [2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] END キューの状態を確認してみると有害キューからメッセージがなくなって、リレー先のキューにメッセージがコピーされている事がわかる。 [有害キューが処理される前の状態] ./X3_local_message_peek.sh -- my-base-queue -- -- my-queue1 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 3aeb0ea8-8a07-4168-9e0e-9264566bfdc0 IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 7b8c9fc3-561d-4bde-a669-d696e0098d2e eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 -- my-queue2 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 1991cce6-8783-4823-848d-18078700ca8b IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 953147c2-0c83-4af3-b11b-af9b45c02b3b eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 -- my-base-queue-poison -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ ---------------------------------- ------------------------- ------------------------- -------------- 61664ff4-5566-4722-a7e9-6b1b337cdd4d {"message": "2020-11-08 03:43:23"} 2020-11-07T18:43:26+00:00 2020-11-14T18:43:26+00:00 0 [有害キューが処理された後の状態] -- my-base-queue -- -- my-queue1 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 3aeb0ea8-8a07-4168-9e0e-9264566bfdc0 IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 7b8c9fc3-561d-4bde-a669-d696e0098d2e eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 88a05f28-4db8-4ec3-ba96-6eaf6236caa7 eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ== 2020-11-07T18:44:00+00:00 2020-11-14T18:44:00+00:00 0 -- my-queue2 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 1991cce6-8783-4823-848d-18078700ca8b IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 953147c2-0c83-4af3-b11b-af9b45c02b3b eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 25e2ec62-c380-4517-b31b-685c8ff7b165 eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ== 2020-11-07T18:44:00+00:00 2020-11-14T18:44:00+00:00 0 -- my-base-queue-poison -- 動作確認(Azure) † |