Documentation
¶
Overview ¶
Package rivo is a library for stream processing.
Index ¶
- Variables
- func Segregate[T any](p Pipeline[None, T], predicate func(item Item[T]) bool) (Pipeline[None, T], Pipeline[None, T])
- func SegregateErrors[T any](p Pipeline[None, T]) (Pipeline[None, T], Pipeline[None, T])
- func Tee[T any](p Pipeline[None, T]) (Pipeline[None, T], Pipeline[None, T])
- type BatchOption
- type DoOption
- type ErrorStream
- type FilterOption
- type FromFuncOption
- type FromSeq2Value
- type Item
- type MapOption
- type None
- type Pipeline
- func Batch[T any](n int, opt ...BatchOption) Pipeline[T, []T]
- func Connect[A any](pp ...Pipeline[A, None]) Pipeline[A, None]
- func Do[T any](f func(context.Context, Item[T]), opt ...DoOption) Pipeline[T, None]
- func Filter[T any](f func(context.Context, Item[T]) (bool, error), opt ...FilterOption) Pipeline[T, T]
- func Flatten[T any]() Pipeline[[]T, T]
- func FromFunc[T any](f func(context.Context) (T, error), options ...FromFuncOption) Pipeline[None, T]
- func FromSeq[T any](seq iter.Seq[T]) Pipeline[None, T]
- func FromSeq2[T, U any](seq iter.Seq2[T, U]) Pipeline[None, FromSeq2Value[T, U]]
- func Map[T, U any](f func(context.Context, Item[T]) (U, error), opt ...MapOption) Pipeline[T, U]
- func Of[T any](items ...T) Pipeline[None, T]
- func Pipe[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]
- func Pipe2[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]
- func Pipe3[A, B, C, D any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D]) Pipeline[A, D]
- func Pipe4[A, B, C, D, E any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E]) Pipeline[A, E]
- func Pipe5[A, B, C, D, E, F any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E], ...) Pipeline[A, F]
- func TeeN[T any](p Pipeline[None, T], n int) []Pipeline[None, T]
- type Stream
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrEOS = errors.New("end of stream")
Functions ¶
func Segregate ¶
func Segregate[T any](p Pipeline[None, T], predicate func(item Item[T]) bool) (Pipeline[None, T], Pipeline[None, T])
Segregate returns two pipelines, where the first pipeline emits items that pass the predicate, and the second pipeline emits items that do not pass the predicate.
Example ¶
package main import ( "context" "fmt" "strconv" "github.com/agiac/rivo" ) func main() { ctx := context.Background() g := rivo.Of("1", "2", "3", "4", "5") toInt := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (int, error) { return strconv.Atoi(i.Val) }) p := rivo.Pipe(g, toInt) even, odd := rivo.Segregate(p, func(item rivo.Item[int]) bool { return item.Val%2 == 0 }) evens := make([]int, 0) odds := make([]int, 0) <-rivo.Connect( rivo.Pipe(even, rivo.Do(func(ctx context.Context, i rivo.Item[int]) { evens = append(evens, i.Val) })), rivo.Pipe(odd, rivo.Do(func(ctx context.Context, i rivo.Item[int]) { odds = append(odds, i.Val) })), )(ctx, nil) for _, i := range append(evens, odds...) { fmt.Println(i) } }
Output: 2 4 1 3 5
func SegregateErrors ¶ added in v0.4.0
SegregateErrors returns two pipelines, where the first pipeline emits items without errors, and the second pipeline emits items with errors.
func Tee ¶
Tee returns two pipelines that each receive a copy of each item from the input stream.
Example ¶
package main import ( "context" "fmt" "sync" "github.com/agiac/rivo" ) func main() { ctx := context.Background() g := rivo.Of("hello", "hello", "hello") out1, out2 := rivo.Tee(g) wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() for i := range out1(ctx, nil) { fmt.Println(i.Val) } }() go func() { defer wg.Done() for i := range out2(ctx, nil) { fmt.Println(i.Val) } }() wg.Wait() }
Output: hello hello hello hello hello hello
Types ¶
type BatchOption ¶ added in v0.4.0
type BatchOption func(*batchOptions) error
func BatchBufferSize ¶ added in v0.4.0
func BatchBufferSize(n int) BatchOption
func BatchMaxWait ¶ added in v0.4.0
func BatchMaxWait(d time.Duration) BatchOption
type DoOption ¶ added in v0.4.0
type DoOption func(*doOptions) error
func DoPoolSize ¶ added in v0.4.0
type ErrorStream ¶ added in v0.4.0
type ErrorStream = Stream[struct{}]
type FilterOption ¶ added in v0.4.0
type FilterOption func(*filterOptions) error
func FilterBufferSize ¶ added in v0.4.0
func FilterBufferSize(n int) FilterOption
func FilterPoolSize ¶ added in v0.4.0
func FilterPoolSize(n int) FilterOption
type FromFuncOption ¶ added in v0.4.0
type FromFuncOption func(*fromFuncOptions) error
func FromFuncBufferSize ¶ added in v0.4.0
func FromFuncBufferSize(bufferSize int) FromFuncOption
func FromFuncOnBeforeClose ¶ added in v0.4.0
func FromFuncOnBeforeClose(f func(context.Context) error) FromFuncOption
func FromFuncPoolSize ¶ added in v0.4.0
func FromFuncPoolSize(poolSize int) FromFuncOption
type FromSeq2Value ¶
type FromSeq2Value[T, U any] struct { Val1 T Val2 U }
type Item ¶
type Item[T any] struct { // Val is the value of the item when there is no error. Val T // Err is the optional error of the item. Err error }
Item represents a single item in a data stream. It contains a value of type T and an optional error.
type MapOption ¶ added in v0.4.0
type MapOption func(*mapOptions) error
func MapBufferSize ¶ added in v0.4.0
func MapPoolSize ¶ added in v0.4.0
type None ¶
type None struct{}
None is a type that represents no value. It is typically used as the input type of generator pipeline that does not depend on any input stream or for a sync pipeline that does not emit any items.
type Pipeline ¶ added in v0.3.0
Pipeline is a function that takes a context and a stream and returns a stream of the same type or a different type.
func Batch ¶ added in v0.1.0
func Batch[T any](n int, opt ...BatchOption) Pipeline[T, []T]
Batch returns a Pipeline that batches items from the input Stream into slices of n items. If the batch is not full after maxWait, it will be sent anyway. Any error in the input Stream will be propagated to the output Stream immediately.
Example ¶
ctx := context.Background() in := Of(1, 2, 3, 4, 5) b := Batch[int](2) p := Pipe(in, b) for item := range p(ctx, nil) { fmt.Printf("%v\n", item.Val) }
Output: [1 2] [3 4] [5]
func Connect ¶
Connect returns a sync pipelines that applies the given syncs pipelines to the input stream concurrently. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.
Example ¶
package main import ( "context" "fmt" "strings" "github.com/agiac/rivo" ) func main() { ctx := context.Background() g := rivo.Of("Hello", "Hello", "Hello") capitalize := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) { return strings.ToUpper(i.Val), nil }) lowercase := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) { return strings.ToLower(i.Val), nil }) resA := make([]string, 0) a := rivo.Do(func(ctx context.Context, i rivo.Item[string]) { resA = append(resA, i.Val) }) resB := make([]string, 0) b := rivo.Do(func(ctx context.Context, i rivo.Item[string]) { resB = append(resB, i.Val) }) p1 := rivo.Pipe(capitalize, a) p2 := rivo.Pipe(lowercase, b) <-rivo.Connect(p1, p2)(ctx, g(ctx, nil)) for _, s := range resA { fmt.Println(s) } for _, s := range resB { fmt.Println(s) } }
Output: HELLO HELLO HELLO hello hello hello
func Do ¶
Do returns a sync pipeline that applies the given function to each item in the stream. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.
Example ¶
ctx := context.Background() in := make(chan Item[int]) go func() { defer close(in) in <- Item[int]{Val: 1} in <- Item[int]{Val: 2} in <- Item[int]{Err: errors.New("error 1")} in <- Item[int]{Val: 4} in <- Item[int]{Err: errors.New("error 2")} }() d := Do(func(ctx context.Context, i Item[int]) { if i.Err != nil { fmt.Printf("ERROR: %v\n", i.Err) } }) <-d(ctx, in)
Output: ERROR: error 1 ERROR: error 2
func Filter ¶
func Filter[T any](f func(context.Context, Item[T]) (bool, error), opt ...FilterOption) Pipeline[T, T]
Filter returns a pipeline that filters the input stream using the given function.
Example ¶
ctx := context.Background() in := Of(1, 2, 3, 4, 5) onlyEven := Filter(func(ctx context.Context, i Item[int]) (bool, error) { // Always check for errors if i.Err != nil { return true, i.Err // Propagate the error } return i.Val%2 == 0, nil }) p := Pipe(in, onlyEven) s := p(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 2 4
func Flatten ¶ added in v0.2.0
Flatten returns a Pipeline that flattens a Stream of slices into a Stream of individual items.
Example ¶
ctx := context.Background() in := Of([]int{1, 2}, []int{3, 4}, []int{5}) f := Flatten[int]() p := Pipe(in, f) for item := range p(ctx, nil) { fmt.Printf("%v\n", item.Val) }
Output: 1 2 3 4 5
func FromFunc ¶
func FromFunc[T any](f func(context.Context) (T, error), options ...FromFuncOption) Pipeline[None, T]
FromFunc returns a generator Pipeline that emits items generated by the given function. The input stream is ignored. The returned stream will emit items until the function returns ErrEOS.
Example ¶
ctx := context.Background() count := atomic.Int32{} genFn := func(ctx context.Context) (int32, error) { value := count.Add(1) if value > 5 { return 0, ErrEOS } return value, nil } in := FromFunc(genFn) s := in(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 1 2 3 4 5
func FromSeq ¶
Example ¶
ctx := context.Background() seq := slices.Values([]int{1, 2, 3, 4, 5}) in := FromSeq(seq) s := in(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 1 2 3 4 5
func FromSeq2 ¶
Example ¶
ctx := context.Background() seq := slices.All([]string{"a", "b", "c", "d", "e"}) in := FromSeq2(seq) s := in(ctx, nil) for item := range s { fmt.Printf("%d, %s\n", item.Val.Val1, item.Val.Val2) }
Output: 0, a 1, b 2, c 3, d 4, e
func Map ¶
Map returns a pipeline that applies a function to each item from the input stream.
Example ¶
ctx := context.Background() in := Of(1, 2, 3, 4, 5) double := Map(func(ctx context.Context, i Item[int]) (int, error) { // Always check for errors if i.Err != nil { return 0, i.Err // Propagate the error } return i.Val * 2, nil }) p := Pipe(in, double) s := p(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 2 4 6 8 10
func Of ¶
Of returns a generator Pipeline that emits the given items. The input stream is ignored.
Example ¶
ctx := context.Background() in := Of(1, 2, 3, 4, 5) s := in(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 1 2 3 4 5
func Pipe ¶
Pipe pipes two pipelines together. It is a convenience function that calls Pipe2.
Example ¶
ctx := context.Background() a := Of(1, 2, 3, 4, 5) b := Map(func(ctx context.Context, i Item[int]) (int, error) { return i.Val + 1, nil }) p := Pipe(a, b) s := p(ctx, nil) for item := range s { fmt.Println(item.Val) }
Output: 2 3 4 5 6
func Pipe4 ¶
func Pipe4[A, B, C, D, E any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E]) Pipeline[A, E]
Pipe4 pipes four pipelines together.