task

package
v0.2.154 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2023 License: MIT Imports: 18 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Duplicator

type Duplicator[In any] struct {
	*Task
	// contains filtered or unexported fields
}

Takes an item from the input channel and puts it into all output channels in parallel. The payload needs to be broadcast to all channels; otherwise it blocks.

func NewDuplicator

func NewDuplicator[In any](config *config.Config, name string) (self *Duplicator[In])

func (*Duplicator[In]) NextChannel

func (self *Duplicator[In]) NextChannel() (out chan In)

func (*Duplicator[In]) WithInputChannel

func (self *Duplicator[In]) WithInputChannel(input chan In) *Duplicator[In]

func (*Duplicator[In]) WithOutputChannels

func (self *Duplicator[In]) WithOutputChannels(numChannels, capacity int) *Duplicator[In]

type Flattener

type Flattener[In any] struct {
	*Task

	Output chan In
	// contains filtered or unexported fields
}

Accepts slices of elements and emits each element individually

func NewFlattener

func NewFlattener[In any](config *config.Config, name string) (self *Flattener[In])

func (*Flattener[In]) WithCapacity

func (self *Flattener[In]) WithCapacity(capacity int) *Flattener[In]

func (*Flattener[In]) WithInputChannel

func (self *Flattener[In]) WithInputChannel(input chan []In) *Flattener[In]

type Hole added in v0.2.105

type Hole[In any] struct {
	*Task
	// contains filtered or unexported fields
}

Processing task. Ensures that one flush is over before another is started.

func NewHole added in v0.2.105

func NewHole[In any](config *config.Config, name string) (self *Hole[In])

func (*Hole[In]) WithBackoff added in v0.2.105

func (self *Hole[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Hole[In]

func (*Hole[In]) WithBatchSize added in v0.2.105

func (self *Hole[In]) WithBatchSize(batchSize int) *Hole[In]

func (*Hole[In]) WithInputChannel added in v0.2.105

func (self *Hole[In]) WithInputChannel(v chan In) *Hole[In]

func (*Hole[In]) WithOnFlush added in v0.2.105

func (self *Hole[In]) WithOnFlush(interval time.Duration, f func([]In) error) *Hole[In]

type JoinController added in v0.1.217

type JoinController interface {
	// Returns true if message is the first message of a batch
	// After this message is sent, the channel will be the only forwarded channel until IsLast() is called
	IsFirst() bool

	// Returns true if message is the last message of a batch
	// After this message is sent the channel will no longer be the only forwarded channel
	IsLast() bool
}

type Joiner added in v0.1.217

type Joiner[In JoinController] struct {
	*Task

	Output chan In
	// contains filtered or unexported fields
}

Takes elements from multiple channels and sends them to a single channel. Each channel may assume and give up control of the output channel by sending messages. Passed messages have to implement the JoinController interface for this to work.

func NewJoiner added in v0.1.217

func NewJoiner[In JoinController](config *config.Config, name string) (self *Joiner[In])

func (*Joiner[In]) WithCapacity added in v0.1.217

func (self *Joiner[In]) WithCapacity(capacity int) *Joiner[In]

func (*Joiner[In]) WithInputChannel added in v0.1.217

func (self *Joiner[In]) WithInputChannel(input chan In) *Joiner[In]

type Mapper

type Mapper[In any, Out any] struct {
	*Task

	Output chan Out
	// contains filtered or unexported fields
}

Takes item from input channel, processes it and inserts it into the output channel

func NewMapper

func NewMapper[In any, Out any](config *config.Config, name string) (self *Mapper[In, Out])

func (*Mapper[In, Out]) WithInputChannel

func (self *Mapper[In, Out]) WithInputChannel(input chan In) *Mapper[In, Out]

func (*Mapper[In, Out]) WithProcessFunc

func (self *Mapper[In, Out]) WithProcessFunc(f func(in In, out chan Out) (err error)) *Mapper[In, Out]

func (*Mapper[In, Out]) WithWorkerPool

func (self *Mapper[In, Out]) WithWorkerPool(maxWorkers, maxQueueSize int) *Mapper[In, Out]

type Processor

type Processor[In any, Out any] struct {
	*Task

	// Output channel that forwards successfully processed data
	Output chan []Out
	// contains filtered or unexported fields
}

Implements a two-step processing task: onProcess is called for each incoming data item for preprocessing, and onFlush is called periodically to handle a batch of processed data.

func NewProcessor

func NewProcessor[In any, Out any](config *config.Config, name string) (self *Processor[In, Out])

func (*Processor[In, Out]) WithBackoff

func (self *Processor[In, Out]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Processor[In, Out]

func (*Processor[In, Out]) WithBatchSize

func (self *Processor[In, Out]) WithBatchSize(batchSize int) *Processor[In, Out]

func (*Processor[In, Out]) WithInputChannel

func (self *Processor[In, Out]) WithInputChannel(v chan In) *Processor[In, Out]

func (*Processor[In, Out]) WithOnFlush

func (self *Processor[In, Out]) WithOnFlush(interval time.Duration, f func([]Out) ([]Out, error)) *Processor[In, Out]

func (*Processor[In, Out]) WithOnProcess

func (self *Processor[In, Out]) WithOnProcess(f func(In) ([]Out, error)) *Processor[In, Out]

type RepeatedSubtaskFunc added in v0.2.144

type RepeatedSubtaskFunc func() (repeat bool, err error)

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Implements operation retrying.

func NewRetry

func NewRetry() *Retry

func (*Retry) Run

func (self *Retry) Run(f func() error) error

func (*Retry) WithAcceptableDuration

func (self *Retry) WithAcceptableDuration(v time.Duration) *Retry

func (*Retry) WithContext

func (self *Retry) WithContext(ctx context.Context) *Retry

func (*Retry) WithMaxElapsedTime

func (self *Retry) WithMaxElapsedTime(maxElapsedTime time.Duration) *Retry

func (*Retry) WithMaxInterval

func (self *Retry) WithMaxInterval(maxInterval time.Duration) *Retry

func (*Retry) WithOnError

func (self *Retry) WithOnError(v func(err error, isDurationAcceptable bool) error) *Retry

type SinkTask

type SinkTask[In any] struct {
	*Task
	// contains filtered or unexported fields
}

Store handles saving data to the database in a robust way: it groups incoming Interactions into batches and ensures data isn't stuck even if a batch isn't big enough.

func NewSinkTask

func NewSinkTask[In any](config *config.Config, name string) (self *SinkTask[In])

func (*SinkTask[In]) WithBackoff

func (self *SinkTask[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *SinkTask[In]

func (*SinkTask[In]) WithBatchSize

func (self *SinkTask[In]) WithBatchSize(batchSize int) *SinkTask[In]

func (*SinkTask[In]) WithInputChannel

func (self *SinkTask[In]) WithInputChannel(v chan In) *SinkTask[In]

func (*SinkTask[In]) WithOnFlush

func (self *SinkTask[In]) WithOnFlush(interval time.Duration, f func([]In) error) *SinkTask[In]

type Task

type Task struct {
	Config *config.Config
	Log    *logrus.Entry
	Name   string

	// Stopping
	IsStopping  *atomic.Bool
	StopChannel chan bool

	// Context active as long as there's anything running in the task.
	// Used OUTSIDE the task.
	CtxRunning context.Context

	// Context cancelled when Stop() is called.
	// Used INSIDE the task
	Ctx context.Context
	// contains filtered or unexported fields
}

Boilerplate for running long lived tasks.

func NewTask

func NewTask(config *config.Config, name string) (self *Task)

func (*Task) GetWorkerQueueFillFactor

func (self *Task) GetWorkerQueueFillFactor() float32

func (*Task) Start

func (self *Task) Start() (err error)

func (*Task) Stop

func (self *Task) Stop()

func (*Task) StopWait

func (self *Task) StopWait()

func (*Task) SubmitToWorker

func (self *Task) SubmitToWorker(f func())

func (*Task) SubmitToWorkerIfEmpty

func (self *Task) SubmitToWorkerIfEmpty(f func())

func (*Task) WithConditionalSubtask

func (self *Task) WithConditionalSubtask(isEnabled bool, t *Task) *Task

func (*Task) WithEnable

func (self *Task) WithEnable(v bool) *Task

func (*Task) WithLoger added in v0.2.32

func (self *Task) WithLoger(v time.Duration) *Task

func (*Task) WithOnAfterStop

func (self *Task) WithOnAfterStop(f func()) *Task

func (*Task) WithOnBeforeStart

func (self *Task) WithOnBeforeStart(f func() error) *Task

func (*Task) WithOnStop

func (self *Task) WithOnStop(f func()) *Task

func (*Task) WithPeriodicSubtaskFunc

func (self *Task) WithPeriodicSubtaskFunc(period time.Duration, f func() error) *Task

Repeatedly run the callback with a period.

func (*Task) WithRepeatedSubtaskFunc

func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f RepeatedSubtaskFunc) *Task

Callback is run again and again until it returns false or an error. Rerun after period.

func (*Task) WithStopChannel added in v0.2.40

func (self *Task) WithStopChannel(channel chan struct{}) *Task

func (*Task) WithStopTimeout added in v0.2.32

func (self *Task) WithStopTimeout(v time.Duration) *Task

func (*Task) WithSubtask

func (self *Task) WithSubtask(t *Task) *Task

func (*Task) WithSubtaskFunc

func (self *Task) WithSubtaskFunc(f func() error) *Task

func (*Task) WithSubtaskSlice

func (self *Task) WithSubtaskSlice(tasks []*Task) *Task

func (*Task) WithWorkerPool

func (self *Task) WithWorkerPool(maxWorkers int, maxQueueSize int) *Task

type Watchdog

type Watchdog struct {
	*Task
	// contains filtered or unexported fields
}

func NewWatchdog

func NewWatchdog(config *config.Config) (self *Watchdog)

func (*Watchdog) Restart added in v0.2.83

func (self *Watchdog) Restart() (err error)

func (*Watchdog) WithIsOK

func (self *Watchdog) WithIsOK(interval time.Duration, isOK func() bool) *Watchdog

func (*Watchdog) WithOnAfterStop added in v0.1.283

func (self *Watchdog) WithOnAfterStop(onAfterStop func()) *Watchdog

func (*Watchdog) WithTask

func (self *Watchdog) WithTask(f func() *Task) *Watchdog

type Websocket added in v0.1.283

type Websocket[T any] struct {
	*Watchdog

	Output chan *T

	// Config
	Url string
	// contains filtered or unexported fields
}

func NewWebsocket added in v0.1.283

func NewWebsocket[T any](config *config.Config, name string) (self *Websocket[T])

Maintains a persistent websocket connection to the sequencer. Receives the events it subscribed to.

func (*Websocket[T]) WithKeepAliveTimeout added in v0.1.283

func (self *Websocket[T]) WithKeepAliveTimeout(v time.Duration) *Websocket[T]

func (*Websocket[T]) WithMaxMessageSize added in v0.1.283

func (self *Websocket[T]) WithMaxMessageSize(maxMessageSize int64) *Websocket[T]

func (*Websocket[T]) WithMinTimeBetweenMessages added in v0.1.283

func (self *Websocket[T]) WithMinTimeBetweenMessages(minTimeBetweenMessages time.Duration) *Websocket[T]

Used to detect dead connections. A message should be received at least this often. The message may be a PONG response to a PING request.

func (*Websocket[T]) WithOnConnected added in v0.1.283

func (self *Websocket[T]) WithOnConnected(onConnected func(ws *websocket.Conn) error) *Websocket[T]

func (*Websocket[T]) WithOnDisconnected added in v0.1.283

func (self *Websocket[T]) WithOnDisconnected(onDisconnected func()) *Websocket[T]

func (*Websocket[T]) WithUrl added in v0.1.283

func (self *Websocket[T]) WithUrl(url string) *Websocket[T]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL