influxdb2

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2020 License: MIT Imports: 28 Imported by: 122

README

InfluxDB Client Go

CircleCI codecov License

This repository contains the reference Go client for InfluxDB 2.

Features

Installation

Go 1.3 or later is required.

Add import github.com/influxdata/influxdb-client-go to your source code and sync dependencies or directly edit go.mod.

Usage

Basic example with blocking write and flux query.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/influxdata/influxdb-client-go"
)

func main() {
    // create new client with default option for server url authenticate by token
	client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // user blocking write client for writes to desired bucket
	writeApi := client.WriteApiBlocking("my-org", "my-bucket")
	// create point using full params constructor 
	p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // write point immediately 
    writeApi.WritePoint(context.Background(), p)
    // create point using fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45).
        SetTime(time.Now())
	writeApi.WritePoint(context.Background(), p)
    
    // Or write directly line protocol
	line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
	writeApi.WriteRecord(context.Background(), line)

    // get query client
	queryApi := client.QueryApi("my-org")
    // get parser flux query result
	result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
	if err == nil {
        // Use Next() to iterate over query result lines
		for result.Next() {
            // Observe when there is new grouping key producing new table
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
            // read result
			fmt.Printf("row: %s\n", result.Record().String())
		}
		if result.Err() != nil {
			fmt.Printf("Query error: %s\n", result.Err().Error())
		}
	}
	
}
Options

Client uses set of options to configure behavior. These are available in the Options object Creating client using:

client := influxdb2.NewClient("http://localhost:9999", "my-token")

To set different configuration values, e.g. to set gzip compression and trust all server certificates, get default options and change what needed:

    client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token", 
		influxdb2.DefaultOptions().
			SetUseGZip(true).
			SetTlsConfig(&tls.Config{
				InsecureSkipVerify: true,
			}))
Writes

Client offers two ways of writing, non-blocking and blocking.

Non-blocking write client

Non-blocking write client uses implicit batching. Data are asynchronously written to the underlying buffer and are automatically sent to server when size of the write buffer reaches the batch size, default 1000, or flush interval, default 1s, times out. Writes are automatically retried on server back pressure.

This client also offers synchronous blocking method to ensure that write buffer is flushed and all pending writes are finished, see Flush() method. Always use Close() method of the client to stop background processes.

This write client recommended for frequent periodic writes.

Example:

package main

import (
	"fmt"
	"github.com/influxdata/influxdb-client-go"
	"math/rand"
	"time"
)

func main() {
	// Create client and set batch size to 20 
	client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
		influxdb2.DefaultOptions().SetBatchSize(20))
	// Get non-blocking write client
	writeApi := client.WriteApi("my-org","my-bucket")
	// write some points
	for i := 0; i <100; i++ {
        // create point
		p := influxdb2.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
        // write asynchronously
		writeApi.WritePoint(p)
	}
	// Force all unwritten data to be sent
	writeApi.Flush()
	// Ensures background processes finishes
	client.Close()
}
Blocking write client

Blocking write client writes given point(s) synchronously. No implicit batching. Batch is created from given set of points

Example:

package main

import (
	"context"
	"fmt"
	"github.com/influxdata/influxdb-client-go"
	"math/rand"
	"time"
)

func main() {
	// Create client
	client := influxdb2.NewClient("http://localhost:9999", "my-token")
	// Get non-blocking write client
	writeApi := client.WriteApiBlocking("my-org","my-bucket")
	// write some points
	for i := 0; i <100; i++ {
		// create data point
		p := influxdb2.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
		// write synchronously
		err := writeApi.WritePoint(context.Background(), p)
		if err != nil {
			panic(err)
		}
	}
}
Queries

Query client offer two ways of retrieving query results, parsed representation in QueryTableResult and a raw result string. which parses response stream into FluxTableMetaData, FluxColumn and FluxRecord objects.

QueryTableResult

QueryTableResult offers comfortable way how to deal with flux query CSV response. It parses CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects for easy reading the result.

package main

import (
	"context"
	"fmt"
	"github.com/influxdata/influxdb-client-go"
)

func main() {
	// Create client
	client := influxdb2.NewClient("http://localhost:9999", "my-token")
	// Get query client
	queryApi := client.QueryApi("my-org")
	// get QueryTableResult
	result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
}
Raw

QueryRaw() returns raw, unparsed, query result string and process it on your own. Returned csv format
can controlled by third parameter, query dialect.

package main

import (
	"context"
	"fmt"
	"github.com/influxdata/influxdb-client-go"
)

func main() {
	// Create client
	client := influxdb2.NewClient("http://localhost:9999", "my-token")
	// Get query client
	queryApi := client.QueryApi("my-org")
	// Query and get complete result as a string
	// Use default dialect
	result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
	if err == nil {
		fmt.Println("QueryResult:")
		fmt.Println(result)
	} else {
		panic(err)
	}
}    

Contributing

If you would like to contribute code you can do through GitHub by forking the repository and sending a pull request into the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.

Documentation

Overview

package influxdb2 provides API for using InfluxDB client in Go It's intended to use with InfluxDB 2 server

Index

Constants

View Source
const (
	Version = "1.0.0"
)

Variables

This section is empty.

Functions

func DefaultDialect added in v1.0.0

func DefaultDialect() *domain.Dialect

DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter

Types

type Error

type Error struct {
	StatusCode int
	Code       string
	Message    string
	Err        error
	RetryAfter uint
}

Error represent error response from InfluxDBServer or http error

func NewError added in v1.0.0

func NewError(err error) *Error

NewError returns newly created Error initialised with nested error and default values

func (*Error) Error

func (e *Error) Error() string

Error fulfils error interface

type FluxColumn added in v1.0.0

type FluxColumn struct {
	// contains filtered or unexported fields
}

FluxColumn holds flux query table column properties

func (*FluxColumn) DataType added in v1.0.0

func (f *FluxColumn) DataType() string

DataType returns data type of the column

func (*FluxColumn) DefaultValue added in v1.0.0

func (f *FluxColumn) DefaultValue() string

DefaultValue returns default value of the column

func (*FluxColumn) Index added in v1.0.0

func (f *FluxColumn) Index() int

Index returns index of the column

func (*FluxColumn) IsGroup added in v1.0.0

func (f *FluxColumn) IsGroup() bool

IsGroup return true if the column is grouping column

func (*FluxColumn) Name added in v1.0.0

func (f *FluxColumn) Name() string

Name returns name of the column

func (*FluxColumn) SetDataType added in v1.0.0

func (f *FluxColumn) SetDataType(dataType string)

SetDataType sets data type for the column

func (*FluxColumn) SetDefaultValue added in v1.0.0

func (f *FluxColumn) SetDefaultValue(defaultValue string)

SetDefaultValue sets default value for the column

func (*FluxColumn) SetGroup added in v1.0.0

func (f *FluxColumn) SetGroup(group bool)

SetGroup set group flag for the column

func (*FluxColumn) SetName added in v1.0.0

func (f *FluxColumn) SetName(name string)

SetName sets name of the column

func (*FluxColumn) String added in v1.0.0

func (f *FluxColumn) String() string

String returns FluxColumn string dump

type FluxRecord added in v1.0.0

type FluxRecord struct {
	// contains filtered or unexported fields
}

FluxRecord represents row in the flux query result table

func (*FluxRecord) Field added in v1.0.0

func (r *FluxRecord) Field() string

Field returns the field name

func (*FluxRecord) Measurement added in v1.0.0

func (r *FluxRecord) Measurement() string

Measurement returns the measurement name of the record

func (*FluxRecord) Start added in v1.0.0

func (r *FluxRecord) Start() time.Time

Start returns the inclusive lower time bound of all records in the current table

