concurrent

package
v2.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewOrderedProcessor

func NewOrderedProcessor(
	ctx context.Context,
	workers int,
	input chan interface{},
	errorStrategy ErrorStrategy,
	ps Processor,
) (
	output chan StreamOutput,
)

NewOrderedProcessor will read from input and run all the processors, in order, on top of the input; where possible in parrallell with upp to [workers] of parrallell threads and emit the output in preserved order. Is similar to an concurrent ordered .map() call in other languages. Designed for unbounded streams of data. The consumer is responsible to read out all items from output + errors chan to not end up with memory leaks Since the implementation makes heavy use of channels it is NOT meant for high throughput scenarios but rather when some processing in a pipeline can be done in parallel but order is still required.

func NewOrderedProcessors

func NewOrderedProcessors(
	ctx context.Context,
	workers int,
	input chan interface{},
	errorStrategy ErrorStrategy,
	ps ...Processor,
) (
	output chan StreamOutput,
)

NewOrderedProcessors works like NewOrderedProcessor but where each processor have their own work queue and thread pools, the next reading from the previous.

Types

type ErrorStrategy

type ErrorStrategy int

ErrorStrategy contains various methods for responding to errors during a concurrent execution

const (
	// ErrorsIgnore strategy will ignore errors from the processor and continue processing
	// input records until the input chan is closed or context is cancelled. Errors will be
	// forwarded to the error chan
	ErrorsIgnore ErrorStrategy = iota + 1

	// ErrorsAbort strategy will abort all processing after all successfull entires
	// ahead of the error causing entry has been flushed to the chan.
	ErrorsAbort

	// ErrorsDrop strategy will ignore errors from the processor and continue processing
	// input records until the input chan is closed or context is cancelled. Errors will be
	// NOT forwarded to the error chan but dropped
	ErrorsDrop
)

type Processor

type Processor func(ctx context.Context, intput interface{}) (interface{}, error)

Processor defines a function which takes an input and yields an output + error tuple

type StreamOutput

type StreamOutput struct {
	// Index is the index (starting at 0) counting from the source input.
	Index int

	// Res holds the last return value of the last processor, or the value of the first
	// processor which returned a non nil error.
	Res interface{}

	// Error of the first processor which returned an non nil error
	Err error
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL