#mynavi(Azureメモ);
#setlinebreak(on);

#html(){{
<style>
.images img {
  border: 1px solid #333;
  margin: 0 0 20px 0 !important;
}
.img_margin {
  margin: 0 !important;
}
</style>
}}

* 概要 [#pd7305e0]
#html(<div class="pl10">)
Queueトリガーを使用してメッセージを他のキューにコピーする方法について記載する。
#html(</div>)

* Queueトリガーを利用する際の注意点 [#o413c452]
#html(<div class="pl10">)
Queueトリガーを利用する場合、以下の点に注意する必要がある。

- Queueトリガーで Functions を起動した場合、正常終了、異常終了に関わらず元キューの対象メッセージは自動的に削除される。
- Queueトリガーのエラー時にはリトライが行われるが、全てのリトライに失敗した場合は有害キューに出力される。(元キューには残らない)&br; ※有害キューの名前は {元キュー名}-poison
- Queueトリガーで関数を起動をする場合は、キューメッセージは BASE64エンコードされている必要がある。&br; ※BASE64エンコードされていない場合はエラーになる。(関数の処理まで到達しない)
- Queueトリガーのエラー時にはリトライが行われるが、&color(red){全てのリトライに失敗した場合は有害キューに出力される};。(&color(red){''元キューには残らない''};)&br; ※有害キューの名前は {元キュー名}-poison
- Queueトリガーで関数を起動をする場合は、&color(red){キューメッセージは BASE64エンコードされている必要がある};。&br; ※BASE64エンコードされていない場合はエラーになる。(関数の処理まで到達しない)

#html(</div>)


* 構築イメージ [#d6378432]
#html(<div class="pl10">)

上記の注意点を踏まえて、下図の通り構築する。

#html(<div class="images">)
元キューにメッセージが追加されたら Queue トリガーで 関数アプリを起動して 他の2つのキューにメッセージをコピーする。

#ref(image1.png,nolink);
#html(</div>)

#html(<div class="images">)
何らかの要因で関数アプリが動作していなかった場合を考慮して、Timerトリガーで定期的に元キューを監視しておく。
こちらは元キューに残っている全てのメッセージを取得して、外の2つのキューにコピーする。(ただし当サンプルでは最大32メッセージまで)

#ref(proc_image2.png,nolink);
#html(</div>)

#html(<div class="images">)
&color(red){Queue トリガーでエラーになったメッセージは元キューには残らず、有害キューに出力される};ので、有害キューを定期的に処理する関数も用意しておく。
尚、有害キューの名前は {元キュー名}-poison となる。

#ref(proc_image3.png,nolink);
#html(</div>)

#html(</div>)

* 目次 [#q6a1cf43]
#contents
- 関連
-- [[Azureメモ]]
-- [[Azure Blobトリガーで起動される関数をリトライで再利用する]]
-- [[GoでAzureのストレージキューの読み書き]]

* 作成するファイル [#q3c136ab]
#html(<div class="pl10">)

#html(){{
<div style="padding: 0px 10px 10px 10px; border: 1px solid #333;">
<pre style="display: inline-block; margin: 0; padding-right: 10px; font-size: 1rem; background: transparent; border: 0px; vertical-align: top;">
.
├─ 0_env.sh
├─ 1_resources.sh
├─ 2_put_message.sh
├─ X1_local_start.sh
├─ X2_local_message_put.sh
├─ X3_local_message_peek.sh
├─ X4_local_message_clear.sh
├─ functions
│&#160;&#160; ├─ server.go
│&#160;&#160; ├─ host.json
│&#160;&#160; └─ local.settings.json
│&#160;&#160; ├─ RelayQueue
│&#160;&#160; │&#160;&#160; └─ function.json
│&#160;&#160; ├─ RelayQueueAll
│&#160;&#160; │&#160;&#160; └─ function.json
│&#160;&#160; ├─ RelayPoisonQueue
│&#160;&#160; │&#160;&#160; └─ function.json
</pre>
<pre style="display: inline-block; margin: 0;  font-size: 1rem; background: transparent; border: 0px; vertical-align: top;">
 
... リソース名の定義など
... リソース作成用のシェル
... キューへのメッセージ出力用のシェル
... (ローカル確認用) 関数アプリ起動用シェル
... (ローカル確認用) ストレージエミュレータへのメッセージ出力用シェル
... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージの確認用シェル
... (ローカル確認用) ストレージエミュレータのキューに溜まっているメッセージをクリアするシェル
 
... 関数ソース(Goによるカスタムハンドラー)
... 関数アプリの定義ファイル
... ローカル実行用の設定ファイル
 
... 元キューにメッセージが出力されたタイミングで起動する関数の定義(Queueトリガー)
 
... 元キューに残っているメッセージを定期的に処理する為の関数の定義(Timerトリガー)
 
... 有害キューに溜まっているメッセージを定期的に処理する為の関数の定義(Timerトリガー)
</pre>
</div>
}}

#html(</div>)


* 関数ソース(functions配下) [#d2a8dd5e]
#html(<div class="pl10">)

#html(){{
<div id="tabs1">
  <ul>
    <li><a href="#tabs1-1">host.json</a></li>
    <li><a href="#tabs1-2">local.settings.json</a></li>
    <li><a href="#tabs1-3">server.go</a></li>
    <li><a href="#tabs1-4">RelayQueue/function.json</a></li>
    <li><a href="#tabs1-5">RelayQueueAll/function.json</a></li>
    <li><a href="#tabs1-6">RelayPoisonQueue/function.json</a></li>
  </ul>
}}

#html(<div id="tabs1-1">)

host.json
#mycode2(){{
{
    "version": "2.0",
    "httpWorker": {
        "description": {
            "defaultExecutablePath": "server.exe"
        }   
    },  
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:10",
            "visibilityTimeout" : "00:00:00",
            "batchSize": 16, 
            "maxDequeueCount": 5,
            "newBatchThreshold": 8
        }   
    },  
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    }   
}
}}

#html(</div>)

#html(<div id="tabs1-2">)

local.settings.json
#mycode2(){{
{
    "IsEncrypted": false,
    "Values": {
      "AzureWebJobsStorage": "UseDevelopmentStorage=true",
      "ACCOUNT_NAME": "devstoreaccount1",
      "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
      "BASE_QUEUE": "my-base-queue",
      "RELAY_QUEUE1": "my-queue1",
      "RELAY_QUEUE2": "my-queue2"
    }   
}
}}
※アカウントキーはローカルエミュレータ(Azurite)用のもの。

#html(</div>)

#html(<div id="tabs1-3">)

server.go
#mycode2(){{
package main

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"
    "github.com/Azure/azure-storage-queue-go/azqueue"
)

type ReturnValue struct {
    Data string
}
type InvokeResponse struct {
    Outputs     map[string]interface{}
    Logs        []string
    ReturnValue interface{}
}

type InvokeRequest struct {
    Data     map[string]interface{}
    Metadata map[string]interface{}
}

func printDebug(format string, params ...interface{}){
    log.SetOutput(os.Stdout)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[DEBUG] %s\n", msg)
}

func printInfo(format string, params ...interface{}){
    log.SetOutput(os.Stdout)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[INFO] %s\n", msg)
}

func printError(format string, params ...interface{}){
    log.SetOutput(os.Stderr)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[ERROR] %s\n", msg)
    log.SetOutput(os.Stdout)
}

func init(){
    log.SetOutput(os.Stdout)
    log.SetFlags(0)
}

func isLocalAccount(accountName string) bool {
    return accountName == "devstoreaccount1"
}

func getQueueUrlFormat(accountName string) string {
    queueUrlFormat := "https://%s.queue.core.windows.net/%s"
    if isLocalAccount(accountName) {
        queueUrlFormat = "http://127.0.0.1:10001/%s/%s"
    }
    return queueUrlFormat
}


/**
 * 環境変数の取得.
 */
func getEnv(envName string, defaultValue string) string {
    value, exists := os.LookupEnv(envName)
    if exists {
        return value
    } else {
        return defaultValue
    }
}

/**
 * キューアカウント情報の取得
 */
func getRelayQueueInfo() (string, string, string, []string) {
    accountName := getEnv("ACCOUNT_NAME", "")
    accountKey  := getEnv("ACCOUNT_KEY" , "")
    queueName0  := getEnv("BASE_QUEUE" , "")
    queueName1  := getEnv("RELAY_QUEUE1" , "")
    queueName2  := getEnv("RELAY_QUEUE2" , "")
    return accountName, accountKey, queueName0, []string{queueName1, queueName2}
}

/**
 * 過剰なエスケープを除去する.
 */
func removeExcessEscape(queueItem string) string {
    queueItem = strings.TrimRight(queueItem, "\"")
    queueItem = strings.TrimLeft(queueItem, "\"")
    queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1)
    queueItem = strings.Replace(queueItem, "\\n", "\n", -1)
    queueItem = strings.Replace(queueItem, "\\\"", "\"", -1)
    queueItem = strings.Replace(queueItem, "\\\\\"", "", -1)
    return queueItem
}

/**
 * 指定されたキューにメッセージを出力する.
 */
func relayQueueMessage(accountName string, accountKey string, queueName string, queueItem string){

    queueUrlFormat := getQueueUrlFormat(accountName)
    u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName))
    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        log.Fatal(err)
    }
    queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))

    ctx := context.TODO()
    msgUrl := queueUrl.NewMessagesURL()
    _, err = msgUrl.Enqueue(ctx, queueItem, 0, 0)
    if err != nil {
        log.Fatal("Error Enqueue: ", err)
        printInfo("Error relay queue. (%s), error: %v", queueName, err)
    } else {
        printInfo("Success relay queue. (%s)", queueName)
    }
}


/**
 * キューメッセージのコピー.
 */
func relayQueueHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayQueueHandler] START")

    defer func(){
        err := recover()
        if err != nil {
            printError("ERROR relayQueueHandler: %v", err)
            panic(err)
        } else {
            printInfo("SUCCESS relayQueueHandler")
        }
    }()

    outputs := make(map[string]interface{})

    var invokeReq InvokeRequest
    d := json.NewDecoder(r.Body)
    decodeErr := d.Decode(&invokeReq)
    if decodeErr != nil {
        http.Error(w, decodeErr.Error(), http.StatusBadRequest)
        return
    }

    // 何回目の試行か
    dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"])

    printInfo("[relayQueueHandler] dequeueCount: %v", dequeueCount)
    printDebug("[relayQueueHandler] invokeReq: %v", invokeReq)
    printDebug("[relayQueueHandler] queue metadata: %v", invokeReq.Metadata)

    // テスト用に一定の確率でエラーにする
    rand.Seed(time.Now().UnixNano())
    if rand.Intn(100) >= 50 || dequeueCount != "1" {
        panic("Random Error!")
    }

    // キューメッセージを取得(BASE64デコードされた状態で取得される)
    queueItem := invokeReq.Data["queueItem"].(string)

    // 過剰エスケープを除去
    decodedItem := removeExcessEscape(queueItem)
    printInfo("[relayQueueHandler] queue message(Value): %v", decodedItem)

    // キューメッセージをコピー(自動的にBASE64エンコードされる)
    outputs["relayqueue1"] = decodedItem
    outputs["relayqueue2"] = decodedItem

    // 正常終了
    invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{} }
    js, err := json.Marshal(invokeResponse)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.Write(js)
    //http.Error(w, "process relayQueueHandler.", http.StatusInternalServerError)

    printInfo("[relayQueueHandler] END")
}


/**
 * 元キューに溜まっているメッセージをすべて処理する.
 */
func relayQueueAllHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayQueueAllHandler] START")

    // キューのアカウント情報等を取得
    accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo()

    // メッセージのコピーを実行
    retryRelayQueue(accountName, accountKey, baseQueue, queueNames)

    printInfo("[relayQueueAllHandler] END")

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte("{\"message\": \"relayQueue\"}"))
}


/**
 * 有害キューに溜まっているメッセージをすべて処理する.
 */
func relayPoisonQueueHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("[relayPoisonQueueHandler] START")

    // キューのアカウント情報等を取得
    accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo()
    poisonQueue := fmt.Sprintf("%s-poison", baseQueue)

    // メッセージのコピーを実行
    retryRelayQueue(accountName, accountKey, poisonQueue, queueNames)

    printInfo("[relayPoisonQueueHandler] END")

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte("{\"message\": \"relayQueue\"}"))
}


/**
 * キューに溜まっているメッセージをリレー先キューにコピーする.
 */
func retryRelayQueue(accountName string, accountKey string, targetQueue string, queueNames []string) {

    printInfo("[retryRelayQueue] START")

    // キューURLの取得
    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err))
    }
    queueUrlFormat := getQueueUrlFormat(accountName)
    u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, targetQueue))
    queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))

    // メッセージ一覧を取得
    queueCtx := context.TODO()
    maxMessages := int32(32)               // 最大件数
    visibilityTimeout := time.Second * 10  // 可視性タイムアウト
    msgUrl := queueUrl.NewMessagesURL()
    dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout)
    if err != nil {
        panic(err)
    } else {

        // 残っているメッセージを全て処理
        for i := int32(0); i < dequeueResp.NumMessages(); i++ {

            // リレー用のキューにコピー
            msg := dequeueResp.Message(i)

            // メッセージをいったんBASE64デコード
            // (元メッセージがエンコードされていないケースも想定してデコード&エンコードし直しておく)
            planTextMessage := ""
            messageBytes, err := base64.StdEncoding.DecodeString(msg.Text)
            if err != nil {
                planTextMessage = msg.Text
            } else {
                planTextMessage = removeExcessEscape(string(messageBytes))
            }
            printDebug("queueMessage: %s", msg.Text)
            printDebug("textMessage : %s", planTextMessage)

            // メッセージを再度BASE64エンコード
            encodedMessage := base64.StdEncoding.EncodeToString([]byte(planTextMessage))

            // メッセージ(JSON)をさらに構造体に変換する場合
            //var queueData map[string]interface{}
            //decodeErr := json.Unmarshal([]byte(planTextMessage), &queueData)
            //if decodeErr != nil {
            //    printError("json decode error: %v",decodeErr.Error())
            //    panic(decodeErr)
            //}
            //printDebug("%v", queueData)
            //message := queueData["message"].(string)
            //printDebug("copy to %v, queueItem: %s, message: %s", queueNames, queueItem, message)

            // メッセージを対象のキューにコピー
            for _, queueName := range queueNames {
                printDebug("copy to %s, queueItem: %s", queueName, planTextMessage)
                relayQueueMessage(accountName, accountKey, queueName, encodedMessage)
            }

            // 元キューからメッセージを削除
            msgIdUrl := msgUrl.NewMessageIDURL(msg.ID)
            _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt)
            if err != nil {
                printError("Error delete message. %s, %v", planTextMessage, err)
            } else {
                printInfo("Success delete message %s", planTextMessage)
            }
        }
    }

    printInfo("[retryRelayQueue] END")
}

/**
 * メイン処理.
 */
func main() {
    httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT")
    if exists {
        printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort)
    }
    mux := http.NewServeMux()
    mux.HandleFunc("/RelayQueue", relayQueueHandler)
    mux.HandleFunc("/RelayQueueAll", relayQueueAllHandler)
    mux.HandleFunc("/RelayPoisonQueue", relayPoisonQueueHandler)
    log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort)
    log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}

#html(</div>)

#html(<div id="tabs1-4">)

RelayQueue/function.json
#mycode2(){{
{
  "bindings": [
    {   
      "name": "queueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "my-base-queue",
      "connection": "AzureWebJobsStorage"
    },  
    {   
      "name": "relayqueue1",
      "type": "queue",
      "direction": "out",
      "queueName": "my-queue1",
      "connection": "AzureWebJobsStorage"    
    },  
    {   
      "name": "relayqueue2",
      "type": "queue",
      "direction": "out",
      "queueName": "my-queue2",
      "connection": "AzureWebJobsStorage"    
    }   
  ]
}
}}

#html(</div>)

#html(<div id="tabs1-5">)

元キューの定期処理用の定義。ここでは 12時間間隔とした。

RelayQueueAll/function.json
#mycode2(){{
{
  "bindings": [
    {   
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 */12 * * *",
      "useMonitor": true
    }   
  ]
}
}}

#html(</div>)


#html(<div id="tabs1-6">)

有害キューの定期処理用の定義。ここではテスト用に1分間隔とした。

RelayPoisonQueue/function.json
#mycode2(){{
{
  "bindings": [
    {   
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 * * * * *",
      "useMonitor": true
    }   
  ]
}
}}

#html(</div>)


#html(</div>)
// END tabs1

#html(<script>$(function() { $("#tabs1").tabs(); });</script>)

#html(</div>)


* デプロイ/動作確認用シェル [#zaed8683]
#html(<div class="pl10">)

// START tabs2
#html(){{
<div id="tabs2">
  <ul>
    <li><a href="#tabs2-1">0_env.sh</a></li>
    <li><a href="#tabs2-2">1_resources.sh</a></li>
    <li><a href="#tabs2-3">2_put_message.sh</a></li>
    <li><a href="#tabs2-4">X1_local_start.sh</a></li>
    <li><a href="#tabs2-5">X2_local_message_put.sh</a></li>
    <li><a href="#tabs2-6">X3_local_message_peek.sh</a></li>
    <li><a href="#tabs2-7">X4_local_message_clear.sh</a></li>
  </ul>
}}

#html(<div id="tabs2-1">)

0_env.sh
#mycode2(){{
#!/bin/bash

# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=xxxxxxxxxx

# サブスクリプションID (Application Insight を使用しない場合は空でも可)
subscriptionId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

# リージョン
region=japanwest
insightsRegion=japaneast # 2020/7時点では Appplication Insights で西日本(japanwest)は使用できない

# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup

# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS

# Storageキュー名
storageQueue0=my-base-queue
storageQueue1=my-queue1
storageQueue2=my-queue2

# 関数アプリ名
funcAppName1=${PREFIX}RelayFunc
funcAppName2=${PREFIX}Queue1Func

# 使用するFunctionsのバージョン
funcVersion=2

# 関数アプリのプラン名
funcPlanName=${PREFIX}plan
#funcPlanSku=EP1
funcPlanSku=B1

# Application insights
insightsName=${PREFIX}Insights
insightsDays=30
insightsSetting=""
if [ "$subscriptionId" != "" ]; then
  insightsSetting="--app-insights $insightsName"
fi
}}

#html(</div>)

#html(<div id="tabs2-2">)

1_resources.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

# リソースの作成
if [ "$1" == "--create" ]; then

    #-------------------------------
    # リソースグループの作成
    #-------------------------------
    echo az group create
    az group create \
      --name $resourceGroup \
      --location $region

    #-------------------------------
    # ストレージアカウントの作成
    #-------------------------------
    echo az storage account create
    az storage account create \
      --name $storageAccountName \
      --location $region \
      --resource-group $resourceGroup \
      --sku $storageSku

    # ストレージアカウントキーを取得
    storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`

    #-------------------------------
    # Storageキューの作成
    # ※-poisonキューはエラー時に自動で作成されるが、先に作成しておいても問題ない。
    #-------------------------------
    echo "az storage queue create"
    az storage queue create --name $storageQueue0 --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name ${storageQueue0}-poison --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name $storageQueue1 --account-name $storageAccountName --account-key $storageAccountKey
    az storage queue create --name $storageQueue2 --account-name $storageAccountName --account-key $storageAccountKey

    #-------------------------------
    # Application Insights 拡張が利用できない場合は追加インストール
    #-------------------------------
    x=`az monitor app-insights --help 2>&1`
    if [ "$?" != "0" ]; then
      az extension add -n application-insights
    fi

    #-------------------------------
    # Application Insights コンポーネント作成
    #-------------------------------
    echo az monitor app-insights component create
    if [ "$subscriptionId" != "" ]; then
      az monitor app-insights component create \
          --app $insightsName \
          --location $insightsRegion \
          --resource-group $resourceGroup \
          --query-access Enabled \
          --retention-time $insightsDays \
          --subscription $subscriptionId
    fi

    #-------------------------------
    # キューリレー用の関数
    #-------------------------------

    # 関数プランの作成
    echo az functionapp plan create
    az functionapp plan create \
      --name $funcPlanName \
      --resource-group $resourceGroup \
      --location $region \
      --sku $funcPlanSku

    # 関数アプリの作成
    echo az functionapp create
    az functionapp create \
      --name $funcAppName1 \
      --storage-account $storageAccountName \
      --plan $funcPlanName \
      --resource-group $resourceGroup \
      --functions-version $funcVersion $insightsSetting

    # 関数アプリの環境変数の設定
    echo "az functionapp config appsettings"
    az functionapp config appsettings set \
        --name $funcAppName1 \
        --resource-group $resourceGroup \
        --settings "ACCOUNT_NAME=$storageAccountName" \
                   "ACCOUNT_KEY=$storageAccountKey" \
                   "BASE_QUEUE=$storageQueue0" \
                   "RELAY_QUEUE1=$storageQueue1" \
                   "RELAY_QUEUE2=$storageQueue2"

    # 関数アプリのデプロイ

    # すぐにデプロイするとエラー(Timeout)になる場合がある為、少し待つ
    echo "sleep 10 seconds..."
    sleep 10

    cd functions
    exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
    rm -rf $exefile
    GOOS=windows GOARCH=amd64 go build -o $exefile
    zip -r ../functions.zip *
    cd ../
    az functionapp deployment source config-zip -g $resourceGroup -n $funcAppName1 --src functions.zip
    rm -rf functions.zip
fi

# リソースの削除
if [ "$1" == "--delete" ]; then
    az group delete --name $resourceGroup
fi
}}

#html(</div>)

#html(<div id="tabs2-3">)

2_put_message.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`
base64_message=`echo $message | base64`

# ストレージアカウントキーを取得
storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`

# キューにメッセージをPUT
az storage message put \
    --account-name $storageAccountName \
    --account-key "$storageAccountKey" \
    --queue-name $storageQueue0 \
    --content "$base64_message"
}}

#html(</div>)

#html(<div id="tabs2-4">)

X1_local_start.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

# ローカルのストレージエミュレータ起動
mkdir -p local_azurite
azurite --silent --location `pwd`/local_azurite &

# ストレージキュー作成
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage queue create --name $storageQueue0
az storage queue create --name ${storageQueue0}-poison
az storage queue create --name $storageQueue1
az storage queue create --name $storageQueue2

# 関数アプリの起動
cd ./functions
exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
go build -o $exefile
func start
rm -rf $exefile

export AZURE_STORAGE_CONNECTION_STRING=
cd ../
}}

#html(</div>)

#html(<div id="tabs2-5">)

X2_local_message_put.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`

base64_message=`echo $message | base64`
if [ "$1" == "--error" ]; then
  # Base64エンコードせずに出力
  base64_message="$message"
fi

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage message put \
    --queue-name $storageQueue0 \
    --content "$base64_message"

export AZURE_STORAGE_CONNECTION_STRING=
}}

#html(</div>)

#html(<div id="tabs2-6">)

X3_local_message_peek.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"

echo "-- $storageQueue0 --"
az storage message peek --queue-name $storageQueue0 -o table --num-messages 32
echo ""

echo "-- $storageQueue1 --"
az storage message peek --queue-name $storageQueue1 -o table --num-messages 32
echo ""

echo "-- $storageQueue2 --"
az storage message peek --queue-name $storageQueue2 -o table --num-messages 32
echo ""

echo "-- ${storageQueue0}-poison --"
az storage message peek --queue-name ${storageQueue0}-poison -o table --num-messages 32
echo ""

export AZURE_STORAGE_CONNECTION_STRING=
}}

#html(</div>)

#html(<div id="tabs2-7">)

X4_local_message_clear.sh
#mycode2(){{
#!/bin/bash

source ./0_env.sh

export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"

az storage message clear -q $storageQueue0
az storage message clear -q $storageQueue1
az storage message clear -q $storageQueue2
az storage message clear -q ${storageQueue0}-poison

export AZURE_STORAGE_CONNECTION_STRING=
}}

#html(</div>)


#html(</div>)
// END tabs2

#html(<script>$(function() { $("#tabs2").tabs(); });</script>)

#html(</div>)


* 動作確認(ローカル) [#l6d1d871]
#html(<div class="pl10">)

** ローカルで関数アプリ起動 [#ca017985]
#myterm2(){{
./X1_local_start.sh
}}

** キューにメッセージを出力 [#qb3bf412]
#myterm2(){{
./X2_local_message_put.sh
}}

キュー1 及び キュー2の内容をStorageExplorerで確認
#html(<div class="images">)
#ref(azure_queue_trigger_test1.png,nolink);
#html(</div>)
#html(<div class="images">)
#ref(azure_queue_trigger_test2.png,nolink);
#html(</div>)

** エラーになるメッセージをキューに出力してみる [#a27b5b19]

&color(red){''Queueトリガーで処理されるデータはBASE64エンコードされている必要があるらしい''};

#myterm2(){{
./X2_local_message_put.sh --error
}}

関数アプリのコンソールを確認してみると、エラーになってから、有害キューの処理関数で正常に処理できている模様。
#myterm2(){{
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=d3932930-eac9-4112-b565-824975810193)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=4b9c8787-c81f-4449-b0d6-80bab35c1579)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 2, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=4b9c8787-c81f-4449-b0d6-80bab35c1579)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=24dab509-29f6-483a-90ed-a9de475b80e3)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 3, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=24dab509-29f6-483a-90ed-a9de475b80e3)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=6f9410d1-67ff-4393-844c-0da005845167)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 4, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=6f9410d1-67ff-4393-844c-0da005845167)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Executing 'Functions.RelayQueue' (Reason='New queue message detected on 'my-base-queue'.', Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4)
[2020/11/07 18:43:26] Trigger Details: MessageId: ad4910c8-7097-4c0b-9714-ce5ee64ff8eb, DequeueCount: 5, InsertionTime: 2020-11-07T18:43:24.000+00:00
[2020/11/07 18:43:26] Executed 'Functions.RelayQueue' (Failed, Id=ddd65b87-2d5b-4a74-9a6e-a44d7b9075b4)
[2020/11/07 18:43:26] System.Private.CoreLib: Exception while executing function: Functions.RelayQueue. Microsoft.Azure.WebJobs.Host: Exception binding parameter
 'queueItem'. System.Private.CoreLib: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
[2020/11/07 18:43:26] Message has reached MaxDequeueCount of 5. Moving message to queue 'my-base-queue-poison'.
 :
 :
