目次 †概要 †Azure Event Grid の webhook で構築した環境で、正常にメッセージ配信が出来なかった時の対応について記載する。 エラーと判断されるのはいつか †200系以外のステータスコード以外は全て失敗とみなされて再試行が行われる。 再試行のスケジュール †既定で 24 時間以内 または 最大30回の再試行が行われる。 全ての再試行に失敗した場合の挙動 †全ての再試行に失敗した時は、配信できなかったイベントをストレージに保存する設定が出来る。 Azure Portal で設定する場合 †Azure CLI で設定する場合 †# ストレージID取得 ( /subscriptions/XXXXXXXXXXXXX/resourceGroups/XXXXXXXXXXXX/providers/Microsoft.Storage/storageAccounts/XXXXXXXXXX ) storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv` # ストレージアカウントをサブスクライブする az eventgrid event-subscription create \ --source-resource-id $storageid \ --name $event_subscription_name \ --endpoint $endpoint_url \ : # 配信できなかった時の保存先 --deadletter-endpoint $storageid/blobServices/default/containers/保存先のストレージコンテナ名 動作確認 †処理の変更 †Azure Event Grid の webhook で作成した処理を少し変更して、ファイル名の末尾が error.csv だった時はエラーステータスを返すようにする。 apiserver.go package main import ( "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "os" "strings" "time" "github.com/Azure/azure-storage-blob-go/azblob" ) type InvokeResponse struct { File string } type InvokeRequest struct { Topic string Subject string EventType string EventTime string Id string Data map[string]interface{} DataVersion string MetadataVersion string } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } func init(){ setLogfile(true) } func printDebug(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) } func fileExists(name string) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func setLogfile(init bool){ log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile) logDirPath := getEnv("LOG_DIR", "/tmp") logPrefix := getEnv("LOG_PREFIX", "apiserver_") logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02")) if init || !fileExists(logfilePath) { newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { panic("cannnot open " + logfilePath + " : "+ err.Error()) } log.SetOutput(io.MultiWriter(newlogfile, os.Stdout)) printInfo("switch logfile: %s", logfilePath) } } func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") var event InvokeRequest var fileUrl string defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { // ### エラー時は異常系のステータスを返すようにする ####################################### printError("Failure process %s , event: %v, error: %v", fileUrl, event, err) http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError) // ######################################################################### } else { printInfo("Success process %s", fileUrl) printInfo("END blobTriggerHandler") } }() //------------------------------------------------ // リクエストデータのパース //------------------------------------------------ var invokeReq []InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } // イベントデータの取得 event = invokeReq[0] // エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク) if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" { fileUrl = "SubscriptionValidationEvent" w.Header().Set("Content-Type", "application/json") w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"]))) return } //------------------------------------------------ // ○○処理. //------------------------------------------------ fileUrl = fmt.Sprintf("%s", event.Data["url"]) sampleProc(fileUrl) //------------------------------------------------ // レスポンス生成 //------------------------------------------------ invokeResponse := InvokeResponse{File: fileUrl} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * ○○処理. */ func sampleProc(fileUrl string) { // 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける urlParts := strings.Split(fileUrl, "/") containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3]) rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "") fileName := rep.Replace(fileUrl) accountName := strings.Split(urlParts[2], ".")[0] containerName := urlParts[3] // ローカル実行用の環境変数がある場合はそちらを優先 localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "") if localContainerUrl != "" { containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName) } printDebug("fileUrl: %s", fileUrl) printDebug("containerUrl: %s", containerUrlText) printDebug("fileName : %s", fileName) // ### ファイル名が error.csv の時は失敗させる ############ if strings.HasSuffix(fileName, "error.csv") { panic("error sample!") } // ########################################## // 対象のファイルURLを取得 keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName) accountKey := getEnv(keyEnvName, "") if accountKey == "" { panic(fmt.Sprintf("環境変数(%s)が設定されていません。", keyEnvName)) } ctx := context.Background() credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("Get Credential Error: %v", err)) } p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(fileName) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { panic(fmt.Sprintf("Get Properties Error: %v", err)) } else { blobSize = blobPropResponse.ContentLength() } // ファイル内容をバッファに取得 downloadedData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{}) if err != nil { panic(fmt.Sprintf("Download Error: %v", err)) } else { // ファイル内容を表示 printInfo("Download Success %s", fileUrl) printInfo(string(downloadedData)) } } /** * メイン. */ func main() { httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT") if ! exists { httpInvokerPort = "8000" } printInfo("HTTPWORKER_PORT: " + httpInvokerPort) mux := http.NewServeMux() mux.HandleFunc("/", blobTriggerHandler) printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } 再試行ポリシーの変更 †いったん再試行ポリシーを10分間、最大5回 に変更し、配信できなかった時の保存先に適当なストレージコンテナを指定する。 ファイルのアップロード †# error.csv をアップロードする。 connstr=`az storage account show-connection-string --name ストレージアカウント名 -o table | tail -1` az storage blob upload -f error.csv -c ストレージコンテナ名 -n error.csv --connection-string "${connstr}" ログの確認 †サーバ側のログを確認してみると、設定した通り5回の試行が行われている。(再試行は4回) 2020/09/09 03:11:06.738022 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:06.738152 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:06.738176 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:06.738188 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:06.738258 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:11:16.741857 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:16.741980 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:16.741993 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:16.742012 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:16.742067 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:11:46.741888 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:46.742049 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:46.742075 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:46.742085 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:46.742147 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:12:09.818845 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:12:09.818978 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:12:09.818992 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:12:09.819000 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:12:09.819073 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:12:46.736035 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:12:46.736157 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:12:46.736170 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:12:46.736177 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:12:46.736228 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! ストレージアカウントに保存されたファイル †設定したストレージコンテナの以下のPATHに再試行に失敗したイベント情報のファイルが作成される。 /ストレージアカウント名/イベントサブスクリプション名/YYYY/M/D/H/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.json [ { "id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx, "eventTime": "2020-09-09T03:11:06.3440237Z", "eventType": "Microsoft.Storage.BlobCreated", "dataVersion": "", "metadataVersion": "1", "topic": "/subscriptions/xxxxxxxxxxxxxxx/resourceGroups/リソースグループ名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名", "subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/error.csv", "deadLetterReason": "TimeToLiveExceeded", "deliveryAttempts": 4, "lastDeliveryOutcome": "GenericError", "lastHttpStatusCode": 500, "publishTime": "2020-09-09T03:11:06.7291287Z", "lastDeliveryAttemptTime": "2020-09-09T03:12:46.7329410Z", "data": { "api": "PutBlob", "clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "eTag": "0xXXXXXXXXXXXXXXX", "contentType": "text/csv", "contentLength": 60, "blobType": "BlockBlob", "url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv", "sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "storageDiagnostics": { "batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" } } } ] 何回目の配信(試行)かを判断するには †リクエストデータをダンプしてみた所 Aeg-Delivery-Count というリクエストヘッダに何回目の試行かがセットされいる模様。 注意)
: func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") var event InvokeRequest var fileUrl string defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { printError("Failure process %s , event: %v, error: %v, request: %v", fileUrl, event, err, r) deliveryCount := r.Header.Get("Aeg-Delivery-Count") printError("deliveryCount: %s", deliveryCount) // Aeg-Delivery-Count ヘッダがない場合 または 5回目の配信の時 ( 0 から始まるので 5回目は 4 ) if deliveryCount == "" || deliveryCount == "4" { printError("Failed %s times delivery", deliveryCount) // メール通知など } printInfo("END blobTriggerHandler") http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError) } else { printInfo("Success process %s", fileUrl) printInfo("END blobTriggerHandler") } }() : 前述のコード何が問題か †このサンプルの Webhook は 自前のサーバ等に対してイベント送信を行っている為、当たり前だが サーバが常に起動している事が前提となる。 これを解決する1つの案として、前述のエラー情報が出力されるコンテナに Blobトリガーを設定して、そちらで通知のみを行う方法がある。 関数アプリの場合はどうか †Blobトリガーで起動される関数アプリの場合は、以下の通りとなる。
以下、キューに登録される内容 { "Type": "BlobTrigger", "FunctionId": "Host.Functions.関数名", "BlobType": "BlockBlob", "ContainerName": "ストレージコンテナ名", "BlobName": "ファイル名", "ETag": "\"0xXXXXXXXXXXXXXXX\"" } |