Documentation ¶
Overview ¶
Package api provides clients for InfluxDB server APIs.
Index ¶
- func DataToPoint(x interface{}) (*write.Point, error)
- func DefaultDialect() *domain.Dialect
- type AuthorizationsAPI
- type BucketsAPI
- type DeleteAPI
- type LabelsAPI
- type OrganizationsAPI
- type Paging
- type PagingOption
- type QueryAPI
- type QueryTableResult
- func (q *QueryTableResult) Close() error
- func (q *QueryTableResult) Err() error
- func (q *QueryTableResult) Next() bool
- func (q *QueryTableResult) Record() *query.FluxRecord
- func (q *QueryTableResult) TableChanged() bool
- func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata
- func (q *QueryTableResult) TablePosition() int
- type RunFilter
- type TaskFilter
- type TasksAPI
- type UsersAPI
- type WriteAPI
- type WriteAPIBlocking
- type WriteAPIImpl
- type WriteFailedCallback
Functions ¶
func DataToPoint ¶ added in v2.13.0
func DataToPoint(x interface{}) (*write.Point, error)
DataToPoint converts custom point structures into a Point. Each visible field of the input struct must be annotated with the 'lp' prefix and one of the values measurement, tag, field or timestamp. A valid point must contain a measurement and at least one field.
A field annotated as timestamp must be of type time.Time.
type TemperatureSensor struct {
	Measurement string    `lp:"measurement"`
	Sensor      string    `lp:"tag,sensor"`
	ID          string    `lp:"tag,device_id"`
	Temp        float64   `lp:"field,temperature"`
	Hum         int       `lp:"field,humidity"`
	Time        time.Time `lp:"timestamp,temperature"`
	Description string    `lp:"-"`
}
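To make the tag convention concrete, here is a minimal sketch that reads the `lp` tags of such a struct by reflection, using only the standard library. It is not the library's implementation of DataToPoint; the helper name `describeTags` and its output format are hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"time"
)

// TemperatureSensor mirrors the annotated struct from the documentation.
type TemperatureSensor struct {
	Measurement string    `lp:"measurement"`
	Sensor      string    `lp:"tag,sensor"`
	ID          string    `lp:"tag,device_id"`
	Temp        float64   `lp:"field,temperature"`
	Hum         int       `lp:"field,humidity"`
	Time        time.Time `lp:"timestamp,temperature"`
	Description string    `lp:"-"`
}

// describeTags is a hypothetical helper (not part of the api package).
// It reports, for every field carrying an `lp` tag, the kind named in the
// tag and the optional name after the comma. Fields tagged "-" are skipped.
func describeTags(x interface{}) []string {
	t := reflect.TypeOf(x)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return nil
	}
	var out []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag, ok := f.Tag.Lookup("lp")
		if !ok || tag == "-" {
			continue
		}
		parts := strings.SplitN(tag, ",", 2)
		desc := f.Name + " -> " + parts[0]
		if len(parts) == 2 && parts[1] != "" {
			desc += " " + parts[1]
		}
		out = append(out, desc)
	}
	return out
}

func main() {
	for _, d := range describeTags(TemperatureSensor{}) {
		fmt.Println(d)
	}
}
```

With the real client, the struct value would instead be passed to DataToPoint, which returns a *write.Point or an error.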
func DefaultDialect ¶
func DefaultDialect() *domain.Dialect
DefaultDialect returns a Flux query Dialect with full annotations (datatype, group, default), a header, and a comma as the delimiter.
Types ¶
type AuthorizationsAPI ¶
type AuthorizationsAPI interface {
	// GetAuthorizations returns all authorizations
	GetAuthorizations(ctx context.Context) (*[]domain.Authorization, error)
	// FindAuthorizationsByUserName returns all authorizations for given userName
	FindAuthorizationsByUserName(ctx context.Context, userName string) (*[]domain.Authorization, error)
	// FindAuthorizationsByUserID returns all authorizations for given userID
	FindAuthorizationsByUserID(ctx context.Context, userID string) (*[]domain.Authorization, error)
	// FindAuthorizationsByOrgName returns all authorizations for given organization name
	FindAuthorizationsByOrgName(ctx context.Context, orgName string) (*[]domain.Authorization, error)
	// FindAuthorizationsByOrgID returns all authorizations for given organization id
	FindAuthorizationsByOrgID(ctx context.Context, orgID string) (*[]domain.Authorization, error)
	// CreateAuthorization creates new authorization
	CreateAuthorization(ctx context.Context, authorization *domain.Authorization) (*domain.Authorization, error)
	// CreateAuthorizationWithOrgID creates new authorization with given permissions scoped to given orgID
	CreateAuthorizationWithOrgID(ctx context.Context, orgID string, permissions []domain.Permission) (*domain.Authorization, error)
	// UpdateAuthorizationStatus updates status of authorization
	UpdateAuthorizationStatus(ctx context.Context, authorization *domain.Authorization, status domain.AuthorizationUpdateRequestStatus) (*domain.Authorization, error)
	// UpdateAuthorizationStatusWithID updates status of authorization with authID
	UpdateAuthorizationStatusWithID(ctx context.Context, authID string, status domain.AuthorizationUpdateRequestStatus) (*domain.Authorization, error)
	// DeleteAuthorization deletes authorization
	DeleteAuthorization(ctx context.Context, authorization *domain.Authorization) error
	// DeleteAuthorizationWithID deletes authorization with authID
	DeleteAuthorizationWithID(ctx context.Context, authID string) error
}
AuthorizationsAPI provides methods for managing authorizations in an InfluxDB server.
Example ¶
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/domain"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Find user to grant permission
	user, err := client.UsersAPI().FindUserByName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Find organization
	org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
	if err != nil {
		panic(err)
	}

	// Create write permission for buckets
	permissionWrite := &domain.Permission{
		Action: domain.PermissionActionWrite,
		Resource: domain.Resource{
			Type: domain.ResourceTypeBuckets,
		},
	}

	// Create read permission for buckets
	permissionRead := &domain.Permission{
		Action: domain.PermissionActionRead,
		Resource: domain.Resource{
			Type: domain.ResourceTypeBuckets,
		},
	}

	// Group permissions
	permissions := []domain.Permission{*permissionWrite, *permissionRead}

	// Create authorization object using info above
	auth := &domain.Authorization{
		OrgID:       org.Id,
		Permissions: &permissions,
		UserID:      user.Id,
	}

	// Grant permission and create token
	authCreated, err := client.AuthorizationsAPI().CreateAuthorization(context.Background(), auth)
	if err != nil {
		panic(err)
	}

	// Use token
	fmt.Println("Token: ", *authCreated.Token)

	// Ensure background processes finish
	client.Close()
}
func NewAuthorizationsAPI ¶
func NewAuthorizationsAPI(apiClient *domain.Client) AuthorizationsAPI
NewAuthorizationsAPI creates a new instance of AuthorizationsAPI.
type BucketsAPI ¶
type BucketsAPI interface {
	// GetBuckets returns all buckets.
	// GetBuckets supports PagingOptions: Offset, Limit, After. Empty pagingOptions means the default paging (first 20 results).
	GetBuckets(ctx context.Context, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// FindBucketByName returns a bucket found using bucketName.
	FindBucketByName(ctx context.Context, bucketName string) (*domain.Bucket, error)
	// FindBucketByID returns a bucket found using bucketID.
	FindBucketByID(ctx context.Context, bucketID string) (*domain.Bucket, error)
	// FindBucketsByOrgID returns buckets belonging to the organization with ID orgID.
	// FindBucketsByOrgID supports PagingOptions: Offset, Limit, After. Empty pagingOptions means the default paging (first 20 results).
	FindBucketsByOrgID(ctx context.Context, orgID string, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// FindBucketsByOrgName returns buckets belonging to the organization with name orgName, with the specified paging. Empty pagingOptions means the default paging (first 20 results).
	FindBucketsByOrgName(ctx context.Context, orgName string, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// CreateBucket creates a new bucket.
	CreateBucket(ctx context.Context, bucket *domain.Bucket) (*domain.Bucket, error)
	// CreateBucketWithName creates a new bucket with bucketName in organization org, with retention specified in rules. Empty rules means infinite retention.
	CreateBucketWithName(ctx context.Context, org *domain.Organization, bucketName string, rules ...domain.RetentionRule) (*domain.Bucket, error)
	// CreateBucketWithNameWithID creates a new bucket with bucketName in organization with orgID, with retention specified in rules. Empty rules means infinite retention.
	CreateBucketWithNameWithID(ctx context.Context, orgID, bucketName string, rules ...domain.RetentionRule) (*domain.Bucket, error)
	// UpdateBucket updates a bucket.
	UpdateBucket(ctx context.Context, bucket *domain.Bucket) (*domain.Bucket, error)
	// DeleteBucket deletes a bucket.
	DeleteBucket(ctx context.Context, bucket *domain.Bucket) error
	// DeleteBucketWithID deletes a bucket with bucketID.
	DeleteBucketWithID(ctx context.Context, bucketID string) error
	// GetMembers returns members of a bucket.
	GetMembers(ctx context.Context, bucket *domain.Bucket) (*[]domain.ResourceMember, error)
	// GetMembersWithID returns members of a bucket with bucketID.
	GetMembersWithID(ctx context.Context, bucketID string) (*[]domain.ResourceMember, error)
	// AddMember adds a member to a bucket.
	AddMember(ctx context.Context, bucket *domain.Bucket, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to a bucket with bucketID.
	AddMemberWithID(ctx context.Context, bucketID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from a bucket.
	RemoveMember(ctx context.Context, bucket *domain.Bucket, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from a bucket with bucketID.
	RemoveMemberWithID(ctx context.Context, bucketID, memberID string) error
	// GetOwners returns owners of a bucket.
	GetOwners(ctx context.Context, bucket *domain.Bucket) (*[]domain.ResourceOwner, error)
	// GetOwnersWithID returns owners of a bucket with bucketID.
	GetOwnersWithID(ctx context.Context, bucketID string) (*[]domain.ResourceOwner, error)
	// AddOwner adds an owner to a bucket.
	AddOwner(ctx context.Context, bucket *domain.Bucket, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to a bucket with bucketID.
	AddOwnerWithID(ctx context.Context, bucketID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from a bucket.
	RemoveOwner(ctx context.Context, bucket *domain.Bucket, user *domain.User) error
	// RemoveOwnerWithID removes an owner with id memberID from a bucket with bucketID.
	RemoveOwnerWithID(ctx context.Context, bucketID, memberID string) error
}
BucketsAPI provides methods for managing buckets in an InfluxDB server.
Example ¶
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/domain"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Buckets API client
	bucketsAPI := client.BucketsAPI()

	// Get organization that will own new bucket
	org, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}

	// Create a bucket with 1 day retention policy
	bucket, err := bucketsAPI.CreateBucketWithName(ctx, org, "bucket-sensors", domain.RetentionRule{EverySeconds: 3600 * 24})
	if err != nil {
		panic(err)
	}

	// Update description of the bucket
	desc := "Bucket for sensor data"
	bucket.Description = &desc
	bucket, err = bucketsAPI.UpdateBucket(ctx, bucket)
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
func NewBucketsAPI ¶
func NewBucketsAPI(apiClient *domain.Client) BucketsAPI
NewBucketsAPI creates a new instance of BucketsAPI.
type DeleteAPI ¶
type DeleteAPI interface {
	// Delete deletes series selected by the time range specified by start and stop arguments and optional predicate string from the bucket bucket belonging to the organization org.
	Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error
	// DeleteWithID deletes series selected by the time range specified by start and stop arguments and optional predicate string from the bucket with ID bucketID belonging to the organization with ID orgID.
	DeleteWithID(ctx context.Context, orgID, bucketID string, start, stop time.Time, predicate string) error
	// DeleteWithName deletes series selected by the time range specified by start and stop arguments and optional predicate string from the bucket with name bucketName belonging to the organization with name orgName.
	DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error
}
DeleteAPI provides methods for deleting time series data from buckets. Deleted series are selected by the time range given by the start and stop arguments and by an optional predicate string, which contains a condition for selecting data for deletion, such as:
tag1="value1" and (tag2="value2" and tag3!="value3")
An empty predicate string means that all data from the given time range will be deleted. See https://v2.docs.influxdata.com/v2.0/reference/syntax/delete-predicate/ for more information about predicate syntax.
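The following sketch shows one way to assemble a predicate string of the form shown above from a set of tag values. The helper `tagPredicate` is hypothetical and not part of this package; it only covers equality conditions joined by `and`, and it quotes values with Go's `%q`, which matches the double-quoted form for simple values only.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// tagPredicate is a hypothetical helper that builds a delete predicate
// such as `tag1="value1" and tag2="value2"` from a map of tag values.
// Keys are sorted so the result is deterministic. An empty map yields an
// empty predicate, which selects all data in the time range.
func tagPredicate(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	conds := make([]string, 0, len(keys))
	for _, k := range keys {
		conds = append(conds, fmt.Sprintf("%s=%q", k, tags[k]))
	}
	return strings.Join(conds, " and ")
}

func main() {
	p := tagPredicate(map[string]string{"tag1": "value1", "tag2": "value2"})
	fmt.Println(p) // prints: tag1="value1" and tag2="value2"
}
```

The resulting string would be passed as the predicate argument of Delete, DeleteWithID or DeleteWithName.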
Example ¶
package main

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Delete API client
	deleteAPI := client.DeleteAPI()

	// Delete last hour data with tag b = static
	err := deleteAPI.DeleteWithName(ctx, "org", "my-bucket", time.Now().Add(-time.Hour), time.Now(), "b=static")
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
func NewDeleteAPI ¶
func NewDeleteAPI(apiClient *domain.Client) DeleteAPI
NewDeleteAPI creates a new instance of DeleteAPI.
type LabelsAPI ¶
type LabelsAPI interface {
	// GetLabels returns all labels.
	GetLabels(ctx context.Context) (*[]domain.Label, error)
	// FindLabelsByOrg returns labels belonging to organization org.
	FindLabelsByOrg(ctx context.Context, org *domain.Organization) (*[]domain.Label, error)
	// FindLabelsByOrgID returns labels belonging to organization with id orgID.
	FindLabelsByOrgID(ctx context.Context, orgID string) (*[]domain.Label, error)
	// FindLabelByID returns a label with labelID.
	FindLabelByID(ctx context.Context, labelID string) (*domain.Label, error)
	// FindLabelByName returns a label with name labelName under an organization orgID.
	FindLabelByName(ctx context.Context, orgID, labelName string) (*domain.Label, error)
	// CreateLabel creates a new label.
	CreateLabel(ctx context.Context, label *domain.LabelCreateRequest) (*domain.Label, error)
	// CreateLabelWithName creates a new label with label labelName and properties, under the organization org.
	// Properties example: {"color": "ffb3b3", "description": "this is a description"}.
	CreateLabelWithName(ctx context.Context, org *domain.Organization, labelName string, properties map[string]string) (*domain.Label, error)
	// CreateLabelWithNameWithID creates a new label with label labelName and properties, under the organization with id orgID.
	// Properties example: {"color": "ffb3b3", "description": "this is a description"}.
	CreateLabelWithNameWithID(ctx context.Context, orgID, labelName string, properties map[string]string) (*domain.Label, error)
	// UpdateLabel updates the label.
	// Properties can be removed by sending an update with an empty value.
	UpdateLabel(ctx context.Context, label *domain.Label) (*domain.Label, error)
	// DeleteLabelWithID deletes a label with labelID.
	DeleteLabelWithID(ctx context.Context, labelID string) error
	// DeleteLabel deletes a label.
	DeleteLabel(ctx context.Context, label *domain.Label) error
}
LabelsAPI provides methods for managing labels in an InfluxDB server.
Example ¶
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Labels API client
	labelsAPI := client.LabelsAPI()
	// Get Organizations API client
	orgsAPI := client.OrganizationsAPI()

	// Get organization that will own label
	myorg, err := orgsAPI.FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}

	labelName := "Active State"
	props := map[string]string{"color": "33ffdd", "description": "Marks org active"}
	label, err := labelsAPI.CreateLabelWithName(ctx, myorg, labelName, props)
	if err != nil {
		panic(err)
	}

	// Change color property
	label.Properties.AdditionalProperties = map[string]string{"color": "ff1122"}
	label, err = labelsAPI.UpdateLabel(ctx, label)
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
func NewLabelsAPI ¶
func NewLabelsAPI(apiClient *domain.Client) LabelsAPI
NewLabelsAPI creates a new instance of LabelsAPI.
type OrganizationsAPI ¶
type OrganizationsAPI interface {
	// GetOrganizations returns all organizations.
	// GetOrganizations supports PagingOptions: Offset, Limit, Descending
	GetOrganizations(ctx context.Context, pagingOptions ...PagingOption) (*[]domain.Organization, error)
	// FindOrganizationByName returns an organization found using orgName.
	FindOrganizationByName(ctx context.Context, orgName string) (*domain.Organization, error)
	// FindOrganizationByID returns an organization found using orgID.
	FindOrganizationByID(ctx context.Context, orgID string) (*domain.Organization, error)
	// FindOrganizationsByUserID returns organizations a user with userID belongs to.
	// FindOrganizationsByUserID supports PagingOptions: Offset, Limit, Descending
	FindOrganizationsByUserID(ctx context.Context, userID string, pagingOptions ...PagingOption) (*[]domain.Organization, error)
	// CreateOrganization creates new organization.
	CreateOrganization(ctx context.Context, org *domain.Organization) (*domain.Organization, error)
	// CreateOrganizationWithName creates new organization with orgName and with status active.
	CreateOrganizationWithName(ctx context.Context, orgName string) (*domain.Organization, error)
	// UpdateOrganization updates organization.
	UpdateOrganization(ctx context.Context, org *domain.Organization) (*domain.Organization, error)
	// DeleteOrganization deletes an organization.
	DeleteOrganization(ctx context.Context, org *domain.Organization) error
	// DeleteOrganizationWithID deletes an organization with orgID.
	DeleteOrganizationWithID(ctx context.Context, orgID string) error
	// GetMembers returns members of an organization.
	GetMembers(ctx context.Context, org *domain.Organization) (*[]domain.ResourceMember, error)
	// GetMembersWithID returns members of an organization with orgID.
	GetMembersWithID(ctx context.Context, orgID string) (*[]domain.ResourceMember, error)
	// AddMember adds a member to an organization.
	AddMember(ctx context.Context, org *domain.Organization, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to an organization with orgID.
	AddMemberWithID(ctx context.Context, orgID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from an organization.
	RemoveMember(ctx context.Context, org *domain.Organization, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from an organization with orgID.
	RemoveMemberWithID(ctx context.Context, orgID, memberID string) error
	// GetOwners returns owners of an organization.
	GetOwners(ctx context.Context, org *domain.Organization) (*[]domain.ResourceOwner, error)
	// GetOwnersWithID returns owners of an organization with orgID.
	GetOwnersWithID(ctx context.Context, orgID string) (*[]domain.ResourceOwner, error)
	// AddOwner adds an owner to an organization.
	AddOwner(ctx context.Context, org *domain.Organization, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to an organization with orgID.
	AddOwnerWithID(ctx context.Context, orgID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from an organization.
	RemoveOwner(ctx context.Context, org *domain.Organization, user *domain.User) error
	// RemoveOwnerWithID removes an owner with id memberID from an organization with orgID.
	RemoveOwnerWithID(ctx context.Context, orgID, memberID string) error
}
OrganizationsAPI provides methods for managing organizations in an InfluxDB server.
Example ¶
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Get Organizations API client
	orgAPI := client.OrganizationsAPI()

	// Create new organization
	org, err := orgAPI.CreateOrganizationWithName(context.Background(), "org-2")
	if err != nil {
		panic(err)
	}

	orgDescription := "My second org "
	org.Description = &orgDescription

	org, err = orgAPI.UpdateOrganization(context.Background(), org)
	if err != nil {
		panic(err)
	}

	// Find user to set owner
	user, err := client.UsersAPI().FindUserByName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Add another owner (the first owner is the one who created the organization)
	_, err = orgAPI.AddOwner(context.Background(), org, user)
	if err != nil {
		panic(err)
	}

	// Create new user to add to org
	newUser, err := client.UsersAPI().CreateUserWithName(context.Background(), "user-02")
	if err != nil {
		panic(err)
	}

	// Add new user to organization
	_, err = orgAPI.AddMember(context.Background(), org, newUser)
	if err != nil {
		panic(err)
	}

	// Ensure background processes finish
	client.Close()
}
func NewOrganizationsAPI ¶
func NewOrganizationsAPI(apiClient *domain.Client) OrganizationsAPI
NewOrganizationsAPI creates a new instance of OrganizationsAPI.
type Paging ¶
type Paging struct {
// contains filtered or unexported fields
}
Paging holds pagination parameters for various Get* functions of the InfluxDB 2 API. Not all options are usable with every Get* function.
type PagingOption ¶
type PagingOption func(p *Paging)
PagingOption is the function type for applying a paging option.
func PagingWithAfter ¶ added in v2.1.0
func PagingWithAfter(after string) PagingOption
PagingWithAfter sets the after option: the ID of the last resource to seek from (that resource itself is not included). This is to be used instead of `offset`.
func PagingWithDescending ¶
func PagingWithDescending(descending bool) PagingOption
PagingWithDescending changes the sorting direction.
func PagingWithLimit ¶
func PagingWithLimit(limit int) PagingOption
PagingWithLimit sets the limit option: the maximum number of items returned.
func PagingWithOffset ¶
func PagingWithOffset(offset int) PagingOption
PagingWithOffset sets the starting offset for returned items. Default 0.
func PagingWithSortBy ¶
func PagingWithSortBy(sortBy string) PagingOption
PagingWithSortBy sets the name of the field that should be used for sorting.
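PagingOption follows the functional options pattern: each PagingWith* function returns a closure that mutates a Paging value. The sketch below reproduces the pattern with standard library code only. The lowercase names (`paging`, `withLimit`, `newPaging`) and the struct fields are hypothetical, since the real Paging fields are unexported; the default limit of 20 is an assumption taken from the BucketsAPI comments.

```go
package main

import "fmt"

// paging is a hypothetical stand-in for the Paging struct.
type paging struct {
	offset     int
	limit      int
	after      string
	descending bool
	sortBy     string
}

// pagingOption mirrors the shape of PagingOption: a function that
// modifies a paging value in place.
type pagingOption func(p *paging)

func withOffset(offset int) pagingOption { return func(p *paging) { p.offset = offset } }
func withLimit(limit int) pagingOption   { return func(p *paging) { p.limit = limit } }
func withAfter(after string) pagingOption {
	return func(p *paging) { p.after = after }
}

// newPaging starts from defaults (assumed here: offset 0, limit 20) and
// applies the options in the order they were passed.
func newPaging(opts ...pagingOption) *paging {
	p := &paging{limit: 20}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPaging(withLimit(5), withOffset(10))
	fmt.Println(p.limit, p.offset) // prints: 5 10
}
```

With the real API, options are passed as variadic arguments, for example GetBuckets(ctx, PagingWithLimit(5), PagingWithOffset(10)).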
type QueryAPI ¶
type QueryAPI interface {
	// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
	// QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
	// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	Query(ctx context.Context, query string) (*QueryTableResult, error)
	// QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
}
QueryAPI provides methods for synchronously running Flux queries against an InfluxDB server.
A Flux query can reference parameters, which must be passed via the params argument. It can be a struct or a map. Parameter values can only be simple types or time.Time. The name of a struct field or a map key (which must be a string) becomes the parameter name. The name of a parameter represented by a struct field can be set with a JSON annotation:
type Condition struct {
	Start time.Time `json:"start"`
	Field string    `json:"field"`
	Value float64   `json:"value"`
}
Parameters are then accessed via the Flux params object:
query := `from(bucket: "environment")
	|> range(start: time(v: params.start))
	|> filter(fn: (r) => r._measurement == "air")
	|> filter(fn: (r) => r._field == params.field)
	|> filter(fn: (r) => r._value > params.value)`
Example (Query) ¶
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Get QueryTableResult
	result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// Check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensure background processes finish
	client.Close()
}
Example (QueryRaw) ¶
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Query and get complete result as a string
	// Use default dialect
	result, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, api.DefaultDialect())
	if err == nil {
		fmt.Println("QueryResult:")
		fmt.Println(result)
	} else {
		panic(err)
	}
	// Ensure background processes finish
	client.Close()
}
Example (QueryWithParams) ¶
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Define parameters
	parameters := struct {
		Start string  `json:"start"`
		Field string  `json:"field"`
		Value float64 `json:"value"`
	}{
		"-1h",
		"temperature",
		25,
	}
	// Query with parameters
	query := `from(bucket:"my-bucket")
		|> range(start: duration(params.start))
		|> filter(fn: (r) => r._measurement == "stat")
		|> filter(fn: (r) => r._field == params.field)
		|> filter(fn: (r) => r._value > params.value)`

	// Get result
	result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// Check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensure background processes finish
	client.Close()
}
type QueryTableResult ¶
QueryTableResult parses a streamed Flux query response into structures representing Flux table parts. Walking through the result is done by repeatedly calling Next() until it returns false. Current Flux table info (columns with names, data types, etc.) is returned by the TableMetadata() method. Data are acquired by the Record() method. A premature end can be caused by an error, so when Next() returns false, check Err() for an error.
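The Next()/Record()/Err() contract described here is the same iterator shape used by `bufio.Scanner`. The sketch below implements that shape over plain CSV with the standard library, to show why Err() has to be checked once Next() returns false. The type `rowIterator` is hypothetical and does not parse Flux annotations.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
)

// rowIterator is a hypothetical, simplified analogue of QueryTableResult.
type rowIterator struct {
	r   *csv.Reader
	rec []string
	err error
}

func newRowIterator(data string) *rowIterator {
	return &rowIterator{r: csv.NewReader(strings.NewReader(data))}
}

// Next advances to the next row. It returns false at the end of the
// input or on a parse error; the two cases are told apart via Err.
func (it *rowIterator) Next() bool {
	if it.err != nil {
		return false
	}
	rec, err := it.r.Read()
	if err == io.EOF {
		return false
	}
	if err != nil {
		it.err = err
		return false
	}
	it.rec = rec
	return true
}

// Record returns the row parsed by the last successful call to Next.
func (it *rowIterator) Record() []string { return it.rec }

// Err returns the error that stopped the iteration, if any.
func (it *rowIterator) Err() error { return it.err }

func main() {
	// The second row has fewer fields than the first, so parsing stops early.
	it := newRowIterator("a,b\nc\n")
	for it.Next() {
		fmt.Println(it.Record())
	}
	if it.Err() != nil {
		fmt.Println("parse error:", it.Err())
	}
}
```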
func NewQueryTableResult ¶ added in v2.8.0
func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult
NewQueryTableResult returns a new QueryTableResult.
func (*QueryTableResult) Close ¶ added in v2.4.0
func (q *QueryTableResult) Close() error
Close reads the remaining data and closes the underlying Closer.
func (*QueryTableResult) Err ¶
func (q *QueryTableResult) Err() error
Err returns an error raised during parsing of the Flux query response.
func (*QueryTableResult) Next ¶
func (q *QueryTableResult) Next() bool
Next advances to the next row in the query result. The first time it is called, Next also creates the table metadata. The parsed row is available through the Record() function. Next returns false at the end of the result or on an error, otherwise true.
func (*QueryTableResult) Record ¶
func (q *QueryTableResult) Record() *query.FluxRecord
Record returns the last parsed Flux table data row. Use the Record methods to access the value and row properties.
func (*QueryTableResult) TableChanged ¶
func (q *QueryTableResult) TableChanged() bool
TableChanged returns true if the last call of Next() also found a new result table. Table information is available via the TableMetadata method.
func (*QueryTableResult) TableMetadata ¶
func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata
TableMetadata returns the current Flux table metadata.
func (*QueryTableResult) TablePosition ¶
func (q *QueryTableResult) TablePosition() int
TablePosition returns the position of the current Flux table in the result, or -1 if no table has been found yet. Each new table is introduced by an annotation in the CSV.
type RunFilter ¶ added in v2.2.0
type RunFilter struct {
	// Return runs after a specified ID.
	After string
	// The number of runs to return.
	// Default 100, minimum 1, maximum 500.
	Limit int
	// Filter runs to those scheduled before this time.
	BeforeTime time.Time
	// Filter runs to those scheduled after this time.
	AfterTime time.Time
}
RunFilter defines filtering options for FindRun* functions.
type TaskFilter ¶ added in v2.2.0
type TaskFilter struct {
	// Returns task with a specific name
	Name string
	// Filter tasks to a specific organization name.
	OrgName string
	// Filter tasks to a specific organization ID.
	OrgID string
	// Filter tasks to a specific user ID.
	User string
	// Filter tasks by a status--"inactive" or "active".
	Status domain.TaskStatusType
	// Return tasks after a specified ID.
	After string
	// The number of tasks to return.
	// Default 100, minimum 1, maximum 500.
	Limit int
}
TaskFilter defines filtering options for FindTasks functions.
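Both RunFilter and TaskFilter document the same bounds for Limit (default 100, minimum 1, maximum 500). A caller who wants to catch out-of-range values before sending a request could use a check like the sketch below. The helper `resolveLimit` is hypothetical, and treating 0 as "not set" is an assumption; how the client or server actually handles out-of-range values is not described in this section.

```go
package main

import "fmt"

// Bounds taken from the RunFilter and TaskFilter field comments.
const (
	defaultLimit = 100
	minLimit     = 1
	maxLimit     = 500
)

// resolveLimit is a hypothetical client-side check. A zero limit is
// treated as "not set" (assumption) and mapped to the documented
// default; values outside the documented range are rejected.
func resolveLimit(limit int) (int, error) {
	if limit == 0 {
		return defaultLimit, nil
	}
	if limit < minLimit || limit > maxLimit {
		return 0, fmt.Errorf("limit %d out of range [%d, %d]", limit, minLimit, maxLimit)
	}
	return limit, nil
}

func main() {
	for _, l := range []int{0, 250, 501} {
		v, err := resolveLimit(l)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("limit:", v)
	}
}
```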
type TasksAPI ¶ added in v2.2.0
type TasksAPI interface {
	// FindTasks retrieves tasks according to the filter. More fields can be applied. Filter can be nil.
	FindTasks(ctx context.Context, filter *TaskFilter) ([]domain.Task, error)
	// GetTask retrieves a refreshed instance of task.
	GetTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// GetTaskByID retrieves a task found using taskID.
	GetTaskByID(ctx context.Context, taskID string) (*domain.Task, error)
	// CreateTask creates a new task according to the task object.
	// It copies OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive.
	// Every has higher priority.
	CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// CreateTaskWithEvery creates a new task with the name, flux script and every repetition setting, in the org orgID.
	// Every means duration values.
	CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error)
	// CreateTaskWithCron creates a new task with the name, flux script and cron repetition setting, in the org orgID.
	// Cron holds cron-like setting, e.g. once an hour at beginning of the hour "0 * * * *".
	CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error)
	// CreateTaskByFlux creates a new task with complete definition in flux script, in the org orgID.
	CreateTaskByFlux(ctx context.Context, flux, orgID string) (*domain.Task, error)
	// UpdateTask updates a task.
	// It copies Description, Flux, Status, Offset and Every or Cron properties. Every and Cron are mutually exclusive.
	// Every has higher priority.
	UpdateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// DeleteTask deletes a task.
	DeleteTask(ctx context.Context, task *domain.Task) error
	// DeleteTaskWithID deletes a task with taskID.
	DeleteTaskWithID(ctx context.Context, taskID string) error
	// FindMembers retrieves members of a task.
	FindMembers(ctx context.Context, task *domain.Task) ([]domain.ResourceMember, error)
	// FindMembersWithID retrieves members of a task with taskID.
	FindMembersWithID(ctx context.Context, taskID string) ([]domain.ResourceMember, error)
	// AddMember adds a member to a task.
	AddMember(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to a task with taskID.
	AddMemberWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from a task.
	RemoveMember(ctx context.Context, task *domain.Task, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from a task with taskID.
	RemoveMemberWithID(ctx context.Context, taskID, memberID string) error
	// FindOwners retrieves owners of a task.
	FindOwners(ctx context.Context, task *domain.Task) ([]domain.ResourceOwner, error)
	// FindOwnersWithID retrieves owners of a task with taskID.
	FindOwnersWithID(ctx context.Context, taskID string) ([]domain.ResourceOwner, error)
	// AddOwner adds an owner to a task.
	AddOwner(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to a task with taskID.
	AddOwnerWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from a task.
	RemoveOwner(ctx context.Context, task *domain.Task, user *domain.User) error
	// RemoveOwnerWithID removes an owner with id memberID from a task with taskID.
	RemoveOwnerWithID(ctx context.Context, taskID, memberID string) error
	// FindRuns retrieves task runs according to the filter. More fields can be applied. Filter can be nil.
	FindRuns(ctx context.Context, task *domain.Task, filter *RunFilter) ([]domain.Run, error)
	// FindRunsWithID retrieves runs of a task with taskID according to the filter. More fields can be applied. Filter can be nil.
	FindRunsWithID(ctx context.Context, taskID string, filter *RunFilter) ([]domain.Run, error)
	// GetRun retrieves a refreshed instance of a task run.
	GetRun(ctx context.Context, run *domain.Run) (*domain.Run, error)
	// GetRunByID retrieves a specific task run by taskID and runID.
	GetRunByID(ctx context.Context, taskID, runID string) (*domain.Run, error)
	// FindRunLogs returns all log events for a task run.
	FindRunLogs(ctx context.Context, run *domain.Run) ([]domain.LogEvent, error)
	// FindRunLogsWithID returns all log events for a run with runID of a task with taskID.
	FindRunLogsWithID(ctx context.Context, taskID, runID string) ([]domain.LogEvent, error)
	// RunManually manually starts a run of the task now, overriding the current schedule.
	RunManually(ctx context.Context, task *domain.Task) (*domain.Run, error)
	// RunManuallyWithID manually starts a run of a task with taskID now, overriding the current schedule.
	RunManuallyWithID(ctx context.Context, taskID string) (*domain.Run, error)
	// RetryRun retries a task run.
	RetryRun(ctx context.Context, run *domain.Run) (*domain.Run, error)
	// RetryRunWithID retries a run with runID of a task with taskID.
	RetryRunWithID(ctx context.Context, taskID, runID string) (*domain.Run, error)
	// CancelRun cancels a running task.
	CancelRun(ctx context.Context, run *domain.Run) error
	// CancelRunWithID cancels a running task.
	CancelRunWithID(ctx context.Context, taskID, runID string) error
	// FindLogs retrieves all logs for a task.
	FindLogs(ctx context.Context, task *domain.Task) ([]domain.LogEvent, error)
	// FindLogsWithID retrieves all logs for a task with taskID.
	FindLogsWithID(ctx context.Context, taskID string) ([]domain.LogEvent, error)
	// FindLabels retrieves labels of a task.
	FindLabels(ctx context.Context, task *domain.Task) ([]domain.Label, error)
	// FindLabelsWithID retrieves labels of a task with taskID.
	FindLabelsWithID(ctx context.Context, taskID string) ([]domain.Label, error)
	// AddLabel adds a label to a task.
	AddLabel(ctx context.Context, task *domain.Task, label *domain.Label) (*domain.Label, error)
	// AddLabelWithID adds a label with id labelID to a task with taskID.
	AddLabelWithID(ctx context.Context, taskID, labelID string) (*domain.Label, error)
	// RemoveLabel removes a label from a task.
	RemoveLabel(ctx context.Context, task *domain.Task, label *domain.Label) error
	// RemoveLabelWithID removes a label with id labelID from a task with taskID.
	RemoveLabelWithID(ctx context.Context, taskID, labelID string) error
}
TasksAPI provides methods for managing tasks and task runs in an InfluxDB server.
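The CreateTask and UpdateTask comments state that Every and Cron are mutually exclusive and that Every has higher priority. The sketch below illustrates that rule only. The `schedule` type, its `effective` method and the error for a task with neither setting are hypothetical and are not part of the api package.

```go
package main

import (
	"errors"
	"fmt"
)

// schedule is a hypothetical, simplified view of a task's repetition settings.
// An empty string means the setting was not provided.
type schedule struct {
	Every string // duration value, e.g. "1h"
	Cron  string // cron-like expression, e.g. "0 * * * *"
}

// effective returns the setting that would win under the stated rule:
// Every is preferred over Cron when both are set.
// Rejecting a schedule with neither value is an assumption of this sketch.
func (s schedule) effective() (kind, value string, err error) {
	switch {
	case s.Every != "":
		return "every", s.Every, nil
	case s.Cron != "":
		return "cron", s.Cron, nil
	default:
		return "", "", errors.New("neither Every nor Cron is set")
	}
}

func main() {
	both := schedule{Every: "1h", Cron: "0 * * * *"}
	kind, value, err := both.effective()
	if err != nil {
		panic(err)
	}
	fmt.Println(kind, value)
}
```

Setting only one of the two fields on a task avoids relying on the priority rule at all.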
Example ¶
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Tasks API client
	tasksAPI := client.TasksAPI()
	// Get organization that will own task
	myorg, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}
	// task flux script from https://www.influxdata.com/blog/writing-tasks-and-setting-up-alerts-for-influxdb-cloud/
	flux := `fruitCollected = from(bucket: "farming")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "totalFruitsCollected")
  |> filter(fn: (r) => r._field == "fruits")
  |> group(columns: ["farmName"])
  |> aggregateWindow(fn: sum, every: task.every)
  |> map(fn: (r) => ({
      _time: r._time, _stop: r._stop, _start: r._start, _measurement: "fruitCollectionRate", _field: "fruits", _value: r._value, farmName: r.farmName
  }))

fruitCollected
  |> to(bucket: "farming")
`
	task, err := tasksAPI.CreateTaskWithEvery(ctx, "fruitCollectedRate", flux, "1h", *myorg.Id)
	if err != nil {
		panic(err)
	}
	// Force running a task
	run, err := tasksAPI.RunManually(ctx, task)
	if err != nil {
		panic(err)
	}

	fmt.Println("Forced run completed on ", *run.FinishedAt, " with status ", *run.Status)
	// Print logs
	logs, err := tasksAPI.FindRunLogs(ctx, run)
	if err != nil {
		panic(err)
	}

	fmt.Println("Log:")
	for _, logEvent := range logs {
		fmt.Println(" Time:", *logEvent.Time, ", Message: ", *logEvent.Message)
	}

	// Close the client
	client.Close()
}
func NewTasksAPI ¶ added in v2.2.0
NewTasksAPI creates a new instance of TasksAPI.
type UsersAPI ¶
type UsersAPI interface {
	// GetUsers returns all users.
	GetUsers(ctx context.Context) (*[]domain.User, error)
	// FindUserByID returns the user with userID.
	FindUserByID(ctx context.Context, userID string) (*domain.User, error)
	// FindUserByName returns the user with name userName.
	FindUserByName(ctx context.Context, userName string) (*domain.User, error)
	// CreateUser creates a new user.
	CreateUser(ctx context.Context, user *domain.User) (*domain.User, error)
	// CreateUserWithName creates a new user with userName.
	CreateUserWithName(ctx context.Context, userName string) (*domain.User, error)
	// UpdateUser updates a user.
	UpdateUser(ctx context.Context, user *domain.User) (*domain.User, error)
	// UpdateUserPassword sets the password for a user.
	UpdateUserPassword(ctx context.Context, user *domain.User, password string) error
	// UpdateUserPasswordWithID sets the password for a user with userID.
	UpdateUserPasswordWithID(ctx context.Context, userID string, password string) error
	// DeleteUserWithID deletes a user with userID.
	DeleteUserWithID(ctx context.Context, userID string) error
	// DeleteUser deletes a user.
	DeleteUser(ctx context.Context, user *domain.User) error
	// Me returns the current user.
	Me(ctx context.Context) (*domain.User, error)
	// MeUpdatePassword sets the password of the current user.
	MeUpdatePassword(ctx context.Context, oldPassword, newPassword string) error
	// SignIn exchanges username and password credentials to establish an authenticated session with the InfluxDB server.
	// The Client's authentication token is then ignored; it can be empty.
	SignIn(ctx context.Context, username, password string) error
	// SignOut signs out the previously signed-in user.
	SignOut(ctx context.Context) error
}
UsersAPI provides methods for managing users in an InfluxDB server.
Example ¶
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Find organization
	org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
	if err != nil {
		panic(err)
	}

	// Get users API client
	usersAPI := client.UsersAPI()

	// Create new user
	user, err := usersAPI.CreateUserWithName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Set user password
	err = usersAPI.UpdateUserPassword(context.Background(), user, "pass-at-least-8-chars")
	if err != nil {
		panic(err)
	}

	// Add user to organization
	_, err = client.OrganizationsAPI().AddMember(context.Background(), org, user)
	if err != nil {
		panic(err)
	}

	// Ensure background processes finish
	client.Close()
}
Example (SignInOut) ¶
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and empty token
	client := influxdb2.NewClient("http://localhost:8086", "")
	// Always close client at the end
	defer client.Close()

	ctx := context.Background()

	// The first call must be SignIn
	err := client.UsersAPI().SignIn(ctx, "username", "password")
	if err != nil {
		panic(err)
	}

	// Perform some authorized operations
	err = client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(ctx, "test,a=rock,b=local f=1.2,i=-5i")
	if err != nil {
		panic(err)
	}

	// Sign out at the end
	err = client.UsersAPI().SignOut(ctx)
	if err != nil {
		panic(err)
	}
}
type WriteAPI ¶
type WriteAPI interface {
	// WriteRecord asynchronously writes a line protocol record into the bucket.
	// WriteRecord adds the record to a buffer, which is sent in the background when it reaches the batch size.
	// A blocking alternative is available in the WriteAPIBlocking interface.
	WriteRecord(line string)
	// WritePoint asynchronously writes a Point into the bucket.
	// WritePoint adds the Point to a buffer, which is sent in the background when it reaches the batch size.
	// A blocking alternative is available in the WriteAPIBlocking interface.
	WritePoint(point *write.Point)
	// Flush forces all pending writes from the buffer to be sent.
	Flush()
	// Errors returns a channel for reading errors which occur during async writes.
	// Must be called before performing any writes for errors to be collected.
	// The chan is unbuffered and must be drained or the writer will block.
	Errors() <-chan error
	// SetWriteFailedCallback sets a callback allowing custom handling of failed writes.
	// If the callback returns true, the failed batch will be retried, otherwise discarded.
	SetWriteFailedCallback(cb WriteFailedCallback)
}
WriteAPI is the write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server. WriteAPI can be used concurrently. When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines.
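To make the buffering model concrete, here is a minimal, standard-library-only sketch of a size-triggered batch buffer shared by several goroutines. It is not the library's WriteAPIImpl: the `batchBuffer` type and its fields are hypothetical, and the `sent` slice stands in for the HTTP requests a real client would issue in the background.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// batchBuffer is a hypothetical stand-in for an asynchronous write buffer.
// It only illustrates batching by size and sharing one instance across goroutines.
type batchBuffer struct {
	mu        sync.Mutex
	batchSize int
	lines     []string
	sent      []string // batches that were "sent"; a real client would make HTTP requests
}

func newBatchBuffer(batchSize int) *batchBuffer {
	return &batchBuffer{batchSize: batchSize}
}

// WriteRecord appends a line and sends a batch once batchSize lines are buffered.
func (b *batchBuffer) WriteRecord(line string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.lines = append(b.lines, line)
	if len(b.lines) >= b.batchSize {
		b.flushLocked()
	}
}

// Flush sends whatever is buffered, even if the batch is not full.
func (b *batchBuffer) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.flushLocked()
}

// flushLocked must be called with b.mu held.
func (b *batchBuffer) flushLocked() {
	if len(b.lines) == 0 {
		return
	}
	b.sent = append(b.sent, strings.Join(b.lines, "\n"))
	b.lines = nil
}

func main() {
	buf := newBatchBuffer(3)
	var wg sync.WaitGroup
	for i := 0; i < 7; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			buf.WriteRecord(fmt.Sprintf("stat,id=rack_%d temperature=%d", i, 20+i))
		}(i)
	}
	wg.Wait()
	buf.Flush() // sends the one remaining record
	fmt.Println("batches sent:", len(buf.sent))
}
```

Because all goroutines share one buffer, records from different writers end up in the same batches, which is the reason for using a single instance rather than one per goroutine.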
Example ¶
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get non-blocking write client
	writeAPI := client.WriteAPI("my-org", "my-bucket")
	// write some points
	for i := 0; i < 100; i++ {
		// create point
		p := write.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
		// write asynchronously
		writeAPI.WritePoint(p)
	}
	// Force all unwritten data to be sent
	writeAPI.Flush()
	// Ensure background processes finish
	client.Close()
}
Example (Errors) ¶
package main

import (
	"fmt"
	"math/rand"
	"time"

	apiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get non-blocking write client
	writeAPI := client.WriteAPI("my-org", "my-bucket")
	// Get errors channel
	errorsCh := writeAPI.Errors()
	// Start a goroutine for reading and logging errors
	go func() {
		for err := range errorsCh {
			fmt.Printf("write error: %s\n", err.Error())
			fmt.Printf("trace-id: %s\n", err.(*apiHttp.Error).Header.Get("Trace-ID"))
		}
	}()
	// write some points
	for i := 0; i < 100; i++ {
		// create point
		p := write.NewPointWithMeasurement("stat").
			AddTag("id", fmt.Sprintf("rack_%v", i%10)).
			AddTag("vendor", "AWS").
			AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
			AddField("temperature", rand.Float64()*80.0).
			AddField("disk_free", rand.Float64()*1000.0).
			AddField("disk_total", (i/10+1)*1000000).
			AddField("mem_total", (i/100+1)*10000000).
			AddField("mem_free", rand.Uint64()).
			SetTime(time.Now())
		// write asynchronously
		writeAPI.WritePoint(p)
	}
	// Force all unwritten data to be sent
	writeAPI.Flush()
	// Ensure background processes finish
	client.Close()
}
type WriteAPIBlocking ¶
type WriteAPIBlocking interface {
	// WriteRecord writes line protocol record(s) into bucket.
	// WriteRecord writes lines without implicit batching by default; a batch is created from the given records.
	// Automatic batching can be enabled by EnableBatching().
	// Individual arguments can also be batches (multiple records separated by newline).
	// A non-blocking alternative is available in the WriteAPI interface.
	WriteRecord(ctx context.Context, line ...string) error
	// WritePoint writes data point(s) into bucket.
	// WritePoint writes points without implicit batching by default; a batch is created from the given points.
	// Automatic batching can be enabled by EnableBatching().
	// A non-blocking alternative is available in the WriteAPI interface.
	WritePoint(ctx context.Context, point ...*write.Point) error
	// EnableBatching turns on implicit batching.
	// Batch size is controlled via write.Options.
	EnableBatching()
	// Flush forces a write of the buffer if batching is enabled, even if the buffer has not reached the batch size.
	Flush(ctx context.Context) error
}
WriteAPIBlocking can be used concurrently. When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
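The WriteRecord comment says that one call forms a batch from its arguments and that each argument may itself hold several newline-separated records. The sketch below shows one way such arguments could be combined into a single request body. The `buildBatch` and `countRecords` helpers are hypothetical and use only the standard library; they are not functions of the api package.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatch joins line protocol records into one newline-separated body.
// Each argument may already contain several records separated by newlines.
// Surrounding whitespace is trimmed and empty arguments are skipped.
func buildBatch(lines ...string) string {
	parts := make([]string, 0, len(lines))
	for _, l := range lines {
		l = strings.TrimSpace(l)
		if l != "" {
			parts = append(parts, l)
		}
	}
	return strings.Join(parts, "\n")
}

// countRecords reports how many records a body produced by buildBatch holds.
func countRecords(batch string) int {
	if batch == "" {
		return 0
	}
	return strings.Count(batch, "\n") + 1
}

func main() {
	batch := buildBatch(
		"test,a=rock,b=local f=1.2,i=-5i",
		"test,a=rock,b=remote f=2.4,i=7i\ntest,a=sand,b=local f=0.1,i=1i\n",
	)
	fmt.Println(countRecords(batch))
	fmt.Println(batch)
}
```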
Example ¶
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get blocking write client
	writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
	// write some points
	for i := 0; i < 100; i++ {
		// create data point
		p := write.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
		// write synchronously
		err := writeAPI.WritePoint(context.Background(), p)
		if err != nil {
			panic(err)
		}
	}
	// Ensure background processes finish
	client.Close()
}
func NewWriteAPIBlocking ¶
func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking
NewWriteAPIBlocking creates a new instance of the blocking write client for writing data to a bucket belonging to org.
func NewWriteAPIBlockingWithBatching ¶ added in v2.10.0
func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking
NewWriteAPIBlockingWithBatching creates a new instance of the blocking write client, with batching enabled, for writing data to a bucket belonging to org.
type WriteAPIImpl ¶
type WriteAPIImpl struct {
// contains filtered or unexported fields
}
WriteAPIImpl provides the main implementation of WriteAPI.
func NewWriteAPI ¶
func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl
NewWriteAPI returns a new non-blocking write client for writing data to a bucket belonging to org.
func (*WriteAPIImpl) Close ¶
func (w *WriteAPIImpl) Close()
Close finishes outstanding write operations, stops background routines and closes all channels.
func (*WriteAPIImpl) Errors ¶
func (w *WriteAPIImpl) Errors() <-chan error
Errors returns a channel for reading errors which occur during async writes. It must be called before performing any writes for errors to be collected. A new error is skipped when the channel is not being read.
func (*WriteAPIImpl) Flush ¶
func (w *WriteAPIImpl) Flush()
Flush forces all pending writes from the buffer to be sent. Flush also tries sending batches from the retry queue, without additional retrying.
func (*WriteAPIImpl) SetWriteFailedCallback ¶ added in v2.5.0
func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback)
SetWriteFailedCallback sets a callback allowing custom handling of failed writes. If the callback returns true, the failed batch will be retried, otherwise discarded.
func (*WriteAPIImpl) WritePoint ¶
func (w *WriteAPIImpl) WritePoint(point *write.Point)
WritePoint asynchronously writes a Point into the bucket. WritePoint adds the Point to a buffer, which is sent in the background when it reaches the batch size. A blocking alternative is available in the WriteAPIBlocking interface.
func (*WriteAPIImpl) WriteRecord ¶
func (w *WriteAPIImpl) WriteRecord(line string)
WriteRecord asynchronously writes a line protocol record into the bucket. WriteRecord adds the record to a buffer, which is sent in the background when it reaches the batch size. A blocking alternative is available in the WriteAPIBlocking interface.
type WriteFailedCallback ¶ added in v2.5.0
WriteFailedCallback is synchronously notified when a non-blocking write fails. batch contains the complete payload, error holds detailed error information, and retryAttempts is the number of retries, 0 if the first write failed. It must return true if WriteAPI should continue retrying; returning false discards the batch.
Directories ¶
Path | Synopsis |
---|---|
http | Package http provides HTTP servicing related code. |
query | Package query defines types for representing flux query results. |
write | Package write provides the Point struct. |