influxclient

package
v3.0.0-...-7b93ebd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package influxclient provides client for InfluxDB server.

Index

Examples

Constants

This section is empty.

Variables

View Source
var DefaultRetryParams = RetryParams{
	RetryInterval:    5_000,
	MaxRetries:       5,
	RetryBufferLimit: 50_0000,
	MaxRetryInterval: 125_000,
	ExponentialBase:  2,
	MaxRetryTime:     180_000,
}
View Source
var DefaultWriteParams = WriteParams{
	RetryParams:   DefaultRetryParams,
	BatchSize:     5_000,
	MaxBatchBytes: 50_000_000,
	FlushInterval: 60_000,
	Precision:     lineprotocol.Nanosecond,
	GzipThreshold: 1_000,
}

DefaultWriteParams specifies default write param

Functions

func SetRetryStrategyFactory

func SetRetryStrategyFactory(f NewRetryStrategyF)

Types

type AuthorizationsAPI

type AuthorizationsAPI struct {
	// contains filtered or unexported fields
}

AuthorizationsAPI holds methods related to authorization, as found under the /authorizations endpoint.

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	ctx := context.Background()

	// Find user to grant permission
	user, err := client.UsersAPI().FindOne(ctx, &influxclient.Filter{Name: "user-name"})
	if err != nil {
		panic(err)
	}

	// Find organization
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "my-org"})
	if err != nil {
		panic(err)
	}

	// group permissions
	permissions := []model.Permission{
		{
			Action: model.PermissionActionWrite,
			Resource: model.Resource{
				Type: model.ResourceTypeBuckets,
			},
		},
		{
			Action: model.PermissionActionRead,
			Resource: model.Resource{
				Type: model.ResourceTypeBuckets,
			},
		},
	}

	// create authorization object using info above
	auth := &model.Authorization{
		OrgID:       org.Id,
		Permissions: &permissions,
		UserID:      user.Id,
	}

	// grant permission and create token
	authCreated, err := client.AuthorizationsAPI().Create(ctx, auth)
	if err != nil {
		panic(err)
	}
	defer client.AuthorizationsAPI().Delete(ctx, *authCreated.Id) // only for E2E tests

	// Use token
	fmt.Fprintf(os.Stderr, "\tToken: %v\n", *authCreated.Token)
}
Output:

func (*AuthorizationsAPI) Create

Create creates a new authorization. The returned Authorization holds the new ID.

func (*AuthorizationsAPI) Delete

func (a *AuthorizationsAPI) Delete(ctx context.Context, authID string) error

Delete deletes the organization with the given ID.

func (*AuthorizationsAPI) Find

func (a *AuthorizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Authorization, error)

Find returns all authorizations matching the given filter. Supported filters:

  • OrgName
  • OrgID
  • UserName
  • UserID

func (*AuthorizationsAPI) FindOne

func (a *AuthorizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Authorization, error)

FindOne returns one authorizationsmatching the given filter. Supported filters:

  • OrgName
  • OrgID
  • UserName
  • UserID

func (*AuthorizationsAPI) SetStatus

SetStatus updates authorization status.

type BucketsAPI

type BucketsAPI struct {
	// contains filtered or unexported fields
}

BucketsAPI provides methods for managing buckets in a InfluxDB server.

Example
package main

import (
	"context"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	// Get Buckets API client
	bucketsAPI := client.BucketsAPI()
	ctx := context.Background()

	// Get organization that will own new bucket
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"})
	if err != nil {
		panic(err)
	}

	// Create bucket with 1 day retention policy
	bucket, err := bucketsAPI.Create(ctx, &model.Bucket{
		OrgID: org.Id,
		Name:  "bucket-sensors",
		RetentionRules: []model.RetentionRule{
			{
				EverySeconds: 3600 * 24,
			},
		},
	})
	if err != nil {
		panic(err)
	}
	defer bucketsAPI.Delete(ctx, *bucket.Id) // only for E2E tests

	// Update description of the bucket
	desc := "Bucket for sensor data"
	bucket.Description = &desc
	bucket, err = bucketsAPI.Update(ctx, bucket)
	if err != nil {
		panic(err)
	}

}
Output:

func (*BucketsAPI) Create

func (a *BucketsAPI) Create(ctx context.Context, bucket *model.Bucket) (*model.Bucket, error)

Create creates a new bucket with the given information. The label.Name field must be non-empty. The returned Bucket holds the ID of the new bucket.

func (*BucketsAPI) Delete

func (a *BucketsAPI) Delete(ctx context.Context, bucketID string) error

Delete deletes the bucket with the given ID.

func (*BucketsAPI) Find

func (a *BucketsAPI) Find(ctx context.Context, filter *Filter) ([]model.Bucket, error)

Find returns all buckets matching the given filter.

func (*BucketsAPI) FindOne

func (a *BucketsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Bucket, error)

FindOne returns one label that matches the given filter.

func (*BucketsAPI) Update

func (a *BucketsAPI) Update(ctx context.Context, bucket *model.Bucket) (*model.Bucket, error)

Update updates information about a bucket. The bucket ID and OrgID fields must be specified.

type BytesWrite

type BytesWrite func(ctx context.Context, bucket string, bs []byte) error

BytesWrite is a function writing data to a bucket

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements an InfluxDB client.

