- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2020-09-02T09:10:16+00:00","","")
#author("2020-09-03T02:36:12+00:00","","")
#mynavi(AWSメモ)
#setlinebreak(on);
* 概要 [#da86ba60]
#html(<div class="pl10">)
Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できるが、
ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築してみた。
Azure Blob のアップロードをトリガーにして起動する処理は、Blobトリガーを使用すれば簡単に実装できる([[Azure Functions を Go で書く]])が、
ここでは Azure Blob のアップロード時に webhook で 他のWebAPI サーバにイベントデータを飛ばす仕組みを構築する。
尚、この記事では WebAPI サーバは Azure の仮想マシンに上に構築する。(サーバ自体は今回はGoで実装する)
#html(</div>)
* 目次 [#t64b850c]
#contents
- 関連
-- [[Azureメモ]]
-- [[Azure Functions を Go で書く]]
-- [[GoでAzureのBlobファイルを読み書き]]
- 参考
-- [[Azure Event Grid イベントに対するイベント ハンドラーとしての Webhook、Automation Runbook、Logic Apps>https://docs.microsoft.com/ja-jp/azure/event-grid/handler-webhooks]]
-- [[Webhook のイベント配信>https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery]]
-- [[HTTP エンドポイントへのイベントの受信>https://docs.microsoft.com/ja-jp/azure/event-grid/receive-events]]
* 構築イメージ [#oe0c917e]
#html(<div class="pl10">)
#TODO
#html(</div>)
* エンドポイントの検証について [#w310fe27]
#html(<div class="pl10">)
Web hook を設定すると、設定したエンドポイントにサブスクリプション検証イベントが送信される。
https://docs.microsoft.com/ja-jp/azure/event-grid/webhook-event-delivery#endpoint-validation-with-event-grid-events
送信されるリクエストデータ
#mycode3(){{
[{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"topic": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/...",
"subject": "",
"data": {
"validationCode": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"validationUrl": "https://rp-japanwest.eventgrid.azure.net:553/eventsubscriptions/..."
},
"eventType": "Microsoft.EventGrid.SubscriptionValidationEvent",
"eventTime": "YYYY-MM-DDTHH:MM:SS.SSSSSSSZ",
"metadataVersion": "1",
"dataVersion": "2"
}]
}}
このリクエストに対して、以下の何れかのアクションをとる事によって検証が完了する。
** 同期ハンドシェイク [#mc8bee47]
#html(<div class="pl10">)
検証リクエストに対して、以下のレスポンスを返す。
※ 受信した validationCode をそのまま validationResponse として返却する。
#mycode3(){{
{
"validationResponse": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
}}
#html(</div>)
** 非同期ハンドシェイク [#c23a7513]
#html(<div class="pl10">)
受信した validationUrl にブラウザ等からアクセスする。
#html(</div>)
''尚、当記事では検証用リクエストにも対応できるように予めWebAPI サーバに処理を組み込んでいる。(同期ハンドシェイク)''
#html(</div>)
* 受信するイベントデータについて [#a6b1aed5]
#html(<div class="pl10">)
#TODO
#html(</div>)
* リソース作成 [#ue84466d]
#html(<div class="pl10">)
// START tabs1
#html(){{
<div id="tabs1">
<ul>
<li><a href="#tabs1-1">0_env.sh</a></li>
<li><a href="#tabs1-2">1_create_resources.sh</a></li>
<li><a href="#tabs1-3">2_setup_vm.tmpl</a></li>
<li><a href="#tabs1-4">Dockerfile</a></li>
<li><a href="#tabs1-4">apiserver_Dockerfile</a></li>
<li><a href="#tabs1-5">apiserver.go</a></li>
</ul>
}}
// START tabs1-1
#html(<div id="tabs1-1">)
#mycode2(){{
#!/bin/bash
# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=XXXXXXXX
# サブスクリプションID
subscriptionId=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
# リージョン
region=japanwest
# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup
# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS
# Storageコンテナ名
storageContainerIn=${PREFIX}-strcontainer-i
storageContainerOut=${PREFIX}-strcontainer-o
# 仮想ネットワーク名
vnetName=${PREFIX}VNet
vnetPrefix=10.1.0.0/16
# ネットワークセキュリティグループ
nsgName=${vnetName}SecGrp
nsgPubRuleName=${nsgName}PubRule
nsgPriRuleName=${nsgName}PriRule
# VM用のサブネット
vmSubnetName=${PREFIX}VmSubnet
vmSubnetPrefix=10.1.1.0/24
# 関数アプリ用のサブネット
funcSubnetName=${PREFIX}FuncSubnet
funcSubnetPrefix=10.1.2.0/24
# 仮想マシン
vmName=${PREFIX}Vm
vmImage=UbuntuLTS
vmIpAddress=10.1.1.5
vmUser=sample
# Application insights
insightsName=${PREFIX}Insights
insightsDays=30
# Web API server用
registryName=${PREFIX}registry
apiServerImage=${PREFIX}-apiserver
}}
#html(</div>)
// END tabs1-1
// START tabs1-2
#html(<div id="tabs1-2">)
#mycode2(){{
#!/bin/bash
# 設定の読み込み
source 0_env.sh
# リソース作成
if [ "$1" == "--create" ]; then
# リソースグループの作成
echo "az group create"
az group create --name $resourceGroup --location $region
# ストレージアカウントの作成
echo az storage account create
az storage account create \
--name $storageAccountName --location $region \
--resource-group $resourceGroup --sku $storageSku
# ストレージアカウントキーの取得
storageAccountKey=`az storage account keys list -g $resourceGroup -n $storageAccountName -o table | grep key1 | awk '{print $3}'`
# Storageコンテナの作成
echo "az storage container create(in)"
az storage container create \
--name $storageContainerIn --resource-group $resourceGroup --account-name $storageAccountName
echo "az storage container create(out)"
az storage container create \
--name $storageContainerOut --resource-group $resourceGroup --account-name $storageAccountName
# NSG(ネットワークセキュリティグループの)作成
echo "az network nsg create"
az network nsg create --resource-group $resourceGroup --name $nsgName
# コンテナレジストリ作成
echo "az acr create ( $registryName )"
az acr create -n $registryName -g $resourceGroup --sku standard --admin-enabled true
# レジストリユーザ名/パスワード取得
acr_credential="`az acr credential show --name $registryName -o table | tail -1`"
registryUser="`echo "$acr_credential" | awk '{print $1}'`"
registryPwd="`echo "$acr_credential" | awk '{print $2}'`"
# API Server のビルド
go get github.com/sendgrid/sendgrid-go
GOOS=linux GOARCH=amd64 go build -o apiserver
# go のビルド環境がない場合
# docker run --rm -v `pwd`:/tmp/work -w /tmp/work -i golang <<_EO_GOBUILD_
# go get github.com/Azure/azure-storage-blob-go/azblob
# GOOS=linux GOARCH=amd64 go build -o apiserver
#_EO_GOBUILD_
chmod 755 apiserver
# docker イメージを作成してACRにプッシュしておく(内容は Dockerfile 参照)
az acr build --registry $registryName --image ${apiServerImage} -f apiserver_Dockerfile .
# VMセットアップ用スクリプトに環境変数を埋め込み
cat 2_vm_setup.tmpl \
| sed "s/##storageAccountName##/${storageAccountName}/g" \
| sed "s~##storageAccountKey##~${storageAccountKey}~g" \
| sed "s/##registryName##/${registryName}/g" \
| sed "s/##registryUser##/${registryUser}/g" \
| sed "s~##registryPwd##~${registryPwd}~g" \
| sed "s/##apiServerImage##/${apiServerImage}/g" > 2_vm_setup.sh
chmod 755 2_vm_setup.sh
# WebAPIサーバ用のNSGルール(http)
echo "az network nsg rule create(http)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Http \
--access Allow --protocol Tcp --direction Inbound --priority 120 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "80"
# WebAPIサーバ用のNSGルール(https)
echo "az network nsg rule create(https)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Https \
--access Allow --protocol Tcp --direction Inbound --priority 130 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "443"
# SSH接続用のNSGルール
echo "az network nsg rule create(ssh)"
az network nsg rule create \
--resource-group $resourceGroup --nsg-name $nsgName --name ${nsgPubRuleName}Ssh \
--access Allow --protocol Tcp --direction Inbound --priority 1000 \
--source-address-prefix Internet --source-port-range "*" --destination-port-range "22"
# 仮想ネットワーク 及び サブネット作成
echo "az network vnet create"
az network vnet create \
--name $vnetName --resource-group $resourceGroup \
--address-prefixes $vnetPrefix --network-security-group $nsgName \
--subnet-name $vmSubnetName --subnet-prefixes $vmSubnetPrefix
# 仮想マシンの作成
rm -rf ~/.ssh/id_rsa
rm -rf ~/.ssh/id_rsa.pub
echo "az vm create"
az vm create \
--resource-group $resourceGroup --name $vmName --image $vmImage --generate-ssh-keys \
--vnet-name $vnetName --subnet $vmSubnetName \
--private-ip-address $vmIpAddress --admin-username $vmUser \
--public-ip-address-dns-name `echo $vmName | tr '[A-Z]' '[a-z]'` \
--custom-data 2_vm_setup.sh
# VMポート開放
#echo "az vm open-port(8086)"
#az vm open-port --resource-group $resourceGroup --name $vmName --port 8086 --priority 920
echo "az vm open-port(80)"
az vm open-port --resource-group $resourceGroup --name $vmName --port 80 --priority 930
echo "az vm open-port(443)"
az vm open-port --resource-group $resourceGroup --name $vmName --port 443 --priority 940
# 生成されたSSH鍵を退避しておく
mkdir -p pem
if [ -e ~/.ssh/id_rsa ]; then
mv ~/.ssh/id_rsa ./pem/id_rsa_${vmName}
mv ~/.ssh/id_rsa.pub ./pem/id_rsa_${vmName}.pub
fi
lowerVmName=`echo $vmName | tr [A-Z] [a-z]`
echo "WebAPI Server: https://${lowerVmName}.${region}.cloudapp.azure.com"
fi
# リソース削除
if [ "$1" == "--delete" ]; then
echo "az group delete"
az group delete --name $resourceGroup
fi
}}
#html(</div>)
// END tabs1-2
// START tabs1-3
#html(<div id="tabs1-3">)
VMのセットアップ処理。
※nginx、API Server コンテナの作成 及び 起動。
#mycode2(){{
#!/bin/bash
#--------------------------------------
# create work directory and cd.
#--------------------------------------
workdir=/tmp/docker_container
apiLogDir=${workdir}/logs/apiserver
nginxLogDir=${workdir}/logs/nginx
mkdir -p $workdir
mkdir -p $apiLogDir
mkdir -p $nginxLogDir
cd $workdir
#--------------------------------------
# install docker and docker compose.
#--------------------------------------
apt-get update
apt-get install -y apt-transport-https ca-certificates curl gnupg-agent software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
apt-get update
apt-get install -y docker-ce docker-ce-cli containerd.io
curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
#--------------------------------------
# set environments.
#--------------------------------------
# web server url
region=japanwest
hostname=`hostname | tr [A-Z] [a-z]`
server_name="${hostname}.${region}.cloudapp.azure.com"
#--------------------------------------
# wait until azure dns name is available.
# (If it is NG after waiting 5 minutes, give up..)
#--------------------------------------
for i in `seq 30`; do
x=`host $server_name`
if [ "$?" == "0" ]; then
break
fi
sleep 10
done
#--------------------------------------
# get ssl certificate.
#--------------------------------------
sudo docker run --rm -p 80:80 \
-v /etc/letsencrypt:/etc/letsencrypt \
-v /etc/letsencrypt/logs:/var/log/letsencrypt \
certbot/certbot certonly --standalone \
-d $server_name \
--register-unsafely-without-email \
--non-interactive --agree-tos \
--force-renewal \
--renew-by-default \
--preferred-challenges http
#--------------------------------------
# create default.conf for nginx.
#--------------------------------------
cat <<_EO_DEFAULT_CONF_>default.conf
upstream backend {
server api_server:8000;
}
# http
server{
listen 80;
listen [::]:80;
server_name ${server_name};
return 301 https://\$host\$request_uri;
}
# https
server{
listen 443 ssl;
listen [::]:443 ssl;
server_name ${server_name};
ssl_certificate /etc/letsencrypt/live/${server_name}/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/${server_name}/privkey.pem;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-Host \$host;
proxy_set_header X-Forwarded-Server \$host;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
location / {
proxy_pass http://backend;
}
}
_EO_DEFAULT_CONF_
#--------------------------------------
# set environment for ACR.
#--------------------------------------
registryName=##registryName##
apiServerImage=##apiServerImage##
registryUser=##registryUser##
registryPwd=##registryPwd##
docker login ${registryName}.azurecr.io -u ${registryUser} -p ${registryPwd}
docker pull ${registryName}.azurecr.io/${apiServerImage}:latest
#--------------------------------------
# create docker-compose.yml
#--------------------------------------
cat <<_EOYML_>docker-compose.yml
version: "3"
services:
api_server:
image: ${registryName}.azurecr.io/${apiServerImage}:latest
hostname: api_server
container_name: api_server
ports:
- "8000:8000"
environment:
HTTPWORKER_PORT: "8000"
STORAGE_KEY_##storageAccountName##: "##storageAccountKey##"
LOG_DIR: "${apiLogDir}"
LOG_PREFIX: "apiserver_"
networks:
mynetwork:
ipv4_address: 172.1.0.6
volumes:
- ${apiLogDir}:${apiLogDir}
restart: always
nginx:
image: nginx:latest
hostname: proxy_container
container_name: proxy_container
ports:
- "80:80"
- "443:443"
volumes:
- ./default.conf:/etc/nginx/conf.d/default.conf
- /etc/letsencrypt:/etc/letsencrypt
- ${nginxLogDir}:/var/log/nginx
networks:
mynetwork:
ipv4_address: 172.1.0.7
restart: always
networks:
mynetwork:
driver: bridge
ipam:
config:
- subnet: 172.1.0.0/24
_EOYML_
#--------------------------------------
# start containers
#--------------------------------------
docker-compose up -d
}}
#html(</div>)
// END tabs1-3
// START tabs1-4
#html(<div id="tabs1-4">)
#mycode2(){{
FROM alpine
COPY apiserver /apiserver
EXPOSE 8000
ENTRYPOINT ["/apiserver"]
}}
#html(</div>)
// END tabs1-4
// START tabs1-5
#html(<div id="tabs1-5">)
#mycode2(){{
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
//"regexp"
"strings"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"
)
type InvokeResponse struct {
File string
}
type InvokeRequest struct {
Topic string
Subject string
EventType string
EventTime string
Id string
Data map[string]interface{}
DataVersion string
MetadataVersion string
}
/**
* 環境変数の取得.
*/
func getEnv(envName string, defaultValue string) string {
value, exists := os.LookupEnv(envName)
if exists {
return value
} else {
return defaultValue
}
}
func printDebug(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[DEBUG] %s\n", msg)
}
func printInfo(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[INFO] %s\n", msg)
}
func printError(format string, params ...interface{}){
setLogfile(false)
msg := fmt.Sprintf(format, params...)
log.Printf("[ERROR] %s\n", msg)
}
func fileExists(name string) bool {
_, err := os.Stat(name)
return !os.IsNotExist(err)
}
func setLogfile(init bool){
log.SetFlags(log.Ldate|log.Ltime|log.Lmicroseconds|log.Lshortfile)
logDirPath := getEnv("LOG_DIR", "/tmp")
logPrefix := getEnv("LOG_PREFIX", "apiserver_")
logfilePath := fmt.Sprintf("%s/%s%s.log", logDirPath, logPrefix, time.Now().Format("2006-01-02"))
if init || !fileExists(logfilePath) {
newlogfile, err := os.OpenFile(logfilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
panic("cannnot open " + logfilePath + " : "+ err.Error())
}
log.SetOutput(io.MultiWriter(newlogfile, os.Stdout))
printInfo("switch logfile: %s", logfilePath)
}
}
func init(){
setLogfile(true)
}
func blobTriggerHandler(w http.ResponseWriter, r *http.Request) {
printInfo("START blobTriggerHandler")
var event InvokeRequest
var fileUrl string
defer func(){
err := recover()
if fileUrl == "" {
fileUrl = "unknown"
}
if err != nil {
printError("Failure process %s , event: %v", fileUrl, event)
} else {
printInfo("Success process %s", fileUrl)
}
printInfo("END blobTriggerHandler")
}()
//------------------------------------------------
// リクエストデータのパース
//------------------------------------------------
var invokeReq []InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
// イベントデータの取得
event = invokeReq[0]
// エンドポイントの検証用リクエストの時は validationCode を返して終了(同期ハンドシェイク)
if event.EventType == "Microsoft.EventGrid.SubscriptionValidationEvent" {
fileUrl = "SubscriptionValidationEvent"
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(fmt.Sprintf("{\"validationResponse\": \"%s\"}", event.Data["validationCode"])))
return
}
//------------------------------------------------
// ○○処理.
//------------------------------------------------
fileUrl = fmt.Sprintf("%s", event.Data["url"])
sampleProc(fileUrl)
// コンテナ毎に処理を切り替える場合
//if regexp.MustCompile(`container1/.+`).MatchString(fileUrl) {
// sampleProc1(fileUrl)
//} else if regexp.MustCompile(`container2/.+`).MatchString(fileUrl) {
// sampleProc2(fileUrl)
//}
//------------------------------------------------
// レスポンス生成
//------------------------------------------------
invokeResponse := InvokeResponse{File: fileUrl}
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
printInfo("END blobTriggerHandler")
}
/**
* ○○処理.
*/
func sampleProc(fileUrl string) {
// 対象のファイルURLをドメイン+コンテナ部分とファイル名部分に分ける
urlParts := strings.Split(fileUrl, "/")
containerUrlText := fmt.Sprintf("https://%s/%s", urlParts[2], urlParts[3])
rep := strings.NewReplacer(fmt.Sprintf("%s/",containerUrlText), "")
fileName := rep.Replace(fileUrl)
printDebug("fileUrl: %s", fileUrl)
printDebug("containerUrl: %s", containerUrlText)
printDebug("fileName : %s", fileName)
// 対象のファイルURLを取得
accountName := strings.Split(urlParts[2], ".")[0]
keyEnvName := fmt.Sprintf("STORAGE_KEY_%s", accountName)
accountKey := getEnv(keyEnvName, "")
if accountKey == "" {
printError("環境変数(%s)が設定されていません。", keyEnvName)
return
}
ctx := context.Background()
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
printError("Get Credential Error: %v", err)
}
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
cURL, _ := url.Parse(containerUrlText)
containerURL := azblob.NewContainerURL(*cURL, p)
blobURL := containerURL.NewBlobURL(fileName)
// サイズを取得
var blobSize int64 = 1024
blobPropResponse, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
if err != nil {
printError("Get Properties Error: %v", err)
return
} else {
blobSize = blobPropResponse.ContentLength()
}
// ファイル内容をバッファに取得
downloadedData := make([]byte, blobSize)
err = azblob.DownloadBlobToBuffer(ctx, blobURL, 0, azblob.CountToEnd, downloadedData, azblob.DownloadFromBlobOptions{})
if err != nil {
printError("Download Error: %v", err)
} else {
// ファイル内容を表示
printInfo("Download Success %s", fileUrl)
printInfo(string(downloadedData))
}
}
/**
* メイン.
*/
func main() {
httpInvokerPort, exists := os.LookupEnv("HTTPWORKER_PORT")
if ! exists {
httpInvokerPort = "8000"
}
printInfo("HTTPWORKER_PORT: " + httpInvokerPort)
mux := http.NewServeMux()
mux.HandleFunc("/", blobTriggerHandler)
printInfo("Go server Listening...on httpInvokerPort: %s", httpInvokerPort)
log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}
#html(</div>)
// END tabs1-5
#html(</div>)
// END tabs1
#html(<script>$(function() { $("#tabs1").tabs(); });</script>)
** リソース作成の実行 [#n8627ad1]
#myterm2(){{
./1_create_resources.sh --create
}}
#html(</div>)
* フックの設定 [#ccd81ffc]
#html(<div class="pl10">)
#TODO
#html(</div>)
* 動作確認 [#x46aacb8]
#html(<div class="pl10">)
#TODO
#html(</div>)
* 補足 [#wed3d9b2]
#html(<div class="pl10">)
* 解説 [#bb978839]
** ソース説明 [#qffdbfb0]
#html(<div class="pl10">)
#TODO
#html(</div>)
** 再試行のスケジュールと期間 [#t37dc1a5]
#html(<div class="pl10">)
https://docs.microsoft.com/ja-jp/azure/event-grid/delivery-and-retry#retry-schedule-and-duration
#html(</div>)
#html(</div>)
* TODO [#e920c52a]
#html(<div class="pl10">)
- APIサーバのログ出力 及び docker volume のマウント
- Application insight への連携(実現可否の調査)
- NSG で EventGrid 以外を拒否
- ストレージアカウントの料金
#html(</div>)