Documentation ¶
Overview ¶
Package defines Pipe which is a type of a task. A Pipe works between two tasks, and usually applies any transformations to elements.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchOp ¶ added in v0.3.0
type BatchOp[S any] struct { // contains filtered or unexported fields }
A Pipe operator that makes a fixed size of batches.
type FanoutAggregateFn ¶
A function to aggregate results from downstream tasks, and send outputs to the passed output channel. It is ensured that all of the passed elements are created from the same input, and the order of results is the same as the order of registered tasks.
type FanoutMapFn ¶
A function to aggregate results from downstream tasks, and return an output. This is a variation of FanoutAggregateFn that emits only one output.
type FanoutOp ¶
type FanoutOp[S, I, T any] struct { // contains filtered or unexported fields }
A Pipe task that has multiple downstream tasks, and aggregates those results. Each input to this operator is sent to all its downstreams and processed by them, and those results will be passed to this operator's aggregate function. This operator emits elements that the aggregate function returns.
func Fanout ¶
func Fanout[S, I, T any](aggFn FanoutAggregateFn[I, T]) *FanoutOp[S, I, T]
Create a fanout operator from an aggregate function.
func FanoutWithMap ¶
func FanoutWithMap[S, I, T any](mapFn FanoutMapFn[I, T]) *FanoutOp[S, I, T]
Create a fanout operator from a map function.
type FlattenSliceOp ¶ added in v0.4.0
type FlattenSliceOp[S any] struct { // contains filtered or unexported fields }
A Pipe operator that receives slices and emits those elements one by one.
func FlattenSlice ¶ added in v0.4.0
func FlattenSlice[S any]() *FlattenSliceOp[S]
Create a FlattenSlice operator.
type MapOp ¶
type MapOp[S, T any] struct { // contains filtered or unexported fields }
A Pipe task that processes an element and emits a corresponding output.
func MapWithCache ¶
Create a map operator with cache. The caching behavior is defined by the provided cache.Spec.
func (*MapOp) Concurrent ¶
Create a concurrent Pipe from multiple operators that have the same behavior.
Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.
func (*MapOp[S, T]) ConcurrentPreservingOrder ¶ added in v0.7.0
Create a concurrent Pipe to apply the map operator.
Unlike a Pipe created with Concurrent, a concurrent Pipe created with this ConcurrentPreservingOrder, preserves the order of elements.
type Pipe ¶
A task that is used as an intermediate process of a data pipeline.
A Pipe usually receives elements from an upstream task via an input channel, process them, and feeds them to a downstream task.
func Concurrent ¶
Create a Pipe from multiple Pipes. The passed Pipes will run concurrently, and those outputs will be merged as outputs of the created Pipe.
Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.
func ConcurrentFromFn ¶
Create a Pipe to run the provided PipeFn concurrently. This is a shorthand to create a concurrent Pipe from Pipes with the same function.
type PipeFn ¶
A function that defines a Pipe's behavior. This function should receive elements from the passed input channel, process them, and pass the results to the passed output channel. Please note that this function should not close the passed channels because pipe.FromFn automatically closes the output channel and closing the input channel is the upstream task's responsibility. The whole pipeline will be aborted when the returned error is not nil.
type SelectOp ¶ added in v0.4.0
type SelectOp[S any] struct { // contains filtered or unexported fields }
A Pipe operator that emits only elements that the passed predicate function returns true.
type TakeOp ¶ added in v0.4.0
type TakeOp[S any] struct { // contains filtered or unexported fields }
A Pipe operator that emits only N elements from its upstream task.
type TapOp ¶
A Pipe task that receives an element and emits the same element without any processing. This can be used to make a side effect with an input element, for example, logging elements for debug.
func (*TapOp) Concurrent ¶ added in v0.4.0
Create a concurrent Pipe from multiple operators that have the same behavior.
Each input is processed whenever it is possible. For this reason, the concurrent Pipe doesn't preserve the order.