Example (CustomServerAPICall)
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// This example shows how to perform custom server API invocation for any endpoint
	// Here we will create a DBRP mapping which allows using buckets in legacy write and query (InfluxQL) endpoints

	// Create client. You need an admin token for creating DBRP mapping
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	// Get generated client for server API calls
	apiClient := client.APIClient()
	ctx := context.Background()

	// Get a bucket we would like to query using InfluxQL
	bucket, err := client.BucketsAPI().FindOne(ctx, &influxclient.Filter{Name: "bucket-name"})
	if err != nil {
		panic(err)
	}

	// Get an organization that will own the mapping
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"})
	if err != nil {
		panic(err)
	}

	yes := true
	// Fill required fields of the DBRP struct
	dbrp := model.DBRPCreate{
		BucketID:        *bucket.Id,
		Database:        bucket.Name,
		Default:         &yes,
		OrgID:           org.Id,
		RetentionPolicy: "autogen",
	}

	params := &model.PostDBRPAllParams{
		Body: model.PostDBRPJSONRequestBody(dbrp),
	}
	// Call server API
	newDbrp, err := apiClient.PostDBRP(ctx, params)
	if err != nil {
		panic(err)
	}
	defer apiClient.DeleteDBRPID(ctx, &model.DeleteDBRPIDAllParams{
		DeleteDBRPIDParams: model.DeleteDBRPIDParams{OrgID: org.Id}, DbrpID: newDbrp.Id,
	}) // only for E2E tests

	// Check generated response
	fmt.Fprintf(os.Stderr, "\tCreated DBRP: %#v\n", newDbrp)
}
Output:

Example (New)
package main

import (
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	if err != nil {
		panic(err)
	}

	//

	client.Close()
}
Output:

func New

func New(params Params) (*Client, error)

New creates new Client with given Params, where ServerURL and AuthToken are mandatory.

func (*Client) APIClient

func (c *Client) APIClient() *model.Client

APIClient returns generates API client

func (*Client) AuthorizationsAPI

func (c *Client) AuthorizationsAPI() *AuthorizationsAPI

AuthorizationsAPI returns a value that can be used to interact with the authorization-related parts of the InfluxDB API.

func (*Client) BucketsAPI

func (c *Client) BucketsAPI() *BucketsAPI

BucketsAPI returns a value that can be used to interact with the bucket-related parts of the InfluxDB API.

func (*Client) Close

func (c *Client) Close() error

Close closes all idle connections.

func (*Client) DeletePoints

func (c *Client) DeletePoints(ctx context.Context, params *DeleteParams) error

DeletePoints deletes data from a bucket.

func (*Client) Health

func (c *Client) Health(ctx context.Context) (*model.HealthCheck, error)

Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. Health doesn't validate authentication params.

func (*Client) LabelsAPI

func (c *Client) LabelsAPI() *LabelsAPI

LabelsAPI returns a value that can be used to interact with the label-related parts of the InfluxDB API.

func (*Client) OrganizationsAPI

func (c *Client) OrganizationsAPI() *OrganizationsAPI

OrganizationsAPI returns a value that can be used to interact with the organization-related parts of the InfluxDB API.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping checks the status and InfluxDB version of the instance. Returns an error if it is not available.

func (*Client) PointsWriter

func (c *Client) PointsWriter(bucket string) *PointsWriter

PointsWriter returns a PointsWriter value that support fast asynchronous writing of points to Influx. All the points are written into the given bucket.

The returned PointsWriter must be closed after use to release resources and flush any buffered points.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	wp := influxclient.DefaultWriteParams
	// Set batch size to write 100 points in 2 batches
	wp.BatchSize = 50
	// Set callback for failed writes
	wp.WriteFailed = func(err error, lines []byte, attempt int, expires time.Time) bool {
		fmt.Println("Write failed", err)
		return true
	}
	// Create client with custom WriteParams
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
		WriteParams: wp,
	})
	if err != nil {
		panic(err)
	}
	// client.Close() have to be called to clean http connections
	defer client.Close()
	// Get async writer
	writer := client.PointsWriter("my-bucket")
	// writer.Close() MUST be called at the end to ensure completing background operations and cleaning resources
	defer writer.Close()
	// write some points
	for i := 0; i < 100; i++ {
		// create point
		p := influxclient.NewPointWithMeasurement("stat").
			AddTag("id", fmt.Sprintf("rack_%v", i%10)).
			AddTag("vendor", "AWS").
			AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
			AddField("temperature", rand.Float64()*80.0).
			AddField("disk_free", rand.Float64()*1000.0).
			AddField("disk_total", (i/10+1)*1000000).
			AddField("mem_total", (i/100+1)*10000000).
			AddField("mem_free", rand.Uint64()).
			SetTimestamp(time.Now())
		// write asynchronously
		writer.WritePoints(p)
	}
}
Output:

func (*Client) Query

func (c *Client) Query(ctx context.Context, query string, queryParams interface{}) (*QueryResultReader, error)

Query sends the given Flux query to server and returns QueryResultReader for further parsing result. The result must be closed after use.

Flux query can contain reference to parameters, which must be passed via queryParams. it can be a struct or map. Param values can be only simple types or time.Time. Name of a struct field or a map key (must be a string) will be a param name.

Fields of a struct can be more specified by json annotations:

  type Condition struct {
     Start  time.Time  `json:"start"`
     Field  string     `json:"field"`
     Value  float64    `json:"value"`
  }

  cond  := Condition {
	  "iot_center",
	  "Temperature",
	  "-10m",
	  23.0,
 }

Parameters are then accessed via the params object:

 query:= `from(bucket: "environment")
	 |> range(start: time(v: params.start))
	 |> filter(fn: (r) => r._measurement == "air")
	 |> filter(fn: (r) => r._field == params.field)
	 |> filter(fn: (r) => r._value > params.value)`

And used in the call to Query:

result, err := client.Query(ctx, query, cond);

Use QueryResultReader.NextSection() for navigation to the sections in the query result set.

Use QueryResultReader.NextRow() for iterating over rows in the section.

Read the row raw data using QueryResultReader.Row() or deserialize data into a struct or a slice via QueryResultReader.Decode:

 val := &struct {
	 Time  time.Time	`csv:"_time"`
	 Temp  float64		`csv:"_value"`
	 Sensor string		`csv:"sensor"`
 }{}
 err = result.Decode(val)
Example
package main

import (
	"context"
	"fmt"
	"os"
	"text/tabwriter"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	// Create client
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Define query parameters
	params := struct {
		Since       string  `json:"since"`
		GreaterThan float64 `json:"greaterThan"`
	}{
		"-10m",
		23.0,
	}
	// Prepare a query
	query := `from(bucket: "iot_center") 
		|> range(start: duration(v: params.since)) 
		|> filter(fn: (r) => r._measurement == "environment")
		|> filter(fn: (r) => r._field == "Temperature")
		|> filter(fn: (r) => r._value > params.greaterThan)`

	// Execute query
	res, err := client.Query(context.Background(), query, params)
	if err != nil {
		panic(err)
	}

	// Make sure query result is always closed
	defer res.Close()

	// Declare custom type for data
	val := &struct {
		Time   time.Time `csv:"_time"`
		Temp   float64   `csv:"_value"`
		Sensor string    `csv:"sensor"`
	}{}

	tw := tabwriter.NewWriter(os.Stdout, 10, 4, 2, ' ', 0)
	fmt.Fprintf(tw, "Time\tTemp\tSensor\n")

	// Iterate over result set
	for res.NextSection() {
		for res.NextRow() {
			err = res.Decode(val)
			if err != nil {
				fmt.Fprintf(tw, "%v\n", err)
				continue
			}
			fmt.Fprintf(tw, "%s\t%.2f\t%s\n", val.Time.String(), val.Temp, val.Sensor)
		}
	}
	tw.Flush()
	if res.Err() != nil {
		panic(res.Err())
	}
}
Output:

func (*Client) Ready

func (c *Client) Ready(ctx context.Context) (time.Duration, error)

Ready checks that the server is ready, and reports the duration the instance has been up if so. It does not validate authentication parameters. See https://docs.influxdata.com/influxdb/v2.0/api/#operation/GetReady.

func (*Client) TasksAPI

func (c *Client) TasksAPI() *TasksAPI

TasksAPI returns a value that can be used to interact with the task-related parts of the InfluxDB API.

func (*Client) UsersAPI

func (c *Client) UsersAPI() *UsersAPI

UsersAPI returns a value that can be used to interact with the user-related parts of the InfluxDB API.

func (*Client) Write

func (c *Client) Write(ctx context.Context, bucket string, buff []byte) error

Write writes line protocol record(s) to the server into the given bucket. Multiple records must be separated by the new line character (\n) Data are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	// Create client
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})
	if err != nil {
		panic(err)
	}
	// Close client at the end
	defer client.Close()
	// Custom data record
	sensorData := struct {
		ID          string
		Temperature float64
		Humidity    int
	}{"1012", 22.3, 55}

	// Create a line protocol from record
	line := fmt.Sprintf("air,device_id=%v,sensor=SHT31 humidity=%di,temperature=%f %d\n", sensorData.ID, sensorData.Humidity, sensorData.Temperature, time.Now().UnixNano())
	// Write data
	err = client.Write(context.Background(), "my-bucket", []byte(line))
	if err != nil {
		panic(err)
	}

}
Output:

func (*Client) WriteData

func (c *Client) WriteData(ctx context.Context, bucket string, points ...interface{}) error

WriteData encodes fields of custom points into line protocol and writes line protocol record(s) to the server into the given bucket. Each custom point must be annotated with 'lp' prefix and values measurement,tag, field or timestamp. Valid point must contain measurement and at least one field.

A field with timestamp must be of a type time.Time

 type TemperatureSensor struct {
	  Measurement string `lp:"measurement"`
	  Sensor string `lp:"tag,sensor"`
	  ID string `lp:"tag,device_id"`
	  Temp float64 `lp:"field,temperature"`
	  Hum int	`lp:"field,humidity"`
	  Time time.Time `lp:"timestamp"`
	  Description string `lp:"-"`
 }

The points are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.

Example
package main

import (
	"context"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	// Create client
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})
	if err != nil {
		panic(err)
	}
	// Close client at the end
	defer client.Close()

	sensorData := struct {
		Table       string    `lp:"measurement"`
		Sensor      string    `lp:"tag,sensor"`
		ID          string    `lp:"tag,device_id"`
		Temperature float64   `lp:"field,temperature"`
		Humidity    int       `lp:"field,humidity"`
		Time        time.Time `lp:"timestamp"`
	}{"air", "SHT31", "1012", 22.3, 55, time.Now()}

	// Write point
	err = client.WriteData(context.Background(), "my-bucket", sensorData)
	if err != nil {
		panic(err)
	}
}
Output:

func (*Client) WritePoints

func (c *Client) WritePoints(ctx context.Context, bucket string, points ...*Point) error

WritePoints writes all the given points to the server into the given bucket. The points are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.

Example
package main

import (
	"context"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
)

func main() {
	// Create client
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})
	if err != nil {
		panic(err)
	}
	// Close client at the end
	defer client.Close()

	// Create a point
	sensorData := influxclient.NewPointWithMeasurement("air").SetTimestamp(time.Now())
	// Add tag
	sensorData.AddTag("sensor", "1012")
	// Add fields
	sensorData.AddField("temperature", 22.3).AddField("humidity", 55)

	// Write point
	err = client.WritePoints(context.Background(), "my-bucket", sensorData)
	if err != nil {
		panic(err)
	}
}
Output:

type DeleteParams

type DeleteParams struct {
	// Bucket holds bucket name.
	Bucket string
	// BucketID holds bucket ID.
	BucketID string
	// Org holds organization name.
	Org string
	// OrgID holds organization ID.
	OrgID string
	// Predicate is an expression in delete predicate syntax.
	Predicate string
	// Start is the earliest time to delete from.
	Start time.Time
	// Stop is the latest time to delete from.
	Stop time.Time
}

DeleteParams holds options for DeletePoints.

type Field

type Field struct {
	Key   string
	Value lineprotocol.Value
}

Field holds the keys and values for a bunch of Metric Field k/v pairs where Value can be a uint64, int64, int, float32, float64, string, or bool.

type Filter

type Filter struct {
	// Filter by resource ID
	ID string

	// Filter by resource name
	Name string

	// Filter by organization name
	OrgName string

	// Filter by organization ID
	OrgID string

	// Filter by user ID
	UserID string

	// Filter by user name
	UserName string

	// Filter by status
	Status string

	// Maximum number of returned entities in a single request
	Limit uint

	// Starting offset for returning entities
	Offset uint

	// After specifies that the search results should start after the item of given ID.
	After string

	// BeforeTime selects items to those scheduled before this time.
	BeforeTime time.Time

	// AfterTime selects items to those scheduled after this time.
	AfterTime time.Time
}

The Filter can be used to select the entities returned by the Find operations. Not all filter kinds are supported by all endpoints: see the individual API documentation for details.

Note that Filter also provides support for paging (using Offset and Limit) - this is only useful with filters that might return more than one entity.

For example, to a maximum of 10 buckets that have find all buckets that are associated with the organization "example.com" starting 40 entries into the results:

client.BucketsAPI().Find(ctx context.Context, &influxapi.Filter{
	OrgName: "example.com",
	Limit: 10,
	Offset: 40,
})

type LabelsAPI

type LabelsAPI struct {
	// contains filtered or unexported fields
}

LabelsAPI provides methods for managing labels in a InfluxDB server.

Example
package main

import (
	"context"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	// Get Labels API client
	labelsAPI := client.LabelsAPI()
	ctx := context.Background()

	// Get organization that will own label
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"})
	if err != nil {
		panic(err)
	}

	labelName := "Active State"
	props := map[string]string{"color": "33ffdd", "description": "Marks org active"}
	label, err := labelsAPI.Create(ctx, &model.Label{
		OrgID: org.Id,
		Name:  &labelName,
		Properties: &model.Label_Properties{
			AdditionalProperties: props,
		},
	})
	if err != nil {
		panic(err)
	}
	defer labelsAPI.Delete(ctx, *label.Id) // only for E2E tests

	// Change color property
	label.Properties.AdditionalProperties = map[string]string{"color": "ff1122"}
	label, err = labelsAPI.Update(ctx, label)
	if err != nil {
		panic(err)
	}
}
Output:

func (*LabelsAPI) Create

func (a *LabelsAPI) Create(ctx context.Context, label *model.Label) (*model.Label, error)

Create creates a new label with the given information. The label.Name field must be non-empty. The returned Label holds the ID of the new label.

func (*LabelsAPI) Delete

func (a *LabelsAPI) Delete(ctx context.Context, labelID string) error

Delete deletes the label with the given ID.

func (*LabelsAPI) Find

func (a *LabelsAPI) Find(ctx context.Context, filter *Filter) ([]model.Label, error)

Find returns labels matching the given filter. Supported filters:

  • OrgID

func (*LabelsAPI) FindOne

func (a *LabelsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Label, error)

FindOne returns one label matching the given filter. Supported filters:

  • OrgID

func (*LabelsAPI) Update

func (a *LabelsAPI) Update(ctx context.Context, label *model.Label) (*model.Label, error)

Update updates the label's name and properties. If the name is empty, it won't be changed. If a property isn't mentioned, it won't be changed. A property can be removed by using an empty value for that property.

Update returns the fully updated label.

type NewRetryStrategyF

type NewRetryStrategyF func(params RetryParams) RetryStrategy

NewRetryStrategyF factory function creates a new RetryStrategy

func GetRetryStrategyFactory

func GetRetryStrategyFactory() NewRetryStrategyF

type OnRemoveCallback

type OnRemoveCallback func(lines []byte, retryCountDown int, expires time.Time)

OnRemoveCallback is callback used when inform about removed batch from retry buffer Params:

lines - lines that were skipped
retryCountDown - number of remaining attempts
expires - time when batch expires

type OrganizationsAPI

type OrganizationsAPI struct {
	// contains filtered or unexported fields
}

OrganizationsAPI holds methods related to organization, as found under the /orgs endpoint.

Example
package main

import (
	"context"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	// Get Organizations API client
	orgAPI := client.OrganizationsAPI()
	ctx := context.Background()

	// Create new organization
	org, err := orgAPI.Create(ctx, &model.Organization{Name: "org-name-2"})
	if err != nil {
		panic(err)
	}
	defer orgAPI.Delete(ctx, *org.Id) // only for E2E tests

	orgDescription := "My second org"
	org.Description = &orgDescription
	org, err = orgAPI.Update(ctx, org)
	if err != nil {
		panic(err)
	}

	// Create new user to add to org
	newUser, err := client.UsersAPI().Create(ctx, &model.User{Name: "user-name-2"})
	if err != nil {
		panic(err)
	}
	defer client.UsersAPI().Delete(ctx, *newUser.Id) // only for E2E tests

	// Add new user to organization
	err = orgAPI.AddMember(ctx, *org.Id, *newUser.Id)
	if err != nil {
		panic(err)
	}

}
Output:

func (*OrganizationsAPI) AddMember

func (o *OrganizationsAPI) AddMember(ctx context.Context, orgID, userID string) error

AddMember adds the user with the given ID to the organization with the given ID.

func (*OrganizationsAPI) AddOwner

func (o *OrganizationsAPI) AddOwner(ctx context.Context, orgID, userID string) error

AddOwner adds an owner with the given userID to the organization with the given id.

func (*OrganizationsAPI) Create

Create creates a new organization. The returned Organization holds the new ID.

func (*OrganizationsAPI) Delete

func (o *OrganizationsAPI) Delete(ctx context.Context, orgID string) error

Delete deletes the organization with the given ID.

func (*OrganizationsAPI) Find

func (o *OrganizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Organization, error)

Find returns all organizations matching the given filter. Supported filters:

  • OrgName
  • OrgID
  • UserID

func (*OrganizationsAPI) FindOne

func (o *OrganizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Organization, error)

FindOne returns one organization matching the given filter. Supported filters:

  • OrgName
  • OrgID
  • UserID

func (*OrganizationsAPI) Members

func (o *OrganizationsAPI) Members(ctx context.Context, orgID string) ([]model.ResourceMember, error)

Members returns all members of the organization with the given ID.

func (*OrganizationsAPI) Owners

func (o *OrganizationsAPI) Owners(ctx context.Context, orgID string) ([]model.ResourceOwner, error)

Owners returns all the owners of the organization with the given id.

func (*OrganizationsAPI) RemoveMember

func (o *OrganizationsAPI) RemoveMember(ctx context.Context, orgID, userID string) error

RemoveMember removes the user with the given ID from the organization with the given ID.

func (*OrganizationsAPI) RemoveOwner

func (o *OrganizationsAPI) RemoveOwner(ctx context.Context, orgID, userID string) error

RemoveOwner Remove removes the user with the given userID from the organization with the given id.

func (*OrganizationsAPI) Update

Update updates information about the organization. The org.ID field must hold the ID of the organization to be changed.

type Params

type Params struct {
	// ServerURL holds the URL of the InfluxDB server to connect to.
	// This must be non-empty. E.g. http://localhost:8086
	ServerURL string

	// AuthToken holds the authorization token for the API.
	// This can be obtained through the GUI web browser interface.
	AuthToken string

	// Organization is name or ID of organization where data (buckets, users, tasks, etc.) belongs to
	// Optional for InfluxDB Cloud
	Organization string

	// HTTPClient is used to make API requests.
	//
	// This can be used to specify a custom TLS configuration
	// (TLSClientConfig), a custom request timeout (Timeout),
	// or other customization as required.
	//
	// It HTTPClient is nil, http.DefaultClient will be used.
	HTTPClient *http.Client
	// Write Params
	WriteParams WriteParams
}

Params holds the parameters for creating a new client. The only mandatory field is ServerURL. AuthToken is also important if authentication was not done outside this client.

type Point

type Point struct {
	Measurement string
	Tags        []Tag
	Fields      []Field
	Timestamp   time.Time
}

Point is represents InfluxDB time series point, holding tags and fields

func NewPoint

func NewPoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) *Point

NewPoint is a convenient function for creating a Point from measurement name, tags, fields and a timestamp.

func NewPointWithMeasurement

func NewPointWithMeasurement(measurement string) *Point

NewPointWithMeasurement is a convenient function for creating a Point from measurement name for later adding data

func (*Point) AddField

func (m *Point) AddField(k string, v interface{}) *Point

AddField adds a field to a point.

func (*Point) AddTag

func (m *Point) AddTag(k, v string) *Point

AddTag adds a tag to a point.

func (*Point) MarshalBinary

func (m *Point) MarshalBinary(precision lineprotocol.Precision) ([]byte, error)

func (*Point) SetTimestamp

func (m *Point) SetTimestamp(t time.Time) *Point

SetTimestamp is helper function for complete fluent interface

func (*Point) SortFields

func (m *Point) SortFields() *Point

SortFields orders the fields of a point alphanumerically by key.

func (*Point) SortTags

func (m *Point) SortTags() *Point

SortTags orders the tags of a point alphanumerically by key. This is just here as a helper, to make it easy to keep tags sorted if you are creating a Point manually.

type PointsWriter

type PointsWriter struct {
	// contains filtered or unexported fields
}

PointsWriter is asynchronous writer with automated batching and retrying capabilities. It is parametrized by the WriteParams. It is obtained using the Client.PointsWriter() Use Write, WriteData or WritePoints for sending data Any error encountered during asynchronous processing is reported by WriteParams.WriteFailed callback. It must be created using NewPointsWriter All functions are thread-safe and can be used from different go-routines.

func NewPointsWriter

func NewPointsWriter(writer BytesWrite, bucket string, params WriteParams) *PointsWriter

NewPointsWriter creates fast asynchronous PointsWriter writing to a bucket using given writer according the params

func (*PointsWriter) Close

func (p *PointsWriter) Close()

Close stops internal routines and closes resources Must be called by user at the end

func (*PointsWriter) Flush

func (p *PointsWriter) Flush()

Flush asynchronously flushes write buffer. This enforces sending data on demand, even when flush conditions (batch size, flush interval, max batch bytes) are not met.

func (*PointsWriter) Write

func (p *PointsWriter) Write(line []byte)

Write asynchronously writes line protocol record(s) to the server. Multiple records must be separated by the new line character (\n).

func (*PointsWriter) WriteData

func (p *PointsWriter) WriteData(points ...interface{})

WriteData asynchronously encodes fields of custom points into line protocol and writes line protocol record(s) to the server into the given bucket. Any error encountered during asynchronous processing is reported by WriteParams.WriteFailed callback. Each custom point must be annotated with 'lp' prefix and values measurement,tag, field or timestamp. Valid point must contain measurement and at least one field.

A field with timestamp must be of a type time.Time

 type TemperatureSensor struct {
	  Measurement string `lp:"measurement"`
	  Sensor string `lp:"tag,sensor"`
	  ID string `lp:"tag,device_id"`
	  Temp float64 `lp:"field,temperature"`
	  Hum int	`lp:"field,humidity"`
	  Time time.Time `lp:"timestamp"`
	  Description string `lp:"-"`
 }

func (*PointsWriter) WritePoints

func (p *PointsWriter) WritePoints(points ...*Point)

WritePoints asynchronously writes all the given points to the server. Any error encountered during encoding points is reported by WriteParams.WriteFailed callback.

type QueryError

type QueryError struct {
	// Message is a Flux query error message
	Message string `csv:"error"`
	// Code is an Flux query error code
	Code int64 `csv:"reference"`
}

QueryError defines the information of Flux query error

func (*QueryError) Error

func (e *QueryError) Error() string

type QueryResultReader

type QueryResultReader struct {
	*annotatedcsv.Reader
	// contains filtered or unexported fields
}

QueryResultReader enhances annotatedcsv.Reader by allowing NextRow to go straight to the first data line and by treating error section as an error. QueryResultReader must be closed by calling Close at the end of reading.

func NewQueryResultReader

func NewQueryResultReader(r io.ReadCloser) *QueryResultReader

NewQueryResultReader returns new QueryResultReader for parsing Flux query result stream.

func (*QueryResultReader) Close

func (r *QueryResultReader) Close() error

Close closes the underlying reader

func (*QueryResultReader) Err

func (r *QueryResultReader) Err() error

Err returns any error encountered when parsing.

func (*QueryResultReader) NextRow

func (r *QueryResultReader) NextRow() bool

NextRow is like annotatedcsv.Reader.NextRow except if it is called in the beginning it advances to the first section. When an error section is encountered, NextRow will return false and Err will return the error.

func (*QueryResultReader) NextSection

func (r *QueryResultReader) NextSection() bool

NextSection is like annotatedcsv.Reader.NextSection except that it treats error sections as a terminating section. When an error section is encountered, NextSection will return false and Err will return the error.

type RetryBuffer

type RetryBuffer struct {
	// contains filtered or unexported fields
}

func NewRetryBuffer

func NewRetryBuffer(maxSize int, retryLines RetryLines, onRemove OnRemoveCallback) *RetryBuffer

func (*RetryBuffer) AddLines

func (r *RetryBuffer) AddLines(lines []byte, retryCount, delay int, expires time.Time)

func (*RetryBuffer) Close

func (r *RetryBuffer) Close() int

func (*RetryBuffer) Flush

func (r *RetryBuffer) Flush()

type RetryItem

type RetryItem struct {
	Lines      []byte
	RetryCount int
	RetryTime  time.Time
	Expires    time.Time
	Next       *RetryItem
}

type RetryLines

type RetryLines func(lines []byte, retryCountDown int, expires time.Time)

type RetryParams

type RetryParams struct {
	// Default retry interval in ms, if not sent by server. Default 5,000.
	RetryInterval int
	// Maximum count of retry attempts of failed writes, default 5.
	MaxRetries int
	// Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 50,000.
	RetryBufferLimit int
	// The maximum delay between each retry attempt in milliseconds, default 125,000.
	MaxRetryInterval int
	// The maximum total retry timeout in milliseconds, default 180,000.
	MaxRetryTime int
	// The base for the exponential retry delay
	ExponentialBase int
	// Max random value added to the retry delay in milliseconds, default 200
	RetryJitter int
}

RetryParams configures retry behavior used by PointsWriter

type RetryStrategy

type RetryStrategy interface {
	// NextDelay returns delay for a next retry
	//  - error - reason for retrying
	//  - failedAttempts - a count of already failed attempts, 1 being the first
	// Returns milliseconds to wait before retrying
	NextDelay(err error, failedAttempts int) int
	// Success implementation should reset its state, this is mandatory to call upon success
	Success()
}

RetryStrategy is a strategy for calculating retry delays.

func NewDefaultRetryStrategy

func NewDefaultRetryStrategy(params RetryParams) RetryStrategy

type ServerError

type ServerError struct {
	// Code holds the Influx error code, or empty if the code is unknown.
	Code string `json:"code"`
	// Message holds the error message.
	Message string `json:"message"`
	// StatusCode holds the HTTP response status code.
	StatusCode int `json:"-"`
	// RetryAfter holds the value of Retry-After header if sent by server, otherwise zero
	RetryAfter int `json:"-"`
}

ServerError holds InfluxDB server error info ServerError represents an error returned from an InfluxDB API server.

func NewServerError

func NewServerError(message string) *ServerError

NewServerError returns new with just a message

func (ServerError) Error

func (e ServerError) Error() string

Error implements Error interface

type Tag

type Tag struct {
	Key   string
	Value string
}

Tag holds the keys and values for a bunch of Tag k/v pairs.

type TasksAPI

type TasksAPI struct {
	// contains filtered or unexported fields
}

TasksAPI holds methods related to organization, as found under the /tasks endpoint.

