client

package
v1.9.13
Warning

This package is not in the latest version of its module.

Published: Oct 12, 2023 License: MIT Imports: 19 Imported by: 927

README

InfluxDB Client


Description

NOTE: The Go client library now has a "v2" version, with the old version being deprecated. The new version can be imported at import "github.com/influxdata/influxdb/client/v2". It is not backwards-compatible.

A Go client library written and maintained by the InfluxDB team. This package provides convenience functions to read and write time series data. It uses the HTTP protocol to communicate with your InfluxDB cluster.

Getting Started

Connecting To Your Database

Connecting to an InfluxDB database is straightforward. You will need a host name, a port and the cluster user credentials if applicable. The default port is 8086. You can customize these settings to your specific installation via the InfluxDB configuration file.

Though not necessary for experimentation, you may want to create a new user and authenticate the connection to your database.

For more information please check out the Admin Docs.

For the impatient, you can create a new admin user bubba by firing off the InfluxDB CLI.

influx
> create user bubba with password 'bumblebeetuna'
> grant all privileges to bubba

And now, for good measure, set the credentials in your shell environment. In the example below we will use $INFLUX_USER and $INFLUX_PWD.
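As a minimal sketch of picking those variables up from Go: the helper name credentials is hypothetical and not part of the client package, and it takes the lookup function as a parameter only so that it can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// credentials is a hypothetical helper (not part of the client package).
// It reads INFLUX_USER and INFLUX_PWD through the supplied lookup function
// and reports whether both are non-empty.
func credentials(getenv func(string) string) (user, pwd string, ok bool) {
	user = getenv("INFLUX_USER")
	pwd = getenv("INFLUX_PWD")
	return user, pwd, user != "" && pwd != ""
}

func main() {
	user, _, ok := credentials(os.Getenv)
	if !ok {
		fmt.Println("INFLUX_USER/INFLUX_PWD not set; connecting without credentials")
		return
	}
	fmt.Printf("connecting as %s\n", user)
}
```

The returned values can then be passed as Username and Password in the client configuration instead of hard-coded constants.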

Now with the administrivia out of the way, let's connect to our database.

NOTE: If you've opted out of creating a user, you can omit Username and Password in the configuration below.

package main

import (
	"log"
	"time"

	"github.com/influxdata/influxdb/client/v2"
)

const (
	MyDB = "square_holes"
	username = "bubba"
	password = "bumblebeetuna"
)


func main() {
	// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Create a new point batch
	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  MyDB,
		Precision: "s",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Create a point and add to batch
	tags := map[string]string{"cpu": "cpu-total"}
	fields := map[string]interface{}{
		"idle":   10.1,
		"system": 53.3,
		"user":   46.6,
	}

	pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
	if err != nil {
		log.Fatal(err)
	}
	bp.AddPoint(pt)

	// Write the batch
	if err := c.Write(bp); err != nil {
		log.Fatal(err)
	}

	// Close client resources
	if err := c.Close(); err != nil {
		log.Fatal(err)
	}
}

Inserting Data

Time series data (points) are written to the database using batch inserts. The mechanism is to create one or more points, add them to a batch (batch points), and write the batch to a given database and series. A series is a combination of a measurement (time/values) and a set of tags.

In this sample we create a batch of 1,000 points. Each point has a time, two fields (idle and busy), and three tags (cpu, host, and region). We write these points to a database called systemstats using a measurement named cpu_usage.

NOTE: You can specify a RetentionPolicy as part of the batch points. If not provided InfluxDB will use the database default retention policy.


func writePoints(clnt client.Client) {
	sampleSize := 1000

	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  "systemstats",
		Precision: "us",
	})
	if err != nil {
		log.Fatal(err)
	}

	rand.Seed(time.Now().UnixNano())
	for i := 0; i < sampleSize; i++ {
		regions := []string{"us-west1", "us-west2", "us-west3", "us-east1"}
		tags := map[string]string{
			"cpu":    "cpu-total",
			"host":   fmt.Sprintf("host%d", rand.Intn(1000)),
			"region": regions[rand.Intn(len(regions))],
		}

		idle := rand.Float64() * 100.0
		fields := map[string]interface{}{
			"idle": idle,
			"busy": 100.0 - idle,
		}

		pt, err := client.NewPoint(
			"cpu_usage",
			tags,
			fields,
			time.Now(),
		)
		if err != nil {
			log.Fatal(err)
		}
		bp.AddPoint(pt)
	}

	if err := clnt.Write(bp); err != nil {
		log.Fatal(err)
	}
}
Uint64 Support

The uint64 data type is supported if your server is version 1.4.0 or greater. To write a data point as an unsigned integer, you must insert the point as uint64. You cannot use uint or any of the other derivatives because previous versions of the client have supported writing those types as an integer.
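To illustrate why only uint64 is treated as unsigned, here is a rough sketch of a type switch that distinguishes the two cases. It is not the client's encoder: the helper name encodeInteger is hypothetical, and the "u" and "i" suffixes are assumed from the InfluxDB 1.x line protocol conventions for unsigned and signed integers.

```go
package main

import (
	"fmt"
	"strconv"
)

// encodeInteger is an illustrative helper, not the client's real encoder.
// Assumption: unsigned values carry a "u" suffix and signed values an "i"
// suffix in line protocol. Only uint64 takes the unsigned path; the other
// integer types are written as signed integers for backward compatibility.
func encodeInteger(v interface{}) (string, bool) {
	switch n := v.(type) {
	case uint64:
		return strconv.FormatUint(n, 10) + "u", true
	case uint:
		return strconv.FormatInt(int64(n), 10) + "i", true
	case uint32:
		return strconv.FormatInt(int64(n), 10) + "i", true
	case int:
		return strconv.FormatInt(int64(n), 10) + "i", true
	case int64:
		return strconv.FormatInt(n, 10) + "i", true
	default:
		return "", false
	}
}

func main() {
	for _, v := range []interface{}{uint64(42), uint(42), int64(42)} {
		s, _ := encodeInteger(v)
		fmt.Printf("%T -> %s\n", v, s)
	}
}
```

In practice this means a field value should be converted explicitly, for example uint64(n), before it is placed in the fields map.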

Querying Data

One nice advantage of using InfluxDB is the ability to query your data using familiar SQL-like constructs. In this example we create a convenience function to query the database as follows:

// queryDB convenience function to query the database
func queryDB(clnt client.Client, cmd string, params map[string]interface{}) (res []client.Result, err error) {
	q := client.Query{
		Command:    cmd,
		Database:   MyDB,
		Parameters: params,
	}
	if response, err := clnt.Query(q); err == nil {
		if response.Error() != nil {
			return res, response.Error()
		}
		res = response.Results
	} else {
		return res, err
	}
	return res, nil
}
Parameter support

The special parameters such as client.Identifier and client.IntegerValue are only supported in version 1.8 or greater. Versions before 1.8 only support literal types like string, int64, float64, and bool. The below queries cannot use parameters in versions before 1.8 and will have to use fmt.Sprintf to construct the query instead. Constructing queries using fmt.Sprintf is unsafe and vulnerable to query injection if they are constructed with user data.
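The injection risk is easy to see with a small, self-contained sketch. The function buildUnsafeQuery is hypothetical and only demonstrates the fmt.Sprintf approach that should be avoided with user-supplied input; it does not talk to a server.

```go
package main

import "fmt"

// buildUnsafeQuery interpolates the measurement name directly into the
// query text. It is shown only to illustrate the problem.
func buildUnsafeQuery(measurement string) string {
	return fmt.Sprintf("SELECT * FROM %s LIMIT 10", measurement)
}

func main() {
	// Intended use: a plain measurement name.
	fmt.Println(buildUnsafeQuery("shapes"))

	// Hostile input: the "measurement" smuggles in an extra statement.
	fmt.Println(buildUnsafeQuery("shapes; DROP MEASUREMENT shapes; SELECT * FROM shapes"))
}
```

The second call produces a query string containing a DROP MEASUREMENT statement that the caller never wrote, which is why bound parameters are preferable whenever the server version supports them.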

Creating a Database
_, err := queryDB(clnt, "CREATE DATABASE $db", client.Params{
	"db": client.Identifier(MyDB),
})
if err != nil {
	log.Fatal(err)
}
Count Records
q := "SELECT count($field) FROM $m"
res, err := queryDB(clnt, q, client.Params{
	"field": client.Identifier("value"),
	"m":     client.Identifier(MyMeasurement),
})
if err != nil {
	log.Fatal(err)
}
count := res[0].Series[0].Values[0][1]
log.Printf("Found a total of %v records\n", count)
Find the last 10 shapes records
q := "SELECT * FROM $m LIMIT $n"
res, err = queryDB(clnt, q, client.Params{
	"m": client.Identifier(MyMeasurement),
	"n": 10,
})
if err != nil {
	log.Fatal(err)
}

for i, row := range res[0].Series[0].Values {
	t, err := time.Parse(time.RFC3339, row[0].(string))
	if err != nil {
		log.Fatal(err)
	}
	val := row[1].(string)
	log.Printf("[%2d] %s: %s\n", i, t.Format(time.Stamp), val)
}
Using the UDP Client

The InfluxDB client also supports writing over UDP.

func WriteUDP() {
	// Make client
	// PayloadSize is default value(512)
	c, err := client.NewUDPClient(client.UDPConfig{Addr: "localhost:8089"})
	if err != nil {
		panic(err.Error())
	}

	// Create a new point batch
	bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
		Precision: "s",
	})

	// Create a point and add to batch
	tags := map[string]string{"cpu": "cpu-total"}
	fields := map[string]interface{}{
		"idle":   10.1,
		"system": 53.3,
		"user":   46.6,
	}
	pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
	if err != nil {
		panic(err.Error())
	}
	bp.AddPoint(pt)

	// Write the batch and report any local send error
	if err := c.Write(bp); err != nil {
		panic(err.Error())
	}
}
Point Splitting

The UDP client now supports splitting single points that exceed the configured payload size. The logic for processing each point is listed here, starting with an empty payload.

  1. If adding the point to the current (non-empty) payload would exceed the configured size, send the current payload. Otherwise, add it to the current payload.
  2. If the point is smaller than the configured size, add it to the payload.
  3. If the point has no timestamp, just try to send the entire point as a single UDP payload, and process the next point.
  4. Since the point has a timestamp, re-use the existing measurement name, tagset, and timestamp and create multiple new points by splitting up the fields. The per-point length will be kept close to the configured size, staying under it if possible. This does mean that one large field, maybe a long string, could be sent as a larger-than-configured payload.

The above logic attempts to respect configured payload sizes, but not sacrifice any data integrity. Points without a timestamp can't be split, as that may cause fields to have differing timestamps when processed by the server.
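The field-splitting step can be sketched as a greedy packing loop. This is an illustration under simplifying assumptions, not the client's actual code: the function splitFields is hypothetical, fields are already encoded as key=value strings, and the measurement plus tag set is passed in as a ready-made prefix.

```go
package main

import (
	"fmt"
	"strings"
)

// splitFields packs pre-encoded fields into line protocol payloads that
// share the same prefix (measurement and tags) and timestamp. Each payload
// stays at or under maxSize when possible; a single field that is too large
// on its own is still emitted, as an oversized payload.
func splitFields(prefix string, fields []string, ts string, maxSize int) []string {
	// Bytes used by everything except the field set:
	// prefix, space, space, timestamp.
	overhead := len(prefix) + 2 + len(ts)

	var payloads, cur []string
	curLen := overhead
	flush := func() {
		if len(cur) == 0 {
			return
		}
		payloads = append(payloads, prefix+" "+strings.Join(cur, ",")+" "+ts)
		cur, curLen = nil, overhead
	}

	for _, f := range fields {
		extra := len(f)
		if len(cur) > 0 {
			extra++ // separating comma
		}
		if len(cur) > 0 && curLen+extra > maxSize {
			flush()
			extra = len(f)
		}
		cur = append(cur, f)
		curLen += extra
	}
	flush()
	return payloads
}

func main() {
	fields := []string{"idle=10.1", "system=53.3", "user=46.6"}
	for _, p := range splitFields("cpu,host=a", fields, "1", 34) {
		fmt.Println(len(p), p)
	}
}
```

Because every payload repeats the same timestamp, the server can treat the pieces as one logical point, which is the reason points without a timestamp are not split.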

Go Docs

Please refer to http://godoc.org/github.com/influxdata/influxdb/client/v2 for documentation.

See Also

You can also examine how the client library is used by the InfluxDB CLI.

Documentation

Overview

Package client implements a now-deprecated client for InfluxDB; use github.com/influxdata/influxdb/client/v2 instead.

Index

Examples

Constants

const (
	// DefaultHost is the default host used to connect to an InfluxDB instance
	DefaultHost = "localhost"

	// DefaultPort is the default port used to connect to an InfluxDB instance
	DefaultPort = 8086

	// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
	DefaultTimeout = 0
	DefaultPath    = ""
)
const (
	// ConsistencyOne requires at least one data node to acknowledge a write.
	ConsistencyOne = "one"

	// ConsistencyAll requires all data nodes to acknowledge a write.
	ConsistencyAll = "all"

	// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
	ConsistencyQuorum = "quorum"

	// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
	ConsistencyAny = "any"
)

Variables

This section is empty.

Functions

func CheckError added in v1.9.0

func CheckError(resp *http.Response) error

CheckError reads the http.Response and returns an error if one exists. It will automatically recognize the errors returned by Influx services and decode the error into an internal error type. If the error cannot be determined in that way, it will create a generic error message.

If there is no error, then this returns nil.

func EpochToTime added in v0.9.0

func EpochToTime(epoch int64, precision string) (time.Time, error)

EpochToTime takes a Unix epoch time and uses precision to return a time.Time.
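To make the role of the precision argument concrete, here is a rough equivalent written against the standard library only. The function epochToTime is a hypothetical stand-in, not the package's implementation, and it assumes the precision strings n, u, ms, s, m, and h that this package documents elsewhere.

```go
package main

import (
	"fmt"
	"time"
)

// epochToTime interprets epoch as a count of units chosen by precision.
// It is a sketch only; very large values combined with a coarse precision
// would overflow int64 nanoseconds and are not guarded against here.
func epochToTime(epoch int64, precision string) (time.Time, error) {
	var unit time.Duration
	switch precision {
	case "h":
		unit = time.Hour
	case "m":
		unit = time.Minute
	case "s":
		unit = time.Second
	case "ms":
		unit = time.Millisecond
	case "u":
		unit = time.Microsecond
	case "n":
		unit = time.Nanosecond
	default:
		return time.Time{}, fmt.Errorf("unknown precision %q", precision)
	}
	return time.Unix(0, epoch*int64(unit)).UTC(), nil
}

func main() {
	t, err := epochToTime(1434055562, "s")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(t.Format(time.RFC3339))
}
```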

func ParseConnectionString added in v0.9.3

func ParseConnectionString(path string, ssl bool) (url.URL, error)

ParseConnectionString will parse a string to create a valid connection URL

func SetPrecision added in v0.9.0

func SetPrecision(t time.Time, precision string) time.Time

SetPrecision will round a time to the specified precision

func SplitPath added in v1.8.0

func SplitPath(v string) (string, string)

SplitPath gets the path of a url

Types

type BatchPoints added in v0.9.0

type BatchPoints struct {
	Points           []Point           `json:"points,omitempty"`
	Database         string            `json:"database,omitempty"`
	RetentionPolicy  string            `json:"retentionPolicy,omitempty"`
	Tags             map[string]string `json:"tags,omitempty"`
	Time             time.Time         `json:"time,omitempty"`
	Precision        string            `json:"precision,omitempty"`
	WriteConsistency string            `json:"-"`
}

BatchPoints is used to send batched data in a single write. Database and Points are required. If no retention policy is specified, it will use the database's default retention policy. If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored. If time is specified, it will be applied to any point with an empty time. Precision can be specified if the time is in epoch format (integer). Valid values for Precision are n, u, ms, s, m, and h.
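The tag-merging rule can be sketched as follows. The helper mergeTags is hypothetical and only models the documented behavior (batch-level tags apply to every point, and a tag the point already carries takes precedence); it is not taken from the package source.

```go
package main

import "fmt"

// mergeTags returns the tag set a point would end up with: batch-level tags
// first, then the point's own tags, which override any batch tag that uses
// the same key.
func mergeTags(batchTags, pointTags map[string]string) map[string]string {
	merged := make(map[string]string, len(batchTags)+len(pointTags))
	for k, v := range batchTags {
		merged[k] = v
	}
	for k, v := range pointTags {
		merged[k] = v // the point's value wins
	}
	return merged
}

func main() {
	batch := map[string]string{"region": "us-west1", "env": "prod"}
	point := map[string]string{"region": "us-east1"}
	fmt.Println(mergeTags(batch, point))
}
```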

func (*BatchPoints) UnmarshalJSON added in v0.9.0

func (bp *BatchPoints) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the BatchPoints struct

type ChunkedResponse added in v0.12.0

type ChunkedResponse struct {
	// contains filtered or unexported fields
}

ChunkedResponse represents a response from the server that uses chunking to stream the output.

func NewChunkedResponse added in v0.12.0

func NewChunkedResponse(r io.Reader) *ChunkedResponse

NewChunkedResponse reads a stream and produces responses from the stream.

func (*ChunkedResponse) NextResponse added in v0.12.0

func (r *ChunkedResponse) NextResponse() (*Response, error)

NextResponse reads the next line of the stream and returns a response.
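As a sketch of the general technique, the standard library's json.Decoder can read a stream of consecutive JSON documents one at a time. The chunk type below is a hypothetical, heavily simplified stand-in for a response object (it assumes a results array whose entries have a statement_id), and readChunks is not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// chunk is a simplified, hypothetical response shape used for illustration.
type chunk struct {
	Results []struct {
		StatementID int `json:"statement_id"`
	} `json:"results"`
}

// readChunks decodes JSON documents from r until the stream ends.
func readChunks(r io.Reader) ([]chunk, error) {
	dec := json.NewDecoder(r)
	var chunks []chunk
	for {
		var c chunk
		err := dec.Decode(&c)
		if err == io.EOF {
			return chunks, nil
		}
		if err != nil {
			return chunks, err
		}
		chunks = append(chunks, c)
	}
}

// readChunksFromString is a convenience wrapper for in-memory input.
func readChunksFromString(s string) ([]chunk, error) {
	return readChunks(strings.NewReader(s))
}

func main() {
	stream := `{"results":[{"statement_id":0}]}
{"results":[{"statement_id":1}]}
`
	chunks, err := readChunksFromString(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("decoded chunks:", len(chunks))
}
```

Decoding one document per call keeps memory bounded by the chunk size rather than by the size of the whole result set.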

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to make calls to the server.

func NewClient

func NewClient(c Config) (*Client, error)

NewClient will instantiate and return a connected client to issue commands to the server.

Example
package main

import (
	"fmt"
	"log"
	"net/url"
	"os"

	"github.com/influxdata/influxdb/client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}

	// NOTE: this assumes you've setup a user and have setup shell env variables,
	// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
	conf := client.Config{
		URL:      *host,
		Username: os.Getenv("INFLUX_USER"),
		Password: os.Getenv("INFLUX_PWD"),
	}
	con, err := client.NewClient(conf)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Connection", con)
}

func (*Client) Addr added in v0.9.0

func (c *Client) Addr() string

Addr provides the current url as a string of the server the client is connected to.

func (*Client) Ping

func (c *Client) Ping() (time.Duration, string, error)

Ping will check to see if the server is up. Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.

Example
package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/influxdata/influxdb/client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	dur, ver, err := con.Ping()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Happy as a hippo! %v, %s", dur, ver)
}

func (*Client) Query

func (c *Client) Query(q Query) (*Response, error)

Query sends a command to the server and returns the Response

Example
package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/influxdata/influxdb/client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	q := client.Query{
		Command:  "select count(value) from shapes",
		Database: "square_holes",
	}
	if response, err := con.Query(q); err == nil && response.Error() == nil {
		log.Println(response.Results)
	}
}

func (*Client) QueryContext added in v1.4.0

func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error)

QueryContext sends a command to the server and returns the Response. It uses a context that can be cancelled by the command line client.

func (*Client) QueryFlux added in v1.9.0

func (c *Client) QueryFlux(ctx context.Context, query *fluxClient.QueryRequest) (flux.ResultIterator, error)

QueryFlux sends a Flux query request to the server and returns a flux.ResultIterator. It uses a context that can be cancelled by the command line client.

func (*Client) SetAuth added in v0.9.0

func (c *Client) SetAuth(u, p string)

SetAuth will update the username and password.

func (*Client) SetPrecision added in v0.9.5

func (c *Client) SetPrecision(precision string)

SetPrecision will update the precision

func (*Client) Write added in v0.9.0

func (c *Client) Write(bp BatchPoints) (*Response, error)

Write takes BatchPoints and allows for writing of multiple points with defaults. If successful, error is nil and Response is nil. If an error occurs, Response may contain additional information if populated.

Example
package main

import (
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"strconv"
	"time"

	"github.com/influxdata/influxdb/client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	var (
		shapes     = []string{"circle", "rectangle", "square", "triangle"}
		colors     = []string{"red", "blue", "green"}
		sampleSize = 1000
		pts        = make([]client.Point, sampleSize)
	)

	rand.Seed(42)
	for i := 0; i < sampleSize; i++ {
		pts[i] = client.Point{
			Measurement: "shapes",
			Tags: map[string]string{
				"color": strconv.Itoa(rand.Intn(len(colors))),
				"shape": strconv.Itoa(rand.Intn(len(shapes))),
			},
			Fields: map[string]interface{}{
				"value": rand.Intn(sampleSize),
			},
			Time:      time.Now(),
			Precision: "s",
		}
	}

	bps := client.BatchPoints{
		Points:          pts,
		Database:        "BumbeBeeTuna",
		RetentionPolicy: "default",
	}
	_, err = con.Write(bps)
	if err != nil {
		log.Fatal(err)
	}
}

func (*Client) WriteLineProtocol added in v0.9.3

func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error)

WriteLineProtocol takes a string with line returns to delimit each write. If successful, error is nil and Response is nil. If an error occurs, Response may contain additional information if populated.
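For orientation, the data argument is a newline-delimited string of line protocol entries in the form measurement,tags fields timestamp. The sketch below builds such a string with the standard library only; formatLine is a hypothetical helper, it handles float fields only, and it does not escape spaces, commas, or equals signs in names or values.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// sortedKeys returns the keys of a string-keyed map in sorted order so the
// output is deterministic.
func sortedKeys[V any](m map[string]V) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// formatLine renders one line protocol entry. No escaping is performed.
func formatLine(measurement string, tags map[string]string, fields map[string]float64, unixNano int64) string {
	var b strings.Builder
	b.WriteString(measurement)
	for _, k := range sortedKeys(tags) {
		b.WriteString("," + k + "=" + tags[k])
	}
	b.WriteByte(' ')
	for i, k := range sortedKeys(fields) {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(k + "=" + strconv.FormatFloat(fields[k], 'f', -1, 64))
	}
	b.WriteByte(' ')
	b.WriteString(strconv.FormatInt(unixNano, 10))
	return b.String()
}

func main() {
	tags := map[string]string{"cpu": "cpu-total"}
	lines := []string{
		formatLine("cpu_usage", tags, map[string]float64{"idle": 10.1, "user": 46.6}, 1434055562000000000),
		formatLine("cpu_usage", tags, map[string]float64{"idle": 12.5, "user": 40.2}, 1434055563000000000),
	}
	// One entry per line, as expected by the data argument.
	fmt.Println(strings.Join(lines, "\n"))
}
```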

type Config added in v0.9.0

type Config struct {
	URL              url.URL
	UnixSocket       string
	Username         string
	Password         string
	UserAgent        string
	Timeout          time.Duration
	Precision        string
	WriteConsistency string
	UnsafeSsl        bool
	Proxy            func(req *http.Request) (*url.URL, error)
	TLS              *tls.Config
}

Config is used to specify what server to connect to. URL: The URL of the server to connect to. Username/Password: Optional; they will be passed via basic auth if provided. UserAgent: If not provided, will default to "InfluxDBClient". Timeout: If not provided, will default to 0 (no timeout).

func NewConfig added in v0.9.3

func NewConfig() Config

NewConfig will create a config to be used in connecting to the client

type Message added in v0.13.0

type Message struct {
	Level string `json:"level,omitempty"`
	Text  string `json:"text,omitempty"`
}

Message represents a user message.

type Point added in v0.9.0

type Point struct {
	Measurement string
	Tags        map[string]string
	Time        time.Time
	Fields      map[string]interface{}
	Precision   string
	Raw         string
}

Point defines the fields that will be written to the database. Measurement, Time, and Fields are required. Precision can be specified if the time is in epoch format (integer). Valid values for Precision are n, u, ms, s, m, and h.

func (*Point) MarshalJSON added in v0.9.0

func (p *Point) MarshalJSON() ([]byte, error)

MarshalJSON will format the time in RFC3339Nano. Precision is ignored, as it is only used for writing, not reading. In other words, times are always sent back in nanosecond precision.

func (*Point) MarshalString added in v0.9.0

func (p *Point) MarshalString() string

MarshalString renders string representation of a Point with specified precision. The default precision is nanoseconds.

func (*Point) UnmarshalJSON added in v0.9.0

func (p *Point) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Point struct

type Query added in v0.9.0

type Query struct {
	Command  string
	Database string

	// RetentionPolicy tells the server which retention policy to use by default.
	// This option is only effective when querying a server of version 1.6.0 or later.
	RetentionPolicy string

	// Chunked tells the server to send back chunked responses. This places
	// less load on the server by sending back chunks of the response rather
	// than waiting for the entire response all at once.
	Chunked bool

	// ChunkSize sets the maximum number of rows that will be returned per
	// chunk. Chunks are either divided based on their series or if they hit
	// the chunk size limit.
	//
	// Chunked must be set to true for this option to be used.
	ChunkSize int

	// NodeID sets the data node to use for the query results. This option only
	// has any effect in the enterprise version of the software where there can be
	// more than one data node and is primarily useful for analyzing differences in
	// data. The default behavior is to automatically select the appropriate data
	// nodes to retrieve all of the data. On a database where the number of data nodes
	// is greater than the replication factor, it is expected that setting this option
	// will only retrieve partial data.
	NodeID int
}

Query is used to send a command to the server. Both Command and Database are required.

type Response added in v0.9.0

type Response struct {
	Results []Result
	Err     error
}

Response represents a list of statement results.

func (*Response) Error added in v0.9.0

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.
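The "first error wins" behavior can be modeled with a few local types. Everything in this sketch is hypothetical: response, result, and stmtError only mirror the shape of the exported Response and Result structs, and checking the response-level error before the per-statement errors is an assumption about ordering.

```go
package main

import "fmt"

// stmtError is a tiny error type so the example needs no extra imports.
type stmtError string

func (e stmtError) Error() string { return string(e) }

// result and response mirror the shape of Result and Response.
type result struct {
	Err error
}

type response struct {
	Results []result
	Err     error
}

// firstError returns the response-level error if set, otherwise the error
// of the first statement that failed, otherwise nil.
func (r *response) firstError() error {
	if r.Err != nil {
		return r.Err
	}
	for _, res := range r.Results {
		if res.Err != nil {
			return res.Err
		}
	}
	return nil
}

func main() {
	r := &response{Results: []result{
		{},
		{Err: stmtError("measurement not found")},
		{Err: stmtError("later failure")},
	}}
	fmt.Println(r.firstError())
}
```

Callers therefore need to check both the error returned by Query and the value of Error() on the response, as the queryDB helper in the README does.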

func (*Response) MarshalJSON added in v0.9.0

func (r *Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes the response into JSON.

func (*Response) UnmarshalJSON added in v0.9.0

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct

type Result added in v0.9.0

type Result struct {
	Series   []models.Row
	Messages []*Message
	Err      error
}

Result represents a resultset returned from a single statement.

func (*Result) MarshalJSON added in v0.9.0

func (r *Result) MarshalJSON() ([]byte, error)

MarshalJSON encodes the result into JSON.

func (*Result) UnmarshalJSON added in v0.9.0

func (r *Result) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Result struct

Directories

Path Synopsis
Package client (v2) is the current official Go client for InfluxDB.
