buffer

package
v0.51.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 | License: MIT | Imports: 23 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetDefaults

func SetDefaults(flushPeriodMilliseconds int, flushItemsThreshold int)

Types

type BulkEventWriter

type BulkEventWriter struct {
	*TenantBufferManager[*repository.CreateStepRunEventOpts, int]
	// contains filtered or unexported fields
}

func (*BulkEventWriter) BulkWriteStepRunEvents

func (w *BulkEventWriter) BulkWriteStepRunEvents(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

func (*BulkEventWriter) Cleanup

func (w *BulkEventWriter) Cleanup() error

func (*BulkEventWriter) SerialWriteStepRunEvent added in v0.50.2

func (w *BulkEventWriter) SerialWriteStepRunEvent(ctx context.Context, opts []*repository.CreateStepRunEventOpts) ([]int, error)

type BulkQueueStepRunOpts

// BulkQueueStepRunOpts is the item type buffered by BulkStepRunQueuer and
// accepted by BulkQueueStepRuns. It embeds a step run's engine row and adds
// per-item queueing fields.
type BulkQueueStepRunOpts struct {
	// Embedded row for the step run; its fields are promoted onto this struct.
	*dbsqlc.GetStepRunForEngineRow

	// Priority is presumably the queueing priority of the step run; the
	// ordering semantics are not visible here — TODO confirm.
	Priority int
	// IsRetry presumably marks the step run as a retry of an earlier attempt —
	// TODO confirm against the queuer implementation.
	IsRetry  bool
	// Input is the raw input bytes for the step run; the encoding is not
	// shown here.
	Input    []byte
}

type BulkSemaphoreReleaser

type BulkSemaphoreReleaser struct {
	*TenantBufferManager[SemaphoreReleaseOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func (*BulkSemaphoreReleaser) BulkReleaseSemaphores

func (w *BulkSemaphoreReleaser) BulkReleaseSemaphores(ctx context.Context, opts []SemaphoreReleaseOpts) ([]pgtype.UUID, error)

func (*BulkSemaphoreReleaser) Cleanup

func (w *BulkSemaphoreReleaser) Cleanup() error

type BulkStepRunQueuer

type BulkStepRunQueuer struct {
	*TenantBufferManager[BulkQueueStepRunOpts, pgtype.UUID]
	// contains filtered or unexported fields
}

func (*BulkStepRunQueuer) BulkQueueStepRuns

func (w *BulkStepRunQueuer) BulkQueueStepRuns(ctx context.Context, opts []BulkQueueStepRunOpts) ([]pgtype.UUID, error)

func (*BulkStepRunQueuer) Cleanup

func (w *BulkStepRunQueuer) Cleanup() error

type ConfigFileBuffer

// ConfigFileBuffer holds the tunable settings for a buffer. Each field carries
// mapstructure and json keys plus a `default` struct tag.
type ConfigFileBuffer struct {

	// WaitForFlush is how long a writer waits for the buffer to flush; it is
	// used to apply backpressure on writers. The default tag is "1ms".
	WaitForFlush time.Duration `mapstructure:"waitForFlush" json:"waitForFlush,omitempty" default:"1ms"`

	// MaxConcurrent is the maximum number of concurrent flushes. The default
	// tag is "50".
	MaxConcurrent int `mapstructure:"maxConcurrent" json:"maxConcurrent,omitempty" default:"50"`

	// FlushPeriodMilliseconds is the number of milliseconds to wait before
	// flushing the buffer. The default tag is "10".
	FlushPeriodMilliseconds int `mapstructure:"flushPeriodMilliseconds" json:"flushPeriodMilliseconds,omitempty" default:"10"`

	// FlushItemsThreshold is the number of items to hold in memory before
	// flushing to the database. The default tag is "100".
	FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

	// SerialBuffer is a flag that determines whether the buffer should be
	// serial or bulk (true presumably selects serial — TODO confirm). The
	// default tag is "false".
	SerialBuffer bool `mapstructure:"serialBuffer" json:"serialBuffer,omitempty" default:"false"`
}

ConfigFileBuffer is the configuration for the buffer. It is defined in this package to prevent circular dependencies.

type FlushResponse

// FlushResponse is the element type of the channel returned by BuffItem. It
// pairs a result of type U with an error.
type FlushResponse[U any] struct {
	// Result is the output value of type U. Presumably it is only meaningful
	// when Err is nil — TODO confirm.
	Result U
	// Err is the error associated with this response, if any.
	Err    error
}

type IngestBuf

type IngestBuf[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewIngestBuffer

func NewIngestBuffer[T any, U any](opts IngestBufOpts[T, U]) *IngestBuf[T, U]

NewIngestBuffer creates a new buffer for items of any type T; flush results are of type U.

func (*IngestBuf[T, U]) BuffItem

func (b *IngestBuf[T, U]) BuffItem(item T) (chan *FlushResponse[U], error)

func (*IngestBuf[T, U]) Start

func (b *IngestBuf[T, U]) Start() (func() error, error)

func (*IngestBuf[T, U]) StartDebugLoop

func (b *IngestBuf[T, U]) StartDebugLoop()

type IngestBufOpts

// IngestBufOpts is the set of options passed to NewIngestBuffer. T is the type
// of buffered items and U is the type of the per-item outputs returned by
// OutputFunc. The `validate` tags state which fields are required.
type IngestBufOpts[T any, U any] struct {
	// Name identifies the buffer; required. How it is used (for example in
	// logs) is not visible here.
	Name string `validate:"required"`
	// MaxCapacity is the maximum number of items to hold in the buffer before
	// a flush is initiated. Required and must be greater than zero.
	MaxCapacity        int                                               `validate:"required,gt=0"`
	// FlushPeriod is presumably the interval between periodic flushes — TODO
	// confirm. Required and must be greater than zero.
	FlushPeriod        time.Duration                                     `validate:"required,gt=0"`
	// MaxDataSizeInQueue is presumably an upper bound on the total size of
	// queued items as measured by SizeFunc — TODO confirm. Required and must
	// be greater than zero.
	MaxDataSizeInQueue int                                               `validate:"required,gt=0"`
	// OutputFunc receives a slice of buffered items and returns a slice of
	// outputs or an error; required. Presumably it is invoked on each flush.
	OutputFunc         func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	// SizeFunc returns an integer size for a single item; required. The unit
	// is not specified here.
	SizeFunc           func(T) int                                       `validate:"required"`
	// L is the zerolog logger; required.
	L                  *zerolog.Logger                                   `validate:"required"`
	// MaxConcurrent is optional; when set it must be greater than zero.
	// NOTE(review): likely the same setting as ConfigFileBuffer.MaxConcurrent
	// (maximum number of concurrent flushes) — confirm.
	MaxConcurrent      int                                               `validate:"omitempty,gt=0"`
	// WaitForFlush is optional; when set it must be greater than zero.
	// NOTE(review): likely the same setting as ConfigFileBuffer.WaitForFlush
	// (writer backpressure wait) — confirm.
	WaitForFlush       time.Duration                                     `validate:"omitempty,gt=0"`
}

type SemaphoreReleaseOpts

// SemaphoreReleaseOpts is the item type buffered by BulkSemaphoreReleaser and
// accepted by BulkReleaseSemaphores.
type SemaphoreReleaseOpts struct {
	// StepRunId is the UUID of the step run; presumably the one whose
	// semaphore is released — TODO confirm.
	StepRunId pgtype.UUID
	// TenantId is the UUID of the tenant; presumably the owner of the step
	// run — TODO confirm.
	TenantId  pgtype.UUID
}

type TenantBufManagerOpts

// TenantBufManagerOpts is the set of options passed to NewTenantBufManager.
// T is the input item type and U is the output type. Every field is marked
// required by its `validate` tag.
type TenantBufManagerOpts[T any, U any] struct {
	// Name identifies the manager; how it is used is not visible here.
	Name       string                                            `validate:"required"`
	// OutputFunc receives a slice of items and returns a slice of outputs or
	// an error. Presumably it is handed to each tenant's buffer — TODO confirm.
	OutputFunc func(ctx context.Context, items []T) ([]U, error) `validate:"required"`
	// SizeFunc returns an integer size for a single item; the unit is not
	// specified here.
	SizeFunc   func(T) int                                       `validate:"required"`
	// L is the zerolog logger.
	L          *zerolog.Logger                                   `validate:"required"`
	// V is the validator; presumably used to validate these options — TODO
	// confirm.
	V          validator.Validator                               `validate:"required"`
	// Config carries the buffer settings (see ConfigFileBuffer).
	Config     ConfigFileBuffer                                  `validate:"required"`
}

type TenantBufferManager

type TenantBufferManager[T any, U any] struct {
	// contains filtered or unexported fields
}

func NewTenantBufManager

func NewTenantBufManager[T any, U any](opts TenantBufManagerOpts[T, U]) (*TenantBufferManager[T, U], error)

NewTenantBufManager creates a new TenantBufferManager with generic types T for input and U for output.

func (*TenantBufferManager[T, U]) BuffItem

func (t *TenantBufferManager[T, U]) BuffItem(tenantKey string, eventOps T) (chan *FlushResponse[U], error)

func (*TenantBufferManager[T, U]) Cleanup

func (t *TenantBufferManager[T, U]) Cleanup() error

Cleanup cleans up all tenant buffers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL