目次

概要

Azure Event Grid の webhook で構築した環境で、正常にメッセージ配信が出来なかった時の対応について記載する。

エラーと判断されるのはいつか

200系以外のステータスコード以外は全て失敗とみなされて再試行が行われる。
※ 参考: メッセージの配信状態

再試行のスケジュール

既定で 24 時間以内 または 最大30回の再試行が行われる。
※参考: 再試行のスケジュールと期間

全ての再試行に失敗した場合の挙動

全ての再試行に失敗した時は、配信できなかったイベントをストレージに保存する設定が出来る。

Azure Portal で設定する場合

webhook_retry_setting.png

Azure CLI で設定する場合

# ストレージID取得 ( /subscriptions/XXXXXXXXXXXXX/resourceGroups/XXXXXXXXXXXX/providers/Microsoft.Storage/storageAccounts/XXXXXXXXXX )
storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv`

# ストレージアカウントをサブスクライブする
az eventgrid event-subscription create \
  --source-resource-id $storageid \
  --name $event_subscription_name \
  --endpoint $endpoint_url \
   :
  # 配信できなかった時の保存先
  --deadletter-endpoint $storageid/blobServices/default/containers/保存先のストレージコンテナ名

動作確認

処理の変更

Azure Event Grid の webhook で作成した処理を少し変更して、ファイル名の末尾が error.csv だった時はエラーステータスを返すようにする。

apiserver.go

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    "strings"
    "time"

    "github.com/Azure/azure-storage-blob-go/azblob"
)

type InvokeResponse struct {
    File string
}

type InvokeRequest struct {
    Topic           string
    Subject         string
    EventType       string
    EventTime       string
    Id              string
    Data            map[string]interface{}
    DataVersion     string
    MetadataVersion string
}

/**
 * 環境変数の取得.
 */
func getEnv(envName string, defaultValue string) string {
    value, exists := os.LookupEnv(envName)
    if exists {
        return value
    } else {
        return defaultValue
    }
}

func init(){
    setLogfile(true)
}

func printDebug(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[DEBUG] %s\n", msg)
}

func printInfo(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[INFO] %s\n", msg)
}

func printError(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[ERROR] %s\n", msg)
}

func fileExists(name string) bool {
    _, err := os.Stat(name)
    return !os.IsNotExist(err)
}

func setLogfile(init bool){
    log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
    logDirPath := getEnv("LOG_DIR", "/tmp")
    logPrefix  := getEnv("LOG_PREFIX", "apiserver_")
    logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02"))
    if init || !fileExists(logfilePath) {
        newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
            panic("cannnot open " + logfilePath + " : "+ err.Error())
        }
        log.SetOutput(io.MultiWriter(newlogfile, os.Stdout))
        printInfo("switch logfile: %s", logfilePath)
    }
}

func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("START blobTriggerHandler")

    var event InvokeRequest
    var fileUrl string

    defer func(){
        err := recover()
        if fileUrl == "" {
            fileUrl = "unknown"
        }
        if err != nil {
            // ### エラー時は異常系のステータスを返すようにする #######################################
            printError("Failure process %s , event: %v, error: %v", fileUrl, event, err)
            http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError)
            // #########################################################################
        } else {
            printInfo("Success process %s", fileUrl)
            printInfo("END blobTriggerHandler")
        }
    }()

    //------------------------------------------------
    // リクエストデータのパース
    //------------------------------------------------
    var invokeReq []InvokeRequest
    d := json.NewDecoder(r.Body)
    decodeErr := d.Decode(&invokeReq)
    if decodeErr != nil {
        http.Error(w, decodeErr.Error(), http.StatusBadRequest)
        return
    }

    // イベントデータの取得
    event = invokeReq[0]

    // エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク)
    if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" {
        fileUrl = "SubscriptionValidationEvent"
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"])))
        return
    }

    //------------------------------------------------
    // ○○処理.
    //------------------------------------------------
    fileUrl = fmt.Sprintf("%s", event.Data["url"])
    sampleProc(fileUrl)

    //------------------------------------------------
    // レスポンス生成
    //------------------------------------------------
    invokeResponse := InvokeResponse{File: fileUrl}
    js, err := json.Marshal(invokeResponse)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    w.Write(js)
}

/**
 * ○○処理.
 */
func sampleProc(fileUrl string) {

    // 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける
    urlParts := strings.Split(fileUrl, "/")
    containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3])
    rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "")
    fileName    := rep.Replace(fileUrl)
    accountName := strings.Split(urlParts[2], ".")[0]
    containerName := urlParts[3]

    // ローカル実行用の環境変数がある場合はそちらを優先
    localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "")
    if localContainerUrl != "" {
        containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName)
    }

    printDebug("fileUrl: %s", fileUrl)
    printDebug("containerUrl: %s", containerUrlText)
    printDebug("fileName    : %s", fileName)

    // ### ファイル名が error.csv の時は失敗させる ############
    if strings.HasSuffix(fileName, "error.csv") {
        panic("error sample!")
    }
    // ##########################################

    // 対象のファイルURLを取得
    keyEnvName  := fmt.Sprintf("STORAGE_KEY_%s", accountName)
    accountKey  := getEnv(keyEnvName, "")
    if accountKey == "" {
        panic(fmt.Sprintf("環境変数(%s)が設定されていません。", keyEnvName))
    }

    ctx := context.Background()
    credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        panic(fmt.Sprintf("Get Credential Error: %v", err))
    }

    p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
    cURL, _ := url.Parse(containerUrlText)
    containerURL := azblob.NewContainerURL(*cURL, p)
    blobURL := containerURL.NewBlobURL(fileName)

    // サイズを取得
    var blobSize int64 = 1024
    blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
    if err != nil {
        panic(fmt.Sprintf("Get Properties Error: %v", err))
    } else {
        blobSize = blobPropResponse.ContentLength()
    }

    // ファイル内容をバッファに取得
    downloadedData := make([]byte, blobSize)
    err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{})
    if err != nil {
        panic(fmt.Sprintf("Download Error: %v", err))
    } else {
        // ファイル内容を表示
        printInfo("Download Success %s", fileUrl)
        printInfo(string(downloadedData))
    }
}

/**
 * メイン.
 */
func main() {
    httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT")
    if ! exists {
        httpInvokerPort = "8000"
    }
    printInfo("HTTPWORKER_PORT: " + httpInvokerPort)
    mux := http.NewServeMux()
    mux.HandleFunc("/", blobTriggerHandler)
    printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort)
    log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}

再試行ポリシーの変更

いったん再試行ポリシーを10分間、最大5回 に変更し、配信できなかった時の保存先に適当なストレージコンテナを指定する。

webhook_retry_setting.png

ファイルのアップロード

# error.csv をアップロードする。
connstr=`az storage account show-connection-string --name ストレージアカウント名 -o table | tail -1`
az storage blob upload -f error.csv -c ストレージコンテナ名 -n error.csv --connection-string "${connstr}"

ログの確認

サーバ側のログを確認してみると、設定した通り5回の試行が行われている。(再試行は4回)

2020/09/09 03:11:06.738022 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:06.738152 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:06.738176 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:06.738188 apiserver.go:60: [DEBUG] fileName    : error.csv
2020/09/09 03:11:06.738258 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:11:16.741857 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:16.741980 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:16.741993 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:16.742012 apiserver.go:60: [DEBUG] fileName    : error.csv
2020/09/09 03:11:16.742067 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:11:46.741888 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:46.742049 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:46.742075 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:46.742085 apiserver.go:60: [DEBUG] fileName    : error.csv
2020/09/09 03:11:46.742147 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:12:09.818845 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:12:09.818978 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:12:09.818992 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:12:09.819000 apiserver.go:60: [DEBUG] fileName    : error.csv
2020/09/09 03:12:09.819073 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:12:46.736035 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:12:46.736157 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:12:46.736170 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:12:46.736177 apiserver.go:60: [DEBUG] fileName    : error.csv
2020/09/09 03:12:46.736228 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!

ストレージアカウントに保存されたファイル

設定したストレージコンテナの以下のPATHに再試行に失敗したイベント情報のファイルが作成される。

/ストレージアカウント名/イベントサブスクリプション名/YYYY/M/D/H/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.json

[
  {
    "id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx,
    "eventTime": "2020-09-09T03:11:06.3440237Z",
    "eventType": "Microsoft.Storage.BlobCreated",
    "dataVersion": "",
    "metadataVersion": "1",
    "topic": "/subscriptions/xxxxxxxxxxxxxxx/resourceGroups/リソースグループ名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名",
    "subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/error.csv",
    "deadLetterReason": "TimeToLiveExceeded",
    "deliveryAttempts": 4,
    "lastDeliveryOutcome": "GenericError",
    "lastHttpStatusCode": 500,
    "publishTime": "2020-09-09T03:11:06.7291287Z",
    "lastDeliveryAttemptTime": "2020-09-09T03:12:46.7329410Z",
    "data": {
      "api": "PutBlob",
      "clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "eTag": "0xXXXXXXXXXXXXXXX",
      "contentType": "text/csv",
      "contentLength": 60,
      "blobType": "BlockBlob",
      "url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv",
      "sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
      "storageDiagnostics": {
        "batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
      }
    }
  }
]

何回目の配信(試行)かを判断するには

リクエストデータをダンプしてみた所 Aeg-Delivery-Count というリクエストヘッダに何回目の試行かがセットされいる模様。
なので、例えば 5 回目の試行でエラーだった時だけ、何かしらの処理(例えばメール通知など)を行いたい場合、以下のコードで判断できる。
ただし、この方法には問題がある (後述)

注意)

  :
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("START blobTriggerHandler")

    var event InvokeRequest
    var fileUrl string

    defer func(){
        err := recover()
        if fileUrl == "" {
            fileUrl = "unknown"
        }
        if err != nil {
            printError("Failure process %s , event: %v, error: %v, request: %v", fileUrl, event, err, r)
            deliveryCount := r.Header.Get("Aeg-Delivery-Count")
            printError("deliveryCount: %s", deliveryCount)

            // Aeg-Delivery-Count ヘッダがない場合 または 5回目の配信の時 ( 0 から始まるので 5回目は 4 )
            if deliveryCount == "" || deliveryCount == "4" {
                printError("Failed %s times delivery", deliveryCount)
                // メール通知など
            }
            printInfo("END blobTriggerHandler")
            http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError)
        } else {
            printInfo("Success process %s", fileUrl)
            printInfo("END blobTriggerHandler")
        }
    }()

    :

前述のコード何が問題か

このサンプルの Webhook は 自前のサーバ等に対してイベント送信を行っている為、当たり前だが サーバが常に起動している事が前提となる。
つまり、サーバ処理のデプロイや再起動、またはサーバ停止しているタイミングで hook された場合、処理自体が走らない可能性がある。

これを解決する1つの案として、前述のエラー情報が出力されるコンテナに Blobトリガーを設定して、そちらで通知のみを行う方法がある。
Azure Functions の SLA によると、それでも SLAは 99.95% との事だが。。

webhook_error_notice.png

関数アプリの場合はどうか

Blobトリガーで起動される関数アプリの場合は、以下の通りとなる。

  • 試行回数が判断できるような http ヘッダは送信されない。
  • 再試行は5回行われる。
  • 5回の試行で全てエラーとなった場合は webjobs-blobtrigger-poison というストレージキューに情報が記録される。

以下、キューに登録される内容

{
  "Type": "BlobTrigger",
  "FunctionId": "Host.Functions.関数名",
  "BlobType": "BlockBlob",
  "ContainerName": "ストレージコンテナ名",
  "BlobName": "ファイル名",
  "ETag": "\"0xXXXXXXXXXXXXXXX\""
}

添付ファイル: filewebhook_error_notice.png 262件 [詳細] filewebhook_retry_setting.png 327件 [詳細]

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-09-10 (木) 08:14:06 (1463d)