#mynavi(Azureメモ) #setlinebreak(on); #html(){{ <style> .images img { border: 1px solid #333;} </style> }} * 目次 [#j5b11eec] #contents - 関連 -- [[Azure Event Grid の webhook]] - 参考 -- [[Event Grid のメッセージの配信と再試行>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry]] * 概要 [#q01681e6] #html(<div class="pl10">) [[Azure Event Grid の webhook]] で構築した環境で、正常にメッセージ配信が出来なかった時の対応について記載する。 #html(</div>) * エラーと判断されるのはいつか [#b282fdd8] #html(<div class="pl10">) 200系以外のステータスコード以外は全て失敗とみなされて再試行が行われる。 ※ 参考: [[メッセージの配信状態>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#message-delivery-status]] #html(</div>) * 再試行のスケジュール [#n6826ab0] #html(<div class="pl10">) 既定で 24 時間以内 または 最大30回の再試行が行われる。 ※参考: [[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]] #html(</div>) * 全ての再試行に失敗した場合の挙動 [#b78aecd4] #html(<div class="pl10">) 全ての再試行に失敗した時は、配信できなかったイベントをストレージに保存する設定が出来る。 ** Azure Portal で設定する場合 [#v347a6d6] #html(<div class="pl10">) #html(<div class="images">) &ref(webhook_retry_setting.png,nolink); #html(</div>) #html(</div>) ** Azure CLI で設定する場合 [#a32771d9] #html(<div class="pl10">) #myterm2(){{ # ストレージID取得 ( /subscriptions/XXXXXXXXXXXXX/resourceGroups/XXXXXXXXXXXX/providers/Microsoft.Storage/storageAccounts/XXXXXXXXXX ) storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv` # ストレージアカウントをサブスクライブする az eventgrid event-subscription create \ --source-resource-id $storageid \ --name $event_subscription_name \ --endpoint $endpoint_url \ : # 配信できなかった時の保存先 --deadletter-endpoint $storageid/blobServices/default/containers/保存先のストレージコンテナ名 }} #html(</div>) #html(</div>) * 動作確認 [#x5cd5e0c] #html(<div class="pl10">) ** 処理の変更 [#n1e5ca08] #html(<div class="pl10">) [[Azure Event Grid の webhook]] で作成した処理を少し変更して、ファイル名の末尾が error.csv だった時はエラーステータスを返すようにする。 apiserver.go #mycode2(){{ package main import ( "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "os" "strings" "time" "github.com/Azure/azure-storage-blob-go/azblob" ) type InvokeResponse struct { File string } type InvokeRequest struct { Topic string Subject string EventType string EventTime string Id string Data map[string]interface{} DataVersion string MetadataVersion string } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } func init(){ setLogfile(true) } func printDebug(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) } func fileExists(name string) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func setLogfile(init bool){ log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile) logDirPath := getEnv("LOG_DIR", "/tmp") logPrefix := getEnv("LOG_PREFIX", "apiserver_") logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02")) if init || !fileExists(logfilePath) { newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { panic("cannnot open " + logfilePath + " : "+ err.Error()) } log.SetOutput(io.MultiWriter(newlogfile, os.Stdout)) printInfo("switch logfile: %s", logfilePath) } } func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") var event InvokeRequest var fileUrl string defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { // ### エラー時は異常系のステータスを返すようにする ####################################### printError("Failure process %s , event: %v, error: %v", fileUrl, event, err) http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError) // ######################################################################### } else { printInfo("Success process %s", fileUrl) printInfo("END blobTriggerHandler") } }() //------------------------------------------------ // リクエストデータのパース //------------------------------------------------ var invokeReq []InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } // イベントデータの取得 event = invokeReq[0] // エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク) if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" { fileUrl = "SubscriptionValidationEvent" w.Header().Set("Content-Type", "application/json") w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"]))) return } //------------------------------------------------ // ○○処理. //------------------------------------------------ fileUrl = fmt.Sprintf("%s", event.Data["url"]) sampleProc(fileUrl) //------------------------------------------------ // レスポンス生成 //------------------------------------------------ invokeResponse := InvokeResponse{File: fileUrl} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * ○○処理. */ func sampleProc(fileUrl string) { // 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける urlParts := strings.Split(fileUrl, "/") containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3]) rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "") fileName := rep.Replace(fileUrl) accountName := strings.Split(urlParts[2], ".")[0] containerName := urlParts[3] // ローカル実行用の環境変数がある場合はそちらを優先 localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "") if localContainerUrl != "" { containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName) } printDebug("fileUrl: %s", fileUrl) printDebug("containerUrl: %s", containerUrlText) printDebug("fileName : %s", fileName) // ### ファイル名が error.csv の時は失敗させる ############ if strings.HasSuffix(fileName, "error.csv") { panic("error sample!") } // ########################################## // 対象のファイルURLを取得 keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName) accountKey := getEnv(keyEnvName, "") if accountKey == "" { panic(fmt.Sprintf("環境変数(%s)が設定されていません。", keyEnvName)) } ctx := context.Background() credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("Get Credential Error: %v", err)) } p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(fileName) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { panic(fmt.Sprintf("Get Properties Error: %v", err)) } else { blobSize = blobPropResponse.ContentLength() } // ファイル内容をバッファに取得 downloadedData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{}) if err != nil { panic(fmt.Sprintf("Download Error: %v", err)) } else { // ファイル内容を表示 printInfo("Download Success %s", fileUrl) printInfo(string(downloadedData)) } } /** * メイン. */ func main() { httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT") if ! exists { httpInvokerPort = "8000" } printInfo("HTTPWORKER_PORT: " + httpInvokerPort) mux := http.NewServeMux() mux.HandleFunc("/", blobTriggerHandler) printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } }} #html(</div>) ** 再試行ポリシーの変更 [#n55054c5] #html(<div class="pl10">) いったん再試行ポリシーを10分間、最大5回 に変更し、配信できなかった時の保存先に適当なストレージコンテナを指定する。 #html(<div class="images">) &ref(webhook_retry_setting.png,nolink); #html(</div>) #html(</div>) ** ファイルのアップロード [#e0454778] #html(<div class="pl10">) #myterm2(){{ # error.csv をアップロードする。 connstr=`az storage account show-connection-string --name ストレージアカウント名 -o table | tail -1` az storage blob upload -f error.csv -c ストレージコンテナ名 -n error.csv --connection-string "${connstr}" }} #html(</div>) ** ログの確認 [#g9473b3d] #html(<div class="pl10">) サーバ側のログを確認してみると、設定した通り5回の試行が行われている。(再試行は4回) #myterm2(){{ 2020/09/09 03:11:06.738022 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:06.738152 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:06.738176 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:06.738188 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:06.738258 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:11:16.741857 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:16.741980 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:16.741993 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:16.742012 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:16.742067 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:11:46.741888 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:11:46.742049 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:11:46.742075 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:11:46.742085 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:11:46.742147 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:12:09.818845 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:12:09.818978 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:12:09.818992 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:12:09.819000 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:12:09.819073 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! 2020/09/09 03:12:46.736035 apiserver.go:67: [INFO] START blobTriggerHandler 2020/09/09 03:12:46.736157 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv 2020/09/09 03:12:46.736170 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/09 03:12:46.736177 apiserver.go:60: [DEBUG] fileName : error.csv 2020/09/09 03:12:46.736228 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample! }} #html(</div>) ** ストレージアカウントに保存されたファイル [#b749ba6c] #html(<div class="pl10">) 設定したストレージコンテナの以下のPATHに再試行に失敗したイベント情報のファイルが作成される。 /ストレージアカウント名/イベントサブスクリプション名/YYYY/M/D/H/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.json #mycode2(){{ [ { "id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx, "eventTime": "2020-09-09T03:11:06.3440237Z", "eventType": "Microsoft.Storage.BlobCreated", "dataVersion": "", "metadataVersion": "1", "topic": "/subscriptions/xxxxxxxxxxxxxxx/resourceGroups/リソースグループ名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名", "subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/error.csv", "deadLetterReason": "TimeToLiveExceeded", "deliveryAttempts": 4, "lastDeliveryOutcome": "GenericError", "lastHttpStatusCode": 500, "publishTime": "2020-09-09T03:11:06.7291287Z", "lastDeliveryAttemptTime": "2020-09-09T03:12:46.7329410Z", "data": { "api": "PutBlob", "clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "eTag": "0xXXXXXXXXXXXXXXX", "contentType": "text/csv", "contentLength": 60, "blobType": "BlockBlob", "url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv", "sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "storageDiagnostics": { "batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" } } } ] }} #html(</div>) #html(</div>) * 何回目の配信(試行)かを判断するには [#wb37cdc4] #html(<div class="pl10">) リクエストデータをダンプしてみた所 ''Aeg-Delivery-Count'' というリクエストヘッダに何回目の試行かがセットされいる模様。 なので、例えば 5 回目の試行でエラーだった時だけ、何かしらの処理(例えばメール通知など)を行いたい場合、以下のコードで判断できる。 ※ &color(red){''ただし、この方法には問題がある''}; (後述) 注意) - Aeg-Delivery-Count は 0 始まり。 - このヘッダについては [[webhookのドキュメント>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-webhooks]] には記載されておらず、[[Event Hubsのドキュメント>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-event-hubs]] に記載されている為、ヘッダがない可能性も考慮しておく必要があるか。 #mycode2(){{ : func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") var event InvokeRequest var fileUrl string defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { printError("Failure process %s , event: %v, error: %v, request: %v", fileUrl, event, err, r) deliveryCount := r.Header.Get("Aeg-Delivery-Count") printError("deliveryCount: %s", deliveryCount) // Aeg-Delivery-Count ヘッダがない場合 または 5回目の配信の時 ( 0 から始まるので 5回目は 4 ) if deliveryCount == "" || deliveryCount == "4" { printError("Failed %s times delivery", deliveryCount) // メール通知など } printInfo("END blobTriggerHandler") http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError) } else { printInfo("Success process %s", fileUrl) printInfo("END blobTriggerHandler") } }() : }} #html(</div>) * 前述のコード何が問題か [#e2c05f01] #html(<div class="pl10">) このサンプルの Webhook は 自前のサーバ等に対してイベント送信を行っている為、当たり前だが サーバが常に起動している事が前提となる。 つまり、サーバ処理のデプロイや再起動、またはサーバ停止しているタイミングで hook された場合、処理自体が走らない可能性がある。 これを解決する1つの案として、前述のエラー情報が出力されるコンテナに Blobトリガーを設定して、そちらで通知のみを行う方法がある。 ※ [[Azure Functions の SLA>https://azure.microsoft.com/ja-jp/support/legal/sla/functions/v1_1/]] によると、それでも SLAは 99.95% との事だが。。 #html(<div class="images">) &ref(webhook_error_notice.png,nolink); #html(</div>) #html(</div>) ** 関数アプリの場合はどうか [#j5619420] * 関数アプリの場合はどうか [#j5619420] #html(<div class="pl10">) Blobトリガーで起動される関数アプリの場合は、以下の通りとなる。 - 試行回数が判断できるような http ヘッダは送信されない。 - 再試行は5回行われる。 - 5回の試行で全てエラーとなった場合は webjobs-blobtrigger-poison というストレージキューに情報が記録される。 以下、キューに登録される内容 #mycode2(){{ { "Type": "BlobTrigger", "FunctionId": "Host.Functions.関数名", "BlobType": "BlockBlob", "ContainerName": "ストレージコンテナ名", "BlobName": "ファイル名", "ETag": "\"0xXXXXXXXXXXXXXXX\"" } }} #html(</div>)