Documentation ¶
Overview ¶
Package stream allows iterating over sequences of values where iteration may fail, for example when it involves I/O.
Index ¶
- Variables
- func Collect[T any](ctx context.Context, s Stream[T]) ([]T, error)
- func Last[T any](ctx context.Context, s Stream[T], n int) ([]T, error)
- func One[T any](ctx context.Context, s Stream[T]) (T, error)
- func Pipe[T any](bufferSize int) (*PipeSender[T], Stream[T])
- func Reduce[T any, U any](ctx context.Context, s Stream[T], initial U, f func(U, T) (U, error)) (U, error)
- type Peekable
- type PipeSender
- type Stream
- func Batch[T any](s Stream[T], maxWait time.Duration, batchSize int) Stream[[]T]
- func BatchFunc[T any](s Stream[T], maxWait time.Duration, full func(batch []T) bool) Stream[[]T]
- func Chan[T any](c <-chan T) Stream[T]
- func Chunk[T any](s Stream[T], chunkSize int) Stream[[]T]
- func Compact[T comparable](s Stream[T]) Stream[T]
- func CompactFunc[T any](s Stream[T], eq func(T, T) bool) Stream[T]
- func Empty[T any]() Stream[T]
- func Error[T any](err error) Stream[T]
- func Filter[T any](s Stream[T], keep func(context.Context, T) (bool, error)) Stream[T]
- func First[T any](s Stream[T], n int) Stream[T]
- func Flatten[T any](s Stream[Stream[T]]) Stream[T]
- func FlattenSlices[T any](s Stream[[]T]) Stream[T]
- func FromIterator[T any](iter iterator.Iterator[T]) Stream[T]
- func Join[T any](streams ...Stream[T]) Stream[T]
- func Map[T any, U any](s Stream[T], f func(context.Context, T) (U, error)) Stream[U]
- func Merge[T any](in ...Stream[T]) Stream[T]
- func Runs[T any](s Stream[T], same func(a, b T) bool) Stream[Stream[T]]
- func While[T any](s Stream[T], f func(context.Context, T) (bool, error)) Stream[T]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // End is returned from Stream.Next when iteration ends successfully. End = errors.New("end of stream") // ErrClosedPipe is returned from PipeSender.Send() when the associated stream has already been // closed. ErrClosedPipe = errors.New("closed pipe") // ErrMoreThanOne is returned from One when a Stream yielded more than one item. ErrMoreThanOne = errors.New("stream had more than one item") // ErrEmpty is returned from One when a Stream yielded no items. ErrEmpty = errors.New("stream empty") )
Functions ¶
func Collect ¶
Collect advances s to the end and returns all of the items seen as a slice.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c"})) x, err := stream.Collect(ctx, s) fmt.Println(err) fmt.Println(x) }
Output: <nil> [a b c]
func Last ¶
Last consumes s and returns the last n items. If s yields fewer than n items, Last returns all of them.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Counter(10)) last5, _ := stream.Last(ctx, s, 5) fmt.Println(last5) s = stream.FromIterator(iterator.Counter(3)) last5, _ = stream.Last(ctx, s, 5) fmt.Println(last5) }
Output: [5 6 7 8 9] [0 1 2]
func One ¶
One returns the only item that s yields. Returns an error if encountered, or if s yields zero or more than one item.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{"a"})) item, err := stream.One(ctx, s) fmt.Println(err == nil) fmt.Println(item) s = stream.FromIterator(iterator.Slice([]string{"a", "b"})) _, err = stream.One(ctx, s) fmt.Println(err == stream.ErrMoreThanOne) s = stream.FromIterator(iterator.Slice([]string{})) _, err = stream.One(ctx, s) fmt.Println(err == stream.ErrEmpty) }
Output: true a true true
func Pipe ¶
func Pipe[T any](bufferSize int) (*PipeSender[T], Stream[T])
Pipe returns a linked sender and receiver pair. Values sent using sender.Send will be delivered to the given Stream. The Stream will terminate when the sender is closed.
bufferSize is the number of elements in the buffer between the sender and the receiver. 0 has the same meaning as for the built-in make(chan).
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() sender, receiver := stream.Pipe[int](0) go func() { sender.Send(ctx, 1) sender.Send(ctx, 2) sender.Send(ctx, 3) sender.Close(nil) }() defer receiver.Close() for { item, err := receiver.Next(ctx) if err == stream.End { break } else if err != nil { fmt.Printf("stream ended with error: %s\n", err) return } fmt.Println(item) } }
Output: 1 2 3
Example (Error) ¶
package main import ( "context" "errors" "fmt" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() sender, receiver := stream.Pipe[int](0) oopsError := errors.New("oops") go func() { sender.Send(ctx, 1) sender.Close(oopsError) }() defer receiver.Close() for { item, err := receiver.Next(ctx) if err == stream.End { fmt.Println("stream ended normally") break } else if err != nil { fmt.Printf("stream ended with error: %s\n", err) return } fmt.Println(item) } }
Output: 1 stream ended with error: oops
func Reduce ¶
func Reduce[T any, U any]( ctx context.Context, s Stream[T], initial U, f func(U, T) (U, error), ) (U, error)
Reduce reduces s to a single value using the reduction function f.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]int{1, 2, 3, 4, 5})) sum, _ := stream.Reduce(ctx, s, 0, func(x, y int) (int, error) { return x + y, nil }) fmt.Println(sum) s = stream.FromIterator(iterator.Slice([]int{1, 3, 2, 3})) // Computes the exponentially-weighted moving average of the values of s. first := true ewma, _ := stream.Reduce(ctx, s, 0, func(running float64, item int) (float64, error) { if first { first = false return float64(item), nil } return running*0.5 + float64(item)*0.5, nil }) // Should end as 1/8 + 3/8 + 2/4 + 3/2 // = 1/8 + 3/8 + 4/8 + 12/8 // = 20/8 // = 2.5 fmt.Println(ewma) }
Output: 15 2.5
Types ¶
type Peekable ¶
type Peekable[T any] interface { Stream[T] // Peek returns the next item of the stream if there is one without consuming it. // // If Peek returns a value, the next call to Next will return the same value. Peek(ctx context.Context) (T, error) }
Peekable allows viewing the next item from a stream without consuming it.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]int{1, 2, 3})) p := stream.WithPeek(s) x, _ := p.Peek(ctx) fmt.Println(x) x, _ = p.Next(ctx) fmt.Println(x) x, _ = p.Next(ctx) fmt.Println(x) x, _ = p.Peek(ctx) fmt.Println(x) }
Output: 1 1 2 3
type PipeSender ¶
type PipeSender[T any] struct { // contains filtered or unexported fields }
PipeSender is the send half of a pipe returned by Pipe.
func (*PipeSender[T]) Close ¶
func (s *PipeSender[T]) Close(err error)
Close closes the PipeSender, signalling to the receiver that no more values will be sent. If an error is provided, it will surface to the receiver's Next and to any concurrent Sends.
Close may only be called once.
func (*PipeSender[T]) Send ¶
func (s *PipeSender[T]) Send(ctx context.Context, x T) error
Send attempts to send x to the receiver. If the receiver closes before x can be sent, returns ErrClosedPipe immediately. If ctx expires before x can be sent, returns ctx.Err().
A nil return does not necessarily mean that the receiver will see x, since the receiver may close early.
Send may be called concurrently with other Sends and with Close.
func (*PipeSender[T]) TrySend ¶ added in v0.9.0
func (s *PipeSender[T]) TrySend(ctx context.Context, x T) (bool, error)
TrySend attempts to send x to the receiver, but returns (false, nil) if the pipe's buffer is already full instead of blocking. If the receiver is already closed, returns ErrClosedPipe. If ctx expires before x can be sent, returns ctx.Err().
A (true, nil) return does not necessarily mean that the receiver will see x, since the receiver may close early.
TrySend may be called concurrently with other Sends and with Close.
type Stream ¶
type Stream[T any] interface { // Next advances the stream and returns the next item. If the stream is already over, Next // returns stream.End in the second return. Note that the final item of the stream has nil in // the second return, and it's the following call that returns stream.End. // // Once a Next call returns stream.End, it is expected that the Stream will return stream.End to // every Next call afterwards. Next(ctx context.Context) (T, error) // Close ends receiving from the stream. It is invalid to call Next after calling Close. Close() }
Stream is used to iterate over a sequence of values. It is similar to Iterator, except intended for use when iteration may fail for some reason, usually because the sequence requires I/O to produce.
Streams and the combinator functions are lazy, meaning they do no work until a call to Next().
Streams do not need to be fully consumed, but streams must be closed. Functions in this package that are passed streams expect to be the sole user of that stream going forward, and so will handle closing on your behalf so long as all streams they return are closed appropriately.
func Batch ¶
Batch returns a stream of non-overlapping batches from s of size batchSize. Batch is similar to Chunk with the added feature that an underfilled batch will be delivered to the output stream if any item has been in the batch for more than maxWait.
Example ¶
package main import ( "context" "fmt" "time" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() sender, receiver := stream.Pipe[string](0) batchStream := stream.Batch(receiver, 50*time.Millisecond, 3) wait := make(chan struct{}, 3) go func() { _ = sender.Send(ctx, "a") _ = sender.Send(ctx, "b") // Wait here before sending any more to show that the first batch will flush early because // of maxTime=50*time.Millisecond. <-wait _ = sender.Send(ctx, "c") _ = sender.Send(ctx, "d") _ = sender.Send(ctx, "e") _ = sender.Send(ctx, "f") sender.Close(nil) }() defer batchStream.Close() var batches [][]string for { batch, err := batchStream.Next(ctx) if err == stream.End { break } else if err != nil { fmt.Printf("stream ended with error: %s\n", err) return } batches = append(batches, batch) wait <- struct{}{} } fmt.Println(batches) }
Output: [[a b] [c d e] [f]]
func BatchFunc ¶
BatchFunc returns a stream of non-overlapping batches from s, using full to determine when a batch is full. BatchFunc is similar to Chunk with the added feature that an underfilled batch will be delivered to the output stream if any item has been in the batch for more than maxWait.
func Chan ¶
Chan returns a Stream that receives values from c.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() c := make(chan string, 3) c <- "a" c <- "b" c <- "c" close(c) s := stream.Chan(c) x, err := stream.Collect(ctx, s) fmt.Println(err) fmt.Println(x) }
Output: <nil> [a b c]
func Chunk ¶
Chunk returns a stream of non-overlapping chunks from s of size chunkSize. The last chunk will be smaller than chunkSize if the stream does not contain an even multiple.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c", "d", "e", "f", "g", "h"})) chunked := stream.Chunk(s, 3) item, _ := chunked.Next(ctx) fmt.Println(item) item, _ = chunked.Next(ctx) fmt.Println(item) item, _ = chunked.Next(ctx) fmt.Println(item) }
Output: [a b c] [d e f] [g h]
func Compact ¶
func Compact[T comparable](s Stream[T]) Stream[T]
Compact elides adjacent duplicates from s.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{"a", "a", "b", "c", "c", "c", "a"})) compactStream := stream.Compact(s) compacted, _ := stream.Collect(ctx, compactStream) fmt.Println(compacted) }
Output: [a b c a]
func CompactFunc ¶
CompactFunc elides adjacent duplicates from s, using eq to determine duplicates.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{ "bank", "beach", "ghost", "goat", "group", "yaw", "yew", })) compactStream := stream.CompactFunc(s, func(a, b string) bool { return a[0] == b[0] }) compacted, _ := stream.Collect(ctx, compactStream) fmt.Println(compacted) }
Output: [bank ghost yaw]
func Error ¶
Error returns a Stream that immediately produces err from Next.
Example ¶
package main import ( "context" "errors" "fmt" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.Error[int](errors.New("foo")) _, err := s.Next(ctx) fmt.Println(err) }
Output: foo
func Filter ¶
Filter returns a Stream that yields only the items from s for which keep returns true. If keep returns an error, terminates the stream early.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]int{1, 2, 3, 4, 5, 6})) evensStream := stream.Filter(s, func(ctx context.Context, x int) (bool, error) { return x%2 == 0, nil }) evens, _ := stream.Collect(ctx, evensStream) fmt.Println(evens) }
Output: [2 4 6]
func First ¶
First returns a Stream that yields the first n items from s.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c", "d", "e"})) first3Stream := stream.First(s, 3) first3, _ := stream.Collect(ctx, first3Stream) fmt.Println(first3) }
Output: [a b c]
func Flatten ¶
Flatten returns a stream that yields all items from all streams yielded by s.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]stream.Stream[int]{ stream.FromIterator(iterator.Slice([]int{0, 1, 2})), stream.FromIterator(iterator.Slice([]int{3, 4, 5, 6})), stream.FromIterator(iterator.Slice([]int{7})), })) allStream := stream.Flatten(s) all, _ := stream.Collect(ctx, allStream) fmt.Println(all) }
Output: [0 1 2 3 4 5 6 7]
func FlattenSlices ¶ added in v0.15.1
func FromIterator ¶
FromIterator returns a Stream that yields the values from iter. This stream ignores the context passed to Next during the call to iter.Next.
func Join ¶
Join returns a Stream that yields all elements from streams[0], then all elements from streams[1], and so on.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.Join( stream.FromIterator(iterator.Counter(3)), stream.FromIterator(iterator.Counter(5)), stream.FromIterator(iterator.Counter(2)), ) all, _ := stream.Collect(ctx, s) fmt.Println(all) }
Output: [0 1 2 0 1 2 3 4 0 1]
func Map ¶
Map transforms the values of s using the conversion f. If f returns an error, terminates the stream early.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Counter(5)) halfStream := stream.Map(s, func(ctx context.Context, x int) (float64, error) { return float64(x) / 2, nil }) all, _ := stream.Collect(ctx, halfStream) fmt.Println(all) }
Output: [0 0.5 1 1.5 2]
func Merge ¶ added in v0.2.0
Merge merges the in streams, returning a stream that yields all elements from all of them as they arrive.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() a := stream.FromIterator(iterator.Slice([]string{"a", "b", "c"})) b := stream.FromIterator(iterator.Slice([]string{"x", "y", "z"})) c := stream.FromIterator(iterator.Slice([]string{"m", "n"})) s := stream.Merge(a, b, c) for { item, err := s.Next(ctx) if err == stream.End { break } else if err != nil { panic(err) } fmt.Println(item) } }
Output: m b a n x c z y
func Runs ¶
Runs returns a stream of streams. The inner streams yield contiguous elements from s such that same(a, b) returns true for any a and b in the run.
The inner stream should be drained before calling Next on the outer stream.
same(a, a) must return true. If same(a, b) and same(b, c) both return true, then same(a, c) must also.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]int{2, 4, 0, 7, 1, 3, 9, 2, 8})) // Contiguous runs of evens/odds. parityRuns := stream.Runs(s, func(a, b int) bool { return a%2 == b%2 }) one, _ := parityRuns.Next(ctx) allOne, _ := stream.Collect(ctx, one) fmt.Println(allOne) two, _ := parityRuns.Next(ctx) allTwo, _ := stream.Collect(ctx, two) fmt.Println(allTwo) three, _ := parityRuns.Next(ctx) allThree, _ := stream.Collect(ctx, three) fmt.Println(allThree) }
Output: [2 4 0] [7 1 3 9] [2 8]
func While ¶
While returns a Stream that terminates before the first item from s for which f returns false. If f returns an error, terminates the stream early.
Example ¶
package main import ( "context" "fmt" "github.com/bradenaw/juniper/iterator" "github.com/bradenaw/juniper/stream" ) func main() { ctx := context.Background() s := stream.FromIterator(iterator.Slice([]string{ "aardvark", "badger", "cheetah", "dinosaur", "egret", })) beforeD := stream.While(s, func(ctx context.Context, s string) (bool, error) { return s < "d", nil }) out, _ := stream.Collect(ctx, beforeD) fmt.Println(out) }
Output: [aardvark badger cheetah]