目次

influxdbインストール、起動

1.8 系の場合、flux-enabled = true にしておく。

influxdb.conf

[meta]
  dir = "/var/lib/influxdb/meta"

[data]
  dir = "/var/lib/influxdb/data"
  engine = "tsm1"
  wal-dir = "/var/lib/influxdb/wal"

[http]
  enabled = true
  flux-enabled = true

docker-compose.yml

version: "3"
services:
  influxdb:
    image: influxdb:1.8
    hostname: influxdb_for_test
    container_name: influxdb_for_test
    volumes:
      - ./influxdb:/var/lib/influxdb
      - ./influxdb.conf:/etc/influxdb/influxdb.conf
      - ./work:/tmp/work
    ports:
      - 8086:8086
起動

docker-compose up -d

サンプルDB作成

docker exec -it influxdb_for_test bash
# influx
> create database sampledb
> create user sample with password 'sample' WITH ALL PRIVILEGES

Goクライアント取得

go get github.com/influxdata/influxdb-client-go

Goのソース作成

参考: https://github.com/influxdata/influxdb-client-go

main.go

package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/influxdata/influxdb-client-go"
)
// create writes ten sample points to the "sample" measurement and reports the
// outcome of every write on stdout.
//
// The blocking write API is obtained with an empty organization and the bucket
// name "sampledb/autogen". Five points are built with NewPoint
// (tag1 = "sample1") and five with the fluent point builder
// (tag1 = "sample2"). Each point carries tag2 = "row<N>", an integer field1
// equal to N and a random float field2, stamped with the current time.
// A failed write is printed and does not stop the remaining writes.
func create(client influxdb2.Client) {
	fmt.Printf("Write START\n")
	writer := client.WriteAPIBlocking("", "sampledb/autogen")

	// Sample 1: tags and fields are passed to NewPoint as ready-made maps.
	for row := 1; row <= 5; row++ {
		tags := map[string]string{"tag1": "sample1", "tag2": fmt.Sprintf("row%d", row)}
		fields := map[string]interface{}{"field1": row, "field2": rand.Float64()}
		point := influxdb2.NewPoint("sample", tags, fields, time.Now())

		if err := writer.WritePoint(context.Background(), point); err != nil {
			fmt.Printf("Write1 error: %s\n", err.Error())
			continue
		}
		fmt.Printf("Write1 success.\n")
	}

	// Sample 2: the same shape of point, assembled step by step with the builder.
	for row := 1; row <= 5; row++ {
		point := influxdb2.NewPointWithMeasurement("sample")
		point = point.AddTag("tag1", "sample2")
		point = point.AddTag("tag2", fmt.Sprintf("row%d", row))
		point = point.AddField("field1", row)
		point = point.AddField("field2", rand.Float64())
		point = point.SetTime(time.Now())

		if err := writer.WritePoint(context.Background(), point); err != nil {
			fmt.Printf("Write2 error: %s\n", err.Error())
			continue
		}
		fmt.Printf("Write2 success.\n")
	}

	fmt.Printf("Write END\n")
}
// search runs a Flux query for the last hour of "sample" measurements whose
// tag1 is "sample1" and prints the matching records to stdout.
//
// Whenever the result moves to a new table whose _field is "field1", a
// numbered row header with the record's time, tag1 and tag2 is printed. Every
// record then prints its field name and value; only int64 and float64 values
// are printed, other value types are skipped. Errors from issuing the query or
// from reading the result are printed rather than returned.
func search(client influxdb2.Client) {
	const flux = `from(bucket:"sampledb")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "sample" and r.tag1 == "sample1")`

	res, err := client.QueryAPI("").Query(context.Background(), flux)
	if err != nil {
		fmt.Printf("Query error: %s\n", err.Error())
		return
	}

	rows := 0
	for res.Next() {
		vals := res.Record().Values()
		field := vals["_field"]

		// A new table for field1 marks the start of the next logical row.
		if res.TableChanged() && field == "field1" {
			rows++
			fmt.Printf("%d行目\n", rows)
			fmt.Printf(" time : %s\n", vals["_time"])
			fmt.Printf(" tag1 : %s\n", vals["tag1"])
			fmt.Printf(" tag2 : %s\n", vals["tag2"])
		}

		switch v := vals["_value"].(type) {
		case int64:
			fmt.Printf(" %s : %d\n", field, v)
		case float64:
			fmt.Printf(" %s : %f\n", field, v)
		}
	}

	// Next returning false can also mean a read failure, so check afterwards.
	if readErr := res.Err(); readErr != nil {
		fmt.Printf("Query error: %s\n", readErr.Error())
	}
}
// main is the entry point: it creates an InfluxDB client for
// http://localhost:8086, writes the sample points with create and reads them
// back with search.
func main() {
	hostName := "localhost"
	hostPort := 8086
	userName := "sample"
	password := "sample"

	serverURL := fmt.Sprintf("http://%s:%d", hostName, hostPort)
	// The second NewClient argument is built here as "username:password".
	authToken := fmt.Sprintf("%s:%s", userName, password)

	client := influxdb2.NewClient(serverURL, authToken)
	// Deferred so the client is closed even if create or search panics.
	defer client.Close()

	create(client)
	search(client)
}
補足

Query メソッドでリクエストのタイプが Flux 固定になっている理由は不明 (1.8 系の場合でも Flux で問い合わせを行う必要がある)。
influxql もタイプとしては存在しているようだが、少なくとも query.go 内では使用箇所は見つからない。

 :
queryType := domain.QueryTypeFlux
 :

動作確認

go run main.go
Write START
Write1 success.
Write1 success.
Write1 success.
Write1 success.
Write1 success.
Write2 success.
Write2 success.
Write2 success.
Write2 success.
Write2 success.
Write END
1行目
 time : 2020-07-26 18:20:32.51875 +0000 UTC
 tag1 : sample1
 tag2 : row1
 field1 : 1
 field2 : 0.604660
2行目
 time : 2020-07-26 18:20:32.533407 +0000 UTC
 tag1 : sample1
 tag2 : row2
 field1 : 2
 field2 : 0.940509
3行目
 time : 2020-07-26 18:20:32.535223 +0000 UTC
 tag1 : sample1
 tag2 : row3
 field1 : 3
 field2 : 0.664560
4行目
 time : 2020-07-26 18:20:32.53716 +0000 UTC
 tag1 : sample1
 tag2 : row4
 field1 : 4
 field2 : 0.437714
5行目
 time : 2020-07-26 18:20:32.539096 +0000 UTC
 tag1 : sample1
 tag2 : row5
 field1 : 5
 field2 : 0.424637