#mynavi(AWSメモ) #mynavi(Azureメモ) #setlinebreak(on); #html(){{ <style> .images img { border: 1px solid #333; } .images div { vertical-align: top; } </style> }} * 概要 [#da86ba60] #html(<div class="pl10 lh15">) Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できる([[Azure Functions を Go で書く]])が、 ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築する。 また、この記事では WebAPI サーバは Azure の仮想マシンに上に構築する。(サーバ自体は今回はGoで実装する) 尚、ファイルのアップロード時に処理を走らせるだけなら Blob トリガーを使った方が簡単(※)で可用性も高いが、この構成には以下の利点もある。 ※ 関連: [[Azure Functions を Go で書く]] - 新規/変更以外のイベントも捕捉できる。(Blobトリガーは新規/変更のイベントのみ) - ファイルのデータ自体にはアクセスする必要がない場合は、通信データ量が減る(単純にイベント履歴が欲しい場合など) - 仮想ネットワーク内のリソースにアクセスする必要がある場合に VNet 統合を使用しなくて良い。 (関数アプリやApp Serviceで VNet統合を使用するには高めのプランを利用する必要がある。) #html(</div>) * 目次 [#t64b850c] #contents - 関連 -- [[Azureメモ]] -- [[Azure Functions を Go で書く]] -- [[GoでAzureのBlobファイルを読み書き]] - 参考 -- [[Azure Event Grid イベントに対するイベント ハンドラーとしての Webhook、Automation Runbook、Logic Apps>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-webhooks]] -- [[Webhook のイベント配信>https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery]] -- [[HTTP エンドポイントへのイベントの受信>https://docs.microsoft.com/ja-jp/azure/event-grid/receive-events]] * 構築内容 [#g2495598] #html(<div class="pl10">) 当記事では以下の構成を Azure 上に構築する。 - Blobストレージにファイルがアップロードされたら Webhook によって、VM内に構築したサーバにイベント通知を行う。 - VM 内の nginx は httpsリクエストを受け付けて、Goで実装された WebAPIサーバ(ポート:8000)にリクエストを転送する。&br; (SSL証明書は VM の作成時に Let's Encrypt を使用して自動的に作成する ) - VMのNSG(ネットワークセキュリティグループ)で、Event Grid 以外からのリクエストは通さないように設定する。 #html(<div class="images"><div>) &ref(webhook_image.png,nolink); #html(</div></div>) #html(</div>) * エンドポイントの検証について [#w310fe27] #html(<div class="pl10">) ** 送信される検証イベント [#we9e3418] #html(<div class="pl10">) ストレージアカウントに Webhook を設定すると、設定したエンドポイントにサブスクリプション検証イベントが送信される。 https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery#endpoint-validation-with-event-grid-events (検証イベントのイメージ) #mycode3(){{ [{ "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/...", "subject": "", "data": { "validationCode": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "validationUrl": "https://rp-japanwest.eventgrid.azure.net:553/eventsubscriptions/..." }, "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent", "eventTime": "YYYY-MM-DDTHH:MM:SS.SSSSSSSZ", "metadataVersion": "1", "dataVersion": "2" }] }} #html(</div>) このリクエストに対して、以下の何れかのアクションをとる事によって検証が完了する。 ** 同期ハンドシェイク [#mc8bee47] #html(<div class="pl10">) 検証リクエストに対して、以下のレスポンスを返す。 ※ 受信した validationCode をそのまま validationResponse として返却する。 #mycode3(){{ { "validationResponse": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" } }} #html(</div>) ** 非同期ハンドシェイク [#c23a7513] #html(<div class="pl10">) 受信した validationUrl にブラウザ等からアクセスする。 #html(</div>) &br; という事なのだが、当記事では同期ハンドシェイクに対応出来るように予めWebAPI サーバの実装に処理を組み込む事とする。 ※後述の apiserver.go を参照。 #html(</div>) * 受信するイベントデータについて [#a6b1aed5] #html(<div class="pl10">) イベントスキーマとして "イベントグリッドスキーマ" を選択した場合(後述)、Webhook のエンドポイントには以下のようなリクエストデータが送信されてくる。 ※当記事のサンプルでは、以下に含まれる url の情報を元にストレージコンテナからファイルの内容を取得してログに出力している。 #mycode3(){{ [ { "topic": "/subscriptions/サブスクリプション/resourceGroups/RG名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名", "subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/sample1.txt", "eventType": "Microsoft.Storage.BlobCreated", "id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "data": { "api": "PutBlob", "clientRequestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "requestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "eTag": "0xXXXXXXXXXXXXXXX", "contentType": "text/plain; charset=utf-8", "contentLength": 60, "blobType": "BlockBlob", "url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.txt", "sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", "storageDiagnostics": { "batchId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" } }, "dataVersion": "", "metadataVersion": "1", "eventTime": "2020-09-02T03:09:21.3390888Z" } ] }} #html(</div>) * リソース作成 [#ue84466d] #html(<div class="pl10">) // START tabs1 #html(){{ <div id="tabs1"> <ul> <li><a href="#tabs1-1">0_env.sh</a></li> <li><a href="#tabs1-2">1_create_resources.sh</a></li> <li><a href="#tabs1-3">2_setup_vm.tmpl</a></li> <li><a href="#tabs1-4">apiserver_Dockerfile</a></li> <li><a href="#tabs1-5">apiserver.go</a></li> <li><a href="#tabs1-6">9_upload_file.sh</a></li> <li><a href="#tabs1-7">tests/sample.csv</a></li> </ul> }} // START tabs1-1 #html(<div id="tabs1-1">) #mycode2(){{ #!/bin/bash # 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける) PREFIX=XXXXXXXX # サブスクリプションID subscriptionId=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX # リージョン region=japanwest insightsRegion=japaneast # 2020/9時点では Appplication Insights で西日本(japanwest)は使用できない # リソースグループ名 resourceGroup=${PREFIX}ResourceGroup # ストレージアカウント名 storageAccountName=${PREFIX}straccount storageSku=Standard_LRS # Storageコンテナ名 storageContainerIn=${PREFIX}-strcontainer-i storageContainerOut=${PREFIX}-strcontainer-o # 仮想ネットワーク名 vnetName=${PREFIX}VNet vnetPrefix=10.1.0.0/16 # ネットワークセキュリティグループ nsgName=${vnetName}SecGrp nsgPubRuleName=${nsgName}PubRule nsgPriRuleName=${nsgName}PriRule # VM用のサブネット vmSubnetName=${PREFIX}VmSubnet vmSubnetPrefix=10.1.1.0/24 # 関数アプリ用のサブネット funcSubnetName=${PREFIX}FuncSubnet funcSubnetPrefix=10.1.2.0/24 # 仮想マシン vmName=${PREFIX}Vm vmImage=UbuntuLTS vmIpAddress=10.1.1.5 vmUser=sample # Application insights insightsName=${PREFIX}Insights insightsDays=30 # Web API server用 registryName=${PREFIX}registry apiServerImage=${PREFIX}-apiserver }} #html(</div>) // END tabs1-1 // START tabs1-2 #html(<div id="tabs1-2">) #mycode2(){{ #!/bin/bash # 設定の読み込み source 0_env.sh # リソース作成 if [ "$1" == "--create" ]; then # リソースグループの作成 echo "az group create" az group create --name $resourceGroup --location $region # ストレージアカウントの作成 echo az storage account create az storage account create \ --name $storageAccountName --location $region \ --resource-group $resourceGroup --sku $storageSku # ストレージアカウントキーの取得 storageAccountKey=`az storage account keys list -g $resourceGroup -n $storageAccountName -o table | grep key1 | awk '{print $3}'` # Storageコンテナの作成 echo "az storage container create(in)" az storage container create \ --name $storageContainerIn --resource-group $resourceGroup --account-name $storageAccountName echo "az storage container create(out)" az storage container create \ --name $storageContainerOut --resource-group $resourceGroup --account-name $storageAccountName # NSG(ネットワークセキュリティグループの)作成 echo "az network nsg create" az network nsg create --resource-group $resourceGroup --name $nsgName # コンテナレジストリ作成 echo "az acr create ( $registryName )" az acr create -n $registryName -g $resourceGroup --sku standard --admin-enabled true # レジストリユーザ名/パスワード取得 acr_credential="`az acr credential show --name $registryName -o table | tail -1`" registryUser="`echo "$acr_credential" | awk '{print $1}'`" registryPwd="`echo "$acr_credential" | awk '{print $2}'`" # API Server のビルド go get github.com/sendgrid/sendgrid-go GOOS=linux GOARCH=amd64 go build -o apiserver # go のビルド環境がない場合 # docker run --rm -v `pwd`:/tmp/work -w /tmp/work -i golang <<_EO_GOBUILD_ # go get github.com/Azure/azure-storage-blob-go/azblob # GOOS=linux GOARCH=amd64 go build -o apiserver #_EO_GOBUILD_ chmod 755 apiserver # docker イメージを作成してACRにプッシュしておく(内容は Dockerfile 参照) az acr build --registry $registryName --image ${apiServerImage} -f apiserver_Dockerfile . # VMセットアップ用スクリプトに環境変数を埋め込み cat 2_vm_setup.tmpl \ | sed "s/##storageAccountName##/${storageAccountName}/g" \ | sed "s~##storageAccountKey##~${storageAccountKey}~g" \ | sed "s/##registryName##/${registryName}/g" \ | sed "s/##registryUser##/${registryUser}/g" \ | sed "s~##registryPwd##~${registryPwd}~g" \ | sed "s/##apiServerImage##/${apiServerImage}/g" > 2_vm_setup.sh chmod 755 2_vm_setup.sh # WebAPIサーバ用のNSGルール(http) echo "az network nsg rule create(http)" az network nsg rule create \ --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Http \ --access Allow --protocol Tcp --direction Inbound --priority 120 \ --source-address-prefix Internet --source-port-range "*" --destination-port-range "80" # WebAPIサーバ用のNSGルール(https) echo "az network nsg rule create(https)" az network nsg rule create \ --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Https \ --access Allow --protocol Tcp --direction Inbound --priority 130 \ --source-address-prefix Internet --source-port-range "*" --destination-port-range "443" # SSH接続用のNSGルール echo "az network nsg rule create(ssh)" az network nsg rule create \ --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Ssh \ --access Allow --protocol Tcp --direction Inbound --priority 1000 \ --source-address-prefix Internet --source-port-range "*" --destination-port-range "22" # 仮想ネットワーク 及び サブネット作成 echo "az network vnet create" az network vnet create \ --name $vnetName --resource-group $resourceGroup \ --address-prefixes $vnetPrefix --network-security-group $nsgName \ --subnet-name $vmSubnetName --subnet-prefixes $vmSubnetPrefix # 仮想マシンの作成 rm -rf ~/.ssh/id_rsa rm -rf ~/.ssh/id_rsa.pub echo "az vm create" az vm create \ --resource-group $resourceGroup --name $vmName --image $vmImage --generate-ssh-keys \ --vnet-name $vnetName --subnet $vmSubnetName \ --private-ip-address $vmIpAddress --admin-username $vmUser \ --public-ip-address-dns-name `echo $vmName | tr '[A-Z]' '[a-z]'` \ --custom-data 2_vm_setup.sh # VMポート開放 #echo "az vm open-port(8086)" #az vm open-port --resource-group $resourceGroup --name $vmName --port 8086 --priority 920 echo "az vm open-port(80)" az vm open-port --resource-group $resourceGroup --name $vmName --port 80 --priority 930 echo "az vm open-port(443)" az vm open-port --resource-group $resourceGroup --name $vmName --port 443 --priority 940 # 生成されたSSH鍵を退避しておく mkdir -p pem if [ -e ~/.ssh/id_rsa ]; then mv ~/.ssh/id_rsa ./pem/id_rsa_${vmName} mv ~/.ssh/id_rsa.pub ./pem/id_rsa_${vmName}.pub fi lowerVmName=`echo $vmName | tr [A-Z] [a-z]` echo "WebAPI Server: https://${lowerVmName}.${region}.cloudapp.azure.com" fi # リソース削除 if [ "$1" == "--delete" ]; then echo "az group delete" az group delete --name $resourceGroup fi }} #html(</div>) // END tabs1-2 // START tabs1-3 #html(<div id="tabs1-3">) VMのセットアップ処理。 ※nginx、API Server コンテナの作成 及び 起動。 #mycode2(){{ #!/bin/bash #-------------------------------------- # create work directory and cd. #-------------------------------------- workdir=/tmp/docker_container apiLogDir=${workdir}/logs/apiserver nginxLogDir=${workdir}/logs/nginx mkdir -p $workdir mkdir -p $apiLogDir mkdir -p $nginxLogDir cd $workdir #-------------------------------------- # install docker and docker compose. #-------------------------------------- apt-get update apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - add-apt-repository \ "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ $(lsb_release -cs) \ stable" apt-get update apt-get install -y docker-ce docker-ce-cli containerd.io curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose chmod +x /usr/local/bin/docker-compose #-------------------------------------- # set environments. #-------------------------------------- # web server url region=japanwest hostname=`hostname | tr [A-Z] [a-z]` server_name="${hostname}.${region}.cloudapp.azure.com" #-------------------------------------- # wait until azure dns name is available. # (If it is NG after waiting 5 minutes, give up..) #-------------------------------------- for i in `seq 30`; do x=`host $server_name` if [ "$?" == "0" ]; then break fi sleep 10 done #-------------------------------------- # get ssl certificate. #-------------------------------------- sudo docker run --rm -p 80:80 \ -v /etc/letsencrypt:/etc/letsencrypt \ -v /etc/letsencrypt/logs:/var/log/letsencrypt \ certbot/certbot certonly --standalone \ -d $server_name \ --register-unsafely-without-email \ --non-interactive --agree-tos \ --force-renewal \ --renew-by-default \ --preferred-challenges http #-------------------------------------- # create default.conf for nginx. #-------------------------------------- cat <<_EO_DEFAULT_CONF_>default.conf upstream backend { server api_server:8000; } # http server{ listen 80; listen [::]:80; server_name ${server_name}; return 301 https://\$host\$request_uri; } # https server{ listen 443 ssl; listen [::]:443 ssl; server_name ${server_name}; ssl_certificate /etc/letsencrypt/live/${server_name}/fullchain.pem; ssl_certificate_key /etc/letsencrypt/live/${server_name}/privkey.pem; proxy_set_header Host \$host; proxy_set_header X-Real-IP \$remote_addr; proxy_set_header X-Forwarded-Host \$host; proxy_set_header X-Forwarded-Server \$host; proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for; location / { proxy_pass http://backend; } } _EO_DEFAULT_CONF_ #-------------------------------------- # set environment for ACR. #-------------------------------------- registryName=##registryName## apiServerImage=##apiServerImage## registryUser=##registryUser## registryPwd=##registryPwd## docker login ${registryName}.azurecr.io -u ${registryUser} -p ${registryPwd} docker pull ${registryName}.azurecr.io/${apiServerImage}:latest #-------------------------------------- # create docker-compose.yml #-------------------------------------- cat <<_EOYML_>docker-compose.yml version: "3" services: api_server: image: ${registryName}.azurecr.io/${apiServerImage}:latest hostname: api_server container_name: api_server ports: - "8000:8000" environment: HTTPWORKER_PORT: "8000" STORAGE_KEY_##storageAccountName##: "##storageAccountKey##" LOG_DIR: "${apiLogDir}" LOG_PREFIX: "apiserver_" networks: mynetwork: ipv4_address: 172.1.0.6 volumes: - ${apiLogDir}:${apiLogDir} restart: always nginx: image: nginx:latest hostname: proxy_container container_name: proxy_container ports: - "80:80" - "443:443" volumes: - ./default.conf:/etc/nginx/conf.d/default.conf - /etc/letsencrypt:/etc/letsencrypt - ${nginxLogDir}:/var/log/nginx networks: mynetwork: ipv4_address: 172.1.0.7 restart: always networks: mynetwork: driver: bridge ipam: config: - subnet: 172.1.0.0/24 _EOYML_ #-------------------------------------- # start containers #-------------------------------------- docker-compose up -d }} #html(</div>) // END tabs1-3 // START tabs1-4 #html(<div id="tabs1-4">) #mycode2(){{ FROM alpine COPY apiserver /apiserver EXPOSE 8000 ENTRYPOINT ["/apiserver"] }} #html(</div>) // END tabs1-4 // START tabs1-5 #html(<div id="tabs1-5">) #mycode2(){{ package main import ( "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "os" //"regexp" "strings" "time" "github.com/Azure/azure-storage-blob-go/azblob" //"github.com/microsoft/ApplicationInsights-Go/appinsights" //"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" ) type InvokeResponse struct { File string } type InvokeRequest struct { Topic string Subject string EventType string EventTime string Id string Data map[string]interface{} DataVersion string MetadataVersion string } /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } //var telemetryClient appinsights.TelemetryClient func init(){ //telemetryClient = appinsights.NewTelemetryClient("インストルメンテーションキー") setLogfile(true) } func printDebug(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[DEBUG] %s\n", msg) //telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose) } func printInfo(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[INFO] %s\n", msg) //telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information) } func printError(format string, params ...interface{}){ setLogfile(false) msg := fmt.Sprintf(format, params...) log.Printf("[ERROR] %s\n", msg) //telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error) } func fileExists(name string) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func setLogfile(init bool){ log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile) logDirPath := getEnv("LOG_DIR", "/tmp") logPrefix := getEnv("LOG_PREFIX", "apiserver_") logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02")) if init || !fileExists(logfilePath) { newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { panic("cannnot open " + logfilePath + " : "+ err.Error()) } log.SetOutput(io.MultiWriter(newlogfile, os.Stdout)) printInfo("switch logfile: %s", logfilePath) } } func blobTriggerHandler(w http.ResponseWriter, r *http.Request) { printInfo("START blobTriggerHandler") var event InvokeRequest var fileUrl string defer func(){ err := recover() if fileUrl == "" { fileUrl = "unknown" } if err != nil { printError("Failure process %s , event: %v", fileUrl, event) } else { printInfo("Success process %s", fileUrl) } printInfo("END blobTriggerHandler") }() //------------------------------------------------ // リクエストデータのパース //------------------------------------------------ var invokeReq []InvokeRequest d := json.NewDecoder(r.Body) decodeErr := d.Decode(&invokeReq) if decodeErr != nil { http.Error(w, decodeErr.Error(), http.StatusBadRequest) return } // イベントデータの取得 event = invokeReq[0] // エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク) if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" { fileUrl = "SubscriptionValidationEvent" w.Header().Set("Content-Type", "application/json") w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"]))) return } //------------------------------------------------ // ○○処理. //------------------------------------------------ fileUrl = fmt.Sprintf("%s", event.Data["url"]) sampleProc(fileUrl) // コンテナ毎に処理を切り替える場合 //if regexp.MustCompile(`container1/.+`).MatchString(fileUrl) { // sampleProc1(fileUrl) //} else if regexp.MustCompile(`container2/.+`).MatchString(fileUrl) { // sampleProc2(fileUrl) //} //------------------------------------------------ // レスポンス生成 //------------------------------------------------ invokeResponse := InvokeResponse{File: fileUrl} js, err := json.Marshal(invokeResponse) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(js) } /** * ○○処理. */ func sampleProc(fileUrl string) { // 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける urlParts := strings.Split(fileUrl, "/") containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3]) rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "") fileName := rep.Replace(fileUrl) accountName := strings.Split(urlParts[2], ".")[0] containerName := urlParts[3] // ローカル実行用の環境変数がある場合はそちらを優先 localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "") if localContainerUrl != "" { containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName) } printDebug("fileUrl: %s", fileUrl) printDebug("containerUrl: %s", containerUrlText) printDebug("fileName : %s", fileName) // 対象のファイルURLを取得 keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName) accountKey := getEnv(keyEnvName, "") if accountKey == "" { printError("環境変数(%s)が設定されていません。", keyEnvName) return } ctx := context.Background() credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { printError("Get Credential Error: %v", err) } p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) cURL, _ := url.Parse(containerUrlText) containerURL := azblob.NewContainerURL(*cURL, p) blobURL := containerURL.NewBlobURL(fileName) // サイズを取得 var blobSize int64 = 1024 blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}) if err != nil { printError("Get Properties Error: %v", err) return } else { blobSize = blobPropResponse.ContentLength() } // ファイル内容をバッファに取得 downloadedData := make([]byte, blobSize) err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{}) if err != nil { printError("Download Error: %v", err) } else { // ファイル内容を表示 printInfo("Download Success %s", fileUrl) printInfo(string(downloadedData)) } } /** * メイン. */ func main() { httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT") if ! exists { httpInvokerPort = "8000" } printInfo("HTTPWORKER_PORT: " + httpInvokerPort) mux := http.NewServeMux() mux.HandleFunc("/", blobTriggerHandler) printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort) log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux)) } }} #html(</div>) // END tabs1-5 // START tabs1-6 #html(<div id="tabs1-6">) ファイルアップロード用 #mycode2(){{ #!/bin/bash source 0_env.sh connstr=`az storage account show-connection-string --name $storageAccountName -o table | tail -1` az storage blob upload -f tests/sample.csv -c $storageContainerIn -n sample1.csv --connection-string "${connstr}" }} #html(</div>) // END tabs1-6 // START tabs1-7 #html(<div id="tabs1-7">) 動作確認用のサンプルCSV #mycode2(){{ col1,col2,col3 aaaa,bbbb,cccc bbbb,cccc,dddd cccc,dddd,eeee }} #html(</div>) // END tabs1-7 #html(</div>) // END tabs1 #html(<script>$(function() { $("#tabs1").tabs(); });</script>) ** リソース作成の実行 [#n8627ad1] #myterm2(){{ ./1_create_resources.sh --create }} #html(</div>) * Webフックの設定 [#ccd81ffc] #html(<div class="pl10">) ここまでで Webフック 以外のリソース作成が完了しているので、次はいよいよ Webフック の設定を行う。 ** Azure ポータルから設定する場合 [#xd2df30c] #html(<div class="pl10">) #html(<div class="images"><div>) 対象のストレージアカウントの画面から [イベント] を選択し、[+イベント サブスクリプション] を押下。 &ref(webhook_setting01.png,nolink); #html(</div><div>) 以下の通り入力。 ※webhook の URL には 上記で作成したサーバの URL を記述する。 ※当記事のサンプルソースのまま実施すると 「https://vm名(小文字).リージョン.cloudapp.azure.com」 となる。 &ref(webhook_setting02.png,nolink); #html(</div><div>) 特定のコンテナやファイルのみを処理したい場合は、フィルターを設定する事も可能。 ※複数のコンテナを処理したい場合は、高度なフィルターの方を使用する。 &ref(webhook_filter_setting.png,nolink); #html(</div></div>) #html(</div>) ** Azure CL で設定する場合 [#q5b6df21] #html(<div class="pl10">) 参考: https://docs.microsoft.com/ja-jp/azure/storage/blobs/storage-blob-event-quickstart?toc=/azure/event-grid/toc.json #myterm2(){{ source 0_env.sh event_subscription_name=${storageAccountName}EventSubscrpt lowerVmName=`echo $vmName | tr [A-Z] [a-z]` endpoint_url=https://${lowerVmName}.${region}.cloudapp.azure.com # Event Grid リソース プロバイダーを有効にする az provider register --namespace Microsoft.EventGrid # 有効になるまで待つ while [ true ]; do result=`az provider show --namespace Microsoft.EventGrid --query "registrationState" | grep Registered | wc -l | awk '{print $1}'` if [ "$result" == "1" ]; then break fi sleep 5 done; # ストレージアカウントをサブスクライブする storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv` az eventgrid event-subscription create \ --source-resource-id $storageid \ --name $event_subscription_name \ --endpoint $endpoint_url \ --included-event-types "Microsoft.Storage.BlobCreated" \ --subject-begins-with "/blobServices/default/containers/ストレージコンテナ名" # コンテナ名でフィルタをかける場合 --advanced-filter subject StringBeginsWith "/blobServices/default/containers/ストレージコンテナ名" # 高度なフィルターで設定してもOK(複数可) }} #html(</div>) #html(</div>) * 動作確認 [#x46aacb8] #html(<div class="pl10">) tests/sample.csv をストレージコンテナにアップロードする。 #myterm2(){{ ./9_upload_file.sh }} webhook が機能すると API server のログに以下の内容が出力される。 ※当記事のサンプルではログは VM の /tmp/docker_container/logs/apiserver 配下に出力するようにしている。 #myterm2(){{ 2020/09/03 02:41:14.254662 apiserver.go:72: [INFO] START blobTriggerHandler 2020/09/03 02:41:14.255471 apiserver.go:66: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv 2020/09/03 02:41:14.255645 apiserver.go:66: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名 2020/09/03 02:41:14.255827 apiserver.go:66: [DEBUG] fileName : sample1.csv 2020/09/03 02:41:14.539471 apiserver.go:72: [INFO] Download Success https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv 2020/09/03 02:41:14.539819 apiserver.go:72: [INFO] col1,col2,col3 aaaa,bbbb,cccc bbbb,cccc,dddd cccc,dddd,eeee 2020/09/03 02:41:14.540434 apiserver.go:72: [INFO] Success process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv 2020/09/03 02:41:14.540642 apiserver.go:72: [INFO] END blobTriggerHandler }} #html(</div>) #html(</div>) * VMのNSG(ネットワークセキュリティグループ)の変更 [#mb7bf23f] #html(<div class="pl10">) ここまでで構築したVMはインターネット上に公開されており、全ての送信元からのリクエストが受け入れられる状態になっている。 今回用意したAPIサーバは Event Grid からの Webhook専用なので、イベントデータの送信元を制限しておく。 リソース作成用のシェル(1_create_resources.sh) では 「az vm open-port」で VMのポート開放を行っているが、 このコマンドでは「VM用のNSGの作成」と「ルールの追加」が行われており、作成される NSG 及び ルールの名前は以下の通りとなる。 | リソース | 命名 | 例 |h | NSG名 | VM名+"NSG" | myvmNSG | | NSGルール | "open-port-" + ポート番号 | open-port-80 | 以降では、このルールを変更して 「EventGrid からのリクエストのみを受け付ける」 ように変更する。 ** Azure ポータルから行う場合 [#l7e53d9c] #html(<div class="pl10">) #html(<div class="images"><div>) 対象のNSGの画面から [受信セキュリティ規則] を選択。 &ref(webhook_nsg_update01.png,nolink); #html(</div><div>) 以下の通り変更する。(変更はできないので削除してから追加する) 変更前の状態 &ref(webhook_nsg_update02.png,nolink); #html(</div><div>) 変更後の状態 &ref(webhook_nsg_update03.png,nolink); #html(</div></div>) #html(</div>) ** Azure CLI で行う場合 [#u92786cb] #html(<div class="pl10">) #myterm2(){{ source 0_env.sh az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80 az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80 \ --protocol '*' --direction inbound --priority 930 \ --source-address-prefix AzureEventGrid --source-port-range '*' \ --destination-address-prefix '*' --destination-port-range 80 --access allow az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443 az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443 \ --protocol '*' --direction inbound --priority 940 \ --source-address-prefix AzureEventGrid --source-port-range '*' \ --destination-address-prefix '*' --destination-port-range 443 --access allow }} #html(</div>) #html(</div>) * 再試行について [#ke2ec5bb] #html(<div class="pl10">) [[Event Grid のメッセージの配信と再試行]] にも記載している為、そちらも参照。 既定では 24 時間以内 または 最大30回 の再試行が行われる。 ※参考: [[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]] 成功とみなされるレスポンスは以下の通りで、これ以外は全て失敗とみなされて再試行が行われる。 ※ 参考: [[メッセージの配信状態>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#message-delivery-status]] | HTTPステータス | 説明 |h | 200 | OK | | 201 | Created | | 202 | Accepted | | 203 | 権限のない情報 | | 204 | No Content | ** 注意点 [#x2c89914] #html(<div class="pl10">) 上記の 参考URL([[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]]) の記載で気にしておかなければいけないのが以下の記述で、 ''同じデータが流れてくる可能性があるので、受け側は冪等性を持たせる実装をしておく必要がある。'' #html(){{ <div class="ib" style="padding: 10px; background: #eee; border: 1px solid #333;"> > エンドポイントが 3 分以内に応答した場合、Event Grid はベスト エフォート方式でイベントを再試行キューから削除しようとしますが、それでも重複が受信される可能性があります。 </div> }} #html(</div>) &br; 尚、有効期限、及び 最大再試行回数はイベントサブスクリプションの追加時に [追加の機能] タブからカスタマイズする事もできる。 ※ただし、細かな再試行スケジュールは指定できない。 #html(<div class="images"><div>) &ref(webhook_setting03.png,nolink); #html(</div></div>) #html(</div>) * Application Insights へのログの連携 [#gb4f1316] #html(<div class="pl10">) [[ApplicationInsights-Go>https://github.com/microsoft/ApplicationInsights-Go]] を使用すれば Application Insights にログを連携できるので、関数アプリや App Service アプリと同じように監視や通知ができる。 ※関連: [[Azure Functions の異常を検知する]] Application Insights 作成 #myterm2(){{ source 0_env.sh # Application Insights 拡張が利用できない場合は追加インストール x=`az monitor app-insights --help 2>&1` if [ "$?" != "0" ]; then az extension add -n application-insights fi # Application Insights コンポーネント作成 if [ "$subscriptionId" != "" ]; then echo az monitor app-insights component create az monitor app-insights component create \ --app $insightsName \ --location $insightsRegion \ --resource-group $resourceGroup \ --query-access Enabled \ --retention-time $insightsDays \ --subscription $subscriptionId fi # インストルメンテーションキーの取得 echo "instrumentationKey: `az monitor app-insights component show -g $resourceGroup -a $insightsName | grep instrumentationKey | awk '{print $2}' | sed -E 's/[",]//g'`" }} go ライブラリのインストール #myterm2(){{ go get github.com/microsoft/ApplicationInsights-Go/appinsights }} apiserver.go の変更 #mycode(){{ import ( // 以下を追加 "github.com/microsoft/ApplicationInsights-Go/appinsights" "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts" ) var telemetryClient appinsights.TelemetryClient // 追加 func init(){ telemetryClient = appinsights.NewTelemetryClient("上記の Insights 作成時に表示された instrumentationKey") // 追加 setLogfile(true) } func printDebug(format string, params ...interface{}){ : telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose) // 追加 } func printInfo(format string, params ...interface{}){ : telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information) // 追加 } func printError(format string, params ...interface{}){ : telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error) // 追加 } }} #html(</div>) * (おまけ) ローカルでの実行方法 [#l892a100] #html(<div class="pl10">) Azurite用のアカウント名やキー、接続文字列は決まっているので、それを使用して起動するだけ。 https://docs.microsoft.com/ja-jp/azure/storage/common/storage-configure-connection-string#configure-a-connection-string-for-azurite 事前に azurite のインストールは必要。 ※ https://docs.microsoft.com/ja-jp/azure/storage/common/storage-use-azurite // START tabs1 #html(){{ <div id="tabs2"> <ul> <li><a href="#tabs2-1">X_local_start.sh</a></li> <li><a href="#tabs2-2">X_local_upload.sh</a></li> <li><a href="#tabs2-3">tests/local_event.tmpl</a></li> </ul> }} // START tabs2-1 #html(<div id="tabs2-1">) X_local_start.sh #mycode2(){{ #!/bin/bash # リソース名一式を読み込み source 0_env.sh # ローカルエミュレータ(azurite用のアカウント名 及び キー) localAccountName=devstoreaccount1 localAccountKey="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # azurite起動 mkdir -p local_azurite azurite --silent --location `pwd`/local_azurite & sleep 5 # ストレージコンテナ作成 export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage container create -n $storageContainerIn az storage container create -n $storageContainerOut # API サーバ起動 mkdir -p logs export LOG_DIR=`pwd`/logs export LOG_PREFIX=apiserver_ export STORAGE_KEY_${localAccountName}=${localAccountKey} export LOCAL_STORAGE_CONTAINER_URL=http://127.0.0.1:10000 go run apiserver.go # azurite終了 kill -9 `ps | grep azurite | head -1 | awk '{print $1}'` # ローカル用の環境変数をクリアしておく export AZURE_STORAGE_CONNECTION_STRING= }} #html(</div>) // END tabs2-1 // START tabs2-2 #html(<div id="tabs2-2">) X_local_upload.sh #mycode2(){{ #!/bin/bash source 0_env.sh container=$storageContainerIn filename=tests/sample.csv # ファイルをazurite(エミュレータ)にアップロード export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true" az storage blob upload -f $filename -c $container -n $filename # ローカル用のイベントデータ作成 cat tests/local_event.tmpl \ | sed "s/###storageContainer###/${container}/g" \ | sed "s~###filename###~${filename}~g" >tests/local_event.json # 疑似Webhookイベント発行 curl -v -H "Content-Type: application/json" -d @tests/local_event.json http://localhost:8000/ export AZURE_STORAGE_CONNECTION_STRING= }} #html(</div>) // END tabs2-2 // START tabs2-3 #html(<div id="tabs2-3">) tests/local_event.tmpl #mycode2(){{ [ { "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/xxxxxxxx/providers/Microsoft.Storage/storageAccounts/devstoreaccount1", "subject": "/blobServices/default/containers/###storageContainer###/blobs/###filename###", "eventType": "Microsoft.Storage.BlobCreated", "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "data": { "api": "PutBlob", "clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx", "eTag": "0xAAAAAAAAAAA", "contentType": "text/plain; charset=utf-8", "contentLength": 60, "blobType": "BlockBlob", "url": "https://devstoreaccount1.blob.core.windows.net/###storageContainer###/###filename###", "sequencer": "00000000000000000000000000000E6E0000000000ff188d", "storageDiagnostics": { "batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx" } }, "dataVersion": "", "metadataVersion": "1", "eventTime": "2020-09-02T03:09:21.3390888Z" } ] }} #html(</div>) // END tabs2-3 #html(</div>) // END tabs2 #html(<script>$(function() { $("#tabs2").tabs(); });</script>) ** ローカルでサーバ起動 [#w0896654] #myterm2(){{ ./X_local_start.sh }} ** ローカルのエミュレータにファイルアップロード [#c6444d1b] #myterm2(){{ ./X_local_upload.sh }} #html(</div>)