目次 †influxdbインストール、起動 †1.8 系の場合、flux-enabled = true にしておく。 influxdb.conf [meta] dir = "/var/lib/influxdb/meta" [data] dir = "/var/lib/influxdb/data" engine = "tsm1" wal-dir = "/var/lib/influxdb/wal" [http] enabled = true flux-enabled = true docker-compose.yml version: "3" services: influxdb: image: influxdb:1.8 hostname: influxdb_for_test container_name: influxdb_for_test volumes: - ./influxdb:/var/lib/influxdb - ./influxdb.conf:/etc/influxdb/influxdb.conf - ./work:/tmp/work ports: - 8086:8086 起動 docker-compose up -d サンプルDB作成 †docker exec -it influxdb_for_test bash # influx > create database sampledb > create user sample with password 'sample' WITH ALL PRIVILEGES Goクライアント取得 †go get github.com/influxdata/influxdb-client-go Goのソース作成 †参考: https://github.com/influxdata/influxdb-client-go main.go package main import ( "context" "fmt" "math/rand" "time" "github.com/influxdata/influxdb-client-go" ) /** * 登録 */ func create(client influxdb2.Client) { fmt.Printf("Write START\n") writeAPI := client.WriteAPIBlocking("", "sampledb/autogen") // サンプル1 for i := 0; i < 5; i++ { p := influxdb2.NewPoint("sample", map[string]string{"tag1": "sample1", "tag2": fmt.Sprintf("row%d", i + 1)}, map[string]interface{}{"field1": i+1, "field2": rand.Float64()}, time.Now()) err := writeAPI.WritePoint(context.Background(), p) if err != nil { fmt.Printf("Write1 error: %s\n", err.Error()) } else { fmt.Printf("Write1 success.\n") } } // サンプル2 for i := 0; i < 5; i++ { p := influxdb2.NewPointWithMeasurement("sample"). AddTag("tag1", "sample2"). AddTag("tag2", fmt.Sprintf("row%d", i+1)). AddField("field1", i + 1). AddField("field2", rand.Float64()). SetTime(time.Now()) err := writeAPI.WritePoint(context.Background(), p) if err != nil { fmt.Printf("Write2 error: %s\n", err.Error()) } else { fmt.Printf("Write2 success.\n") } } fmt.Printf("Write END\n") } /** * 検索. */ func search(client influxdb2.Client) { queryAPI := client.QueryAPI("") result, err := queryAPI.Query(context.Background(), `from(bucket:"sampledb")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "sample" and r.tag1 == "sample1")`) if err == nil { rowCount := 0 for result.Next() { record := result.Record() values := record.Values() val := values["_value"] if result.TableChanged() { //fmt.Printf("%d列目\n", result.TablePosition() + 1) if (values["_field"] == "field1") { rowCount = rowCount + 1 fmt.Printf("%d行目\n", rowCount) fmt.Printf(" time : %s\n", values["_time"]) fmt.Printf(" tag1 : %s\n", values["tag1"]) fmt.Printf(" tag2 : %s\n", values["tag2"]) } } column := values["_field"] switch val := val.(type){ case int64: fmt.Printf(" %s : %d\n", column, val) case float64: fmt.Printf(" %s : %f\n", column, val) } //fmt.Printf("record: %s\n", record) } if result.Err() != nil { fmt.Printf("Query error: %s\n", result.Err().Error()) } } else { fmt.Printf("Query error: %s\n", err.Error()) } } /** * メイン処理. */ func main() { hostName := "localhost" hostPort := 8086 userName := "sample" password := "sample" client := influxdb2.NewClient(fmt.Sprintf("http://%s:%d", hostName, hostPort), fmt.Sprintf("%s:%s",userName, password)) create(client) search(client) client.Close() } 補足 †QueryメソッドでリクエストのタイプがFlux固定になっているのは、ちょっとよくわからない (1.8系の場合でも Fluxで問い合わせを行う必要がある)。 influxql もタイプとしては存在しているようだが、少なくとも query.go 内では使用箇所は見つからない。 : queryType := domain.QueryTypeFlux : 動作確認 †go run main.go Write START Write1 success. Write1 success. Write1 success. Write1 success. Write1 success. Write2 success. Write2 success. Write2 success. Write2 success. Write2 success. Write END 1行目 time : 2020-07-26 18:20:32.51875 +0000 UTC tag1 : sample1 tag2 : row1 field1 : 1 field2 : 0.604660 2行目 time : 2020-07-26 18:20:32.533407 +0000 UTC tag1 : sample1 tag2 : row2 field1 : 2 field2 : 0.940509 3行目 time : 2020-07-26 18:20:32.535223 +0000 UTC tag1 : sample1 tag2 : row3 field1 : 3 field2 : 0.664560 4行目 time : 2020-07-26 18:20:32.53716 +0000 UTC tag1 : sample1 tag2 : row4 field1 : 4 field2 : 0.437714 5行目 time : 2020-07-26 18:20:32.539096 +0000 UTC tag1 : sample1 tag2 : row5 field1 : 5 field2 : 0.424637 |