influxdb2

package module
v2.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2024 License: MIT Imports: 16 Imported by: 958

README

InfluxDB Client Go

CircleCI codecov License Slack Status

This repository contains the Go client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.

For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.

Features

Documentation

This section contains links to the client library documentation.

Examples

Examples for basic writing and querying data are shown below in this document

There are also other examples in the API docs:

How To Use

Installation

Go 1.17 or later is required.

Go mod project
  1. Add the latest version of the client package to your project dependencies (go.mod).
    go get github.com/influxdata/influxdb-client-go/v2
    
  2. Add import github.com/influxdata/influxdb-client-go/v2 to your source code.
GOPATH project
```sh
go get github.com/influxdata/influxdb-client-go
```

Note: To have go get in the GOPATH mode, the environment variable GO111MODULE must have the off value.

Basic Example

The following example demonstrates how to write data to InfluxDB 2 and read them back using the Flux language:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Use blocking write client for writes to desired bucket
    writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
    // Create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45.0},
        time.Now())
    // write point immediately
    writeAPI.WritePoint(context.Background(), p)
    // Create point using fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45.0).
        SetTime(time.Now())
    err := writeAPI.WritePoint(context.Background(), p)
	if err != nil {
		panic(err)
	}
    // Or write directly line protocol
    line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    err = writeAPI.WriteRecord(context.Background(), line)
	if err != nil {
		panic(err)
	}

    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // Get parser flux query result
    result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    } else {
		panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}
Options

The InfluxDBClient uses set of options to configure behavior. These are available in the Options object Creating a client instance using

client := influxdb2.NewClient("http://localhost:8086", "my-token")

will use the default options.

To set different configuration values, e.g. to set gzip compression and trust all server certificates, get default options and change what is needed:

client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
    influxdb2.DefaultOptions().
        SetUseGZip(true).
        SetTLSConfig(&tls.Config{
            InsecureSkipVerify: true,
        }))
Writes

Client offers two ways of writing, non-blocking and blocking.

Non-blocking write client

Non-blocking write client uses implicit batching. Data are asynchronously written to the underlying buffer and they are automatically sent to a server when the size of the write buffer reaches the batch size, default 5000, or the flush interval, default 1s, times out. Writes are automatically retried on server back pressure.

This write client also offers synchronous blocking method to ensure that write buffer is flushed and all pending writes are finished, see Flush() method. Always use Close() method of the client to stop all background processes.

Asynchronous write client is recommended for frequent periodic writes.

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    // and set batch size to 20
    client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Get non-blocking write client
    writeAPI := client.WriteAPI("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write asynchronously
        writeAPI.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeAPI.Flush()
    // Ensures background processes finishes
    client.Close()
}
Handling of failed async writes

WriteAPI by default continues with retrying of failed writes. Retried are automatically writes that fail on a connection failure or when server returns response HTTP status code >= 429.

Retrying algorithm uses random exponential strategy to set retry time. The delay for the next retry attempt is a random value in the interval retryInterval * exponentialBase^(attempts) and retryInterval * exponentialBase^(attempts+1). If writes of batch repeatedly fails, WriteAPI continues with retrying until maxRetries is reached or the overall retry time of batch exceeds maxRetryTime.

The defaults parameters (part of the WriteOptions) are:

  • retryInterval=5,000ms
  • exponentialBase=2
  • maxRetryDelay=125,000ms
  • maxRetries=5
  • maxRetryTime=180,000ms

Retry delays are by default randomly distributed within the ranges:

  1. 5,000-10,000
  2. 10,000-20,000
  3. 20,000-40,000
  4. 40,000-80,000
  5. 80,000-125,000

Setting retryInterval to 0 disables retry strategy and any failed write will discard the batch.

WriteFailedCallback allows advanced controlling of retrying. It is synchronously notified in case async write fails. It controls further batch handling by its return value. If it returns true, WriteAPI continues with retrying of writes of this batch. Returned false means the batch should be discarded.

Reading async errors

