- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2020-09-02T09:10:16+00:00","","")
#mynavi(AWSメモ)
#mynavi(Azureメモ)
#setlinebreak(on);
#html(){{
<style>
.images img { border: 1px solid #333; }
.images div { vertical-align: top; }
</style>
}}
* 概要 [#da86ba60]
#html(<div class="pl10">)
Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できるが、
ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築してみた。
尚、この記事では WebAPI サーバは Azure の仮想マシンに上に構築する。(サーバ自体は今回はGoで実装する)
#html(<div class="pl10 lh15">)
Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できる([[Azure Functions を Go で書く]])が、
ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築する。
また、この記事では WebAPI サーバは Azure の仮想マシンに上に構築する。(サーバ自体は今回はGoで実装する)
尚、ファイルのアップロード時に処理を走らせるだけなら Blob トリガーを使った方が簡単(※)で可用性も高いが、この構成には以下の利点もある。
※ 関連: [[Azure Functions を Go で書く]]
- 新規/変更以外のイベントも捕捉できる。(Blobトリガーは新規/変更のイベントのみ)
- ファイルのデータ自体にはアクセスする必要がない場合は、通信データ量が減る(単純にイベント履歴が欲しい場合など)
- 仮想ネットワーク内のリソースにアクセスする必要がある場合に VNet 統合を使用しなくて良い。
(関数アプリやApp Serviceで VNet統合を使用するには高めのプランを利用する必要がある。)
#html(</div>)
* 目次 [#t64b850c]
#contents
- 関連
-- [[Azureメモ]]
-- [[Azure Functions を Go で書く]]
-- [[GoでAzureのBlobファイルを読み書き]]
- 参考
-- [[Azure Event Grid イベントに対するイベント ハンドラーとしての Webhook、Automation Runbook、Logic Apps>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-webhooks]]
-- [[Webhook のイベント配信>https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery]]
-- [[HTTP エンドポイントへのイベントの受信>https://docs.microsoft.com/ja-jp/azure/event-grid/receive-events]]
* 構築イメージ [#oe0c917e]
* 構築内容 [#g2495598]
#html(<div class="pl10">)
#TODO
当記事では以下の構成を Azure 上に構築する。
- Blobストレージにファイルがアップロードされたら Webhook によって、VM内に構築したサーバにイベント通知を行う。
- VM 内の nginx は httpsリクエストを受け付けて、Goで実装された WebAPIサーバ(ポート:8000)にリクエストを転送する。&br; (SSL証明書は VM の作成時に Let's Encrypt を使用して自動的に作成する )
- VMのNSG(ネットワークセキュリティグループ)で、Event Grid 以外からのリクエストは通さないように設定する。
#html(<div class="images"><div>)
&ref(webhook_image.png,nolink);
#html(</div></div>)
#html(</div>)
* エンドポイントの検証について [#w310fe27]
#html(<div class="pl10">)
** 送信される検証イベント [#we9e3418]
#html(<div class="pl10">)
ストレージアカウントに Webhook を設定すると、設定したエンドポイントにサブスクリプション検証イベントが送信される。
https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery#endpoint-validation-with-event-grid-events
(検証イベントのイメージ)
#mycode3(){{
[{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/...",
"subject": "",
"data": {
"validationCode": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"validationUrl": "https://rp-japanwest.eventgrid.azure.net:553/eventsubscriptions/..."
},
"eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
"eventTime": "YYYY-MM-DDTHH:MM:SS.SSSSSSSZ",
"metadataVersion": "1",
"dataVersion": "2"
}]
}}
#html(</div>)
このリクエストに対して、以下の何れかのアクションをとる事によって検証が完了する。
** 同期ハンドシェイク [#mc8bee47]
#html(<div class="pl10">)
検証リクエストに対して、以下のレスポンスを返す。
※ 受信した validationCode をそのまま validationResponse として返却する。
#mycode3(){{
{
"validationResponse": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
}}
#html(</div>)
** 非同期ハンドシェイク [#c23a7513]
#html(<div class="pl10">)
受信した validationUrl にブラウザ等からアクセスする。
#html(</div>)
&br;
という事なのだが、当記事では同期ハンドシェイクに対応出来るように予めWebAPI サーバの実装に処理を組み込む事とする。
※後述の apiserver.go を参照。
#html(</div>)
* 受信するイベントデータについて [#a6b1aed5]
#html(<div class="pl10">)
イベントスキーマとして "イベントグリッドスキーマ" を選択した場合(後述)、Webhook のエンドポイントには以下のようなリクエストデータが送信されてくる。
※当記事のサンプルでは、以下に含まれる url の情報を元にストレージコンテナからファイルの内容を取得してログに出力している。
#mycode3(){{
[
{
"topic": "/subscriptions/サブスクリプション/resourceGroups/RG名/providers/Microsoft.Storage/storageAccounts/ストレージアカウント名",
"subject": "/blobServices/default/containers/ストレージコンテナ名/blobs/sample1.txt",
"eventType": "Microsoft.Storage.BlobCreated",
"id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"data": {
"api": "PutBlob",
"clientRequestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"requestId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"eTag": "0xXXXXXXXXXXXXXXX",
"contentType": "text/plain; charset=utf-8",
"contentLength": 60,
"blobType": "BlockBlob",
"url": "https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.txt",
"sequencer": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"storageDiagnostics": {
"batchId": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
},
"dataVersion": "",
"metadataVersion": "1",
"eventTime": "2020-09-02T03:09:21.3390888Z"
}
]
}}
#html(</div>)
* リソース作成 [#ue84466d]
#html(<div class="pl10">)
// START tabs1
#html(){{
<div id="tabs1">
<ul>
<li><a href="#tabs1-1">0_env.sh</a></li>
<li><a href="#tabs1-2">1_create_resources.sh</a></li>
<li><a href="#tabs1-3">2_setup_vm.tmpl</a></li>
<li><a href="#tabs1-4">Dockerfile</a></li>
<li><a href="#tabs1-4">apiserver_Dockerfile</a></li>
<li><a href="#tabs1-5">apiserver.go</a></li>
<li><a href="#tabs1-6">9_upload_file.sh</a></li>
<li><a href="#tabs1-7">tests/sample.csv</a></li>
</ul>
}}
// START tabs1-1
#html(<div id="tabs1-1">)
#mycode2(){{
#!/bin/bash
# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=XXXXXXXX
# サブスクリプションID
subscriptionId=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
# リージョン
region=japanwest
insightsRegion=japaneast # 2020/9時点では Appplication Insights で西日本(japanwest)は使用できない
# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup
# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS
# Storageコンテナ名
storageContainerIn=${PREFIX}-strcontainer-i
storageContainerOut=${PREFIX}-strcontainer-o
# 仮想ネットワーク名
vnetName=${PREFIX}VNet
vnetPrefix=10.1.0.0/16
# ネットワークセキュリティグループ
nsgName=${vnetName}SecGrp
nsgPubRuleName=${nsgName}PubRule
nsgPriRuleName=${nsgName}PriRule
# VM用のサブネット
vmSubnetName=${PREFIX}VmSubnet
vmSubnetPrefix=10.1.1.0/24
# 関数アプリ用のサブネット
funcSubnetName=${PREFIX}FuncSubnet
funcSubnetPrefix=10.1.2.0/24
# 仮想マシン
vmName=${PREFIX}Vm
vmImage=UbuntuLTS
vmIpAddress=10.1.1.5
vmUser=sample
# Application insights
insightsName=${PREFIX}Insights
insightsDays=30
# Web API server用
registryName=${PREFIX}registry
apiServerImage=${PREFIX}-apiserver
}}
#html(</div>)
// END tabs1-1
// START tabs1-2
#html(<div id="tabs1-2">)
#mycode2(){{
#!/bin/bash
# 設定の読み込み
source 0_env.sh
# リソース作成
if [ "$1" == "--create" ]; then
# リソースグループの作成
echo "az group create"
az group create --name $resourceGroup --location $region
# ストレージアカウントの作成
echo az storage account create
az storage account create \
--name $storageAccountName --location $region \
--resource-group $resourceGroup --sku $storageSku
# ストレージアカウントキーの取得
storageAccountKey=`az storage account keys list -g $resourceGroup -n $storageAccountName -o table | grep key1 | awk '{print $3}'`
# Storageコンテナの作成
echo "az storage container create(in)"
az storage container create \
--name $storageContainerIn --resource-group $resourceGroup --account-name $storageAccountName
echo "az storage container create(out)"
az storage container create \
--name $storageContainerOut --resource-group $resourceGroup --account-name $storageAccountName
# NSG(ネットワークセキュリティグループの)作成
echo "az network nsg create"
az network nsg create --resource-group $resourceGroup --name $nsgName
# コンテナレジストリ作成
echo "az acr create ( $registryName )"
az acr create -n $registryName -g $resourceGroup --sku standard --admin-enabled true
# レジストリユーザ名/パスワード取得
acr_credential="`az acr credential show --name $registryName -o table | tail -1`"
registryUser="`echo "$acr_credential" | awk '{print $1}'`"
registryPwd="`echo "$acr_credential" | awk '{print $2}'`"
# API Server のビルド
go get github.com/sendgrid/sendgrid-go
GOOS=linux GOARCH=amd64 go build -o apiserver
# go のビルド環境がない場合
# docker run --rm -v `pwd`:/tmp/work -w /tmp/work -i golang <<_EO_GOBUILD_
# go get github.com/Azure/azure-storage-blob-go/azblob
# GOOS=linux GOARCH=amd64 go build -o apiserver
#_EO_GOBUILD_
chmod 755 apiserver
# docker イメージを作成してACRにプッシュしておく(内容は Dockerfile 参照)
az acr build --registry $registryName --image ${apiServerImage} -f apiserver_Dockerfile .
# VMセットアップ用スクリプトに環境変数を埋め込み
cat 2_vm_setup.tmpl \
| sed "s/##storageAccountName##/${storageAccountName}/g" \
| sed "s~##storageAccountKey##~${storageAccountKey}~g" \
| sed "s/##registryName##/${registryName}/g" \
| sed "s/##registryUser##/${registryUser}/g" \
| sed "s~##registryPwd##~${registryPwd}~g" \
| sed "s/##apiServerImage##/${apiServerImage}/g" > 2_vm_setup.sh
chmod 755 2_vm_setup.sh
# WebAPIサーバ用のNSGルール(http)
echo "az network nsg rule create(http)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Http \
--access Allow --protocol Tcp --direction Inbound --priority 120 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "80"
# WebAPIサーバ用のNSGルール(https)
echo "az network nsg rule create(https)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Https \
--access Allow --protocol Tcp --direction Inbound --priority 130 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "443"
# SSH接続用のNSGルール
echo "az network nsg rule create(ssh)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Ssh \
--access Allow --protocol Tcp --direction Inbound --priority 1000 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "22"
# 仮想ネットワーク 及び サブネット作成
echo "az network vnet create"
az network vnet create \
--name $vnetName --resource-group $resourceGroup \
--address-prefixes $vnetPrefix --network-security-group $nsgName \
--subnet-name $vmSubnetName --subnet-prefixes $vmSubnetPrefix
# 仮想マシンの作成
rm -rf ~/.ssh/id_rsa
rm -rf ~/.ssh/id_rsa.pub
echo "az vm create"
az vm create \
--resource-group $resourceGroup --name $vmName --image $vmImage --generate-ssh-keys \
--vnet-name $vnetName --subnet $vmSubnetName \
--private-ip-address $vmIpAddress --admin-username $vmUser \
--public-ip-address-dns-name `echo $vmName | tr '[A-Z]' '[a-z]'` \
--custom-data 2_vm_setup.sh
# VMポート開放
#echo "az vm open-port(8086)"
#az vm open-port --resource-group $resourceGroup --name $vmName --port 8086 --priority 920
echo "az vm open-port(80)"
az vm open-port --resource-group $resourceGroup --name $vmName --port 80 --priority 930
echo "az vm open-port(443)"
az vm open-port --resource-group $resourceGroup --name $vmName --port 443 --priority 940
# 生成されたSSH鍵を退避しておく
mkdir -p pem
if [ -e ~/.ssh/id_rsa ]; then
mv ~/.ssh/id_rsa ./pem/id_rsa_${vmName}
mv ~/.ssh/id_rsa.pub ./pem/id_rsa_${vmName}.pub
fi
lowerVmName=`echo $vmName | tr [A-Z] [a-z]`
echo "WebAPI Server: https://${lowerVmName}.${region}.cloudapp.azure.com"
fi
# リソース削除
if [ "$1" == "--delete" ]; then
echo "az group delete"
az group delete --name $resourceGroup
fi
}}
#html(</div>)
// END tabs1-2
// START tabs1-3
#html(<div id="tabs1-3">)
VMのセットアップ処理。
※nginx、API Server コンテナの作成 及び 起動。
#mycode2(){{
#!/bin/bash
#--------------------------------------
# create work directory and cd.
#--------------------------------------
workdir=/tmp/docker_container
apiLogDir=${workdir}/logs/apiserver
nginxLogDir=${workdir}/logs/nginx
mkdir -p $workdir
mkdir -p $apiLogDir
mkdir -p $nginxLogDir
cd $workdir
#--------------------------------------
# install docker and docker compose.
#--------------------------------------
apt-get update
apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
apt-get update
apt-get install -y docker-ce docker-ce-cli containerd.io
curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
#--------------------------------------
# set environments.
#--------------------------------------
# web server url
region=japanwest
hostname=`hostname | tr [A-Z] [a-z]`
server_name="${hostname}.${region}.cloudapp.azure.com"
#--------------------------------------
# wait until azure dns name is available.
# (If it is NG after waiting 5 minutes, give up..)
#--------------------------------------
for i in `seq 30`; do
x=`host $server_name`
if [ "$?" == "0" ]; then
break
fi
sleep 10
done
#--------------------------------------
# get ssl certificate.
#--------------------------------------
sudo docker run --rm -p 80:80 \
-v /etc/letsencrypt:/etc/letsencrypt \
-v /etc/letsencrypt/logs:/var/log/letsencrypt \
certbot/certbot certonly --standalone \
-d $server_name \
--register-unsafely-without-email \
--non-interactive --agree-tos \
--force-renewal \
--renew-by-default \
--preferred-challenges http
#--------------------------------------
# create default.conf for nginx.
#--------------------------------------
cat <<_EO_DEFAULT_CONF_>default.conf
upstream backend {
server api_server:8000;
}
# http
server{
listen 80;
listen [::]:80;
server_name ${server_name};
return 301 https://\$host\$request_uri;
}
# https
server{
listen 443 ssl;
listen [::]:443 ssl;
server_name ${server_name};
ssl_certificate /etc/letsencrypt/live/${server_name}/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/${server_name}/privkey.pem;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-Host \$host;
proxy_set_header X-Forwarded-Server \$host;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
location / {
proxy_pass http://backend;
}
}
_EO_DEFAULT_CONF_
#--------------------------------------
# set environment for ACR.
#--------------------------------------
registryName=##registryName##
apiServerImage=##apiServerImage##
registryUser=##registryUser##
registryPwd=##registryPwd##
docker login ${registryName}.azurecr.io -u ${registryUser} -p ${registryPwd}
docker pull ${registryName}.azurecr.io/${apiServerImage}:latest
#--------------------------------------
# create docker-compose.yml
#--------------------------------------
cat <<_EOYML_>docker-compose.yml
version: "3"
services:
api_server:
image: ${registryName}.azurecr.io/${apiServerImage}:latest
hostname: api_server
container_name: api_server
ports:
- "8000:8000"
environment:
HTTPWORKER_PORT: "8000"
STORAGE_KEY_##storageAccountName##: "##storageAccountKey##"
LOG_DIR: "${apiLogDir}"
LOG_PREFIX: "apiserver_"
networks:
mynetwork:
ipv4_address: 172.1.0.6
volumes:
- ${apiLogDir}:${apiLogDir}
restart: always
nginx:
image: nginx:latest
hostname: proxy_container
container_name: proxy_container
ports:
- "80:80"
- "443:443"
volumes:
- ./default.conf:/etc/nginx/conf.d/default.conf
- /etc/letsencrypt:/etc/letsencrypt
- ${nginxLogDir}:/var/log/nginx
networks:
mynetwork:
ipv4_address: 172.1.0.7
restart: always
networks:
mynetwork:
driver: bridge
ipam:
config:
- subnet: 172.1.0.0/24
_EOYML_
#--------------------------------------
# start containers
#--------------------------------------
docker-compose up -d
}}
#html(</div>)
// END tabs1-3
// START tabs1-4
#html(<div id="tabs1-4">)
#mycode2(){{
FROM alpine
COPY apiserver /apiserver
EXPOSE 8000
ENTRYPOINT ["/apiserver"]
}}
#html(</div>)
// END tabs1-4
// START tabs1-5
#html(<div id="tabs1-5">)
#mycode2(){{
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
//"regexp"
"strings"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
//"github.com/microsoft/ApplicationInsights-Go/appinsights"
//"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)
type InvokeResponse struct {
File string
}
type InvokeRequest struct {
Topic string
Subject string
EventType string
EventTime string
Id string
Data map[string]interface{}
DataVersion string
MetadataVersion string
}
/**
* 環境変数の取得.
*/
func getEnv(envName string, defaultValue string) string {
value, exists := os.LookupEnv(envName)
if exists {
return value
} else {
return defaultValue
}
}
//var telemetryClient appinsights.TelemetryClient
func init(){
//telemetryClient = appinsights.NewTelemetryClient("インストルメンテーションキー")
setLogfile(true)
}
func printDebug(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[DEBUG] %s\n", msg)
//telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose)
}
func printInfo(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[INFO] %s\n", msg)
//telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information)
}
func printError(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[ERROR] %s\n", msg)
//telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error)
}
func fileExists(name string) bool {
_, err := os.Stat(name)
return !os.IsNotExist(err)
}
func setLogfile(init bool){
log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
logDirPath := getEnv("LOG_DIR", "/tmp")
logPrefix := getEnv("LOG_PREFIX", "apiserver_")
logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02"))
if init || !fileExists(logfilePath) {
newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
panic("cannnot open " + logfilePath + " : "+ err.Error())
}
log.SetOutput(io.MultiWriter(newlogfile, os.Stdout))
printInfo("switch logfile: %s", logfilePath)
}
}
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {
printInfo("START blobTriggerHandler")
var event InvokeRequest
var fileUrl string
defer func(){
err := recover()
if fileUrl == "" {
fileUrl = "unknown"
}
if err != nil {
printError("Failure process %s , event: %v", fileUrl, event)
} else {
printInfo("Success process %s", fileUrl)
}
printInfo("END blobTriggerHandler")
}()
//------------------------------------------------
// リクエストデータのパース
//------------------------------------------------
var invokeReq []InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
// イベントデータの取得
event = invokeReq[0]
// エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク)
if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" {
fileUrl = "SubscriptionValidationEvent"
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"])))
return
}
//------------------------------------------------
// ○○処理.
//------------------------------------------------
fileUrl = fmt.Sprintf("%s", event.Data["url"])
sampleProc(fileUrl)
// コンテナ毎に処理を切り替える場合
//if regexp.MustCompile(`container1/.+`).MatchString(fileUrl) {
// sampleProc1(fileUrl)
//} else if regexp.MustCompile(`container2/.+`).MatchString(fileUrl) {
// sampleProc2(fileUrl)
//}
//------------------------------------------------
// レスポンス生成
//------------------------------------------------
invokeResponse := InvokeResponse{File: fileUrl}
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
}
/**
* ○○処理.
*/
func sampleProc(fileUrl string) {
// 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける
urlParts := strings.Split(fileUrl, "/")
containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3])
rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "")
fileName := rep.Replace(fileUrl)
accountName := strings.Split(urlParts[2], ".")[0]
containerName := urlParts[3]
// ローカル実行用の環境変数がある場合はそちらを優先
localContainerUrl := getEnv("LOCAL_STORAGE_CONTAINER_URL", "")
if localContainerUrl != "" {
containerUrlText = fmt.Sprintf("%s/%s/%s", localContainerUrl, accountName, containerName)
}
printDebug("fileUrl: %s", fileUrl)
printDebug("containerUrl: %s", containerUrlText)
printDebug("fileName : %s", fileName)
// 対象のファイルURLを取得
keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName)
accountKey := getEnv(keyEnvName, "")
if accountKey == "" {
printError("環境変数(%s)が設定されていません。", keyEnvName)
return
}
ctx := context.Background()
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
printError("Get Credential Error: %v", err)
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
cURL, _ := url.Parse(containerUrlText)
containerURL := azblob.NewContainerURL(*cURL, p)
blobURL := containerURL.NewBlobURL(fileName)
// サイズを取得
var blobSize int64 = 1024
blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
printError("Get Properties Error: %v", err)
return
} else {
blobSize = blobPropResponse.ContentLength()
}
// ファイル内容をバッファに取得
downloadedData := make([]byte, blobSize)
err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{})
if err != nil {
printError("Download Error: %v", err)
} else {
// ファイル内容を表示
printInfo("Download Success %s", fileUrl)
printInfo(string(downloadedData))
}
}
/**
* メイン.
*/
func main() {
httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT")
if ! exists {
httpInvokerPort = "8000"
}
printInfo("HTTPWORKER_PORT: " + httpInvokerPort)
mux := http.NewServeMux()
mux.HandleFunc("/", blobTriggerHandler)
printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort)
log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}
#html(</div>)
// END tabs1-5
// START tabs1-6
#html(<div id="tabs1-6">)
ファイルアップロード用
#mycode2(){{
#!/bin/bash
source 0_env.sh
connstr=`az storage account show-connection-string --name $storageAccountName -o table | tail -1`
az storage blob upload -f tests/sample.csv -c $storageContainerIn -n sample1.csv --connection-string "${connstr}"
}}
#html(</div>)
// END tabs1-6
// START tabs1-7
#html(<div id="tabs1-7">)
動作確認用のサンプルCSV
#mycode2(){{
col1,col2,col3
aaaa,bbbb,cccc
bbbb,cccc,dddd
cccc,dddd,eeee
}}
#html(</div>)
// END tabs1-7
#html(</div>)
// END tabs1
#html(<script>$(function() { $("#tabs1").tabs(); });</script>)
** リソース作成の実行 [#n8627ad1]
#myterm2(){{
./1_create_resources.sh --create
}}
#html(</div>)
* フックの設定 [#ccd81ffc]
* Webフックの設定 [#ccd81ffc]
#html(<div class="pl10">)
#TODO
ここまでで Webフック 以外のリソース作成が完了しているので、次はいよいよ Webフック の設定を行う。
** Azure ポータルから設定する場合 [#xd2df30c]
#html(<div class="pl10">)
#html(<div class="images"><div>)
対象のストレージアカウントの画面から [イベント] を選択し、[+イベント サブスクリプション] を押下。
&ref(webhook_setting01.png,nolink);
#html(</div><div>)
以下の通り入力。
※webhook の URL には 上記で作成したサーバの URL を記述する。
※当記事のサンプルソースのまま実施すると 「https://vm名(小文字).リージョン.cloudapp.azure.com」 となる。
&ref(webhook_setting02.png,nolink);
#html(</div><div>)
特定のコンテナやファイルのみを処理したい場合は、フィルターを設定する事も可能。
※複数のコンテナを処理したい場合は、高度なフィルターの方を使用する。
&ref(webhook_filter_setting.png,nolink);
#html(</div></div>)
#html(</div>)
** Azure CL で設定する場合 [#q5b6df21]
#html(<div class="pl10">)
参考: https://docs.microsoft.com/ja-jp/azure/storage/blobs/storage-blob-event-quickstart?toc=/azure/event-grid/toc.json
#myterm2(){{
source 0_env.sh
event_subscription_name=${storageAccountName}EventSubscrpt
lowerVmName=`echo $vmName | tr [A-Z] [a-z]`
endpoint_url=https://${lowerVmName}.${region}.cloudapp.azure.com
# Event Grid リソース プロバイダーを有効にする
az provider register --namespace Microsoft.EventGrid
# 有効になるまで待つ
while [ true ]; do
result=`az provider show --namespace Microsoft.EventGrid --query "registrationState" | grep Registered | wc -l | awk '{print $1}'`
if [ "$result" == "1" ]; then
break
fi
sleep 5
done;
# ストレージアカウントをサブスクライブする
storageid=`az storage account show --name $storageAccountName --resource-group $resourceGroup --query id --output tsv`
az eventgrid event-subscription create \
--source-resource-id $storageid \
--name $event_subscription_name \
--endpoint $endpoint_url \
--included-event-types "Microsoft.Storage.BlobCreated" \
--subject-begins-with "/blobServices/default/containers/ストレージコンテナ名" # コンテナ名でフィルタをかける場合
--advanced-filter subject StringBeginsWith "/blobServices/default/containers/ストレージコンテナ名" # 高度なフィルターで設定してもOK(複数可)
}}
#html(</div>)
#html(</div>)
* 動作確認 [#x46aacb8]
#html(<div class="pl10">)
#TODO
tests/sample.csv をストレージコンテナにアップロードする。
#myterm2(){{
./9_upload_file.sh
}}
webhook が機能すると API server のログに以下の内容が出力される。
※当記事のサンプルではログは VM の /tmp/docker_container/logs/apiserver 配下に出力するようにしている。
#myterm2(){{
2020/09/03 02:41:14.254662 apiserver.go:72: [INFO] START blobTriggerHandler
2020/09/03 02:41:14.255471 apiserver.go:66: [DEBUG] fileUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.255645 apiserver.go:66: [DEBUG] containerUrl: https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名
2020/09/03 02:41:14.255827 apiserver.go:66: [DEBUG] fileName : sample1.csv
2020/09/03 02:41:14.539471 apiserver.go:72: [INFO] Download Success https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.539819 apiserver.go:72: [INFO] col1,col2,col3
aaaa,bbbb,cccc
bbbb,cccc,dddd
cccc,dddd,eeee
2020/09/03 02:41:14.540434 apiserver.go:72: [INFO] Success process https://ストレージアカウント名.blob.core.windows.net/ストレージコンテナ名/sample1.csv
2020/09/03 02:41:14.540642 apiserver.go:72: [INFO] END blobTriggerHandler
}}
#html(</div>)
#html(</div>)
* 解説 [#bb978839]
* VMのNSG(ネットワークセキュリティグループ)の変更 [#mb7bf23f]
#html(<div class="pl10">)
#TODO
ここまでで構築したVMはインターネット上に公開されており、全ての送信元からのリクエストが受け入れられる状態になっている。
今回用意したAPIサーバは Event Grid からの Webhook専用なので、イベントデータの送信元を制限しておく。
リソース作成用のシェル(1_create_resources.sh) では 「az vm open-port」で VMのポート開放を行っているが、
このコマンドでは「VM用のNSGの作成」と「ルールの追加」が行われており、作成される NSG 及び ルールの名前は以下の通りとなる。
| リソース | 命名 | 例 |h
| NSG名 | VM名+"NSG" | myvmNSG |
| NSGルール | "open-port-" + ポート番号 | open-port-80 |
以降では、このルールを変更して 「EventGrid からのリクエストのみを受け付ける」 ように変更する。
** Azure ポータルから行う場合 [#l7e53d9c]
#html(<div class="pl10">)
#html(<div class="images"><div>)
対象のNSGの画面から [受信セキュリティ規則] を選択。
&ref(webhook_nsg_update01.png,nolink);
#html(</div><div>)
以下の通り変更する。(変更はできないので削除してから追加する)
変更前の状態
&ref(webhook_nsg_update02.png,nolink);
#html(</div><div>)
変更後の状態
&ref(webhook_nsg_update03.png,nolink);
#html(</div></div>)
#html(</div>)
* TODO [#e920c52a]
** Azure CLI で行う場合 [#u92786cb]
#html(<div class="pl10">)
- APIサーバのログ出力 及び docker volume のマウント
- Application insight への連携(実現可否の調査)
- NSG で EventGrid 以外を拒否
- ストレージアカウントの料金
#myterm2(){{
source 0_env.sh
az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80
az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-80 \
--protocol '*' --direction inbound --priority 930 \
--source-address-prefix AzureEventGrid --source-port-range '*' \
--destination-address-prefix '*' --destination-port-range 80 --access allow
az network nsg rule delete -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443
az network nsg rule create -g $resourceGroup --nsg-name ${vmName}NSG --name open-port-443 \
--protocol '*' --direction inbound --priority 940 \
--source-address-prefix AzureEventGrid --source-port-range '*' \
--destination-address-prefix '*' --destination-port-range 443 --access allow
}}
#html(</div>)
#html(</div>)
* 再試行について [#ke2ec5bb]
#html(<div class="pl10">)
[[Event Grid のメッセージの配信と再試行]] にも記載している為、そちらも参照。
既定では 24 時間以内 または 最大30回 の再試行が行われる。
※参考: [[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]]
成功とみなされるレスポンスは以下の通りで、これ以外は全て失敗とみなされて再試行が行われる。
※ 参考: [[メッセージの配信状態>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#message-delivery-status]]
| HTTPステータス | 説明 |h
| 200 | OK |
| 201 | Created |
| 202 | Accepted |
| 203 | 権限のない情報 |
| 204 | No Content |
** 注意点 [#x2c89914]
#html(<div class="pl10">)
上記の 参考URL([[再試行のスケジュールと期間>https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration]]) の記載で気にしておかなければいけないのが以下の記述で、
''同じデータが流れてくる可能性があるので、受け側は冪等性を持たせる実装をしておく必要がある。''
#html(){{
<div class="ib" style="padding: 10px; background: #eee; border: 1px solid #333;">
> エンドポイントが 3 分以内に応答した場合、Event Grid はベスト エフォート方式でイベントを再試行キューから削除しようとしますが、それでも重複が受信される可能性があります。
</div>
}}
#html(</div>)
&br;
尚、有効期限、及び 最大再試行回数はイベントサブスクリプションの追加時に [追加の機能] タブからカスタマイズする事もできる。
※ただし、細かな再試行スケジュールは指定できない。
#html(<div class="images"><div>)
&ref(webhook_setting03.png,nolink);
#html(</div></div>)
#html(</div>)
* Application Insights へのログの連携 [#gb4f1316]
#html(<div class="pl10">)
[[ApplicationInsights-Go>https://github.com/microsoft/ApplicationInsights-Go]] を使用すれば Application Insights にログを連携できるので、関数アプリや App Service アプリと同じように監視や通知ができる。
※関連: [[Azure Functions の異常を検知する]]
Application Insights 作成
#myterm2(){{
source 0_env.sh
# Application Insights 拡張が利用できない場合は追加インストール
x=`az monitor app-insights --help 2>&1`
if [ "$?" != "0" ]; then
az extension add -n application-insights
fi
# Application Insights コンポーネント作成
if [ "$subscriptionId" != "" ]; then
echo az monitor app-insights component create
az monitor app-insights component create \
--app $insightsName \
--location $insightsRegion \
--resource-group $resourceGroup \
--query-access Enabled \
--retention-time $insightsDays \
--subscription $subscriptionId
fi
# インストルメンテーションキーの取得
echo "instrumentationKey: `az monitor app-insights component show -g $resourceGroup -a $insightsName | grep instrumentationKey | awk '{print $2}' | sed -E 's/[",]//g'`"
}}
go ライブラリのインストール
#myterm2(){{
go get github.com/microsoft/ApplicationInsights-Go/appinsights
}}
apiserver.go の変更
#mycode(){{
import (
// 以下を追加
"github.com/microsoft/ApplicationInsights-Go/appinsights"
"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)
var telemetryClient appinsights.TelemetryClient // 追加
func init(){
telemetryClient = appinsights.NewTelemetryClient("上記の Insights 作成時に表示された instrumentationKey") // 追加
setLogfile(true)
}
func printDebug(format string, params ...interface{}){
:
telemetryClient.TrackTrace(fmt.Sprintf("[DEBUG] %s", msg), contracts.Verbose) // 追加
}
func printInfo(format string, params ...interface{}){
:
telemetryClient.TrackTrace(fmt.Sprintf("[INFO] %s", msg), contracts.Information) // 追加
}
func printError(format string, params ...interface{}){
:
telemetryClient.TrackTrace(fmt.Sprintf("[ERROR] %s", msg), contracts.Error) // 追加
}
}}
#html(</div>)
* (おまけ) ローカルでの実行方法 [#l892a100]
#html(<div class="pl10">)
Azurite用のアカウント名やキー、接続文字列は決まっているので、それを使用して起動するだけ。
https://docs.microsoft.com/ja-jp/azure/storage/common/storage-configure-connection-string#configure-a-connection-string-for-azurite
事前に azurite のインストールは必要。
※ https://docs.microsoft.com/ja-jp/azure/storage/common/storage-use-azurite
// START tabs1
#html(){{
<div id="tabs2">
<ul>
<li><a href="#tabs2-1">X_local_start.sh</a></li>
<li><a href="#tabs2-2">X_local_upload.sh</a></li>
<li><a href="#tabs2-3">tests/local_event.tmpl</a></li>
</ul>
}}
// START tabs2-1
#html(<div id="tabs2-1">)
X_local_start.sh
#mycode2(){{
#!/bin/bash
# リソース名一式を読み込み
source 0_env.sh
# ローカルエミュレータ(azurite用のアカウント名 及び キー)
localAccountName=devstoreaccount1
localAccountKey="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
# azurite起動
mkdir -p local_azurite
azurite --silent --location `pwd`/local_azurite &
sleep 5
# ストレージコンテナ作成
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage container create -n $storageContainerIn
az storage container create -n $storageContainerOut
# API サーバ起動
mkdir -p logs
export LOG_DIR=`pwd`/logs
export LOG_PREFIX=apiserver_
export STORAGE_KEY_${localAccountName}=${localAccountKey}
export LOCAL_STORAGE_CONTAINER_URL=http://127.0.0.1:10000
go run apiserver.go
# azurite終了
kill -9 `ps | grep azurite | head -1 | awk '{print $1}'`
# ローカル用の環境変数をクリアしておく
export AZURE_STORAGE_CONNECTION_STRING=
}}
#html(</div>)
// END tabs2-1
// START tabs2-2
#html(<div id="tabs2-2">)
X_local_upload.sh
#mycode2(){{
#!/bin/bash
source 0_env.sh
container=$storageContainerIn
filename=tests/sample.csv
# ファイルをazurite(エミュレータ)にアップロード
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage blob upload -f $filename -c $container -n $filename
# ローカル用のイベントデータ作成
cat tests/local_event.tmpl \
| sed "s/###storageContainer###/${container}/g" \
| sed "s~###filename###~${filename}~g" >tests/local_event.json
# 疑似Webhookイベント発行
curl -v -H "Content-Type: application/json" -d @tests/local_event.json http://localhost:8000/
export AZURE_STORAGE_CONNECTION_STRING=
}}
#html(</div>)
// END tabs2-2
// START tabs2-3
#html(<div id="tabs2-3">)
tests/local_event.tmpl
#mycode2(){{
[
{
"topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/xxxxxxxx/providers/Microsoft.Storage/storageAccounts/devstoreaccount1",
"subject": "/blobServices/default/containers/###storageContainer###/blobs/###filename###",
"eventType": "Microsoft.Storage.BlobCreated",
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"data": {
"api": "PutBlob",
"clientRequestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"requestId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"eTag": "0xAAAAAAAAAAA",
"contentType": "text/plain; charset=utf-8",
"contentLength": 60,
"blobType": "BlockBlob",
"url": "https://devstoreaccount1.blob.core.windows.net/###storageContainer###/###filename###",
"sequencer": "00000000000000000000000000000E6E0000000000ff188d",
"storageDiagnostics": {
"batchId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxxx"
}
},
"dataVersion": "",
"metadataVersion": "1",
"eventTime": "2020-09-02T03:09:21.3390888Z"
}
]
}}
#html(</div>)
// END tabs2-3
#html(</div>)
// END tabs2
#html(<script>$(function() { $("#tabs2").tabs(); });</script>)
** ローカルでサーバ起動 [#w0896654]
#myterm2(){{
./X_local_start.sh
}}
** ローカルのエミュレータにファイルアップロード [#c6444d1b]
#myterm2(){{
./X_local_upload.sh
}}
#html(</div>)