Documentation ¶
Index ¶
- type Duplicator
- type Flattener
- type Hole
- type JoinController
- type Joiner
- type Mapper
- type Processor
- func (self *Processor[In, Out]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Processor[In, Out]
- func (self *Processor[In, Out]) WithBatchSize(batchSize int) *Processor[In, Out]
- func (self *Processor[In, Out]) WithInputChannel(v chan In) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnFlush(interval time.Duration, f func([]Out) ([]Out, error)) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnProcess(f func(In) ([]Out, error)) *Processor[In, Out]
- type RepeatedSubtaskFunc
- type Retry
- func (self *Retry) Run(f func() error) error
- func (self *Retry) WithAcceptableDuration(v time.Duration) *Retry
- func (self *Retry) WithContext(ctx context.Context) *Retry
- func (self *Retry) WithMaxElapsedTime(maxElapsedTime time.Duration) *Retry
- func (self *Retry) WithMaxInterval(maxInterval time.Duration) *Retry
- func (self *Retry) WithOnError(v func(err error, isDurationAcceptable bool) error) *Retry
- type SinkTask
- func (self *SinkTask[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *SinkTask[In]
- func (self *SinkTask[In]) WithBatchSize(batchSize int) *SinkTask[In]
- func (self *SinkTask[In]) WithInputChannel(v chan In) *SinkTask[In]
- func (self *SinkTask[In]) WithOnFlush(interval time.Duration, f func([]In) error) *SinkTask[In]
- type Task
- func (self *Task) GetWorkerQueueFillFactor() float32
- func (self *Task) Start() (err error)
- func (self *Task) Stop()
- func (self *Task) StopWait()
- func (self *Task) SubmitToWorker(f func())
- func (self *Task) SubmitToWorkerIfEmpty(f func())
- func (self *Task) WithConditionalSubtask(isEnabled bool, t *Task) *Task
- func (self *Task) WithCronSubtaskFunc(cronFormat string, f func() error) *Task
- func (self *Task) WithEnable(v bool) *Task
- func (self *Task) WithLoger(v time.Duration) *Task
- func (self *Task) WithOnAfterStop(f func()) *Task
- func (self *Task) WithOnBeforeStart(f func() error) *Task
- func (self *Task) WithOnStop(f func()) *Task
- func (self *Task) WithPeriodicSubtaskFunc(period time.Duration, f func() error) *Task
- func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f RepeatedSubtaskFunc) *Task
- func (self *Task) WithStopChannel(channel chan struct{}) *Task
- func (self *Task) WithStopTimeout(v time.Duration) *Task
- func (self *Task) WithSubtask(t *Task) *Task
- func (self *Task) WithSubtaskFunc(f func() error) *Task
- func (self *Task) WithSubtaskSlice(tasks []*Task) *Task
- func (self *Task) WithWorkerPool(maxWorkers int, maxQueueSize int) *Task
- type Watchdog
- type Websocket
- func (self *Websocket[T]) WithKeepAliveTimeout(v time.Duration) *Websocket[T]
- func (self *Websocket[T]) WithMaxMessageSize(maxMessageSize int64) *Websocket[T]
- func (self *Websocket[T]) WithMinTimeBetweenMessages(minTimeBetweenMessages time.Duration) *Websocket[T]
- func (self *Websocket[T]) WithOnConnected(onConnected func(ws *websocket.Conn) error) *Websocket[T]
- func (self *Websocket[T]) WithOnDisconnected(onDisconnected func()) *Websocket[T]
- func (self *Websocket[T]) WithUrl(url string) *Websocket[T]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Duplicator ¶
Takes item from input channel and in parallel puts to all output channels. Payload needs to be broadcasted into all channels, otherwise it blocks
func NewDuplicator ¶
func NewDuplicator[In any](config *config.Config, name string) (self *Duplicator[In])
func (*Duplicator[In]) NextChannel ¶
func (self *Duplicator[In]) NextChannel() (out chan In)
func (*Duplicator[In]) WithInputChannel ¶
func (self *Duplicator[In]) WithInputChannel(input chan In) *Duplicator[In]
func (*Duplicator[In]) WithOutputChannels ¶
func (self *Duplicator[In]) WithOutputChannels(numChannels, capacity int) *Duplicator[In]
type Flattener ¶
Accepts slices of elements and emits each element individually
func NewFlattener ¶
func (*Flattener[In]) WithCapacity ¶
func (*Flattener[In]) WithInputChannel ¶
type Hole ¶
Processing task. Ensures flush is over before another is started:
func (*Hole[In]) WithBackoff ¶
func (*Hole[In]) WithBatchSize ¶
func (*Hole[In]) WithInputChannel ¶
type JoinController ¶
type JoinController interface { // Returns true if message is the first message of a batch // After this message is sent, the channel will be the only forwarded channel until IsLast() is called IsFirst() bool // Returns true if message is the last message of a batch // After this message is sent the channel will no longer be the only forwarded channel IsLast() bool }
type Joiner ¶
type Joiner[In JoinController] struct { *Task Output chan In // contains filtered or unexported fields }
Takes elements from multiple channels and sends them to a single channel Each channel may take assume and give up control of the output channel by sending messages Passed messages have to implement the interface JoinController for this to work
func NewJoiner ¶
func NewJoiner[In JoinController](config *config.Config, name string) (self *Joiner[In])
func (*Joiner[In]) WithCapacity ¶
func (*Joiner[In]) WithInputChannel ¶
type Mapper ¶
type Mapper[In any, Out any] struct { *Task Output chan Out // contains filtered or unexported fields }
Takes item from input channel, processes it and inserts it into the output channel
func (*Mapper[In, Out]) WithInputChannel ¶
func (*Mapper[In, Out]) WithProcessFunc ¶
func (*Mapper[In, Out]) WithWorkerPool ¶
type Processor ¶
type Processor[In any, Out any] struct { *Task // Output channel that forwards successfuly processed data Output chan []Out // contains filtered or unexported fields }
Implements a two step processing task: - onProcess is called for each incoming data item for pre processing - onFlush is called periodically to handle a batch of processed data
func NewProcessor ¶
func (*Processor[In, Out]) WithBackoff ¶
func (*Processor[In, Out]) WithBatchSize ¶
func (*Processor[In, Out]) WithInputChannel ¶
func (*Processor[In, Out]) WithOnFlush ¶
func (*Processor[In, Out]) WithOnProcess ¶
type RepeatedSubtaskFunc ¶
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Implement operation retrying
func (*Retry) WithAcceptableDuration ¶
func (*Retry) WithMaxElapsedTime ¶
func (*Retry) WithMaxInterval ¶
type SinkTask ¶
Store handles saving data to the database in na robust way. - groups incoming Interactions into batches, - ensures data isn't stuck even if a batch isn't big enough
func NewSinkTask ¶
func (*SinkTask[In]) WithBackoff ¶
func (*SinkTask[In]) WithBatchSize ¶
func (*SinkTask[In]) WithInputChannel ¶
type Task ¶
type Task struct { Config *config.Config Log *logrus.Entry Name string // Stopping IsStopping *atomic.Bool StopChannel chan bool // Context active as long as there's anything running in the task. // Used OUTSIDE the task. CtxRunning context.Context // Context cancelled when Stop() is called. // Used INSIDE the task Ctx context.Context // contains filtered or unexported fields }
Boilerplate for running long lived tasks.
func (*Task) GetWorkerQueueFillFactor ¶
func (*Task) SubmitToWorker ¶
func (self *Task) SubmitToWorker(f func())
func (*Task) SubmitToWorkerIfEmpty ¶
func (self *Task) SubmitToWorkerIfEmpty(f func())
func (*Task) WithConditionalSubtask ¶
func (*Task) WithCronSubtaskFunc ¶
Repeatedly run the callback with a period.
func (*Task) WithEnable ¶
func (*Task) WithOnAfterStop ¶
func (*Task) WithOnBeforeStart ¶
func (*Task) WithOnStop ¶
func (*Task) WithPeriodicSubtaskFunc ¶
Repeatedly run the callback with a period.
func (*Task) WithRepeatedSubtaskFunc ¶
func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f RepeatedSubtaskFunc) *Task
Callback is run again and again until it returns false or an error. Rerun after period.
func (*Task) WithStopChannel ¶
func (*Task) WithSubtask ¶
func (*Task) WithSubtaskFunc ¶
func (*Task) WithSubtaskSlice ¶
type Watchdog ¶
type Watchdog struct { *Task // contains filtered or unexported fields }
func NewWatchdog ¶
func (*Watchdog) WithOnAfterStop ¶
type Websocket ¶
type Websocket[T any] struct { *Watchdog Output chan *T // Config Url string // contains filtered or unexported fields }
func NewWebsocket ¶
Maintains a persistent websocket connection to the sequencer Gets events that it subscribed for
func (*Websocket[T]) WithKeepAliveTimeout ¶
func (*Websocket[T]) WithMaxMessageSize ¶
func (*Websocket[T]) WithMinTimeBetweenMessages ¶
func (self *Websocket[T]) WithMinTimeBetweenMessages(minTimeBetweenMessages time.Duration) *Websocket[T]
Used to detect dead connections At least this often a message should be received Message may be PONG response for a PING request