WriteAPI automatically logs write errors. Use Errors() method, which returns the channel for reading errors occuring during async writes, for writing write error to a custom target:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get non-blocking write client
    writeAPI := client.WriteAPI("my-org", "my-bucket")
    // Get errors channel
    errorsCh := writeAPI.Errors()
    // Create go proc for reading and logging errors
    go func() {
        for err := range errorsCh {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }()
    // write some points
    for i := 0; i < 100; i++ {
        // create point
        p := influxdb2.NewPointWithMeasurement("stat").
            AddTag("id", fmt.Sprintf("rack_%v", i%10)).
            AddTag("vendor", "AWS").
            AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
            AddField("temperature", rand.Float64()*80.0).
            AddField("disk_free", rand.Float64()*1000.0).
            AddField("disk_total", (i/10+1)*1000000).
            AddField("mem_total", (i/100+1)*10000000).
            AddField("mem_free", rand.Uint64()).
            SetTime(time.Now())
        // write asynchronously
        writeAPI.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeAPI.Flush()
    // Ensures background processes finishes
    client.Close()
}
Blocking write client

Blocking write client writes given point(s) synchronously. It doesn't do implicit batching. Batch is created from given set of points. Implicit batching can be enabled with WriteAPIBlocking.EnableBatching().

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get blocking write client
    writeAPI := client.WriteAPIBlocking("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create data point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write synchronously
        err := writeAPI.WritePoint(context.Background(), p)
        if err != nil {
            panic(err)
        }
    }
    // Ensures background processes finishes
    client.Close()
}
Queries

Query client offers retrieving of query results to a parsed representation in a QueryTableResult or to a raw string.

QueryTableResult

QueryTableResult offers comfortable way how to deal with flux query CSV response. It parses CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects for easy reading the result.

package main

import (
    "context"
    "fmt"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // get QueryTableResult
    result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Access data
            fmt.Printf("value: %v\n", result.Record().Value())
        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}
Raw

QueryRaw() returns raw, unparsed, query result string and process it on your own. Returned csv format can be controlled by the third parameter, query dialect.

package main

import (
    "context"
    "fmt"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Get query client
    queryAPI := client.QueryAPI("my-org")
    // Query and get complete result as a string
    // Use default dialect
    result, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}
Parametrized Queries

InfluxDB Cloud supports Parameterized Queries that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more reusable and can also be used to help prevent injection attacks.

InfluxDB Cloud inserts the params object into the Flux query as a Flux record named params. Use dot or bracket notation to access parameters in the params record in your Flux query. Parameterized Flux queries support only int , float, and string data types. To convert the supported data types into other Flux basic data types, use Flux type conversion functions.

Query parameters can be passed as a struct or map. Param values can be only simple types or time.Time. The name of the parameter represented by a struct field can be specified by JSON annotation.

Parameterized query example:

⚠ Parameterized Queries are supported only in InfluxDB Cloud. There is no support in InfluxDB OSS currently.

package main

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Define parameters
	parameters := struct {
		Start string  `json:"start"`
		Field string  `json:"field"`
		Value float64 `json:"value"`
	}{
		"-1h",
		"temperature",
		25,
	}
	// Query with parameters
	query := `from(bucket:"my-bucket")
				|> range(start: duration(params.start))
				|> filter(fn: (r) => r._measurement == "stat")
				|> filter(fn: (r) => r._field == params.field)
				|> filter(fn: (r) => r._value > params.value)`

	// Get result
	result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Concurrency

InfluxDB Go Client can be used in a concurrent environment. All its functions are thread-safe.

The best practise is to use a single Client instance per server URL. This ensures optimized resources usage, most importantly reusing HTTP connections.

For efficient reuse of HTTP resources among multiple clients, create an HTTP client and use Options.SetHTTPClient() for setting it to all clients:

    // Create HTTP client
    httpClient := &http.Client{
        Timeout: time.Second * time.Duration(60),
        Transport: &http.Transport{
            DialContext: (&net.Dialer{
                Timeout: 5 * time.Second,
            }).DialContext,
            TLSHandshakeTimeout: 5 * time.Second,
            TLSClientConfig: &tls.Config{
                InsecureSkipVerify: true,
            },
            MaxIdleConns:        100,
            MaxIdleConnsPerHost: 100,
            IdleConnTimeout:     90 * time.Second,
        },
    }
    // Client for server 1
    client1 := influxdb2.NewClientWithOptions("https://server:8086", "my-token", influxdb2.DefaultOptions().SetHTTPClient(httpClient))
    // Client for server 2
    client2 := influxdb2.NewClientWithOptions("https://server:9999", "my-token2", influxdb2.DefaultOptions().SetHTTPClient(httpClient))

Client ensures that there is a single instance of each server API sub-client for the specific area. E.g. a single WriteAPI instance for each org/bucket pair, a single QueryAPI for each org.

Such a single API sub-client instance can be used concurrently:

package main

import (
	"math/rand"
	"sync"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Ensure closing the client
    defer client.Close()

    // Get write client
    writeApi := client.WriteAPI("my-org", "my-bucket")

    // Create channel for points feeding
    pointsCh := make(chan *write.Point, 200)

    threads := 5

    var wg sync.WaitGroup
    go func(points int) {
        for i := 0; i < points; i++ {
            p := influxdb2.NewPoint("meas",
                map[string]string{"tag": "tagvalue"},
                map[string]interface{}{"val1": rand.Int63n(1000), "val2": rand.Float64()*100.0 - 50.0},
                time.Now())
            pointsCh <- p
        }
        close(pointsCh)
    }(1000000)

    // Launch write routines
    for t := 0; t < threads; t++ {
        wg.Add(1)
        go func() {
            for p := range pointsCh {
                writeApi.WritePoint(p)
            }
            wg.Done()
        }()
    }
    // Wait for writes complete
    wg.Wait()
}
Proxy and redirects

You can configure InfluxDB Go client behind a proxy in two ways:

  1. Using environment variable Set environment variable HTTP_PROXY (or HTTPS_PROXY based on the scheme of your server url). e.g. (linux) export HTTP_PROXY=http://my-proxy:8080 or in Go code os.Setenv("HTTP_PROXY","http://my-proxy:8080")

  2. Configure http.Client to use proxy
    Create a custom http.Client with a proxy configuration:

    proxyUrl, err := url.Parse("http://my-proxy:8080")
    httpClient := &http.Client{
        Transport: &http.Transport{
            Proxy: http.ProxyURL(proxyUrl)
        }
    }
    client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))
    

Client automatically follows HTTP redirects. The default redirect policy is to follow up to 10 consecutive requests. Due to a security reason Authorization header is not forwarded when redirect leads to a different domain. To overcome this limitation you have to set a custom redirect handler:

token := "my-token"

httpClient := &http.Client{
    CheckRedirect: func(req *http.Request, via []*http.Request) error {
        req.Header.Add("Authorization","Token " + token)
        return nil
    },
}
client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient))
Checking Server State

There are three functions for checking whether a server is up and ready for communication:

Function Description Availability
Health() Detailed info about the server status, along with version string OSS
Ready() Server uptime info OSS
Ping() Whether a server is up OSS, Cloud

Only the Ping() function works in InfluxDB Cloud server.

InfluxDB 1.8 API compatibility

InfluxDB 1.8.0 introduced forward compatibility APIs for InfluxDB 2.0. This allow you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.

Client API usage differences summary:

  1. Use the form username:password for an authentication token. Example: my-user:my-password. Use an empty string ("") if the server doesn't require authentication.
  2. The organization parameter is not used. Use an empty string ("") where necessary.
  3. Use the form database/retention-policy where a bucket is required. Skip retention policy if the default retention policy should be used. Examples: telegraf/autogen, telegraf.  

The following forward compatible APIs are available:

API Endpoint Description
WriteAPI (also WriteAPIBlocking) /api/v2/write Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API
QueryAPI /api/v2/query Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux endpoint should be enabled by the flux-enabled option
Health() /health Check the health of your InfluxDB instance
Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
    userName := "my-user"
    password := "my-password"
     // Create a new client using an InfluxDB server base URL and an authentication token
    // For authentication token supply a string in the form: "username:password" as a token. Set empty value for an unauthenticated server
    client := influxdb2.NewClient("http://localhost:8086", fmt.Sprintf("%s:%s",userName, password))
    // Get the blocking write client
    // Supply a string in the form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    // Org name is not used
    writeAPI := client.WriteAPIBlocking("", "test/autogen")
    // create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // Write data
    err := writeAPI.WritePoint(context.Background(), p)
    if err != nil {
        fmt.Printf("Write error: %s\n", err.Error())
    }

    // Get query client. Org name is not used
    queryAPI := client.QueryAPI("")
    // Supply string in a form database/retention-policy as a bucket. Skip retention policy for the default one, use just a database name (without the slash character)
    result, err := queryAPI.Query(context.Background(), `from(bucket:"test")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        for result.Next() {
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    } else {
        fmt.Printf("Query error: %s\n", err.Error())
    }
    // Close client
    client.Close()
}

Contributing

If you would like to contribute code you can do through GitHub by forking the repository and sending a pull request into the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.

Documentation

Overview

Package influxdb2 provides API for using InfluxDB client in Go. It's intended to use with InfluxDB 2 server. WriteAPI, QueryAPI and Health work also with InfluxDB 1.8

Index

Examples

Constants

View Source
const (
	// Version defines current version
	Version = "2.14.0"
)

Variables

This section is empty.

Functions

func DefaultDialect

func DefaultDialect() *domain.Dialect

DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter

func NewPoint

func NewPoint(
	measurement string,
	tags map[string]string,
	fields map[string]interface{},
	ts time.Time,
) *write.Point

NewPoint creates a Point from measurement name, tags, fields and a timestamp.

func NewPointWithMeasurement

func NewPointWithMeasurement(measurement string) *write.Point

NewPointWithMeasurement creates a empty Point Use AddTag and AddField to fill point with data

Types

type Client

type Client interface {
	// Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
	// and returns details about newly created entities along with the authorization object.
	// Retention period of zero will result to infinite retention.
	Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
	// SetupWithToken sends request to initialise new InfluxDB server with user, org and bucket, data retention period and token
	// and returns details about newly created entities along with the authorization object.
	// Retention period of zero will result to infinite retention.
	SetupWithToken(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int, token string) (*domain.OnboardingResponse, error)
	// Ready returns InfluxDB uptime info of server. It doesn't validate authentication params.
	Ready(ctx context.Context) (*domain.Ready, error)
	// Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status.
	// Health doesn't validate authentication params.
	Health(ctx context.Context) (*domain.HealthCheck, error)
	// Ping validates whether InfluxDB server is running. It doesn't validate authentication params.
	Ping(ctx context.Context) (bool, error)
	// Close ensures all ongoing asynchronous write clients finish.
	// Also closes all idle connections, in case of HTTP client was created internally.
	Close()
	// Options returns the options associated with client
	Options() *Options
	// ServerURL returns the url of the server url client talks to
	ServerURL() string
	// HTTPService returns underlying HTTP service object used by client
	HTTPService() http.Service
	// WriteAPI returns the asynchronous, non-blocking, Write client.
	// Ensures using a single WriteAPI instance for each org/bucket pair.
	WriteAPI(org, bucket string) api.WriteAPI
	// WriteAPIBlocking returns the synchronous, blocking, Write client.
	// Ensures using a single WriteAPIBlocking instance for each org/bucket pair.
	WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking
	// QueryAPI returns Query client.
	// Ensures using a single QueryAPI instance each org.
	QueryAPI(org string) api.QueryAPI
	// AuthorizationsAPI returns Authorizations API client.
	AuthorizationsAPI() api.AuthorizationsAPI
	// OrganizationsAPI returns Organizations API client
	OrganizationsAPI() api.OrganizationsAPI
	// UsersAPI returns Users API client.
	UsersAPI() api.UsersAPI
	// DeleteAPI returns Delete API client
	DeleteAPI() api.DeleteAPI
	// BucketsAPI returns Buckets API client
	BucketsAPI() api.BucketsAPI
	// LabelsAPI returns Labels API client
	LabelsAPI() api.LabelsAPI
	// TasksAPI returns Tasks API client
	TasksAPI() api.TasksAPI

	APIClient() *domain.Client
}

Client provides API to communicate with InfluxDBServer. There two APIs for writing, WriteAPI and WriteAPIBlocking. WriteAPI provides asynchronous, non-blocking, methods for writing time series data. WriteAPIBlocking provides blocking methods for writing time series data.

Example (CheckAPICall)
// This example shows how to perform custom server API invocation for checks API

// Create client. You need an admin token for creating DBRP mapping
client := influxdb2.NewClient("http://localhost:8086", "my-token")

// Always close client at the end
defer client.Close()

ctx := context.Background()

// Create a new threshold check
greater := domain.GreaterThreshold{}
greater.Value = 10.0
lc := domain.CheckStatusLevelCRIT
greater.Level = &lc
greater.AllValues = &[]bool{true}[0]

lesser := domain.LesserThreshold{}
lesser.Value = 1.0
lo := domain.CheckStatusLevelOK
lesser.Level = &lo

rang := domain.RangeThreshold{}
rang.Min = 3.0
rang.Max = 8.0
lw := domain.CheckStatusLevelWARN
rang.Level = &lw

thresholds := []domain.Threshold{&greater, &lesser, &rang}

// Get organization where check will be created
org, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
if err != nil {
	panic(err)
}

// Prepare necessary parameters
msg := "Check: ${ r._check_name } is: ${ r._level }"
flux := `from(bucket: "foo") |> range(start: -1d, stop: now()) |> aggregateWindow(every: 1m, fn: mean) |> filter(fn: (r) => r._field == "usage_user") |> yield()`
every := "1h"
offset := "0s"

c := domain.ThresholdCheck{
	CheckBaseExtend: domain.CheckBaseExtend{
		CheckBase: domain.CheckBase{
			Name:   "My threshold check",
			OrgID:  *org.Id,
			Query:  domain.DashboardQuery{Text: &flux},
			Status: domain.TaskStatusTypeActive,
		},
		Every:                 &every,
		Offset:                &offset,
		StatusMessageTemplate: &msg,
	},
	Thresholds: &thresholds,
}
params := domain.CreateCheckAllParams{
	Body: &c,
}
// Call checks API using internal API client
check, err := client.APIClient().CreateCheck(context.Background(), &params)
if err != nil {
	panic(err)
}
// Optionally verify type
if check.Type() != string(domain.ThresholdCheckTypeThreshold) {
	panic("Check type is not threshold")
}
// Cast check to threshold check
thresholdCheck := check.(*domain.ThresholdCheck)
fmt.Printf("Created threshold check with id %s\n", *thresholdCheck.Id)
Output:

Example (CustomServerAPICall)
// This example shows how to perform custom server API invocation for any endpoint
// Here we will create a DBRP mapping which allows using buckets in legacy write and query (InfluxQL) endpoints

// Create client. You need an admin token for creating DBRP mapping
client := influxdb2.NewClient("http://localhost:8086", "my-token")

// Always close client at the end
defer client.Close()

// Get generated client for server API calls
apiClient := client.APIClient()
ctx := context.Background()

// Get a bucket we would like to query using InfluxQL
b, err := client.BucketsAPI().FindBucketByName(ctx, "my-bucket")
if err != nil {
	panic(err)
}
// Get an organization that will own the mapping
o, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
if err != nil {
	panic(err)
}

yes := true
// Fill required fields of the DBRP struct
dbrp := domain.DBRPCreate{
	BucketID:        *b.Id,
	Database:        "my-bucket",
	Default:         &yes,
	OrgID:           o.Id,
	RetentionPolicy: "autogen",
}

params := &domain.PostDBRPAllParams{
	Body: domain.PostDBRPJSONRequestBody(dbrp),
}
// Call server API
newDbrp, err := apiClient.PostDBRP(ctx, params)
if err != nil {
	panic(err)
}
// Check generated response errors

fmt.Printf("Created DBRP: %#v\n", newDbrp)
Output:

Example (CustomUserAgentHeader)
package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/influxdata/influxdb-client-go/v2"
	ihttp "github.com/influxdata/influxdb-client-go/v2/api/http"
)

// UserAgentSetter is the implementation of Doer interface for setting User-Agent header
type UserAgentSetter struct {
	UserAgent   string
	RequestDoer ihttp.Doer
}

// Do fulfills the Doer interface
func (u *UserAgentSetter) Do(req *http.Request) (*http.Response, error) {
	// Set User-Agent header to request
	req.Header.Set("User-Agent", u.UserAgent)
	// Call original Doer to proceed with request
	return u.RequestDoer.Do(req)
}

func main() {
	// Set custom Doer to HTTPOptions
	opts := influxdb2.DefaultOptions()
	opts.HTTPOptions().SetHTTPDoer(&UserAgentSetter{
		UserAgent:   "NetMonitor/1.1",
		RequestDoer: http.DefaultClient,
	})

	//Create client with customized options
	client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token", opts)

	// Always close client at the end
	defer client.Close()

	// Issue a call with custom User-Agent header
	resp, err := client.Ping(context.Background())
	if err != nil {
		panic(err)
	}
	if resp {
		fmt.Println("Server is up")
	} else {
		fmt.Println("Server is down")
	}
}
Output:

Example (NewClient)
// Create a new client using an InfluxDB server base URL and an authentication token
client := influxdb2.NewClient("http://localhost:8086", "my-token")

// Always close client at the end
defer client.Close()
Output:

Example (NewClientWithOptions)
// Create a new client using an InfluxDB server base URL and an authentication token
// Create client and set batch size to 20
client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token",
	influxdb2.DefaultOptions().SetBatchSize(20))

// Always close client at the end
defer client.Close()
Output:

func NewClient

func NewClient(serverURL string, authToken string) Client

NewClient creates Client for connecting to given serverURL with provided authentication token, with the default options. serverURL is the InfluxDB server base URL, e.g. http://localhost:8086, authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. In such case, calling Setup() will set the authentication token.

func NewClientWithOptions

func NewClientWithOptions(serverURL string, authToken string, options *Options) Client

NewClientWithOptions creates Client for connecting to given serverURL with provided authentication token and configured with custom Options. serverURL is the InfluxDB server base URL, e.g. http://localhost:8086, authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. In such case, calling Setup() will set authentication token

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds configuration properties for communicating with InfluxDB server

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) AddDefaultTag

func (o *Options) AddDefaultTag(key, value string) *Options

AddDefaultTag adds a default tag. DefaultTags are added to each written point. If a tag with the same key already exist it is overwritten. If a point already defines such a tag, it is left unchanged

func (*Options) ApplicationName added in v2.12.0

func (o *Options) ApplicationName() string

ApplicationName returns application name used in the User-Agent HTTP header

func (*Options) BatchSize

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) ExponentialBase added in v2.5.0

func (o *Options) ExponentialBase() uint

ExponentialBase returns the base for the exponential retry delay. Default 2.

func (*Options) FlushInterval

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) HTTPClient

func (o *Options) HTTPClient() *nethttp.Client

HTTPClient returns the http.Client that is configured to be used for HTTP requests. It will return the one that has been set using SetHTTPClient or it will construct a default client using the other configured options.

func (*Options) HTTPOptions

func (o *Options) HTTPOptions() *http.Options

HTTPOptions returns HTTP related options

func (*Options) HTTPRequestTimeout

func (o *Options) HTTPRequestTimeout() uint

HTTPRequestTimeout returns HTTP request timeout

func (*Options) LogLevel

func (o *Options) LogLevel() uint

LogLevel returns log level

func (*Options) MaxRetries

func (o *Options) MaxRetries() uint

MaxRetries returns maximum count of retry attempts of failed writes, default 5.

func (*Options) MaxRetryInterval

func (o *Options) MaxRetryInterval() uint

MaxRetryInterval returns the maximum delay between each retry attempt in milliseconds, default 125,000.

func (*Options) MaxRetryTime added in v2.5.0

func (o *Options) MaxRetryTime() uint

MaxRetryTime returns the maximum total retry timeout in millisecond, default 180,000.

func (*Options) Precision

func (o *Options) Precision() time.Duration

Precision returns time precision for writes

func (*Options) RetryBufferLimit

func (o *Options) RetryBufferLimit() uint

RetryBufferLimit returns retry buffer limit

func (*Options) RetryInterval

func (o *Options) RetryInterval() uint

RetryInterval returns the retry interval in ms

func (*Options) SetApplicationName added in v2.12.0

func (o *Options) SetApplicationName(appName string) *Options

SetApplicationName sets an application name to the User-Agent HTTP header

func (*Options) SetBatchSize

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of points sent in single request

func (*Options) SetExponentialBase added in v2.5.0

func (o *Options) SetExponentialBase(exponentialBase uint) *Options

SetExponentialBase sets the base for the exponential retry delay.

func (*Options) SetFlushInterval

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets flush interval in ms in which is buffer flushed if it has not been already written

func (*Options) SetHTTPClient

func (o *Options) SetHTTPClient(c *nethttp.Client) *Options

SetHTTPClient will configure the http.Client that is used for HTTP requests. If set to nil, an HTTPClient will be generated.

Setting the HTTPClient will cause the other HTTP options to be ignored. In case of UsersAPI.SignIn() is used, HTTPClient.Jar will be used for storing session cookie.

func (*Options) SetHTTPRequestTimeout

func (o *Options) SetHTTPRequestTimeout(httpRequestTimeout uint) *Options

SetHTTPRequestTimeout sets HTTP request timeout in sec

func (*Options) SetLogLevel

func (o *Options) SetLogLevel(logLevel uint) *Options

SetLogLevel set level to filter log messages. Each level mean to log all categories bellow. Default is ErrorLevel. There are four level constant int the log package in this library:

  • ErrorLevel
  • WarningLevel
  • InfoLevel
  • DebugLevel

The DebugLevel will print also content of writen batches, queries. The InfoLevel prints HTTP requests info, among others. Set log.Log to nil in order to completely disable logging.

func (*Options) SetMaxRetries

func (o *Options) SetMaxRetries(maxRetries uint) *Options

SetMaxRetries sets maximum count of retry attempts of failed writes. Setting zero value disables retry strategy.

func (*Options) SetMaxRetryInterval

func (o *Options) SetMaxRetryInterval(maxRetryIntervalMs uint) *Options

SetMaxRetryInterval sets the maximum delay between each retry attempt in millisecond.

func (*Options) SetMaxRetryTime added in v2.5.0

func (o *Options) SetMaxRetryTime(maxRetryTimeMs uint) *Options

SetMaxRetryTime sets the maximum total retry timeout in millisecond.

func (*Options) SetPrecision

func (o *Options) SetPrecision(precision time.Duration) *Options

SetPrecision sets time precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second

func (*Options) SetRetryBufferLimit

func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options

SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize.

func (*Options) SetRetryInterval

func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options

SetRetryInterval sets retry interval in ms, which is set if not sent by server

func (*Options) SetTLSConfig

func (o *Options) SetTLSConfig(tlsConfig *tls.Config) *Options

SetTLSConfig sets TLS configuration for secure connection

func (*Options) SetUseGZip

func (o *Options) SetUseGZip(useGZip bool) *Options

SetUseGZip specifies whether to use GZip compression in write requests.

func (*Options) TLSConfig

func (o *Options) TLSConfig() *tls.Config

TLSConfig returns TLS config

func (*Options) UseGZip

func (o *Options) UseGZip() bool

UseGZip returns true if write request are gzip`ed

func (*Options) WriteOptions

func (o *Options) WriteOptions() *write.Options

WriteOptions returns write related options

Directories

Path Synopsis
api
Package api provides clients for InfluxDB server APIs.
Package api provides clients for InfluxDB server APIs.
http
Package http provides HTTP servicing related code.
Package http provides HTTP servicing related code.
query
Package query defined types for representing flux query result
Package query defined types for representing flux query result
write
Package write provides the Point struct
Package write provides the Point struct
Package domain provides primitives to interact with the openapi HTTP API.
Package domain provides primitives to interact with the openapi HTTP API.
internal
examples
Package examples contains fake client with the same interface as real client to overcome import-cycle problem to allow real E2E examples for apis in this package
Package examples contains fake client with the same interface as real client to overcome import-cycle problem to allow real E2E examples for apis in this package
gzip
Package gzip provides GZip related functionality
Package gzip provides GZip related functionality
http
Package http hold internal HTTP related stuff
Package http hold internal HTTP related stuff
log
Package log provides internal logging infrastructure
Package log provides internal logging infrastructure
test
Package test provides shared test utils
Package test provides shared test utils
write
Package write provides service and its stuff
Package write provides service and its stuff
Package log defines Logging API.
Package log defines Logging API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL