pipe

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2023 License: MIT Imports: 8 Imported by: 2

Documentation

Overview

Package defines Pipe which is a type of a task. A Pipe works between two tasks, and usually applies any transformations to elements.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FanoutAggregateFn

type FanoutAggregateFn[I, T any] func(context.Context, []I, chan<- T) error

A function to aggregate results from downstream tasks, and send outputs to the passed output channel. It is ensured that all of the passed elements are created from the same input, and the order of results is the same as the order of registered tasks.

type FanoutMapFn

type FanoutMapFn[I, T any] func(context.Context, []I) (T, error)

A function to aggregate results from downstream tasks, and return an output. This is a variation of FanoutAggregateFn that emits only one output.

type FanoutOp

type FanoutOp[S, I, T any] struct {
	deferrer.Deferrer
	// contains filtered or unexported fields
}

A Pipe task that has multiple downstream tasks, and aggregates those results. Each input to this operator is sent to all its downstreams and processed by them, and those results will be passed to this operator's aggregate function. This operator emits elements that the aggregate function returns.

func Fanout

func Fanout[S, I, T any](aggFn FanoutAggregateFn[I, T]) *FanoutOp[S, I, T]

Create a fanout operator from an aggregate function.

func FanoutWithMap

func FanoutWithMap[S, I, T any](mapFn FanoutMapFn[I, T]) *FanoutOp[S, I, T]

Create a fanout operator from a map function.

func (*FanoutOp[S, I, T]) Add

func (op *FanoutOp[S, I, T]) Add(t task.Task[S, I], inBuffer, outBuffer int)

Register a task as a downstream of the fanout operator.

func (*FanoutOp[S, I, T]) AsTask

func (op *FanoutOp[S, I, T]) AsTask() task.Task[S, T]

Convert the fanout operator as a task.

func (*FanoutOp[S, I, T]) Run

func (op *FanoutOp[S, I, T]) Run(ctx context.Context, in <-chan S, out chan<- T) error

Run this fanout operator.

type MapFn

type MapFn[S, T any] func(context.Context, S) (T, error)

A function that defines the behavior of a map operator.

type MapOp

type MapOp[S, T any] struct {
	// contains filtered or unexported fields
}

A Pipe task that processes an element and emits a corresponding output.

func Map

func Map[S, T any](fn MapFn[S, T]) *MapOp[S, T]

Create a map operator from a MapFn.

func MapWithCache

func MapWithCache[S, T, K, V any](fn MapFn[S, T], sp cache.Spec[S, T, K, V]) *MapOp[S, T]

Create a map operator with cache. The caching behavior is defined by the provided cache.Spec.

func (*MapOp[S, T]) AsTask

func (op *MapOp[S, T]) AsTask() task.Task[S, T]

Convert the map operator as a task.

func (*MapOp[S, T]) Concurrent

func (op *MapOp[S, T]) Concurrent(concurrency int) Pipe[S, T]

Create a concurrent Pipe from multiple map operators that have the same behavior.

type Pipe

type Pipe[S, T any] task.Task[S, T]

A task that is used as an intermediate process of a data pipeline.

A Pipe usually receives elements from an upstream task via an input channel, process them, and feeds them to a downstream task.

func Concurrent

func Concurrent[S, T any](ps []Pipe[S, T]) Pipe[S, T]

Create a Pipe from multiple Pipes. The passed Pipes will run concurrently, and those outputs will be merged as outputs of the created Pipe.

func ConcurrentFromFn

func ConcurrentFromFn[S, T any](fn PipeFn[S, T], concurrency int) Pipe[S, T]

Create a Pipe to run the provided PipeFn concurrently. This is a shorthand to create a concurrent Pipe from Pipes with the same function.

func FromFn

func FromFn[S any, T any](fn PipeFn[S, T]) Pipe[S, T]

Build a Pipe with a PipeFn.

type PipeFn

type PipeFn[S, T any] func(ctx context.Context, in <-chan S, out chan<- T) error

A function that defines a Pipe's behavior. This function should receive elements from the passed input channel, process them, and pass the results to the passed output channel. Please note that this function should not close the passed channels. The whole pipeline will be aborted when the returned error is not nil.

type TapFn

type TapFn[S any] func(context.Context, S) error

A function that defines the behavior of a tap operator.

type TapOp

type TapOp[S any] struct {
	// contains filtered or unexported fields
}

A Pipe task that receives an element and emits the same element without any processing. This can be used to make a side effect with an input element, for example, logging elements for debug.

func Tap

func Tap[S any](fn TapFn[S]) *TapOp[S]

Create a tap operator from a TapFn.

func (*TapOp[S]) AsTask

func (op *TapOp[S]) AsTask() task.Task[S, S]

Convert the tap operator as a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL