目次

influxdbインストール、起動

1.8 系の場合、flux-enabled = true にしておく。

influxdb.conf

[meta]
  dir = "/var/lib/influxdb/meta"

[data]
  dir = "/var/lib/influxdb/data"
  engine = "tsm1"
  wal-dir = "/var/lib/influxdb/wal"

[http]
  enabled = true
  flux-enabled = true

docker-compose.yml

version: "3" 

services:
  influxdb:
    image: influxdb:1.8
    hostname: influxdb_for_test
    container_name: influxdb_for_test
    volumes:
      - ./influxdb:/var/lib/influxdb
      - ./influxdb.conf:/etc/influxdb/influxdb.conf
      - ./work:/tmp/work
    ports:
      - 8086:8086

起動

docker-compose up -d

サンプルDB作成

docker exec -it influxdb_for_test bash
# influx
> create database sampledb
> create user sample with password 'sample' WITH ALL PRIVILEGES

Goクライアント取得

go get github.com/influxdata/influxdb-client-go

Goのソース作成

参考: https://github.com/influxdata/influxdb-client-go

main.go

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
    "github.com/influxdata/influxdb-client-go"
)

/**
 * 登録
 */
func create(client influxdb2.Client) {

    fmt.Printf("Write START\n")

    writeAPI := client.WriteAPIBlocking("", "sampledb/autogen")

    // サンプル1
    for i := 0; i < 5; i++ {
        p := influxdb2.NewPoint("sample",
            map[string]string{"tag1": "sample1", "tag2": fmt.Sprintf("row%d", i + 1)},
            map[string]interface{}{"field1": i+1, "field2": rand.Float64()},
            time.Now())
        err := writeAPI.WritePoint(context.Background(), p)
        if err != nil {
            fmt.Printf("Write1 error: %s\n", err.Error())
        } else {
            fmt.Printf("Write1 success.\n")
        }
    }

    // サンプル2
    for i := 0; i < 5; i++ {
        p := influxdb2.NewPointWithMeasurement("sample").
            AddTag("tag1", "sample2").
            AddTag("tag2", fmt.Sprintf("row%d", i+1)).
            AddField("field1", i + 1).
            AddField("field2", rand.Float64()).
            SetTime(time.Now())
        err := writeAPI.WritePoint(context.Background(), p)
        if err != nil {
            fmt.Printf("Write2 error: %s\n", err.Error())
        } else {
            fmt.Printf("Write2 success.\n")
        }   
    }   

    fmt.Printf("Write END\n")
}

/**
 * 検索.
 */
func search(client influxdb2.Client) {
    queryAPI := client.QueryAPI("")
    result, err := queryAPI.Query(context.Background(),
                     `from(bucket:"sampledb")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "sample" and r.tag1 == "sample1")`)
    if err == nil {
        rowCount := 0
        for result.Next() {
            record := result.Record()
            values := record.Values()
            val := values["_value"]
            if result.TableChanged() {
                //fmt.Printf("%d列目\n", result.TablePosition() + 1)
                if (values["_field"] == "field1") {
                    rowCount = rowCount + 1 
                    fmt.Printf("%d行目\n", rowCount)
                    fmt.Printf("  time   : %s\n", values["_time"])
                    fmt.Printf("  tag1   : %s\n", values["tag1"])
                    fmt.Printf("  tag2   : %s\n", values["tag2"])
                }   
            }   
            column := values["_field"]
            switch val := val.(type){
            case int64:
              fmt.Printf("  %s : %d\n", column, val)
            case float64:
              fmt.Printf("  %s : %f\n", column, val)
            }   
            //fmt.Printf("record: %s\n", record)
        }   
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }   
    } else {
        fmt.Printf("Query error: %s\n", err.Error())
    }   
}

/**
 * メイン処理.
 */
func main() {
    hostName := "localhost"
    hostPort := 8086
    userName := "sample"
    password := "sample"

    client := influxdb2.NewClient(fmt.Sprintf("http://%s:%d", hostName, hostPort), fmt.Sprintf("%s:%s",userName, password))

    create(client)
    search(client)

    client.Close()
}

補足

QueryメソッドでリクエストのタイプがFlux固定になっているのは、ちょっとよくわからない (1.8系の場合でも Fluxで問い合わせを行う必要がある)。
https://github.com/influxdata/influxdb-client-go/blob/master/api/query.go#L120

influxql もタイプとしては存在しているようだが、少なくとも query.go 内では使用箇所は見つからない。
https://github.com/influxdata/influxdb-client-go/blob/master/domain/types.gen.go#L256

 :
queryType := domain.QueryTypeFlux
 :

動作確認

go run main.go
Write START
Write1 success.
Write1 success.
Write1 success.
Write1 success.
Write1 success.
Write2 success.
Write2 success.
Write2 success.
Write2 success.
Write2 success.
Write END
1行目
  time   : 2020-07-26 18:20:32.51875 +0000 UTC
  tag1   : sample1
  tag2   : row1
  field1 : 1
  field2 : 0.604660
2行目
  time   : 2020-07-26 18:20:32.533407 +0000 UTC
  tag1   : sample1
  tag2   : row2
  field1 : 2
  field2 : 0.940509
3行目
  time   : 2020-07-26 18:20:32.535223 +0000 UTC
  tag1   : sample1
  tag2   : row3
  field1 : 3
  field2 : 0.664560
4行目
  time   : 2020-07-26 18:20:32.53716 +0000 UTC
  tag1   : sample1
  tag2   : row4
  field1 : 4
  field2 : 0.437714
5行目
  time   : 2020-07-26 18:20:32.539096 +0000 UTC
  tag1   : sample1
  tag2   : row5
  field1 : 5
  field2 : 0.424637

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2020-07-27 (月) 03:24:30 (1591d)