task

package
v0.1.213 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2023 License: MIT Imports: 14 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Duplicator

type Duplicator[In any] struct {
	*Task
	// contains filtered or unexported fields
}

Takes an item from the input channel and puts it into all output channels in parallel. The payload has to be broadcast into all channels, otherwise it blocks.

func NewDuplicator

func NewDuplicator[In any](config *config.Config, name string) (self *Duplicator[In])

func (*Duplicator[In]) NextChannel

func (self *Duplicator[In]) NextChannel() (out chan In)

func (*Duplicator[In]) WithInputChannel

func (self *Duplicator[In]) WithInputChannel(input chan In) *Duplicator[In]

func (*Duplicator[In]) WithOutputChannels

func (self *Duplicator[In]) WithOutputChannels(numChannels, capacity int) *Duplicator[In]

type Flattener

type Flattener[In any] struct {
	*Task

	Output chan In
	// contains filtered or unexported fields
}

Accepts slices of elements and emits each element individually

func NewFlattener

func NewFlattener[In any](config *config.Config, name string) (self *Flattener[In])

func (*Flattener[In]) WithCapacity

func (self *Flattener[In]) WithCapacity(capacity int) *Flattener[In]

func (*Flattener[In]) WithInputChannel

func (self *Flattener[In]) WithInputChannel(input chan []In) *Flattener[In]

type Mapper

type Mapper[In any, Out any] struct {
	*Task

	Output chan Out
	// contains filtered or unexported fields
}

Takes an item from the input channel, processes it and inserts it into the output channel.

func NewMapper

func NewMapper[In any, Out any](config *config.Config, name string) (self *Mapper[In, Out])

func (*Mapper[In, Out]) WithInputChannel

func (self *Mapper[In, Out]) WithInputChannel(input chan In) *Mapper[In, Out]

func (*Mapper[In, Out]) WithProcessFunc

func (self *Mapper[In, Out]) WithProcessFunc(f func(in In, out chan Out) (err error)) *Mapper[In, Out]

func (*Mapper[In, Out]) WithWorkerPool

func (self *Mapper[In, Out]) WithWorkerPool(maxWorkers, maxQueueSize int) *Mapper[In, Out]

type Processor

type Processor[In any, Out any] struct {
	*Task

	// Output channel that forwards successfully processed data
	Output chan []Out
	// contains filtered or unexported fields
}

Store handles saving data to the database in a robust way: it groups incoming Interactions into batches, ensures data isn't stuck even if a batch isn't big enough, and passes the data returned by the onFlush function to the output channel.

func NewProcessor

func NewProcessor[In any, Out any](config *config.Config, name string) (self *Processor[In, Out])

func (*Processor[In, Out]) WithBackoff

func (self *Processor[In, Out]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Processor[In, Out]

func (*Processor[In, Out]) WithBatchSize

func (self *Processor[In, Out]) WithBatchSize(batchSize int) *Processor[In, Out]

func (*Processor[In, Out]) WithInputChannel

func (self *Processor[In, Out]) WithInputChannel(v chan In) *Processor[In, Out]

func (*Processor[In, Out]) WithOnFlush

func (self *Processor[In, Out]) WithOnFlush(interval time.Duration, f func([]Out) ([]Out, error)) *Processor[In, Out]

func (*Processor[In, Out]) WithOnProcess

func (self *Processor[In, Out]) WithOnProcess(f func(In) ([]Out, error)) *Processor[In, Out]

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Implements operation retrying.

func NewRetry

func NewRetry() *Retry

func (*Retry) Run

func (self *Retry) Run(f func() error) error

func (*Retry) WithAcceptableDuration

func (self *Retry) WithAcceptableDuration(v time.Duration) *Retry

func (*Retry) WithContext

func (self *Retry) WithContext(ctx context.Context) *Retry

func (*Retry) WithMaxElapsedTime

func (self *Retry) WithMaxElapsedTime(maxElapsedTime time.Duration) *Retry

func (*Retry) WithMaxInterval

func (self *Retry) WithMaxInterval(maxInterval time.Duration) *Retry

func (*Retry) WithOnError

func (self *Retry) WithOnError(v func(err error, isDurationAcceptable bool) error) *Retry

type SinkTask

type SinkTask[In any] struct {
	*Task
	// contains filtered or unexported fields
}

Store handles saving data to the database in a robust way: it groups incoming Interactions into batches and ensures data isn't stuck even if a batch isn't big enough.

func NewSinkTask

func NewSinkTask[In any](config *config.Config, name string) (self *SinkTask[In])

func (*SinkTask[In]) WithBackoff

func (self *SinkTask[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *SinkTask[In]

func (*SinkTask[In]) WithBatchSize

func (self *SinkTask[In]) WithBatchSize(batchSize int) *SinkTask[In]

func (*SinkTask[In]) WithInputChannel

func (self *SinkTask[In]) WithInputChannel(v chan In) *SinkTask[In]

func (*SinkTask[In]) WithOnFlush

func (self *SinkTask[In]) WithOnFlush(interval time.Duration, f func([]In) error) *SinkTask[In]

type Task

type Task struct {
	Config *config.Config
	Log    *logrus.Entry
	Name   string

	// Stopping
	IsStopping  *atomic.Bool
	StopChannel chan bool

	// Context active as long as there's anything running in the task.
	// Used OUTSIDE the task.
	CtxRunning context.Context

	// Context cancelled when Stop() is called.
	// Used INSIDE the task
	Ctx context.Context
	// contains filtered or unexported fields
}

Boilerplate for running long-lived tasks.

func NewTask

func NewTask(config *config.Config, name string) (self *Task)

func (*Task) GetWorkerQueueFillFactor

func (self *Task) GetWorkerQueueFillFactor() float32

func (*Task) Start

func (self *Task) Start() (err error)

func (*Task) Stop

func (self *Task) Stop()

func (*Task) StopWait

func (self *Task) StopWait()

func (*Task) SubmitToWorker

func (self *Task) SubmitToWorker(f func())

func (*Task) SubmitToWorkerIfEmpty

func (self *Task) SubmitToWorkerIfEmpty(f func())

func (*Task) WithConditionalSubtask

func (self *Task) WithConditionalSubtask(isEnabled bool, t *Task) *Task

func (*Task) WithEnable

func (self *Task) WithEnable(v bool) *Task

func (*Task) WithOnAfterStop

func (self *Task) WithOnAfterStop(f func()) *Task

func (*Task) WithOnBeforeStart

func (self *Task) WithOnBeforeStart(f func() error) *Task

func (*Task) WithOnStop

func (self *Task) WithOnStop(f func()) *Task

func (*Task) WithPeriodicSubtaskFunc

func (self *Task) WithPeriodicSubtaskFunc(period time.Duration, f func() error) *Task

Repeatedly runs the callback at the given period.

func (*Task) WithRepeatedSubtaskFunc

func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f func() (repeat bool, err error)) *Task

The callback is run again and again until it returns false or an error. Each rerun happens after the given period.

func (*Task) WithSubtask

func (self *Task) WithSubtask(t *Task) *Task

func (*Task) WithSubtaskFunc

func (self *Task) WithSubtaskFunc(f func() error) *Task

func (*Task) WithSubtaskSlice

func (self *Task) WithSubtaskSlice(tasks []*Task) *Task

func (*Task) WithWorkerPool

func (self *Task) WithWorkerPool(maxWorkers int, maxQueueSize int) *Task

type Watchdog

type Watchdog struct {
	*Task
	// contains filtered or unexported fields
}

func NewWatchdog

func NewWatchdog(config *config.Config) (self *Watchdog)

func (*Watchdog) WithIsOK

func (self *Watchdog) WithIsOK(isOK func() bool) *Watchdog

func (*Watchdog) WithTask

func (self *Watchdog) WithTask(f func() *Task) *Watchdog

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL