Documentation
¶
Overview ¶
Package batching provides a batcher to collect points and emit them as batches.
Example (Batcher) ¶
package main import ( "context" "fmt" "log" "math/rand" "time" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching" ) func main() { // Create a random number generator r := rand.New(rand.NewSource(456)) // Instantiate a client using your credentials. client, err := influxdb3.NewFromEnv() if err != nil { log.Fatal(err) } // Close the client when finished and raise any errors. defer client.Close() // Synchronous use // Create a Batcher with a size of 5 b := batching.NewBatcher(batching.WithSize(5)) // Simulate delay of a second t := time.Now().Add(-54 * time.Second) // Write 54 points synchronously to the batcher for range 54 { p := influxdb3.NewPoint("stat", map[string]string{"location": "Paris"}, map[string]any{ "temperature": 15 + r.Float64()*20, "humidity": 30 + r.Int63n(40), }, t) // Add the point to the batcher b.Add(p) // Update time t = t.Add(time.Second) // If the batcher is ready, write the batch to the client and reset the batcher if b.Ready() { err := client.WritePoints(context.Background(), b.Emit()) if err != nil { log.Fatal(err) } } } // Write the final batch to the client err = client.WritePoints(context.Background(), b.Emit()) if err != nil { panic(err) } // Asynchronous use // Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client b = batching.NewBatcher( batching.WithSize(5), batching.WithReadyCallback(func() { fmt.Println("ready") }), batching.WithEmitCallback(func(points []*influxdb3.Point) { err = client.WritePoints(context.Background(), points) if err != nil { log.Fatal(err) } }), ) // Simulate delay of a second t = time.Now().Add(-54 * time.Second) // Write 54 points synchronously to the batcher for range 54 { p := influxdb3.NewPoint("stat", map[string]string{"location": "Madrid"}, map[string]any{ "temperature": 15 + r.Float64()*20, "humidity": 30 + r.Int63n(40), }, t) // Add the point to the batcher b.Add(p) // Update time t = t.Add(time.Second) } // Write the final batch to the client err = client.WritePoints(context.Background(), b.Emit()) if err != nil { log.Fatal(err) } }
Output:
Example (LineProtocol_batcher) ¶
package main import ( "context" "fmt" "log" "math/rand" "time" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching" ) func main() { // Create a random number generator rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // initialize data dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d" syncHosts := []string{"r2d2", "c3po", "robbie"} const recordCount = 200 var wErr error // Instantiate a client using your credentials. client, err := influxdb3.NewFromEnv() if err != nil { log.Fatal(err) } defer func(client *influxdb3.Client) { err = client.Close() if err != nil { log.Fatal(err) } }(client) // SYNCHRONOUS USAGE // create a new Line Protocol Batcher with a batch size of 4096 bytes slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size // Simulate delay of a second t := time.Now().Add(-recordCount * time.Second) // create and emit records for range recordCount { slpb.Add(fmt.Sprintf(dataTemplate, syncHosts[rnd.Intn(len(syncHosts))], rnd.Float64()*150, rnd.Intn(32), t)) t = t.Add(time.Second) if slpb.Ready() { wErr = client.Write(context.Background(), slpb.Emit()) if wErr != nil { log.Fatal(wErr) } } } // write any remaining records in batcher to client wErr = client.Write(context.Background(), slpb.Emit()) if wErr != nil { log.Fatal(wErr) } // ASYNCHRONOUS USAGE asyncHosts := []string{"Z80", "C64", "i8088"} // create a new Line Protocol Batcher with a batch size of 4096 bytes // ... a callback to handle when ready state reached and // ... a callback to handle emits of bytes alpb := batching.NewLPBatcher(batching.WithBufferSize(4096), batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }), batching.WithEmitBytesCallback(func(bytes []byte) { wErr := client.Write(context.Background(), bytes) if wErr != nil { log.Fatal(wErr) } })) // Simulate delay of a second t = time.Now().Add(-recordCount * time.Second) // create and add data to the batcher for range recordCount { alpb.Add(fmt.Sprintf(dataTemplate, asyncHosts[rnd.Intn(len(asyncHosts))], rnd.Float64()*150, rnd.Intn(32), t)) // update time t = t.Add(time.Second) } // write any remaining records in batcher to client wErr = client.Write(context.Background(), alpb.Emit()) if wErr != nil { log.Fatal(wErr) } }
Output:
Index ¶
- Constants
- type Batcher
- func (b *Batcher) Add(p ...*influxdb3.Point)
- func (b *Batcher) CurrentLoadSize() int
- func (b *Batcher) Emit() []*influxdb3.Point
- func (b *Batcher) Flush() []*influxdb3.Point
- func (b *Batcher) Ready() bool
- func (b *Batcher) SetCapacity(c int)
- func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point))
- func (b *Batcher) SetReadyCallback(f func())
- func (b *Batcher) SetSize(s int)
- type ByteEmittable
- type Emittable
- type LPBatcher
- func (lpb *LPBatcher) Add(lines ...string)
- func (lpb *LPBatcher) CurrentLoadSize() int
- func (lpb *LPBatcher) Emit() []byte
- func (lpb *LPBatcher) Flush() []byte
- func (lpb *LPBatcher) Ready() bool
- func (lpb *LPBatcher) SetCapacity(c int)
- func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte))
- func (lpb *LPBatcher) SetReadyCallback(f func())
- func (lpb *LPBatcher) SetSize(s int)
- type LPOption
- type Option
- type PointEmittable
Examples ¶
Constants ¶
const DefaultBatchSize = 1000
DefaultBatchSize is the default number of points emitted
const DefaultBufferCapacity = DefaultByteBatchSize * 2
const DefaultByteBatchSize = 100000
const DefaultCapacity = 2 * DefaultBatchSize
DefaultCapacity is the default initial capacity of the point buffer
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
Batcher collects points and emits them as batches
func NewBatcher ¶
NewBatcher creates and initializes a new Batcher instance applying the specified options. By default, a batch-size is DefaultBatchSize and the initial capacity is DefaultCapacity.
func (*Batcher) CurrentLoadSize ¶
func (*Batcher) Emit ¶
Emit returns a new batch of points with the provided batch size or with the remaining points. Please drain the points at the end of your processing to get the remaining points not filling up a batch.
func (*Batcher) Flush ¶
Flush drains all points even if the internal buffer is currently larger than size. It does not call the callbackEmit method
func (*Batcher) SetCapacity ¶
SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.
func (*Batcher) SetEmitCallback ¶
SetEmitCallback sets the callbackEmit function.
func (*Batcher) SetReadyCallback ¶
func (b *Batcher) SetReadyCallback(f func())
SetReadyCallback sets the callbackReady function.
type ByteEmittable ¶
type ByteEmittable interface { Emittable SetEmitBytesCallback(ebcb func([]byte)) // callback for emitting bytes }
ByteEmittable provides the basis for a type Emitting line protocol data as a byte array (i.e. []byte).
type Emittable ¶
type Emittable interface { SetSize(s int) // setsize SetCapacity(c int) // set capacity SetReadyCallback(rcb func()) // ready Callback }
Emittable provides the base for any type that will collect and then emit data upon reaching a ready state.
type LPBatcher ¶
LPBatcher collects line protocol strings storing them to a byte buffer and then emitting them as []byte.
func NewLPBatcher ¶
NewLPBatcher creates and initializes a new LPBatcher instance applying the supplied options. By default a batch size is DefaultByteBatchSize and the initial capacity is the DefaultBufferCapacity.
func (*LPBatcher) Add ¶
Add lines to the buffer and call appropriate callbacks when the ready state is reached.
func (*LPBatcher) CurrentLoadSize ¶
CurrentLoadSize returns the current size of the internal buffer
func (*LPBatcher) Emit ¶
Emit returns a new batch of bytes with upto to the provided batch size depending on when the last newline character in the potential batch is met, or with all the remaining bytes. Please drain the bytes at the end of your processing to get the remaining bytes not filling up a batch.
func (*LPBatcher) SetCapacity ¶
SetCapacity sets the initial capacity of the internal buffer
func (*LPBatcher) SetEmitBytesCallback ¶
SetEmitBytesCallback sets the callbackByteEmit function
func (*LPBatcher) SetReadyCallback ¶
func (lpb *LPBatcher) SetReadyCallback(f func())
SetReadyCallback sets the ReadyCallback function
type LPOption ¶
type LPOption func(ByteEmittable)
func WithBufferCapacity ¶
WithBufferCapacity changes the initial capacity of the internal buffer The unit is byte
func WithBufferSize ¶
WithBufferSize changes the batch-size emitted by the LPbatcher The unit is byte
func WithByteEmitReadyCallback ¶
func WithByteEmitReadyCallback(f func()) LPOption
WithByteEmitReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
func WithEmitBytesCallback ¶
WithEmitBytesCallback sets the function called when a new batch is ready with the batch bytes. The batcher will wait for the callback to finish, so please return as quickly as possible and move any long-running processing to a go routine.
type Option ¶
type Option func(PointEmittable)
func WithCapacity ¶
WithCapacity changes the initial capacity of the internal buffer
func WithEmitCallback ¶
WithEmitCallback sets the function called when a new batch is ready with the batch of points. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
func WithReadyCallback ¶
func WithReadyCallback(f func()) Option
WithReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.
type PointEmittable ¶
type PointEmittable interface { Emittable SetEmitCallback(epcb func([]*influxdb3.Point)) // callback for emitting points }
PointEmittable provides the basis for any type emitting Point arrays as []*influxdb3.Point