Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewOrderedProcessor ¶
func NewOrderedProcessor(
	ctx context.Context,
	workers int,
	input chan interface{},
	errorStrategy ErrorStrategy,
	ps Processor,
) (
	output chan StreamOutput,
)
NewOrderedProcessor reads from input and runs the processor on each item, using up to [workers] parallel workers where possible, and emits the results in the original input order. It is similar to a concurrent, ordered .map() call in other languages and is designed for unbounded streams of data. The consumer is responsible for reading all items from the output and error channels to avoid memory leaks. Because the implementation makes heavy use of channels, it is NOT meant for high-throughput scenarios; it is intended for pipelines where some processing can be done in parallel but order must still be preserved.
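The ordering guarantee described above can be illustrated with a minimal, self-contained sketch. It is not this package's implementation: `orderedMap`, `processor`, and `streamOutput` are hypothetical local names, context cancellation and error strategies are left out, and the only assumption taken from the text is that a processor takes an input and returns an output and an error.

```go
package main

import (
	"fmt"
	"strings"
)

// processor and streamOutput are hypothetical local stand-ins, not the package's types.
type processor func(interface{}) (interface{}, error)

type streamOutput struct {
	Index int
	Res   interface{}
	Err   error
}

// orderedMap runs fn on every input item with up to `workers` goroutines
// and emits the results in input order.
func orderedMap(input <-chan interface{}, workers int, fn processor) <-chan streamOutput {
	type job struct {
		index int
		in    interface{}
		slot  chan streamOutput
	}
	jobs := make(chan job)
	// pending holds one result slot per in-flight item, in input order.
	pending := make(chan chan streamOutput, workers)
	output := make(chan streamOutput)

	// Workers: process jobs concurrently and fill the job's own slot.
	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				res, err := fn(j.in)
				j.slot <- streamOutput{Index: j.index, Res: res, Err: err}
			}
		}()
	}

	// Dispatcher: reserve a slot in input order, then hand the item to a worker.
	go func() {
		index := 0
		for in := range input {
			slot := make(chan streamOutput, 1) // buffered so workers never block
			pending <- slot
			jobs <- job{index: index, in: in, slot: slot}
			index++
		}
		close(jobs)
		close(pending)
	}()

	// Emitter: wait on the slots in the order they were reserved.
	go func() {
		for slot := range pending {
			output <- <-slot
		}
		close(output)
	}()
	return output
}

func main() {
	input := make(chan interface{})
	go func() {
		for _, w := range []string{"a", "b", "c", "d"} {
			input <- w
		}
		close(input)
	}()
	upper := func(v interface{}) (interface{}, error) {
		return strings.ToUpper(v.(string)), nil
	}
	for out := range orderedMap(input, 3, upper) {
		fmt.Println(out.Index, out.Res, out.Err)
	}
}
```

The queue of per-item result slots is what keeps the order: workers may finish in any order, but the emitter only ever waits on the oldest outstanding slot.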
func NewOrderedProcessors ¶
func NewOrderedProcessors(
	ctx context.Context,
	workers int,
	input chan interface{},
	errorStrategy ErrorStrategy,
	ps ...Processor,
) (
	output chan StreamOutput,
)
NewOrderedProcessors works like NewOrderedProcessor, except that each processor has its own work queue and worker pool, with each stage reading from the output of the previous one.
Types ¶
type ErrorStrategy ¶
type ErrorStrategy int
ErrorStrategy defines how errors are handled during concurrent execution.
const (
	// ErrorsIgnore strategy will ignore errors from the processor and continue processing
	// input records until the input chan is closed or the context is cancelled. Errors will be
	// forwarded to the error chan.
	ErrorsIgnore ErrorStrategy = iota + 1
	// ErrorsAbort strategy will abort all processing after all successful entries
	// ahead of the error-causing entry have been flushed to the chan.
	ErrorsAbort
	// ErrorsDrop strategy will ignore errors from the processor and continue processing
	// input records until the input chan is closed or the context is cancelled. Errors will
	// NOT be forwarded to the error chan but dropped.
	ErrorsDrop
)
type Processor ¶
Processor defines a function that takes an input and returns an output and an error.
type StreamOutput ¶
type StreamOutput struct {
	// Index is the index (starting at 0) counting from the source input.
	Index int
	// Res holds the return value of the last processor, or the value of the first
	// processor which returned a non-nil error.
	Res interface{}
	// Err is the error of the first processor which returned a non-nil error.
	Err error
	// contains filtered or unexported fields
}