stream

package
v0.15.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2024 License: MIT Imports: 7 Imported by: 5

Documentation

Overview

Package stream allows iterating over sequences of values where iteration may fail, for example when it involves I/O.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// End is returned from Stream.Next when iteration ends successfully. It signals normal
	// termination rather than a failure, so callers compare against it to stop iterating.
	End = errors.New("end of stream")
	// ErrClosedPipe is returned from PipeSender.Send() and PipeSender.TrySend() when the
	// associated stream has already been closed.
	ErrClosedPipe = errors.New("closed pipe")
	// ErrMoreThanOne is returned from One when a Stream yielded more than one item.
	ErrMoreThanOne = errors.New("stream had more than one item")
	// ErrEmpty is returned from One when a Stream yielded no items.
	ErrEmpty = errors.New("stream empty")
)

Functions

func Collect

func Collect[T any](ctx context.Context, s Stream[T]) ([]T, error)

Collect advances s to the end and returns all of the items seen as a slice.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

func main() {
	ctx := context.Background()
	s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c"}))

	x, err := stream.Collect(ctx, s)
	fmt.Println(err)
	fmt.Println(x)

}
Output:

<nil>
[a b c]

func Last

func Last[T any](ctx context.Context, s Stream[T], n int) ([]T, error)

Last consumes s and returns the last n items. If s yields fewer than n items, Last returns all of them.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main prints the last five items of a ten-item stream, then of a
// three-item stream, showing that Last returns everything when the stream
// is shorter than n.
func main() {
	ctx := context.Background()

	// More than five items available: only the final five are kept.
	long := stream.FromIterator(iterator.Counter(10))
	tail, _ := stream.Last(ctx, long, 5)
	fmt.Println(tail)

	// Fewer than five items available: all of them come back.
	short := stream.FromIterator(iterator.Counter(3))
	tail, _ = stream.Last(ctx, short, 5)
	fmt.Println(tail)
}
Output:

[5 6 7 8 9]
[0 1 2]

func One

func One[T any](ctx context.Context, s Stream[T]) (T, error)

One returns the only item that s yields. Returns an error if one is encountered, ErrEmpty if s yields no items, or ErrMoreThanOne if s yields more than one item.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main exercises One against streams of one, two and zero items and prints
// whether each call produced the expected result.
func main() {
	ctx := context.Background()

	// Exactly one item: it is returned with a nil error.
	single := stream.FromIterator(iterator.Slice([]string{"a"}))
	item, err := stream.One(ctx, single)
	fmt.Println(err == nil)
	fmt.Println(item)

	// Two items: One reports ErrMoreThanOne.
	pair := stream.FromIterator(iterator.Slice([]string{"a", "b"}))
	_, err = stream.One(ctx, pair)
	fmt.Println(err == stream.ErrMoreThanOne)

	// No items: One reports ErrEmpty.
	none := stream.FromIterator(iterator.Slice([]string{}))
	_, err = stream.One(ctx, none)
	fmt.Println(err == stream.ErrEmpty)
}
Output:

true
a
true
true

func Pipe

func Pipe[T any](bufferSize int) (*PipeSender[T], Stream[T])

Pipe returns a linked sender and receiver pair. Values sent using sender.Send will be delivered to the returned Stream. The Stream will terminate when the sender is closed.

bufferSize is the number of elements in the buffer between the sender and the receiver. 0 has the same meaning as for the built-in make(chan).

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/stream"
)

// main sends three integers through an unbuffered pipe from a goroutine and
// prints each one as the receiving stream yields it, stopping at stream.End.
func main() {
	ctx := context.Background()
	sender, receiver := stream.Pipe[int](0)

	go func() {
		// Send reports an error if the receiver closes early or ctx expires.
		// Neither happens before these sends complete, so the results are
		// explicitly discarded.
		_ = sender.Send(ctx, 1)
		_ = sender.Send(ctx, 2)
		_ = sender.Send(ctx, 3)
		// A nil error makes the receiver finish with stream.End.
		sender.Close(nil)
	}()

	// Streams must always be closed, even after being fully consumed.
	defer receiver.Close()
	for {
		item, err := receiver.Next(ctx)
		if err == stream.End {
			break
		} else if err != nil {
			fmt.Printf("stream ended with error: %s\n", err)
			return
		}
		fmt.Println(item)
	}
}
Output:

1
2
3
Example (Error)
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/bradenaw/juniper/stream"
)

// main sends one integer through a pipe and then closes the sender with an
// error, showing that the error surfaces from the receiver's Next instead of
// stream.End.
func main() {
	ctx := context.Background()
	sender, receiver := stream.Pipe[int](0)

	oopsError := errors.New("oops")

	go func() {
		// Send reports an error if the receiver closes early or ctx expires.
		// Neither happens before this send completes, so the result is
		// explicitly discarded.
		_ = sender.Send(ctx, 1)
		// Closing with a non-nil error makes the receiver's Next return it.
		sender.Close(oopsError)
	}()

	// Streams must always be closed, even when they end with an error.
	defer receiver.Close()
	for {
		item, err := receiver.Next(ctx)
		if err == stream.End {
			fmt.Println("stream ended normally")
			break
		} else if err != nil {
			fmt.Printf("stream ended with error: %s\n", err)
			return
		}
		fmt.Println(item)
	}
}
Output:

1
stream ended with error: oops

func Reduce

func Reduce[T any, U any](
	ctx context.Context,
	s Stream[T],
	initial U,
	f func(U, T) (U, error),
) (U, error)

Reduce reduces s to a single value using the reduction function f.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main demonstrates Reduce twice: once to sum a stream of integers, and once
// to fold a stream into an exponentially-weighted moving average.
func main() {
	ctx := context.Background()

	// Plain sum, starting from 0.
	ints := stream.FromIterator(iterator.Slice([]int{1, 2, 3, 4, 5}))
	add := func(acc, item int) (int, error) {
		return acc + item, nil
	}
	total, _ := stream.Reduce(ctx, ints, 0, add)
	fmt.Println(total)

	// Exponentially-weighted moving average of the values of the stream.
	// The first item seeds the average; each later item is blended in with
	// weight 0.5.
	samples := stream.FromIterator(iterator.Slice([]int{1, 3, 2, 3}))
	seeded := false
	blend := func(running float64, item int) (float64, error) {
		if !seeded {
			seeded = true
			return float64(item), nil
		}
		return running*0.5 + float64(item)*0.5, nil
	}
	average, _ := stream.Reduce(ctx, samples, 0, blend)
	// Should end as 1/8 + 3/8 + 2/4 + 3/2
	//             = 1/8 + 3/8 + 4/8 + 12/8
	//             = 20/8
	//             = 2.5
	fmt.Println(average)
}
Output:

15
2.5

Types

type Peekable

// Peekable allows viewing the next item from a stream without consuming it.
type Peekable[T any] interface {
	Stream[T]
	// Peek returns the next item of the stream, if there is one, without consuming it.
	//
	// If Peek returns a value, the next call to Next will return the same value.
	Peek(ctx context.Context) (T, error)
}

Peekable allows viewing the next item from a stream without consuming it.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main wraps a three-item stream with WithPeek and interleaves Peek and Next
// calls to show that Peek does not consume the item it returns.
func main() {
	ctx := context.Background()
	s := stream.FromIterator(iterator.Slice([]int{1, 2, 3}))

	p := stream.WithPeek(s)
	// Streams must be closed even when not fully consumed. WithPeek took
	// ownership of s, so closing p is the caller's only obligation.
	defer p.Close()

	// Peek shows 1 without consuming it, so the following Next also yields 1.
	x, _ := p.Peek(ctx)
	fmt.Println(x)
	x, _ = p.Next(ctx)
	fmt.Println(x)
	x, _ = p.Next(ctx)
	fmt.Println(x)
	// 3 is peeked but never consumed.
	x, _ = p.Peek(ctx)
	fmt.Println(x)
}
Output:

1
1
2
3

func WithPeek

func WithPeek[T any](s Stream[T]) Peekable[T]

WithPeek returns s with a Peek() method attached.

type PipeSender

// PipeSender is the send half of a pipe returned by Pipe.
type PipeSender[T any] struct {
	// contains filtered or unexported fields
}

PipeSender is the send half of a pipe returned by Pipe.

func (*PipeSender[T]) Close

func (s *PipeSender[T]) Close(err error)

Close closes the PipeSender, signalling to the receiver that no more values will be sent. If an error is provided, it will surface to the receiver's Next and to any concurrent Sends.

Close may only be called once.

func (*PipeSender[T]) Send

func (s *PipeSender[T]) Send(ctx context.Context, x T) error

Send attempts to send x to the receiver. If the receiver closes before x can be sent, returns ErrClosedPipe immediately. If ctx expires before x can be sent, returns ctx.Err().

A nil return does not necessarily mean that the receiver will see x, since the receiver may close early.

Send may be called concurrently with other Sends and with Close.

func (*PipeSender[T]) TrySend added in v0.9.0

func (s *PipeSender[T]) TrySend(ctx context.Context, x T) (bool, error)

TrySend attempts to send x to the receiver, but returns (false, nil) if the pipe's buffer is already full instead of blocking. If the receiver is already closed, returns ErrClosedPipe. If ctx expires before x can be sent, returns ctx.Err().

A (true, nil) return does not necessarily mean that the receiver will see x, since the receiver may close early.

TrySend may be called concurrently with other Sends and with Close.

type Stream

// Stream is used to iterate over a sequence of values where iteration may fail, usually because
// producing the sequence requires I/O.
//
// Streams do not need to be fully consumed, but they must be closed.
type Stream[T any] interface {
	// Next advances the stream and returns the next item. If the stream is already over, Next
	// returns stream.End in the second return. Note that the final item of the stream has nil in
	// the second return, and it's the following call that returns stream.End.
	//
	// Once a Next call returns stream.End, it is expected that the Stream will return stream.End to
	// every Next call afterwards.
	Next(ctx context.Context) (T, error)
	// Close ends receiving from the stream. It is invalid to call Next after calling Close.
	Close()
}

Stream is used to iterate over a sequence of values. It is similar to Iterator, except intended for use when iteration may fail for some reason, usually because the sequence requires I/O to produce.

Streams and the combinator functions are lazy, meaning they do no work until a call to Next().

Streams do not need to be fully consumed, but streams must be closed. Functions in this package that are passed streams expect to be the sole user of that stream going forward, and so will handle closing on your behalf so long as all streams they return are closed appropriately.

func Batch

func Batch[T any](s Stream[T], maxWait time.Duration, batchSize int) Stream[[]T]

Batch returns a stream of non-overlapping batches from s of size batchSize. Batch is similar to Chunk with the added feature that an underfilled batch will be delivered to the output stream if any item has been in the batch for more than maxWait.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bradenaw/juniper/stream"
)

// main feeds six strings through a pipe into Batch (maxWait of 50ms, batch size 3) and prints
// the batches it receives, showing both an early flush caused by maxWait and a full batch.
func main() {
	ctx := context.Background()

	sender, receiver := stream.Pipe[string](0)
	batchStream := stream.Batch(receiver, 50*time.Millisecond, 3)

	// Buffered so that the receive loop below can signal after every batch without blocking,
	// even though the sending goroutine only waits for the first signal.
	wait := make(chan struct{}, 3)
	go func() {
		_ = sender.Send(ctx, "a")
		_ = sender.Send(ctx, "b")
		// Wait here before sending any more to show that the first batch will flush early because
		// of maxWait=50*time.Millisecond.
		<-wait
		_ = sender.Send(ctx, "c")
		_ = sender.Send(ctx, "d")
		_ = sender.Send(ctx, "e")
		_ = sender.Send(ctx, "f")
		sender.Close(nil)
	}()

	// Streams must always be closed, even after being fully consumed.
	defer batchStream.Close()
	var batches [][]string
	for {
		batch, err := batchStream.Next(ctx)
		if err == stream.End {
			break
		} else if err != nil {
			fmt.Printf("stream ended with error: %s\n", err)
			return
		}
		batches = append(batches, batch)
		// Tell the sender a batch has arrived so it can continue past its wait point.
		wait <- struct{}{}
	}
	fmt.Println(batches)

}
Output:

[[a b] [c d e] [f]]

func BatchFunc

func BatchFunc[T any](
	s Stream[T],
	maxWait time.Duration,
	full func(batch []T) bool,
) Stream[[]T]

BatchFunc returns a stream of non-overlapping batches from s, using full to determine when a batch is full. BatchFunc is similar to Chunk with the added feature that an underfilled batch will be delivered to the output stream if any item has been in the batch for more than maxWait.

func Chan

func Chan[T any](c <-chan T) Stream[T]

Chan returns a Stream that receives values from c.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/stream"
)

// main pre-fills and closes a buffered channel, adapts it with Chan, and
// prints the error and items produced by collecting the resulting stream.
func main() {
	ch := make(chan string, 3)
	for _, v := range []string{"a", "b", "c"} {
		ch <- v
	}
	close(ch)

	items, err := stream.Collect(context.Background(), stream.Chan(ch))
	fmt.Println(err)
	fmt.Println(items)
}
Output:

<nil>
[a b c]

func Chunk

func Chunk[T any](s Stream[T], chunkSize int) Stream[[]T]

Chunk returns a stream of non-overlapping chunks from s of size chunkSize. The last chunk will be smaller than chunkSize if the number of items in s is not an exact multiple of chunkSize.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main splits an eight-item stream into chunks of three and prints the three
// resulting chunks; the last one is short because 8 is not a multiple of 3.
func main() {
	ctx := context.Background()
	s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c", "d", "e", "f", "g", "h"}))

	chunked := stream.Chunk(s, 3)
	// Streams must be closed even when not read to stream.End. Chunk took
	// ownership of s, so closing chunked is the caller's only obligation.
	defer chunked.Close()

	for i := 0; i < 3; i++ {
		item, _ := chunked.Next(ctx)
		fmt.Println(item)
	}
}
Output:

[a b c]
[d e f]
[g h]

func Compact

func Compact[T comparable](s Stream[T]) Stream[T]

Compact elides adjacent duplicates from s.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

func main() {
	ctx := context.Background()
	s := stream.FromIterator(iterator.Slice([]string{"a", "a", "b", "c", "c", "c", "a"}))
	compactStream := stream.Compact(s)
	compacted, _ := stream.Collect(ctx, compactStream)
	fmt.Println(compacted)

}
Output:

[a b c a]

func CompactFunc

func CompactFunc[T any](s Stream[T], eq func(T, T) bool) Stream[T]

CompactFunc elides adjacent duplicates from s, using eq to determine duplicates.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main uses CompactFunc to drop adjacent words that share a first letter and
// prints the first word of each group.
func main() {
	words := []string{
		"bank",
		"beach",
		"ghost",
		"goat",
		"group",
		"yaw",
		"yew",
	}
	// Two words count as duplicates when their first bytes match.
	sameInitial := func(a, b string) bool {
		return a[0] == b[0]
	}

	src := stream.FromIterator(iterator.Slice(words))
	out, _ := stream.Collect(context.Background(), stream.CompactFunc(src, sameInitial))
	fmt.Println(out)
}
Output:

[bank ghost yaw]

func Empty added in v0.5.0

func Empty[T any]() Stream[T]

Empty returns a Stream that yields stream.End immediately.

func Error

func Error[T any](err error) Stream[T]

Error returns a Stream that immediately produces err from Next.

Example
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/bradenaw/juniper/stream"
)

// main builds a stream with Error and prints the error that its first Next
// call produces.
func main() {
	ctx := context.Background()

	s := stream.Error[int](errors.New("foo"))
	// Streams must be closed, including ones that only ever yield an error.
	defer s.Close()

	_, err := s.Next(ctx)
	fmt.Println(err)
}
Output:

foo

func Filter

func Filter[T any](s Stream[T], keep func(context.Context, T) (bool, error)) Stream[T]

Filter returns a Stream that yields only the items from s for which keep returns true. If keep returns an error, terminates the stream early.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main keeps only the even numbers of a six-item stream using Filter and
// prints them.
func main() {
	// isEven never fails, so it always reports a nil error.
	isEven := func(_ context.Context, n int) (bool, error) {
		return n%2 == 0, nil
	}

	numbers := stream.FromIterator(iterator.Slice([]int{1, 2, 3, 4, 5, 6}))
	out, _ := stream.Collect(context.Background(), stream.Filter(numbers, isEven))
	fmt.Println(out)
}
Output:

[2 4 6]

func First

func First[T any](s Stream[T], n int) Stream[T]

First returns a Stream that yields the first n items from s.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

func main() {
	ctx := context.Background()
	s := stream.FromIterator(iterator.Slice([]string{"a", "b", "c", "d", "e"}))

	first3Stream := stream.First(s, 3)
	first3, _ := stream.Collect(ctx, first3Stream)
	fmt.Println(first3)

}
Output:

[a b c]

func Flatten

func Flatten[T any](s Stream[Stream[T]]) Stream[T]

Flatten returns a stream that yields all items from all streams yielded by s.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main builds a stream of three integer streams, flattens it into a single
// stream with Flatten, and prints every item in order.
func main() {
	parts := []stream.Stream[int]{
		stream.FromIterator(iterator.Slice([]int{0, 1, 2})),
		stream.FromIterator(iterator.Slice([]int{3, 4, 5, 6})),
		stream.FromIterator(iterator.Slice([]int{7})),
	}
	nested := stream.FromIterator(iterator.Slice(parts))

	flat, _ := stream.Collect(context.Background(), stream.Flatten(nested))
	fmt.Println(flat)
}
Output:

[0 1 2 3 4 5 6 7]

func FlattenSlices added in v0.15.1

func FlattenSlices[T any](s Stream[[]T]) Stream[T]

func FromIterator

func FromIterator[T any](iter iterator.Iterator[T]) Stream[T]

FromIterator returns a Stream that yields the values from iter. This stream ignores the context passed to Next during the call to iter.Next.

func Join

func Join[T any](streams ...Stream[T]) Stream[T]

Join returns a Stream that yields all elements from streams[0], then all elements from streams[1], and so on.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main concatenates three counter streams with Join and prints the combined
// sequence: all of the first stream, then the second, then the third.
func main() {
	joined := stream.Join(
		stream.FromIterator(iterator.Counter(3)),
		stream.FromIterator(iterator.Counter(5)),
		stream.FromIterator(iterator.Counter(2)),
	)

	out, _ := stream.Collect(context.Background(), joined)
	fmt.Println(out)
}
Output:

[0 1 2 0 1 2 3 4 0 1]

func Map

func Map[T any, U any](s Stream[T], f func(context.Context, T) (U, error)) Stream[U]

Map transforms the values of s using the conversion f. If f returns an error, terminates the stream early.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main halves each value of a five-item counter stream with Map, converting
// it from int to float64, and prints the results.
func main() {
	// halve never fails, so it always reports a nil error.
	halve := func(_ context.Context, n int) (float64, error) {
		return float64(n) / 2, nil
	}

	counter := stream.FromIterator(iterator.Counter(5))
	out, _ := stream.Collect(context.Background(), stream.Map(counter, halve))
	fmt.Println(out)
}
Output:

[0 0.5 1 1.5 2]

func Merge added in v0.2.0

func Merge[T any](in ...Stream[T]) Stream[T]

Merge merges the in streams, returning a stream that yields all elements from all of them as they arrive.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main merges three in-memory streams with Merge and prints each item as the
// merged stream yields it. Items are yielded as they arrive, so the relative
// order of items from different inputs is not expected to be stable.
func main() {
	ctx := context.Background()

	a := stream.FromIterator(iterator.Slice([]string{"a", "b", "c"}))
	b := stream.FromIterator(iterator.Slice([]string{"x", "y", "z"}))
	c := stream.FromIterator(iterator.Slice([]string{"m", "n"}))

	s := stream.Merge(a, b, c)
	// Streams must be closed even after being fully consumed. Merge took
	// ownership of a, b and c, so closing s is the caller's only obligation.
	defer s.Close()

	for {
		item, err := s.Next(ctx)
		if err == stream.End {
			break
		} else if err != nil {
			panic(err)
		}

		fmt.Println(item)
	}
}
Unordered output:

m
b
a
n
x
c
z
y

func Runs

func Runs[T any](s Stream[T], same func(a, b T) bool) Stream[Stream[T]]

Runs returns a stream of streams. The inner streams yield contiguous elements from s such that same(a, b) returns true for any a and b in the run.

The inner stream should be drained before calling Next on the outer stream.

same(a, a) must return true. If same(a, b) and same(b, c) both return true, then same(a, c) must also.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main splits a stream of integers into contiguous runs of equal parity with
// Runs and prints the first three runs.
func main() {
	ctx := context.Background()

	s := stream.FromIterator(iterator.Slice([]int{2, 4, 0, 7, 1, 3, 9, 2, 8}))

	// Contiguous runs of evens/odds.
	parityRuns := stream.Runs(s, func(a, b int) bool {
		return a%2 == b%2
	})
	// Streams must be closed even when not read to stream.End. Runs took
	// ownership of s, so closing parityRuns is the caller's only obligation.
	defer parityRuns.Close()

	for i := 0; i < 3; i++ {
		// Each inner stream is drained by Collect before the outer stream is
		// advanced again, as Runs requires.
		run, _ := parityRuns.Next(ctx)
		items, _ := stream.Collect(ctx, run)
		fmt.Println(items)
	}
}
Output:

[2 4 0]
[7 1 3 9]
[2 8]

func While

func While[T any](s Stream[T], f func(context.Context, T) (bool, error)) Stream[T]

While returns a Stream that terminates before the first item from s for which f returns false. If f returns an error, terminates the stream early.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bradenaw/juniper/iterator"
	"github.com/bradenaw/juniper/stream"
)

// main uses While to cut a stream of animal names off before the first name
// that does not sort before "d", and prints the names that were kept.
func main() {
	animals := []string{
		"aardvark",
		"badger",
		"cheetah",
		"dinosaur",
		"egret",
	}
	// sortsBeforeD never fails, so it always reports a nil error.
	sortsBeforeD := func(_ context.Context, name string) (bool, error) {
		return name < "d", nil
	}

	src := stream.FromIterator(iterator.Slice(animals))
	kept, _ := stream.Collect(context.Background(), stream.While(src, sortsBeforeD))
	fmt.Println(kept)
}
Output:

[aardvark badger cheetah]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL