#mynavi(Azureメモ) #setlinebreak(on); #TODO(){{ 「Azure Blobトリガーで起動される関数をリトライで再利用する」 と記事を統合する。 }} #html(){{ <style> .images img { border: 1px solid #333; } </style> }} * 概要 [#r5ca587b] #html(<div class="pl10">) [[Azure Functions の異常を検知する]] では、Azure Monitor を使用してメール通知を行ったが、&color(red){Azure Monitor ではメールの本文を指定して送信する事ができない};。 また、&color(red){Blobトリガー関数内でメール送信してしまうと再試行の度にメールが送信されてしまう};。(Blobトリガー関数内では再試行回数が判断できない) 等の問題がある。 そこで、ここでは有害キュー(webjobs-blobtrigger-poison)にキュートリガーを設定して関数アプリから異常通知(メール送信)を行う例を記載する。 - 基本的な内容は [[Azure Blobトリガーで起動される関数をリトライで再利用する]] と同じだが、これにメール通知が加わる形。&br;※ 関数アプリ自体は、Blobトリガー関数やリトライ用のhttpトリガーが乗っているものと同じもの。(関数を追加する形) - 通知処理を正常終了させてしまうとキューからメッセージが削除されてしまう為、リラン用のキューを別で用意して、そちらに内容をコピーする。 - メール送信自体は SendGrid を利用して行う。( 参照: [[Azureからのメール送信(SendGird使用)]] ) #html(<div class="images">) &ref(azure_blob_trigger_error_notice_mail2.png,nolink); #html(</div>) #html(</div>) * 目次 [#c3ab24a8] #contents - 関連 -- [[Azure Functions の異常を検知する]] -- [[Azure Blobトリガーで起動される関数をリトライで再利用する]] -- [[Azure Functions を Go で書く]] -- [[Azure Functions のログを参照する]] -- [[Azureからのメール送信(SendGird使用)]] * サンプル関数 [#z575ec59] #html(<div class="pl10">) 以下に記載がないファイルは [[Azure Blobトリガーで起動される関数をリトライで再利用する]] と同じ。 #html(){{ <div id="tabs1"> <ul> <li><a href="#tabs1-1">host.json</a></li> <li><a href="#tabs1-2">local.settings.json</a></li> <li><a href="#tabs1-4">ErrorNotice/function.json</a></li> <li><a href="#tabs1-5">server.go</a></li> </ul> }} // START tabs1-1 #html(<div id="tabs1-1">) #mycode2(){{ { "version": "2.0", "httpWorker": { "description": { "defaultExecutablePath": "go_server_sample.exe" "defaultExecutablePath": "server.exe" } }, "extensions": { "queues": { "maxPollingInterval": "00:00:10", "visibilityTimeout" : "00:00:00", "batchSize": 16, "maxDequeueCount": 5, "newBatchThreshold": 8 } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[1.*, 2.0.0)" } } }} #html(</div>) // END tabs1-1 // START tabs1-2 #html(<div id="tabs1-2">) #mycode2(){{ { "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "DB_HOST": "localhost", "DB_PORT": "8086", "DB_NAME": "sampledb", "DB_USER": "sample", "DB_PW": "sample", "ACCOUNT_NAME": "devstoreaccount1", "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "RERUN_QUEUE": "rerun-queue", "SENDGRID_APIKEY": "SendGridのAPIキー", "MAIL_FROM_ADDRESS": "送信元アドレス", "MAIL_TO_ADDRESS": "送信先アドレス" } } }} #html(</div>) // END tabs1-2 // START tabs1-4 #html(<div id="tabs1-4">) #mycode2(){{ { "bindings": [ { "name": "queueItem", "type": "queueTrigger", "direction": "in", "queueName": "webjobs-blobtrigger-poison", "connection": "AzureWebJobsStorage" }, { "name": "rerunqueue", "type": "queue", "direction": "out", "queueName": "rerun-queue", "connection": "AzureWebJobsStorage" } ] } }} #html(</div>) // END tabs1-4 // START tabs1-5 #html(<div id="tabs1-5">) #mycode2(){{ package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "net/http" "net/url" "os" "time" "strconv" "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/azure-storage-queue-go/azqueue" "github.com/influxdata/influxdb-client-go" "github.com/sendgrid/sendgrid-go" "github.com/sendgrid/sendgrid-go/helpers/mail" ) type ReturnValue struct { Data string } type InvokeResponse struct { Outputs map[string]interface{} Logs []string ReturnValue interface{} } type InvokeRequest struct { Data map[string]interface{} Metadata map[string]interface{} } func printDebug(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ log.SetOutput(os.Stderr) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) log.SetOutput(os.Stdout) } func init(){ log.SetOutput(os.Stdout) log.SetFlags(0) } func isLocalAccount(accountName string) bool { return accountName == "devstoreaccount1" } /** * アップロードされたBlobファイル(CSV)の内容をInfluxDBに登録する. */ func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") fileUrl := "" defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { panic(fmt.Sprintf("ERROR blobTriggerHandler %s, %v\n", fileUrl, err)); //http.Error(w, err.Error(), http.StatusInternalServerError) } else { printInfo("Result: Success %s", fileUrl) } }() logs := make([]string, 0) printDebug("Request: %v", r) printDebug("Request Body: %v", r.Body) // リクエストデータ取得 var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } fileUrl = strings.Replace(invokeReq.Metadata["Uri"].(string), "\"", "", -1) // "が含まれているので除去 fileData, _ := base64.StdEncoding.DecodeString(invokeReq.Data["blobData"].(string)) // DB(InfluxDB)にデータ登録 errInsert := insertData(fileUrl, fileData) if errInsert != nil { panic(errInsert) } // レスポンスデータ設定 invokeResponse := InvokeResponse{Logs: logs} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * DBクライアント取得. */ func getDbClient() (influxdb2.Client, string) { dbHost := getEnv("DB_HOST", "localhost") dbPort := getEnv("DB_PORT", "8086") dbName := getEnv("DB_NAME", "sampledb") dbUser := getEnv("DB_USER", "sample") dbPw := getEnv("DB_PW", "sample") printInfo("http://%s:%s\n", dbHost, dbPort) printInfo("%s:%s\n", dbUser, dbPw) client := influxdb2.NewClient(fmt.Sprintf("http://%s:%s", dbHost, dbPort), fmt.Sprintf("%s:%s", dbUser, dbPw)) return client, dbName } /** * データ登録(InfluxDB). */ func insertData(fileUrl string, fileData []byte) error { printInfo("START insertData %s", fileUrl) var client influxdb2.Client var dbName string //var index int defer func(){ //err := recover() if client != nil { client.Close() } //if err != nil { // panic(fmt.Sprintf("Error insertData %v, line: %d", err, index)) //} }() rows := parseCsv(string(fileData)) client, dbName = getDbClient() writeAPI := client.WriteAPIBlocking("", fmt.Sprintf("%s/autogen", dbName)) // DB登録 for i, row := range rows { printDebug("line: %d, data: %v", i, row) //index = i + 1 rowtime, err0 := time.Parse("2006-01-02 15:04:05.000-0700", fmt.Sprintf("%s+0900",row["time"])) if err0 != nil { return err0 } col1, err1 := strconv.ParseFloat(row["col1"], 64) if err1 != nil { return err1 } col2, err2 := strconv.ParseFloat(row["col2"], 64) if err2 != nil { return err2 } col3, err3 := strconv.ParseFloat(row["col3"], 64) if err3 != nil { return err3 } p := influxdb2.NewPointWithMeasurement("sample"). AddTag("file", fileUrl). AddField("col1", col1). AddField("col2", col2). AddField("col3", col3). SetTime(rowtime) dberr := writeAPI.WritePoint(context.Background(), p) if dberr != nil { //panic(fmt.Sprintf("DB Write ERROR: %v", dberr)) return dberr } else { printInfo("DB Write SUCCESS. line: %d", i + 1) } } printInfo("END insertData %s", fileUrl) return nil } /** * CSV文字列のパース. */ func parseCsv(csvText string) ([]map[string]string) { printInfo("START parseCsv") procIndex := -1 defer func(){ err := recover() if err != nil { printError("error: file: %s, line: %d, %v", procIndex, err) panic("parseCsv Error!\n"); } }() lines := strings.Split(csvText, "\n") var columns []string rows := make([]map[string]string, 0) for i, line := range lines { if line == "" { break } procIndex = i if i == 0 { columns = strings.Split(line, ",") } else { values := strings.Split(line, ",") row := make(map[string]string, len(values)) for j, val := range values { // ヘッダの列数より多い時はコケるようにしておく colname := columns[j] row[colname] = val } rows = append(rows, row) } } printInfo("END parseCsv") return rows } /** * アカウント情報の取得 */ func getErrorQueueInfo() (string, string, string) { // デフォルトはローカルのエミュレータ(Azurite) accountName := getEnv("ACCOUNT_NAME", "") accountKey := getEnv("ACCOUNT_KEY" , "") queueName := getEnv("RERUN_QUEUE" , "") return accountName, accountKey, queueName } /** * 有害キューに溜まっている全てのメッセージからBlobファイルURLを取得し再処理する。 */ func rerunAllHandler(w http.ResponseWriter, r *http.Request) { rerunCount := 0 successCount := 0 defer func(){ errorCount := rerunCount - successCount err := recover() if err != nil { printError("ERROR rerunAllHandler, Success: %d, Error: %d, %v", successCount, errorCount, err) } else if (successCount < rerunCount) { printError("ERROR rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } else { printInfo("SUCCESS rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } }() accountName, accountKey, queueName := getErrorQueueInfo() // ローカルエミュレータへの接続の時はURLフォーマットを変える queueUrlFormat := "https://%s.queue.core.windows.net/%s" if isLocalAccount(accountName) { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } // キューURLの取得 credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err)) } u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) // メッセージ一覧の取得 queueCtx := context.TODO() msgUrl := queueUrl.NewMessagesURL() maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視化タイムアウト dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout) if err != nil { panic(err) } else { //------------------------------------------------------------ // 全てのメッセージを処理 //------------------------------------------------------------ for i := int32(0); i < dequeueResp.NumMessages(); i++ { msg := dequeueResp.Message(i) rerunCount = rerunCount + 1 //------------------------------------------------------------ // メッセージをBase64デコードしてJSON文字列に戻す //------------------------------------------------------------ eventData, _ := base64.StdEncoding.DecodeString(msg.Text) //------------------------------------------------------------ // JSON文字列をパースして構造体にする //------------------------------------------------------------ var poisonData map[string]interface{} decodeErr := json.Unmarshal(eventData, &poisonData) if decodeErr != nil { printError("json decode error: %v",decodeErr.Error()) continue } //------------------------------------------------------------ // メッセージに含まれるコンテナ名、Blobファイル名を取得する //------------------------------------------------------------ containerUrlFormat := "https://%s.blob.core.windows.net/%s" if isLocalAccount(accountName) { containerUrlFormat = "http://127.0.0.1:10000/%s/%s" } ctx := context.Background() containerUrlText := fmt.Sprintf(containerUrlFormat, accountName, poisonData["ContainerName"]) credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { printError("Error azblob.NewSharedKeyCredential: %v", err) continue } fileUrl := fmt.Sprintf("%s/%s", containerUrlText, poisonData["BlobName"]) printInfo("START Rerun. %s", fileUrl) //------------------------------------------------------------ // Blobファイルのダウンロード //------------------------------------------------------------ p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(poisonData["BlobName"].(string)) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { printError("GetProperties Error!") continue } else { blobSize = blobPropResponse.ContentLength() } // バッファに取得 fileData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, fileData, azblob.DownloadFromBlobOptions{}) if err != nil { printError("Download Error. %v", err) continue } else { printDebug("Download Success.") printDebug(string(fileData)) } printDebug("fileUrl: %s", fileUrl) //------------------------------------------------------------ // DB(InfluxDB)にデータ登録 //------------------------------------------------------------ errInsert := insertData(fileUrl, fileData) if errInsert != nil { printError("ERROR Rerun. %s, %v", fileUrl, errInsert) continue } else { successCount = successCount + 1 printInfo("SUCCESS Rerun. %s", fileUrl) // 有害キューからメッセージを削除 msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt) if err != nil { printError("Error delete poison message for %s (%v)", fileUrl, err) } else { printInfo("Success delete poison message for %s", fileUrl) } } printInfo("END Rerun. %s", fileUrl) } } w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"RerunAll\"}")) } /** * エラー通知. */ func blobErrorHandler(w http.ResponseWriter, r *http.Request) { printInfo("[blobErrorHandler] START") var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } printInfo("[blobErrorHandler] invokeReq: %v", invokeReq) printInfo("[blobErrorHandler] queue metadata: %v", invokeReq.Metadata) outputs := make(map[string]interface{}) dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"]) if dequeueCount == "1" { queueItem := invokeReq.Data["queueItem"].(string) SENDGRID_API_KEY := getEnv("SENDGRID_API_KEY", "") MAIL_FROM_ADDRESS := getEnv("MAIL_FROM_ADDRESS", "") MAIL_TO_ADDRESS := getEnv("MAIL_TO_ADDRESS", "") // 見やすいように過剰エスケープを調整 queueItem = strings.TrimRight(queueItem, "\"") queueItem = strings.TrimLeft(queueItem, "\"") queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\\"", "\"", -1) queueItem = strings.Replace(queueItem, "\\\\\"", "", -1) printInfo("[blobErrorHandler] queue message(Value): %v", queueItem) from := mail.NewEmail("トリガー異常監視", MAIL_FROM_ADDRESS) subject := "Blobトリガーでエラーが発生しています" to := mail.NewEmail(MAIL_TO_ADDRESS, MAIL_TO_ADDRESS) plainTextContent := fmt.Sprintf("有害キューに出力された情報:\n%s", queueItem) htmlContent := fmt.Sprintf("有害キューに出力された情報:<br /><pre style=\"border: 1px solid #333; padding: 10px;\">%s</pre>", queueItem) message := mail.NewSingleEmail(from, subject, to, plainTextContent, htmlContent) client := sendgrid.NewSendClient(SENDGRID_API_KEY) response, err := client.Send(message) if err != nil { printError("[blobErrorHandler] [MAILSEND ERROR] %v", err) } else if (response.StatusCode == 202) { printInfo("[blobErrorHandler] [MAILSEND SUCCESS]") } else { printError("[blobErrorHandler] [MAILSEND ERROR] status: %v, body: %v, header: %v", response.StatusCode, response.Body, response.Headers) } // リラン用のキューにコピー(正常終了するとキューから削除される為) outputs["rerunqueue"] = queueItem } // 正常終了(キューから削除) invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{"Received blob trigger error."} } js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) //http.Error(w, "process blobErrorHandler.", http.StatusInternalServerError) } func main() { httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT") if exists { printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort) } mux := http.NewServeMux() mux.HandleFunc("/BlobTrigger", blobTriggerHandler) //mux.HandleFunc("/Rerun" , rerunHandler) mux.HandleFunc("/RerunAll" , rerunAllHandler) mux.HandleFunc("/ErrorNotice", blobErrorHandler) // <- 追加 log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } }} #html(</div>) // END tabs1-5 #html(</div>) // END tabs1 #html(<script>$(function() { $("#tabs1").tabs(); });</script>) #html(</div>) * メール通知イメージ [#jb4dc021] #html(<div class="pl10">) #html(<div class="images">) &ref(azure_blob_trigger_error_notice_mail.png,nolink); #html(</div>) #html(</div>)