Documentation ¶
Index ¶
- type DataProcessor
- type ProcessorWorker
- type TokenProcessor
- func NewChangeNamespace(source, destination *string) TokenProcessor
- func NewExpirationSetter(expired *atomic.Uint64, logger *slog.Logger) TokenProcessor
- func NewFilterByBin(binList []string, skipped *atomic.Uint64) TokenProcessor
- func NewFilterBySet(setList []string, skipped *atomic.Uint64) TokenProcessor
- func NewFilterByType(noRecords, noIndexes, noUdf bool) TokenProcessor
- func NewRecordCounter(counter *atomic.Uint64) TokenProcessor
- func NewSizeCounter(counter *atomic.Uint64) TokenProcessor
- func NewVoidTimeSetter(logger *slog.Logger) TokenProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataProcessor ¶
DataProcessor is an interface for processing data
func NewTPSLimiter ¶
func NewTPSLimiter[T any](ctx context.Context, n int) DataProcessor[T]
NewTPSLimiter creates a new TPS limiter. n is the allowed number of tokens per second; n = 0 means no limit.
type ProcessorWorker ¶
type ProcessorWorker[T any] struct { // contains filtered or unexported fields }
ProcessorWorker implements the pipeline.Worker interface. It wraps a DataProcessor and processes data with it.
func NewProcessorWorker ¶
func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T]
NewProcessorWorker creates a new ProcessorWorker
func (*ProcessorWorker[T]) Run ¶
func (w *ProcessorWorker[T]) Run(ctx context.Context) error
Run starts the ProcessorWorker
func (*ProcessorWorker[T]) SetReceiveChan ¶
func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T)
SetReceiveChan sets the receive channel for the ProcessorWorker
func (*ProcessorWorker[T]) SetSendChan ¶
func (w *ProcessorWorker[T]) SetSendChan(c chan<- T)
SetSendChan sets the send channel for the ProcessorWorker
type TokenProcessor ¶
type TokenProcessor = DataProcessor[*models.Token]
func NewChangeNamespace ¶
func NewChangeNamespace(source, destination *string) TokenProcessor
NewChangeNamespace creates a new changeNamespace processor.
func NewExpirationSetter ¶
func NewExpirationSetter(expired *atomic.Uint64, logger *slog.Logger) TokenProcessor
NewExpirationSetter creates a new expirationSetter processor
func NewFilterByBin ¶
func NewFilterByBin(binList []string, skipped *atomic.Uint64) TokenProcessor
NewFilterByBin creates a new filterByBin processor with the given binList.
func NewFilterBySet ¶
func NewFilterBySet(setList []string, skipped *atomic.Uint64) TokenProcessor
NewFilterBySet creates a new filterBySet processor with the given setList.
func NewFilterByType ¶
func NewFilterByType(noRecords, noIndexes, noUdf bool) TokenProcessor
NewFilterByType creates a new filterByType processor.
func NewRecordCounter ¶
func NewRecordCounter(counter *atomic.Uint64) TokenProcessor
func NewSizeCounter ¶
func NewSizeCounter(counter *atomic.Uint64) TokenProcessor
func NewVoidTimeSetter ¶
func NewVoidTimeSetter(logger *slog.Logger) TokenProcessor
NewVoidTimeSetter creates a new VoidTimeProcessor