概要

Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できる(Azure Functions を Go で書く)が、
ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築する。
また、この記事では WebAPI サーバは Azure の仮想マシンに上に構築する。(サーバ自体は今回はGoで実装する)

尚、ファイルのアップロード時に処理を走らせるだけなら Blob トリガーを使った方が簡単(※)で可用性も高いが、この構成には以下の利点もある。
※ 関連: Azure Functions を Go で書く

  • 新規/変更以外のイベントも捕捉できる。(Blobトリガーは新規/変更のイベントのみ)
  • ファイルのデータ自体にはアクセスする必要がない場合は、通信データ量が減る(単純にイベント履歴が欲しい場合など)
  • 仮想ネットワーク内のリソースにアクセスする必要がある場合に VNet 統合を使用しなくて良い。
    (関数アプリやApp Serviceで VNet統合を使用するには高めのプランを利用する必要がある。)

目次

構築内容

当記事では以下の構成を Azure 上に構築する。

  • Blobストレージにファイルがアップロードされたら Webhook によって、VM内に構築したサーバにイベント通知を行う。
  • VM 内の nginx は httpsリクエストを受け付けて、Goで実装された WebAPIサーバ(ポート:8000)にリクエストを転送する。
    (SSL証明書は VM の作成時に Let's Encrypt を使用して自動的に作成する )
  • VMのNSG(ネットワークセキュリティグループ)で、Event Grid 以外からのリクエストは通さないように設定する。

webhook_image.png

エンドポイントの検証について

送信される検証イベント

ストレージアカウントに Webhook を設定すると、設定したエンドポイントにサブスクリプション検証イベントが送信される。
https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery#endpoint-validation-with-event-grid-events

(検証イベントのイメージ)

[{
  "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/...",
  "subject": "",
  "data": {
    "validationCode": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "validationUrl": "https://rp-japanwest.eventgrid.azure.net:553/eventsubscriptions/..."
  },
  "eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
  "eventTime": "YYYY-MM-DDTHH:MM:SS.SSSSSSSZ",
  "metadataVersion": "1",
  "dataVersion": "2"
}]

このリクエストに対して、以下の何れかのアクションをとる事によって検証が完了する。

同期ハンドシェイク

検証リクエストに対して、以下のレスポンスを返す。
※ 受信した validationCode をそのまま validationResponse として返却する。

{
  "validationResponse": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}

非同期ハンドシェイク

受信した validationUrl にブラウザ等からアクセスする。



という事なのだが、当記事では同期ハンドシェイクに対応出来るように予めWebAPI サーバの実装に処理を組み込む事とする。
※後述の apiserver.go を参照。

受信するイベントデータについて

イベントスキーマとして "イベントグリッドスキーマ" を選択した場合(後述)、Webhook のエンドポイントには以下のようなリクエストデータが送信されてくる。
※当記事のサンプルでは、以下に含まれる url の情報を元にストレージコンテナからファイルの内容を取得してログに出力している。

[
  {
    "topic": "/subscriptions/サブスクリプション/resourceGroups/RG名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名",
    "subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/sample1.txt",
    "eventType": "Microsoft.Storage.BlobCreated",
    "id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "data": {
      "api": "PutBlob",
      "clientRequestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "requestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "eTag": "0xXXXXXXXXXXXXXXX",
      "contentType": "text/plain; charset=utf-8",
      "contentLength": 60,
      "blobType": "BlockBlob",
      "url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.txt",
      "sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
      "storageDiagnostics": {
        "batchId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
      }
    },
    "dataVersion": "",
    "metadataVersion": "1",
    "eventTime": "2020-09-02T03:09:21.3390888Z"
  }
]

リソース作成

#!/bin/bash

# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=XXXXXXXX

# サブスクリプションID
subscriptionId=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX

# リージョン
region=japanwest
insightsRegion=japaneast # 2020/9時点では Appplication Insights で西日本(japanwest)は使用できない

# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup

# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS

# Storageコンテナ名
storageContainerIn=${PREFIX}-strcontainer-i
storageContainerOut=${PREFIX}-strcontainer-o

