- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2020-09-09T23:46:40+00:00","","")
#mynavi(Azureメモ)
#setlinebreak(on);
#html(){{
<style>
.images img { border: 1px solid #333;}
</style>
}}
* 目次 [#j5b11eec]
#contents
- 関連
-- [[Azure Event Grid の webhook]]
- 参考
-- [[Event Grid のメッセージの配信と再試行>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry]]
* 概要 [#q01681e6]
#html(<div class="pl10">)
[[Azure Event Grid の webhook]] で構築した環境で、正常にメッセージ配信が出来なかった時の対応について記載する。
#html(</div>)
* エラーと判断されるのはいつか [#b282fdd8]
#html(<div class="pl10">)
200系以外のステータスコード以外は全て失敗とみなされて再試行が行われる。
※ 参考: [[メッセージの配信状態>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#message-delivery-status]]
#html(</div>)
* 再試行のスケジュール [#n6826ab0]
#html(<div class="pl10">)
既定で 24 時間以内 または 最大30回の再試行が行われる。
※参考: [[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]]
#html(</div>)
* 全ての再試行に失敗した場合の挙動 [#b78aecd4]
#html(<div class="pl10">)
全ての再試行に失敗した時は、配信できなかったイベントをストレージに保存する設定が出来る。
** Azure Portal で設定する場合 [#v347a6d6]
#html(<div class="pl10">)
#html(<div class="images">)
&ref(webhook_retry_setting.png,nolink);
#html(</div>)
#html(</div>)
** Azure CLI で設定する場合 [#a32771d9]
#html(<div class="pl10">)
#myterm2(){{
# ストレージID取得 ( /subscriptions/XXXXXXXXXXXXX/resourceGroups/XXXXXXXXXXXX/providers/Microsoft.Storage/storageAccounts/XXXXXXXXXX )
storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv`
# ストレージアカウントをサブスクライブする
az eventgrid event-subscription create \
--source-resource-id $storageid \
--name $event_subscription_name \
--endpoint $endpoint_url \
:
# 配信できなかった時の保存先
--deadletter-endpoint $storageid/blobServices/default/containers/保存先のストレージコンテナ名
}}
#html(</div>)
#html(</div>)
* 動作確認 [#x5cd5e0c]
#html(<div class="pl10">)
** 処理の変更 [#n1e5ca08]
#html(<div class="pl10">)
[[Azure Event Grid の webhook]] で作成した処理を少し変更して、ファイル名の末尾が error.csv だった時はエラーステータスを返すようにする。
apiserver.go
#mycode2(){{
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
)
type InvokeResponse struct {
File string
}
type InvokeRequest struct {
Topic string
Subject string
EventType string
EventTime string
Id string
Data map[string]interface{}
DataVersion string
MetadataVersion string
}
/**
* 環境変数の取得.
*/
func getEnv(envName string, defaultValue string) string {
value, exists := os.LookupEnv(envName)
if exists {
return value
} else {
return defaultValue
}
}
func init(){
setLogfile(true)
}
func printDebug(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[DEBUG] %s\n", msg)
}
func printInfo(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[INFO] %s\n", msg)
}
func printError(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[ERROR] %s\n", msg)
}
func fileExists(name string) bool {
_, err := os.Stat(name)
return !os.IsNotExist(err)
}
func setLogfile(init bool){
log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
logDirPath := getEnv("LOG_DIR", "/tmp")
logPrefix := getEnv("LOG_PREFIX", "apiserver_")
logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02"))
if init || !fileExists(logfilePath) {
newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
panic("cannnot open " + logfilePath + " : "+ err.Error())
}
log.SetOutput(io.MultiWriter(newlogfile, os.Stdout))
printInfo("switch logfile: %s", logfilePath)
}
}
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {
printInfo("START blobTriggerHandler")
var event InvokeRequest
var fileUrl string
defer func(){
err := recover()
if fileUrl == "" {
fileUrl = "unknown"
}
if err != nil {
// ### エラー時は異常系のステータスを返すようにする #######################################
printError("Failure process %s , event: %v, error: %v", fileUrl, event, err)
http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError)
// #########################################################################
} else {
printInfo("Success process %s", fileUrl)
printInfo("END blobTriggerHandler")
}
}()
//------------------------------------------------
// リクエストデータのパース
//------------------------------------------------
var invokeReq []InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
// イベントデータの取得
event = invokeReq[0]
// エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク)
if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" {
fileUrl = "SubscriptionValidationEvent"
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"])))
return
}
//------------------------------------------------
// ○○処理.
//------------------------------------------------
fileUrl = fmt.Sprintf("%s", event.Data["url"])
sampleProc(fileUrl)
//------------------------------------------------
// レスポンス生成
//------------------------------------------------
invokeResponse := InvokeResponse{File: fileUrl}
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
}
/**
* ○○処理.
*/
func sampleProc(fileUrl string) {
// 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける
urlParts := strings.Split(fileUrl, "/")
containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3])
rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "")
fileName := rep.Replace(fileUrl)
accountName := strings.Split(urlParts[2], ".")[0]
containerName := urlParts[3]
// ローカル実行用の環境変数がある場合はそちらを優先
localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "")
if localContainerUrl != "" {
containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName)
}
printDebug("fileUrl: %s", fileUrl)
printDebug("containerUrl: %s", containerUrlText)
printDebug("fileName : %s", fileName)
// ### ファイル名が error.csv の時は失敗させる ############
if strings.HasSuffix(fileName, "error.csv") {
panic("error sample!")
}
// ##########################################
// 対象のファイルURLを取得
keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName)
accountKey := getEnv(keyEnvName, "")
if accountKey == "" {
panic(fmt.Sprintf("環境変数(%s)が設定されていません。", keyEnvName))
}
ctx := context.Background()
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
panic(fmt.Sprintf("Get Credential Error: %v", err))
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
cURL, _ := url.Parse(containerUrlText)
containerURL := azblob.NewContainerURL(*cURL, p)
blobURL := containerURL.NewBlobURL(fileName)
// サイズを取得
var blobSize int64 = 1024
blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
panic(fmt.Sprintf("Get Properties Error: %v", err))
} else {
blobSize = blobPropResponse.ContentLength()
}
// ファイル内容をバッファに取得
downloadedData := make([]byte, blobSize)
err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{})
if err != nil {
panic(fmt.Sprintf("Download Error: %v", err))
} else {
// ファイル内容を表示
printInfo("Download Success %s", fileUrl)
printInfo(string(downloadedData))
}
}
/**
* メイン.
*/
func main() {
httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT")
if ! exists {
httpInvokerPort = "8000"
}
printInfo("HTTPWORKER_PORT: " + httpInvokerPort)
mux := http.NewServeMux()
mux.HandleFunc("/", blobTriggerHandler)
printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort)
log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}
#html(</div>)
** 再試行ポリシーの変更 [#n55054c5]
#html(<div class="pl10">)
いったん再試行ポリシーを10分間、最大5回 に変更し、配信できなかった時の保存先に適当なストレージコンテナを指定する。
#html(<div class="images">)
&ref(webhook_retry_setting.png,nolink);
#html(</div>)
#html(</div>)
** ファイルのアップロード [#e0454778]
#html(<div class="pl10">)
#myterm2(){{
# error.csv をアップロードする。
connstr=`az storage account show-connection-string --name ストレージアカウント名 -o table | tail -1`
az storage blob upload -f error.csv -c ストレージコンテナ名 -n error.csv --connection-string "${connstr}"
}}
#html(</div>)
** ログの確認 [#g9473b3d]
#html(<div class="pl10">)
サーバ側のログを確認してみると、設定した通り5回の試行が行われている。(再試行は4回)
#myterm2(){{
2020/09/09 03:11:06.738022 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:06.738152 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:06.738176 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:06.738188 apiserver.go:60: [DEBUG] fileName : error.csv
2020/09/09 03:11:06.738258 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:11:16.741857 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:16.741980 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:16.741993 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:16.742012 apiserver.go:60: [DEBUG] fileName : error.csv
2020/09/09 03:11:16.742067 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:11:46.741888 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:11:46.742049 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:11:46.742075 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:11:46.742085 apiserver.go:60: [DEBUG] fileName : error.csv
2020/09/09 03:11:46.742147 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:12:09.818845 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:12:09.818978 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:12:09.818992 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:12:09.819000 apiserver.go:60: [DEBUG] fileName : error.csv
2020/09/09 03:12:09.819073 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
2020/09/09 03:12:46.736035 apiserver.go:67: [INFO] START blobTriggerHandler
2020/09/09 03:12:46.736157 apiserver.go:60: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv
2020/09/09 03:12:46.736170 apiserver.go:60: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/09 03:12:46.736177 apiserver.go:60: [DEBUG] fileName : error.csv
2020/09/09 03:12:46.736228 apiserver.go:74: [ERROR] Failure process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv , event: ... , error sample!
}}
#html(</div>)
** ストレージアカウントに保存されたファイル [#b749ba6c]
#html(<div class="pl10">)
設定したストレージコンテナの以下のPATHに再試行に失敗したイベント情報のファイルが作成される。
/ストレージアカウント名/イベントサブスクリプション名/YYYY/M/D/H/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.json
#mycode2(){{
[
{
"id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx,
"eventTime": "2020-09-09T03:11:06.3440237Z",
"eventType": "Microsoft.Storage.BlobCreated",
"dataVersion": "",
"metadataVersion": "1",
"topic": "/subscriptions/xxxxxxxxxxxxxxx/resourceGroups/リソースグループ名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名",
"subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/error.csv",
"deadLetterReason": "TimeToLiveExceeded",
"deliveryAttempts": 4,
"lastDeliveryOutcome": "GenericError",
"lastHttpStatusCode": 500,
"publishTime": "2020-09-09T03:11:06.7291287Z",
"lastDeliveryAttemptTime": "2020-09-09T03:12:46.7329410Z",
"data": {
"api": "PutBlob",
"clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"eTag": "0xXXXXXXXXXXXXXXX",
"contentType": "text/csv",
"contentLength": 60,
"blobType": "BlockBlob",
"url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/error.csv",
"sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"storageDiagnostics": {
"batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
}
}
]
}}
#html(</div>)
#html(</div>)
* 何回目の配信(試行)かを判断するには [#wb37cdc4]
#html(<div class="pl10">)
リクエストデータをダンプしてみた所 ''Aeg-Delivery-Count'' というリクエストヘッダに何回目の試行かがセットされいる模様。
なので、例えば 5 回目の試行でエラーだった時だけ、何かしらの処理(例えばメール通知など)を行いたい場合、以下のコードで判断できる。
※ &color(red){''ただし、この方法には問題がある''}; (後述)
注意)
- Aeg-Delivery-Count は 0 始まり。
- このヘッダについては [[webhookのドキュメント>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-webhooks]] には記載されておらず、[[Event Hubsのドキュメント>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-event-hubs]] に記載されている為、ヘッダがない可能性も考慮しておく必要があるか。
#mycode2(){{
:
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {
printInfo("START blobTriggerHandler")
var event InvokeRequest
var fileUrl string
defer func(){
err := recover()
if fileUrl == "" {
fileUrl = "unknown"
}
if err != nil {
printError("Failure process %s , event: %v, error: %v, request: %v", fileUrl, event, err, r)
deliveryCount := r.Header.Get("Aeg-Delivery-Count")
printError("deliveryCount: %s", deliveryCount)
// Aeg-Delivery-Count ヘッダがない場合 または 5回目の配信の時 ( 0 から始まるので 5回目は 4 )
if deliveryCount == "" || deliveryCount == "4" {
printError("Failed %s times delivery", deliveryCount)
// メール通知など
}
printInfo("END blobTriggerHandler")
http.Error(w, fmt.Sprintf("%v",err), http.StatusInternalServerError)
} else {
printInfo("Success process %s", fileUrl)
printInfo("END blobTriggerHandler")
}
}()
:
}}
#html(</div>)
** 関数アプリの場合はどうか [#j5619420]
* 前述のコード何が問題か [#e2c05f01]
#html(<div class="pl10">)
このサンプルの Webhook は 自前のサーバ等に対してイベント送信を行っている為、当たり前だが サーバが常に起動している事が前提となる。
つまり、サーバ処理のデプロイや再起動、またはサーバ停止しているタイミングで hook された場合、処理自体が走らない可能性がある。
これを解決する1つの案として、前述のエラー情報が出力されるコンテナに Blobトリガーを設定して、そちらで通知のみを行う方法がある。
※ [[Azure Functions の SLA>https://azure.microsoft.com/ja-jp/support/legal/sla/functions/v1_1/]] によると、それでも SLAは 99.95% との事だが。。
#html(<div class="images">)
&ref(webhook_error_notice.png,nolink);
#html(</div>)
#html(</div>)
* 関数アプリの場合はどうか [#j5619420]
#html(<div class="pl10">)
Blobトリガーで起動される関数アプリの場合は、以下の通りとなる。
- 試行回数が判断できるような http ヘッダは送信されない。
- 再試行は5回行われる。
- 5回の試行で全てエラーとなった場合は webjobs-blobtrigger-poison というストレージキューに情報が記録される。
以下、キューに登録される内容
#mycode2(){{
{
"Type": "BlobTrigger",
"FunctionId": "Host.Functions.関数名",
"BlobType": "BlockBlob",
"ContainerName": "ストレージコンテナ名",
"BlobName": "ファイル名",
"ETag": "\"0xXXXXXXXXXXXXXXX\""
}
}}
#html(</div>)