#mynavi(Azureメモ); #setlinebreak(on); #html(){{ <style> .images img { border: 1px solid #333; margin: 0 0 20px 0 !important; } .img_margin { margin: 0 !important; } </style> }} * 概要 [#pd7305e0] #html(<div class="pl10">) Queueトリガーを使用してメッセージを他のキューにコピーする方法について記載する。 #html(</div>) * Queueトリガーを利用する際の注意点 [#o413c452] #html(<div class="pl10">) Queueトリガーを利用する場合、以下の点に注意する必要がある。 - Queueトリガーで Functions を起動した場合、正常終了、異常終了に関わらず元キューの対象メッセージは自動的に削除される。 - Queueトリガーのエラー時にはリトライが行われるが、全てのリトライに失敗した場合は有害キューに出力される。(元キューには残らない)&br; ※有害キューの名前は {元キュー名}-poison - Queueトリガーで関数を起動をする場合は、キューメッセージは BASE64エンコードされている必要がある。&br; ※BASE64エンコードされていない場合はエラーになる。(関数の処理まで到達しない) - Queueトリガーのエラー時にはリトライが行われるが、&color(red){全てのリトライに失敗した場合は有害キューに出力される};。(&color(red){''元キューには残らない''};)&br; ※有害キューの名前は {元キュー名}-poison - Queueトリガーで関数を起動をする場合は、&color(red){キューメッセージは BASE64エンコードされている必要がある};。&br; ※BASE64エンコードされていない場合はエラーになる。(関数の処理まで到達しない) #html(</div>) * 構築イメージ [#d6378432] #html(<div class="pl10">) 上記の注意点を踏まえて、下図の通り構築する。 #html(<div class="images">) 元キューにメッセージが追加されたら Queue トリガーで 関数アプリを起動して 他の2つのキューにメッセージをコピーする。 #ref(image1.png,nolink); #html(</div>) #html(<div class="images">) 何らかの要因で関数アプリが動作していなかった場合を考慮して、Timerトリガーで定期的に元キューを監視しておく。 こちらは元キューに残っている全てのメッセージを取得して、外の2つのキューにコピーする。(ただし当サンプルでは最大32メッセージまで) #ref(proc_image2.png,nolink); #html(</div>) #html(<div class="images">) &color(red){Queue トリガーでエラーになったメッセージは元キューには残らず、有害キューに出力される};ので、有害キューを定期的に処理する関数も用意しておく。 尚、有害キューの名前は {元キュー名}-poison となる。 #ref(proc_image3.png,nolink); #html(</div>) #html(</div>) * 目次 [#q6a1cf43] #contents - 関連 -- [[Azureメモ]] -- [[Azure Blobトリガーで起動される関数をリトライで再利用する]] -- [[GoでAzureのストレージキューの読み書き]] * 作成するファイル [#q3c136ab] #html(<div class="pl10">) #html(){{ <div style="padding: 0px 10px 10px 10px; border: 1px solid #333;"> <pre style="display: inline-block; margin: 0; padding-right: 10px; font-size: 1rem; background: transparent; border: 0px; vertical-align: top;"> . ├─ 0_env.sh ├─ 1_resources.sh ├─ 2_put_message.sh ├─ X1_local_start.sh ├─ X2_local_message_put.sh ├─ X3_local_message_peek.sh ├─ X4_local_message_clear.sh ├─ functions │   ├─ server.go │   ├─ host.json │   └─ local.settings.json │   ├─ RelayQueue │   │   └─ function.json │   ├─ RelayQueueAll │   │   └─ function.json │   ├─ RelayPoisonQueue │   │   └─ function.json </pre> <pre style="display: inline-block; margin: 0; font-size: 1rem; background: transparent; border: 0px; vertical-align: top;"> ... リソース名の定義など ... リソース作成用のシェル ... キューへのメッセージ出力用のシェル ... (ローカル確認用) 関数アプリ起動用シェル ... (ローカル確認用) ストレージエミュレータへのメッセージ出力用シェル ... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージの確認用シェル ... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージをクリアするシェル ... 関数ソース(Goによるカスタムハンドラー) ... 関数アプリの定義ファイル ... ローカル実行用の設定ファイル ... 元キューにメッセージが出力されたタイミングで起動する関数の定義(Queueトリガー) ... 元キューに残っているメッセージを定期的に処理する為の関数の定義(Timerトリガー) ... 有害キューに溜まっているメッセージを定期的に処理する為の関数の定義(Timerトリガー) </pre> </div> }} #html(</div>) * 関数ソース(functions配下) [#d2a8dd5e] #html(<div class="pl10">) #html(){{ <div id="tabs1"> <ul> <li><a href="#tabs1-1">host.json</a></li> <li><a href="#tabs1-2">local.settings.json</a></li> <li><a href="#tabs1-3">server.go</a></li> <li><a href="#tabs1-4">RelayQueue/function.json</a></li> <li><a href="#tabs1-5">RelayQueueAll/function.json</a></li> <li><a href="#tabs1-6">RelayPoisonQueue/function.json</a></li> </ul> }} #html(<div id="tabs1-1">) host.json #mycode2(){{ { "version": "2.0", "httpWorker": { "description": { "defaultExecutablePath": "server.exe" } }, "extensions": { "queues": { "maxPollingInterval": "00:00:10", "visibilityTimeout" : "00:00:00", "batchSize": 16, "maxDequeueCount": 5, "newBatchThreshold": 8 } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[1.*, 2.0.0)" } } }} #html(</div>) #html(<div id="tabs1-2">) local.settings.json #mycode2(){{ { "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "ACCOUNT_NAME": "devstoreaccount1", "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "BASE_QUEUE": "my-base-queue", "RELAY_QUEUE1": "my-queue1", "RELAY_QUEUE2": "my-queue2" } } }} ※アカウントキーはローカルエミュレータ(Azurite)用のもの。 #html(</div>) #html(<div id="tabs1-3">) server.go #mycode2(){{ package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "os" "strings" "time" "github.com/Azure/azure-storage-queue-go/azqueue" ) type ReturnValue struct { Data string } type InvokeResponse struct { Outputs map[string]interface{} Logs []string ReturnValue interface{} } type InvokeRequest struct { Data map[string]interface{} Metadata map[string]interface{} } func printDebug(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ log.SetOutput(os.Stderr) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) log.SetOutput(os.Stdout) } func init(){ log.SetOutput(os.Stdout) log.SetFlags(0) } func isLocalAccount(accountName string) bool { return accountName == "devstoreaccount1" } func getQueueUrlFormat(accountName string) string { queueUrlFormat := "https://%s.queue.core.windows.net/%s" if isLocalAccount(accountName) { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } return queueUrlFormat } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * キューアカウント情報の取得 */ func getRelayQueueInfo() (string, string, string, []string) { accountName := getEnv("ACCOUNT_NAME", "") accountKey := getEnv("ACCOUNT_KEY" , "") queueName0 := getEnv("BASE_QUEUE" , "") queueName1 := getEnv("RELAY_QUEUE1" , "") queueName2 := getEnv("RELAY_QUEUE2" , "") return accountName, accountKey, queueName0, []string{queueName1, queueName2} } /** * 過剰なエスケープを除去する. */ func removeExcessEscape(queueItem string) string { queueItem = strings.TrimRight(queueItem, "\"") queueItem = strings.TrimLeft(queueItem, "\"") queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\\"", "\"", -1) queueItem = strings.Replace(queueItem, "\\\\\"", "", -1) return queueItem } /** * 指定されたキューにメッセージを出力する. */ func relayQueueMessage(accountName string, accountKey string, queueName string, queueItem string){ queueUrlFormat := getQueueUrlFormat(accountName) u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { log.Fatal(err) } queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) ctx := context.TODO() msgUrl := queueUrl.NewMessagesURL() _, err = msgUrl.Enqueue(ctx, queueItem, 0, 0) if err != nil { log.Fatal("Error Enqueue: ", err) printInfo("Error relay queue. (%s), error: %v", queueName, err) } else { printInfo("Success relay queue. (%s)", queueName) } } /** * キューメッセージのコピー. */ func relayQueueHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayQueueHandler] START") defer func(){ err := recover() if err != nil { printError("ERROR relayQueueHandler: %v", err) panic(err) } else { printInfo("SUCCESS relayQueueHandler") } }() outputs := make(map[string]interface{}) var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } // 何回目の試行か dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"]) printInfo("[relayQueueHandler] dequeueCount: %v", dequeueCount) printDebug("[relayQueueHandler] invokeReq: %v", invokeReq) printDebug("[relayQueueHandler] queue metadata: %v", invokeReq.Metadata) // テスト用に一定の確率でエラーにする rand.Seed(time.Now().UnixNano()) if rand.Intn(100) >= 50 || dequeueCount != "1" { panic("Random Error!") } // キューメッセージを取得(BASE64デコードされた状態で取得される) queueItem := invokeReq.Data["queueItem"].(string) // 過剰エスケープを除去 decodedItem := removeExcessEscape(queueItem) printInfo("[relayQueueHandler] queue message(Value): %v", decodedItem) // キューメッセージをコピー(自動的にBASE64エンコードされる) outputs["relayqueue1"] = decodedItem outputs["relayqueue2"] = decodedItem // 正常終了 invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{} } js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) //http.Error(w, "process relayQueueHandler.", http.StatusInternalServerError) printInfo("[relayQueueHandler] END") } /** * 元キューに溜まっているメッセージをすべて処理する. */ func relayQueueAllHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayQueueAllHandler] START") // キューのアカウント情報等を取得 accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo() // メッセージのコピーを実行 retryRelayQueue(accountName, accountKey, baseQueue, queueNames) printInfo("[relayQueueAllHandler] END") w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"relayQueue\"}")) } /** * 有害キューに溜まっているメッセージをすべて処理する. */ func relayPoisonQueueHandler(w http.ResponseWriter, r *http.Request) { printInfo("[relayPoisonQueueHandler] START") // キューのアカウント情報等を取得 accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo() poisonQueue := fmt.Sprintf("%s-poison", baseQueue) // メッセージのコピーを実行 retryRelayQueue(accountName, accountKey, poisonQueue, queueNames) printInfo("[relayPoisonQueueHandler] END") w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"relayQueue\"}")) } /** * キューに溜まっているメッセージをリレー先キューにコピーする. */ func retryRelayQueue(accountName string, accountKey string, targetQueue string, queueNames []string) { printInfo("[retryRelayQueue] START") // キューURLの取得 credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err)) } queueUrlFormat := getQueueUrlFormat(accountName) u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, targetQueue)) queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) // メッセージ一覧を取得 queueCtx := context.TODO() maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視性タイムアウト msgUrl := queueUrl.NewMessagesURL() dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout) if err != nil { panic(err) } else { // 残っているメッセージを全て処理 for i := int32(0); i < dequeueResp.NumMessages(); i++ { // リレー用のキューにコピー msg := dequeueResp.Message(i) // メッセージをいったんBASE64デコード // (元メッセージがエンコードされていないケースも想定してデコード&エンコードし直しておく) planTextMessage := "" messageBytes, err := base64.StdEncoding.DecodeString(msg.Text) if err != nil { planTextMessage = msg.Text } else { planTextMessage = removeExcessEscape(string(messageBytes)) } printDebug("queueMessage: %s", msg.Text) printDebug("textMessage : %s", planTextMessage) // メッセージを再度BASE64エンコード encodedMessage := base64.StdEncoding.EncodeToString([]byte(planTextMessage)) // メッセージ(JSON)をさらに構造体に変換する場合 //var queueData map[string]interface{} //decodeErr := json.Unmarshal([]byte(planTextMessage), &queueData) //if decodeErr != nil { // printError("json decode error: %v",decodeErr.Error()) // panic(decodeErr) //} //printDebug("%v", queueData) //message := queueData["message"].(string) //printDebug("copy to %v, queueItem: %s, message: %s", queueNames, queueItem, message) // メッセージを対象のキューにコピー for _, queueName := range queueNames { printDebug("copy to %s, queueItem: %s", queueName, planTextMessage) relayQueueMessage(accountName, accountKey, queueName, encodedMessage) } // 元キューからメッセージを削除 msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt) if err != nil { printError("Error delete message. %s, %v", planTextMessage, err) } else { printInfo("Success delete message %s", planTextMessage) } } } printInfo("[retryRelayQueue] END") } /** * メイン処理. */ func main() { httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT") if exists { printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort) } mux := http.NewServeMux() mux.HandleFunc("/RelayQueue", relayQueueHandler) mux.HandleFunc("/RelayQueueAll", relayQueueAllHandler) mux.HandleFunc("/RelayPoisonQueue", relayPoisonQueueHandler) log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } }} #html(</div>) #html(<div id="tabs1-4">) RelayQueue/function.json #mycode2(){{ { "bindings": [ { "name": "queueItem", "type": "queueTrigger", "direction": "in", "queueName": "my-base-queue", "connection": "AzureWebJobsStorage" }, { "name": "relayqueue1", "type": "queue", "direction": "out", "queueName": "my-queue1", "connection": "AzureWebJobsStorage" }, { "name": "relayqueue2", "type": "queue", "direction": "out", "queueName": "my-queue2", "connection": "AzureWebJobsStorage" } ] } }} #html(</div>) #html(<div id="tabs1-5">) 元キューの定期処理用の定義。ここでは 12時間間隔とした。 RelayQueueAll/function.json #mycode2(){{ { "bindings": [ { "name": "myTimer", "type": "timerTrigger", "direction": "in", "schedule": "0 0 */12 * * *", "useMonitor": true } ] } }} #html(</div>) #html(<div id="tabs1-6">) 有害キューの定期処理用の定義。ここではテスト用に1分間隔とした。 RelayPoisonQueue/function.json #mycode2(){{ { "bindings": [ { "name": "myTimer", "type": "timerTrigger", "direction": "in", "schedule": "0 * * * * *", "useMonitor": true } ] } }} #html(</div>) #html(</div>) // END tabs1 #html(<script>$(function() { $("#tabs1").tabs(); });</script>) #html(</div>) * デプロイ/動作確認用シェル [#zaed8683] #html(<div class="pl10">) // START tabs2 #html(){{ <div id="tabs2"> <ul> <li><a href="#tabs2-1">0_env.sh</a></li> <li><a href="#tabs2-2">1_resources.sh</a></li> <li><a href="#tabs2-3">2_put_message.sh</a></li> <li><a href="#tabs2-4">X1_local_start.sh</a></li> <li><a href="#tabs2-5">X2_local_message_put.sh</a></li> <li><a href="#tabs2-6">X3_local_message_peek.sh</a></li> <li><a href="#tabs2-7">X4_local_message_clear.sh</a></li> </ul> }} #html(<div id="tabs2-1">) 0_env.sh #mycode2(){{ #!/bin/bash # 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける) PREFIX=xxxxxxxxxx # サブスクリプションID (Application Insight を使用しない場合は空でも可) subscriptionId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # リージョン region=japanwest insightsRegion=japaneast # 2020/7時点では Appplication Insights で西日本(japanwest)は使用できない # リソースグループ名 resourceGroup=${PREFIX}ResourceGroup # ストレージアカウント名 storageAccountName=${PREFIX}straccount storageSku=Standard_LRS # Storageキュー名 storageQueue0=my-base-queue storageQueue1=my-queue1 storageQueue2=my-queue2 # 関数アプリ名 funcAppName1=${PREFIX}RelayFunc funcAppName2=${PREFIX}Queue1Func # 使用するFunctionsのバージョン funcVersion=2 # 関数アプリのプラン名 funcPlanName=${PREFIX}plan #funcPlanSku=EP1 funcPlanSku=B1 # Application insights insightsName=${PREFIX}Insights insightsDays=30 insightsSetting="" if [ "$subscriptionId" != "" ]; then insightsSetting="--app-insights $insightsName" fi }} #html(</div>) #html(<div id="tabs2-2">) 1_resources.sh #mycode2(){{ #!/bin/bash source ./0_env.sh # リソースの作成 if [ "$1" == "--create" ]; then #------------------------------- # リソースグループの作成 #------------------------------- echo az group create az group create \ --name $resourceGroup \ --location $region #------------------------------- # ストレージアカウントの作成 #------------------------------- echo az storage account create az storage account create \ --name $storageAccountName \ --location $region \ --resource-group $resourceGroup \ --sku $storageSku # ストレージアカウントキーを取得 storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'` #------------------------------- # Storageキューの作成 # ※-poisonキューはエラー時に自動で作成されるが、先に作成しておいても問題ない。 #------------------------------- echo "az storage queue create" az storage queue create --name $storageQueue0 --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name ${storageQueue0}-poison --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name $storageQueue1 --account-name $storageAccountName --account-key $storageAccountKey az storage queue create --name $storageQueue2 --account-name $storageAccountName --account-key $storageAccountKey #------------------------------- # Application Insights 拡張が利用できない場合は追加インストール #------------------------------- x=`az monitor app-insights --help 2>&1` if [ "$?" != "0" ]; then az extension add -n application-insights fi #------------------------------- # Application Insights コンポーネント作成 #------------------------------- echo az monitor app-insights component create if [ "$subscriptionId" != "" ]; then az monitor app-insights component create \ --app $insightsName \ --location $insightsRegion \ --resource-group $resourceGroup \ --query-access Enabled \ --retention-time $insightsDays \ --subscription $subscriptionId fi #------------------------------- # キューリレー用の関数 #------------------------------- # 関数プランの作成 echo az functionapp plan create az functionapp plan create \ --name $funcPlanName \ --resource-group $resourceGroup \ --location $region \ --sku $funcPlanSku # 関数アプリの作成 echo az functionapp create az functionapp create \ --name $funcAppName1 \ --storage-account $storageAccountName \ --plan $funcPlanName \ --resource-group $resourceGroup \ --functions-version $funcVersion $insightsSetting # 関数アプリの環境変数の設定 echo "az functionapp config appsettings" az functionapp config appsettings set \ --name $funcAppName1 \ --resource-group $resourceGroup \ --settings "ACCOUNT_NAME=$storageAccountName" \ "ACCOUNT_KEY=$storageAccountKey" \ "BASE_QUEUE=$storageQueue0" \ "RELAY_QUEUE1=$storageQueue1" \ "RELAY_QUEUE2=$storageQueue2" # 関数アプリのデプロイ # すぐにデプロイするとエラー(Timeout)になる場合がある為、少し待つ echo "sleep 10 seconds..." sleep 10 cd functions exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'` rm -rf $exefile GOOS=windows GOARCH=amd64 go build -o $exefile zip -r ../functions.zip * cd ../ az functionapp deployment source config-zip -g $resourceGroup -n $funcAppName1 --src functions.zip rm -rf functions.zip fi # リソースの削除 if [ "$1" == "--delete" ]; then az group delete --name $resourceGroup fi }} #html(</div>) #html(<div id="tabs2-3">) 2_put_message.sh #mycode2(){{ #!/bin/bash source ./0_env.sh date_text=`date "+%Y-%m-%d %H:%M:%S"` message=`cat << _EOF_ {"message": "$date_text"} _EOF_ ` base64_message=`echo $message | base64` # ストレージアカウントキーを取得 storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'` # キューにメッセージをPUT az storage message put \ --account-name $storageAccountName \ --account-key "$storageAccountKey" \ --queue-name $storageQueue0 \ --content "$base64_message" }} #html(</div>) #html(<div id="tabs2-4">) X1_local_start.sh #mycode2(){{ #!/bin/bash source ./0_env.sh # ローカルのストレージエミュレータ起動 mkdir -p local_azurite azurite --silent --location `pwd`/local_azurite & # ストレージキュー作成 export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage queue create --name $storageQueue0 az storage queue create --name ${storageQueue0}-poison az storage queue create --name $storageQueue1 az storage queue create --name $storageQueue2 # 関数アプリの起動 cd ./functions exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'` go build -o $exefile func start rm -rf $exefile export AZURE_STORAGE_CONNECTION_STRING= cd ../ }} #html(</div>) #html(<div id="tabs2-5">) X2_local_message_put.sh #mycode2(){{ #!/bin/bash source ./0_env.sh date_text=`date "+%Y-%m-%d %H:%M:%S"` message=`cat << _EOF_ {"message": "$date_text"} _EOF_ ` base64_message=`echo $message | base64` if [ "$1" == "--error" ]; then # Base64エンコードせずに出力 base64_message="$message" fi export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage message put \ --queue-name $storageQueue0 \ --content "$base64_message" export AZURE_STORAGE_CONNECTION_STRING= }} #html(</div>) #html(<div id="tabs2-6">) X3_local_message_peek.sh #mycode2(){{ #!/bin/bash source ./0_env.sh export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" echo "-- $storageQueue0 --" az storage message peek --queue-name $storageQueue0 -o table --num-messages 32 echo "" echo "-- $storageQueue1 --" az storage message peek --queue-name $storageQueue1 -o table --num-messages 32 echo "" echo "-- $storageQueue2 --" az storage message peek --queue-name $storageQueue2 -o table --num-messages 32 echo "" echo "-- ${storageQueue0}-poison --" az storage message peek --queue-name ${storageQueue0}-poison -o table --num-messages 32 echo "" export AZURE_STORAGE_CONNECTION_STRING= }} #html(</div>) #html(<div id="tabs2-7">) X4_local_message_clear.sh #mycode2(){{ #!/bin/bash source ./0_env.sh export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage message clear -q $storageQueue0 az storage message clear -q $storageQueue1 az storage message clear -q $storageQueue2 az storage message clear -q ${storageQueue0}-poison export AZURE_STORAGE_CONNECTION_STRING= }} #html(</div>) #html(</div>) // END tabs2 #html(<script>$(function() { $("#tabs2").tabs(); });</script>) #html(</div>) * 動作確認(ローカル) [#l6d1d871] #html(<div class="pl10">) ** ローカルで関数アプリ起動 [#ca017985] #myterm2(){{ ./X1_local_start.sh }} ** キューにメッセージを出力 [#qb3bf412] #myterm2(){{ ./X2_local_message_put.sh }} キュー1 及び キュー2の内容をStorageExplorerで確認 #html(<div class="images">) #ref(azure_queue_trigger_test1.png,nolink); #html(</div>) #html(<div class="images">) #ref(azure_queue_trigger_test2.png,nolink); #html(</div>) ** エラーになるメッセージをキューに出力してみる [#a27b5b19] &color(red){''Queueトリガーで処理されるデータはBASE64エンコードされている必要があるらしい''}; #myterm2(){{ ./X2_local_message_put.sh --error }} 関数アプリのコンソールを確認してみると、エラーになってから、有害キューの処理関数で正常に処理できている模様。 #myterm2(){{ [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=d3932930-eac9-4112-b565-824975810193) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=4b9c8787-c81f-4449-b0d6-80bab35c1579) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 2, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=4b9c8787-c81f-4449-b0d6-80bab35c1579) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=24dab509-29f6-483a-90ed-a9de475b80e3) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 3, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=24dab509-29f6-483a-90ed-a9de475b80e3) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=6f9410d1-67ff-4393-844c-0da005845167) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 4, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=6f9410d1-67ff-4393-844c-0da005845167) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4) [2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 5, InsertionTime: 2020-11-07T18:43:24.000+00:00 [2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4) [2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters. [2020/11/07 18:43:26] Message has reached MaxDequeueCount of 5. Moving message to queue 'my-base-queue-poison'. : : [2020/11/07 18:44:00] Executing 'Functions.RelayPoisonQueue' (Reason='Timer fired at 2020-11-08T03:44:00.0050040+09:00', Id=2db55918-246c-4447-963f-c93a3185362e) [2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] START [2020/11/07 18:44:00] [INFO] [retryRelayQueue] START [2020/11/07 18:44:00] [DEBUG] queueMessage: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [DEBUG] textMessage : {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [DEBUG] copy to my-queue1, queueItem: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue1) [2020/11/07 18:44:00] [DEBUG] copy to my-queue2, queueItem: {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue2) [2020/11/07 18:44:00] [INFO] Success delete message {"message": "2020-11-08 03:43:23"} [2020/11/07 18:44:00] [INFO] [retryRelayQueue] END [2020/11/07 18:44:00] Executed 'Functions.RelayPoisonQueue' (Succeeded, Id=2db55918-246c-4447-963f-c93a3185362e) [2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] END }} キューの状態を確認してみると有害キューからメッセージがなくなって、リレー先のキューにメッセージがコピーされている事がわかる。 [有害キューが処理される前の状態] #myterm2(){{ ./X3_local_message_peek.sh -- my-base-queue -- -- my-queue1 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 3aeb0ea8-8a07-4168-9e0e-9264566bfdc0 IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 7b8c9fc3-561d-4bde-a669-d696e0098d2e eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 -- my-queue2 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 1991cce6-8783-4823-848d-18078700ca8b IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 953147c2-0c83-4af3-b11b-af9b45c02b3b eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 -- my-base-queue-poison -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ ---------------------------------- ------------------------- ------------------------- -------------- 61664ff4-5566-4722-a7e9-6b1b337cdd4d {"message": "2020-11-08 03:43:23"} 2020-11-07T18:43:26+00:00 2020-11-14T18:43:26+00:00 0 }} [有害キューが処理された後の状態] #myterm2(){{ -- my-base-queue -- -- my-queue1 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 3aeb0ea8-8a07-4168-9e0e-9264566bfdc0 IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 7b8c9fc3-561d-4bde-a669-d696e0098d2e eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 88a05f28-4db8-4ec3-ba96-6eaf6236caa7 eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ== 2020-11-07T18:44:00+00:00 2020-11-14T18:44:00+00:00 0 -- my-queue2 -- MessageId Content InsertionTime ExpirationTime DequeueCount ------------------------------------ -------------------------------------------------------- ------------------------- ------------------------- -------------- 1991cce6-8783-4823-848d-18078700ca8b IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i 2020-11-07T18:42:08+00:00 2020-11-14T18:42:08+00:00 0 953147c2-0c83-4af3-b11b-af9b45c02b3b eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo= 2020-11-07T18:43:00+00:00 2020-11-14T18:43:00+00:00 0 25e2ec62-c380-4517-b31b-685c8ff7b165 eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ== 2020-11-07T18:44:00+00:00 2020-11-14T18:44:00+00:00 0 -- my-base-queue-poison -- }} #html(</div>) * 動作確認(Azure) [#e422a399] #html(<div class="pl10">) ** デプロイ [#u75a0b91] #myterm2(){{ ./1_resources.sh }} ** キューにメッセージを出力 [#na17bcb3] #myterm2(){{ ./2_put_message.sh }} キュー1 及び キュー2の内容をStorageExplorerで確認(ローカルと同じなので省略) #html(</div>)