Documentation ¶
Overview ¶
Package writer contains useful types for buffering, batching and periodically syncing writes onto a provided metric writing client.
The following example demonstrate the usage of a *writer.PointWriter. This is designed to buffer calls to Write metrics and flush them in configurable batch sizes (see WithBufferSize). It is also designed to periodically flush the buffer if a configurable duration ellapses between calls to Write. This is useful to ensure metrics are flushed to the client during a pause in their production.
Example Usage
import ( "github.com/influxdata/influxdb-client-go" "github.com/influxdata/influxdb-client-go/writer" ) func main() { var ( cli, _ = influxdb.New("http://localhost:9999", "some-token") bucket = "default" org = "influx" ) wr := writer.New(cli, bucket, org, writer.WithBufferSize(10)) wr.Write(influxdb.NewRowMetric( map[string]interface{}{ "value": 16, }, "temp_celsius", map[string]string{ "room": "living_room", }, time.Now(), ), influxdb.NewRowMetric( map[string]interface{}{ "value": 17, }, "temp_celsius", map[string]string{ "room": "living_room", }, time.Now(), )) wr.Close() }
writer.New(...) return a PointerWriter which is composed of multiple other types available in this package.
It first wraps the provided client in a *BucketWriter which takes care of ensuring all written metrics are called on the underyling client with a specific organisation and bucket. This is not safe for concurrent use.
It then wraps this writer in a *BufferedWriter and configures its buffer size accordingly. This type implements the buffering of metrics and exposes a flush method. Once the buffer size is exceed flush is called automatically. However, Flush() can be called manually on this type. This is also not safe for concurrent use.
Finally, it wraps the buffered writer in a *PointsWriter which takes care of ensuring Flush is called automatically when it hasn't been called for a configured duration. This final type is safe for concurrent use.
Automatic Retries ¶
The writer package offers automatic retry capabilities during known transient failures This is when the API being consumed reports "unavailable" or "too many requests" error conditions
import (
"time" "github.com/influxdata/influxdb-client-go" "github.com/influxdata/influxdb-client-go/writer"
)
func main() { var ( cli, _ = influxdb.New("http://localhost:9999", "some-token") bucket = "default" org = "influx" ) // construct a writer with 3 maximum attempts per call to Write and linear backoff derived from number of attempts // i.e. a first attempt is followed by a 1 second delay before the second attempt // a second attempt is followed by a 2 second delay before the third attempt var ( retryOpts = []writer.RetryOption{writer.WithMaxAttempts(3), writer.WithBackoff(writer.LinearBackoff(time.Second))} wr = writer.New(cli, bucket, org, writer.WithRetries(retryOpts...)) ) }
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackoffFunc ¶
BackoffFunc is a function which when called with an attempt number returns a duration which should be waited for until a subsequent attempt is made
func LinearBackoff ¶
func LinearBackoff(scale time.Duration) BackoffFunc
LinearBackoff returns a BackoffFunc which when called returns attempt * scale. e.g. LinearBackoff(time.Second)(5) returns 5 seconds
type BucketMetricWriter ¶
type BucketMetricWriter interface {
Write(ctx context.Context, bucket string, org string, m ...influxdb.Metric) (int, error)
}
BucketMetricWriter is a type which Metrics can be written to a particular bucket in a particular organisation
type BucketWriter ¶
type BucketWriter struct {
// contains filtered or unexported fields
}
BucketWriter writes metrics to a particular bucket within a particular organisation
func NewBucketWriter ¶
func NewBucketWriter(w BucketMetricWriter, bucket, org string) *BucketWriter
NewBucketWriter allocates, configures and returned a new BucketWriter for writing metrics to a specific organisations bucket
func (*BucketWriter) Write ¶
func (b *BucketWriter) Write(m ...influxdb.Metric) (int, error)
Write writes the provided metrics to the underlying metrics writer using the org and bucket configured on the bucket writer
type BufferedWriter ¶
type BufferedWriter struct {
// contains filtered or unexported fields
}
BufferedWriter is a buffered implementation of the MetricsWriter interface It is unashamedly derived from the bufio pkg https://golang.org/pkg/bufio Metrics are buffered up until the buffer size is met and then flushed to an underlying MetricsWriter The writer can also be flushed manually by calling Flush BufferedWriter is not safe to be called concurrently and therefore concurrency should be managed by the caller
func NewBufferedWriter ¶
func NewBufferedWriter(w MetricsWriter) *BufferedWriter
NewBufferedWriter returns a new *BufferedWriter with the default buffer size
func NewBufferedWriterSize ¶
func NewBufferedWriterSize(w MetricsWriter, size int) *BufferedWriter
NewBufferedWriterSize returns a new *BufferedWriter with a buffer allocated with the provided size
func (*BufferedWriter) Available ¶
func (b *BufferedWriter) Available() int
Available returns how many bytes are unused in the buffer.
func (*BufferedWriter) Buffered ¶
func (b *BufferedWriter) Buffered() int
Buffered returns the number of bytes that have been written into the current buffer.
func (*BufferedWriter) Flush ¶
func (b *BufferedWriter) Flush() error
Flush writes any buffered data to the underlying MetricsWriter
func (*BufferedWriter) Write ¶
func (b *BufferedWriter) Write(m ...influxdb.Metric) (nn int, err error)
Write writes the provided metrics to the underlying buffer if there is available capacity. Otherwise it flushes the buffer and attempts to assign the remain metrics to the buffer. This process repeats until all the metrics are either flushed or in the buffer
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config is a structure used to configure a point writer
type MetricsWriteFlusher ¶
type MetricsWriteFlusher interface { Write(m ...influxdb.Metric) (int, error) Available() int Flush() error }
MetricsWriteFlush is a type of metrics writer which is buffered and metrics can be flushed to
type MetricsWriter ¶
MetricsWriter is a type which metrics can be written to
type Option ¶
type Option func(*Config)
Option is a functional option for Configuring point writers
func WithBufferSize ¶
WithBufferSize sets the size of the underlying buffer on the point writer
func WithContext ¶ added in v0.1.3
WithContext sets the context.Context used for each flush
func WithFlushInterval ¶
WithFlushInterval sets the flush interval on the writer The point writer will wait at least this long between flushes of the undeyling buffered writer
func WithRetries ¶
func WithRetries(options ...RetryOption) Option
WithRetries configures automatic retry behavior on specific transient error conditions when attempting to Write metrics to a client
type Options ¶
type Options []Option
Options is a slice of Option
type PointWriter ¶
type PointWriter struct {
// contains filtered or unexported fields
}
PointWriter delegates calls to Write to an underlying flushing writer implementation. It also periodically calls flush on the underlying writer and is safe to be called concurrently. As the flushing writer can also flush on calls to Write when the number of metrics being written exceeds the buffer capacity, it also ensures to reset its timer in this scenario as to avoid calling flush multiple times
func New ¶
func New(writer BucketMetricWriter, bkt, org string, opts ...Option) *PointWriter
New constructs a point writer with an underlying buffer from the provided BucketMetricWriter The writer will flushed metrics to the underlying BucketMetricWriter when the buffer is full or the configured flush interval ellapses without a flush occuring
func NewPointWriter ¶
func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter
NewPointWriter configures and returns a *PointWriter writer type The new writer will automatically begin scheduling periodic flushes based on the provided duration
func (*PointWriter) Close ¶
func (p *PointWriter) Close() error
Close signals to stop flushing metrics and causes subsequent calls to Write to return a closed pipe error Close returns once scheduledge flushing has stopped Close does a final flush on return and returns any error from the final flush if it occurs
func (*PointWriter) Write ¶
func (p *PointWriter) Write(m ...influxdb.Metric) (int, error)
Write delegates to an underlying metrics writer If the delegating call is going to cause a flush, it signals to the schduled periodic flush to reset its timer
type RetryOption ¶
type RetryOption func(*RetryWriter)
RetryOption is a functional option for the RetryWriters type
func WithBackoff ¶
func WithBackoff(fn BackoffFunc) RetryOption
WithBackoff sets of the BackoffFunc on the RetryWriter
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int) RetryOption
WithMaxAttempts sets the maximum number of attempts for a Write operation attempt
func WithRetrySleepLimit ¶ added in v0.2.0
func WithRetrySleepLimit(retrySleepLimit int) RetryOption
WithRetrySleepLimit sets the retry sleep limit. This optiona allows us to abort retry sleeps past some number of seconds.
type RetryWriter ¶
type RetryWriter struct { MetricsWriter // contains filtered or unexported fields }
RetryWriter is a metrics writers which decorates other metrics writer implementations and automatically retries attempts to write metrics under certain error conditions
func NewRetryWriter ¶
func NewRetryWriter(w MetricsWriter, opts ...RetryOption) *RetryWriter
NewRetryWriter returns a configured *RetryWriter which decorates the supplied MetricsWriter
func (*RetryWriter) Write ¶
func (r *RetryWriter) Write(m ...influxdb.Metric) (n int, err error)
Write delegates to underlying MetricsWriter and then automatically retries when errors occur