influxdb2

v1.3.0
Published: Jun 19, 2020 License: MIT Imports: 13 Imported by: 124

README

InfluxDB Client Go

This repository contains the reference Go client for InfluxDB 2.

Note: Use this client library with InfluxDB 2.x and InfluxDB 1.8+ (see details). For connecting to InfluxDB 1.7 or earlier instances, use the influxdb1-go client library.

Features

Documentation

Go API docs are available at: https://pkg.go.dev/github.com/influxdata/influxdb-client-go

How To Use

Installation

Go 1.13 or later is required.

Add import github.com/influxdata/influxdb-client-go to your source code and sync dependencies or directly edit the go.mod file.

Basic Example

The following example demonstrates how to write data to InfluxDB 2 and read it back using the Flux language:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create a new client with default options, using the server URL and an authentication token
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Use the blocking write client for writes to the desired bucket
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    // create point using full params constructor 
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // write point immediately 
    writeApi.WritePoint(context.Background(), p)
    // create point using fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45).
        SetTime(time.Now())
    writeApi.WritePoint(context.Background(), p)
    
    // Or write directly line protocol
    line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    writeApi.WriteRecord(context.Background(), line)

    // get query client
    queryApi := client.QueryApi("my-org")
    // Run the Flux query and get the parsed result
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    }
    // Ensure background processes finish
    client.Close()
}
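
The example above builds its line-protocol record with fmt.Sprintf. As a rough sketch of what such a record looks like when it is assembled more carefully, the helper below sorts tags and fields and escapes commas, equals signs and spaces. The lineProtocol function is hypothetical and not part of the client library. It assumes float fields only and does not handle string, integer or boolean field values.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// keyEscaper backslash-escapes the characters that are special in
// tag keys, tag values and field keys.
var keyEscaper = strings.NewReplacer(",", `\,`, "=", `\=`, " ", `\ `)

// measurementEscaper escapes commas and spaces in measurement names.
var measurementEscaper = strings.NewReplacer(",", `\,`, " ", `\ `)

// lineProtocol is a hypothetical helper (an assumption for illustration,
// not a library function). It renders one record with float fields and
// a timestamp given in nanoseconds since the Unix epoch.
func lineProtocol(measurement string, tags map[string]string, fields map[string]float64, unixNano int64) string {
	var sb strings.Builder
	sb.WriteString(measurementEscaper.Replace(measurement))

	// Sort tag keys so the output is deterministic.
	tagKeys := make([]string, 0, len(tags))
	for k := range tags {
		tagKeys = append(tagKeys, k)
	}
	sort.Strings(tagKeys)
	for _, k := range tagKeys {
		sb.WriteString("," + keyEscaper.Replace(k) + "=" + keyEscaper.Replace(tags[k]))
	}

	// Sort field keys and format each value as a plain float.
	fieldKeys := make([]string, 0, len(fields))
	for k := range fields {
		fieldKeys = append(fieldKeys, k)
	}
	sort.Strings(fieldKeys)
	parts := make([]string, 0, len(fieldKeys))
	for _, k := range fieldKeys {
		parts = append(parts, keyEscaper.Replace(k)+"="+strconv.FormatFloat(fields[k], 'f', -1, 64))
	}

	sb.WriteString(" " + strings.Join(parts, ",") + " " + strconv.FormatInt(unixNano, 10))
	return sb.String()
}

func main() {
	line := lineProtocol("stat",
		map[string]string{"unit": "temperature"},
		map[string]float64{"avg": 23.5, "max": 45},
		1592560800000000000)
	fmt.Println(line)
}
```

A string produced this way could be passed to WriteRecord in place of the Sprintf result shown above.
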
Options

The InfluxDBClient uses a set of options to configure its behavior. These are available in the Options object. Creating a client instance using

client := influxdb2.NewClient("http://localhost:9999", "my-token")

will use the default options.

To set different configuration values, e.g. to set gzip compression and trust all server certificates, get default options and change what is needed:

client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token", 
    influxdb2.DefaultOptions().
        SetUseGZip(true).
        SetTlsConfig(&tls.Config{
            InsecureSkipVerify: true,
        }))
Writes

The client offers two ways of writing: non-blocking and blocking.

Non-blocking write client

The non-blocking write client uses implicit batching. Data is written asynchronously to an underlying buffer and sent to the server automatically when the buffer reaches the batch size (default 1000) or when the flush interval (default 1s) elapses. Writes are automatically retried on server back pressure.

This write client also offers a synchronous, blocking Flush() method, which ensures that the write buffer is flushed and all pending writes are finished. Always use the client's Close() method to stop all background processes.

The asynchronous write client is recommended for frequent periodic writes.

package main

import (
    "fmt"
    "github.com/influxdata/influxdb-client-go"
    "math/rand"
    "time"
)

func main() {
    // Create client and set batch size to 20 
    client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Get non-blocking write client
    writeApi := client.WriteApi("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write asynchronously
        writeApi.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeApi.Flush()
    // Ensure background processes finish
    client.Close()
}
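
To make the batching rule concrete (send when the buffer reaches the batch size, or when the flush interval elapses), here is a minimal stdlib-only sketch. It is not the library's implementation. The batcher type, newBatcher and batchSizes are hypothetical names chosen for illustration, and retries are left out entirely.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical, simplified model of an implicit-batching writer.
type batcher struct {
	in    chan string
	flush chan chan struct{}
	done  chan struct{}
}

// newBatcher starts a goroutine that collects lines and hands them to send
// when batchSize lines are buffered or when the interval ticks.
func newBatcher(batchSize int, interval time.Duration, send func([]string)) *batcher {
	b := &batcher{
		in:    make(chan string),
		flush: make(chan chan struct{}),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(b.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		buf := make([]string, 0, batchSize)
		emit := func() {
			if len(buf) > 0 {
				send(buf)
				buf = make([]string, 0, batchSize)
			}
		}
		for {
			select {
			case line, ok := <-b.in:
				if !ok { // Close was called: send what is left and stop.
					emit()
					return
				}
				buf = append(buf, line)
				if len(buf) >= batchSize {
					emit()
				}
			case <-ticker.C: // flush interval elapsed
				emit()
			case ack := <-b.flush: // explicit Flush request
				emit()
				close(ack)
			}
		}
	}()
	return b
}

// Write hands one line to the background goroutine.
func (b *batcher) Write(line string) { b.in <- line }

// Flush blocks until everything buffered so far has been sent.
func (b *batcher) Flush() {
	ack := make(chan struct{})
	b.flush <- ack
	<-ack
}

// Close sends any remaining lines and waits for the goroutine to exit.
func (b *batcher) Close() {
	close(b.in)
	<-b.done
}

// batchSizes writes n lines and returns the size of every batch sent.
// The interval is set to one hour so only the batch size triggers sends.
func batchSizes(n, batchSize int) []int {
	var sizes []int
	b := newBatcher(batchSize, time.Hour, func(batch []string) {
		sizes = append(sizes, len(batch))
	})
	for i := 0; i < n; i++ {
		b.Write(fmt.Sprintf("stat value=%d", i))
	}
	b.Flush()
	b.Close()
	return sizes
}

func main() {
	fmt.Println(batchSizes(45, 20)) // [20 20 5]
}
```

With 45 writes and a batch size of 20, two full batches go out on their own and the final 5 lines are only sent by the explicit Flush, which is why the example above calls Flush() before Close().
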
Reading async errors

The Errors() method returns a channel for reading errors that occur during asynchronous writes. This channel is unbuffered and must be read asynchronously; otherwise it will block the write procedure:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get non-blocking write client
    writeApi := client.WriteApi("my-org", "my-bucket")
    // Get errors channel
    errorsCh := writeApi.Errors()
    // Start a goroutine for reading and logging errors
    go func() {
        for err := range errorsCh {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }()
    // write some points
    for i := 0; i < 100; i++ {
        // create point
        p := influxdb2.NewPointWithMeasurement("stat").
            AddTag("id", fmt.Sprintf("rack_%v", i%10)).
            AddTag("vendor", "AWS").
            AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
            AddField("temperature", rand.Float64()*80.0).
            AddField("disk_free", rand.Float64()*1000.0).
            AddField("disk_total", (i/10+1)*1000000).
            AddField("mem_total", (i/100+1)*10000000).
            AddField("mem_free", rand.Uint64()).
            SetTime(time.Now())
        // write asynchronously
        writeApi.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeApi.Flush()
    // Ensure background processes finish
    client.Close()
}
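
The reason the channel must be drained can be shown without the library. In the stdlib-only sketch below, a sender reports failures on an unbuffered channel, so each send blocks until the reader goroutine receives it. The drainErrors function and its error messages are hypothetical stand-ins for the write client and its errors.

```go
package main

import (
	"fmt"
	"sync"
)

// drainErrors simulates a writer that reports n failures on an unbuffered
// channel while a separate goroutine collects them. Without the reader
// goroutine, the first send would block forever.
func drainErrors(n int) []string {
	errorsCh := make(chan error) // unbuffered, like the channel described above

	var (
		wg   sync.WaitGroup
		seen []string
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// range ends when the channel is closed by the sender.
		for err := range errorsCh {
			seen = append(seen, err.Error())
		}
	}()

	for i := 0; i < n; i++ {
		errorsCh <- fmt.Errorf("write %d failed", i)
	}
	close(errorsCh)

	// Wait for the reader so that seen is complete before returning.
	wg.Wait()
	return seen
}

func main() {
	for _, msg := range drainErrors(3) {
		fmt.Println("write error:", msg)
	}
}
```

Waiting for the reader goroutine before the program exits is a design choice that keeps late errors from being lost.
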
Blocking write client

The blocking write client writes the given point(s) synchronously. It has no implicit batching; a batch is created from the given set of points.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
    "math/rand"
    "time"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get blocking write client
    writeApi := client.WriteApiBlocking("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create data point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write synchronously
        err := writeApi.WritePoint(context.Background(), p)
        if err != nil {
            panic(err)
        }
    }
    // Ensure background processes finish
    client.Close()
}
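
Because the blocking client does no batching of its own, a caller that wants fewer requests has to group records before writing. The sketch below shows one way to do that grouping with the standard library only. The chunk helper is hypothetical, and the step where each group would be handed to a single blocking write call is left as a comment.

```go
package main

import "fmt"

// chunk splits lines into consecutive groups of at most size elements.
// It is a hypothetical helper, not part of the client library.
func chunk(lines []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for start := 0; start < len(lines); start += size {
		end := start + size
		if end > len(lines) {
			end = len(lines)
		}
		out = append(out, lines[start:end])
	}
	return out
}

func main() {
	lines := []string{
		"stat avg=1", "stat avg=2", "stat avg=3", "stat avg=4", "stat avg=5",
	}
	for i, batch := range chunk(lines, 2) {
		// In a real program, each batch would be passed to one blocking write call.
		fmt.Printf("batch %d: %d records\n", i, len(batch))
	}
}
```
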
Queries

The query client offers two ways of retrieving query results: a parsed representation in QueryTableResult and a raw result string.

QueryTableResult

QueryTableResult offers a convenient way to work with the Flux query CSV response. It parses the CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects, which makes the result easy to read.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // get QueryTableResult
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Access data
            fmt.Printf("value: %v\n", result.Record().Value())
        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensure background processes finish
    client.Close()
}
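
For a feel of what the parsed representation saves you from, the sketch below reads a small annotated CSV response with encoding/csv. It is a simplified illustration, not the library's parser: parseAnnotatedCSV and the sample data are assumptions, the #datatype and #group annotations are ignored, and all values stay strings.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// sample is a hand-written annotated CSV response used only for illustration.
const sample = `#datatype,string,long,dateTime:RFC3339,double,string,string
#group,false,false,false,false,true,true
#default,_result,,,,,
,result,table,_time,_value,_field,_measurement
,,0,2020-06-19T10:00:00Z,24.5,avg,stat
,,0,2020-06-19T10:01:00Z,23.2,avg,stat
`

// parseAnnotatedCSV returns one map per data row, keyed by column name.
// Rows starting with "#" are annotations; the first row after them is the
// header. Empty values are filled from the #default annotation if present.
func parseAnnotatedCSV(data string) ([]map[string]string, error) {
	r := csv.NewReader(strings.NewReader(data))
	r.FieldsPerRecord = -1 // rows of different tables may differ in width
	rows, err := r.ReadAll()
	if err != nil {
		return nil, err
	}

	var (
		header, defaults []string
		expectHeader     = true
		records          []map[string]string
	)
	for _, row := range rows {
		if len(row) == 0 {
			continue
		}
		switch {
		case strings.HasPrefix(row[0], "#"):
			if !expectHeader {
				defaults = nil // a new annotation block starts a new table schema
			}
			expectHeader = true
			if row[0] == "#default" {
				defaults = row
			}
		case expectHeader:
			header, expectHeader = row, false
		default:
			rec := make(map[string]string)
			// Column 0 is the empty annotation column, so start at 1.
			for i := 1; i < len(row) && i < len(header); i++ {
				v := row[i]
				if v == "" && i < len(defaults) {
					v = defaults[i]
				}
				rec[header[i]] = v
			}
			records = append(records, rec)
		}
	}
	return records, nil
}

func main() {
	records, err := parseAnnotatedCSV(sample)
	if err != nil {
		panic(err)
	}
	for _, rec := range records {
		fmt.Printf("%s %s=%s\n", rec["_time"], rec["_field"], rec["_value"])
	}
}
```
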
Raw

QueryRaw() returns the raw, unparsed query result as a string, which you process on your own. The returned CSV format
can be controlled by the third parameter, the query dialect.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // Query and get complete result as a string
    // Use default dialect
    result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
    // Ensure background processes finish
    client.Close()
}    

InfluxDB 1.8 API compatibility

InfluxDB 1.8.0 introduced forward compatibility APIs for InfluxDB 2.0. These allow you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.

Client API usage differences summary:

  1. Use the form username:password for an authentication token. Example: my-user:my-password. Use an empty string ("") if the server doesn't require authentication.
  2. The organization parameter is not used. Use an empty string ("") where necessary.
  3. Use the form database/retention-policy where a bucket is required. Skip retention policy if the default retention policy should be used. Examples: telegraf/autogen, telegraf.  
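
The first and third rules above are plain string conventions, so they can be captured in two small helpers. This is only a sketch: v1Token and v1Bucket are hypothetical names, not functions of the client library.

```go
package main

import "fmt"

// v1Token builds the "username:password" token form described above.
// An empty user yields an empty token for servers without authentication.
func v1Token(user, password string) string {
	if user == "" {
		return ""
	}
	return user + ":" + password
}

// v1Bucket builds the "database/retention-policy" bucket form.
// An empty retention policy selects the database's default policy.
func v1Bucket(database, retentionPolicy string) string {
	if retentionPolicy == "" {
		return database
	}
	return database + "/" + retentionPolicy
}

func main() {
	fmt.Println(v1Token("my-user", "my-password")) // my-user:my-password
	fmt.Println(v1Bucket("telegraf", "autogen"))   // telegraf/autogen
	fmt.Println(v1Bucket("telegraf", ""))          // telegraf
}
```

The results would be passed as the token to NewClient and as the bucket to WriteApiBlocking, with an empty string for the organization.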

The following forward compatible APIs are available:

| API | Endpoint | Description |
| --- | --- | --- |
| WriteApi (also WriteApiBlocking) | /api/v2/write | Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API |
| QueryApi | /api/v2/query | Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (the endpoint must be enabled with the flux-enabled option) |
| Health() | /health | Check the health of your InfluxDB instance |

Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go"
)

func main() {
    userName := "my-user"
    password := "my-password"
    // Create a client
    // Supply a string in the form: "username:password" as a token. Set empty value for an unauthenticated server
    client := influxdb2.NewClient("http://localhost:8086", fmt.Sprintf("%s:%s",userName, password))
    // Get the blocking write client
    // Supply a string in the form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    // Org name is not used
    writeApi := client.WriteApiBlocking("", "test/autogen")
    // create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // Write data
    err := writeApi.WritePoint(context.Background(), p)
    if err != nil {
        fmt.Printf("Write error: %s\n", err.Error())
    }

    // Get query client. Org name is not used
    queryApi := client.QueryApi("")
    // Supply string in a form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    result, err := queryApi.Query(context.Background(), `from(bucket:"test")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        for result.Next() {
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    } else {
        fmt.Printf("Query error: %s\n", err.Error())
    }
    // Close client
    client.Close()
}

Contributing

If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request to the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.

Documentation

Overview

Package influxdb2 provides an API for using the InfluxDB client in Go. It is intended for use with an InfluxDB 2 server. WriteApi, QueryApi and Health also work with InfluxDB 1.8.

Constants

const (
	Version = "1.3.0"
)

Functions

func DefaultDialect added in v1.0.0

func DefaultDialect() *domain.Dialect

DefaultDialect returns a Flux query Dialect with full annotations (datatype, group, default), a header, and a comma as the delimiter.

func NewPoint added in v1.0.0

func NewPoint(
	measurement string,
	tags map[string]string,
	fields map[string]interface{},
	ts time.Time,
) *write.Point

NewPoint creates a Point from measurement name, tags, fields and a timestamp.

func NewPointWithMeasurement added in v1.0.0

func NewPointWithMeasurement(measurement string) *write.Point

NewPointWithMeasurement creates an empty Point. Use AddTag and AddField to fill the point with data.

Types

type Client

type Client interface {
	// Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
	// and returns details about newly created entities along with the authorization object.
	// A retention period of zero results in infinite retention.
	Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
	// Ready checks InfluxDB server is running. It doesn't validate authentication params.
	Ready(ctx context.Context) (bool, error)
	// Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status.
	// Health doesn't validate authentication params.
	Health(ctx context.Context) (*domain.HealthCheck, error)
	// Close ensures all ongoing asynchronous write clients finish
	Close()
	// Options returns the options associated with client
	Options() *Options
	// ServerUrl returns the URL of the server the client talks to
	ServerUrl() string
	// WriteApi returns the asynchronous, non-blocking, Write client
	WriteApi(org, bucket string) api.WriteApi
	// WriteApiBlocking returns the synchronous, blocking, Write client
	WriteApiBlocking(org, bucket string) api.WriteApiBlocking
	// QueryApi returns Query client
	QueryApi(org string) api.QueryApi
	// AuthorizationsApi returns Authorizations API client
	AuthorizationsApi() api.AuthorizationsApi
	// OrganizationsApi returns Organizations API client
	OrganizationsApi() api.OrganizationsApi
	// UsersApi returns Users API client
	UsersApi() api.UsersApi
	// DeleteApi returns Delete API client
	DeleteApi() api.DeleteApi
	// BucketsApi returns Buckets API client
	BucketsApi() api.BucketsApi
	// LabelsApi returns Labels API client
	LabelsApi() api.LabelsApi
}

Client provides an API to communicate with an InfluxDB server. There are two APIs for writing, WriteApi and WriteApiBlocking. WriteApi provides asynchronous, non-blocking methods for writing time series data. WriteApiBlocking provides blocking methods for writing time series data.

Example (NewClient)
// Create client
client := influxdb2.NewClient("http://localhost:9999", "my-token")

// always close client at the end
defer client.Close()

Example (NewClientWithOptions)
// Create client and set batch size to 20
client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
	influxdb2.DefaultOptions().SetBatchSize(20))

// always close client at the end
defer client.Close()

func NewClient added in v1.0.0

func NewClient(serverUrl string, authToken string) Client

NewClient creates a Client for connecting to the given serverUrl with the provided authentication token and the default options. The authentication token can be empty when connecting to a newly installed InfluxDB server that has not been set up yet. In that case, Setup will set the authentication token.

func NewClientWithOptions added in v1.0.0

func NewClientWithOptions(serverUrl string, authToken string, options *Options) Client

NewClientWithOptions creates a Client for connecting to the given serverUrl with the provided authentication token, configured with custom Options. The authentication token can be empty when connecting to a newly installed InfluxDB server that has not been set up yet. In that case, Setup will set the authentication token.

type Options added in v1.0.0

type Options struct {
	// contains filtered or unexported fields
}

Options holds configuration properties for communicating with InfluxDB server

func DefaultOptions added in v1.0.0

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) AddDefaultTag added in v1.3.0

func (o *Options) AddDefaultTag(key, value string) *Options

AddDefaultTag adds a default tag. Default tags are added to each written point. If a default tag with the same key already exists, it is overwritten. If a point already defines such a tag, it is left unchanged.
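
The precedence rule described here can be sketched as a simple map merge in which the point's own tags win over the defaults. The withDefaultTags function is a hypothetical illustration and not how the library applies default tags internally.

```go
package main

import "fmt"

// withDefaultTags returns the tags a written point would end up with:
// every default tag is added unless the point already defines that key.
func withDefaultTags(pointTags, defaults map[string]string) map[string]string {
	merged := make(map[string]string, len(pointTags)+len(defaults))
	for k, v := range defaults {
		merged[k] = v
	}
	// The point's own tags are copied last, so they are left unchanged.
	for k, v := range pointTags {
		merged[k] = v
	}
	return merged
}

func main() {
	defaults := map[string]string{"dc": "eu", "host": "default-host"}
	point := map[string]string{"host": "h1", "unit": "temperature"}

	merged := withDefaultTags(point, defaults)
	fmt.Println(merged["dc"], merged["host"], merged["unit"]) // eu h1 temperature
}
```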

func (*Options) BatchSize added in v1.0.0

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval added in v1.0.0

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) HttpOptions added in v1.2.0

func (o *Options) HttpOptions() *http.Options

HttpOptions returns http related options

func (*Options) HttpRequestTimeout added in v1.1.0

func (o *Options) HttpRequestTimeout() uint

HttpRequestTimeout returns HTTP request timeout

func (*Options) LogLevel added in v1.0.0

func (o *Options) LogLevel() uint

LogLevel returns log level

func (*Options) MaxRetries added in v1.0.0

func (o *Options) MaxRetries() uint

MaxRetries returns maximum count of retry attempts of failed writes

func (*Options) Precision added in v1.0.0

func (o *Options) Precision() time.Duration

Precision returns time precision for writes

func (*Options) RetryBufferLimit added in v1.0.0

func (o *Options) RetryBufferLimit() uint

RetryBufferLimit returns retry buffer limit

func (*Options) RetryInterval added in v1.0.0

func (o *Options) RetryInterval() uint

RetryInterval returns the retry interval in ms

func (*Options) SetBatchSize added in v1.0.0

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of points sent in single request

func (*Options) SetFlushInterval added in v1.0.0

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets the flush interval in ms, after which the buffer is flushed if it has not already been written

func (*Options) SetHttpRequestTimeout added in v1.1.0

func (o *Options) SetHttpRequestTimeout(httpRequestTimeout uint) *Options

SetHttpRequestTimeout sets HTTP request timeout in sec

func (*Options) SetLogLevel added in v1.0.0

func (o *Options) SetLogLevel(logLevel uint) *Options

SetLogLevel sets the level used to filter log messages. Each level also logs all categories below it: 0 - error, 1 - warning, 2 - info, 3 - debug. The debug level also prints the content of written batches.

func (*Options) SetMaxRetries added in v1.0.0

func (o *Options) SetMaxRetries(maxRetries uint) *Options

SetMaxRetries sets maximum count of retry attempts of failed writes

func (*Options) SetPrecision added in v1.0.0

func (o *Options) SetPrecision(precision time.Duration) *Options

SetPrecision sets time precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second
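
Conceptually, a write precision decides in which unit a timestamp is expressed. The sketch below shows that idea by dividing a nanosecond timestamp by the precision. The timestampAt function is hypothetical and only illustrates the arithmetic; it is not taken from the library.

```go
package main

import (
	"fmt"
	"time"
)

// timestampAt expresses a nanosecond Unix timestamp in units of precision,
// truncating anything finer. Non-positive precisions fall back to nanoseconds.
func timestampAt(unixNano int64, precision time.Duration) int64 {
	if precision <= 0 {
		precision = time.Nanosecond
	}
	return unixNano / int64(precision)
}

func main() {
	ts := time.Date(2020, 6, 19, 10, 0, 0, 123456789, time.UTC).UnixNano()
	for _, p := range []time.Duration{time.Nanosecond, time.Microsecond, time.Millisecond, time.Second} {
		fmt.Printf("%-4v %d\n", p, timestampAt(ts, p))
	}
}
```

A coarser precision produces shorter timestamps, at the cost of points that fall into the same unit sharing a timestamp.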

func (*Options) SetRetryBufferLimit added in v1.0.0

func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options

SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize.

func (*Options) SetRetryInterval added in v1.0.0

func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options

SetRetryInterval sets the retry interval in ms, which is used if the server does not send one

func (*Options) SetTlsConfig added in v1.0.0

func (o *Options) SetTlsConfig(tlsConfig *tls.Config) *Options

SetTlsConfig sets TLS configuration for secure connection

func (*Options) SetUseGZip added in v1.0.0

func (o *Options) SetUseGZip(useGZip bool) *Options

SetUseGZip specifies whether to use GZip compression in write requests.

func (*Options) TlsConfig added in v1.0.0

func (o *Options) TlsConfig() *tls.Config

TlsConfig returns TlsConfig

func (*Options) UseGZip added in v1.0.0

func (o *Options) UseGZip() bool

UseGZip returns true if write requests are gzipped

func (*Options) WriteOptions added in v1.2.0

func (o *Options) WriteOptions() *write.Options

WriteOptions returns write related options

Directories

Path Synopsis
api
Package api provides clients for InfluxDB server APIs.
Package domain provides primitives to interact with the openapi HTTP API.
internal
log
