Documentation ¶
Overview ¶
Package influxclient provides client for InfluxDB server.
Index ¶
- Variables
- func SetRetryStrategyFactory(f NewRetryStrategyF)
- type AuthorizationsAPI
- func (a *AuthorizationsAPI) Create(ctx context.Context, auth *model.Authorization) (*model.Authorization, error)
- func (a *AuthorizationsAPI) Delete(ctx context.Context, authID string) error
- func (a *AuthorizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Authorization, error)
- func (a *AuthorizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Authorization, error)
- func (a *AuthorizationsAPI) SetStatus(ctx context.Context, authID string, ...) (*model.Authorization, error)
- type BucketsAPI
- func (a *BucketsAPI) Create(ctx context.Context, bucket *model.Bucket) (*model.Bucket, error)
- func (a *BucketsAPI) Delete(ctx context.Context, bucketID string) error
- func (a *BucketsAPI) Find(ctx context.Context, filter *Filter) ([]model.Bucket, error)
- func (a *BucketsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Bucket, error)
- func (a *BucketsAPI) Update(ctx context.Context, bucket *model.Bucket) (*model.Bucket, error)
- type BytesWrite
- type Client
- func (c *Client) APIClient() *model.Client
- func (c *Client) AuthorizationsAPI() *AuthorizationsAPI
- func (c *Client) BucketsAPI() *BucketsAPI
- func (c *Client) Close() error
- func (c *Client) DeletePoints(ctx context.Context, params *DeleteParams) error
- func (c *Client) Health(ctx context.Context) (*model.HealthCheck, error)
- func (c *Client) LabelsAPI() *LabelsAPI
- func (c *Client) OrganizationsAPI() *OrganizationsAPI
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PointsWriter(bucket string) *PointsWriter
- func (c *Client) Query(ctx context.Context, query string, queryParams interface{}) (*QueryResultReader, error)
- func (c *Client) Ready(ctx context.Context) (time.Duration, error)
- func (c *Client) TasksAPI() *TasksAPI
- func (c *Client) UsersAPI() *UsersAPI
- func (c *Client) Write(ctx context.Context, bucket string, buff []byte) error
- func (c *Client) WriteData(ctx context.Context, bucket string, points ...interface{}) error
- func (c *Client) WritePoints(ctx context.Context, bucket string, points ...*Point) error
- type DeleteParams
- type Field
- type Filter
- type LabelsAPI
- func (a *LabelsAPI) Create(ctx context.Context, label *model.Label) (*model.Label, error)
- func (a *LabelsAPI) Delete(ctx context.Context, labelID string) error
- func (a *LabelsAPI) Find(ctx context.Context, filter *Filter) ([]model.Label, error)
- func (a *LabelsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Label, error)
- func (a *LabelsAPI) Update(ctx context.Context, label *model.Label) (*model.Label, error)
- type NewRetryStrategyF
- type OnRemoveCallback
- type OrganizationsAPI
- func (o *OrganizationsAPI) AddMember(ctx context.Context, orgID, userID string) error
- func (o *OrganizationsAPI) AddOwner(ctx context.Context, orgID, userID string) error
- func (o *OrganizationsAPI) Create(ctx context.Context, org *model.Organization) (*model.Organization, error)
- func (o *OrganizationsAPI) Delete(ctx context.Context, orgID string) error
- func (o *OrganizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Organization, error)
- func (o *OrganizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Organization, error)
- func (o *OrganizationsAPI) Members(ctx context.Context, orgID string) ([]model.ResourceMember, error)
- func (o *OrganizationsAPI) Owners(ctx context.Context, orgID string) ([]model.ResourceOwner, error)
- func (o *OrganizationsAPI) RemoveMember(ctx context.Context, orgID, userID string) error
- func (o *OrganizationsAPI) RemoveOwner(ctx context.Context, orgID, userID string) error
- func (o *OrganizationsAPI) Update(ctx context.Context, org *model.Organization) (*model.Organization, error)
- type Params
- type Point
- type PointsWriter
- type QueryError
- type QueryResultReader
- type RetryBuffer
- type RetryItem
- type RetryLines
- type RetryParams
- type RetryStrategy
- type ServerError
- type Tag
- type TasksAPI
- func (a *TasksAPI) AddLabel(ctx context.Context, taskID, labelID string) (*model.Label, error)
- func (a *TasksAPI) AddMember(ctx context.Context, taskID, userID string) error
- func (a *TasksAPI) AddOwner(ctx context.Context, taskID, userID string) error
- func (a *TasksAPI) CancelRun(ctx context.Context, taskID, runID string) error
- func (a *TasksAPI) Create(ctx context.Context, task *model.Task) (*model.Task, error)
- func (a *TasksAPI) Delete(ctx context.Context, taskID string) error
- func (a *TasksAPI) Find(ctx context.Context, filter *Filter) ([]model.Task, error)
- func (a *TasksAPI) FindLabels(ctx context.Context, taskID string) ([]model.Label, error)
- func (a *TasksAPI) FindLogs(ctx context.Context, taskID string) ([]model.LogEvent, error)
- func (a *TasksAPI) FindOne(ctx context.Context, filter *Filter) (*model.Task, error)
- func (a *TasksAPI) FindOneRun(ctx context.Context, taskID string, filter *Filter) (*model.Run, error)
- func (a *TasksAPI) FindRunLogs(ctx context.Context, taskID, runID string) ([]model.LogEvent, error)
- func (a *TasksAPI) FindRuns(ctx context.Context, taskID string, filter *Filter) ([]model.Run, error)
- func (a *TasksAPI) Members(ctx context.Context, taskID string) ([]model.ResourceMember, error)
- func (a *TasksAPI) Owners(ctx context.Context, taskID string) ([]model.ResourceOwner, error)
- func (a *TasksAPI) RemoveLabel(ctx context.Context, taskID, labelID string) error
- func (a *TasksAPI) RemoveMember(ctx context.Context, taskID, userID string) error
- func (a *TasksAPI) RemoveOwner(ctx context.Context, taskID, userID string) error
- func (a *TasksAPI) RetryRun(ctx context.Context, taskID, runID string) (*model.Run, error)
- func (a *TasksAPI) RunManually(ctx context.Context, taskID string) (*model.Run, error)
- func (a *TasksAPI) Update(ctx context.Context, task *model.Task) (*model.Task, error)
- type UsersAPI
- func (a *UsersAPI) Create(ctx context.Context, user *model.User) (*model.User, error)
- func (a *UsersAPI) Delete(ctx context.Context, userID string) error
- func (a *UsersAPI) Find(ctx context.Context, filter *Filter) ([]model.User, error)
- func (a *UsersAPI) FindOne(ctx context.Context, filter *Filter) (*model.User, error)
- func (a *UsersAPI) SetMyPassword(ctx context.Context, oldPassword, newPassword string) error
- func (a *UsersAPI) SetPassword(ctx context.Context, userID, password string) error
- func (a *UsersAPI) Update(ctx context.Context, user *model.User) (*model.User, error)
- type WriteBuffer
- type WriteParams
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultRetryParams = RetryParams{
RetryInterval: 5_000,
MaxRetries: 5,
RetryBufferLimit: 50_0000,
MaxRetryInterval: 125_000,
ExponentialBase: 2,
MaxRetryTime: 180_000,
}
var DefaultWriteParams = WriteParams{ RetryParams: DefaultRetryParams, BatchSize: 5_000, MaxBatchBytes: 50_000_000, FlushInterval: 60_000, Precision: lineprotocol.Nanosecond, GzipThreshold: 1_000, }
DefaultWriteParams specifies default write param
Functions ¶
func SetRetryStrategyFactory ¶
func SetRetryStrategyFactory(f NewRetryStrategyF)
Types ¶
type AuthorizationsAPI ¶
type AuthorizationsAPI struct {
// contains filtered or unexported fields
}
AuthorizationsAPI holds methods related to authorization, as found under the /authorizations endpoint.
Example ¶
package main import ( "context" "fmt" "os" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) ctx := context.Background() // Find user to grant permission user, err := client.UsersAPI().FindOne(ctx, &influxclient.Filter{Name: "user-name"}) if err != nil { panic(err) } // Find organization org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "my-org"}) if err != nil { panic(err) } // group permissions permissions := []model.Permission{ { Action: model.PermissionActionWrite, Resource: model.Resource{ Type: model.ResourceTypeBuckets, }, }, { Action: model.PermissionActionRead, Resource: model.Resource{ Type: model.ResourceTypeBuckets, }, }, } // create authorization object using info above auth := &model.Authorization{ OrgID: org.Id, Permissions: &permissions, UserID: user.Id, } // grant permission and create token authCreated, err := client.AuthorizationsAPI().Create(ctx, auth) if err != nil { panic(err) } defer client.AuthorizationsAPI().Delete(ctx, *authCreated.Id) // only for E2E tests // Use token fmt.Fprintf(os.Stderr, "\tToken: %v\n", *authCreated.Token) }
Output:
func (*AuthorizationsAPI) Create ¶
func (a *AuthorizationsAPI) Create(ctx context.Context, auth *model.Authorization) (*model.Authorization, error)
Create creates a new authorization. The returned Authorization holds the new ID.
func (*AuthorizationsAPI) Delete ¶
func (a *AuthorizationsAPI) Delete(ctx context.Context, authID string) error
Delete deletes the authorization with the given ID.
func (*AuthorizationsAPI) Find ¶
func (a *AuthorizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Authorization, error)
Find returns all authorizations matching the given filter. Supported filters:
- OrgName
- OrgID
- UserName
- UserID
func (*AuthorizationsAPI) FindOne ¶
func (a *AuthorizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Authorization, error)
FindOne returns one authorization matching the given filter. Supported filters:
- OrgName
- OrgID
- UserName
- UserID
func (*AuthorizationsAPI) SetStatus ¶
func (a *AuthorizationsAPI) SetStatus(ctx context.Context, authID string, status model.AuthorizationUpdateRequestStatus) (*model.Authorization, error)
SetStatus updates authorization status.
type BucketsAPI ¶
type BucketsAPI struct {
// contains filtered or unexported fields
}
BucketsAPI provides methods for managing buckets in an InfluxDB server.
Example ¶
package main import ( "context" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) // Get Buckets API client bucketsAPI := client.BucketsAPI() ctx := context.Background() // Get organization that will own new bucket org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"}) if err != nil { panic(err) } // Create bucket with 1 day retention policy bucket, err := bucketsAPI.Create(ctx, &model.Bucket{ OrgID: org.Id, Name: "bucket-sensors", RetentionRules: []model.RetentionRule{ { EverySeconds: 3600 * 24, }, }, }) if err != nil { panic(err) } defer bucketsAPI.Delete(ctx, *bucket.Id) // only for E2E tests // Update description of the bucket desc := "Bucket for sensor data" bucket.Description = &desc bucket, err = bucketsAPI.Update(ctx, bucket) if err != nil { panic(err) } }
Output:
func (*BucketsAPI) Create ¶
Create creates a new bucket with the given information. The bucket.Name field must be non-empty. The returned Bucket holds the ID of the new bucket.
func (*BucketsAPI) Delete ¶
func (a *BucketsAPI) Delete(ctx context.Context, bucketID string) error
Delete deletes the bucket with the given ID.
type BytesWrite ¶
BytesWrite is a function writing data to a bucket
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements an InfluxDB client.
Example (CustomServerAPICall) ¶
package main import ( "context" "fmt" "os" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // This example shows how to perform custom server API invocation for any endpoint // Here we will create a DBRP mapping which allows using buckets in legacy write and query (InfluxQL) endpoints // Create client. You need an admin token for creating DBRP mapping client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) // Get generated client for server API calls apiClient := client.APIClient() ctx := context.Background() // Get a bucket we would like to query using InfluxQL bucket, err := client.BucketsAPI().FindOne(ctx, &influxclient.Filter{Name: "bucket-name"}) if err != nil { panic(err) } // Get an organization that will own the mapping org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"}) if err != nil { panic(err) } yes := true // Fill required fields of the DBRP struct dbrp := model.DBRPCreate{ BucketID: *bucket.Id, Database: bucket.Name, Default: &yes, OrgID: org.Id, RetentionPolicy: "autogen", } params := &model.PostDBRPAllParams{ Body: model.PostDBRPJSONRequestBody(dbrp), } // Call server API newDbrp, err := apiClient.PostDBRP(ctx, params) if err != nil { panic(err) } defer apiClient.DeleteDBRPID(ctx, &model.DeleteDBRPIDAllParams{ DeleteDBRPIDParams: model.DeleteDBRPIDParams{OrgID: org.Id}, DbrpID: newDbrp.Id, }) // only for E2E tests // Check generated response fmt.Fprintf(os.Stderr, "\tCreated DBRP: %#v\n", newDbrp) }
Output:
Example (New) ¶
package main import ( "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) if err != nil { panic(err) } // client.Close() }
Output:
func (*Client) AuthorizationsAPI ¶
func (c *Client) AuthorizationsAPI() *AuthorizationsAPI
AuthorizationsAPI returns a value that can be used to interact with the authorization-related parts of the InfluxDB API.
func (*Client) BucketsAPI ¶
func (c *Client) BucketsAPI() *BucketsAPI
BucketsAPI returns a value that can be used to interact with the bucket-related parts of the InfluxDB API.
func (*Client) DeletePoints ¶
func (c *Client) DeletePoints(ctx context.Context, params *DeleteParams) error
DeletePoints deletes data from a bucket.
func (*Client) Health ¶
Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. Health doesn't validate authentication params.
func (*Client) LabelsAPI ¶
LabelsAPI returns a value that can be used to interact with the label-related parts of the InfluxDB API.
func (*Client) OrganizationsAPI ¶
func (c *Client) OrganizationsAPI() *OrganizationsAPI
OrganizationsAPI returns a value that can be used to interact with the organization-related parts of the InfluxDB API.
func (*Client) Ping ¶
Ping checks the status and InfluxDB version of the instance. Returns an error if it is not available.
func (*Client) PointsWriter ¶
func (c *Client) PointsWriter(bucket string) *PointsWriter
PointsWriter returns a PointsWriter value that supports fast asynchronous writing of points to InfluxDB. All the points are written into the given bucket.
The returned PointsWriter must be closed after use to release resources and flush any buffered points.
Example ¶
package main import ( "fmt" "math/rand" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { wp := influxclient.DefaultWriteParams // Set batch size to write 100 points in 2 batches wp.BatchSize = 50 // Set callback for failed writes wp.WriteFailed = func(err error, lines []byte, attempt int, expires time.Time) bool { fmt.Println("Write failed", err) return true } // Create client with custom WriteParams client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud WriteParams: wp, }) if err != nil { panic(err) } // client.Close() have to be called to clean http connections defer client.Close() // Get async writer writer := client.PointsWriter("my-bucket") // writer.Close() MUST be called at the end to ensure completing background operations and cleaning resources defer writer.Close() // write some points for i := 0; i < 100; i++ { // create point p := influxclient.NewPointWithMeasurement("stat"). AddTag("id", fmt.Sprintf("rack_%v", i%10)). AddTag("vendor", "AWS"). AddTag("hostname", fmt.Sprintf("host_%v", i%100)). AddField("temperature", rand.Float64()*80.0). AddField("disk_free", rand.Float64()*1000.0). AddField("disk_total", (i/10+1)*1000000). AddField("mem_total", (i/100+1)*10000000). AddField("mem_free", rand.Uint64()). SetTimestamp(time.Now()) // write asynchronously writer.WritePoints(p) } }
Output:
func (*Client) Query ¶
func (c *Client) Query(ctx context.Context, query string, queryParams interface{}) (*QueryResultReader, error)
Query sends the given Flux query to server and returns QueryResultReader for further parsing result. The result must be closed after use.
A Flux query can contain references to parameters, which must be passed via queryParams. It can be a struct or a map. Param values can be only simple types or time.Time. The name of a struct field or a map key (must be a string) will be the param name.
The parameter names of struct fields can be specified explicitly using json annotations:
type Condition struct { Start time.Time `json:"start"` Field string `json:"field"` Value float64 `json:"value"` } cond := Condition { "iot_center", "Temperature", "-10m", 23.0, }
Parameters are then accessed via the params object:
query:= `from(bucket: "environment") |> range(start: time(v: params.start)) |> filter(fn: (r) => r._measurement == "air") |> filter(fn: (r) => r._field == params.field) |> filter(fn: (r) => r._value > params.value)`
And used in the call to Query:
result, err := client.Query(ctx, query, cond);
Use QueryResultReader.NextSection() for navigation to the sections in the query result set.
Use QueryResultReader.NextRow() for iterating over rows in the section.
Read the row raw data using QueryResultReader.Row() or deserialize data into a struct or a slice via QueryResultReader.Decode:
val := &struct { Time time.Time `csv:"_time"` Temp float64 `csv:"_value"` Sensor string `csv:"sensor"` }{} err = result.Decode(val)
Example ¶
package main import ( "context" "fmt" "os" "text/tabwriter" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { // Create client client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) if err != nil { panic(err) } defer client.Close() // Define query parameters params := struct { Since string `json:"since"` GreaterThan float64 `json:"greaterThan"` }{ "-10m", 23.0, } // Prepare a query query := `from(bucket: "iot_center") |> range(start: duration(v: params.since)) |> filter(fn: (r) => r._measurement == "environment") |> filter(fn: (r) => r._field == "Temperature") |> filter(fn: (r) => r._value > params.greaterThan)` // Execute query res, err := client.Query(context.Background(), query, params) if err != nil { panic(err) } // Make sure query result is always closed defer res.Close() // Declare custom type for data val := &struct { Time time.Time `csv:"_time"` Temp float64 `csv:"_value"` Sensor string `csv:"sensor"` }{} tw := tabwriter.NewWriter(os.Stdout, 10, 4, 2, ' ', 0) fmt.Fprintf(tw, "Time\tTemp\tSensor\n") // Iterate over result set for res.NextSection() { for res.NextRow() { err = res.Decode(val) if err != nil { fmt.Fprintf(tw, "%v\n", err) continue } fmt.Fprintf(tw, "%s\t%.2f\t%s\n", val.Time.String(), val.Temp, val.Sensor) } } tw.Flush() if res.Err() != nil { panic(res.Err()) } }
Output:
func (*Client) Ready ¶
Ready checks that the server is ready, and reports the duration the instance has been up if so. It does not validate authentication parameters. See https://docs.influxdata.com/influxdb/v2.0/api/#operation/GetReady.
func (*Client) TasksAPI ¶
TasksAPI returns a value that can be used to interact with the task-related parts of the InfluxDB API.
func (*Client) UsersAPI ¶
UsersAPI returns a value that can be used to interact with the user-related parts of the InfluxDB API.
func (*Client) Write ¶
Write writes line protocol record(s) to the server into the given bucket. Multiple records must be separated by the new line character (\n). Data are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.
Example ¶
package main import ( "context" "fmt" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { // Create client client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) if err != nil { panic(err) } // Close client at the end defer client.Close() // Custom data record sensorData := struct { ID string Temperature float64 Humidity int }{"1012", 22.3, 55} // Create a line protocol from record line := fmt.Sprintf("air,device_id=%v,sensor=SHT31 humidity=%di,temperature=%f %d\n", sensorData.ID, sensorData.Humidity, sensorData.Temperature, time.Now().UnixNano()) // Write data err = client.Write(context.Background(), "my-bucket", []byte(line)) if err != nil { panic(err) } }
Output:
func (*Client) WriteData ¶
WriteData encodes fields of custom points into line protocol and writes line protocol record(s) to the server into the given bucket. Each field of a custom point must be annotated with the 'lp' tag and one of the values measurement, tag, field or timestamp. A valid point must contain a measurement and at least one field.
A field with timestamp must be of a type time.Time
type TemperatureSensor struct { Measurement string `lp:"measurement"` Sensor string `lp:"tag,sensor"` ID string `lp:"tag,device_id"` Temp float64 `lp:"field,temperature"` Hum int `lp:"field,humidity"` Time time.Time `lp:"timestamp"` Description string `lp:"-"` }
The points are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.
Example ¶
package main import ( "context" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { // Create client client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) if err != nil { panic(err) } // Close client at the end defer client.Close() sensorData := struct { Table string `lp:"measurement"` Sensor string `lp:"tag,sensor"` ID string `lp:"tag,device_id"` Temperature float64 `lp:"field,temperature"` Humidity int `lp:"field,humidity"` Time time.Time `lp:"timestamp"` }{"air", "SHT31", "1012", 22.3, 55, time.Now()} // Write point err = client.WriteData(context.Background(), "my-bucket", sensorData) if err != nil { panic(err) } }
Output:
func (*Client) WritePoints ¶
WritePoints writes all the given points to the server into the given bucket. The points are written synchronously. For a higher throughput API that buffers individual points and writes them asynchronously, use the PointsWriter method.
Example ¶
package main import ( "context" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" ) func main() { // Create client client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) if err != nil { panic(err) } // Close client at the end defer client.Close() // Create a point sensorData := influxclient.NewPointWithMeasurement("air").SetTimestamp(time.Now()) // Add tag sensorData.AddTag("sensor", "1012") // Add fields sensorData.AddField("temperature", 22.3).AddField("humidity", 55) // Write point err = client.WritePoints(context.Background(), "my-bucket", sensorData) if err != nil { panic(err) } }
Output:
type DeleteParams ¶
type DeleteParams struct { // Bucket holds bucket name. Bucket string // BucketID holds bucket ID. BucketID string // Org holds organization name. Org string // OrgID holds organization ID. OrgID string // Predicate is an expression in delete predicate syntax. Predicate string // Start is the earliest time to delete from. Start time.Time // Stop is the latest time to delete from. Stop time.Time }
DeleteParams holds options for DeletePoints.
type Field ¶
type Field struct { Key string Value lineprotocol.Value }
Field holds the keys and values for a bunch of Metric Field k/v pairs where Value can be a uint64, int64, int, float32, float64, string, or bool.
type Filter ¶
type Filter struct { // Filter by resource ID ID string // Filter by resource name Name string // Filter by organization name OrgName string // Filter by organization ID OrgID string // Filter by user ID UserID string // Filter by user name UserName string // Filter by status Status string // Maximum number of returned entities in a single request Limit uint // Starting offset for returning entities Offset uint // After specifies that the search results should start after the item of given ID. After string // BeforeTime selects items to those scheduled before this time. BeforeTime time.Time // AfterTime selects items to those scheduled after this time. AfterTime time.Time }
The Filter can be used to select the entities returned by the Find operations. Not all filter kinds are supported by all endpoints: see the individual API documentation for details.
Note that Filter also provides support for paging (using Offset and Limit) - this is only useful with filters that might return more than one entity.
For example, to find a maximum of 10 buckets that are associated with the organization "example.com", starting 40 entries into the results:
client.BucketsAPI().Find(ctx context.Context, &influxapi.Filter{ OrgName: "example.com", Limit: 10, Offset: 40, })
type LabelsAPI ¶
type LabelsAPI struct {
// contains filtered or unexported fields
}
LabelsAPI provides methods for managing labels in an InfluxDB server.
Example ¶
package main import ( "context" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) // Get Labels API client labelsAPI := client.LabelsAPI() ctx := context.Background() // Get organization that will own label org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"}) if err != nil { panic(err) } labelName := "Active State" props := map[string]string{"color": "33ffdd", "description": "Marks org active"} label, err := labelsAPI.Create(ctx, &model.Label{ OrgID: org.Id, Name: &labelName, Properties: &model.Label_Properties{ AdditionalProperties: props, }, }) if err != nil { panic(err) } defer labelsAPI.Delete(ctx, *label.Id) // only for E2E tests // Change color property label.Properties.AdditionalProperties = map[string]string{"color": "ff1122"} label, err = labelsAPI.Update(ctx, label) if err != nil { panic(err) } }
Output:
func (*LabelsAPI) Create ¶
Create creates a new label with the given information. The label.Name field must be non-empty. The returned Label holds the ID of the new label.
func (*LabelsAPI) FindOne ¶
FindOne returns one label matching the given filter. Supported filters:
- OrgID
type NewRetryStrategyF ¶
type NewRetryStrategyF func(params RetryParams) RetryStrategy
NewRetryStrategyF factory function creates a new RetryStrategy
func GetRetryStrategyFactory ¶
func GetRetryStrategyFactory() NewRetryStrategyF
type OnRemoveCallback ¶
OnRemoveCallback is a callback used to inform about a batch removed from the retry buffer. Params:
lines - lines that were skipped retryCountDown - number of remaining attempts expires - time when batch expires
type OrganizationsAPI ¶
type OrganizationsAPI struct {
// contains filtered or unexported fields
}
OrganizationsAPI holds methods related to organization, as found under the /orgs endpoint.
Example ¶
package main import ( "context" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) // Get Organizations API client orgAPI := client.OrganizationsAPI() ctx := context.Background() // Create new organization org, err := orgAPI.Create(ctx, &model.Organization{Name: "org-name-2"}) if err != nil { panic(err) } defer orgAPI.Delete(ctx, *org.Id) // only for E2E tests orgDescription := "My second org" org.Description = &orgDescription org, err = orgAPI.Update(ctx, org) if err != nil { panic(err) } // Create new user to add to org newUser, err := client.UsersAPI().Create(ctx, &model.User{Name: "user-name-2"}) if err != nil { panic(err) } defer client.UsersAPI().Delete(ctx, *newUser.Id) // only for E2E tests // Add new user to organization err = orgAPI.AddMember(ctx, *org.Id, *newUser.Id) if err != nil { panic(err) } }
Output:
func (*OrganizationsAPI) AddMember ¶
func (o *OrganizationsAPI) AddMember(ctx context.Context, orgID, userID string) error
AddMember adds the user with the given ID to the organization with the given ID.
func (*OrganizationsAPI) AddOwner ¶
func (o *OrganizationsAPI) AddOwner(ctx context.Context, orgID, userID string) error
AddOwner adds an owner with the given userID to the organization with the given id.
func (*OrganizationsAPI) Create ¶
func (o *OrganizationsAPI) Create(ctx context.Context, org *model.Organization) (*model.Organization, error)
Create creates a new organization. The returned Organization holds the new ID.
func (*OrganizationsAPI) Delete ¶
func (o *OrganizationsAPI) Delete(ctx context.Context, orgID string) error
Delete deletes the organization with the given ID.
func (*OrganizationsAPI) Find ¶
func (o *OrganizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Organization, error)
Find returns all organizations matching the given filter. Supported filters:
- OrgName
- OrgID
- UserID
func (*OrganizationsAPI) FindOne ¶
func (o *OrganizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Organization, error)
FindOne returns one organization matching the given filter. Supported filters:
- OrgName
- OrgID
- UserID
func (*OrganizationsAPI) Members ¶
func (o *OrganizationsAPI) Members(ctx context.Context, orgID string) ([]model.ResourceMember, error)
Members returns all members of the organization with the given ID.
func (*OrganizationsAPI) Owners ¶
func (o *OrganizationsAPI) Owners(ctx context.Context, orgID string) ([]model.ResourceOwner, error)
Owners returns all the owners of the organization with the given id.
func (*OrganizationsAPI) RemoveMember ¶
func (o *OrganizationsAPI) RemoveMember(ctx context.Context, orgID, userID string) error
RemoveMember removes the user with the given ID from the organization with the given ID.
func (*OrganizationsAPI) RemoveOwner ¶
func (o *OrganizationsAPI) RemoveOwner(ctx context.Context, orgID, userID string) error
RemoveOwner removes the user with the given userID from the organization with the given id.
func (*OrganizationsAPI) Update ¶
func (o *OrganizationsAPI) Update(ctx context.Context, org *model.Organization) (*model.Organization, error)
Update updates information about the organization. The org.ID field must hold the ID of the organization to be changed.
type Params ¶
type Params struct { // ServerURL holds the URL of the InfluxDB server to connect to. // This must be non-empty. E.g. http://localhost:8086 ServerURL string // AuthToken holds the authorization token for the API. // This can be obtained through the GUI web browser interface. AuthToken string // Organization is name or ID of organization where data (buckets, users, tasks, etc.) belongs to // Optional for InfluxDB Cloud Organization string // HTTPClient is used to make API requests. // // This can be used to specify a custom TLS configuration // (TLSClientConfig), a custom request timeout (Timeout), // or other customization as required. // // It HTTPClient is nil, http.DefaultClient will be used. HTTPClient *http.Client // Write Params WriteParams WriteParams }
Params holds the parameters for creating a new client. The only mandatory field is ServerURL. AuthToken is also important if authentication was not done outside this client.
type Point ¶
Point represents an InfluxDB time series point, holding tags and fields.
func NewPoint ¶
func NewPoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time) *Point
NewPoint is a convenient function for creating a Point from measurement name, tags, fields and a timestamp.
func NewPointWithMeasurement ¶
NewPointWithMeasurement is a convenient function for creating a Point from measurement name for later adding data
func (*Point) MarshalBinary ¶
func (m *Point) MarshalBinary(precision lineprotocol.Precision) ([]byte, error)
func (*Point) SetTimestamp ¶
SetTimestamp is helper function for complete fluent interface
func (*Point) SortFields ¶
SortFields orders the fields of a point alphanumerically by key.
type PointsWriter ¶
type PointsWriter struct {
// contains filtered or unexported fields
}
PointsWriter is an asynchronous writer with automated batching and retrying capabilities. It is parametrized by the WriteParams. It is obtained using Client.PointsWriter(). Use Write, WriteData or WritePoints for sending data. Any error encountered during asynchronous processing is reported by the WriteParams.WriteFailed callback. It must be created using NewPointsWriter. All functions are thread-safe and can be used from different goroutines.
func NewPointsWriter ¶
func NewPointsWriter(writer BytesWrite, bucket string, params WriteParams) *PointsWriter
NewPointsWriter creates a fast asynchronous PointsWriter writing to a bucket using the given writer according to the params.
func (*PointsWriter) Close ¶
func (p *PointsWriter) Close()
Close stops internal routines and closes resources. It must be called by the user at the end.
func (*PointsWriter) Flush ¶
func (p *PointsWriter) Flush()
Flush asynchronously flushes write buffer. This enforces sending data on demand, even when flush conditions (batch size, flush interval, max batch bytes) are not met.
func (*PointsWriter) Write ¶
func (p *PointsWriter) Write(line []byte)
Write asynchronously writes line protocol record(s) to the server. Multiple records must be separated by the new line character (\n).
func (*PointsWriter) WriteData ¶
func (p *PointsWriter) WriteData(points ...interface{})
WriteData asynchronously encodes fields of custom points into line protocol and writes line protocol record(s) to the server into the given bucket. Any error encountered during asynchronous processing is reported by the WriteParams.WriteFailed callback. Each custom point must be annotated with the 'lp' prefix and one of the values measurement, tag, field or timestamp. A valid point must contain a measurement and at least one field.
A field with timestamp must be of a type time.Time
type TemperatureSensor struct { Measurement string `lp:"measurement"` Sensor string `lp:"tag,sensor"` ID string `lp:"tag,device_id"` Temp float64 `lp:"field,temperature"` Hum int `lp:"field,humidity"` Time time.Time `lp:"timestamp"` Description string `lp:"-"` }
func (*PointsWriter) WritePoints ¶
func (p *PointsWriter) WritePoints(points ...*Point)
WritePoints asynchronously writes all the given points to the server. Any error encountered during encoding points is reported by WriteParams.WriteFailed callback.
type QueryError ¶
type QueryError struct { // Message is a Flux query error message Message string `csv:"error"` // Code is an Flux query error code Code int64 `csv:"reference"` }
QueryError defines the information of Flux query error
func (*QueryError) Error ¶
func (e *QueryError) Error() string
type QueryResultReader ¶
type QueryResultReader struct { *annotatedcsv.Reader // contains filtered or unexported fields }
QueryResultReader enhances annotatedcsv.Reader by allowing NextRow to go straight to the first data line and by treating error section as an error. QueryResultReader must be closed by calling Close at the end of reading.
func NewQueryResultReader ¶
func NewQueryResultReader(r io.ReadCloser) *QueryResultReader
NewQueryResultReader returns new QueryResultReader for parsing Flux query result stream.
func (*QueryResultReader) Close ¶
func (r *QueryResultReader) Close() error
Close closes the underlying reader
func (*QueryResultReader) Err ¶
func (r *QueryResultReader) Err() error
Err returns any error encountered when parsing.
func (*QueryResultReader) NextRow ¶
func (r *QueryResultReader) NextRow() bool
NextRow is like annotatedcsv.Reader.NextRow except if it is called in the beginning it advances to the first section. When an error section is encountered, NextRow will return false and Err will return the error.
func (*QueryResultReader) NextSection ¶
func (r *QueryResultReader) NextSection() bool
NextSection is like annotatedcsv.Reader.NextSection except that it treats error sections as a terminating section. When an error section is encountered, NextSection will return false and Err will return the error.
type RetryBuffer ¶
type RetryBuffer struct {
// contains filtered or unexported fields
}
func NewRetryBuffer ¶
func NewRetryBuffer(maxSize int, retryLines RetryLines, onRemove OnRemoveCallback) *RetryBuffer
func (*RetryBuffer) AddLines ¶
func (r *RetryBuffer) AddLines(lines []byte, retryCount, delay int, expires time.Time)
func (*RetryBuffer) Close ¶
func (r *RetryBuffer) Close() int
func (*RetryBuffer) Flush ¶
func (r *RetryBuffer) Flush()
type RetryParams ¶
type RetryParams struct { // Default retry interval in ms, if not sent by server. Default 5,000. RetryInterval int // Maximum count of retry attempts of failed writes, default 5. MaxRetries int // Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 50,000. RetryBufferLimit int // The maximum delay between each retry attempt in milliseconds, default 125,000. MaxRetryInterval int // The maximum total retry timeout in milliseconds, default 180,000. MaxRetryTime int // The base for the exponential retry delay ExponentialBase int // Max random value added to the retry delay in milliseconds, default 200 RetryJitter int }
RetryParams configures retry behavior used by PointsWriter
type RetryStrategy ¶
type RetryStrategy interface { // NextDelay returns delay for a next retry // - error - reason for retrying // - failedAttempts - a count of already failed attempts, 1 being the first // Returns milliseconds to wait before retrying NextDelay(err error, failedAttempts int) int // Success implementation should reset its state, this is mandatory to call upon success Success() }
RetryStrategy is a strategy for calculating retry delays.
func NewDefaultRetryStrategy ¶
func NewDefaultRetryStrategy(params RetryParams) RetryStrategy
type ServerError ¶
type ServerError struct { // Code holds the Influx error code, or empty if the code is unknown. Code string `json:"code"` // Message holds the error message. Message string `json:"message"` // StatusCode holds the HTTP response status code. StatusCode int `json:"-"` // RetryAfter holds the value of Retry-After header if sent by server, otherwise zero RetryAfter int `json:"-"` }
ServerError holds InfluxDB server error info. It represents an error returned from an InfluxDB API server.
func NewServerError ¶
func NewServerError(message string) *ServerError
NewServerError returns a new ServerError with just a message.
type TasksAPI ¶
type TasksAPI struct {
// contains filtered or unexported fields
}
TasksAPI holds methods related to tasks, as found under the /tasks endpoint.
Example ¶
package main import ( "context" "fmt" "os" "time" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) // Get Delete API client tasksAPI := client.TasksAPI() ctx := context.Background() // Get organization that will own task org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"}) if err != nil { panic(err) } // task flux script from https://www.influxdata.com/blog/writing-tasks-and-setting-up-alerts-for-influxdb-cloud/ flux := `fruitCollected = from(bucket: "farming") |> range(start: -task.every) |> filter(fn: (r) => r["_measurement"] == "totalFruitsCollected") |> filter(fn: (r) => r["_field"] == "fruits") |> group(columns: ["farmName"]) |> aggregateWindow(fn: sum, every: task.every) |> map(fn: (r) => ({ _time: r._time, _stop: r._stop, _start: r._start, _measurement: "fruitCollectionRate", _field: "fruits", _value: r._value, farmName: r.farmName, })) fruitCollected |> to(bucket: "farming") ` every := "1h" task, err := tasksAPI.Create(ctx, &model.Task{ OrgID: *org.Id, Name: "fruitCollectedRate", Flux: flux, Every: &every, }) if err != nil { panic(err) } defer tasksAPI.Delete(ctx, task.Id) // only for E2E tests // Force running a task run, err := tasksAPI.RunManually(ctx, task.Id) if err != nil { panic(err) } // Print run info fmt.Fprint(os.Stderr, "\tForced run scheduled for ", *run.ScheduledFor, " with status ", *run.Status, "\n") //wait for tasks to start and be running <-time.After(1500 * time.Millisecond) // Find logs logs, err := tasksAPI.FindRunLogs(ctx, task.Id, *run.Id) if err != nil { panic(err) } // Print logs fmt.Fprintln(os.Stderr, 
"\tLogs:") for _, logEvent := range logs { fmt.Fprint(os.Stderr, "\t Time:", *logEvent.Time, ", Message: ", *logEvent.Message, "\n") } }
Output:
func (*TasksAPI) AddMember ¶
AddMember adds the user with the given ID to the task with the given ID.
func (*TasksAPI) AddOwner ¶
AddOwner adds an owner with the given userID to the task with the given id.
func (*TasksAPI) Create ¶
Create creates a new task according to the task object. Set OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive. Every has higher priority.
func (*TasksAPI) Find ¶
Find returns all tasks matching the given filter. Supported filters:
After Name OrgName OrgID UserName Status Limit
func (*TasksAPI) FindLabels ¶
FindLabels retrieves labels of a task with given ID.
func (*TasksAPI) FindOne ¶
FindOne returns one task matching the given filter. Supported filters:
After Name OrgName OrgID UserName Status Limit
func (*TasksAPI) FindOneRun ¶
func (a *TasksAPI) FindOneRun(ctx context.Context, taskID string, filter *Filter) (*model.Run, error)
FindOneRun returns one task run that matches the given filter. Supported filters:
ID
TODO or just pass runID instead of a filter?
func (*TasksAPI) FindRunLogs ¶
FindRunLogs returns all log events for a task run with the given ID.
func (*TasksAPI) FindRuns ¶
func (a *TasksAPI) FindRuns(ctx context.Context, taskID string, filter *Filter) ([]model.Run, error)
FindRuns returns task runs according to the filter. Supported filters:
After AfterTime BeforeTime Limit
func (*TasksAPI) RemoveLabel ¶
RemoveLabel removes a label with given ID from a task with given ID.
func (*TasksAPI) RemoveMember ¶
RemoveMember removes the user with the given ID from the task with the given ID.
func (*TasksAPI) RemoveOwner ¶
RemoveOwner removes the user with the given userID from the task with the given id.
func (*TasksAPI) RunManually ¶
RunManually manually starts a run of a task with the given ID now, overriding the current schedule.
type UsersAPI ¶
type UsersAPI struct {
// contains filtered or unexported fields
}
UsersAPI holds methods related to user, as found under the /users endpoint.
Example ¶
package main import ( "context" "github.com/vlastahajek/influxdb-client-go/v3/influxclient" "github.com/vlastahajek/influxdb-client-go/v3/influxclient/model" ) func main() { // Create a new client using an InfluxDB server base URL and an authentication token client, err := influxclient.New(influxclient.Params{ ServerURL: "https://eu-central-1-1.aws.cloud2.influxdata.com/", AuthToken: "my-token", // Organization: "my-org", // Organization is optional for InfluxDB Cloud }) ctx := context.Background() // Find organization org, err := client.OrganizationsAPI().FindOne(ctx, &influxclient.Filter{Name: "org-name"}) if err != nil { panic(err) } // Get users API client usersAPI := client.UsersAPI() // Create new user user, err := usersAPI.Create(ctx, &model.User{Name: "user-01"}) if err != nil { panic(err) } defer usersAPI.Delete(ctx, *user.Id) // only for E2E tests // Set user password err = usersAPI.SetPassword(ctx, *user.Id, "pass-at-least-8-chars") if err != nil { panic(err) } // Add user to organization err = client.OrganizationsAPI().AddMember(ctx, *org.Id, *user.Id) if err != nil { panic(err) } }
Output:
func (*UsersAPI) Find ¶
Find returns all users matching the given filter. Supported filters:
- ID
- Name
func (*UsersAPI) FindOne ¶
FindOne returns one user matching the given filter. Supported filters:
- ID
- Name
func (*UsersAPI) SetMyPassword ¶
SetMyPassword sets the password associated with the current user. The oldPassword parameter must match the previously set password for the user.
func (*UsersAPI) SetPassword ¶
SetPassword sets the password for the user with the given ID.
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
WriteBuffer stores line after line and flushes the batch when maxLength (batch size) is reached or maxBytes is exceeded.
func (*WriteBuffer) Add ¶
func (w *WriteBuffer) Add(line []byte)
func (*WriteBuffer) Flush ¶
func (w *WriteBuffer) Flush()
func (*WriteBuffer) Reset ¶
func (w *WriteBuffer) Reset() []byte
type WriteParams ¶
type WriteParams struct { RetryParams // Maximum number of points sent to server in single request, used by PointsWriter. Default 5000 BatchSize int // Maximum size of batch in bytes, used by PointsWriter. Default 50_000_000. MaxBatchBytes int // Interval, in ms, used by PointsWriter, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms FlushInterval int // Precision to use in writes for timestamp. // Default lineprotocol.Nanosecond Precision lineprotocol.Precision // Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged. DefaultTags map[string]string // Write body larger than the threshold is gzipped. 0 to don't gzip at all GzipThreshold int // WriteFailed is called to inform about an error occurred during writing procedure. // It can be called when point encoding fails, sending batch over network fails or batch expires during retrying. // Params: // error - write error. // lines - failed batch of lines. nil in case of error occur before sending, e.g. in case of conversion error // attempt - count of already failed attempts to write the lines (1 ... maxRetries+1). 0 if error occur before sending, e.g. in case of conversion error // expires - expiration time for the lines to be retried in millis since epoch. 0 if error occur before sending, e.g. in case of conversion error // Returns true to continue using default retry mechanism (applies only in case of write of an error) WriteFailed func(err error, lines []byte, attempt int, expires time.Time) bool // WriteRetrySkipped is informed about lines that were removed from the retry buffer // to keep the size of the retry buffer under the configured limit (maxBufferLines). WriteRetrySkipped OnRemoveCallback }
WriteParams holds configuration properties for write