batching

package
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package batching provides a batcher to collect points and emit them as batches.

Example (Batcher)
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching"
)

// main demonstrates the two ways of using a Batcher: polling Ready/Emit
// yourself (synchronous use) and letting the batcher invoke callbacks
// from Add (asynchronous use).
//
// Note: log.Fatal exits the process, so the deferred client.Close does not
// run on the failure paths below.
func main() {
	// Number of points written in each half of the example.
	const pointCount = 54

	// Create a random number generator with a fixed seed
	r := rand.New(rand.NewSource(456))

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}

	// Close the client when finished and raise any errors.
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// Synchronous use

	// Create a Batcher with a size of 5
	b := batching.NewBatcher(batching.WithSize(5))

	// Backdate the first timestamp so that the points, spaced one second
	// apart, end at roughly the current time.
	t := time.Now().Add(-pointCount * time.Second)

	// Add the points to the batcher, writing a batch whenever one is ready
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Paris"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Advance the timestamp for the next point
		t = t.Add(time.Second)

		// If the batcher is ready, emit the batch and write it to the client
		if b.Ready() {
			if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
				log.Fatal(err)
			}
		}
	}

	// Write the final, possibly partial, batch to the client
	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
		log.Fatal(err)
	}

	// Asynchronous use

	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
	b = batching.NewBatcher(
		batching.WithSize(5),
		batching.WithReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitCallback(func(points []*influxdb3.Point) {
			// Keep the error local to the callback instead of sharing main's err.
			if err := client.WritePoints(context.Background(), points); err != nil {
				log.Fatal(err)
			}
		}),
	)

	// Backdate the first timestamp again for the second series
	t = time.Now().Add(-pointCount * time.Second)

	// Add the points to the batcher; the emit callback writes each full batch
	for range pointCount {
		p := influxdb3.NewPoint("stat",
			map[string]string{"location": "Madrid"},
			map[string]any{
				"temperature": 15 + r.Float64()*20,
				"humidity":    30 + r.Int63n(40),
			},
			t)

		// Add the point to the batcher
		b.Add(p)
		// Advance the timestamp for the next point
		t = t.Add(time.Second)
	}

	// Write the final, possibly partial, batch to the client
	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
		log.Fatal(err)
	}
}
Output:

Example (LineProtocol_batcher)
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
	"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching"
)

// main demonstrates the two ways of using an LPBatcher: polling Ready/Emit
// yourself (synchronous usage) and letting the batcher invoke callbacks
// from Add (asynchronous usage).
//
// Note: log.Fatal exits the process, so the deferred client.Close does not
// run on the failure paths below.
func main() {
	// Create a random number generator
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	// initialize data
	// The template is one line protocol record: measurement and host tag,
	// two fields, then the timestamp as an integer.
	dataTemplate := "cpu,host=%s load=%.3f,reg=%d %d"
	syncHosts := []string{"r2d2", "c3po", "robbie"}
	const recordCount = 200

	// Instantiate a client using your credentials.
	client, err := influxdb3.NewFromEnv()
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	// SYNCHRONOUS USAGE
	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	slpb := batching.NewLPBatcher(batching.WithBufferSize(4096)) // Set buffer size

	// Backdate the first timestamp so that the records, spaced one second
	// apart, end at roughly the current time.
	t := time.Now().Add(-recordCount * time.Second)

	// create and emit records
	for range recordCount {
		// The %d verb needs an integer, so pass the Unix time in nanoseconds
		// rather than the time.Time value itself.
		// NOTE(review): assumes the write uses nanosecond precision — confirm.
		slpb.Add(fmt.Sprintf(dataTemplate,
			syncHosts[rnd.Intn(len(syncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t.UnixNano()))

		t = t.Add(time.Second)

		if slpb.Ready() {
			if wErr := client.Write(context.Background(), slpb.Emit()); wErr != nil {
				log.Fatal(wErr)
			}
		}
	}

	// write any remaining records in batcher to client
	if wErr := client.Write(context.Background(), slpb.Emit()); wErr != nil {
		log.Fatal(wErr)
	}

	// ASYNCHRONOUS USAGE
	asyncHosts := []string{"Z80", "C64", "i8088"}
	// create a new Line Protocol Batcher with a batch size of 4096 bytes
	// ... a callback to handle when ready state reached and
	// ... a callback to handle emits of bytes
	alpb := batching.NewLPBatcher(batching.WithBufferSize(4096),
		batching.WithByteEmitReadyCallback(func() { fmt.Println("ready") }),
		batching.WithEmitBytesCallback(func(bytes []byte) {
			if wErr := client.Write(context.Background(), bytes); wErr != nil {
				log.Fatal(wErr)
			}
		}))

	// Backdate the first timestamp again for the second series
	t = time.Now().Add(-recordCount * time.Second)

	// create and add data to the batcher
	for range recordCount {
		alpb.Add(fmt.Sprintf(dataTemplate,
			asyncHosts[rnd.Intn(len(asyncHosts))],
			rnd.Float64()*150,
			rnd.Intn(32),
			t.UnixNano()))

		// update time
		t = t.Add(time.Second)
	}

	// write any remaining records in batcher to client
	if wErr := client.Write(context.Background(), alpb.Emit()); wErr != nil {
		log.Fatal(wErr)
	}
}
Output:

Index

Examples

Constants

View Source
const DefaultBatchSize = 1000

DefaultBatchSize is the default number of points emitted

View Source
// DefaultBufferCapacity is the default initial capacity of the LPBatcher
// buffer, set to twice DefaultByteBatchSize.
const DefaultBufferCapacity = DefaultByteBatchSize * 2
View Source
// DefaultByteBatchSize is the default batch size of an LPBatcher, in bytes.
const DefaultByteBatchSize = 100000
View Source
const DefaultCapacity = 2 * DefaultBatchSize

DefaultCapacity is the default initial capacity of the point buffer

Variables

This section is empty.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Batcher collects points and emits them as batches

func NewBatcher

func NewBatcher(options ...Option) *Batcher

NewBatcher creates and initializes a new Batcher instance, applying the specified options. By default, the batch size is DefaultBatchSize and the initial capacity is DefaultCapacity.

func (*Batcher) Add

func (b *Batcher) Add(p ...*influxdb3.Point)

Add adds one or more points to the batcher and calls the configured callbacks, if any.

func (*Batcher) CurrentLoadSize

func (b *Batcher) CurrentLoadSize() int

func (*Batcher) Emit

func (b *Batcher) Emit() []*influxdb3.Point

Emit returns a new batch of points with the provided batch size or with the remaining points. Please drain the points at the end of your processing to get the remaining points not filling up a batch.

func (*Batcher) Flush

func (b *Batcher) Flush() []*influxdb3.Point

Flush drains all points even if the internal buffer is currently larger than size. It does not call the callbackEmit method

func (*Batcher) Ready

func (b *Batcher) Ready() bool

Ready tells the caller whether a new batch is ready to be emitted.

func (*Batcher) SetCapacity

func (b *Batcher) SetCapacity(c int)

SetCapacity sets the initial Capacity of the internal []*influxdb3.Point buffer.

func (*Batcher) SetEmitCallback

func (b *Batcher) SetEmitCallback(f func([]*influxdb3.Point))

SetEmitCallback sets the callbackEmit function.

func (*Batcher) SetReadyCallback

func (b *Batcher) SetReadyCallback(f func())

SetReadyCallback sets the callbackReady function.

func (*Batcher) SetSize

func (b *Batcher) SetSize(s int)

SetSize sets the batch size. Units are Points.

type ByteEmittable

// ByteEmittable extends Emittable with a setter for the callback that
// receives emitted data as []byte.
type ByteEmittable interface {
	Emittable
	SetEmitBytesCallback(ebcb func([]byte)) // sets the callback that receives each emitted batch of bytes
}

ByteEmittable provides the basis for a type that emits line protocol data as a byte slice (i.e. []byte).

type Emittable

// Emittable is the set of configuration setters shared by types that
// collect data and emit it once a ready state is reached.
type Emittable interface {
	SetSize(s int)               // sets the batch size
	SetCapacity(c int)           // sets the initial capacity of the internal buffer
	SetReadyCallback(rcb func()) // sets the callback called when a new batch is ready
}

Emittable provides the base for any type that will collect and then emit data upon reaching a ready state.

type LPBatcher

type LPBatcher struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LPBatcher collects line protocol strings storing them to a byte buffer and then emitting them as []byte.

func NewLPBatcher

func NewLPBatcher(options ...LPOption) *LPBatcher

NewLPBatcher creates and initializes a new LPBatcher instance, applying the supplied options. By default, the batch size is DefaultByteBatchSize and the initial capacity is DefaultBufferCapacity.

func (*LPBatcher) Add

func (lpb *LPBatcher) Add(lines ...string)

Add lines to the buffer and call appropriate callbacks when the ready state is reached.

func (*LPBatcher) CurrentLoadSize

func (lpb *LPBatcher) CurrentLoadSize() int

CurrentLoadSize returns the current size of the internal buffer

func (*LPBatcher) Emit

func (lpb *LPBatcher) Emit() []byte

Emit returns a new batch of bytes containing up to the provided batch size, ending at the last newline character found within the potential batch, or containing all the remaining bytes. Please drain the bytes at the end of your processing to get the remaining bytes not filling up a batch.

func (*LPBatcher) Flush

func (lpb *LPBatcher) Flush() []byte

Flush drains all bytes, even if the buffer is currently larger than the batch size.

func (*LPBatcher) Ready

func (lpb *LPBatcher) Ready() bool

Ready reports when the ready state is reached.

func (*LPBatcher) SetCapacity

func (lpb *LPBatcher) SetCapacity(c int)

SetCapacity sets the initial capacity of the internal buffer

func (*LPBatcher) SetEmitBytesCallback

func (lpb *LPBatcher) SetEmitBytesCallback(f func([]byte))

SetEmitBytesCallback sets the callbackByteEmit function

func (*LPBatcher) SetReadyCallback

func (lpb *LPBatcher) SetReadyCallback(f func())

SetReadyCallback sets the ReadyCallback function

func (*LPBatcher) SetSize

func (lpb *LPBatcher) SetSize(s int)

SetSize sets the batch size of the batcher

type LPOption

// LPOption is a configuration function applied to a ByteEmittable.
// NewLPBatcher accepts any number of LPOption values.
type LPOption func(ByteEmittable)

func WithBufferCapacity

func WithBufferCapacity(capacity int) LPOption

WithBufferCapacity changes the initial capacity of the internal buffer. The unit is bytes.

func WithBufferSize

func WithBufferSize(size int) LPOption

WithBufferSize changes the batch size emitted by the LPBatcher. The unit is bytes.

func WithByteEmitReadyCallback

func WithByteEmitReadyCallback(f func()) LPOption

WithByteEmitReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithEmitBytesCallback

func WithEmitBytesCallback(f func([]byte)) LPOption

WithEmitBytesCallback sets the function called when a new batch is ready with the batch bytes. The batcher will wait for the callback to finish, so please return as quickly as possible and move any long-running processing to a go routine.

type Option

// Option is a configuration function applied to a PointEmittable.
// NewBatcher accepts any number of Option values.
type Option func(PointEmittable)

func WithCapacity

func WithCapacity(capacity int) Option

WithCapacity changes the initial capacity of the internal buffer

func WithEmitCallback

func WithEmitCallback(f func([]*influxdb3.Point)) Option

WithEmitCallback sets the function called when a new batch is ready with the batch of points. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithReadyCallback

func WithReadyCallback(f func()) Option

WithReadyCallback sets the function called when a new batch is ready. The batcher will wait for the callback to finish, so please return as fast as possible and move long-running processing to a go-routine.

func WithSize

func WithSize(size int) Option

WithSize changes the batch-size emitted by the batcher

type PointEmittable

// PointEmittable extends Emittable with a setter for the callback that
// receives emitted data as []*influxdb3.Point.
type PointEmittable interface {
	Emittable
	SetEmitCallback(epcb func([]*influxdb3.Point)) // sets the callback that receives each emitted batch of points
}

PointEmittable provides the basis for any type emitting Point arrays as []*influxdb3.Point

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL