Documentation ¶
Index ¶
- func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)
- type BulkEventWriter
- type BulkQueueStepRunOpts
- type BulkSemaphoreReleaser
- type BulkStepRunQueuer
- type ConfigFileBuffer
- type FlushResponse
- type IngestBuf
- type IngestBufOpts
- type SemaphoreReleaseOpts
- type TenantBufManagerOpts
- type TenantBufferManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetDefaults ¶
Types ¶
type BulkEventWriter ¶
type BulkEventWriter struct {
	*TenantBufferManager[*repository.CreateStepRunEventOpts, int]
	// contains filtered or unexported fields
}
func NewBulkEventWriter ¶
func NewBulkEventWriter(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkEventWriter, error)
func (*BulkEventWriter) BulkWriteStepRunEvents ¶
func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)
func (*BulkEventWriter) Cleanup ¶
func (w *BulkEventWriter) Cleanup() error
func (*BulkEventWriter) SerialWriteStepRunEvent ¶ added in v0.50.2
func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)
type BulkQueueStepRunOpts ¶
type BulkQueueStepRunOpts struct { *dbsqlc.GetStepRunForEngineRow Priority int IsRetry bool Input []byte }
type BulkSemaphoreReleaser ¶
type BulkSemaphoreReleaser struct {
	*TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID]
	// contains filtered or unexported fields
}
func NewBulkSemaphoreReleaser ¶
func NewBulkSemaphoreReleaser(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkSemaphoreReleaser, error)
func (*BulkSemaphoreReleaser) BulkReleaseSemaphores ¶
func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)
func (*BulkSemaphoreReleaser) Cleanup ¶
func (w *BulkSemaphoreReleaser) Cleanup() error
type BulkStepRunQueuer ¶
type BulkStepRunQueuer struct {
	*TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID]
	// contains filtered or unexported fields
}
func NewBulkStepRunQueuer ¶
func NewBulkStepRunQueuer(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, conf ConfigFileBuffer) (*BulkStepRunQueuer, error)
func (*BulkStepRunQueuer) BulkQueueStepRuns ¶
func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)
func (*BulkStepRunQueuer) Cleanup ¶
func (w *BulkStepRunQueuer) Cleanup() error
type ConfigFileBuffer ¶
type ConfigFileBuffer struct {
	// WaitForFlush is the time to wait for the buffer to flush; it is used for backpressure on writers
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`

	// MaxConcurrent is the maximum number of concurrent flushes
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`

	// FlushPeriodMilliseconds is the number of milliseconds before flush
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`

	// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

	// SerialBuffer is a flag to determine if the buffer should be serial or bulk
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`
}
ConfigFileBuffer is the configuration for the buffer. It is stored here to prevent circular dependencies.
type FlushResponse ¶
type IngestBuf ¶
func NewIngestBuffer ¶
func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]
NewIngestBuffer creates a new buffer for any type T
func (*IngestBuf[T, U]) BuffItem ¶
func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)
func (*IngestBuf[T, U]) StartDebugLoop ¶
func (b *IngestBuf[T, U]) StartDebugLoop()
type IngestBufOpts ¶
type IngestBufOpts[T any, U any] struct {
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in buffer before we initiate a flush
	MaxCapacity        int                                              `validate:"required,gt=0"`
	FlushPeriod        time.Duration                                    `validate:"required,gt=0"`
	MaxDataSizeInQueue int                                              `validate:"required,gt=0"`
	OutputFunc         func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	SizeFunc           func(T) int                                      `validate:"required"`
	L                  *zerolog.Logger                                  `validate:"required"`
	MaxConcurrent      int                                              `validate:"omitempty,gt=0"`
	WaitForFlush       time.Duration                                    `validate:"omitempty,gt=0"`
}
type SemaphoreReleaseOpts ¶
type TenantBufManagerOpts ¶
type TenantBufManagerOpts[T any, U any] struct { Name string `validate:"required"` OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` V validator.Validator `validate:"required"` Config ConfigFileBuffer `validate:"required"` }
type TenantBufferManager ¶
func NewTenantBufManager ¶
func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)
NewTenantBufManager creates a new TenantBufferManager with generic type T for input and generic type U for output.
func (*TenantBufferManager[T, U]) BuffItem ¶
func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)
func (*TenantBufferManager[T, U]) Cleanup ¶
func (t *TenantBufferManager[T, U]) Cleanup() error
Cleanup cleans up all tenant buffers.
Click to show internal directories.
Click to hide internal directories.