Documentation ¶
Overview ¶
Package rivo is a library for stream processing. It provides a simple and flexible way to create and compose streams of data.
There are three main types in this library: Item, Stream, and Pipeable.
Item is a struct which contains a value and an optional error. Just as errors are returned alongside the result of a function in synchronous code, they should be passed along in asynchronous code and handled where most appropriate.
Stream is a read-only channel of items. As the name suggests, it represents a stream of data.
Pipeable is a function that takes a context.Context and a Stream of one type and returns a Stream of the same or a different type. Pipeables can be composed using one of the Pipe functions. Pipeables are divided into three categories: generators, syncs and transformers.
- Generator is a pipeable that does not read from its input stream. It starts a new stream from scratch.
- Sync is a pipeable function that does not emit any items. It is used at the end of a pipeline.
- Transformer is a pipeable that reads from its input stream and emits items to its output stream.
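The three categories can be sketched with the standard library alone. The types below are local stand-ins written from the descriptions above, not rivo's own definitions, and the helpers numbers, double, collect and runPipeline are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Item, Stream and Pipeable mirror the descriptions above.
// They are local stand-ins, not the library's definitions.
type Item[T any] struct {
	Val T
	Err error
}

type Stream[T any] <-chan Item[T]

type Pipeable[T, U any] func(ctx context.Context, in Stream[T]) Stream[U]

// numbers is a generator: it ignores its input and starts a new stream.
func numbers(vals ...int) Pipeable[struct{}, int] {
	return func(ctx context.Context, _ Stream[struct{}]) Stream[int] {
		out := make(chan Item[int])
		go func() {
			defer close(out)
			for _, v := range vals {
				select {
				case out <- Item[int]{Val: v}:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// double is a transformer: it reads items and emits new ones.
func double() Pipeable[int, int] {
	return func(ctx context.Context, in Stream[int]) Stream[int] {
		out := make(chan Item[int])
		go func() {
			defer close(out)
			for item := range in {
				if item.Err == nil {
					item.Val *= 2
				}
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// collect is a sync: it consumes the stream and emits nothing.
func collect(dst *[]int) Pipeable[int, struct{}] {
	return func(ctx context.Context, in Stream[int]) Stream[struct{}] {
		out := make(chan Item[struct{}])
		go func() {
			defer close(out)
			for item := range in {
				if item.Err == nil {
					*dst = append(*dst, item.Val)
				}
			}
		}()
		return out
	}
}

// runPipeline wires generator -> transformer -> sync by hand.
func runPipeline() []int {
	ctx := context.Background()
	var got []int
	done := collect(&got)(ctx, double()(ctx, numbers(1, 2, 3)(ctx, nil)))
	<-done // closed once the sync has drained its input
	return got
}

func main() {
	fmt.Println(runPipeline()) // [2 4 6]
}
```

Waiting on the stream returned by the sync is what blocks until the whole pipeline has finished, which is the same role `<-rivo.Connect(...)(ctx, nil)` plays in the package examples.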
Index ¶
- Variables
- func Segregate[T, U any](p Pipeable[T, U], predicate func(ctx context.Context, item Item[U]) bool) func(context.Context, Stream[T]) (Generator[U], Generator[U])
- func Tee[T any](ctx context.Context, in Stream[T]) (Stream[T], Stream[T])
- type DoFunc
- type FilterFunc
- type ForEachFunc
- type FromFuncFunc
- type FromSeq2Value
- type Generator
- type Item
- type MapFunc
- type None
- type Option
- type Pipeable
- func Batch[T any](n int, maxWait time.Duration, opt ...Option) Pipeable[T, []T]
- func Flatten[T any](opt ...Option) Pipeable[[]T, T]
- func Pipe[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C]
- func Pipe2[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C]
- func Pipe3[A, B, C, D any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D]) Pipeable[A, D]
- func Pipe4[A, B, C, D, E any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D], d Pipeable[D, E]) Pipeable[A, E]
- func Pipe5[A, B, C, D, E, F any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D], d Pipeable[D, E], ...) Pipeable[A, F]
- type Stream
- type Sync
- type Transformer
Constants ¶
This section is empty.
Variables ¶
var ErrEOS = errors.New("end of stream")
Functions ¶
func Segregate ¶
func Segregate[T, U any](p Pipeable[T, U], predicate func(ctx context.Context, item Item[U]) bool) func(context.Context, Stream[T]) (Generator[U], Generator[U])
Segregate returns a function that returns two pipeables, where the first pipeable emits items that pass the predicate, and the second pipeable emits items that do not pass the predicate.
Example ¶
	package main

	import (
		"context"
		"fmt"
		"strconv"

		"github.com/agiac/rivo"
	)

	func main() {
		ctx := context.Background()

		g := rivo.Of("1", "2", "3", "4", "5")

		toInt := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (int, error) {
			return strconv.Atoi(i.Val)
		})

		p := rivo.Pipe(g, toInt)

		even, odd := rivo.Segregate(p, func(ctx context.Context, item rivo.Item[int]) bool {
			return item.Val%2 == 0
		})(ctx, nil)

		evens := make([]int, 0)
		odds := make([]int, 0)

		<-rivo.Connect(
			rivo.Pipe(even, rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
				evens = append(evens, i.Val)
			})),
			rivo.Pipe(odd, rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
				odds = append(odds, i.Val)
			})),
		)(ctx, nil)

		for _, i := range append(evens, odds...) {
			fmt.Println(i)
		}
	}

Output:

	2
	4
	1
	3
	5
func Tee ¶
func Tee[T any](ctx context.Context, in Stream[T]) (Stream[T], Stream[T])
Tee returns two streams that each receive a copy of each item from the input stream. It is equivalent to TeeN(ctx, in, 2).
Example ¶
	package main

	import (
		"context"
		"fmt"
		"sync"

		"github.com/agiac/rivo"
	)

	func main() {
		ctx := context.Background()

		in := rivo.Of("hello", "hello", "hello")(ctx, nil)

		out1, out2 := rivo.Tee(ctx, in)

		wg := sync.WaitGroup{}
		wg.Add(2)

		go func() {
			defer wg.Done()
			for i := range out1 {
				fmt.Println(i.Val)
			}
		}()

		go func() {
			defer wg.Done()
			for i := range out2 {
				fmt.Println(i.Val)
			}
		}()

		wg.Wait()
	}

Output:

	hello
	hello
	hello
	hello
	hello
	hello
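The copying behaviour can be illustrated on plain channels. The following is a minimal sketch of one common way to duplicate a channel, assuming unbuffered outputs. The function tee and the helper runTee are hypothetical and are not rivo's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// tee is a hypothetical stand-in: every value read from in is sent to
// both outputs before the next value is read.
func tee[T any](ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
	out1, out2 := make(chan T), make(chan T)
	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			// Local copies let us disable an output (set it to nil)
			// once it has received the current value.
			o1, o2 := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case o1 <- v:
					o1 = nil
				case o2 <- v:
					o2 = nil
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out1, out2
}

// runTee feeds 1, 2, 3 through tee and collects both outputs concurrently.
func runTee() ([]int, []int) {
	ctx := context.Background()
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range []int{1, 2, 3} {
			in <- v
		}
	}()

	a, b := tee(ctx, in)
	var gotA, gotB []int
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range a {
			gotA = append(gotA, v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range b {
			gotB = append(gotB, v)
		}
	}()
	wg.Wait()
	return gotA, gotB
}

func main() {
	a, b := runTee()
	fmt.Println(a, b) // [1 2 3] [1 2 3]
}
```

In this sketch a slow reader on one output holds back the other, which is why both outputs are drained concurrently, as in the package example for Tee.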
Types ¶
type FromSeq2Value ¶
	type FromSeq2Value[T, U any] struct {
		Val1 T
		Val2 U
	}
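The struct carries two values side by side. Its name suggests it holds the pair yielded by a two-value sequence, but that is an assumption, since no description is given here. The sketch below only shows how such a pair could be packed. The helper pairs is hypothetical and not part of rivo.

```go
package main

import "fmt"

// FromSeq2Value mirrors the struct shown above.
type FromSeq2Value[T, U any] struct {
	Val1 T
	Val2 U
}

// pairs is a hypothetical helper: it walks a slice and packs each
// (index, element) pair into one FromSeq2Value.
func pairs[U any](s []U) []FromSeq2Value[int, U] {
	out := make([]FromSeq2Value[int, U], 0, len(s))
	for i, v := range s {
		out = append(out, FromSeq2Value[int, U]{Val1: i, Val2: v})
	}
	return out
}

func main() {
	for _, p := range pairs([]string{"a", "b"}) {
		fmt.Println(p.Val1, p.Val2)
	}
}
```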
type Generator ¶
Generator is a pipeable function that does not read from its input stream. It starts a new stream from scratch.
func FromFunc ¶
func FromFunc[T any](f FromFuncFunc[T], options ...Option) Generator[T]
FromFunc returns a generator Pipeable that emits items generated by the given function. The input stream is ignored. The returned stream will emit items until the function returns ErrEOS.
Example ¶
	ctx := context.Background()

	count := atomic.Int32{}

	genFn := func(ctx context.Context) (int32, error) {
		value := count.Add(1)
		if value > 5 {
			return 0, ErrEOS
		}
		return value, nil
	}

	in := FromFunc(genFn)

	s := in(ctx, nil)

	for item := range s {
		fmt.Println(item.Val)
	}

Output:

	1
	2
	3
	4
	5
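The stop condition described for FromFunc can be sketched as a loop around a sentinel error. This is a stand-alone illustration, not the library's code: fromFunc, errEOS and countTo are hypothetical names, and forwarding non-sentinel errors inside the item is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for illustration; rivo exports its own Item and ErrEOS.
type Item[T any] struct {
	Val T
	Err error
}

var errEOS = errors.New("end of stream")

// fromFunc is a hypothetical re-creation of the generator loop: it calls f
// repeatedly and stops as soon as f reports errEOS.
func fromFunc[T any](ctx context.Context, f func(context.Context) (T, error)) <-chan Item[T] {
	out := make(chan Item[T])
	go func() {
		defer close(out)
		for {
			v, err := f(ctx)
			if errors.Is(err, errEOS) {
				return
			}
			// Assumption: any other error travels downstream inside the item.
			select {
			case out <- Item[T]{Val: v, Err: err}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// countTo drains a generator that yields 1..n and returns the values.
func countTo(n int) []int {
	i := 0
	next := func(context.Context) (int, error) {
		i++
		if i > n {
			return 0, errEOS
		}
		return i, nil
	}
	var got []int
	for item := range fromFunc(context.Background(), next) {
		got = append(got, item.Val)
	}
	return got
}

func main() {
	fmt.Println(countTo(3)) // [1 2 3]
}
```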
func FromSeq ¶
Example ¶
	ctx := context.Background()

	seq := slices.Values([]int{1, 2, 3, 4, 5})
	in := FromSeq(seq)

	s := in(ctx, nil)

	for item := range s {
		fmt.Println(item.Val)
	}

Output:

	1
	2
	3
	4
	5
type Item ¶
	type Item[T any] struct {
		// Val is the value of the item when there is no error.
		Val T
		// Err is the optional error of the item.
		Err error
	}
Item represents a single item in a data stream. It contains a value of type T and an optional error.
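Because the error travels with the value, a consumer would normally check Err before using Val. The sketch below shows that pattern on a plain slice. Item is redeclared locally, and parseAll and sumValid are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
)

// Item mirrors the struct shown above.
type Item[T any] struct {
	Val T
	Err error
}

// parseAll converts each string to an Item[int], keeping any parse
// failure in Err instead of aborting.
func parseAll(in []string) []Item[int] {
	out := make([]Item[int], 0, len(in))
	for _, s := range in {
		n, err := strconv.Atoi(s)
		out = append(out, Item[int]{Val: n, Err: err})
	}
	return out
}

// sumValid adds the values of items without an error and counts the rest.
func sumValid(items []Item[int]) (sum, failed int) {
	for _, it := range items {
		if it.Err != nil {
			failed++
			continue
		}
		sum += it.Val
	}
	return sum, failed
}

func main() {
	sum, failed := sumValid(parseAll([]string{"1", "x", "3"}))
	fmt.Println(sum, failed) // 4 1
}
```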
type Option ¶
type Option func(*options)
Option is a configuration option for a Pipeable.
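The signature func(*options) follows the functional options pattern. A minimal sketch of how such options could be applied is shown below. The options struct, its field names, and the lower-case helpers are assumptions made for illustration, while the default values are the ones documented for the individual options.

```go
package main

import "fmt"

// options and its fields are hypothetical; the real struct is unexported
// and not shown in this documentation.
type options struct {
	bufferSize  int
	poolSize    int
	stopOnError bool
}

// Option matches the declared type above.
type Option func(*options)

// withBufferSize and withPoolSize are illustrative stand-ins for the
// exported WithBufferSize and WithPoolSize.
func withBufferSize(n int) Option { return func(o *options) { o.bufferSize = n } }
func withPoolSize(n int) Option   { return func(o *options) { o.poolSize = n } }

// apply starts from the documented defaults (unbuffered output, one
// goroutine, do not stop on error) and lets each Option override them.
func apply(opts ...Option) options {
	o := options{bufferSize: 0, poolSize: 1, stopOnError: false}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", apply(withPoolSize(4)))
}
```

The pattern keeps constructors such as Map or Filter free of long parameter lists: callers pass only the options they want to change.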
func WithBufferSize ¶
WithBufferSize sets the size of the output channel buffer. The default is 0 (unbuffered).
func WithOnBeforeClose ¶
WithOnBeforeClose sets a function that will be called before the Pipeable output channel is closed.
func WithPoolSize ¶
WithPoolSize sets the number of goroutines that will be used to process items. The default is 1.
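What a pool of goroutines means for a single stage can be sketched as several workers sharing one input channel. This is an illustration under assumptions, not rivo's implementation, and squareAll is a hypothetical helper. With more than one worker the results in this sketch arrive in no particular order, so it sorts them before returning. Whether rivo preserves order is not stated here.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll is a hypothetical stand-in for one pipeline stage that runs
// with `workers` goroutines (workers must be at least 1).
func squareAll(vals []int, workers int) []int {
	in := make(chan int)
	out := make(chan int)

	// Start the pool: every worker reads from the same input channel.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- v * v
			}
		}()
	}

	// Feed the input, then close it so the workers can finish.
	go func() {
		defer close(in)
		for _, v := range vals {
			in <- v
		}
	}()

	// Close the output only after every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	var res []int
	for v := range out {
		res = append(res, v)
	}
	sort.Ints(res) // arrival order is not deterministic with several workers
	return res
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 3)) // [1 4 9 16 25]
}
```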
func WithStopOnError ¶
WithStopOnError determines whether the Pipeable should stop processing items when an error occurs. The default is false.
type Pipeable ¶
Pipeable is a function that takes a context and a stream and returns a stream. It is the building block of a data pipeline.
func Batch ¶ added in v0.1.0
func Batch[T any](n int, maxWait time.Duration, opt ...Option) Pipeable[T, []T]
Batch returns a Pipeable that batches items from the input Stream into slices of n items. If the batch is not full after maxWait, it will be sent anyway. Any error in the input Stream will be propagated to the output Stream immediately.
Example ¶
	ctx := context.Background()

	in := Of(1, 2, 3, 4, 5)

	b := Batch[int](2, time.Second)

	p := Pipe(in, b)

	for item := range p(ctx, nil) {
		fmt.Printf("%v\n", item.Val)
	}

Output:

	[1 2]
	[3 4]
	[5]
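The interplay of batch size and maximum wait can be sketched on a plain channel. The function below is a simplified, hypothetical version of the idea, not the library's code: it ignores item errors and contexts, and it assumes n and maxWait are both positive.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups values into slices of up to n and flushes a partial slice
// when maxWait elapses without the batch filling up.
func batch[T any](in <-chan T, n int, maxWait time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		ticker := time.NewTicker(maxWait)
		defer ticker.Stop()

		buf := make([]T, 0, n)
		flush := func() {
			if len(buf) == 0 {
				return
			}
			out <- buf
			buf = make([]T, 0, n)
			ticker.Reset(maxWait) // start a fresh wait window
		}

		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // input closed: emit whatever is left
					return
				}
				buf = append(buf, v)
				if len(buf) == n {
					flush()
				}
			case <-ticker.C:
				flush() // maxWait elapsed: emit the partial batch
			}
		}
	}()
	return out
}

// runBatch sends 1..5 through batch with n = 2 and collects the slices.
func runBatch() [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got [][]int
	for b := range batch(in, 2, time.Second) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(runBatch()) // [[1 2] [3 4] [5]]
}
```

Here the final [5] is emitted because the input closes. With a slow producer the same partial batch would be emitted by the ticker branch instead.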
func Flatten ¶ added in v0.2.0
func Flatten[T any](opt ...Option) Pipeable[[]T, T]
Flatten returns a Pipeable that flattens a Stream of slices into a Stream of individual items.
Example ¶
	ctx := context.Background()

	in := Of([]int{1, 2}, []int{3, 4}, []int{5})

	f := Flatten[int]()

	p := Pipe(in, f)

	for item := range p(ctx, nil) {
		fmt.Printf("%v\n", item.Val)
	}

Output:

	1
	2
	3
	4
	5
func Pipe ¶
func Pipe[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C]
Pipe pipes two pipeable functions together. It is a convenience function that calls Pipe2.
Example ¶
	ctx := context.Background()

	a := Of(1, 2, 3, 4, 5)

	b := Map(func(ctx context.Context, i Item[int]) (int, error) {
		return i.Val + 1, nil
	})

	p := Pipe(a, b)

	s := p(ctx, nil)

	for item := range s {
		fmt.Println(item.Val)
	}

Output:

	2
	3
	4
	5
	6
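Since a Pipeable is a function from one stream to another, piping two of them amounts to function composition. The sketch below shows this with local stand-in types. The names compose, mapper, of and runCompose are hypothetical and this is not the library's source.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type Item[T any] struct {
	Val T
	Err error
}

type Stream[T any] <-chan Item[T]

type Pipeable[T, U any] func(context.Context, Stream[T]) Stream[U]

// compose feeds the output stream of a into b.
func compose[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C] {
	return func(ctx context.Context, in Stream[A]) Stream[C] {
		return b(ctx, a(ctx, in))
	}
}

// mapper builds a simple transformer from a plain function.
// Items that already carry an error are forwarded without calling f.
func mapper[T, U any](f func(T) U) Pipeable[T, U] {
	return func(ctx context.Context, in Stream[T]) Stream[U] {
		out := make(chan Item[U])
		go func() {
			defer close(out)
			for it := range in {
				res := Item[U]{Err: it.Err}
				if it.Err == nil {
					res.Val = f(it.Val)
				}
				select {
				case out <- res:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}

// of returns an already-closed, pre-filled stream of the given values.
func of[T any](vals ...T) Stream[T] {
	out := make(chan Item[T], len(vals))
	for _, v := range vals {
		out <- Item[T]{Val: v}
	}
	close(out)
	return out
}

// runCompose increments each number, then formats it as a string.
func runCompose() []string {
	ctx := context.Background()
	p := compose(
		mapper(func(n int) int { return n + 1 }),
		mapper(func(n int) string { return fmt.Sprintf("#%d", n) }),
	)
	var got []string
	for it := range p(ctx, of(1, 2, 3)) {
		got = append(got, it.Val)
	}
	return got
}

func main() {
	fmt.Println(runCompose()) // [#2 #3 #4]
}
```

The type parameters do the bookkeeping: the compiler rejects a composition whose middle types do not match, which is presumably why the package offers fixed-arity Pipe2 through Pipe5 rather than a variadic function.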
type Stream ¶
Stream represents a data stream of items. It is a read-only channel of Item[T].
type Sync ¶
Sync is a pipeable function that does not emit any items. It is used at the end of a pipeline.
func Connect ¶
Connect returns a Sync that applies the given syncs to the input stream concurrently. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.
Example ¶
	package main

	import (
		"context"
		"fmt"
		"strings"

		"github.com/agiac/rivo"
	)

	func main() {
		ctx := context.Background()

		g := rivo.Of("Hello", "Hello", "Hello")

		capitalize := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) {
			return strings.ToUpper(i.Val), nil
		})

		lowercase := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) {
			return strings.ToLower(i.Val), nil
		})

		resA := make([]string, 0)
		a := rivo.Do(func(ctx context.Context, i rivo.Item[string]) {
			resA = append(resA, i.Val)
		})

		resB := make([]string, 0)
		b := rivo.Do(func(ctx context.Context, i rivo.Item[string]) {
			resB = append(resB, i.Val)
		})

		p1 := rivo.Pipe(capitalize, a)
		p2 := rivo.Pipe(lowercase, b)

		<-rivo.Connect(p1, p2)(ctx, g(ctx, nil))

		for _, s := range resA {
			fmt.Println(s)
		}

		for _, s := range resB {
			fmt.Println(s)
		}
	}

Output:

	HELLO
	HELLO
	HELLO
	hello
	hello
	hello
func Do ¶
Do returns a Sync that applies the given function to each item in the stream. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.
Example ¶
	ctx := context.Background()

	in := make(chan Item[int])

	go func() {
		defer close(in)
		in <- Item[int]{Val: 1}
		in <- Item[int]{Val: 2}
		in <- Item[int]{Err: errors.New("error 1")}
		in <- Item[int]{Val: 4}
		in <- Item[int]{Err: errors.New("error 2")}
	}()

	d := Do(func(ctx context.Context, i Item[int]) {
		if i.Err != nil {
			fmt.Printf("ERROR: %v\n", i.Err)
		}
	})

	<-d(ctx, in)

Output:

	ERROR: error 1
	ERROR: error 2
type Transformer ¶
Transformer is a pipeable that reads from its input stream and emits items to its output stream.
func Filter ¶
func Filter[T any](f FilterFunc[T], opt ...Option) Transformer[T, T]
Filter returns a Transformer that filters the input stream using the given function.
Example ¶
	ctx := context.Background()

	in := Of(1, 2, 3, 4, 5)

	onlyEven := Filter(func(ctx context.Context, i Item[int]) (bool, error) {
		// Always check for errors
		if i.Err != nil {
			return true, i.Err // Propagate the error
		}

		return i.Val%2 == 0, nil
	})

	p := Pipe(in, onlyEven)

	s := p(ctx, nil)

	for item := range s {
		fmt.Println(item.Val)
	}

Output:

	2
	4
func ForEach ¶
func ForEach[T any](f ForEachFunc[T], opt ...Option) Transformer[T, struct{}]
ForEach returns a Transformer that applies a function to each item from the input stream. It is intended for side effects, and the output stream will only emit the errors returned by the function.
Example ¶
	ctx := context.Background()

	g := Of(1, 2, 3, 4, 5)

	f := ForEach(func(ctx context.Context, i Item[int]) error {
		// Do some side effect
		// ...
		// Simulate an error
		if i.Val == 3 {
			return fmt.Errorf("an error")
		}
		return nil
	})

	s := Pipe(g, f)(ctx, nil)

	for item := range s {
		fmt.Printf("item: %v; error: %v\n", item.Val, item.Err)
	}

Output:

	item: {}; error: an error
func Map ¶
func Map[T, U any](f MapFunc[T, U], opt ...Option) Transformer[T, U]
Map returns a Transformer that applies a function to each item from the input stream.
Example ¶
	ctx := context.Background()

	in := Of(1, 2, 3, 4, 5)

	double := Map(func(ctx context.Context, i Item[int]) (int, error) {
		// Always check for errors
		if i.Err != nil {
			return 0, i.Err // Propagate the error
		}

		return i.Val * 2, nil
	})

	p := Pipe(in, double)

	s := p(ctx, nil)

	for item := range s {
		fmt.Println(item.Val)
	}

Output:

	2
	4
	6
	8
	10