Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetDefaults ¶
Types ¶
type BulkEventWriter ¶
type BulkEventWriter struct { *TenantBufferManager[*repository.CreateStepRunEventOpts, int] // contains filtered or unexported fields }
func NewBulkEventWriter ¶
func (*BulkEventWriter) BulkWriteStepRunEvents ¶
func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)
func (*BulkEventWriter) Cleanup ¶
func (w *BulkEventWriter) Cleanup() error
type BulkQueueStepRunOpts ¶
type BulkQueueStepRunOpts struct { *dbsqlc.GetStepRunForEngineRow Priority int IsRetry bool Input []byte }
type BulkSemaphoreReleaser ¶
type BulkSemaphoreReleaser struct { *TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID] // contains filtered or unexported fields }
func (*BulkSemaphoreReleaser) BulkReleaseSemaphores ¶
func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)
func (*BulkSemaphoreReleaser) Cleanup ¶
func (w *BulkSemaphoreReleaser) Cleanup() error
type BulkStepRunQueuer ¶
type BulkStepRunQueuer struct { *TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID] // contains filtered or unexported fields }
func NewBulkStepRunQueuer ¶
func (*BulkStepRunQueuer) BulkQueueStepRuns ¶
func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)
func (*BulkStepRunQueuer) Cleanup ¶
func (w *BulkStepRunQueuer) Cleanup() error
type FlushResponse ¶
type IngestBuf ¶
func NewIngestBuffer ¶
func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]
NewIngestBuffer creates a new IngestBuf from opts, with generic types T for buffered items and U for flush output.
func (*IngestBuf[T, U]) BuffItem ¶
func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)
func (*IngestBuf[T, U]) StartDebugLoop ¶
func (b *IngestBuf[T, U]) StartDebugLoop()
type IngestBufOpts ¶
type IngestBufOpts[T any, U any] struct { Name string `validate:"required"` MaxCapacity int `validate:"required,gt=0"` FlushPeriod time.Duration `validate:"required,gt=0"` MaxDataSizeInQueue int `validate:"required,gt=0"` OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` }
type SemaphoreReleaseOpts ¶
type TenantBufManagerOpts ¶
type TenantBufManagerOpts[T any, U any] struct { Name string `validate:"required"` OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"` SizeFunc func(T) int `validate:"required"` L *zerolog.Logger `validate:"required"` V validator.Validator `validate:"required"` FlushPeriod *time.Duration FlushItemsThreshold int }
type TenantBufferManager ¶
func NewTenantBufManager ¶
func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)
NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.
func (*TenantBufferManager[T, U]) BuffItem ¶
func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)
func (*TenantBufferManager[T, U]) Cleanup ¶
func (t *TenantBufferManager[T, U]) Cleanup() error
Cleanup cleans up all tenant buffers.
Click to show internal directories.
Click to hide internal directories.