[2020/11/07 18:44:00] Executing 'Functions.RelayPoisonQueue' (Reason='Timer fired at 2020-11-08T03:44:00.0050040+09:00', Id=2db55918-246c-4447-963f-c93a3185362e)
[2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] START
[2020/11/07 18:44:00] [INFO] [retryRelayQueue] START
[2020/11/07 18:44:00] [DEBUG] queueMessage: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [DEBUG] textMessage : {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [DEBUG] copy to my-queue1, queueItem: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue1)
[2020/11/07 18:44:00] [DEBUG] copy to my-queue2, queueItem: {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] Success relay queue. (my-queue2)
[2020/11/07 18:44:00] [INFO] Success delete message {"message": "2020-11-08 03:43:23"}
[2020/11/07 18:44:00] [INFO] [retryRelayQueue] END
[2020/11/07 18:44:00] Executed 'Functions.RelayPoisonQueue' (Succeeded, Id=2db55918-246c-4447-963f-c93a3185362e)
[2020/11/07 18:44:00] [INFO] [relayPoisonQueueHandler] END
}}

キューの状態を確認してみると有害キューからメッセージがなくなって、リレー先のキューにメッセージがコピーされている事がわかる。

[有害キューが処理される前の状態]
#myterm2(){{
./X3_local_message_peek.sh 
-- my-base-queue --


-- my-queue1 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
3aeb0ea8-8a07-4168-9e0e-9264566bfdc0  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
7b8c9fc3-561d-4bde-a669-d696e0098d2e  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0

-- my-queue2 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
1991cce6-8783-4823-848d-18078700ca8b  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
953147c2-0c83-4af3-b11b-af9b45c02b3b  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0

-- my-base-queue-poison --
MessageId                             Content                             InsertionTime              ExpirationTime             DequeueCount
------------------------------------  ----------------------------------  -------------------------  -------------------------  --------------
61664ff4-5566-4722-a7e9-6b1b337cdd4d  {"message": "2020-11-08 03:43:23"}  2020-11-07T18:43:26+00:00  2020-11-14T18:43:26+00:00  0
}}

[有害キューが処理された後の状態]
#myterm2(){{
-- my-base-queue --


-- my-queue1 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
3aeb0ea8-8a07-4168-9e0e-9264566bfdc0  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
7b8c9fc3-561d-4bde-a669-d696e0098d2e  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0
88a05f28-4db8-4ec3-ba96-6eaf6236caa7  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ==          2020-11-07T18:44:00+00:00  2020-11-14T18:44:00+00:00  0

-- my-queue2 --
MessageId                             Content                                                   InsertionTime              ExpirationTime             DequeueCount
------------------------------------  --------------------------------------------------------  -------------------------  -------------------------  --------------
1991cce6-8783-4823-848d-18078700ca8b  IntcIm1lc3NhZ2VcIjogXCIyMDIwLTExLTA4IDAzOjQyOjA3XCJ9XG4i  2020-11-07T18:42:08+00:00  2020-11-14T18:42:08+00:00  0
953147c2-0c83-4af3-b11b-af9b45c02b3b  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDI6MTQifQo=          2020-11-07T18:43:00+00:00  2020-11-14T18:43:00+00:00  0
25e2ec62-c380-4517-b31b-685c8ff7b165  eyJtZXNzYWdlIjogIjIwMjAtMTEtMDggMDM6NDM6MjMifQ==          2020-11-07T18:44:00+00:00  2020-11-14T18:44:00+00:00  0

-- my-base-queue-poison --

}}

#html(</div>)

* 動作確認(Azure) [#e422a399]
#html(<div class="pl10">)

** デプロイ [#u75a0b91]
#myterm2(){{
./1_resources.sh
}}

** キューにメッセージを出力 [#na17bcb3]
#myterm2(){{
./2_put_message.sh
}}

キュー1 及び キュー2の内容をStorageExplorerで確認(ローカルと同じなので省略)

#html(</div>)

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS