chain

package
v1.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: AGPL-3.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PassThrough

func PassThrough[Consumed any]() func(Consumer[Consumed]) Consumer[Consumed]

PassThrough is a ConsumerBuilder that forwards the .

Types

type BufferLink[Consumed any] struct {
	BufferCapacity int
	Next           Link[[]Consumed]
	// contains filtered or unexported fields
}

BufferLink use a channel to collect items and release them as soon as the BufferCapacity is reached. Next is running on a single thread.

func (*BufferLink[Consumed]) Consume

func (l *BufferLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error

func (*BufferLink[Consumed]) NotifyUpstreamCompleted

func (l *BufferLink[Consumed]) NotifyUpstreamCompleted()

func (*BufferLink[Consumed]) Starts

func (l *BufferLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error

func (*BufferLink[Consumed]) WaitForCompletion

func (l *BufferLink[Consumed]) WaitForCompletion() chan error

type ChainableErrorCollector

type ChainableErrorCollector interface {
	OnError(err error)
	Error() error
}

func NewErrorCollector

func NewErrorCollector(observers ...ErrorObserver) ChainableErrorCollector

NewErrorCollector collects errors that occurred during an async process (is thread-safe)

type CloseWrapperLink[Consumed any] struct {
	CloserFuncs []CloserFunc
	Next        Link[Consumed]
}

func (*CloseWrapperLink[Consumed]) Consume

func (w *CloseWrapperLink[Consumed]) Consume(ctx context.Context, consumed Consumed) error

func (*CloseWrapperLink[Consumed]) NotifyUpstreamCompleted

func (w *CloseWrapperLink[Consumed]) NotifyUpstreamCompleted()

func (*CloseWrapperLink[Consumed]) Starts

func (w *CloseWrapperLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error

func (*CloseWrapperLink[Consumed]) WaitForCompletion

func (w *CloseWrapperLink[Consumed]) WaitForCompletion() chan error

type CloserFunc

type CloserFunc func()

type Consumer

type Consumer[Consumed any] interface {
	Consume(ctx context.Context, consumed Consumed) error
}

type ConsumerFunc

type ConsumerFunc[Consumed any] func(ctx context.Context, consumed Consumed) error

func (ConsumerFunc[Consumed]) Consume

func (c ConsumerFunc[Consumed]) Consume(ctx context.Context, consumed Consumed) error
type EndLink[Consumed any] struct {
	Consumers []ConsumerFunc[Consumed]
	// contains filtered or unexported fields
}

EndLink runs Operator on the same routines as the previous link, and return errors from the ChainableErrorCollector

func EndOfTheChain

func EndOfTheChain[Consumed any](consumers ...ConsumerFunc[Consumed]) *EndLink[Consumed]

func (*EndLink[Consumed]) Consume

func (l *EndLink[Consumed]) Consume(ctx context.Context, produced Consumed) error

func (*EndLink[Consumed]) NotifyUpstreamCompleted

func (l *EndLink[Consumed]) NotifyUpstreamCompleted()

func (*EndLink[Consumed]) Starts

func (l *EndLink[Consumed]) Starts(ctx context.Context, collector ChainableErrorCollector) error

func (*EndLink[Consumed]) WaitForCompletion

func (l *EndLink[Consumed]) WaitForCompletion() chan error

type ErrorObserver

type ErrorObserver func(error)
type Link[Consumed any] interface {
	StartLink[Consumed]

	// NotifyUpstreamCompleted is called when the previous link will not call Consume anymore
	NotifyUpstreamCompleted()
}
type MultithreadedLink[Consumed any, Produced any] struct {
	NumberOfRoutines int                                         // NumberOfRoutines is the number of routines on which the ConsumerBuilder returned method will be called. Default is 1.
	ConsumerBuilder  func(Consumer[Produced]) Consumer[Consumed] // ConsumerBuilder is the factory function to build the consumer that transforms Consumed into Produced. Use PassThrough if no transformation is needed.
	Cancellable      bool                                        // Cancellable is true if the cancelled context should stop the routine. Default is false.
	ChannelSize      int                                         // ChannelSize is defaulted to 255
	Next             Link[Produced]                              // Next will receive the product of the ConsumerBuilder returned method. It is mandatory to have one, use EndOfTheChain to end the chain.
	// contains filtered or unexported fields
}

MultithreadedLink runs the Operator on as many routines as requested.

func (*MultithreadedLink[Consumed, Produced]) Consume

func (l *MultithreadedLink[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error

func (*MultithreadedLink[Consumed, Produced]) NotifyUpstreamCompleted

func (l *MultithreadedLink[Consumed, Produced]) NotifyUpstreamCompleted()

func (*MultithreadedLink[Consumed, Produced]) Starts

func (l *MultithreadedLink[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error

func (*MultithreadedLink[Consumed, Produced]) WaitForCompletion

func (l *MultithreadedLink[Consumed, Produced]) WaitForCompletion() chan error
type ReBufferLink[Consumed any] struct {
	BufferLink[Consumed]
}

func (*ReBufferLink[Consumed]) Consume

func (l *ReBufferLink[Consumed]) Consume(ctx context.Context, buf []Consumed) error

type SingleLauncher

type SingleLauncher[Consumed any, Produced any] struct {
	Next     Link[Produced]
	Function func(ctx context.Context, consumed Consumed) ([]Produced, error)
	// contains filtered or unexported fields
}

SingleLauncher launch the chain process by consume one and only one element.

func (*SingleLauncher[Consumed, Produced]) Consume

func (s *SingleLauncher[Consumed, Produced]) Consume(ctx context.Context, consumed Consumed) error

func (*SingleLauncher[Consumed, Produced]) Process

func (s *SingleLauncher[Consumed, Produced]) Process(ctx context.Context, consumed Consumed) chan error

Process combine Consume and WaitForCompletion to simplify consumption.

func (*SingleLauncher[Consumed, Produced]) Starts

func (s *SingleLauncher[Consumed, Produced]) Starts(ctx context.Context, collector ChainableErrorCollector) error

func (*SingleLauncher[Consumed, Produced]) WaitForCompletion

func (s *SingleLauncher[Consumed, Produced]) WaitForCompletion() chan error
type StartLink[Consumed any] interface {
	Consumer[Consumed]

	// Starts is called first, it should create the channels and start the goroutines ; Next links Starts should also be called.
	Starts(ctx context.Context, collector ChainableErrorCollector) error

	// WaitForCompletion is called after NotifyUpstreamCompleted and should return the error collected
	WaitForCompletion() chan error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL