Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetDefaults ¶
Types ¶
type BuffStrategy ¶ added in v0.51.6
// BuffStrategy names a strategy for flushing a buffer. It is the type of the
// FlushStrategy field on ConfigFileBuffer and IngestBufOpts.
type BuffStrategy string

// Supported BuffStrategy values.
// NOTE(review): the behavioral difference between the two strategies is not
// visible from these declarations — confirm against the buffer implementation.
const (
	// Dynamic is the "DYNAMIC" flush strategy; it is the default declared on
	// ConfigFileBuffer.FlushStrategy.
	Dynamic BuffStrategy = "DYNAMIC"
	// Static is the "STATIC" flush strategy.
	Static BuffStrategy = "STATIC"
)
type ConfigFileBuffer ¶
type ConfigFileBuffer struct { // WaitForFlush is the time to wait for the buffer to flush used for backpressure on writers WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"` // MaxConcurrent is the maximum number of concurrent flushes MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"` // FlushPeriodMilliseconds is the number of milliseconds before flush FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"` // FlushItemsThreshold is the number of items to hold in memory until flushing to the database FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"` // SerialBuffer is a flag to determine if the buffer should be serial or bulk SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"` // FlushStrategy is the strategy to use for flushing the buffer FlushStrategy BuffStrategy `mapstructure:"flushStrategy" json:"flushStrategy" default:"DYNAMIC"` }
ConfigFileBuffer is the configuration for the buffer. It is stored here to prevent circular dependencies.
type FlushResponse ¶
type IngestBuf ¶
func NewIngestBuffer ¶
func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]
NewIngestBuffer creates a new buffer for any type T
func (*IngestBuf[T, U]) FireAndWait ¶ added in v0.52.12
func (*IngestBuf[T, U]) FireForget ¶ added in v0.52.12
func (*IngestBuf[T, U]) StartDebugLoop ¶
func (b *IngestBuf[T, U]) StartDebugLoop()
type IngestBufOpts ¶
type IngestBufOpts[T any, U any] struct { Name string `validate:"required"` // MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush MaxCapacity int `validate:"required,gt=0"` FlushPeriod time.Duration `validate:"required,gt=0"` MaxDataSizeInQueue int `validate:"required,gt=0"` OutputFunc func(ctx context.Context, items []T) ([]*U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` MaxConcurrent int `validate:"omitempty,gt=0"` WaitForFlush time.Duration `validate:"omitempty,gt=0"` FlushStrategy BuffStrategy `validate:"required"` }
type TenantBufManagerOpts ¶
type TenantBufManagerOpts[T any, U any] struct { Name string `validate:"required"` OutputFunc func(ctx context.Context, items []T) ([]*U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` V validator.Validator `validate:"required"` Config ConfigFileBuffer `validate:"required"` }
type TenantBufferManager ¶
func NewTenantBufManager ¶
func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)
NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.
func (*TenantBufferManager[T, U]) Cleanup ¶
func (t *TenantBufferManager[T, U]) Cleanup() error
Cleanup cleans up all tenant buffers.
func (*TenantBufferManager[T, U]) FireAndWait ¶ added in v0.52.12
func (t *TenantBufferManager[T, U]) FireAndWait(ctx context.Context, tenantKey string, item T) (*U, error)
func (*TenantBufferManager[T, U]) FireForget ¶ added in v0.52.12
func (t *TenantBufferManager[T, U]) FireForget(tenantKey string, item T) error
Click to show internal directories.
Click to hide internal directories.