Documentation ¶
Index ¶
- type Duplicator
- type Flattener
- type Mapper
- type Processor
- func (self *Processor[In, Out]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Processor[In, Out]
- func (self *Processor[In, Out]) WithBatchSize(batchSize int) *Processor[In, Out]
- func (self *Processor[In, Out]) WithInputChannel(v chan In) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnFlush(interval time.Duration, f func([]Out) ([]Out, error)) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnProcess(f func(In) ([]Out, error)) *Processor[In, Out]
- type Retry
- func (self *Retry) Run(f func() error) error
- func (self *Retry) WithAcceptableDuration(v time.Duration) *Retry
- func (self *Retry) WithContext(ctx context.Context) *Retry
- func (self *Retry) WithMaxElapsedTime(maxElapsedTime time.Duration) *Retry
- func (self *Retry) WithMaxInterval(maxInterval time.Duration) *Retry
- func (self *Retry) WithOnError(v func(err error, isDurationAcceptable bool) error) *Retry
- type SinkTask
- func (self *SinkTask[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *SinkTask[In]
- func (self *SinkTask[In]) WithBatchSize(batchSize int) *SinkTask[In]
- func (self *SinkTask[In]) WithInputChannel(v chan In) *SinkTask[In]
- func (self *SinkTask[In]) WithOnFlush(interval time.Duration, f func([]In) error) *SinkTask[In]
- type Task
- func (self *Task) GetWorkerQueueFillFactor() float32
- func (self *Task) Start() (err error)
- func (self *Task) Stop()
- func (self *Task) StopWait()
- func (self *Task) SubmitToWorker(f func())
- func (self *Task) SubmitToWorkerIfEmpty(f func())
- func (self *Task) WithConditionalSubtask(isEnabled bool, t *Task) *Task
- func (self *Task) WithEnable(v bool) *Task
- func (self *Task) WithOnAfterStop(f func()) *Task
- func (self *Task) WithOnBeforeStart(f func() error) *Task
- func (self *Task) WithOnStop(f func()) *Task
- func (self *Task) WithPeriodicSubtaskFunc(period time.Duration, f func() error) *Task
- func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f func() (repeat bool, err error)) *Task
- func (self *Task) WithSubtask(t *Task) *Task
- func (self *Task) WithSubtaskFunc(f func() error) *Task
- func (self *Task) WithSubtaskSlice(tasks []*Task) *Task
- func (self *Task) WithWorkerPool(maxWorkers int, maxQueueSize int) *Task
- type Watchdog
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Duplicator ¶
Takes an item from the input channel and puts it into all output channels in parallel. The payload has to be broadcast to all channels, otherwise it blocks.
func NewDuplicator ¶
func NewDuplicator[In any](config *config.Config, name string) (self *Duplicator[In])
func (*Duplicator[In]) NextChannel ¶
func (self *Duplicator[In]) NextChannel() (out chan In)
func (*Duplicator[In]) WithInputChannel ¶
func (self *Duplicator[In]) WithInputChannel(input chan In) *Duplicator[In]
func (*Duplicator[In]) WithOutputChannels ¶
func (self *Duplicator[In]) WithOutputChannels(numChannels, capacity int) *Duplicator[In]
type Flattener ¶
Accepts slices of elements and emits each element individually
func NewFlattener ¶
func (*Flattener[In]) WithCapacity ¶
func (*Flattener[In]) WithInputChannel ¶
type Mapper ¶
type Mapper[In any, Out any] struct { *Task Output chan Out // contains filtered or unexported fields }
Takes an item from the input channel, processes it and inserts the result into the output channel.
func (*Mapper[In, Out]) WithInputChannel ¶
func (*Mapper[In, Out]) WithProcessFunc ¶
func (*Mapper[In, Out]) WithWorkerPool ¶
type Processor ¶
type Processor[In any, Out any] struct { *Task // Output channel that forwards successfully processed data Output chan []Out // contains filtered or unexported fields }
Processor handles saving data to the database in a robust way: - groups incoming Interactions into batches, - ensures data isn't stuck even if a batch isn't big enough, - passes the data returned by the onFlush function to the output channel.
func NewProcessor ¶
func (*Processor[In, Out]) WithBackoff ¶
func (*Processor[In, Out]) WithBatchSize ¶
func (*Processor[In, Out]) WithInputChannel ¶
func (*Processor[In, Out]) WithOnFlush ¶
func (*Processor[In, Out]) WithOnProcess ¶
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Retry implements retrying of operations.
func (*Retry) WithAcceptableDuration ¶
func (*Retry) WithMaxElapsedTime ¶
func (*Retry) WithMaxInterval ¶
type SinkTask ¶
SinkTask handles saving data to the database in a robust way: - groups incoming Interactions into batches, - ensures data isn't stuck even if a batch isn't big enough.
func NewSinkTask ¶
func (*SinkTask[In]) WithBackoff ¶
func (*SinkTask[In]) WithBatchSize ¶
func (*SinkTask[In]) WithInputChannel ¶
type Task ¶
type Task struct { Config *config.Config Log *logrus.Entry Name string // Stopping IsStopping *atomic.Bool StopChannel chan bool // Context active as long as there's anything running in the task. // Used OUTSIDE the task. CtxRunning context.Context // Context cancelled when Stop() is called. // Used INSIDE the task Ctx context.Context // contains filtered or unexported fields }
Boilerplate for running long-lived tasks.
func (*Task) GetWorkerQueueFillFactor ¶
func (*Task) SubmitToWorker ¶
func (self *Task) SubmitToWorker(f func())
func (*Task) SubmitToWorkerIfEmpty ¶
func (self *Task) SubmitToWorkerIfEmpty(f func())
func (*Task) WithConditionalSubtask ¶
func (*Task) WithEnable ¶
func (*Task) WithOnAfterStop ¶
func (*Task) WithOnBeforeStart ¶
func (*Task) WithOnStop ¶
func (*Task) WithPeriodicSubtaskFunc ¶
Repeatedly run the callback with a period.
func (*Task) WithRepeatedSubtaskFunc ¶
func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f func() (repeat bool, err error)) *Task
The callback is run repeatedly until it returns false or an error. Each rerun happens after the given period.
func (*Task) WithSubtask ¶
func (*Task) WithSubtaskFunc ¶
func (*Task) WithSubtaskSlice ¶
type Watchdog ¶
type Watchdog struct { *Task // contains filtered or unexported fields }