func (*FluxRecord) Stop added in v1.0.0

func (r *FluxRecord) Stop() time.Time

Stop returns the exclusive upper time bound of all records in the current table

func (*FluxRecord) String added in v1.0.0

func (r *FluxRecord) String() string

String returns FluxRecord string dump

func (*FluxRecord) Table added in v1.0.0

func (r *FluxRecord) Table() int

Table returns index of the table record belongs to

func (*FluxRecord) Time added in v1.0.0

func (r *FluxRecord) Time() time.Time

Start returns the time of the record

func (*FluxRecord) Value added in v1.0.0

func (r *FluxRecord) Value() interface{}

Value returns the actual field value

func (*FluxRecord) ValueByKey added in v1.0.0

func (r *FluxRecord) ValueByKey(key string) interface{}

ValueByKey returns value for given column key for the record

func (*FluxRecord) Values added in v1.0.0

func (r *FluxRecord) Values() map[string]interface{}

Values returns map of the values where key is the column name

type FluxTableMetadata added in v1.0.0

type FluxTableMetadata struct {
	// contains filtered or unexported fields
}

FluxTableMetadata holds flux query result table information represented by collection of columns. Each new table is introduced by annotations

func (*FluxTableMetadata) AddColumn added in v1.0.0

func (f *FluxTableMetadata) AddColumn(column *FluxColumn) *FluxTableMetadata

AddColumn adds column definition to table metadata

func (*FluxTableMetadata) Column added in v1.0.0

func (f *FluxTableMetadata) Column(index int) *FluxColumn

Column returns flux table column by index Returns nil if index is out of the bounds

func (*FluxTableMetadata) Columns added in v1.0.0

func (f *FluxTableMetadata) Columns() []*FluxColumn

Columns returns slice of flux query result table

func (*FluxTableMetadata) Position added in v1.0.0

func (f *FluxTableMetadata) Position() int

Position returns position of the table in the flux query result

func (*FluxTableMetadata) String added in v1.0.0

func (f *FluxTableMetadata) String() string

String returns FluxTableMetadata string dump

type InfluxDBClient added in v1.0.0

type InfluxDBClient interface {
	// WriteApi returns the asynchronous, non-blocking, Write client.
	WriteApi(org, bucket string) WriteApi
	// WriteApi returns the synchronous, blocking, Write client.
	WriteApiBlocking(org, bucket string) WriteApiBlocking
	// QueryApi returns Query client
	QueryApi(org string) QueryApi
	// Close ensures all ongoing asynchronous write clients finish
	Close()
	// Options returns the options associated with client
	Options() *Options
	// ServerUrl returns the url of the server url client talks to
	ServerUrl() string
	// Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
	// Retention period of zero will result to infinite retention
	// and returns details about newly created entities along with the authorization object
	Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
	// Ready checks InfluxDB server is running
	Ready(ctx context.Context) (bool, error)
	// contains filtered or unexported methods
}

InfluxDBClient provides API to communicate with InfluxDBServer There two APIs for writing, WriteApi and WriteApiBlocking. WriteApi provides asynchronous, non-blocking, methods for writing time series data. WriteApiBlocking provides blocking methods for writing time series data

func NewClient added in v1.0.0

func NewClient(serverUrl string, authToken string) InfluxDBClient

NewClient creates InfluxDBClient for connecting to given serverUrl with provided authentication token, with default options Authentication token can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. In such case Setup will set authentication token

func NewClientWithOptions added in v1.0.0

func NewClientWithOptions(serverUrl string, authToken string, options *Options) InfluxDBClient

NewClientWithOptions creates InfluxDBClient for connecting to given serverUrl with provided authentication token and configured with custom Options Authentication token can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. In such case Setup will set authentication token

type Options added in v1.0.0

type Options struct {
	// contains filtered or unexported fields
}

Options holds configuration properties for communicating with InfluxDB server

func DefaultOptions added in v1.0.0

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) BatchSize added in v1.0.0

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval added in v1.0.0

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) LogLevel added in v1.0.0

func (o *Options) LogLevel() uint

LogLevel returns log level

func (*Options) MaxRetries added in v1.0.0

func (o *Options) MaxRetries() uint

MaxRetries returns maximum count of retry attempts of failed writes

func (*Options) Precision added in v1.0.0

func (o *Options) Precision() time.Duration

Precision returns time precision for writes

func (*Options) RetryBufferLimit added in v1.0.0

func (o *Options) RetryBufferLimit() uint

RetryBufferLimit returns retry buffer limit

func (*Options) RetryInterval added in v1.0.0

func (o *Options) RetryInterval() uint

RetryInterval returns retry interval in ms

func (*Options) SetBatchSize added in v1.0.0

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of points sent in single request

func (*Options) SetFlushInterval added in v1.0.0

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets flush interval in ms in which is buffer flushed if it has not been already written

func (*Options) SetLogLevel added in v1.0.0

func (o *Options) SetLogLevel(logLevel uint) *Options

SetLogLevel set level to filter log messages. Each level mean to log all categories bellow. 0 error, 1 - warning, 2 - info, 3 - debug Debug level will print also content of writen batches

func (*Options) SetMaxRetries added in v1.0.0

func (o *Options) SetMaxRetries(maxRetries uint) *Options

SetMaxRetries sets maximum count of retry attempts of failed writes

func (*Options) SetPrecision added in v1.0.0

func (o *Options) SetPrecision(precision time.Duration) *Options

SetPrecision sets time precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second

func (*Options) SetRetryBufferLimit added in v1.0.0

func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options

SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize.

func (*Options) SetRetryInterval added in v1.0.0

func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options

SetRetryInterval sets retry interval in ms, which is set if not sent by server

func (*Options) SetTlsConfig added in v1.0.0

func (o *Options) SetTlsConfig(tlsConfig *tls.Config) *Options

SetTlsConfig sets TLS configuration for secure connection

func (*Options) SetUseGZip added in v1.0.0

func (o *Options) SetUseGZip(useGZip bool) *Options

SetUseGZip specifies whether to use GZip compression in write requests.

func (*Options) TlsConfig added in v1.0.0

func (o *Options) TlsConfig() *tls.Config

TlsConfig returns TlsConfig

func (*Options) UseGZip added in v1.0.0

func (o *Options) UseGZip() bool

