- 追加された行はこの色です。
- 削除された行はこの色です。
#author("2020-11-05T05:50:24+00:00","","")
#author("2020-11-05T07:50:28+00:00","","")
#mynavi(Azureメモ);
#setlinebreak(on);
#html(){{
<style>
.images img {
border: 1px solid #333;
margin: 0 0 20px 0 !important;
}
.img_margin {
margin: 0 !important;
}
</style>
}}
* 概要 [#pd7305e0]
#html(<div class="pl10">)
Azure Storage Queue のメッセージを他のキューにコピーする
Queueトリガーを使用してメッセージを他のキューにコピーする方法について記載する。
[イメージ]
#html(<div class="images">)
元キューにメッセージが追加されたら Queue トリガーで 関数アプリを起動して 他の2つのキューにメッセージをコピーする。
#ref(image1.png,nolink);
#html(</div>)
*contents [#c50b62ca]
#html(<div class="images">)
何らかの要因で関数アプリが動作していなかった場合を考慮して、HTTPトリガーからも起動できるようにしておく。
こちらは元キューに残っていいる全てのメッセージを取得して、外の2つのキューにコピーする。(ただし当サンプルでは最大32メッセージまで)
#ref(image2.png,nolink);
#html(</div>)
[注意]
・Queueトリガーで Functions を起動した場合、正常終了時には元キューの対象メッセージは自動的に削除される。
#html(</div>)
* 目次 [#q6a1cf43]
#contents
- 関連
-- [[Azureメモ]]
-- [[Azure Blobトリガーで起動される関数をリトライで再利用する]]
-- [[GoでAzureのストレージキューの読み書き]]
* 作成するファイル [#q3c136ab]
#html(<div class="pl10">)
#html(){{
<div style="padding: 0px 10px 10px 10px; border: 1px solid #333;">
<pre style="display: inline-block; margin: 0; padding-right: 10px; font-size: 1rem; background: transparent; border: 0px; vertical-align: top;">
.
├─ 0_env.sh
├─ 1_resources.sh
├─ 2_put_message.sh
├─ X1_local_start.sh
├─ X2_local_put_message.sh
├─ X3_local_relay_all.sh
├─ functions
│   ├─ server.go
│   ├─ host.json
│   └─ local.settings.json
│   ├─ RelayQueue
│   │   └─ function.json
│   ├─ RelayQueueAll
│   │   └─ function.json
</pre>
<pre style="display: inline-block; margin: 0; font-size: 1rem; background: transparent; border: 0px; vertical-align: top;">
... リソース名の定義など
... リソース作成用のシェル
... キューへのメッセージ出力用のシェル
... (ローカル確認用) 関数アプリ起動用シェル
... (ローカル確認用) ストレージエミュレータへのメッセージ出力用シェル
... (ローカル確認用) キューに溜まっている全メッセージコピー用のシェル
... 関数ソース(Goによるカスタムハンドラー)
... 関数アプリの定義ファイル
... ローカル実行用の設定ファイル
... 関数定義の定義ファイル(Queueトリガー定義)
... 関数定義の定義ファイル(HTTPトリガー定義)
</pre>
</div>
}}
#html(</div>)
* 関数ソース(functions配下) [#d2a8dd5e]
#html(<div class="pl10">)
#html(){{
<div id="tabs1">
<ul>
<li><a href="#tabs1-1">host.json</a></li>
<li><a href="#tabs1-2">local.settings.json</a></li>
<li><a href="#tabs1-3">server.go</a></li>
<li><a href="#tabs1-4">RelayQueue/function.json</a></li>
<li><a href="#tabs1-5">RelayQueueAll/function.json</a></li>
</ul>
}}
#html(<div id="tabs1-1">)
host.json
#mycode2(){{
{
"version": "2.0",
"httpWorker": {
"description": {
"defaultExecutablePath": "server.exe"
}
},
"extensions": {
"queues": {
"maxPollingInterval": "00:00:10",
"visibilityTimeout" : "00:00:00",
"batchSize": 16,
"maxDequeueCount": 5,
"newBatchThreshold": 8
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[1.*, 2.0.0)"
}
}
}}
#html(</div>)
#html(<div id="tabs1-2">)
local.settings.json
#mycode2(){{
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"ACCOUNT_NAME": "devstoreaccount1",
"ACCOUNT_KEY": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
"BASE_QUEUE": "my-base-queue",
"RELAY_QUEUE1": "my-queue1",
"RELAY_QUEUE2": "my-queue2"
}
}
}}
※アカウントキーはローカルエミュレータ(Azurite)用のもの。
#html(</div>)
#html(<div id="tabs1-3">)
server.go
#mycode2(){{
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/Azure/azure-storage-queue-go/azqueue"
)
type ReturnValue struct {
Data string
}
type InvokeResponse struct {
Outputs map[string]interface{}
Logs []string
ReturnValue interface{}
}
type InvokeRequest struct {
Data map[string]interface{}
Metadata map[string]interface{}
}
func printDebug(format string, params ...interface{}){
log.SetOutput(os.Stdout)
msg := fmt.Sprintf(format, params...)
log.Printf("[DEBUG] %s\n", msg)
}
func printInfo(format string, params ...interface{}){
log.SetOutput(os.Stdout)
msg := fmt.Sprintf(format, params...)
log.Printf("[INFO] %s\n", msg)
}
func printError(format string, params ...interface{}){
log.SetOutput(os.Stderr)
msg := fmt.Sprintf(format, params...)
log.Printf("[ERROR] %s\n", msg)
log.SetOutput(os.Stdout)
}
func init(){
log.SetOutput(os.Stdout)
log.SetFlags(0)
}
func isLocalAccount(accountName string) bool {
return accountName == "devstoreaccount1"
}
func getQueueUrlFormat(accountName string) string {
queueUrlFormat := "https://%s.queue.core.windows.net/%s"
if isLocalAccount(accountName) {
queueUrlFormat = "http://127.0.0.1:10001/%s/%s"
}
return queueUrlFormat
}
/**
* 環境変数の取得.
*/
func getEnv(envName string, defaultValue string) string {
value, exists := os.LookupEnv(envName)
if exists {
return value
} else {
return defaultValue
}
}
/**
* キューアカウント情報の取得
*/
func getRelayQueueInfo() (string, string, string, []string) {
accountName := getEnv("ACCOUNT_NAME", "")
accountKey := getEnv("ACCOUNT_KEY" , "")
queueName0 := getEnv("BASE_QUEUE" , "")
queueName1 := getEnv("RELAY_QUEUE1" , "")
queueName2 := getEnv("RELAY_QUEUE2" , "")
return accountName, accountKey, queueName0, []string{queueName1, queueName2}
}
/**
* 過剰なエスケープを除去する.
*/
func removeExcessEscape(queueItem string) string {
queueItem = strings.TrimRight(queueItem, "\"")
queueItem = strings.TrimLeft(queueItem, "\"")
queueItem = strings.Replace(queueItem, "\\r\\n", "\n", -1)
queueItem = strings.Replace(queueItem, "\\n", "\n", -1)
queueItem = strings.Replace(queueItem, "\\\"", "\"", -1)
queueItem = strings.Replace(queueItem, "\\\\\"", "", -1)
return queueItem
}
/**
* 指定されたキューにメッセージを出力する.
*/
func relayQueueMessage(accountName string, accountKey string, queueName string, queueItem string){
queueUrlFormat := getQueueUrlFormat(accountName)
u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName))
credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
log.Fatal(err)
}
queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
ctx := context.TODO()
msgUrl := queueUrl.NewMessagesURL()
_, err = msgUrl.Enqueue(ctx, queueItem, 0, 0)
if err != nil {
log.Fatal("Error Enqueue: ", err)
printInfo("Error relay queue. (%s), error: %v", queueName, err)
} else {
printInfo("Success relay queue. (%s)", queueName)
}
}
/**
* キューメッセージのコピー.
*/
func relayQueueHandler(w http.ResponseWriter, r *http.Request) {
printInfo("[relayQueueHandler] START")
outputs := make(map[string]interface{})
var invokeReq InvokeRequest
d := json.NewDecoder(r.Body)
decodeErr := d.Decode(&invokeReq)
if decodeErr != nil {
http.Error(w, decodeErr.Error(), http.StatusBadRequest)
return
}
// 何回目の試行か
dequeueCount := fmt.Sprintf("%v",invokeReq.Metadata["DequeueCount"])
printInfo("[relayQueueHandler] dequeueCount: %v", dequeueCount)
printDebug("[relayQueueHandler] invokeReq: %v", invokeReq)
printDebug("[relayQueueHandler] queue metadata: %v", invokeReq.Metadata)
// キューメッセージを取得
queueItem := invokeReq.Data["queueItem"].(string)
// 見やすいように過剰エスケープを調整
queueItem = removeExcessEscape(queueItem)
printInfo("[relayQueueHandler] queue message(Value): %v", queueItem)
// キューメッセージをコピー
outputs["relayqueue1"] = queueItem
outputs["relayqueue2"] = queueItem
// 正常終了
invokeResponse := InvokeResponse{Outputs: outputs, Logs: []string{} }
js, err := json.Marshal(invokeResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
//http.Error(w, "process relayQueueHandler.", http.StatusInternalServerError)
printInfo("[relayQueueHandler] END")
}
/**
* キューに残っているメッセージをすべて処理する.
*/
func relayQueueAllHandler(w http.ResponseWriter, r *http.Request) {
printInfo("[relayQueueAllHandler] START")
// キューアカウント情報の取得
accountName, accountKey, baseQueue, queueNames := getRelayQueueInfo()
// キューURLの取得
credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
panic(fmt.Sprintf("NewSharedKeyCredential error: %v", err))
}
queueUrlFormat := getQueueUrlFormat(accountName)
u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, baseQueue))
queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
// メッセージ一覧を取得
queueCtx := context.TODO()
maxMessages := int32(32) // 最大件数
visibilityTimeout := time.Second * 10 // 可視性タイムアウト
msgUrl := queueUrl.NewMessagesURL()
dequeueResp, err := msgUrl.Dequeue(queueCtx, maxMessages, visibilityTimeout)
if err != nil {
panic(err)
} else {
// 残っているメッセージを全て処理
for i := int32(0); i < dequeueResp.NumMessages(); i++ {
// リレー用のキューにコピー
msg := dequeueResp.Message(i)
messageBytes, _ := base64.StdEncoding.DecodeString(msg.Text)
queueItem := removeExcessEscape(string(messageBytes))
for _, queueName := range queueNames {
relayQueueMessage(accountName, accountKey, queueName, queueItem)
}
// 元キューからメッセージを削除
msgIdUrl := msgUrl.NewMessageIDURL(msg.ID)
_, err = msgIdUrl.Delete(queueCtx, msg.PopReceipt)
if err != nil {
printError("Error delete message. %s, %v", queueItem, err)
} else {
printInfo("Success delete message %s", queueItem)
}
}
}
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("{\"message\": \"relayQueueAll\"}"))
printInfo("[relayQueueAllHandler] END")
}
/**
* メイン処理.
*/
func main() {
httpInvokerPort, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT")
if exists {
printInfo("FUNCTIONS_HTTPWORKER_PORT: " + httpInvokerPort)
}
mux := http.NewServeMux()
mux.HandleFunc("/RelayQueue", relayQueueHandler)
mux.HandleFunc("/RelayQueueAll", relayQueueAllHandler)
log.Println("Go server Listening...on httpInvokerPort:", httpInvokerPort)
log.Fatal(http.ListenAndServe(":"+httpInvokerPort, mux))
}
}}
#html(</div>)
#html(<div id="tabs1-4">)
RelayQueue/function.json
#mycode2(){{
{
"bindings": [
{
"name": "queueItem",
"type": "queueTrigger",
"direction": "in",
"queueName": "my-base-queue",
"connection": "AzureWebJobsStorage"
},
{
"name": "relayqueue1",
"type": "queue",
"direction": "out",
"queueName": "my-queue1",
"connection": "AzureWebJobsStorage"
},
{
"name": "relayqueue2",
"type": "queue",
"direction": "out",
"queueName": "my-queue2",
"connection": "AzureWebJobsStorage"
}
]
}
}}
#html(</div>)
#html(<div id="tabs1-5">)
RelayQueueAll/function.json
#mycode2(){{
{
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get",
"post"
]
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
}}
#html(</div>)
#html(</div>)
// END tabs1
#html(<script>$(function() { $("#tabs1").tabs(); });</script>)
#html(</div>)
* デプロイ/動作確認用シェル [#zaed8683]
#html(<div class="pl10">)
// START tabs2
#html(){{
<div id="tabs2">
<ul>
<li><a href="#tabs2-1">0_env.sh</a></li>
<li><a href="#tabs2-2">1_resources.sh</a></li>
<li><a href="#tabs2-3">2_put_message.sh</a></li>
<li><a href="#tabs2-4">X1_local_start.sh</a></li>
<li><a href="#tabs2-5">X2_local_put_message.sh</a></li>
<li><a href="#tabs2-6">X3_local_relay_all.sh</a></li>
</ul>
}}
#html(<div id="tabs2-1">)
0_env.sh
#mycode2(){{
#!/bin/bash
# 全てのリソース名に付与する接頭文字 (Storageアカウント名などは世界でユニークな必要があるので他ユーザと被らないような名前を付ける)
PREFIX=xxxxxxxxxx
# サブスクリプションID (Application Insight を使用しない場合は空でも可)
subscriptionId=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
# リージョン
region=japanwest
insightsRegion=japaneast # 2020/7時点では Appplication Insights で西日本(japanwest)は使用できない
# リソースグループ名
resourceGroup=${PREFIX}ResourceGroup
# ストレージアカウント名
storageAccountName=${PREFIX}straccount
storageSku=Standard_LRS
# Storageキュー名
storageQueue0=my-base-queue
storageQueue1=my-queue1
storageQueue2=my-queue2
# 関数アプリ名
funcAppName1=${PREFIX}RelayFunc
funcAppName2=${PREFIX}Queue1Func
# 使用するFunctionsのバージョン
funcVersion=2
# 関数アプリのプラン名
funcPlanName=${PREFIX}plan
#funcPlanSku=EP1
funcPlanSku=B1
# Application insights
insightsName=${PREFIX}Insights
insightsDays=30
insightsSetting=""
if [ "$subscriptionId" != "" ]; then
insightsSetting="--app-insights $insightsName"
fi
}}
#html(</div>)
#html(<div id="tabs2-2">)
1_resources.sh
#mycode2(){{
#!/bin/bash
source ./0_env.sh
# リソースの作成
if [ "$1" == "--create" ]; then
#-------------------------------
# リソースグループの作成
#-------------------------------
echo az group create
az group create \
--name $resourceGroup \
--location $region
#-------------------------------
# ストレージアカウントの作成
#-------------------------------
echo az storage account create
az storage account create \
--name $storageAccountName \
--location $region \
--resource-group $resourceGroup \
--sku $storageSku
# ストレージアカウントキーを取得
storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`
#-------------------------------
# Storageキューの作成
#-------------------------------
echo "az storage queue create"
az storage queue create --name $storageQueue0 --account-name $storageAccountName --account-key $storageAccountKey
az storage queue create --name $storageQueue1 --account-name $storageAccountName --account-key $storageAccountKey
az storage queue create --name $storageQueue2 --account-name $storageAccountName --account-key $storageAccountKey
#-------------------------------
# Application Insights 拡張が利用できない場合は追加インストール
#-------------------------------
x=`az monitor app-insights --help 2>&1`
if [ "$?" != "0" ]; then
az extension add -n application-insights
fi
#-------------------------------
# Application Insights コンポーネント作成
#-------------------------------
echo az monitor app-insights component create
if [ "$subscriptionId" != "" ]; then
az monitor app-insights component create \
--app $insightsName \
--location $insightsRegion \
--resource-group $resourceGroup \
--query-access Enabled \
--retention-time $insightsDays \
--subscription $subscriptionId
fi
#-------------------------------
# キューリレー用の関数
#-------------------------------
# 関数プランの作成
echo az functionapp plan create
az functionapp plan create \
--name $funcPlanName \
--resource-group $resourceGroup \
--location $region \
--sku $funcPlanSku
# 関数アプリの作成
echo az functionapp create
az functionapp create \
--name $funcAppName1 \
--storage-account $storageAccountName \
--plan $funcPlanName \
--resource-group $resourceGroup \
--functions-version $funcVersion $insightsSetting
# 関数アプリの環境変数の設定
echo "az functionapp config appsettings"
az functionapp config appsettings set \
--name $funcAppName1 \
--resource-group $resourceGroup \
--settings "ACCOUNT_NAME=$storageAccountName" \
"ACCOUNT_KEY=$storageAccountKey" \
"BASE_QUEUE=$storageQueue0"
"RELAY_QUEUE1=$storageQueue1" \
"RELAY_QUEUE2=$storageQueue2"
# 関数アプリのデプロイ
# すぐにデプロイするとエラー(Timeout)になる場合がある為、少し待つ
echo "sleep 10 seconds..."
sleep 10
cd functions
exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
rm -rf $exefile
GOOS=windows GOARCH=amd64 go build -o $exefile
zip -r ../functions.zip *
cd ../
az functionapp deployment source config-zip -g $resourceGroup -n $funcAppName1 --src functions.zip
rm -rf functions.zip
fi
# リソースの削除
if [ "$1" == "--delete" ]; then
az group delete --name $resourceGroup
fi
}}
#html(</div>)
#html(<div id="tabs2-3">)
2_put_message.sh
#mycode2(){{
#!/bin/bash
source ./0_env.sh
date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`
base64_message=`echo $message | base64`
# ストレージアカウントキーを取得
storageAccountKey=`az storage account keys list -n $storageAccountName -g $resourceGroup -o table | grep key1 | awk '{print $NF}'`
# キューにメッセージをPUT
az storage message put \
--account-name $storageAccountName \
--account-key "$storageAccountKey" \
--queue-name $storageQueue0 \
--content "$base64_message"
}}
#html(</div>)
#html(<div id="tabs2-4">)
X1_local_start.sh
#mycode2(){{
#!/bin/bash
source ./0_env.sh
# ローカルのストレージエミュレータ起動
mkdir -p local_azurite
azurite --silent --location `pwd`/local_azurite &
# ストレージキュー作成
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage queue create --name $storageQueue0
az storage queue create --name $storageQueue1
az storage queue create --name $storageQueue2
# 関数アプリの起動
cd ./functions
exefile=`cat host.json | grep defaultExecutablePath | awk '{print $2}' | sed 's/"//g'`
go build -o $exefile
func start
rm -rf $exefile
export AZURE_STORAGE_CONNECTION_STRING=
cd ../
}}
#html(</div>)
#html(<div id="tabs2-5">)
X2_local_put_message.sh
#mycode2(){{
#!/bin/bash
source ./0_env.sh
date_text=`date "+%Y-%m-%d %H:%M:%S"`
message=`cat << _EOF_
{"message": "$date_text"}
_EOF_
`
base64_message=`echo $message | base64`
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"
az storage message put \
--queue-name $storageQueue0 \
--content "$base64_message"
export AZURE_STORAGE_CONNECTION_STRING=
}}
#html(</div>)
#html(<div id="tabs2-6">)
X3_local_relay_all.sh
#mycode2(){{
#!/bin/bash
curl -H "Content-Type: application/json" -d "{}" http://localhost:7071/api/RelayQueueAll
}}
#html(</div>)
#html(</div>)
// END tabs2
#html(<script>$(function() { $("#tabs2").tabs(); });</script>)
#html(</div>)
* 動作確認(ローカル) [#l6d1d871]
#html(<div class="pl10">)
** ローカルで関数アプリ起動 [#ca017985]
#myterm2(){{
./X1_local_start.sh
}}
** キューにメッセージを出力 [#qb3bf412]
#myterm2(){{
./X2_local_put_message.sh
}}
キュー1 及び キュー2の内容をStorageExplorerで確認
#html(<div class="images">)
#ref(azure_queue_trigger_test1.png,nolink);
#html(</div>)
#html(<div class="images">)
#ref(azure_queue_trigger_test2.png,nolink);
#html(</div>)
#html(</div>)
* 動作確認(Azure) [#e422a399]
#html(<div class="pl10">)
** デプロイ [#u75a0b91]
#myterm2(){{
./1_resources.sh
}}
** キューにメッセージを出力 [#na17bcb3]
#myterm2(){{
./2_put_message.sh
}}
キュー1 及び キュー2の内容をStorageExplorerで確認(ローカルと同じなので省略)
#html(</div>)