# 仮想ネットワーク名
vnetName=${PREFIX}VNet
vnetPrefix=10.1.0.0/16

# ネットワークセキュリティグループ
nsgName=${vnetName}SecGrp
nsgPubRuleName=${nsgName}PubRule
nsgPriRuleName=${nsgName}PriRule

# VM用のサブネット
vmSubnetName=${PREFIX}VmSubnet
vmSubnetPrefix=10.1.1.0/24

# 関数アプリ用のサブネット
funcSubnetName=${PREFIX}FuncSubnet
funcSubnetPrefix=10.1.2.0/24

# 仮想マシン
vmName=${PREFIX}Vm
vmImage=UbuntuLTS
vmIpAddress=10.1.1.5
vmUser=sample

# Application insights
insightsName=${PREFIX}Insights
insightsDays=30

# Web API server用
registryName=${PREFIX}registry
apiServerImage=${PREFIX}-apiserver
#!/bin/bash

# 設定の読み込み
source 0_env.sh

# リソース作成
if [ "$1" == "--create" ]; then

  # リソースグループの作成
  echo "az group create"
  az group create --name $resourceGroup --location $region

  # ストレージアカウントの作成
  echo az storage account create
  az storage account create \
    --name $storageAccountName --location $region \
    --resource-group $resourceGroup --sku $storageSku

  # ストレージアカウントキーの取得
  storageAccountKey=`az storage account keys list -g $resourceGroup -n $storageAccountName -o table | grep key1 | awk '{print $3}'`

  # Storageコンテナの作成
  echo "az storage container create(in)"
  az storage container create \
    --name $storageContainerIn --resource-group $resourceGroup --account-name $storageAccountName
  echo "az storage container create(out)"
  az storage container create \
    --name $storageContainerOut --resource-group $resourceGroup --account-name $storageAccountName

  # NSG(ネットワークセキュリティグループの)作成
  echo "az network nsg create"
  az network nsg create --resource-group $resourceGroup --name $nsgName

  # コンテナレジストリ作成
  echo "az acr create ( $registryName )"
  az acr create -n $registryName -g $resourceGroup --sku standard --admin-enabled true

  # レジストリユーザ名/パスワード取得
  acr_credential="`az acr credential show --name $registryName -o table | tail -1`"
  registryUser="`echo "$acr_credential" | awk '{print $1}'`"
  registryPwd="`echo "$acr_credential" | awk '{print $2}'`"

  # API Server のビルド
  go get github.com/sendgrid/sendgrid-go
  GOOS=linux GOARCH=amd64 go build -o apiserver
  # go のビルド環境がない場合
  #  docker run --rm -v `pwd`:/tmp/work -w /tmp/work -i golang <<_EO_GOBUILD_
  #  go get github.com/Azure/azure-storage-blob-go/azblob
  #  GOOS=linux GOARCH=amd64 go build -o apiserver
  #_EO_GOBUILD_
  chmod 755 apiserver

  # docker イメージを作成してACRにプッシュしておく(内容は Dockerfile 参照)
  az acr build --registry $registryName --image ${apiServerImage} -f apiserver_Dockerfile .

  # VMセットアップ用スクリプトに環境変数を埋め込み
  cat 2_vm_setup.tmpl \
      | sed "s/##storageAccountName##/${storageAccountName}/g" \
      | sed "s~##storageAccountKey##~${storageAccountKey}~g" \
      | sed "s/##registryName##/${registryName}/g" \
      | sed "s/##registryUser##/${registryUser}/g" \
      | sed "s~##registryPwd##~${registryPwd}~g" \
      | sed "s/##apiServerImage##/${apiServerImage}/g" > 2_vm_setup.sh
  chmod 755 2_vm_setup.sh

  # WebAPIサーバ用のNSGルール(http)
  echo "az network nsg rule create(http)"
  az network nsg rule create \
    --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Http \
    --access Allow --protocol Tcp --direction Inbound --priority 120 \
    --source-address-prefix Internet --source-port-range "*" --destination-port-range "80"

  # WebAPIサーバ用のNSGルール(https)
  echo "az network nsg rule create(https)"
  az network nsg rule create \
    --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Https \
    --access Allow --protocol Tcp --direction Inbound --priority 130 \
    --source-address-prefix Internet --source-port-range "*" --destination-port-range "443"

  # SSH接続用のNSGルール
  echo "az network nsg rule create(ssh)"
  az network nsg rule create \
    --resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Ssh \
    --access Allow --protocol Tcp --direction Inbound --priority 1000 \
    --source-address-prefix Internet --source-port-range "*" --destination-port-range "22"

  # 仮想ネットワーク 及び サブネット作成
  echo "az network vnet create"
  az network vnet create \
    --name $vnetName --resource-group $resourceGroup \
    --address-prefixes $vnetPrefix --network-security-group $nsgName \
    --subnet-name $vmSubnetName --subnet-prefixes $vmSubnetPrefix

  # 仮想マシンの作成
  rm -rf  ~/.ssh/id_rsa
  rm -rf  ~/.ssh/id_rsa.pub
  echo "az vm create"
  az vm create \
    --resource-group $resourceGroup --name $vmName --image $vmImage --generate-ssh-keys \
    --vnet-name $vnetName --subnet $vmSubnetName \
    --private-ip-address $vmIpAddress --admin-username $vmUser \
    --public-ip-address-dns-name `echo $vmName | tr '[A-Z]' '[a-z]'` \
    --custom-data 2_vm_setup.sh

  # VMポート開放
  #echo "az vm open-port(8086)"
  #az vm open-port --resource-group $resourceGroup --name $vmName --port 8086 --priority 920
  echo "az vm open-port(80)"
  az vm open-port --resource-group $resourceGroup --name $vmName --port 80 --priority 930
  echo "az vm open-port(443)"
  az vm open-port --resource-group $resourceGroup --name $vmName --port 443 --priority 940

  # 生成されたSSH鍵を退避しておく
  mkdir -p pem
  if [ -e ~/.ssh/id_rsa ]; then
    mv ~/.ssh/id_rsa     ./pem/id_rsa_${vmName}
    mv ~/.ssh/id_rsa.pub ./pem/id_rsa_${vmName}.pub
  fi

  lowerVmName=`echo $vmName | tr [A-Z] [a-z]`
  echo "WebAPI Server: https://${lowerVmName}.${region}.cloudapp.azure.com"
fi

# リソース削除
if [ "$1" == "--delete" ]; then
  echo "az group delete"
  az group delete --name $resourceGroup
fi

VMのセットアップ処理。
※nginx、API Server コンテナの作成 及び 起動。

#!/bin/bash

#--------------------------------------
# create work directory and cd.
#--------------------------------------
workdir=/tmp/docker_container
apiLogDir=${workdir}/logs/apiserver
nginxLogDir=${workdir}/logs/nginx

mkdir -p $workdir
mkdir -p $apiLogDir
mkdir -p $nginxLogDir

cd $workdir

#--------------------------------------
# install docker and docker compose.
#--------------------------------------
apt-get update
apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

add-apt-repository \
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"

apt-get update
apt-get install -y docker-ce docker-ce-cli containerd.io

curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose

#--------------------------------------
# set environments.
#--------------------------------------
# web server url
region=japanwest
hostname=`hostname | tr [A-Z] [a-z]`
server_name="${hostname}.${region}.cloudapp.azure.com"

#--------------------------------------
# wait until azure dns name is available.
# (If it is NG after waiting 5 minutes, give up..)
#--------------------------------------
for i in `seq 30`; do
  x=`host $server_name`
  if [ "$?" == "0" ]; then
    break
  fi
  sleep 10
done

#--------------------------------------
# get ssl certificate.
#--------------------------------------
sudo docker run --rm -p 80:80 \
    -v /etc/letsencrypt:/etc/letsencrypt \
    -v /etc/letsencrypt/logs:/var/log/letsencrypt \
    certbot/certbot certonly --standalone \
    -d $server_name \
    --register-unsafely-without-email \
    --non-interactive --agree-tos \
    --force-renewal \
    --renew-by-default \
    --preferred-challenges http

#--------------------------------------
# create default.conf for nginx.
#--------------------------------------
cat <<_EO_DEFAULT_CONF_>default.conf
upstream backend {
    server api_server:8000;
}

# http
server{
    listen 80;
    listen [::]:80;
    server_name ${server_name};
    return 301 https://\$host\$request_uri;
}

# https
server{
    listen 443 ssl;
    listen [::]:443 ssl;
    server_name  ${server_name};
    ssl_certificate     /etc/letsencrypt/live/${server_name}/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/${server_name}/privkey.pem;
    proxy_set_header    Host               \$host;
    proxy_set_header    X-Real-IP          \$remote_addr;
    proxy_set_header    X-Forwarded-Host   \$host;
    proxy_set_header    X-Forwarded-Server \$host;
    proxy_set_header    X-Forwarded-For    \$proxy_add_x_forwarded_for;

    location / {
        proxy_pass http://backend;
    }
}
_EO_DEFAULT_CONF_


#--------------------------------------
# set environment for ACR.
#--------------------------------------
registryName=##registryName##
apiServerImage=##apiServerImage##
registryUser=##registryUser##
registryPwd=##registryPwd##
docker login ${registryName}.azurecr.io -u ${registryUser} -p ${registryPwd}
docker pull ${registryName}.azurecr.io/${apiServerImage}:latest

#--------------------------------------
# create docker-compose.yml
#--------------------------------------
cat <<_EOYML_>docker-compose.yml
version: "3"
services:
  api_server:
    image: ${registryName}.azurecr.io/${apiServerImage}:latest
    hostname: api_server
    container_name: api_server
    ports:
      - "8000:8000"
    environment:
      HTTPWORKER_PORT: "8000"
      STORAGE_KEY_##storageAccountName##: "##storageAccountKey##"
      LOG_DIR: "${apiLogDir}"
      LOG_PREFIX: "apiserver_"
    networks:
      mynetwork:
        ipv4_address: 172.1.0.6
    volumes:
      - ${apiLogDir}:${apiLogDir}
    restart: always
  nginx:
    image: nginx:latest
    hostname: proxy_container
    container_name: proxy_container
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./default.conf:/etc/nginx/conf.d/default.conf
      - /etc/letsencrypt:/etc/letsencrypt
      - ${nginxLogDir}:/var/log/nginx
    networks:
      mynetwork:
        ipv4_address: 172.1.0.7
    restart: always

networks:
  mynetwork:
    driver: bridge
    ipam:
      config:
        - subnet: 172.1.0.0/24
_EOYML_

#--------------------------------------
# start containers
#--------------------------------------
docker-compose up -d
FROM alpine
COPY apiserver /apiserver
EXPOSE 8000
ENTRYPOINT ["/apiserver"]
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os"
    //"regexp"
    "strings"
    "time"

    "github.com/Azure/azure-storage-blob-go/azblob"
    //"github.com/microsoft/ApplicationInsights-Go/appinsights"
    //"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)

type InvokeResponse struct {
    File string
}

type InvokeRequest struct {
    Topic           string
    Subject         string
    EventType       string
    EventTime       string
    Id              string
    Data            map[string]interface{}
    DataVersion     string
    MetadataVersion string
}

/**
 * 環境変数の取得.
 */
func getEnv(envName string, defaultValue string) string {
    value, exists := os.LookupEnv(envName)
    if exists {
        return value
    } else {
        return defaultValue
    }
}

//var telemetryClient appinsights.TelemetryClient

func init(){
    //telemetryClient = appinsights.NewTelemetryClient("インストルメンテーションキー")
    setLogfile(true)
}

func printDebug(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[DEBUG] %s\n", msg)
    //telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose)
}

func printInfo(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[INFO] %s\n", msg)
    //telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information)
}

func printError(format string, params ...interface{}){
    setLogfile(false)
    msg := fmt.Sprintf(format, params...)
    log.Printf("[ERROR] %s\n", msg)
    //telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error)
}

func fileExists(name string) bool {
    _, err := os.Stat(name)
    return !os.IsNotExist(err)
}

func setLogfile(init bool){
    log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
    logDirPath := getEnv("LOG_DIR", "/tmp")
    logPrefix  := getEnv("LOG_PREFIX", "apiserver_")
    logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02"))
    if init || !fileExists(logfilePath) {
        newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
        if err != nil {
            panic("cannnot open " + logfilePath + " : "+ err.Error())
        }
        log.SetOutput(io.MultiWriter(newlogfile, os.Stdout))
        printInfo("switch logfile: %s", logfilePath)
    }
}

func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {

    printInfo("START blobTriggerHandler")

    var event InvokeRequest
    var fileUrl string

    defer func(){
        err := recover()
        if fileUrl == "" {
            fileUrl = "unknown"
        }
        if err != nil {
            printError("Failure process %s , event: %v", fileUrl, event)
        } else {
            printInfo("Success process %s", fileUrl)
        }
        printInfo("END blobTriggerHandler")
    }()

    //------------------------------------------------
    // リクエストデータのパース
    //------------------------------------------------
    var invokeReq []InvokeRequest
    d := json.NewDecoder(r.Body)
    decodeErr := d.Decode(&invokeReq)
    if decodeErr != nil {
        http.Error(w, decodeErr.Error(), http.StatusBadRequest)
        return
    }

    // イベントデータの取得
    event = invokeReq[0]

    // エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク)
    if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" {
        fileUrl = "SubscriptionValidationEvent"
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"])))
        return
    }

    //------------------------------------------------
    // ○○処理.
    //------------------------------------------------
    fileUrl = fmt.Sprintf("%s", event.Data["url"])
    sampleProc(fileUrl)
    // コンテナ毎に処理を切り替える場合
    //if regexp.MustCompile(`container1/.+`).MatchString(fileUrl) {
    //    sampleProc1(fileUrl)
    //} else if regexp.MustCompile(`container2/.+`).MatchString(fileUrl) {
    //    sampleProc2(fileUrl)
    //}

    //------------------------------------------------
    // レスポンス生成
    //------------------------------------------------
    invokeResponse := InvokeResponse{File: fileUrl}
    js, err := json.Marshal(invokeResponse)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    w.Write(js)
}

/**
 * ○○処理.
 */
func sampleProc(fileUrl string) {

    // 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける
    urlParts := strings.Split(fileUrl, "/")
    containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3])
    rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "")
    fileName    := rep.Replace(fileUrl)
    accountName := strings.Split(urlParts[2], ".")[0]
    containerName := urlParts[3]

    // ローカル実行用の環境変数がある場合はそちらを優先
    localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "")
    if localContainerUrl != "" {
        containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName)
    }

    printDebug("fileUrl: %s", fileUrl)
    printDebug("containerUrl: %s", containerUrlText)
    printDebug("fileName    : %s", fileName)

    // 対象のファイルURLを取得
    keyEnvName  := fmt.Sprintf("STORAGE_KEY_%s", accountName)
    accountKey  := getEnv(keyEnvName, "")
    if accountKey == "" {
        printError("環境変数(%s)が設定されていません。", keyEnvName)
        return
    }

    ctx := context.Background()
    credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        printError("Get Credential Error: %v", err)
    }

    p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
    cURL, _ := url.Parse(containerUrlText)
    containerURL := azblob.NewContainerURL(*cURL, p)
    blobURL := containerURL.NewBlobURL(fileName)

    // サイズを取得
    var blobSize int64 = 1024
    blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
    if err != nil {
        printError("Get Properties Error: %v", err)
        return
    } else {
        blobSize = blobPropResponse.ContentLength()
    }

    // ファイル内容をバッファに取得
    downloadedData := make([]byte, blobSize)
    err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{})
    if err != nil {
        printError("Download Error: %v", err)
    } else {
        // ファイル内容を表示
        printInfo("Download Success %s", fileUrl)
        printInfo(string(downloadedData))
    }
}

