buffer

package
v0.53.4-alpha.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Fibonacci added in v0.51.6

func Fibonacci(n int) int

func SetDefaults

func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)

Types

type BuffStrategy added in v0.51.6

// BuffStrategy is a string-typed name for a buffer flush strategy. It is the
// type of ConfigFileBuffer.FlushStrategy and IngestBufOpts.FlushStrategy.
type BuffStrategy string

// The BuffStrategy values declared by this package. Dynamic ("DYNAMIC") is the
// declared default of ConfigFileBuffer.FlushStrategy; Static is "STATIC".
// NOTE(review): how each strategy changes flushing behavior is not visible in
// this listing — confirm against the implementation.
const (
	Dynamic BuffStrategy = "DYNAMIC"
	Static  BuffStrategy = "STATIC"
)

type ConfigFileBuffer

// ConfigFileBuffer holds the buffer settings. Every field carries a
// mapstructure key, a json key, and a default tag giving its declared default.
// NOTE(review): how the default tags are applied is not visible here — confirm
// against the config loader.
type ConfigFileBuffer struct {

	// WaitForFlush is the time to wait for the buffer to flush; it is used to
	// apply backpressure on writers (declared default 1ms).
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`

	// MaxConcurrent is the maximum number of concurrent flushes (declared default 50).
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`

	// FlushPeriodMilliseconds is the number of milliseconds before a flush (declared default 10).
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`

	// FlushItemsThreshold is the number of items to hold in memory before
	// flushing to the database (declared default 100).
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

	// SerialBuffer is a flag that determines whether the buffer is serial or
	// bulk (declared default false).
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`

	// FlushStrategy is the strategy to use for flushing the buffer (declared
	// default DYNAMIC). Unlike the other fields, its json tag has no omitempty option.
	FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"`
}

ConfigFileBuffer is the configuration for the buffer. It is defined in this package to prevent circular dependencies.

type FlushResponse

// FlushResponse pairs a pointer to a value of type U (Result) with an error (Err).
// NOTE(review): presumably one FlushResponse is produced per buffered item once
// its flush completes, with Result unset when Err is non-nil — confirm against
// the unexported code that constructs it.
type FlushResponse[U any] struct {
	Result *U
	Err    error
}

type IngestBuf

// IngestBuf is a generic buffer created by NewIngestBuffer. T is the type of
// the items passed to FireAndWait and FireForget; U is the type FireAndWait
// returns a pointer to. All of its fields are unexported.
type IngestBuf[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIngestBuffer

func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]

NewIngestBuffer creates a new buffer for items of any type T and results of any type U.

func (*IngestBuf[T, U]) FireAndWait added in v0.52.12

func (b *IngestBuf[T, U]) FireAndWait(ctx context.Context, item T) (*U, error)

func (*IngestBuf[T, U]) FireForget added in v0.52.12

func (b *IngestBuf[T, U]) FireForget(item T) error

func (*IngestBuf[T, U]) Start

func (b *IngestBuf[T, U]) Start() (func() error, error)

func (*IngestBuf[T, U]) StartDebugLoop

func (b *IngestBuf[T, U]) StartDebugLoop()

type IngestBufOpts

// IngestBufOpts is the options struct accepted by NewIngestBuffer.
//
// Per the validate tags, every field is required except MaxConcurrent and
// WaitForFlush, which may be left at their zero value. The int and
// time.Duration fields must be greater than zero when set.
//
// OutputFunc takes a context and a slice of T and returns a slice of *U plus
// an error. SizeFunc maps a single T to an int. L is a zerolog logger.
// NOTE(review): presumably OutputFunc is the callback invoked on flush and
// MaxDataSizeInQueue bounds the sum of SizeFunc over queued items — confirm
// against the implementation.
type IngestBufOpts[T any, U any] struct {
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush
	MaxCapacity        int                                                `validate:"required,gt=0"`
	FlushPeriod        time.Duration                                      `validate:"required,gt=0"`
	MaxDataSizeInQueue int                                                `validate:"required,gt=0"`
	OutputFunc         func(ctx context.Context, items []T) ([]*U, error) `validate:"required"`
	SizeFunc           func(T) int                                        `validate:"required"`
	L                  *zerolog.Logger                                    `validate:"required"`
	MaxConcurrent      int                                                `validate:"omitempty,gt=0"`
	WaitForFlush       time.Duration                                      `validate:"omitempty,gt=0"`
	FlushStrategy      BuffStrategy                                       `validate:"required"`
}

type TenantBufManagerOpts

// TenantBufManagerOpts is the options struct accepted by NewTenantBufManager.
// Every field is tagged validate:"required".
//
// OutputFunc and SizeFunc have the same signatures as the fields of the same
// name on IngestBufOpts. L is a zerolog logger, V is a validator.Validator,
// and Config is a ConfigFileBuffer.
// NOTE(review): presumably Config supplies the settings for each buffer the
// manager creates and V is used to validate these options — confirm against
// NewTenantBufManager.
type TenantBufManagerOpts[T any, U any] struct {
	Name       string                                             `validate:"required"`
	OutputFunc func(ctx context.Context, items []T) ([]*U, error) `validate:"required"`
	SizeFunc   func(T) int                                        `validate:"required"`
	L          *zerolog.Logger                                    `validate:"required"`
	V          validator.Validator                                `validate:"required"`
	Config     ConfigFileBuffer                                   `validate:"required"`
}

type TenantBufferManager

// TenantBufferManager is a generic manager of tenant buffers created by
// NewTenantBufManager, with T as the input type and U as the output type.
// Its FireAndWait and FireForget methods take a tenantKey string alongside the
// item. All of its fields are unexported.
// NOTE(review): presumably it keeps one buffer per tenantKey — confirm against
// the implementation.
type TenantBufferManager[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewTenantBufManager

func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)

NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.

func (*TenantBufferManager[T, U]) Cleanup

func (t *TenantBufferManager[T, U]) Cleanup() error

Cleanup cleans up all tenant buffers.

func (*TenantBufferManager[T, U]) FireAndWait added in v0.52.12

func (t *TenantBufferManager[T, U]) FireAndWait(ctx context.Context, tenantKey string, item T) (*U, error)

func (*TenantBufferManager[T, U]) FireForget added in v0.52.12

func (t *TenantBufferManager[T, U]) FireForget(tenantKey string, item T) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL