Documentation ¶
Index ¶
- func PassThrough[Consumed any]() func(Consumer[Consumed]) Consumer[Consumed]
- type BufferLink
- func (l *BufferLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error
- func (l *BufferLink[Consumed]) NotifyUpstreamCompleted()
- func (l *BufferLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error
- func (l *BufferLink[Consumed]) WaitForCompletion() chan error
- type ChainableErrorCollector
- type CloseWrapperLink
- func (w *CloseWrapperLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error
- func (w *CloseWrapperLink[Consumed]) NotifyUpstreamCompleted()
- func (w *CloseWrapperLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error
- func (w *CloseWrapperLink[Consumed]) WaitForCompletion() chan error
- type CloserFunc
- type Consumer
- type ConsumerFunc
- type EndLink
- type ErrorObserver
- type Link
- type MultithreadedLink
- func (l *MultithreadedLink[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error
- func (l *MultithreadedLink[Consumed, Produced]) NotifyUpstreamCompleted()
- func (l *MultithreadedLink[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error
- func (l *MultithreadedLink[Consumed, Produced]) WaitForCompletion() chan error
- type ReBufferLink
- type SingleLauncher
- func (s *SingleLauncher[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error
- func (s *SingleLauncher[Consumed, Produced]) Process(ctx context.Context, consumed Consumed) chan error
- func (s *SingleLauncher[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error
- func (s *SingleLauncher[Consumed, Produced]) WaitForCompletion() chan error
- type StartLink
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PassThrough ¶
PassThrough is a ConsumerBuilder that forwards each consumed item, untransformed, to the next Consumer.
Types ¶
type BufferLink ¶
type BufferLink[Consumed any] struct { BufferCapacity int Next Link[[]Consumed] // contains filtered or unexported fields }
BufferLink uses a channel to collect items and releases them as soon as BufferCapacity is reached. Next runs on a single thread.
func (*BufferLink[Consumed]) Consume ¶
func (l *BufferLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error
func (*BufferLink[Consumed]) NotifyUpstreamCompleted ¶
func (l *BufferLink[Consumed]) NotifyUpstreamCompleted()
func (*BufferLink[Consumed]) Starts ¶
func (l *BufferLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error
func (*BufferLink[Consumed]) WaitForCompletion ¶
func (l *BufferLink[Consumed]) WaitForCompletion() chan error
type ChainableErrorCollector ¶
func NewErrorCollector ¶
func NewErrorCollector(observers ...ErrorObserver) ChainableErrorCollector
NewErrorCollector returns a ChainableErrorCollector that collects the errors that occur during an asynchronous process; it is thread-safe.
type CloseWrapperLink ¶
type CloseWrapperLink[Consumed any] struct { CloserFuncs []CloserFunc Next Link[Consumed] }
func (*CloseWrapperLink[Consumed]) Consume ¶
func (w *CloseWrapperLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error
func (*CloseWrapperLink[Consumed]) NotifyUpstreamCompleted ¶
func (w *CloseWrapperLink[Consumed]) NotifyUpstreamCompleted()
func (*CloseWrapperLink[Consumed]) Starts ¶
func (w *CloseWrapperLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error
func (*CloseWrapperLink[Consumed]) WaitForCompletion ¶
func (w *CloseWrapperLink[Consumed]) WaitForCompletion() chan error
type CloserFunc ¶
type CloserFunc func()
type ConsumerFunc ¶
type EndLink ¶
type EndLink[Consumed any] struct { Consumers []ConsumerFunc[Consumed] // contains filtered or unexported fields }
EndLink runs the Operator on the same routines as the previous link, and returns the errors from the ChainableErrorCollector.
func EndOfTheChain ¶
func EndOfTheChain[Consumed any](consumers ...ConsumerFunc[Consumed]) *EndLink[Consumed]
func (*EndLink[Consumed]) NotifyUpstreamCompleted ¶
func (l *EndLink[Consumed]) NotifyUpstreamCompleted()
func (*EndLink[Consumed]) Starts ¶
func (l *EndLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error
func (*EndLink[Consumed]) WaitForCompletion ¶
type ErrorObserver ¶
type ErrorObserver func(error)
type MultithreadedLink ¶
type MultithreadedLink[Consumed any, Produced any] struct {
	NumberOfRoutines int // NumberOfRoutines is the number of routines on which the ConsumerBuilder returned method will be called. Default is 1.
	ConsumerBuilder  func(Consumer[Produced]) Consumer[Consumed] // ConsumerBuilder is the factory function to build the consumer that transforms Consumed into Produced. Use PassThrough if no transformation is needed.
	Cancellable      bool // Cancellable is true if the cancelled context should stop the routine. Default is false.
	ChannelSize      int // ChannelSize is defaulted to 255
	Next             Link[Produced] // Next will receive the product of the ConsumerBuilder returned method. It is mandatory to have one, use EndOfTheChain to end the chain.
	// contains filtered or unexported fields
}
MultithreadedLink runs the Operator on as many routines as requested.
func (*MultithreadedLink[Consumed, Produced]) Consume ¶
func (l *MultithreadedLink[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error
func (*MultithreadedLink[Consumed, Produced]) NotifyUpstreamCompleted ¶
func (l *MultithreadedLink[Consumed, Produced]) NotifyUpstreamCompleted()
func (*MultithreadedLink[Consumed, Produced]) Starts ¶
func (l *MultithreadedLink[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error
func (*MultithreadedLink[Consumed, Produced]) WaitForCompletion ¶
func (l *MultithreadedLink[Consumed, Produced]) WaitForCompletion() chan error
type ReBufferLink ¶
type ReBufferLink[Consumed any] struct { BufferLink[Consumed] }
type SingleLauncher ¶
type SingleLauncher[Consumed any, Produced any] struct { Next Link[Produced] Function func(ctx context.Context, consumed Consumed) ([]Produced, error) // contains filtered or unexported fields }
SingleLauncher launches the chain process by consuming one and only one element.
func (*SingleLauncher[Consumed, Produced]) Consume ¶
func (s *SingleLauncher[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error
func (*SingleLauncher[Consumed, Produced]) Process ¶
func (s *SingleLauncher[Consumed, Produced]) Process(ctx context.Context, consumed Consumed) chan error
Process combines Consume and WaitForCompletion to simplify consumption.
func (*SingleLauncher[Consumed, Produced]) Starts ¶
func (s *SingleLauncher[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error
func (*SingleLauncher[Consumed, Produced]) WaitForCompletion ¶
func (s *SingleLauncher[Consumed, Produced]) WaitForCompletion() chan error
type StartLink ¶
type StartLink[Consumed any] interface {
	Consumer[Consumed]

	// Starts is called first; it should create the channels and start the goroutines. The Starts method of the next links should also be called.
	Starts(ctx context.Context, collector ChainableErrorCollector) error

	// WaitForCompletion is called after NotifyUpstreamCompleted and should return the errors collected.
	WaitForCompletion() chan error
}
Click to show internal directories.
Click to hide internal directories.