Documentation ¶
Index ¶
- type Duplicator
- type Flattener
- type Hole
- type JoinController
- type Joiner
- type Mapper
- type Processor
- func (self *Processor[In, Out]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *Processor[In, Out]
- func (self *Processor[In, Out]) WithBatchSize(batchSize int) *Processor[In, Out]
- func (self *Processor[In, Out]) WithInputChannel(v chan In) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnFlush(interval time.Duration, f func([]Out) ([]Out, error)) *Processor[In, Out]
- func (self *Processor[In, Out]) WithOnProcess(f func(In) ([]Out, error)) *Processor[In, Out]
- type Retry
- func (self *Retry) Run(f func() error) error
- func (self *Retry) WithAcceptableDuration(v time.Duration) *Retry
- func (self *Retry) WithContext(ctx context.Context) *Retry
- func (self *Retry) WithMaxElapsedTime(maxElapsedTime time.Duration) *Retry
- func (self *Retry) WithMaxInterval(maxInterval time.Duration) *Retry
- func (self *Retry) WithOnError(v func(err error, isDurationAcceptable bool) error) *Retry
- type SinkTask
- func (self *SinkTask[In]) WithBackoff(maxElapsedTime, maxInterval time.Duration) *SinkTask[In]
- func (self *SinkTask[In]) WithBatchSize(batchSize int) *SinkTask[In]
- func (self *SinkTask[In]) WithInputChannel(v chan In) *SinkTask[In]
- func (self *SinkTask[In]) WithOnFlush(interval time.Duration, f func([]In) error) *SinkTask[In]
- type Task
- func (self *Task) GetWorkerQueueFillFactor() float32
- func (self *Task) Start() (err error)
- func (self *Task) Stop()
- func (self *Task) StopWait()
- func (self *Task) SubmitToWorker(f func())
- func (self *Task) SubmitToWorkerIfEmpty(f func())
- func (self *Task) WithConditionalSubtask(isEnabled bool, t *Task) *Task
- func (self *Task) WithEnable(v bool) *Task
- func (self *Task) WithLoger(v time.Duration) *Task
- func (self *Task) WithOnAfterStop(f func()) *Task
- func (self *Task) WithOnBeforeStart(f func() error) *Task
- func (self *Task) WithOnStop(f func()) *Task
- func (self *Task) WithPeriodicSubtaskFunc(period time.Duration, f func() error) *Task
- func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f func() (repeat bool, err error)) *Task
- func (self *Task) WithStopChannel(channel chan struct{}) *Task
- func (self *Task) WithStopTimeout(v time.Duration) *Task
- func (self *Task) WithSubtask(t *Task) *Task
- func (self *Task) WithSubtaskFunc(f func() error) *Task
- func (self *Task) WithSubtaskSlice(tasks []*Task) *Task
- func (self *Task) WithWorkerPool(maxWorkers int, maxQueueSize int) *Task
- type Watchdog
- type Websocket
- func (self *Websocket[T]) WithKeepAliveTimeout(v time.Duration) *Websocket[T]
- func (self *Websocket[T]) WithMaxMessageSize(maxMessageSize int64) *Websocket[T]
- func (self *Websocket[T]) WithMinTimeBetweenMessages(minTimeBetweenMessages time.Duration) *Websocket[T]
- func (self *Websocket[T]) WithOnConnected(onConnected func(ws *websocket.Conn) error) *Websocket[T]
- func (self *Websocket[T]) WithOnDisconnected(onDisconnected func()) *Websocket[T]
- func (self *Websocket[T]) WithUrl(url string) *Websocket[T]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Duplicator ¶
Takes an item from the input channel and puts it into all output channels in parallel. The payload must be broadcast to all channels; otherwise it blocks.
func NewDuplicator ¶
func NewDuplicator[In any](config *config.Config, name string) (self *Duplicator[In])
func (*Duplicator[In]) NextChannel ¶
func (self *Duplicator[In]) NextChannel() (out chan In)
func (*Duplicator[In]) WithInputChannel ¶
func (self *Duplicator[In]) WithInputChannel(input chan In) *Duplicator[In]
func (*Duplicator[In]) WithOutputChannels ¶
func (self *Duplicator[In]) WithOutputChannels(numChannels, capacity int) *Duplicator[In]
type Flattener ¶
Accepts slices of elements and emits each element individually
func NewFlattener ¶
func (*Flattener[In]) WithCapacity ¶
func (*Flattener[In]) WithInputChannel ¶
type Hole ¶ added in v0.2.105
Processing task. Ensures flush is over before another is started:
func (*Hole[In]) WithBackoff ¶ added in v0.2.105
func (*Hole[In]) WithBatchSize ¶ added in v0.2.105
func (*Hole[In]) WithInputChannel ¶ added in v0.2.105
type JoinController ¶ added in v0.1.217
type JoinController interface { // Returns true if message is the first message of a batch // After this message is sent, the channel will be the only forwarded channel until IsLast() is called IsFirst() bool // Returns true if message is the last message of a batch // After this message is sent the channel will no longer be the only forwarded channel IsLast() bool }
type Joiner ¶ added in v0.1.217
type Joiner[In JoinController] struct { *Task Output chan In // contains filtered or unexported fields }
Takes elements from multiple channels and sends them to a single channel. Each channel may assume and give up control of the output channel by sending messages. Passed messages have to implement the JoinController interface for this to work.
func NewJoiner ¶ added in v0.1.217
func NewJoiner[In JoinController](config *config.Config, name string) (self *Joiner[In])
func (*Joiner[In]) WithCapacity ¶ added in v0.1.217
func (*Joiner[In]) WithInputChannel ¶ added in v0.1.217
type Mapper ¶
type Mapper[In any, Out any] struct { *Task Output chan Out // contains filtered or unexported fields }
Takes item from input channel, processes it and inserts it into the output channel
func (*Mapper[In, Out]) WithInputChannel ¶
func (*Mapper[In, Out]) WithProcessFunc ¶
func (*Mapper[In, Out]) WithWorkerPool ¶
type Processor ¶
type Processor[In any, Out any] struct { *Task // Output channel that forwards successfully processed data Output chan []Out // contains filtered or unexported fields }
Implements a two-step processing task: - onProcess is called for each incoming data item for preprocessing - onFlush is called periodically to handle a batch of processed data
func NewProcessor ¶
func (*Processor[In, Out]) WithBackoff ¶
func (*Processor[In, Out]) WithBatchSize ¶
func (*Processor[In, Out]) WithInputChannel ¶
func (*Processor[In, Out]) WithOnFlush ¶
func (*Processor[In, Out]) WithOnProcess ¶
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Implements operation retrying.
func (*Retry) WithAcceptableDuration ¶
func (*Retry) WithMaxElapsedTime ¶
func (*Retry) WithMaxInterval ¶
type SinkTask ¶
Store handles saving data to the database in a robust way. - groups incoming Interactions into batches, - ensures data isn't stuck even if a batch isn't big enough
func NewSinkTask ¶
func (*SinkTask[In]) WithBackoff ¶
func (*SinkTask[In]) WithBatchSize ¶
func (*SinkTask[In]) WithInputChannel ¶
type Task ¶
type Task struct { Config *config.Config Log *logrus.Entry Name string // Stopping IsStopping *atomic.Bool StopChannel chan bool // Context active as long as there's anything running in the task. // Used OUTSIDE the task. CtxRunning context.Context // Context cancelled when Stop() is called. // Used INSIDE the task Ctx context.Context // contains filtered or unexported fields }
Boilerplate for running long lived tasks.
func (*Task) GetWorkerQueueFillFactor ¶
func (*Task) SubmitToWorker ¶
func (self *Task) SubmitToWorker(f func())
func (*Task) SubmitToWorkerIfEmpty ¶
func (self *Task) SubmitToWorkerIfEmpty(f func())
func (*Task) WithConditionalSubtask ¶
func (*Task) WithEnable ¶
func (*Task) WithOnAfterStop ¶
func (*Task) WithOnBeforeStart ¶
func (*Task) WithOnStop ¶
func (*Task) WithPeriodicSubtaskFunc ¶
Repeatedly run the callback with a period.
func (*Task) WithRepeatedSubtaskFunc ¶
func (self *Task) WithRepeatedSubtaskFunc(period time.Duration, f func() (repeat bool, err error)) *Task
The callback is run again and again until it returns false or an error. Each rerun happens after the given period.
func (*Task) WithStopChannel ¶ added in v0.2.40
func (*Task) WithStopTimeout ¶ added in v0.2.32
func (*Task) WithSubtask ¶
func (*Task) WithSubtaskFunc ¶
func (*Task) WithSubtaskSlice ¶
type Watchdog ¶
type Watchdog struct { *Task // contains filtered or unexported fields }
func NewWatchdog ¶
func (*Watchdog) WithOnAfterStop ¶ added in v0.1.283
type Websocket ¶ added in v0.1.283
type Websocket[T any] struct { *Watchdog Output chan *T // Config Url string // contains filtered or unexported fields }
func NewWebsocket ¶ added in v0.1.283
Maintains a persistent websocket connection to the sequencer. Gets events that it subscribed to.
func (*Websocket[T]) WithKeepAliveTimeout ¶ added in v0.1.283
func (*Websocket[T]) WithMaxMessageSize ¶ added in v0.1.283
func (*Websocket[T]) WithMinTimeBetweenMessages ¶ added in v0.1.283
func (self *Websocket[T]) WithMinTimeBetweenMessages(minTimeBetweenMessages time.Duration) *Websocket[T]
Used to detect dead connections. A message should be received at least this often. The message may be a PONG response to a PING request.