TODO: 「Azure Blobトリガーで起動される関数をリトライで再利用する」 と記事を統合する。
概要 †Azure Functions の異常を検知する では、Azure Monitor を使用してメール通知を行ったが、Azure Monitor ではメールの本文を指定して送信する事ができない。 そこで、ここでは有害キュー(webjobs-blobtrigger-poison)にキュートリガーを設定して関数アプリから異常通知(メール送信)を行う例を記載する。
目次 †
サンプル関数 †以下に記載がないファイルは Azure Blobトリガーで起動される関数をリトライで再利用する と同じ。 { "version": "2.0", "httpWorker": { "description": { "defaultExecutablePath": "server.exe" } }, "extensions": { "queues": { "maxPollingInterval": "00:00:10", "visibilityTimeout" : "00:00:00", "batchSize": 16, "maxDequeueCount": 5, "newBatchThreshold": 8 } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[1.*, 2.0.0)" } } { "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "DB_HOST": "localhost", "DB_PORT": "8086", "DB_NAME": "sampledb", "DB_USER": "sample", "DB_PW": "sample", "ACCOUNT_NAME": "devstoreaccount1", "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "RERUN_QUEUE": "rerun-queue", "SENDGRID_APIKEY": "SendGridのAPIキー", "MAIL_FROM_ADDRESS": "送信元アドレス", "MAIL_TO_ADDRESS": "送信先アドレス" } } { "bindings": [ { "name": "queueItem", "type": "queueTrigger", "direction": "in", "queueName": "webjobs-blobtrigger-poison", "connection": "AzureWebJobsStorage" }, { "name": "rerunqueue", "type": "queue", "direction": "out", "queueName": "rerun-queue", "connection": "AzureWebJobsStorage" } ] } package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "net/http" "net/url" "os" "time" "strconv" "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/azure-storage-queue-go/azqueue" "github.com/influxdata/influxdb-client-go" "github.com/sendgrid/sendgrid-go" "github.com/sendgrid/sendgrid-go/helpers/mail" ) type ReturnValue struct { Data string } type InvokeResponse struct { Outputs map[string]interface{} Logs []string ReturnValue interface{} } type InvokeRequest struct { Data map[string]interface{} Metadata map[string]interface{} } func printDebug(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ log.SetOutput(os.Stderr) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) log.SetOutput(os.Stdout) } func init(){ log.SetOutput(os.Stdout) log.SetFlags(0) } func isLocalAccount(accountName string) bool { return accountName == "devstoreaccount1" } /** * アップロードされたBlobファイル(CSV)の内容をInfluxDBに登録する. */ func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") fileUrl := "" defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { panic(fmt.Sprintf("ERROR blobTriggerHandler %s, %v\n", fileUrl, err)); //http.Error(w, err.Error(), http.StatusInternalServerError) } else { printInfo("Result: Success %s", fileUrl) } }() logs := make([]string, 0) printDebug("Request: %v", r) printDebug("Request Body: %v", r.Body) // リクエストデータ取得 var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } fileUrl = strings.Replace(invokeReq.Metadata["Uri"].(string), "\"", "", -1) // "が含まれているので除去 fileData, _ := base64.StdEncoding.DecodeString(invokeReq.Data["blobData"].(string)) // DB(InfluxDB)にデータ登録 errInsert := insertData(fileUrl, fileData) if errInsert != nil { panic(errInsert) } // レスポンスデータ設定 invokeResponse := InvokeResponse{Logs: logs} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * DBクライアント取得. */ func getDbClient() (influxdb2.Client, string) { dbHost := getEnv("DB_HOST", "localhost") dbPort := getEnv("DB_PORT", "8086") dbName := getEnv("DB_NAME", "sampledb") dbUser := getEnv("DB_USER", "sample") dbPw := getEnv("DB_PW", "sample") printInfo("http://%s:%s\n", dbHost, dbPort) printInfo("%s:%s\n", dbUser, dbPw) client := influxdb2.NewClient(fmt.Sprintf("http://%s:%s", dbHost, dbPort), fmt.Sprintf("%s:%s", dbUser, dbPw)) return client, dbName } /** * データ登録(InfluxDB). */ func insertData(fileUrl string, fileData []byte) error { printInfo("START insertData %s", fileUrl) var client influxdb2.Client var dbName string //var index int defer func(){ //err := recover() if client != nil { client.Close() } //if err != nil { // panic(fmt.Sprintf("Error insertData %v, line: %d", err, index)) //} }() rows := parseCsv(string(fileData)) client, dbName = getDbClient() writeAPI := client.WriteAPIBlocking("", fmt.Sprintf("%s/autogen", dbName)) // DB登録 for i, row := range rows { printDebug("line: %d, data: %v", i, row) //index = i + 1 rowtime, err0 := time.Parse("2006-01-02 15:04:05.000-0700", fmt.Sprintf("%s+0900",row["time"])) if err0 != nil { return err0 } col1, err1 := strconv.ParseFloat(row["col1"], 64) if err1 != nil { return err1 } col2, err2 := strconv.ParseFloat(row["col2"], 64) if err2 != nil { return err2 } col3, err3 := strconv.ParseFloat(row["col3"], 64) if err3 != nil { return err3 } p := influxdb2.NewPointWithMeasurement("sample"). AddTag("file", fileUrl). AddField("col1", col1). AddField("col2", col2). AddField("col3", col3). SetTime(rowtime) dberr := writeAPI.WritePoint(context.Background(), p) if dberr != nil { //panic(fmt.Sprintf("DB Write ERROR: %v", dberr)) return dberr } else { printInfo("DB Write SUCCESS. line: %d", i + 1) } } printInfo("END insertData %s", fileUrl) return nil } /** * CSV文字列のパース. */ func parseCsv(csvText string) ([]map[string]string) { printInfo("START parseCsv") procIndex := -1 defer func(){ err := recover() if err != nil { printError("error: file: %s, line: %d, %v", procIndex, err) panic("parseCsv Error!\n"); } }() lines := strings.Split(csvText, "\n") var columns []string rows := make([]map[string]string, 0) for i, line := range lines { if line == "" { break } procIndex = i if i == 0 { columns = strings.Split(line, ",") } else { values := strings.Split(line, ",") row := make(map[string]string, len(values)) for j, val := range values { // ヘッダの列数より多い時はコケるようにしておく colname := columns[j] row[colname] = val } rows = append(rows, row) } } printInfo("END parseCsv") return rows } /** * アカウント情報の取得 */ func getErrorQueueInfo() (string, string, string) { // デフォルトはローカルのエミュレータ(Azurite) accountName := getEnv("ACCOUNT_NAME", "") accountKey := getEnv("ACCOUNT_KEY" , "") queueName := getEnv("RERUN_QUEUE" , "") return accountName, accountKey, queueName } /** * 有害キューに溜まっている全てのメッセージからBlobファイルURLを取得し再処理する。 */ func rerunAllHandler(w http.ResponseWriter, r *http.Request) { rerunCount := 0 successCount := 0 defer func(){ errorCount := rerunCount - successCount err := recover() if err != nil { printError("ERROR rerunAllHandler, Success: %d, Error: %d, %v", successCount, errorCount, err) } else if (successCount < rerunCount) { printError("ERROR rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } else { printInfo("SUCCESS rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } }() accountName, accountKey, queueName := getErrorQueueInfo() // ローカルエミュレータへの接続の時はURLフォーマットを変える queueUrlFormat := "https://%s.queue.core.windows.net/%s" if isLocalAccount(accountName) { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } // キューURLの取得 credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err)) } u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) // メッセージ一覧の取得 queueCtx := context.TODO() msgUrl := queueUrl.NewMessagesURL() maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視化タイムアウト dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout) if err != nil { panic(err) } else { //------------------------------------------------------------ // 全てのメッセージを処理 //------------------------------------------------------------ for i := int32(0); i < dequeueResp.NumMessages(); i++ { msg := dequeueResp.Message(i) rerunCount = rerunCount + 1 //------------------------------------------------------------ // メッセージをBase64デコードしてJSON文字列に戻す //------------------------------------------------------------ eventData, _ := base64.StdEncoding.DecodeString(msg.Text) //------------------------------------------------------------ // JSON文字列をパースして構造体にする //------------------------------------------------------------ var poisonData map[string]interface{} decodeErr := json.Unmarshal(eventData, &poisonData) if decodeErr != nil { printError("json decode error: %v",decodeErr.Error()) continue } //------------------------------------------------------------ // メッセージに含まれるコンテナ名、Blobファイル名を取得する //------------------------------------------------------------ containerUrlFormat := "https://%s.blob.core.windows.net/%s" if isLocalAccount(accountName) { containerUrlFormat = "http://127.0.0.1:10000/%s/%s" } ctx := context.Background() containerUrlText := fmt.Sprintf(containerUrlFormat, accountName, poisonData["ContainerName"]) credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { printError("Error azblob.NewSharedKeyCredential: %v", err) continue } fileUrl := fmt.Sprintf("%s/%s", containerUrlText, poisonData["BlobName"]) printInfo("START Rerun. %s", fileUrl) //------------------------------------------------------------ // Blobファイルのダウンロード //------------------------------------------------------------ p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(poisonData["BlobName"].(string)) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { printError("GetProperties Error!") continue } else { blobSize = blobPropResponse.ContentLength() } // バッファに取得 fileData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, fileData, azblob.DownloadFromBlobOptions{}) if err != nil { printError("Download Error. %v", err) continue } else { printDebug("Download Success.") printDebug(string(fileData)) } printDebug("fileUrl: %s", fileUrl) //------------------------------------------------------------ // DB(InfluxDB)にデータ登録 //------------------------------------------------------------ errInsert := insertData(fileUrl, fileData) if errInsert != nil { printError("ERROR Rerun. %s, %v", fileUrl, errInsert) continue } else { successCount = successCount + 1 printInfo("SUCCESS Rerun. %s", fileUrl) // 有害キューからメッセージを削除 msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt) if err != nil { printError("Error delete poison message for %s (%v)", fileUrl, err) } else { printInfo("Success delete poison message for %s", fileUrl) } } printInfo("END Rerun. %s", fileUrl) } } w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"RerunAll\"}")) } /** * エラー通知. */ func blobErrorHandler(w http.ResponseWriter, r *http.Request) { printInfo("[blobErrorHandler] START") var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } printInfo("[blobErrorHandler] invokeReq: %v", invokeReq) printInfo("[blobErrorHandler] queue metadata: %v", invokeReq.Metadata) outputs := make(map[string]interface{}) dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"]) if dequeueCount == "1" { queueItem := invokeReq.Data["queueItem"].(string) SENDGRID_API_KEY := getEnv("SENDGRID_API_KEY", "") MAIL_FROM_ADDRESS := getEnv("MAIL_FROM_ADDRESS", "") MAIL_TO_ADDRESS := getEnv("MAIL_TO_ADDRESS", "") // 見やすいように過剰エスケープを調整 queueItem = strings.TrimRight(queueItem, "\"") queueItem = strings.TrimLeft(queueItem, "\"") queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\n", "\n", -1) queueItem = strings.Replace(queueItem, "\\\"", "\"", -1) queueItem = strings.Replace(queueItem, "\\\\\"", "", -1) printInfo("[blobErrorHandler] queue message(Value): %v", queueItem) from := mail.NewEmail("トリガー異常監視", MAIL_FROM_ADDRESS) subject := "Blobトリガーでエラーが発生しています" to := mail.NewEmail(MAIL_TO_ADDRESS, MAIL_TO_ADDRESS) plainTextContent := fmt.Sprintf("有害キューに出力された情報:\n%s", queueItem) htmlContent := fmt.Sprintf("有害キューに出力された情報:<br /><pre style=\"border: 1px solid #333; padding: 10px;\">%s</pre>", queueItem) message := mail.NewSingleEmail(from, subject, to, plainTextContent, htmlContent) client := sendgrid.NewSendClient(SENDGRID_API_KEY) response, err := client.Send(message) if err != nil { printError("[blobErrorHandler] [MAILSEND ERROR] %v", err) } else if (response.StatusCode == 202) { printInfo("[blobErrorHandler] [MAILSEND SUCCESS]") } else { printError("[blobErrorHandler] [MAILSEND ERROR] status: %v, body: %v, header: %v", response.StatusCode, response.Body, response.Headers) } // リラン用のキューにコピー(正常終了するとキューから削除される為) outputs["rerunqueue"] = queueItem } // 正常終了(キューから削除) invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{"Received blob trigger error."} } js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) //http.Error(w, "process blobErrorHandler.", http.StatusInternalServerError) } func main() { httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT") if exists { printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort) } mux := http.NewServeMux() mux.HandleFunc("/BlobTrigger", blobTriggerHandler) //mux.HandleFunc("/Rerun" , rerunHandler) mux.HandleFunc("/RerunAll" , rerunAllHandler) mux.HandleFunc("/ErrorNotice", blobErrorHandler) // <- 追加 log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } メール通知イメージ † |