rivo

package module
v0.2.0
Published: Jan 8, 2025 License: MIT Imports: 6 Imported by: 0

README

rivo

rivo is a library for stream processing in Go.

NOTE: THIS LIBRARY IS STILL IN ACTIVE DEVELOPMENT AND IS NOT YET PRODUCTION READY.

About

rivo had two major inspirations:

  1. The book "Concurrency in Go";
  2. ReactiveX, in particular the Go and JS libraries;

Compared to these sources, rivo aims to provide better type safety (both "Concurrency in Go" and RxGo were written in a pre-generics era and make heavy use of interface{}) and a more intuitive API and developer experience (Rx is very powerful, but can be overwhelming for newcomers).

Getting started

Prerequisites

rivo requires Go 1.24 or later.

For the time being you'll need to use the release candidate version of Go 1.24, which can be installed with:

  go install golang.org/dl/go1.24rc1@latest
  go1.24rc1 download

Installation

  go get github.com/agiac/rivo

Basic concepts

rivo has 3 main types, which are the building blocks of the library: Item, Stream and Pipeable.

Item is a struct which contains a value and an optional error. Just like errors are returned next to the result of a function in synchronous code, they should be passed along in asynchronous code and handled where most appropriate.

type Item[T any] struct {
	Val T
	Err error
}

Stream is a read-only channel of items. As the name suggests, it represents a stream of data.

type Stream[T any] <-chan Item[T]

Pipeable is a function that takes a context.Context and a Stream of one type and returns a Stream of the same or a different type. Pipeables can be composed together using one of the Pipe functions.

type Pipeable[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

Pipeables are divided into three categories: generators, sinks and transformers.

  • Generator is a pipeable that does not read from its input stream. It starts a new stream from scratch.
  • Sync (the type used for sinks) is a pipeable that does not emit any items. It is used at the end of a pipeline.
  • Transformer is a pipeable that reads from its input stream and emits items to its output stream.
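
To make the three categories concrete, here is a minimal sketch that uses only the standard library. It repeats the Item, Stream and Pipeable definitions quoted above; everything else (none, ofInts, double, sumInto, sumDoubled) is a hypothetical name invented for illustration and is not part of rivo's API or a description of how rivo implements its stages.

```go
package main

import (
	"context"
	"fmt"
)

// Item, Stream and Pipeable repeat the definitions quoted above.
type Item[T any] struct {
	Val T
	Err error
}

type Stream[T any] <-chan Item[T]

type Pipeable[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

// none is a hypothetical placeholder for "no input" or "no output".
type none struct{}

// ofInts is a hypothetical generator: it ignores its input stream.
func ofInts(vals ...int) Pipeable[none, int] {
	return func(ctx context.Context, _ Stream[none]) Stream[int] {
		out := make(chan Item[int])
		go func() {
			defer close(out)
			for _, v := range vals {
				select {
				case <-ctx.Done():
					return
				case out <- Item[int]{Val: v}:
				}
			}
		}()
		return out
	}
}

// double is a hypothetical transformer: it doubles values and forwards
// items that carry an error untouched.
func double() Pipeable[int, int] {
	return func(ctx context.Context, in Stream[int]) Stream[int] {
		out := make(chan Item[int])
		go func() {
			defer close(out)
			for item := range in {
				if item.Err == nil {
					item.Val *= 2
				}
				select {
				case <-ctx.Done():
					return
				case out <- item:
				}
			}
		}()
		return out
	}
}

// sumInto is a hypothetical sink: it emits nothing and closes its output
// once the input stream is exhausted.
func sumInto(total *int) Pipeable[int, none] {
	return func(ctx context.Context, in Stream[int]) Stream[none] {
		out := make(chan Item[none])
		go func() {
			defer close(out)
			for item := range in {
				if item.Err == nil {
					*total += item.Val
				}
			}
		}()
		return out
	}
}

// sumDoubled wires the three stages by hand: generator -> transformer -> sink.
func sumDoubled(vals ...int) int {
	ctx := context.Background()
	total := 0
	<-sumInto(&total)(ctx, double()(ctx, ofInts(vals...)(ctx, nil)))
	return total
}

func main() {
	fmt.Println(sumDoubled(1, 2, 3)) // prints 12
}
```

Because the sink closes its output only after draining its input, a single receive from the final stream is enough to wait for the whole chain.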

Here's a basic example:

package main

import (
	"context"
	"fmt"
	"github.com/agiac/rivo"
)

func main() {
	ctx := context.Background()

	// `Of` returns a generator which returns a stream that will emit the provided values
	in := rivo.Of(1, 2, 3, 4, 5)

	// `Filter` returns a pipeable that filters the input stream using the given function.
	onlyEven := rivo.Filter(func(ctx context.Context, i rivo.Item[int]) (bool, error) {
		// Always check for errors
		if i.Err != nil {
			return true, i.Err // Propagate the error
		}

		return i.Val%2 == 0, nil
	})

	log := rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
		if i.Err != nil {
			fmt.Printf("ERROR: %v\n", i.Err)
			return
		}

		fmt.Println(i.Val)
	})

	// `Pipe` composes pipeables together, returning a new pipeable
	p := rivo.Pipe3(in, onlyEven, log)

	// By passing a context and an input channel to our pipeable, we can get the output stream.
	// Since our first pipeable `in` is a generator and does not depend on an input stream, we can pass a nil channel.
	// Also, since log is a sink, we only have to read once from the output channel to know that the pipe has finished.
	<-p(ctx, nil)

	// Expected output:
	// 2
	// 4
}

Pipeable factories

rivo comes with a set of built-in pipeable factories.

Generators

  • Of: returns a pipeable which returns a stream that will emit the provided values;
  • FromFunc: returns a pipeable which returns a stream that will emit the values returned by the provided function;
  • FromSeq and FromSeq2: return a pipeable which returns a stream that will emit the values from the provided iterator;

Sinks

  • Do: returns a pipeable which performs a side effect for each item in the input stream;

Transformers

  • Filter: returns a pipeable which filters the input stream using the given function;
  • Map: returns a pipeable which maps the input stream using the given function;
  • ForEach: returns a pipeable which applies the given function to each item in the input stream and forwards only the errors;
  • Batch: returns a pipeable which groups the input stream into batches of the provided size;
  • Flatten: returns a pipeable which flattens the input stream of slices;

Besides these, the subpackages of the library contain more specialized pipeable factories.

Package rivo/io

  • FromReader: returns a pipeable which reads from the provided io.Reader and emits the read bytes;
  • ToWriter: returns a pipeable which writes the input stream to the provided io.Writer;

Package rivo/bufio

  • FromScanner: returns a pipeable which reads from the provided bufio.Scanner and emits the scanned items;
  • ToScanner: returns a pipeable which writes the input stream to the provided bufio.Writer;

Package rivo/csv

  • FromReader: returns a pipeable which reads from the provided csv.Reader and emits the read records;
  • ToWriter: returns a pipeable which writes the input stream to the provided csv.Writer;

Package rivo/errors

  • WithErrorHandler: returns a pipeable that connects the input pipeable to an error handling pipeable.

Optional parameters

Many pipeable factories accept a common set of optional parameters, which can be provided via functional options.

  double := rivo.Map(
  	func(ctx context.Context, i rivo.Item[int]) (int, error) { return i.Val * 2, nil },
  	// Pass additional options to the pipeable
  	rivo.WithBufferSize(1),
  	rivo.WithPoolSize(runtime.NumCPU()),
  )

The currently available options are:

  • WithPoolSize(int): sets the number of goroutines that will be used to process items. Default is 1.
  • WithBufferSize(int): sets the buffer size of the output channel. Default is 0 (unbuffered).
  • WithStopOnError(bool): if true, the pipeable will stop processing items when an error is encountered. Default is false.
  • WithOnBeforeClose(func(context.Context) error): sets a function that will be called before the output channel is closed.
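
For readers unfamiliar with functional options, the following standard-library sketch shows how a pool size and a buffer size could plausibly be wired into a stage. The names options, withPoolSize, withBufferSize, newOptions, squareAll and sumOfSquares are hypothetical, and the field layout is an assumption rather than rivo's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// options and Option re-create the functional options pattern.
// The field names are assumptions made for this sketch.
type options struct {
	poolSize   int
	bufferSize int
}

type Option func(*options)

func withPoolSize(n int) Option   { return func(o *options) { o.poolSize = n } }
func withBufferSize(n int) Option { return func(o *options) { o.bufferSize = n } }

// newOptions starts from the defaults listed above (pool size 1,
// buffer size 0) and then applies each option in order.
func newOptions(opts ...Option) options {
	o := options{poolSize: 1, bufferSize: 0}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

// squareAll is a hypothetical stage that squares every input using
// poolSize workers and an output channel with bufferSize slots.
func squareAll(in <-chan int, opts ...Option) <-chan int {
	o := newOptions(opts...)
	out := make(chan int, o.bufferSize)
	var wg sync.WaitGroup
	for w := 0; w < o.poolSize; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- v * v
			}
		}()
	}
	go func() {
		wg.Wait() // close the output only after every worker has finished
		close(out)
	}()
	return out
}

// sumOfSquares feeds 1..n through squareAll and adds up the results.
// With more than one worker the output order is not guaranteed, so the
// sketch sums the results instead of printing them one by one.
func sumOfSquares(n int, opts ...Option) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()
	sum := 0
	for v := range squareAll(in, opts...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares(4, withPoolSize(3), withBufferSize(2))) // prints 30
}
```

One consequence worth keeping in mind: in a worker-pool design like this one, a pool size greater than 1 gives up ordering of the output items.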

Higher order pipeables

rivo also provides a set of higher order pipeables, which are pipeables that take other pipeables as arguments.

  • Pipe, Pipe2, Pipe3, Pipe4, Pipe5: return a pipeable which composes the provided pipeables together;
  • Connect: returns a sync which applies the given syncs to the input stream concurrently;
  • Segregate: returns a function that returns two pipeables, where the first pipeable emits items that pass the predicate, and the second pipeable emits items that do not pass the predicate.

Utilities

rivo also comes with a set of utilities which cannot be expressed as pipeables but can be useful when working with streams:

  • OrDone: returns a channel which will be closed when the provided context is done;
  • Tee and TeeN: return two and n streams, respectively, that each receive a copy of each item from the input stream;

Error handling

As mentioned, each item contains a value and an optional error. You can handle errors either individually inside the callbacks of pipeables such as Map or Do, or (recommended) by creating dedicated pipelines for error handling. See examples/errorHandling for an example.
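
The idea of a dedicated error pipeline can be sketched with plain channels. The code below is not taken from the examples folder; parseAll, splitErrors and sumAndCountErrors are hypothetical helpers that route failed items to their own stream so that the main path only sees valid values.

```go
package main

import (
	"fmt"
	"strconv"
)

// Item repeats the definition from the basic concepts section.
type Item[T any] struct {
	Val T
	Err error
}

// parseAll is a hypothetical stage that turns strings into Item[int],
// carrying any conversion error inside the item instead of dropping it.
func parseAll(inputs ...string) <-chan Item[int] {
	out := make(chan Item[int])
	go func() {
		defer close(out)
		for _, s := range inputs {
			n, err := strconv.Atoi(s)
			out <- Item[int]{Val: n, Err: err}
		}
	}()
	return out
}

// splitErrors routes items with a nil Err to the first stream and items
// with a non-nil Err to the second. Both outputs must be consumed.
func splitErrors[T any](in <-chan Item[T]) (<-chan Item[T], <-chan Item[T]) {
	ok := make(chan Item[T])
	failed := make(chan Item[T])
	go func() {
		defer close(ok)
		defer close(failed)
		for item := range in {
			if item.Err != nil {
				failed <- item
			} else {
				ok <- item
			}
		}
	}()
	return ok, failed
}

// sumAndCountErrors sums the valid values on the main path while a
// separate goroutine acts as the error pipeline and counts failures.
func sumAndCountErrors(inputs ...string) (int, int) {
	ok, failed := splitErrors(parseAll(inputs...))
	errCount := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range failed {
			errCount++
		}
	}()
	sum := 0
	for item := range ok {
		sum += item.Val
	}
	<-done // wait for the error pipeline before reading errCount
	return sum, errCount
}

func main() {
	sum, errCount := sumAndCountErrors("1", "x", "3")
	fmt.Println(sum, errCount) // prints 4 1
}
```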

Examples

More examples can be found in the examples folder.


Contributing

Contributions are welcome! If you have any ideas, suggestions or bug reports, please open an issue or a pull request.

Roadmap

  • Review docs, in particular where "pipeable" is used instead of "generator", "sink" or "transformer"
  • Add more pipeables, also using the RxJS list of operators as a reference:
    • Tap
    • Better error handling
    • Time-based
    • SQL
    • AWS
    • ...
  • Add more utilities
    • Merge
  • Add more examples
  • Error handling section in the README

License

rivo is licensed under the MIT license. See the LICENSE file for details.

Documentation

Overview

Package rivo is a library for stream processing. It provides a simple and flexible way to create and compose streams of data.

There are three main types in this library: Item, Stream, and Pipeable.

Item is a struct which contains a value and an optional error. Just like errors are returned next to the result of a function in synchronous code, they should be passed along in asynchronous code and handled where most appropriate.

Stream is a read-only channel of items. As the name suggests, it represents a stream of data.

Pipeable is a function that takes a context.Context and a Stream of one type and returns a Stream of the same or a different type. Pipeables can be composed together using one of the Pipe functions. Pipeables are divided into three categories: generators, sinks and transformers.

  • Generator is a pipeable that does not read from its input stream. It starts a new stream from scratch.
  • Sync (the type used for sinks) is a pipeable that does not emit any items. It is used at the end of a pipeline.
  • Transformer is a pipeable that reads from its input stream and emits items to its output stream.

Variables

var ErrEOS = errors.New("end of stream")

Functions

func Segregate

func Segregate[T, U any](p Pipeable[T, U], predicate func(ctx context.Context, item Item[U]) bool) func(context.Context, Stream[T]) (Generator[U], Generator[U])

Segregate returns a function that returns two pipeables, where the first pipeable emits items that pass the predicate, and the second pipeable emits items that do not pass the predicate.

Example
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/agiac/rivo"
)

func main() {
	ctx := context.Background()

	g := rivo.Of("1", "2", "3", "4", "5")

	toInt := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (int, error) {
		return strconv.Atoi(i.Val)
	})

	p := rivo.Pipe(g, toInt)

	even, odd := rivo.Segregate(p, func(ctx context.Context, item rivo.Item[int]) bool {
		return item.Val%2 == 0
	})(ctx, nil)

	evens := make([]int, 0)
	odds := make([]int, 0)

	<-rivo.Connect(
		rivo.Pipe(even, rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
			evens = append(evens, i.Val)
		})),
		rivo.Pipe(odd, rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
			odds = append(odds, i.Val)
		})),
	)(ctx, nil)

	for _, i := range append(evens, odds...) {
		fmt.Println(i)
	}

}
Output:

2
4
1
3
5

func Tee

func Tee[T any](ctx context.Context, in Stream[T]) (Stream[T], Stream[T])

Tee returns two streams that each receive a copy of each item from the input stream. It is equivalent to TeeN(ctx, in, 2).

Example
package main

import (
	"context"
	"fmt"
	"github.com/agiac/rivo"
	"sync"
)

func main() {
	ctx := context.Background()

	in := rivo.Of("hello", "hello", "hello")(ctx, nil)

	out1, out2 := rivo.Tee(ctx, in)

	wg := sync.WaitGroup{}
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := range out1 {
			fmt.Println(i.Val)
		}
	}()

	go func() {
		defer wg.Done()
		for i := range out2 {
			fmt.Println(i.Val)
		}
	}()

	wg.Wait()

}
Output:

hello
hello
hello
hello
hello
hello

Types

type DoFunc

type DoFunc[T any] = func(context.Context, Item[T])

type FilterFunc

type FilterFunc[T any] = func(context.Context, Item[T]) (bool, error)

type ForEachFunc

type ForEachFunc[T any] = func(context.Context, Item[T]) error

type FromFuncFunc

type FromFuncFunc[T any] = func(context.Context) (T, error)

type FromSeq2Value

type FromSeq2Value[T, U any] struct {
	Val1 T
	Val2 U
}

type Generator

type Generator[T any] = Pipeable[None, T]

Generator is a pipeable function that does not read from its input stream. It starts a new stream from scratch.

func FromFunc

func FromFunc[T any](f FromFuncFunc[T], options ...Option) Generator[T]

FromFunc returns a generator Pipeable that emits items generated by the given function. The input stream is ignored. The returned stream will emit items until the function returns ErrEOS.

Example
ctx := context.Background()

count := atomic.Int32{}

genFn := func(ctx context.Context) (int32, error) {
	value := count.Add(1)

	if value > 5 {
		return 0, ErrEOS
	}

	return value, nil
}

in := FromFunc(genFn)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

1
2
3
4
5
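
As a rough illustration of the "emit until a sentinel error" behaviour described for FromFunc, here is a standard-library sketch. errEOS, fromFunc and countTo are hypothetical stand-ins; in particular, the use of errors.Is and the choice to forward other errors inside the item are assumptions, not a statement about rivo's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Item repeats the definition from the Types section.
type Item[T any] struct {
	Val T
	Err error
}

// errEOS is a hypothetical sentinel playing the role of ErrEOS.
var errEOS = errors.New("end of stream")

// fromFunc calls f repeatedly and emits each result until f reports
// errEOS or the context is done. Other errors travel inside the item.
func fromFunc[T any](ctx context.Context, f func(context.Context) (T, error)) <-chan Item[T] {
	out := make(chan Item[T])
	go func() {
		defer close(out)
		for {
			v, err := f(ctx)
			if errors.Is(err, errEOS) {
				return
			}
			select {
			case <-ctx.Done():
				return
			case out <- Item[T]{Val: v, Err: err}:
			}
		}
	}()
	return out
}

// countTo emits 1..n through fromFunc and collects the values.
func countTo(n int) []int {
	next := 0
	gen := func(context.Context) (int, error) {
		next++
		if next > n {
			return 0, errEOS
		}
		return next, nil
	}
	var got []int
	for item := range fromFunc(context.Background(), gen) {
		got = append(got, item.Val)
	}
	return got
}

func main() {
	fmt.Println(countTo(5)) // prints [1 2 3 4 5]
}
```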

func FromSeq

func FromSeq[T any](seq iter.Seq[T], opt ...Option) Generator[T]
Example
ctx := context.Background()

seq := slices.Values([]int{1, 2, 3, 4, 5})
in := FromSeq(seq)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

1
2
3
4
5

func FromSeq2

func FromSeq2[T, U any](seq iter.Seq2[T, U], opts ...Option) Generator[FromSeq2Value[T, U]]
Example
ctx := context.Background()

seq := slices.All([]string{"a", "b", "c", "d", "e"})

in := FromSeq2(seq)

s := in(ctx, nil)

for item := range s {
	fmt.Printf("%d, %s\n", item.Val.Val1, item.Val.Val2)
}
Output:

0, a
1, b
2, c
3, d
4, e

func Of

func Of[T any](items ...T) Generator[T]

Of returns a generator Pipeable that emits the given items. The input stream is ignored.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

1
2
3
4
5

type Item

type Item[T any] struct {
	// Val is the value of the item when there is no error.
	Val T
	// Err is the optional error of the item.
	Err error
}

Item represents a single item in a data stream. It contains a value of type T and an optional error.

func Collect

func Collect[T any](in Stream[T]) []Item[T]

Collect collects all items from the stream and returns them as a slice.

Example
ctx := context.Background()

s := Of(1, 2, 3, 4, 5)(ctx, nil)

for _, item := range Collect(s) {
	fmt.Println(item.Val)
}
Output:

1
2
3
4
5

func CollectWithContext

func CollectWithContext[T any](ctx context.Context, in Stream[T]) []Item[T]

CollectWithContext collects all items from the stream and returns them as a slice. If the context is cancelled, it stops collecting items.
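
The difference between the two collectors can be sketched with the standard library alone. collectWithContext and demo below are hypothetical helpers written for this illustration; they work on plain channels rather than on rivo's Stream type.

```go
package main

import (
	"context"
	"fmt"
)

// collectWithContext gathers values until the input is closed or the
// context is done, whichever happens first.
func collectWithContext[T any](ctx context.Context, in <-chan T) []T {
	var out []T
	for {
		select {
		case <-ctx.Done():
			return out
		case v, ok := <-in:
			if !ok {
				return out
			}
			out = append(out, v)
		}
	}
}

// demo runs the collector twice: once on a finished stream, and once
// on a stream that never produces anything with a cancelled context.
func demo() ([]int, int) {
	closed := make(chan int, 3)
	closed <- 1
	closed <- 2
	closed <- 3
	close(closed)
	all := collectWithContext[int](context.Background(), closed)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()               // cancel before collecting
	open := make(chan int) // never written to and never closed
	none := collectWithContext[int](ctx, open)

	return all, len(none)
}

func main() {
	all, afterCancel := demo()
	fmt.Println(all, afterCancel) // prints [1 2 3] 0
}
```

Without the context check, the second call would block forever, which is the situation the context-aware variant is meant to avoid.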

type MapFunc

type MapFunc[T, U any] = func(context.Context, Item[T]) (U, error)

type None

type None struct{}

type Option

type Option func(*options)

Option is a configuration option for a Pipeable.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the size of the output channel buffer. The default is 0 (unbuffered).

func WithOnBeforeClose

func WithOnBeforeClose(fn func(context.Context) error) Option

WithOnBeforeClose sets a function that will be called before the Pipeable output channel is closed.

func WithPoolSize

func WithPoolSize(size int) Option

WithPoolSize sets the number of goroutines that will be used to process items. The default is 1.

func WithStopOnError

func WithStopOnError(stop bool) Option

WithStopOnError determines whether the Pipeable should stop processing items when an error occurs. The default is false.

type Pipeable

type Pipeable[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

Pipeable is a function that takes a context and a stream and returns a stream. It is the building block of a data pipeline.

func Batch added in v0.1.0

func Batch[T any](n int, maxWait time.Duration, opt ...Option) Pipeable[T, []T]

Batch returns a Pipeable that batches items from the input Stream into slices of n items. If the batch is not full after maxWait, it will be sent anyway. Any error in the input Stream will be propagated to the output Stream immediately.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

b := Batch[int](2, time.Second)

p := Pipe(in, b)

for item := range p(ctx, nil) {
	fmt.Printf("%v\n", item.Val)
}
Output:

[1 2]
[3 4]
[5]
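
How size-based and time-based flushing can interact is sketched below using only the standard library. batch and batchInts are hypothetical helpers, and starting the maxWait clock at the first item of each batch is an assumption of this sketch; the library may measure the wait differently.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups values into slices of up to n items and also flushes a
// partial batch once maxWait has passed since its first item arrived.
func batch[T any](in <-chan T, n int, maxWait time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var buf []T
		var deadline <-chan time.Time // nil while buf is empty, so its case never fires
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = nil // start a fresh slice so the sent batch is never modified
			}
			deadline = nil
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // emit the last partial batch
					return
				}
				if len(buf) == 0 {
					deadline = time.After(maxWait) // the clock starts with the first item
				}
				buf = append(buf, v)
				if len(buf) == n {
					flush()
				}
			case <-deadline:
				flush() // maxWait elapsed before the batch filled up
			}
		}
	}()
	return out
}

// batchInts pushes the given values through batch with a one second wait.
func batchInts(n int, vals ...int) [][]int {
	in := make(chan int, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)
	var got [][]int
	for b := range batch[int](in, n, time.Second) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(batchInts(2, 1, 2, 3, 4, 5)) // prints [[1 2] [3 4] [5]]
}
```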

func Flatten added in v0.2.0

func Flatten[T any](opt ...Option) Pipeable[[]T, T]

Flatten returns a Pipeable that flattens a Stream of slices into a Stream of individual items.

Example
ctx := context.Background()

in := Of([]int{1, 2}, []int{3, 4}, []int{5})

f := Flatten[int]()

p := Pipe(in, f)

for item := range p(ctx, nil) {
	fmt.Printf("%v\n", item.Val)
}
Output:

1
2
3
4
5

func Pipe

func Pipe[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C]

Pipe pipes two pipeable functions together. It is a convenience function that calls Pipe2.

Example
ctx := context.Background()

a := Of(1, 2, 3, 4, 5)

b := Map(func(ctx context.Context, i Item[int]) (int, error) {
	return i.Val + 1, nil
})

p := Pipe(a, b)

s := p(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

2
3
4
5
6

func Pipe2

func Pipe2[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C]

Pipe2 pipes two pipeable functions together.

func Pipe3

func Pipe3[A, B, C, D any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D]) Pipeable[A, D]

Pipe3 pipes three pipeable functions together.

func Pipe4

func Pipe4[A, B, C, D, E any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D], d Pipeable[D, E]) Pipeable[A, E]

Pipe4 pipes four pipeable functions together.

func Pipe5

func Pipe5[A, B, C, D, E, F any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D], d Pipeable[D, E], e Pipeable[E, F]) Pipeable[A, F]

Pipe5 pipes five pipeable functions together.
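
Conceptually, the Pipe family is function composition over streams. The sketch below shows one way this could be written with the Pipeable signature documented on this page; pipe2, pipe3, mapVals, emit and labels are hypothetical names, and nothing here should be read as rivo's actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Item, Stream and Pipeable repeat the definitions from the Types section.
type Item[T any] struct {
	Val T
	Err error
}

type Stream[T any] <-chan Item[T]

type Pipeable[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

// pipe2 is a hypothetical composition helper: the output stream of a
// becomes the input stream of b.
func pipe2[A, B, C any](a Pipeable[A, B], b Pipeable[B, C]) Pipeable[A, C] {
	return func(ctx context.Context, in Stream[A]) Stream[C] {
		return b(ctx, a(ctx, in))
	}
}

// pipe3 shows that longer chains can be built from the two-stage case.
func pipe3[A, B, C, D any](a Pipeable[A, B], b Pipeable[B, C], c Pipeable[C, D]) Pipeable[A, D] {
	return pipe2(pipe2(a, b), c)
}

// mapVals is a hypothetical transformer factory used only to have
// something to compose. Items that carry an error skip f.
func mapVals[T, U any](f func(T) U) Pipeable[T, U] {
	return func(ctx context.Context, in Stream[T]) Stream[U] {
		out := make(chan Item[U])
		go func() {
			defer close(out)
			for item := range in {
				next := Item[U]{Err: item.Err}
				if item.Err == nil {
					next.Val = f(item.Val)
				}
				select {
				case <-ctx.Done():
					return
				case out <- next:
				}
			}
		}()
		return out
	}
}

// emit returns an already filled and closed stream of the given values.
func emit[T any](vals ...T) Stream[T] {
	out := make(chan Item[T], len(vals))
	for _, v := range vals {
		out <- Item[T]{Val: v}
	}
	close(out)
	return out
}

// labels runs each value through three composed stages: +1, *2, format.
func labels(vals ...int) []string {
	p := pipe3(
		mapVals(func(v int) int { return v + 1 }),
		mapVals(func(v int) int { return v * 2 }),
		mapVals(func(v int) string { return fmt.Sprintf("n=%d", v) }),
	)
	var got []string
	for item := range p(context.Background(), emit(vals...)) {
		got = append(got, item.Val)
	}
	return got
}

func main() {
	fmt.Println(labels(1, 2)) // prints [n=4 n=6]
}
```

The type parameters do the bookkeeping: a chain only compiles when the output type of each stage matches the input type of the next.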

type Stream

type Stream[T any] <-chan Item[T]

Stream represents a data stream of items. It is a read only channel of Item[T].

func OrDone

func OrDone[T any](ctx context.Context, in Stream[T]) Stream[T]

OrDone is a utility function that returns a channel that will be closed when the context is done.
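
The or-done pattern is commonly written along the following lines. This is a standard-library sketch on plain channels, not rivo's source; orDone and firstThree are hypothetical names, and forwarding items until either the input closes or the context is done is the behaviour assumed here.

```go
package main

import (
	"context"
	"fmt"
)

// orDone forwards values from in until in is closed or ctx is done,
// then closes the returned channel.
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// firstThree reads from a source that is never closed. A plain range
// over src would block forever; wrapping it in orDone lets the loop end
// once the context is cancelled.
func firstThree() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan int, 3)
	src <- 10
	src <- 20
	src <- 30

	var got []int
	for v := range orDone[int](ctx, src) {
		got = append(got, v)
		if len(got) == 3 {
			cancel()
		}
	}
	return got
}

func main() {
	fmt.Println(firstThree()) // prints [10 20 30]
}
```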

func TeeN

func TeeN[T any](ctx context.Context, in Stream[T], n int) []Stream[T]

TeeN returns n streams that each receive a copy of each item from the input stream.
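
A possible shape for this kind of fan-out, written against plain channels with the standard library, is sketched below. teeN and sumEach are hypothetical helpers; sending to the outputs one after another with unbuffered channels is a simplification of this sketch and means every output has to be read concurrently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// teeN copies every value from in to each of n output channels and
// closes all outputs when in is closed or ctx is done.
func teeN[T any](ctx context.Context, in <-chan T, n int) []<-chan T {
	outs := make([]chan T, n)
	res := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
		res[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range in {
			for _, o := range outs {
				select {
				case <-ctx.Done():
					return
				case o <- v:
				}
			}
		}
	}()
	return res
}

// sumEach reads every output in its own goroutine and returns one sum
// per output. Each goroutine writes to a distinct slice index.
func sumEach(n int, vals ...int) []int {
	in := make(chan int, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)

	sums := make([]int, n)
	var wg sync.WaitGroup
	for i, out := range teeN[int](context.Background(), in, n) {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				sums[i] += v
			}
		}(i, out)
	}
	wg.Wait()
	return sums
}

func main() {
	fmt.Println(sumEach(3, 1, 2, 3)) // prints [6 6 6]
}
```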

type Sync

type Sync[T any] = Pipeable[T, None]

Sync is a pipeable function that does not emit any items. It is used at the end of a pipeline.

func Connect

func Connect[A any](pipeables ...Sync[A]) Sync[A]

Connect returns a Sync that applies the given syncs to the input stream concurrently. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/agiac/rivo"
)

func main() {
	ctx := context.Background()

	g := rivo.Of("Hello", "Hello", "Hello")

	capitalize := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) {
		return strings.ToUpper(i.Val), nil
	})

	lowercase := rivo.Map(func(ctx context.Context, i rivo.Item[string]) (string, error) {
		return strings.ToLower(i.Val), nil
	})

	resA := make([]string, 0)
	a := rivo.Do(func(ctx context.Context, i rivo.Item[string]) {
		resA = append(resA, i.Val)
	})

	resB := make([]string, 0)
	b := rivo.Do(func(ctx context.Context, i rivo.Item[string]) {
		resB = append(resB, i.Val)
	})

	p1 := rivo.Pipe(capitalize, a)
	p2 := rivo.Pipe(lowercase, b)

	<-rivo.Connect(p1, p2)(ctx, g(ctx, nil))

	for _, s := range resA {
		fmt.Println(s)
	}

	for _, s := range resB {
		fmt.Println(s)
	}

}
Output:

HELLO
HELLO
HELLO
hello
hello
hello

func Do

func Do[T any](f DoFunc[T], opt ...Option) Sync[T]

Do returns a Sync that applies the given function to each item in the stream. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.

Example
ctx := context.Background()

in := make(chan Item[int])
go func() {
	defer close(in)
	in <- Item[int]{Val: 1}
	in <- Item[int]{Val: 2}
	in <- Item[int]{Err: errors.New("error 1")}
	in <- Item[int]{Val: 4}
	in <- Item[int]{Err: errors.New("error 2")}
}()

d := Do(func(ctx context.Context, i Item[int]) {
	if i.Err != nil {
		fmt.Printf("ERROR: %v\n", i.Err)
	}
})

<-d(ctx, in)
Output:

ERROR: error 1
ERROR: error 2

type Transformer

type Transformer[T, U any] = Pipeable[T, U]

Transformer is a pipeable that reads from its input stream and emits items to its output stream.

func Filter

func Filter[T any](f FilterFunc[T], opt ...Option) Transformer[T, T]

Filter returns a Transformer that filters the input stream using the given function.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

onlyEven := Filter(func(ctx context.Context, i Item[int]) (bool, error) {
	// Always check for errors
	if i.Err != nil {
		return true, i.Err // Propagate the error
	}

	return i.Val%2 == 0, nil
})

p := Pipe(in, onlyEven)

s := p(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

2
4

func ForEach

func ForEach[T any](f ForEachFunc[T], opt ...Option) Transformer[T, struct{}]

ForEach returns a Transformer that applies a function to each item from the input stream. It is intended for side effects, and the output stream will only emit the errors returned by the function.

Example
ctx := context.Background()

g := Of(1, 2, 3, 4, 5)

f := ForEach(func(ctx context.Context, i Item[int]) error {
	// Do some side effect
	// ...
	// Simulate an error
	if i.Val == 3 {
		return fmt.Errorf("an error")
	}

	return nil
})

s := Pipe(g, f)(ctx, nil)

for item := range s {
	fmt.Printf("item: %v; error: %v\n", item.Val, item.Err)
}
Output:

item: {}; error: an error

func Map

func Map[T, U any](f MapFunc[T, U], opt ...Option) Transformer[T, U]

Map returns a Transformer that applies a function to each item from the input stream.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

double := Map(func(ctx context.Context, i Item[int]) (int, error) {
	// Always check for errors
	if i.Err != nil {
		return 0, i.Err // Propagate the error
	}

	return i.Val * 2, nil
})

p := Pipe(in, double)

s := p(ctx, nil)

for item := range s {
	fmt.Println(item.Val)
}
Output:

2
4
6
8
10

Directories

Path Synopsis
examples