Example
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	// Get Delete API client
	tasksAPI := client.TasksAPI()
	ctx := context.Background()

	// Get organization that will own task
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"})
	if err != nil {
		panic(err)
	}

	// task flux script from https://www.influxdata.com/blog/writing-tasks-and-setting-up-alerts-for-influxdb-cloud/
	flux := `fruitCollected = from(bucket: "farming")
  |> range(start: -task.every)
  |> filter(fn: (r)  => r["_measurement"] == "totalFruitsCollected")
  |> filter(fn: (r)  => r["_field"] == "fruits")
  |> group(columns: ["farmName"])
  |> aggregateWindow(fn: sum, every: task.every)
  |> map(fn: (r) => ({
     _time: r._time,  _stop: r._stop, _start: r._start, _measurement: "fruitCollectionRate", _field: "fruits", _value: r._value, farmName: r.farmName,
  }))

fruitCollected 
  |> to(bucket: "farming")
`
	every := "1h"
	task, err := tasksAPI.Create(ctx, &model.Task{
		OrgID: *org.Id,
		Name:  "fruitCollectedRate",
		Flux:  flux,
		Every: &every,
	})
	if err != nil {
		panic(err)
	}
	defer tasksAPI.Delete(ctx, task.Id) // only for E2E tests

	// Force running a task
	run, err := tasksAPI.RunManually(ctx, task.Id)
	if err != nil {
		panic(err)
	}

	// Print run info
	fmt.Fprint(os.Stderr, "\tForced run scheduled for ", *run.ScheduledFor, " with status ", *run.Status, "\n")
	//wait for tasks to start and be running
	<-time.After(1500 * time.Millisecond)

	// Find logs
	logs, err := tasksAPI.FindRunLogs(ctx, task.Id, *run.Id)
	if err != nil {
		panic(err)
	}

	// Print logs
	fmt.Fprintln(os.Stderr, "\tLogs:")
	for _, logEvent := range logs {
		fmt.Fprint(os.Stderr, "\t Time:", *logEvent.Time, ", Message: ", *logEvent.Message, "\n")
	}
}
Output:

func (*TasksAPI) AddLabel

func (a *TasksAPI) AddLabel(ctx context.Context, taskID, labelID string) (*model.Label, error)

AddLabel adds a label with given ID to a task with given ID.

func (*TasksAPI) AddMember

func (a *TasksAPI) AddMember(ctx context.Context, taskID, userID string) error

AddMember adds the user with the given ID to the task with the given ID.

func (*TasksAPI) AddOwner

func (a *TasksAPI) AddOwner(ctx context.Context, taskID, userID string) error

AddOwner adds an owner with the given userID to the task with the given id.

func (*TasksAPI) CancelRun

func (a *TasksAPI) CancelRun(ctx context.Context, taskID, runID string) error

CancelRun cancels a running task with given ID and given run ID.

func (*TasksAPI) Create

func (a *TasksAPI) Create(ctx context.Context, task *model.Task) (*model.Task, error)

Create creates a new task according the the task object. Set OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive. Every has higher priority.

func (*TasksAPI) Delete

func (a *TasksAPI) Delete(ctx context.Context, taskID string) error

Delete deletes the task with the given ID.

func (*TasksAPI) Find

func (a *TasksAPI) Find(ctx context.Context, filter *Filter) ([]model.Task, error)

Find returns all tasks matching the given filter. Supported filters:

  After
  Name
  OrgName
	 OrgID
	 UserName
  Status
  Limit

func (*TasksAPI) FindLabels

func (a *TasksAPI) FindLabels(ctx context.Context, taskID string) ([]model.Label, error)

FindLabels retrieves labels of a task with given ID.

func (*TasksAPI) FindLogs

func (a *TasksAPI) FindLogs(ctx context.Context, taskID string) ([]model.LogEvent, error)

FindLogs retrieves all logs for a task with given ID.

func (*TasksAPI) FindOne

func (a *TasksAPI) FindOne(ctx context.Context, filter *Filter) (*model.Task, error)

FindOne returns one task matching the given filter. Supported filters:

  After
  Name
  OrgName
	 OrgID
	 UserName
  Status
  Limit

func (*TasksAPI) FindOneRun

func (a *TasksAPI) FindOneRun(ctx context.Context, taskID string, filter *Filter) (*model.Run, error)

FindOneRun returns one task run that matches the given filter. Supported filters:

ID

TODO or just pass runID instead of a filter?

func (*TasksAPI) FindRunLogs

func (a *TasksAPI) FindRunLogs(ctx context.Context, taskID, runID string) ([]model.LogEvent, error)

FindRunLogs return all log events for a task run with given ID.

func (*TasksAPI) FindRuns

func (a *TasksAPI) FindRuns(ctx context.Context, taskID string, filter *Filter) ([]model.Run, error)

FindRuns returns a task runs according the filter. Supported filters:

After
AfterTime
BeforeTime
Limit

func (*TasksAPI) Members

func (a *TasksAPI) Members(ctx context.Context, taskID string) ([]model.ResourceMember, error)

Members returns all members of the task with the given ID.

func (*TasksAPI) Owners

func (a *TasksAPI) Owners(ctx context.Context, taskID string) ([]model.ResourceOwner, error)

Owners returns all the owners of the task with the given id.

func (*TasksAPI) RemoveLabel

func (a *TasksAPI) RemoveLabel(ctx context.Context, taskID, labelID string) error

RemoveLabel removes a label with given ID from a task with given ID.

func (*TasksAPI) RemoveMember

func (a *TasksAPI) RemoveMember(ctx context.Context, taskID, userID string) error

RemoveMember removes the user with the given ID from the task with the given ID.

func (*TasksAPI) RemoveOwner

func (a *TasksAPI) RemoveOwner(ctx context.Context, taskID, userID string) error

RemoveOwner removes the user with the given userID from the task with the given id.

func (*TasksAPI) RetryRun

func (a *TasksAPI) RetryRun(ctx context.Context, taskID, runID string) (*model.Run, error)

RetryRun retry a run with given ID of a task with given ID.

func (*TasksAPI) RunManually

func (a *TasksAPI) RunManually(ctx context.Context, taskID string) (*model.Run, error)

RunManually manually start a run of a task with given ID now, overriding the current schedule.

func (*TasksAPI) Update

func (a *TasksAPI) Update(ctx context.Context, task *model.Task) (*model.Task, error)

Update updates a task. The task.ID field must be specified. The complete task information is returned.

type UsersAPI

type UsersAPI struct {
	// contains filtered or unexported fields
}

UsersAPI holds methods related to user, as found under the /users endpoint.

Example
package main

import (
	"context"

	"github.com/vlastahajek/influxdb-client-go/v3/influxclient"
	"github.com/vlastahajek/influxdb-client-go/v3/influxclient/model"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client, err := influxclient.New(influxclient.Params{
		ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/",
		AuthToken: "my-token",
		//		Organization: "my-org", // Organization is optional for InfluxDB Cloud
	})

	ctx := context.Background()

	// Find organization
	org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"})
	if err != nil {
		panic(err)
	}

	// Get users API client
	usersAPI := client.UsersAPI()

	// Create new user
	user, err := usersAPI.Create(ctx, &model.User{Name: "user-01"})
	if err != nil {
		panic(err)
	}
	defer usersAPI.Delete(ctx, *user.Id) // only for E2E tests

	// Set user password
	err = usersAPI.SetPassword(ctx, *user.Id, "pass-at-least-8-chars")
	if err != nil {
		panic(err)
	}

	// Add user to organization
	err = client.OrganizationsAPI().AddMember(ctx, *org.Id, *user.Id)
	if err != nil {
		panic(err)
	}
}
Output:

func (*UsersAPI) Create

func (a *UsersAPI) Create(ctx context.Context, user *model.User) (*model.User, error)

Create creates a user. The user.Name must not be empty.

func (*UsersAPI) Delete

func (a *UsersAPI) Delete(ctx context.Context, userID string) error

Delete deletes the user with the given ID.

func (*UsersAPI) Find

func (a *UsersAPI) Find(ctx context.Context, filter *Filter) ([]model.User, error)

Find returns all users matching the given filter. Supported filters:

  • ID
  • Name

func (*UsersAPI) FindOne

func (a *UsersAPI) FindOne(ctx context.Context, filter *Filter) (*model.User, error)

FindOne returns one user matching the given filter. Supported filters:

  • ID
  • Name

func (*UsersAPI) SetMyPassword

func (a *UsersAPI) SetMyPassword(ctx context.Context, oldPassword, newPassword string) error

SetMyPassword sets the password associated with the current user. The oldPassword parameter must match the previously set password for the user.

func (*UsersAPI) SetPassword

func (a *UsersAPI) SetPassword(ctx context.Context, userID, password string) error

SetPassword sets the password for the user with the given ID.

func (*UsersAPI) Update

func (a *UsersAPI) Update(ctx context.Context, user *model.User) (*model.User, error)

Update updates a user. The user.ID field must be specified. The complete user information is returned.

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

WriteBuffer stores lines after line and flushes batch when maxLength (bach size) is reached or maxBytes exceeds

func (*WriteBuffer) Add

func (w *WriteBuffer) Add(line []byte)

func (*WriteBuffer) Flush

func (w *WriteBuffer) Flush()

func (*WriteBuffer) Reset

func (w *WriteBuffer) Reset() []byte

type WriteParams

type WriteParams struct {
	RetryParams
	// Maximum number of points sent to server in single request, used by PointsWriter. Default 5000
	BatchSize int
	// Maximum size of batch in bytes, used by PointsWriter. Default 50_000_000.
	MaxBatchBytes int
	// Interval, in ms, used by PointsWriter, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
	FlushInterval int
	// Precision to use in writes for timestamp.
	// Default lineprotocol.Nanosecond
	Precision lineprotocol.Precision
	// Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged.
	DefaultTags map[string]string
	// Write body larger than the threshold is gzipped. 0 to don't gzip at all
	GzipThreshold int
	// WriteFailed is called to inform about an error occurred during writing procedure.
	// It can be called when point encoding fails, sending batch over network fails or batch expires during retrying.
	// Params:
	//   error - write error.
	//   lines - failed batch of lines. nil in case of error occur before sending, e.g. in case of conversion error
	//   attempt - count of already failed attempts to write the lines (1 ... maxRetries+1). 0 if error occur before sending, e.g. in case of conversion error
	//   expires - expiration time for the lines to be retried in millis since epoch. 0 if error occur before sending, e.g. in case of conversion error
	// Returns true to continue using default retry mechanism (applies only in case of write of an error)
	WriteFailed func(err error, lines []byte, attempt int, expires time.Time) bool
	// WriteRetrySkipped is informed about lines that were removed from the retry buffer
	// to keep the size of the retry buffer under the configured limit (maxBufferLines).
	WriteRetrySkipped OnRemoveCallback
}

WriteParams holds configuration properties for write

Directories

Path Synopsis
Package gzip provides GZip related functionality
Package gzip provides GZip related functionality
Package model provides primitives to interact with the openapi HTTP API.
Package model provides primitives to interact with the openapi HTTP API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL