概要 †Azure のストレージキューのデータを読み書きする実装について確認する。 尚、GoでAzureのBlobファイルを読み書き にも記載したが、azure-sdk-for-go の将来的な機能廃止に備え、ここでは azure-storage-queue-go を使用して実装する。 目次 †
メッセージの取り出し方 †Dequeueによるメッセージ取り出し †
Dequeueによるメッセージ取り出しイメージ Peek によるメッセージ取り出し †
Peekによるメッセージ取り出しイメージ 実装サンプル †package main import ( "context" "fmt" "log" "net/url" "os" "time" "github.com/Azure/azure-storage-queue-go/azqueue" ) /** * 環境変数の取得. */ func getEnv(envName string, defaultValue string) string { value, exists := os.LookupEnv(envName) if exists { return value } else { return defaultValue } } /** * アカウント情報の取得 */ func getQueueInfo() (string, string, string) { // デフォルトはいったんローカルエミュレータ用のアカウント名、キー // https://docs.microsoft.com/ja-jp/azure/storage/common/storage-use-emulator#connect-to-the-emulator-account-using-the-well-known-account-name-and-key accountName := getEnv("ACCOUNT_NAME" , "devstoreaccount1") accountKey := getEnv("ACCOUNT_KEY" , "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==") queueName := getEnv("ACCOUNT_QUEUE", "webjobs-blobtrigger-poison") return accountName, accountKey, queueName } func getMessageCount(ctx context.Context, queueUrl azqueue.QueueURL) int32 { props, err := queueUrl.GetProperties(ctx) if err != nil { errorType := err.(azqueue.StorageError).ServiceCode() if (errorType == azqueue.ServiceCodeQueueNotFound) { log.Print("Queue does not exist") } else { log.Fatal("get queue properties error.", err) } } count := props.ApproximateMessagesCount() return count } func main(){ accountName, accountKey, queueName := getQueueInfo() // ローカルエミュレータへの接続の時はURLフォーマットを変える queueUrlFormat := "https://%s.queue.core.windows.net/%s" if accountName == "devstoreaccount1" { queueUrlFormat = "http://127.0.0.1:10001/%s/%s" } u, _ := url.Parse(fmt.Sprintf(queueUrlFormat, accountName, queueName)) credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey) if err != nil { log.Fatal(err) } queueUrl := azqueue.NewQueueURL(*u, azqueue.NewPipeline(credential, azqueue.PipelineOptions{})) log.Printf("%#v", queueUrl) ctx := context.TODO() msgUrl := queueUrl.NewMessagesURL() // メッセージの追加 for i := 0; i < 10; i++ { messageText := fmt.Sprintf("Sample Message%d %v", i, time.Now().Format(time.RFC3339)) _, err := msgUrl.Enqueue(ctx, messageText, 0, 0) if err != nil { log.Fatal("Error Enqueue: ", err) } } // 属性から件数を取得 count := getMessageCount(ctx, queueUrl) log.Printf("message count(prop): %d", count) // メッセージ一覧の取得 maxMessages := int32(32) // 最大件数 visibilityTimeout := time.Second * 10 // 可視化タイムアウト dequeueResp, err := msgUrl.Dequeue(ctx, maxMessages, visibilityTimeout) if err != nil { log.Fatal("Error dequeue message: ", err) } else { log.Printf("message count(dequeue): %v", dequeueResp.NumMessages()) for i := int32(0); i < dequeueResp.NumMessages(); i++ { // 偶数番目のメッセージを削除 if i % 2 == 0 { msg := dequeueResp.Message(i) msgIdUrl := msgUrl.NewMessageIDURL(msg.ID) _, err = msgIdUrl.Delete(ctx, msg.PopReceipt) if err != nil { log.Fatal("Error delete message: ", err) } else { log.Print("Success delete message: ", msg.Text) } } } } // 属性から件数を取得 (属性から得られる件数は可視化タイムアウトに影響されない) count = getMessageCount(ctx, queueUrl) log.Printf("message count(prop): %d", count) // 処理直後のメッセージ一覧を取得 peekResponse, err := msgUrl.Peek(ctx, 32) if err != nil { log.Fatal("Error peeking queue messages: ", err) } else { log.Printf("message count(peek): %v", peekResponse.NumMessages()) for i := int32(0); i < peekResponse.NumMessages(); i++ { msg := peekResponse.Message(i) log.Printf("message(%d): %s", i, string(msg.Text)) } } log.Printf("sleep %v\n", visibilityTimeout) time.Sleep(visibilityTimeout) // 可視化タイムアウトの秒数が経過後に再度メッセージ一覧を取得 peekResponse, err = msgUrl.Peek(ctx, 32) if err != nil { log.Fatal("Error peeking queue messages: ", err) } else { log.Printf("message count(peek): %v", peekResponse.NumMessages()) for i := int32(0); i < peekResponse.NumMessages(); i++ { msg := peekResponse.Message(i) log.Printf("message(%d): %s", i, string(msg.Text)) } } } |