/**
 * メイン.
 */
func main() {
    httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT")
    if ! exists {
        httpInvokerPort = "8000"
    }
    printInfo("HTTPWORKER_PORT: " + httpInvokerPort)
    mux := http.NewServeMux()
    mux.HandleFunc("/", blobTriggerHandler)
    printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort)
    log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}

ファイルアップロード用

#!/bin/bash
source 0_env.sh

connstr=`az storage account show-connection-string --name $storageAccountName -o table | tail -1`
az storage blob upload -f tests/sample.csv -c $storageContainerIn -n sample1.csv --connection-string "${connstr}"

動作確認用のサンプルCSV

col1,col2,col3
aaaa,bbbb,cccc
bbbb,cccc,dddd
cccc,dddd,eeee

リソース作成の実行

./1_create_resources.sh --create

Webフックの設定

ここまでで Webフック 以外のリソース作成が完了しているので、次はいよいよ Webフック の設定を行う。

Azure ポータルから設定する場合

対象のストレージアカウントの画面から [イベント] を選択し、[+イベント サブスクリプション] を押下。
webhook_setting01.png

以下の通り入力。
※webhook の URL には 上記で作成したサーバの URL を記述する。
※当記事のサンプルソースのまま実施すると 「https://vm名(小文字).リージョン.cloudapp.azure.com」 となる。
webhook_setting02.png

特定のコンテナやファイルのみを処理したい場合は、フィルターを設定する事も可能。
※複数のコンテナを処理したい場合は、高度なフィルターの方を使用する。
webhook_filter_setting.png

Azure CL で設定する場合

参考: https://docs.microsoft.com/ja-jp/azure/storage/blobs/storage-blob-event-quickstart?toc=/azure/event-grid/toc.json

source 0_env.sh

event_subscription_name=${storageAccountName}EventSubscrpt
lowerVmName=`echo $vmName | tr [A-Z] [a-z]`
endpoint_url=https://${lowerVmName}.${region}.cloudapp.azure.com

# Event Grid リソース プロバイダーを有効にする
az provider register --namespace Microsoft.EventGrid

# 有効になるまで待つ
while [ true ]; do
  result=`az provider show --namespace Microsoft.EventGrid --query "registrationState" | grep Registered | wc -l | awk '{print $1}'`
  if [ "$result" == "1" ]; then
    break
  fi
  sleep 5
done;

# ストレージアカウントをサブスクライブする
storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv`

az eventgrid event-subscription create \
  --source-resource-id $storageid \
  --name $event_subscription_name \
  --endpoint $endpoint_url \
  --included-event-types "Microsoft.Storage.BlobCreated" \
  --subject-begins-with "/blobServices/default/containers/ストレージコンテナ名"    # コンテナ名でフィルタをかける場合
  --advanced-filter subject StringBeginsWith "/blobServices/default/containers/ストレージコンテナ名"     # 高度なフィルターで設定してもOK(複数可)

動作確認

tests/sample.csv をストレージコンテナにアップロードする。

./9_upload_file.sh

webhook が機能すると API server のログに以下の内容が出力される。
※当記事のサンプルではログは VM の /tmp/docker_container/logs/apiserver 配下に出力するようにしている。

2020/09/03 02:41:14.254662 apiserver.go:72: [INFO] START blobTriggerHandler
2020/09/03 02:41:14.255471 apiserver.go:66: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.255645 apiserver.go:66: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/03 02:41:14.255827 apiserver.go:66: [DEBUG] fileName    : sample1.csv
2020/09/03 02:41:14.539471 apiserver.go:72: [INFO] Download Success https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.539819 apiserver.go:72: [INFO] col1,col2,col3
aaaa,bbbb,cccc
bbbb,cccc,dddd
cccc,dddd,eeee

2020/09/03 02:41:14.540434 apiserver.go:72: [INFO] Success process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.540642 apiserver.go:72: [INFO] END blobTriggerHandler

VMのNSG(ネットワークセキュリティグループ)の変更

ここまでで構築したVMはインターネット上に公開されており、全ての送信元からのリクエストが受け入れられる状態になっている。
今回用意したAPIサーバは Event Grid からの Webhook専用なので、イベントデータの送信元を制限しておく。

リソース作成用のシェル(1_create_resources.sh) では 「az vm open-port」で VMのポート開放を行っているが、
このコマンドでは「VM用のNSGの作成」と「ルールの追加」が行われており、作成される NSG 及び ルールの名前は以下の通りとなる。

リソース命名
NSG名VM名+"NSG"myvmNSG
NSGルール"open-port-" + ポート番号open-port-80

以降では、このルールを変更して 「EventGrid からのリクエストのみを受け付ける」 ように変更する。

Azure ポータルから行う場合

対象のNSGの画面から [受信セキュリティ規則] を選択。
webhook_nsg_update01.png

以下の通り変更する。(変更はできないので削除してから追加する)

変更前の状態
webhook_nsg_update02.png

変更後の状態
webhook_nsg_update03.png

Azure CLI で行う場合

source 0_env.sh

az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80
az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80 \
  --protocol '*' --direction inbound --priority 930 \
  --source-address-prefix AzureEventGrid --source-port-range '*' \
  --destination-address-prefix '*' --destination-port-range 80 --access allow

az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443
az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443 \
  --protocol '*' --direction inbound --priority 940 \
  --source-address-prefix AzureEventGrid --source-port-range '*' \
  --destination-address-prefix '*' --destination-port-range 443 --access allow

再試行について

Event Grid のメッセージの配信と再試行 にも記載している為、そちらも参照。

既定では 24 時間以内 または 最大30回 の再試行が行われる。
※参考: 再試行のスケジュールと期間

成功とみなされるレスポンスは以下の通りで、これ以外は全て失敗とみなされて再試行が行われる。
※ 参考: メッセージの配信状態

HTTPステータス説明
200OK
201Created
202Accepted
203権限のない情報
204No Content

注意点

上記の 参考URL(再試行のスケジュールと期間) の記載で気にしておかなければいけないのが以下の記述で、
同じデータが流れてくる可能性があるので、受け側は冪等性を持たせる実装をしておく必要がある。

> エンドポイントが 3 分以内に応答した場合、Event Grid はベスト エフォート方式でイベントを再試行キューから削除しようとしますが、それでも重複が受信される可能性があります。


尚、有効期限、及び 最大再試行回数はイベントサブスクリプションの追加時に [追加の機能] タブからカスタマイズする事もできる。
※ただし、細かな再試行スケジュールは指定できない。

webhook_setting03.png

Application Insights へのログの連携

ApplicationInsights-Go を使用すれば Application Insights にログを連携できるので、関数アプリや App Service アプリと同じように監視や通知ができる。
※関連: Azure Functions の異常を検知する

Application Insights 作成

source 0_env.sh

# Application Insights 拡張が利用できない場合は追加インストール
x=`az monitor app-insights --help 2>&1`
if [ "$?" != "0" ]; then
  az extension add -n application-insights
fi

# Application Insights コンポーネント作成
if [ "$subscriptionId" != "" ]; then
  echo az monitor app-insights component create
  az monitor app-insights component create \
      --app $insightsName \
      --location $insightsRegion \
      --resource-group $resourceGroup \
      --query-access Enabled \
      --retention-time $insightsDays \
      --subscription $subscriptionId
fi

# インストルメンテーションキーの取得
echo "instrumentationKey: `az monitor app-insights component show -g $resourceGroup -a $insightsName | grep instrumentationKey | awk '{print $2}' | sed -E 's/[",]//g'`"

go ライブラリのインストール

go get github.com/microsoft/ApplicationInsights-Go/appinsights

apiserver.go の変更

import (
    // 以下を追加
    "github.com/microsoft/ApplicationInsights-Go/appinsights"
    "github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)

var telemetryClient appinsights.TelemetryClient  // 追加

func init(){
    telemetryClient = appinsights.NewTelemetryClient("上記の Insights 作成時に表示された instrumentationKey")    // 追加
    setLogfile(true)
}

func printDebug(format string, params ...interface{}){
     :
    telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose)  // 追加
}

func printInfo(format string, params ...interface{}){
     :
    telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information)  // 追加
}

func printError(format string, params ...interface{}){
     :
    telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error)  // 追加
}

(おまけ) ローカルでの実行方法

Azurite用のアカウント名やキー、接続文字列は決まっているので、それを使用して起動するだけ。
https://docs.microsoft.com/ja-jp/azure/storage/common/storage-configure-connection-string#configure-a-connection-string-for-azurite

事前に azurite のインストールは必要。
https://docs.microsoft.com/ja-jp/azure/storage/common/storage-use-azurite

X_local_start.sh

#!/bin/bash

# リソース名一式を読み込み
source 0_env.sh

# ローカルエミュレータ(azurite用のアカウント名 及び キー)
localAccountName=devstoreaccount1
localAccountKey="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

# azurite起動
mkdir -p local_azurite
azurite --silent --location `pwd`/local_azurite &
sleep 5

# ストレージコンテナ作成
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage container create -n $storageContainerIn
az storage container create -n $storageContainerOut

# API サーバ起動
mkdir -p logs
export LOG_DIR=`pwd`/logs
export LOG_PREFIX=apiserver_
export STORAGE_KEY_${localAccountName}=${localAccountKey}
export LOCAL_STORAGE_CONTAINER_URL=http://127.0.0.1:10000
go run apiserver.go

# azurite終了
kill -9 `ps | grep azurite | head -1 | awk '{print $1}'`

# ローカル用の環境変数をクリアしておく
export AZURE_STORAGE_CONNECTION_STRING=

X_local_upload.sh

#!/bin/bash

source 0_env.sh

container=$storageContainerIn
filename=tests/sample.csv

# ファイルをazurite(エミュレータ)にアップロード
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage blob upload -f $filename -c $container -n $filename

# ローカル用のイベントデータ作成
cat tests/local_event.tmpl \
    | sed "s/###storageContainer###/${container}/g" \
    | sed "s~###filename###~${filename}~g" >tests/local_event.json

# 疑似Webhookイベント発行
curl -v -H "Content-Type: application/json" -d @tests/local_event.json http://localhost:8000/

export AZURE_STORAGE_CONNECTION_STRING=

tests/local_event.tmpl

[
  {
    "topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/xxxxxxxx/providers/Microsoft.Storage/storageAccounts/devstoreaccount1",
    "subject": "/blobServices/default/containers/###storageContainer###/blobs/###filename###",
    "eventType": "Microsoft.Storage.BlobCreated",
    "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "data": {
      "api": "PutBlob",
      "clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
      "eTag": "0xAAAAAAAAAAA",
      "contentType": "text/plain; charset=utf-8",
      "contentLength": 60, 
      "blobType": "BlockBlob",
      "url": "https://devstoreaccount1.blob.core.windows.net/###storageContainer###/###filename###",
      "sequencer": "00000000000000000000000000000E6E0000000000ff188d",
      "storageDiagnostics": {
        "batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx"
      }   
    },  
    "dataVersion": "", 
    "metadataVersion": "1",
    "eventTime": "2020-09-02T03:09:21.3390888Z"
  }
]

ローカルでサーバ起動

./X_local_start.sh

ローカルのエミュレータにファイルアップロード

./X_local_upload.sh

添付ファイル: filewebhook_filter_setting.png 339件 [詳細] filewebhook_image.png 320件 [詳細] filewebhook_nsg_update02.png 330件 [詳細] filewebhook_setting02.png 326件 [詳細] filewebhook_nsg_update03.png 340件 [詳細] filewebhook_nsg_update01.png 352件 [詳細] filewebhook_setting03.png 329件 [詳細] filewebhook_setting01.png 317件 [詳細]

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-09-03 (木) 22:05:23 (1502d)