- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2020-09-23T16:39:18+00:00","","")
#mynavi(Azureメモ)
#setlinebreak(on);
#TODO(){{
「Azure Blobトリガーで起動される関数をリトライで再利用する」 と記事を統合する。
}}
#html(){{
<style>
.images img { border: 1px solid #333; }
</style>
}}
* 概要 [#r5ca587b]
#html(<div class="pl10">)
[[Azure Functions の異常を検知する]] では、Azure Monitor を使用してメール通知を行ったが、&color(red){Azure Monitor ではメールの本文を指定して送信する事ができない};。
また、&color(red){Blobトリガー関数内でメール送信してしまうと再試行の度にメールが送信されてしまう};。(Blobトリガー関数内では再試行回数が判断できない) 等の問題がある。
そこで、ここでは有害キュー(webjobs-blobtrigger-poison)にキュートリガーを設定して関数アプリから異常通知(メール送信)を行う例を記載する。
- 基本的な内容は [[Azure Blobトリガーで起動される関数をリトライで再利用する]] と同じだが、これにメール通知が加わる形。&br;※ 関数アプリ自体は、Blobトリガー関数やリトライ用のhttpトリガーが乗っているものと同じもの。(関数を追加する形)
- 通知処理を正常終了させてしまうとキューからメッセージが削除されてしまう為、リラン用のキューを別で用意して、そちらに内容をコピーする。
- メール送信自体は SendGrid を利用して行う。( 参照: [[Azureからのメール送信(SendGird使用)]] )
#html(<div class="images">)
&ref(azure_blob_trigger_error_notice_mail2.png,nolink);
#html(</div>)
#html(</div>)
* 目次 [#c3ab24a8]
#contents
- 関連
-- [[Azure Functions の異常を検知する]]
-- [[Azure Blobトリガーで起動される関数をリトライで再利用する]]
-- [[Azure Functions を Go で書く]]
-- [[Azure Functions のログを参照する]]
-- [[Azureからのメール送信(SendGird使用)]]
* サンプル関数 [#z575ec59]
#html(<div class="pl10">)
以下に記載がないファイルは [[Azure Blobトリガーで起動される関数をリトライで再利用する]] と同じ。
#html(){{
<div id="tabs1">
<ul>
<li><a href="#tabs1-1">host.json</a></li>
<li><a href="#tabs1-2">local.settings.json</a></li>
<li><a href="#tabs1-4">ErrorNotice/function.json</a></li>
<li><a href="#tabs1-5">server.go</a></li>
</ul>
}}
// START tabs1-1
#html(<div id="tabs1-1">)
#mycode2(){{
{
"version": "2.0",
"httpWorker": {
"description": {
"defaultExecutablePath": "server.exe"
}
},
"extensions": {
"queues": {
"maxPollingInterval": "00:00:10",
"visibilityTimeout" : "00:00:00",
"batchSize": 16,
"maxDequeueCount": 5,
"newBatchThreshold": 8
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[1.*, 2.0.0)"
}
}
}}
#html(</div>)
// END tabs1-1
// START tabs1-2
#html(<div id="tabs1-2">)
#mycode2(){{
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DB_HOST": "localhost",
"DB_PORT": "8086",
"DB_NAME": "sampledb",
"DB_USER": "sample",
"DB_PW": "sample",
"ACCOUNT_NAME": "devstoreaccount1",
"ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
"RERUN_QUEUE": "rerun-queue",
"SENDGRID_APIKEY": "SendGridのAPIキー",
"MAIL_FROM_ADDRESS": "送信元アドレス",
"MAIL_TO_ADDRESS": "送信先アドレス"
}
}
}}
#html(</div>)
// END tabs1-2
// START tabs1-4
#html(<div id="tabs1-4">)
#mycode2(){{
{
"bindings": [
{
"name": "queueItem",
"type": "queueTrigger",
"direction": "in",
"queueName": "webjobs-blobtrigger-poison",
"connection": "AzureWebJobsStorage"
},
{
"name": "rerunqueue",
"type": "queue",
"direction": "out",
"queueName": "rerun-queue",
"connection": "AzureWebJobsStorage"
}
]
}
}}
#html(</div>)
// END tabs1-4
// START tabs1-5
#html(<div id="tabs1-5">)
#mycode2(){{
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"time"
"strconv"
"strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-storage-queue-go/azqueue"
"github.com/influxdata/influxdb-client-go"
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"
)
type ReturnValue struct {
Data string
}
type InvokeResponse struct {
Outputs map[string]interface{}
Logs []string
ReturnValue interface{}
}
type InvokeRequest struct {
Data map[string]interface{}
Metadata map[string]interface{}
}
func printDebug(format string, params ...interface{}){
log.SetOutput(os.Stdout)
msg := fmt.Sprintf(format, params...)
log.Printf("[DEBUG] %s\n", msg)
}
func printInfo(format string, params ...interface{}){
log.SetOutput(os.Stdout)
msg := fmt.Sprintf(format, params...)
log.Printf("[INFO] %s\n", msg)
}
func printError(format string, params ...interface{}){
log.SetOutput(os.Stderr)
msg := fmt.Sprintf(format, params...)
log.Printf("[ERROR] %s\n", msg)
log.SetOutput(os.Stdout)
}
func init(){
log.SetOutput(os.Stdout)
log.SetFlags(0)
}
func isLocalAccount(accountName string) bool {
return accountName == "devstoreaccount1"
}
/**
* アップロードされたBlobファイル(CSV)の内容をInfluxDBに登録する.
*/
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {
printInfo("START blobTriggerHandler")
fileUrl := ""
defer func(){
err := recover()
if fileUrl == "" {
fileUrl = "unknown"
}
if err != nil {
panic(fmt.Sprintf("ERROR blobTriggerHandler %s, %v\n", fileUrl, err));
//http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
printInfo("Result: Success %s", fileUrl)
}
}()
logs := make([]string, 0)
printDebug("Request: %v", r)
printDebug("Request Body: %v", r.Body)
// リクエストデータ取得
var invokeReq InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
fileUrl = strings.Replace(invokeReq.Metadata["Uri"].(string), "\"", "", -1) // "が含まれているので除去
fileData, _ := base64.StdEncoding.DecodeString(invokeReq.Data["blobData"].(string))
// DB(InfluxDB)にデータ登録
errInsert := insertData(fileUrl, fileData)
if errInsert != nil {
panic(errInsert)
}
// レスポンスデータ設定
invokeResponse := InvokeResponse{Logs: logs}
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
}
/**
* 環境変数の取得.
*/
func getEnv(envName string, defaultValue string) string {
value, exists := os.LookupEnv(envName)
if exists {
return value
} else {
return defaultValue
}
}
/**
* DBクライアント取得.
*/
func getDbClient() (influxdb2.Client, string) {
dbHost := getEnv("DB_HOST", "localhost")
dbPort := getEnv("DB_PORT", "8086")
dbName := getEnv("DB_NAME", "sampledb")
dbUser := getEnv("DB_USER", "sample")
dbPw := getEnv("DB_PW", "sample")
printInfo("http://%s:%s\n", dbHost, dbPort)
printInfo("%s:%s\n", dbUser, dbPw)
client := influxdb2.NewClient(fmt.Sprintf("http://%s:%s", dbHost, dbPort), fmt.Sprintf("%s:%s", dbUser, dbPw))
return client, dbName
}
/**
* データ登録(InfluxDB).
*/
func insertData(fileUrl string, fileData []byte) error {
printInfo("START insertData %s", fileUrl)
var client influxdb2.Client
var dbName string
//var index int
defer func(){
//err := recover()
if client != nil {
client.Close()
}
//if err != nil {
// panic(fmt.Sprintf("Error insertData %v, line: %d", err, index))
//}
}()
rows := parseCsv(string(fileData))
client, dbName = getDbClient()
writeAPI := client.WriteAPIBlocking("", fmt.Sprintf("%s/autogen", dbName))
// DB登録
for i, row := range rows {
printDebug("line: %d, data: %v", i, row)
//index = i + 1
rowtime, err0 := time.Parse("2006-01-02 15:04:05.000-0700", fmt.Sprintf("%s+0900",row["time"]))
if err0 != nil {
return err0
}
col1, err1 := strconv.ParseFloat(row["col1"], 64)
if err1 != nil {
return err1
}
col2, err2 := strconv.ParseFloat(row["col2"], 64)
if err2 != nil {
return err2
}
col3, err3 := strconv.ParseFloat(row["col3"], 64)
if err3 != nil {
return err3
}
p := influxdb2.NewPointWithMeasurement("sample").
AddTag("file", fileUrl).
AddField("col1", col1).
AddField("col2", col2).
AddField("col3", col3).
SetTime(rowtime)
dberr := writeAPI.WritePoint(context.Background(), p)
if dberr != nil {
//panic(fmt.Sprintf("DB Write ERROR: %v", dberr))
return dberr
} else {
printInfo("DB Write SUCCESS. line: %d", i + 1)
}
}
printInfo("END insertData %s", fileUrl)
return nil
}
/**
* CSV文字列のパース.
*/
func parseCsv(csvText string) ([]map[string]string) {
printInfo("START parseCsv")
procIndex := -1
defer func(){
err := recover()
if err != nil {
printError("error: file: %s, line: %d, %v", procIndex, err)
panic("parseCsv Error!\n");
}
}()
lines := strings.Split(csvText, "\n")
var columns []string
rows := make([]map[string]string, 0)
for i, line := range lines {
if line == "" {
break
}
procIndex = i
if i == 0 {
columns = strings.Split(line, ",")
} else {
values := strings.Split(line, ",")
row := make(map[string]string, len(values))
for j, val := range values {
// ヘッダの列数より多い時はコケるようにしておく
colname := columns[j]
row[colname] = val
}
rows = append(rows, row)
}
}
printInfo("END parseCsv")
return rows
}
/**
* アカウント情報の取得
*/
func getErrorQueueInfo() (string, string, string) {
// デフォルトはローカルのエミュレータ(Azurite)
accountName := getEnv("ACCOUNT_NAME", "")
accountKey := getEnv("ACCOUNT_KEY" , "")
queueName := getEnv("RERUN_QUEUE" , "")
return accountName, accountKey, queueName
}
/**
* 有害キューに溜まっている全てのメッセージからBlobファイルURLを取得し再処理する。
*/
func rerunAllHandler(w http.ResponseWriter, r *http.Request) {
rerunCount := 0
successCount := 0
defer func(){
errorCount := rerunCount - successCount
err := recover()
if err != nil {
printError("ERROR rerunAllHandler, Success: %d, Error: %d, %v", successCount, errorCount, err)
} else if (successCount < rerunCount) {
printError("ERROR rerunAllHandler, Success: %d, Error: %d", successCount, errorCount)
} else {
printInfo("SUCCESS rerunAllHandler, Success: %d, Error: %d", successCount, errorCount)
}
}()
accountName, accountKey, queueName := getErrorQueueInfo()
// ローカルエミュレータへの接続の時はURLフォーマットを変える
queueUrlFormat := "https://%s.queue.core.windows.net/%s"
if isLocalAccount(accountName) {
queueUrlFormat = "http://127.0.0.1:10001/%s/%s"
}
// キューURLの取得
credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err))
}
u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName))
queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
// メッセージ一覧の取得
queueCtx := context.TODO()
msgUrl := queueUrl.NewMessagesURL()
maxMessages := int32(32) // 最大件数
visibilityTimeout := time.Second * 10 // 可視化タイムアウト
dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout)
if err != nil {
panic(err)
} else {
//------------------------------------------------------------
// 全てのメッセージを処理
//------------------------------------------------------------
for i := int32(0); i < dequeueResp.NumMessages(); i++ {
msg := dequeueResp.Message(i)
rerunCount = rerunCount + 1
//------------------------------------------------------------
// メッセージをBase64デコードしてJSON文字列に戻す
//------------------------------------------------------------
eventData, _ := base64.StdEncoding.DecodeString(msg.Text)
//------------------------------------------------------------
// JSON文字列をパースして構造体にする
//------------------------------------------------------------
var poisonData map[string]interface{}
decodeErr := json.Unmarshal(eventData, &poisonData)
if decodeErr != nil {
printError("json decode error: %v",decodeErr.Error())
continue
}
//------------------------------------------------------------
// メッセージに含まれるコンテナ名、Blobファイル名を取得する
//------------------------------------------------------------
containerUrlFormat := "https://%s.blob.core.windows.net/%s"
if isLocalAccount(accountName) {
containerUrlFormat = "http://127.0.0.1:10000/%s/%s"
}
ctx := context.Background()
containerUrlText := fmt.Sprintf(containerUrlFormat, accountName, poisonData["ContainerName"])
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
printError("Error azblob.NewSharedKeyCredential: %v", err)
continue
}
fileUrl := fmt.Sprintf("%s/%s", containerUrlText, poisonData["BlobName"])
printInfo("START Rerun. %s", fileUrl)
//------------------------------------------------------------
// Blobファイルのダウンロード
//------------------------------------------------------------
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
cURL, _ := url.Parse(containerUrlText)
containerURL := azblob.NewContainerURL(*cURL, p)
blobURL := containerURL.NewBlobURL(poisonData["BlobName"].(string))
// サイズを取得
var blobSize int64 = 1024
blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
printError("GetProperties Error!")
continue
} else {
blobSize = blobPropResponse.ContentLength()
}
// バッファに取得
fileData := make([]byte, blobSize)
err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, fileData, azblob.DownloadFromBlobOptions{})
if err != nil {
printError("Download Error. %v", err)
continue
} else {
printDebug("Download Success.")
printDebug(string(fileData))
}
printDebug("fileUrl: %s", fileUrl)
//------------------------------------------------------------
// DB(InfluxDB)にデータ登録
//------------------------------------------------------------
errInsert := insertData(fileUrl, fileData)
if errInsert != nil {
printError("ERROR Rerun. %s, %v", fileUrl, errInsert)
continue
} else {
successCount = successCount + 1
printInfo("SUCCESS Rerun. %s", fileUrl)
// 有害キューからメッセージを削除
msgIdUrl := msgUrl.NewMessageIDURL(msg.ID)
_, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt)
if err != nil {
printError("Error delete poison message for %s (%v)", fileUrl, err)
} else {
printInfo("Success delete poison message for %s", fileUrl)
}
}
printInfo("END Rerun. %s", fileUrl)
}
}
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("{\"message\": \"RerunAll\"}"))
}
/**
* エラー通知.
*/
func blobErrorHandler(w http.ResponseWriter, r *http.Request) {
printInfo("[blobErrorHandler] START")
var invokeReq InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
printInfo("[blobErrorHandler] invokeReq: %v", invokeReq)
printInfo("[blobErrorHandler] queue metadata: %v", invokeReq.Metadata)
outputs := make(map[string]interface{})
dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"])
if dequeueCount == "1" {
queueItem := invokeReq.Data["queueItem"].(string)
SENDGRID_API_KEY := getEnv("SENDGRID_API_KEY", "")
MAIL_FROM_ADDRESS := getEnv("MAIL_FROM_ADDRESS", "")
MAIL_TO_ADDRESS := getEnv("MAIL_TO_ADDRESS", "")
// 見やすいように過剰エスケープを調整
queueItem = strings.TrimRight(queueItem, "\"")
queueItem = strings.TrimLeft(queueItem, "\"")
queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1)
queueItem = strings.Replace(queueItem, "\\n", "\n", -1)
queueItem = strings.Replace(queueItem, "\\\"", "\"", -1)
queueItem = strings.Replace(queueItem, "\\\\\"", "", -1)
printInfo("[blobErrorHandler] queue message(Value): %v", queueItem)
from := mail.NewEmail("トリガー異常監視", MAIL_FROM_ADDRESS)
subject := "Blobトリガーでエラーが発生しています"
to := mail.NewEmail(MAIL_TO_ADDRESS, MAIL_TO_ADDRESS)
plainTextContent := fmt.Sprintf("有害キューに出力された情報:\n%s", queueItem)
htmlContent := fmt.Sprintf("有害キューに出力された情報:<br /><pre style=\"border: 1px solid #333; padding: 10px;\">%s</pre>", queueItem)
message := mail.NewSingleEmail(from, subject, to, plainTextContent, htmlContent)
client := sendgrid.NewSendClient(SENDGRID_API_KEY)
response, err := client.Send(message)
if err != nil {
printError("[blobErrorHandler] [MAILSEND ERROR] %v", err)
} else if (response.StatusCode == 202) {
printInfo("[blobErrorHandler] [MAILSEND SUCCESS]")
} else {
printError("[blobErrorHandler] [MAILSEND ERROR] status: %v, body: %v, header: %v", response.StatusCode, response.Body, response.Headers)
}
// リラン用のキューにコピー(正常終了するとキューから削除される為)
outputs["rerunqueue"] = queueItem
}
// 正常終了(キューから削除)
invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{"Received blob trigger error."} }
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
//http.Error(w, "process blobErrorHandler.", http.StatusInternalServerError)
}
func main() {
httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT")
if exists {
printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort)
}
mux := http.NewServeMux()
mux.HandleFunc("/BlobTrigger", blobTriggerHandler)
//mux.HandleFunc("/Rerun" , rerunHandler)
mux.HandleFunc("/RerunAll" , rerunAllHandler)
mux.HandleFunc("/ErrorNotice", blobErrorHandler) // <- 追加
log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort)
log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}
#html(</div>)
// END tabs1-5
#html(</div>)
// END tabs1
#html(<script>$(function() { $("#tabs1").tabs(); });</script>)
#html(</div>)
* メール通知イメージ [#jb4dc021]
#html(<div class="pl10">)
#html(<div class="images">)
&ref(azure_blob_trigger_error_notice_mail.png,nolink);
#html(</div>)
#html(</div>)