UseGZip returns true if write request are gzip`ed

type Point added in v1.0.0

type Point struct {
	// contains filtered or unexported fields
}

Point is represents InfluxDB time series point, holding tags and fields

func NewPoint added in v1.0.0

func NewPoint(
	measurement string,
	tags map[string]string,
	fields map[string]interface{},
	ts time.Time,
) *Point

NewPoint creates a Point from measurement name, tags, fields and a timestamp.

func NewPointWithMeasurement added in v1.0.0

func NewPointWithMeasurement(measurement string) *Point

NewPointWithMeasurement creates a empty Point Use AddTag and AddField to fill point with data

func (*Point) AddField added in v1.0.0

func (m *Point) AddField(k string, v interface{}) *Point

AddField adds a field to a point.

func (*Point) AddTag added in v1.0.0

func (m *Point) AddTag(k, v string) *Point

AddTag adds a tag to a point.

func (*Point) FieldList added in v1.0.0

func (m *Point) FieldList() []*lp.Field

FieldList returns a slice containing the fields of a Point.

func (*Point) Name added in v1.0.0

func (m *Point) Name() string

Name returns the name of measurement of a point.

func (*Point) SetTime added in v1.0.0

func (m *Point) SetTime(timestamp time.Time) *Point

SetTime set timestamp for a Point.

func (*Point) SortFields added in v1.0.0

func (m *Point) SortFields() *Point

SortFields orders the fields of a point alphanumerically by key.

func (*Point) SortTags added in v1.0.0

func (m *Point) SortTags() *Point

SortTags orders the tags of a point alphanumerically by key. This is just here as a helper, to make it easy to keep tags sorted if you are creating a Point manually.

func (*Point) TagList added in v1.0.0

func (m *Point) TagList() []*lp.Tag

TagList returns a slice containing tags of a Point.

func (*Point) Time added in v1.0.0

func (m *Point) Time() time.Time

Time is the timestamp of a Point.

type QueryApi added in v1.0.0

type QueryApi interface {
	// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
	// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	Query(ctx context.Context, query string) (*QueryTableResult, error)
}

QueryApi provides methods for performing synchronously flux query against InfluxDB server

type QueryTableResult added in v1.0.0

type QueryTableResult struct {
	io.Closer
	// contains filtered or unexported fields
}

QueryTableResult parses streamed flux query response into structures representing flux table parts Walking though the result is done by repeatedly calling Next() until returns false. Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method. Data are acquired by Record() method. Preliminary end can be caused by an error, so when Next() return false, check Err() for an error

func (*QueryTableResult) Err added in v1.0.0

func (q *QueryTableResult) Err() error

Err returns an error raised during flux query response parsing

func (*QueryTableResult) Next added in v1.0.0

func (q *QueryTableResult) Next() bool

Next advances to next row in query result. During the first time it is called, Next creates also table metadata Actual parsed row is available through Record() function Returns false in case of end or an error, otherwise true

func (*QueryTableResult) Record added in v1.0.0

func (q *QueryTableResult) Record() *FluxRecord

Record returns last parsed flux table data row Use Record methods to access value and row properties

func (*QueryTableResult) TableChanged added in v1.0.0

func (q *QueryTableResult) TableChanged() bool

TableChanged returns true if last call of Next() found also new result table Table information is available via TableMetadata method

func (*QueryTableResult) TableMetadata added in v1.0.0

func (q *QueryTableResult) TableMetadata() *FluxTableMetadata

TableMetadata returns actual flux table metadata

func (*QueryTableResult) TablePosition added in v1.0.0

func (q *QueryTableResult) TablePosition() int

TablePosition returns actual flux table position in the result. Each new table is introduced by annotations

type RequestCallback added in v1.0.0

type RequestCallback func(req *http.Request)

Http operation callbacks

type ResponseCallback added in v1.0.0

type ResponseCallback func(req *http.Response) error

type WriteApi added in v1.0.0

type WriteApi interface {
	// WriteRecord writes asynchronously line protocol record into bucket.
	// WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size.
	// Blocking alternative is available in the WriteApiBlocking interface
	WriteRecord(line string)
	// WritePoint writes asynchronously Point into bucket.
	// WritePoint adds Point into the buffer which is sent on the background when it reaches the batch size.
	// Blocking alternative is available in the WriteApiBlocking interface
	WritePoint(point *Point)
	// Flush forces all pending writes from the buffer to be sent
	Flush()
	// Flushes all pending writes and stop async processes. After this the Write client cannot be used
	Close()
	// Errors return channel for reading errors which occurs during async writes
	Errors() <-chan error
}

WriteApiBlocking is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server.

type WriteApiBlocking added in v1.0.0

type WriteApiBlocking interface {
	// WriteRecord writes line protocol record(s) into bucket.
	// WriteRecord writes without implicit batching. Batch is created from given number of records
	// Non-blocking alternative is available in the WriteApi interface
	WriteRecord(ctx context.Context, line ...string) error
	// WritePoint data point into bucket.
	// WritePoint writes without implicit batching. Batch is created from given number of points
	// Non-blocking alternative is available in the WriteApi interface
	WritePoint(ctx context.Context, point ...*Point) error
}

WriteApiBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.

Directories

Path Synopsis
Package domain provides primitives to interact the openapi HTTP API.
Package domain provides primitives to interact the openapi HTTP API.
internal
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL