Documentation ¶
Index ¶
- func Buffered(size int) options.Buffered
- func Concurrent(concurrency int) options.Concurrent
- func ErrorItem[T any](err error) item.Item[T]
- func Item[T any](value T, err error, ctx context.Context) item.Item[T]
- func KeepFirst() options.Keep
- func KeepLast() options.Keep
- func NewOrderingBuffer[T any, R any](node workerNode[T, R], concurrency int, bufferSize int) *orderingBuffer[T, R]
- func Ordered(orderBufferSize int) options.Ordered
- func Reduce[T any, R any](input *Channel[T], reducer func(R, T) R) <-chan R
- func ToMap[T any, K comparable](input *Channel[T], getKey func(T) K, opts ...options.ToMapOption) <-chan map[K]T
- func ValueItem[T any](value T) item.Item[T]
- type Channel
- func Batch[T any](input *Channel[T], size int, timeout time.Duration) *Channel[[]T]
- func Concat[T any](inputs ...*Channel[T]) *Channel[T]
- func Distinct[T any, K comparable](input *Channel[T], getKey func(T) K) *Channel[T]
- func FlatMap[T any, R any](input *Channel[T], mapper func(T) *Channel[R], opts ...options.FlatMapOption) *Channel[R]
- func FromGenerator[T any](pipeline *Pipeline, generator func(i uint64) T) *Channel[T]
- func FromGoChannel[T any](pipeline *Pipeline, channel <-chan T) *Channel[T]
- func FromRange[T constraints.Integer](pipeline *Pipeline, start T, end T) *Channel[T]
- func FromSlice[T any](pipeline *Pipeline, slice []T) *Channel[T]
- func Map[T any, R any](input *Channel[T], mapper func(T) R, opts ...options.MapOption) *Channel[R]
- func Merge[T any](inputs ...*Channel[T]) *Channel[T]
- func Wrap[T any](input *Channel[T]) *Channel[item.Item[T]]
- func (input *Channel[T]) All(predicate func(T) bool) <-chan bool
- func (input *Channel[T]) Any(predicate func(T) bool) <-chan bool
- func (input *Channel[T]) Broadcast(numOutputs int, opts ...options.BroadcastOption) []*Channel[T]
- func (input *Channel[T]) Buffer(n int) *Channel[T]
- func (input *Channel[T]) Count() <-chan int64
- func (input *Channel[T]) Filter(predicate func(T) bool, opts ...options.FilterOption) *Channel[T]
- func (input *Channel[T]) ForEach(function func(T), opts ...options.ForEachOption) <-chan struct{}
- func (input *Channel[T]) Interval(interval func(value T) time.Duration) *Channel[T]
- func (input *Channel[T]) Last() <-chan T
- func (input *Channel[T]) None(predicate func(T) bool) <-chan bool
- func (c *Channel[T]) Pipeline() *Pipeline
- func (input *Channel[T]) Skip(n uint64) *Channel[T]
- func (input *Channel[T]) Split(numOutputs int, opts ...options.SplitOption) []*Channel[T]
- func (input *Channel[T]) Take(n uint64) *Channel[T]
- func (input *Channel[T]) Tap(function func(T), opts ...options.TapOption) *Channel[T]
- func (input *Channel[T]) ToGoChannel() <-chan T
- func (input *Channel[T]) ToSlice() <-chan []T
- type Config
- type Pipeline
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Concurrent ¶ added in v0.1.22
func Concurrent(concurrency int) options.Concurrent
func NewOrderingBuffer ¶ added in v0.2.0
func Reduce ¶
Reduce performs a stateful reduction of the input values. The reducer receives the current state and the current value, and must return the new state. The final state is sent to the returned channel when all input values have been processed, or the pipeline is canceled.
Example. Calculating the sum of all input values:
    output := Reduce(input, func(acc int64, value int) int64 { return acc + int64(value) })

    input : 0--1--2--3--X
    output: ------------6
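The same fold can be sketched over a plain Go channel. This only illustrates the semantics described above and is not the library's implementation: reduce and emit are hypothetical helpers, and starting from the zero value of R is an assumption, made because Reduce takes no initial state.

```go
package main

import "fmt"

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// reduce folds all values from in into one state and sends the final
// state once in is closed. The initial state is the zero value of R
// (an assumption of this sketch).
func reduce[T any, R any](in <-chan T, reducer func(R, T) R) <-chan R {
	out := make(chan R, 1)
	go func() {
		defer close(out)
		var state R
		for v := range in {
			state = reducer(state, v)
		}
		out <- state
	}()
	return out
}

func main() {
	sum := <-reduce(emit(0, 1, 2, 3), func(acc int64, v int) int64 { return acc + int64(v) })
	fmt.Println(sum) // 6
}
```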
func ToMap ¶
func ToMap[T any, K comparable](input *Channel[T], getKey func(T) K, opts ...options.ToMapOption) <-chan map[K]T
ToMap puts all values coming from the input channel in a map, using the getKey parameter to calculate the key. The resulting map is sent to the returned channel when all input values have been processed, or the pipeline is canceled.
Example:
    output := ToMap(input, func(value string) string { return strings.Split(value, "_")[0] })

    input : A_0--B_1--C_2--X
    output: ---------------{A:A_0, B:B_1, C:C_2}
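The description does not say what happens when two values produce the same key. The sketch below shows the two obvious policies over a plain Go channel. The keepFirst flag is hypothetical and only loosely modeled on the KeepFirst and KeepLast options listed in the index; how those options actually behave is an assumption here.

```go
package main

import (
	"fmt"
	"strings"
)

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// toMap collects values into a map keyed by getKey. When two values share
// a key, keepFirst decides whether the earlier or the later one is kept.
func toMap[T any, K comparable](in <-chan T, getKey func(T) K, keepFirst bool) <-chan map[K]T {
	out := make(chan map[K]T, 1)
	go func() {
		defer close(out)
		result := make(map[K]T)
		for v := range in {
			k := getKey(v)
			if _, seen := result[k]; seen && keepFirst {
				continue // an earlier value already owns this key
			}
			result[k] = v
		}
		out <- result
	}()
	return out
}

// prefix returns the part of s before the first underscore.
func prefix(s string) string { return strings.Split(s, "_")[0] }

func main() {
	first := <-toMap(emit("A_0", "B_1", "A_2"), prefix, true)
	last := <-toMap(emit("A_0", "B_1", "A_2"), prefix, false)
	fmt.Println(first["A"], last["A"]) // A_0 A_2
}
```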
Types ¶
type Channel ¶
type Channel[T any] struct {
    // contains filtered or unexported fields
}
A Channel is a wrapper for a Go channel. It provides chainable methods to construct pipelines, but conceptually it should be seen as nothing but an enhanced Go channel.
func Batch ¶
Batch batches input values in slices and sends those slices to the output channel. Batches can be limited by size and by time. A size or timeout of 0 is ignored.
Example:
    output := Batch(input, 3, 0)

    input : 0--1----2----------3------4--5----------6--7----X
    output: --------{0-1-2}--------------{3-4-5}-------{6-7}X
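A minimal sketch of size- and time-limited batching over a plain Go channel follows. It is not the library's implementation: batch is a hypothetical helper, and starting the timeout when the first element of a batch arrives is an assumption, since the description does not say when the clock starts.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups values from in into slices of at most size elements.
// If timeout > 0, a partial batch is flushed once timeout has elapsed
// since its first element arrived. A size or timeout of 0 disables that limit.
func batch[T any](in <-chan T, size int, timeout time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var current []T
		var deadline <-chan time.Time // nil channel: never fires
		flush := func() {
			if len(current) > 0 {
				out <- current
				current = nil
			}
			deadline = nil
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // emit whatever is left, then stop
					return
				}
				if len(current) == 0 && timeout > 0 {
					deadline = time.After(timeout)
				}
				current = append(current, v)
				if size > 0 && len(current) >= size {
					flush()
				}
			case <-deadline:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 8)
	for i := 0; i < 8; i++ {
		in <- i
	}
	close(in)
	for b := range batch[int](in, 3, 0) {
		fmt.Println(b) // [0 1 2], then [3 4 5], then [6 7]
	}
}
```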
func Concat ¶
Concat concatenates multiple input channels to a single output channel. Channels are consumed in order, e.g., the second channel won't be consumed until the first channel is closed.
Example:
    output := Concat(input1, input2)

    input1: 0----1----2------3-X
    input2: -----5------6--------------7--X
    output: 0----1----2------3-5-6-----7--X
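The in-order consumption can be sketched with plain Go channels as follows. The helpers concat, emit and collect are hypothetical and only illustrate the behavior described above.

```go
package main

import "fmt"

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// collect drains in into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

// concat drains each input completely before touching the next one.
func concat[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, in := range inputs {
			for v := range in {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	fmt.Println(collect(concat(emit(0, 1, 2, 3), emit(5, 6, 7)))) // [0 1 2 3 5 6 7]
}
```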
func Distinct ¶
func Distinct[T any, K comparable](input *Channel[T], getKey func(T) K) *Channel[T]
Distinct sends only input values for which the key hasn't been seen before to the output channel. It uses an internal map to keep track of all keys seen, so keep in mind that it could exhaust memory if too many distinct values are received.
Example:
    output := Distinct(input, func(value int) int { return value })

    input : 0--1--2--1--3--2-X
    output: 0--1--2-----3----X
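The memory caveat follows from the bookkeeping: every key ever seen stays in the set. A minimal sketch over a plain Go channel, with hypothetical helpers distinct, emit and collect, not the library's code:

```go
package main

import "fmt"

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// collect drains in into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

// distinct forwards a value only the first time its key is seen.
// The seen set grows by one entry per distinct key and is never pruned.
func distinct[T any, K comparable](in <-chan T, getKey func(T) K) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		seen := make(map[K]struct{})
		for v := range in {
			k := getKey(v)
			if _, dup := seen[k]; dup {
				continue
			}
			seen[k] = struct{}{}
			out <- v
		}
	}()
	return out
}

func main() {
	identity := func(v int) int { return v }
	fmt.Println(collect(distinct(emit(0, 1, 2, 1, 3, 2), identity))) // [0 1 2 3]
}
```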
func FlatMap ¶
func FlatMap[T any, R any](input *Channel[T], mapper func(T) *Channel[R], opts ...options.FlatMapOption) *Channel[R]
FlatMap transforms every input value into a Channel and for each of those, it sends all values to the output channel.
Example:
    output := FlatMap(input, func(i int) *Channel[int] { return FromSlice(input.Pipeline(), []int{i, i + 10}) })

    input : 0------1------2------3------4------5------X
    output: 0-10---1-11---2-12---3-13---4-14---5-15---X
func FromGenerator ¶
FromGenerator creates a Channel from a stateless generator function. Values returned by the function are sent to the channel in order.
func FromGoChannel ¶
FromGoChannel creates a Channel from a Go channel.
func FromRange ¶
func FromRange[T constraints.Integer](pipeline *Pipeline, start T, end T) *Channel[T]
FromRange creates a Channel from a range of integers. All integers between start and end (both inclusive) are sent to the channel in order.
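Because both bounds are inclusive, a naive `i <= end` loop never terminates when end is the largest value of T, since the counter wraps around. The sketch below avoids that by testing for equality after sending. It uses plain Go channels and a local integer constraint; fromRange and collect are hypothetical helpers, and emitting nothing when start > end is an assumption.

```go
package main

import "fmt"

// integer is a local stand-in for an integer type constraint.
type integer interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

// collect drains in into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

// fromRange emits start, start+1, ..., end (both inclusive).
// It emits nothing when start > end (an assumption of this sketch).
func fromRange[T integer](start, end T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		if start > end {
			return
		}
		for i := start; ; i++ {
			out <- i
			if i == end {
				return // stop before i++ can wrap around
			}
		}
	}()
	return out
}

func main() {
	fmt.Println(collect(fromRange(uint8(253), uint8(255)))) // [253 254 255]
}
```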
func FromSlice ¶
FromSlice creates a Channel from a slice. All values in the slice are sent to the channel in order.
func Map ¶
Map transforms every input value with a mapper function and sends the results to the output channel.
Example:
    output := Map(input, func(i int) int { return i + 10 })

    input : 0--1--2--3--4--5--X
    output: 10-11-12-13-14-15-X
func Merge ¶
Merge merges multiple input channels to a single output channel. Values from input channels are sent to the output channel as they arrive, with no specific priority.
Example:
    output := Merge(input1, input2)

    input1: 0----1----2------3-X
    input2: -----5------6------X
    output: 0----5-1--2-6----3-X
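Merging "as they arrive" is the classic fan-in pattern: one goroutine per input, all writing to a shared output that is closed once every input is exhausted. A minimal sketch with plain Go channels, where merge, emit and collect are hypothetical helpers rather than the library's code:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// collect drains in into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

// merge forwards values from all inputs to one output with no ordering
// guarantee between inputs, and closes the output when all inputs are closed.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	got := collect(merge(emit(0, 1, 2, 3), emit(5, 6)))
	sort.Ints(got) // arrival order is not deterministic, so sort before printing
	fmt.Println(got) // [0 1 2 3 5 6]
}
```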
func Wrap ¶
Wrap wraps every input value T in an Item[T] and sends it to the output channel. Item[T] is used mostly to represent items that can have either a value or an error. Another use for Item[T] is to carry a Context that successive operators can enrich.
func (*Channel[T]) All ¶
All determines if all input values match the predicate. If all values match the predicate, true is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value does not match the predicate, false is immediately sent to the returned channel and no more input values are read.
Example 1:
    output := input.All(func(value int) bool { return value < 4 })

    input : 0--1--2--3--X
    output: ------------true
Example 2:
    output := input.All(func(value int) bool { return value < 2 })

    input : 0--1--2--3--X
    output: ------false
func (*Channel[T]) Any ¶
Any determines if any input value matches the predicate. If no value matches the predicate, false is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value is found to match the predicate, true is immediately sent to the returned channel and no more input values are read.
Example 1:
    output := input.Any(func(value int) bool { return value > 3 })

    input : 0--1--2--3--X
    output: ------------false
Example 2:
    output := input.Any(func(value int) bool { return value >= 2 })

    input : 0--1--2--3--X
    output: ------true
func (*Channel[T]) Broadcast ¶
func (input *Channel[T]) Broadcast(numOutputs int, opts ...options.BroadcastOption) []*Channel[T]
Broadcast sends each input value to every output channel. The next input value is not read by this operator until all output channels have read the current one. Bear in mind that if one of the output channels is a slow consumer, it may block the other consumers. This is a particularly annoying type of backpressure, because it blocks not only the input but also the other consumers. To avoid this, consider using options.Buffered so that the output channels are buffered, with no need for an extra Buffer operator.
Example:
    outputs := input.Broadcast(2, Buffered(4))

    input  : 0--1--2--3--4--5---X
    output1: 0--1--2--3--4--5---X
    output2: 0--1--2--3--4--5---X
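The blocking behavior is easy to see in a sketch built on plain Go channels: the sender loops over the outputs for each value, so a full output buffer stalls delivery to every other output. The helpers broadcast, drainAll and emit are hypothetical, and bufferSize merely stands in for the Buffered option; this is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// broadcast copies every value from in to each of numOutputs channels,
// each buffered with bufferSize slots.
func broadcast[T any](in <-chan T, numOutputs, bufferSize int) []<-chan T {
	outs := make([]chan T, numOutputs)
	result := make([]<-chan T, numOutputs)
	for i := range outs {
		outs[i] = make(chan T, bufferSize)
		result[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range in {
			for _, o := range outs {
				o <- v // blocks all outputs while this one's buffer is full
			}
		}
	}()
	return result
}

// drainAll reads every channel concurrently, so no consumer stalls the others.
func drainAll[T any](chans []<-chan T) [][]T {
	results := make([][]T, len(chans))
	var wg sync.WaitGroup
	for i, ch := range chans {
		wg.Add(1)
		go func(i int, ch <-chan T) {
			defer wg.Done()
			for v := range ch {
				results[i] = append(results[i], v)
			}
		}(i, ch)
	}
	wg.Wait()
	return results
}

func main() {
	for i, values := range drainAll(broadcast(emit(0, 1, 2, 3, 4, 5), 2, 4)) {
		fmt.Println(i, values) // each output receives [0 1 2 3 4 5]
	}
}
```

Reading the outputs one after another instead of concurrently would deadlock here once the unread output's buffer of 4 fills up, which is exactly the slow-consumer problem described above.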
func (*Channel[T]) Buffer ¶
Buffer transparently passes input values to the output channel, but the output channel is buffered. It is useful to avoid backpressure from slow consumers.
func (*Channel[T]) Count ¶
Count counts the input values. The final count is sent to the returned channel when all input values have been processed, or the pipeline is canceled.
Example:
    output := input.Count()

    input : 9--8--7--6--X
    output: ------------4
func (*Channel[T]) Filter ¶
func (input *Channel[T]) Filter(predicate func(T) bool, opts ...options.FilterOption) *Channel[T]
Filter sends to the output channel only the input values that match the predicate.
Example:
    output := input.Filter(func(i int) bool { return i%2==1 })

    input : 0--1--2--3--4--5-X
    output: ---1-----3-----5-X
func (*Channel[T]) ForEach ¶
func (input *Channel[T]) ForEach(function func(T), opts ...options.ForEachOption) <-chan struct{}
ForEach calls the function passed as parameter for every value coming from the input channel. The returned channel will close when all input values have been processed, or the pipeline is canceled.
func (*Channel[T]) Interval ¶
Interval transparently passes all input values to the output channel, but a time interval is awaited after each element before sending another one. No value is sent to the output while that interval is active. This operator is prone to generating backpressure, so use it with care, and consider adding a Buffer before it.
Example (assume each hyphen is 1 ms):
    output := input.Interval(func(value int) time.Duration { return 4 * time.Millisecond })

    input : 0--1--2--------------3--4--5--X
    output: 0----1----2----------3----4----5-X
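One way to obtain this spacing with plain Go channels is to remember the earliest time the next value may be sent and sleep until then. The sketch below is an assumption about the semantics, not the library's code; interval, constant, drainTimed and emit are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// constant returns an interval function that ignores the value.
func constant[T any](d time.Duration) func(T) time.Duration {
	return func(T) time.Duration { return d }
}

// interval forwards values unchanged, but never sends a value earlier than
// wait(previous) after the previous value was sent.
func interval[T any](in <-chan T, wait func(T) time.Duration) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var notBefore time.Time // zero value: the first value is sent immediately
		for v := range in {
			time.Sleep(time.Until(notBefore)) // returns at once if already past
			out <- v
			notBefore = time.Now().Add(wait(v))
		}
	}()
	return out
}

// drainTimed reads ch to completion and reports the count and the elapsed time.
func drainTimed[T any](ch <-chan T) (int, time.Duration) {
	start := time.Now()
	n := 0
	for range ch {
		n++
	}
	return n, time.Since(start)
}

func main() {
	n, elapsed := drainTimed(interval(emit(0, 1, 2), constant[int](4*time.Millisecond)))
	fmt.Println(n, elapsed >= 8*time.Millisecond) // 3 true
}
```

While the goroutine sleeps it does not read from its input, which is where the backpressure mentioned above comes from.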
func (*Channel[T]) Last ¶
func (input *Channel[T]) Last() <-chan T
Last sends the last value received from the input channel to the output channel. The last value is sent to the returned channel when all input values have been processed, or the pipeline is canceled.
Example:
    output := input.Last()

    input : 0--1--2--3------X
    output: ----------------3
func (*Channel[T]) None ¶
None determines if no input value matches the predicate. If no value matches the predicate, true is sent to the returned channel when all input values have been processed, or the pipeline is canceled. If instead some value matches the predicate, false is immediately sent to the returned channel and no more input values are read.
Example 1:
    output := input.None(func(value int) bool { return value > 3 })

    input : 0--1--2--3--X
    output: ------------true
Example 2:
    output := input.None(func(value int) bool { return value >= 2 })

    input : 0--1--2--3--X
    output: ------false
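All, Any and None share one short-circuit shape: read until some value decides the answer, otherwise report the opposite answer when the input ends. A minimal sketch over plain Go channels, with hypothetical helpers decide, allMatch, anyMatch, noneMatch and emit, not the library's implementation:

```go
package main

import "fmt"

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// decide reads in until p holds for some value, then reports onHit without
// reading further. If in is closed first, it reports !onHit.
func decide[T any](in <-chan T, p func(T) bool, onHit bool) <-chan bool {
	out := make(chan bool, 1)
	go func() {
		defer close(out)
		for v := range in {
			if p(v) {
				out <- onHit
				return
			}
		}
		out <- !onHit
	}()
	return out
}

// anyMatch reports true as soon as one value matches p.
func anyMatch[T any](in <-chan T, p func(T) bool) <-chan bool { return decide(in, p, true) }

// noneMatch reports false as soon as one value matches p.
func noneMatch[T any](in <-chan T, p func(T) bool) <-chan bool { return decide(in, p, false) }

// allMatch reports false as soon as one value fails p.
func allMatch[T any](in <-chan T, p func(T) bool) <-chan bool {
	return decide(in, func(v T) bool { return !p(v) }, false)
}

func main() {
	fmt.Println(<-allMatch(emit(0, 1, 2, 3), func(v int) bool { return v < 4 }))   // true
	fmt.Println(<-anyMatch(emit(0, 1, 2, 3), func(v int) bool { return v > 3 }))   // false
	fmt.Println(<-noneMatch(emit(0, 1, 2, 3), func(v int) bool { return v >= 2 })) // false
}
```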
func (*Channel[T]) Skip ¶
Skip skips the first n input values and then sends every subsequent value to the output channel.
Example:
    output := input.Skip(2)

    input : 0--1--2--3--4--5-X
    output: ------2--3--4--5-X
func (*Channel[T]) Split ¶ added in v0.2.0
func (input *Channel[T]) Split(numOutputs int, opts ...options.SplitOption) []*Channel[T]
Split sends each input value to any of the output channels, with no specific priority.
Example:
    outputs := input.Split(2, Buffered(4))

    input  : 0--1--2--3--4--5---X
    output1: 0-----2--3-----5---X
    output2: ---1--------4------X
func (*Channel[T]) Take ¶
Take sends the first n input values to the output channel, and then stops processing and closes the output channel.
Example:
    output := input.Take(3)

    input : 0--1--2--3--4--5-X
    output: 0--1--2-X
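Take and Skip are mirror images, which a sketch over plain Go channels makes visible. The helpers take, skip, emit and collect are hypothetical. Unlike a real pipeline, this sketch does nothing to stop an upstream producer after take returns; here that is harmless because emit pre-fills and closes its channel.

```go
package main

import "fmt"

// emit is a hypothetical helper: it returns a pre-filled, already closed channel.
func emit[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// collect drains in into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

// take forwards the first n values, then closes its output and stops reading.
func take[T any](in <-chan T, n uint64) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		if n == 0 {
			return
		}
		var sent uint64
		for v := range in {
			out <- v
			sent++
			if sent == n {
				return
			}
		}
	}()
	return out
}

// skip drops the first n values and forwards everything after them.
func skip[T any](in <-chan T, n uint64) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var dropped uint64
		for v := range in {
			if dropped < n {
				dropped++
				continue
			}
			out <- v
		}
	}()
	return out
}

func main() {
	fmt.Println(collect(take(emit(0, 1, 2, 3, 4, 5), 3))) // [0 1 2]
	fmt.Println(collect(skip(emit(0, 1, 2, 3, 4, 5), 2))) // [2 3 4 5]
}
```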
func (*Channel[T]) Tap ¶
Tap runs a function as a side effect for each input value, and then sends the input values transparently to the output channel. A common use case is logging.
func (*Channel[T]) ToGoChannel ¶
func (input *Channel[T]) ToGoChannel() <-chan T
ToGoChannel sends all values from the input channel to the returned Go channel. The returned Go channel closes when all input values have been processed, or the pipeline is canceled.
Example:
    output := input.ToGoChannel()

    input : 0--1--2--3--X
    output: 0--1--2--3--X
func (*Channel[T]) ToSlice ¶
func (input *Channel[T]) ToSlice() <-chan []T
ToSlice puts all values coming from the input channel in a slice. The resulting slice is sent to the returned channel when all input values have been processed, or the pipeline is canceled. The slice may have partial results if the pipeline failed, so you must remember to check the pipeline's Error() method.
Example:
    output := input.ToSlice()

    input : 0--1--2--3--X
    output: ------------{0,1,2,3}
type Config ¶
type Config struct {
    // Context is used by a Pipeline for cancellation.
    // If the context gets cancelled, the pipeline gets canceled too.
    Context context.Context
    // StartManually determines whether [Pipeline.Start] must be called manually.
    // If false, the first sink operator(ForEach, ToSlice, etc) to be created in the pipeline automatically starts it.
    // If true, the pipeline will be dormant until [Pipeline.Start] is called.
    StartManually bool
}
A Config can be used to create a pipeline with certain settings.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
A Pipeline is a container for the classic Pipelines and Cancellation pattern. Pipelines are safe to use from multiple goroutines.
Pipeline works as a coordinator or context for multiple operators and Channels. It can also be seen as a sort of context and passed around as such.
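For readers unfamiliar with the pattern the Pipeline type is named after, here is a minimal sketch of it using only the standard library: every stage selects on a shared cancellation signal, so canceling once shuts down an otherwise endless stage. The functions generate and firstN are hypothetical and do not reflect how Pipeline is implemented.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits 0, 1, 2, ... until ctx is canceled, then closes its output.
func generate(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstN reads n values from the endless stage, cancels the shared context,
// and waits for the stage to close its output.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stage := generate(ctx)
	got := make([]int, 0, n)
	for len(got) < n {
		got = append(got, <-stage)
	}
	cancel()
	for range stage {
		// Discard values sent before the stage noticed the cancellation.
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [0 1 2]
}
```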
func New ¶
New returns a pipeline with the given backing context. StartManually is false by default, meaning the pipeline will start when a sink operator (ForEach, ToSlice, etc.) is created for it.
func NewPipeline ¶
NewPipeline returns a Pipeline with the given jpipe.Config.
func (*Pipeline) Done ¶
func (p *Pipeline) Done() <-chan struct{}
Done returns a channel that's closed when the pipeline has either completed successfully or failed.
func (*Pipeline) Error ¶
Error returns the error in the pipeline, if any. It returns nil if the pipeline is still running or has completed successfully.