概要 †Azure FunctionsからVMにアクセスする では、ストレージコンテナにアップロードされたCSVを VMで稼働する InfluxDB に登録する処理を作成したが、 エラー時のイメージ
リトライのイメージ
目次 †実装サンプル †以下に記載がないファイルは Azure FunctionsからVMにアクセスする と同じ { "bindings": [ { "type": "httpTrigger", "direction": "in", "name": "req", "methods": [ "get", "post" ] }, { "type": "http", "direction": "out", "name": "res" } ] } 以下、ACCOUNT_NAME 及び KEY は local のエミュレータ(Azurite)のもの。 { "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "UseDevelopmentStorage=true", "DB_HOST": "localhost", "DB_PORT": "8086", "DB_NAME": "sampledb", "DB_USER": "sample", "DB_PW": "sample", "ACCOUNT_NAME": "devstoreaccount1", "ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "ACCOUNT_QUEUE": "webjobs-blobtrigger-poison" } } package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "net/http" "net/url" "os" "time" "strconv" "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/azure-storage-queue-go/azqueue" "github.com/influxdata/influxdb-client-go" ) type ReturnValue struct { Data string } type InvokeResponse struct { Outputs map[string]interface{} Logs []string ReturnValue interface{} } type InvokeRequest struct { Data map[string]interface{} Metadata map[string]interface{} } func printDebug(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) } func printInfo(format string, params ...interface{}){ log.SetOutput(os.Stdout) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) } func printError(format string, params ...interface{}){ log.SetOutput(os.Stderr) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) log.SetOutput(os.Stdout) } func init(){ log.SetOutput(os.Stdout) log.SetFlags(0) } func isLocalAccount(accountName string) bool { return accountName == "devstoreaccount1" } /** * アップロードされたBlobファイル(CSV)の内容をInfluxDBに登録する. */ func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") fileUrl := "" defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { panic(fmt.Sprintf("ERROR blobTriggerHandler %s, %v\n", fileUrl, err)); //http.Error(w, err.Error(), http.StatusInternalServerError) } else { printInfo("Result: Success %s", fileUrl) } }() logs := make([]string, 0) // リクエストデータ取得 var invokeReq InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } fileUrl = strings.Replace(invokeReq.Metadata["Uri"].(string), "\"", "", -1) // "が含まれているので除去 fileData, _ := base64.StdEncoding.DecodeString(invokeReq.Data["blobData"].(string)) // DB(InfluxDB)にデータ登録 errInsert := insertData(fileUrl, fileData) if errInsert != nil { panic(errInsert) } // レスポンスデータ設定 invokeResponse := InvokeResponse{Logs: logs} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * DBクライアント取得. */ func getDbClient() (influxdb2.Client, string) { dbHost := getEnv("DB_HOST", "localhost") dbPort := getEnv("DB_PORT", "8086") dbName := getEnv("DB_NAME", "sampledb") dbUser := getEnv("DB_USER", "sample") dbPw := getEnv("DB_PW", "sample") printInfo("http://%s:%s\n", dbHost, dbPort) printInfo("%s:%s\n", dbUser, dbPw) client := influxdb2.NewClient(fmt.Sprintf("http://%s:%s", dbHost, dbPort), fmt.Sprintf("%s:%s", dbUser, dbPw)) return client, dbName } /** * データ登録(InfluxDB). */ func insertData(fileUrl string, fileData []byte) error { printInfo("START insertData %s", fileUrl) var client influxdb2.Client var dbName string //var index int defer func(){ //err := recover() if client != nil { client.Close() } //if err != nil { // panic(fmt.Sprintf("Error insertData %v, line: %d", err, index)) //} }() rows := parseCsv(string(fileData)) client, dbName = getDbClient() writeAPI := client.WriteAPIBlocking("", fmt.Sprintf("%s/autogen", dbName)) // DB登録 for i, row := range rows { printDebug("line: %d, data: %v", i, row) //index = i + 1 rowtime, err0 := time.Parse("2006-01-02 15:04:05.000-0700", fmt.Sprintf("%s+0900",row["time"])) if err0 != nil { return err0 } col1, err1 := strconv.ParseFloat(row["col1"], 64) if err1 != nil { return err1 } col2, err2 := strconv.ParseFloat(row["col2"], 64) if err2 != nil { return err2 } col3, err3 := strconv.ParseFloat(row["col3"], 64) if err3 != nil { return err3 } p := influxdb2.NewPointWithMeasurement("sample"). AddTag("file", fileUrl). AddField("col1", col1). AddField("col2", col2). AddField("col3", col3). SetTime(rowtime) dberr := writeAPI.WritePoint(context.Background(), p) if dberr != nil { //panic(fmt.Sprintf("DB Write ERROR: %v", dberr)) return dberr } else { printInfo("DB Write SUCCESS. line: %d", i + 1) } } printInfo("END insertData %s", fileUrl) return nil } /** * CSV文字列のパース. */ func parseCsv(csvText string) ([]map[string]string) { printInfo("START parseCsv") procIndex := -1 defer func(){ err := recover() if err != nil { printError("error: file: %s, line: %d, %v", procIndex, err) panic("parseCsv Error!\n"); } }() lines := strings.Split(csvText, "\n") var columns []string rows := make([]map[string]string, 0) for i, line := range lines { if line == "" { break } procIndex = i if i == 0 { columns = strings.Split(line, ",") } else { values := strings.Split(line, ",") row := make(map[string]string, len(values)) for j, val := range values { // ヘッダの列数より多い時はコケるようにしておく colname := columns[j] row[colname] = val } rows = append(rows, row) } } printInfo("END parseCsv") return rows } /** * アカウント情報の取得 */ func getErrorQueueInfo() (string, string, string) { // デフォルトはローカルのエミュレータ(Azurite) accountName := getEnv("ACCOUNT_NAME" , "") accountKey := getEnv("ACCOUNT_KEY" , "") queueName := getEnv("ACCOUNT_QUEUE", "") return accountName, accountKey, queueName } /** * 有害キューに溜まっている全てのメッセージからBlobファイルURLを取得し再処理する。 */ func rerunAllHandler(w http.ResponseWriter, r *http.Request) { rerunCount := 0 successCount := 0 defer func(){ errorCount := rerunCount - successCount err := recover() if err != nil { printError("ERROR rerunAllHandler, Success: %d, Error: %d, %v", successCount, errorCount, err) } else if (successCount < rerunCount) { printError("ERROR rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } else { printInfo("SUCCESS rerunAllHandler, Success: %d, Error: %d", successCount, errorCount) } }() accountName, accountKey, queueName := getErrorQueueInfo() // ローカルエミュレータへの接続の時はURLフォーマットを変える queueUrlFormat := "https://%s.queue.core.windows.net/%s" if isLocalAccount(accountName) { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } // キューURLの取得 credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err)) } u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) // メッセージ一覧の取得 queueCtx := context.TODO() msgUrl := queueUrl.NewMessagesURL() maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視化タイムアウト dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout) if err != nil { panic(err) } else { //------------------------------------------------------------ // 全てのメッセージを処理 //------------------------------------------------------------ for i := int32(0); i < dequeueResp.NumMessages(); i++ { msg := dequeueResp.Message(i) rerunCount = rerunCount + 1 //------------------------------------------------------------ // メッセージをBase64デコードしてJSON文字列に戻す //------------------------------------------------------------ eventData, _ := base64.StdEncoding.DecodeString(msg.Text) //------------------------------------------------------------ // JSON文字列をパースして構造体にする //------------------------------------------------------------ var poisonData map[string]interface{} decodeErr := json.Unmarshal(eventData, &poisonData) if decodeErr != nil { printError("json decode error: %v",decodeErr.Error()) continue } //------------------------------------------------------------ // メッセージに含まれるコンテナ名、Blobファイル名を取得する //------------------------------------------------------------ containerUrlFormat := "https://%s.blob.core.windows.net/%s" if isLocalAccount(accountName) { containerUrlFormat = "http://127.0.0.1:10000/%s/%s" } ctx := context.Background() containerUrlText := fmt.Sprintf(containerUrlFormat, accountName, poisonData["ContainerName"]) credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { printError("Error azblob.NewSharedKeyCredential: %v", err) continue } fileUrl := fmt.Sprintf("%s/%s", containerUrlText, poisonData["BlobName"]) printInfo("START Rerun. %s", fileUrl) //------------------------------------------------------------ // Blobファイルのダウンロード //------------------------------------------------------------ p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(poisonData["BlobName"].(string)) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { printError("GetProperties Error!") continue } else { blobSize = blobPropResponse.ContentLength() } // バッファに取得 fileData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, fileData, azblob.DownloadFromBlobOptions{}) if err != nil { printError("Download Error. %v", err) continue } else { printDebug("Download Success.") printDebug(string(fileData)) } printDebug("fileUrl: %s", fileUrl) //------------------------------------------------------------ // DB(InfluxDB)にデータ登録 //------------------------------------------------------------ errInsert := insertData(fileUrl, fileData) if errInsert != nil { printError("ERROR Rerun. %s, %v", fileUrl, errInsert) continue } else { successCount = successCount + 1 printInfo("SUCCESS Rerun. %s", fileUrl) // 有害キューからメッセージを削除 msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt) if err != nil { printError("Error delete poison message for %s (%v)", fileUrl, err) } else { printInfo("Success delete poison message for %s", fileUrl) } } printInfo("END Rerun. %s", fileUrl) } } w.Header().Set("Content-Type", "application/json") w.Write([]byte("{\"message\": \"RerunAll\"}")) } func main() { httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT") if exists { printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort) } mux := http.NewServeMux() mux.HandleFunc("/BlobTrigger", blobTriggerHandler) //mux.HandleFunc("/Rerun" , rerunHandler) mux.HandleFunc("/RerunAll" , rerunAllHandler) log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } 注意事項 †上記の実装はキューに溜まっている複数のメッセージ(最大:32)を取得して、リランする実装になっているが、 場合によっては、HTTPトリガーに対して複数回のリクエストを送信して滞留メッセージを処理するような設計が必要 になるかもしれない。 動作確認 †以下ローカルでの動作確認手順。 ローカルで Azurite 及び 関数アプリを起動 mkdir -p local_azurite azurite --silent --location `pwd`/local_azurite & ストレージコンテナ作成 export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage container create -n ストレージコンテナ名 Goサーバ起動 exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'` go build -o $exefile func start InfluxDBをいったん止める。 docker stop local_influxdb ストレージエミュレータの対象コンテナに以下のCSVファイルをアップロードする。 sample1.csv time,col1,col2,col3 2020-09-01 10:04:01.000,10.1,12.4,15.45 2020-09-01 10:04:01.200,11.34,14.11,10.87 2020-09-01 10:04:01.400,9.76,13.53,15.12 webjobs-blobtrigger-poison キューにメッセージが登録されている事を確認。 InfluxDBを起動する。 docker start local_influxdb リラン指示を発行 curl -H "Content-Type: application/json" -d "{}" http://localhost:ポート/api/RerunAll azure上の関数のURL †azure上の関数のURLは https://関数アプリ名.azurewebsites.net/api/RerunAll?code=XXXXXXXXXXXXXXXXXX となる。 code は Azure ポータルの [アプリ キー] から確認するか、Azure CLI で以下を実行する事で取得可能。 az functionapp keys list -n 関数アプリ名 -g リソースグループ名 | grep masterKey | awk '{print $2}' | sed -E 's/("|,)//g' InfluxDBに対象ファイルのデータが登録されている事を確認。 > select * from sample name: sample time col1 col2 col3 file ---- ---- ---- ---- ---- 1598922241000000000 10.1 12.4 15.45 http://127.0.0.1:10000/devstoreaccount1/コンテナ名/sample1.csv 1598922241200000000 11.34 14.11 10.87 http://127.0.0.1:10000/devstoreaccount1/コンテナ名/sample1.csv 1598922241400000000 9.76 13.53 15.12 http://127.0.0.1:10000/devstoreaccount1/コンテナ名/sample1.csv |