Documentation ¶
Overview ¶
Package iterators provides generic iterators, that is, constructs that may be iterated over by calling Next() in a loop, then calling an Item() (T, error) method to retrieve the current iterated value.
When iteration is complete, the iterator resources should be relinquished using Close().
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChanIterator ¶
type ChanIterator[T any] struct { // contains filtered or unexported fields }
ChanIterator is a channel-based iterator that may be used to run a collection of StructIterators in parallel.
Notice that, because it works asynchronously, it is not suitable for collecting items in order.
The ChanIterator is goroutine-safe and may be iterated by several concurrent goroutines.
The input is collected from a collection of input StructIterators, then may be collected by one or several goroutines reading from the ChanIterator using Next() and Item().
Item() may return io.EOF if the iterator is done producing records (e.g. some other consumer reached the end of the stream).
WithChanFanInBuffers may be used to pre-fetch from input iterators asynchronously.
Methods Collect() and CollectPtr() can't be used by concurrent goroutines and are protected against such misuse.
Example ¶
baseIterators := []iterators.StructIterator[SampleStruct]{ iterators.NewSliceIterator[SampleStruct](testSlice()), iterators.NewSliceIterator[SampleStruct](testSlice()), iterators.NewSliceIterator[SampleStruct](testSlice()), } group, ctx := errgroup.WithContext(context.Background()) iterator := iterators.NewChanIterator[SampleStruct](ctx, baseIterators) defer func() { _ = iterator.Close() }() var mx sync.Mutex items := make(SortableStructs, 0, 6) latch := make(chan struct{}) // In this example, we iterate in parallel: // 3 producer iterators and 3 consumer iterators are running in parallel against // a single inner channel. for i := 0; i < 3; i++ { group.Go(func() error { <-latch count := 0 defer func() { fmt.Fprintf(os.Stderr, "goroutine count: %d\n", count) // stderr doesn't count for example asserted output }() for iterator.Next() { item, err := iterator.Item() if err != nil { if errors.Is(err, io.EOF) { return nil } return err } mx.Lock() items = append(items, item) mx.Unlock() count++ } return nil }) } close(latch) if err := group.Wait(); err != nil { fmt.Printf("an error occured: %v", err) } sort.Sort(items) fmt.Printf("count: %d\n", len(items)) fmt.Printf("items: %v\n", items)
Output: count: 6 items: [{1 x}, {1 x}, {1 x}, {2 y}, {2 y}, {2 y}]
func NewChanIterator ¶
func NewChanIterator[T any](ctx context.Context, iterators []StructIterator[T], opts ...ChanIteratorOption) *ChanIterator[T]
NewChanIterator builds a ChanIterator and starts the goroutines pumping items from the input iterators.
All goroutines are terminated and input iterators closed if the context is cancelled.
func (*ChanIterator[T]) Close ¶
func (d *ChanIterator[T]) Close() error
func (*ChanIterator[T]) Collect ¶
func (d *ChanIterator[T]) Collect() ([]T, error)
func (*ChanIterator[T]) CollectPtr ¶
func (d *ChanIterator[T]) CollectPtr() ([]*T, error)
func (*ChanIterator[T]) Item ¶
func (d *ChanIterator[T]) Item() (T, error)
func (*ChanIterator[T]) Next ¶
func (d *ChanIterator[T]) Next() bool
Example ¶
baseIterators := []iterators.StructIterator[SampleStruct]{ iterators.NewSliceIterator[SampleStruct](testSlice()), iterators.NewSliceIterator[SampleStruct](testSlice()), } count := 0 iterator := iterators.NewChanIterator[SampleStruct](context.Background(), baseIterators) defer func() { _ = iterator.Close() }() items := make(SortableStructs, 0, 4) for iterator.Next() { item, err := iterator.Item() if err != nil { if !errors.Is(err, io.EOF) { fmt.Printf("err: %v\n", err) } break } count++ items = append(items, item) } sort.Sort(items) fmt.Printf("count: %d\n", count) fmt.Printf("items: %v\n", items)
Output: count: 4 items: [{1 x}, {1 x}, {2 y}, {2 y}]
type ChanIteratorOption ¶
type ChanIteratorOption func(*chanIteratorOptions)
ChanIteratorOption provides options to the ChanIterator
func WithChanFanInBuffers ¶
func WithChanFanInBuffers(n int) ChanIteratorOption
WithChanFanInBuffers allocates buffers to fan-in the input results.
The default value is the number of underlying iterators.
func WithChanPreallocatedItems ¶
func WithChanPreallocatedItems(n int) ChanIteratorOption
WithChanPreallocatedItems preallocates n items in the returned slice when using the Collect and CollectPtr methods.
type IteratorContext ¶
type IteratorContext struct {
Iterated int
}
func GetIteratorContext ¶
GetIteratorContext allows the retrieval of the context of the iterator from within a transformer.
type RowsIterator ¶
type RowsIterator[R ScannableIterator, T any] struct { // contains filtered or unexported fields }
RowsIterator transforms a ScannableIterator of type R (e.g. a DB cursor such as sqlx.Rows) into a StructIterator with target type T.
Rows iterated over R are scanned into structs of type T.
Notice that the rows iterator is not goroutine-safe and should not be iterated concurrently.
func NewRowsIterator ¶
func NewRowsIterator[R ScannableIterator, T any](rows R, opts ...RowsIteratorOption) *RowsIterator[R, T]
NewRowsIterator makes a StructIterator[T] from a ScannableIterator.
func (*RowsIterator[R, T]) Close ¶
func (ri *RowsIterator[R, T]) Close() error
func (*RowsIterator[R, T]) Collect ¶
func (ri *RowsIterator[R, T]) Collect() ([]T, error)
Example ¶
package main import ( "fmt" "log" "github.com/fredbi/go-patterns/iterators" "github.com/fredbi/go-patterns/iterators/internal/testdb" ) func main() { dbName := testdb.UniqueDBName() db, err := testdb.CreateDBAndData(dbName) if err != nil { log.Fatalf("could not create test DB: %v", err) } rows, err := testdb.OpenDBCursor(db) if err != nil { log.Fatalf("could not create test DB: %v", err) } iterator := iterators.NewSqlxIterator[testdb.DummyRow](rows) items, err := iterator.Collect() if err != nil { fmt.Printf("err: %v\n", err) } fmt.Printf("items: %#v\n", items) fmt.Printf("count: %d\n", len(items)) }
Output: items: []testdb.DummyRow{testdb.DummyRow{A:1, B:"x"}, testdb.DummyRow{A:2, B:"y"}} count: 2
func (*RowsIterator[R, T]) CollectPtr ¶
func (ri *RowsIterator[R, T]) CollectPtr() ([]*T, error)
Example ¶
package main import ( "fmt" "log" "github.com/fredbi/go-patterns/iterators" "github.com/fredbi/go-patterns/iterators/internal/testdb" ) func main() { dbName := testdb.UniqueDBName() db, err := testdb.CreateDBAndData(dbName) if err != nil { log.Fatalf("could not create test DB: %v", err) } rows, err := testdb.OpenDBCursor(db) if err != nil { log.Fatalf("could not create test DB: %v", err) } iterator := iterators.NewSqlxIterator[testdb.DummyRow](rows) items, err := iterator.CollectPtr() if err != nil { fmt.Printf("err: %v\n", err) } fmt.Printf("count: %d\n", len(items)) }
Output: count: 2
func (*RowsIterator[R, T]) Item ¶
func (ri *RowsIterator[R, T]) Item() (T, error)
func (*RowsIterator[R, T]) Next ¶
func (ri *RowsIterator[R, T]) Next() bool
Example ¶
package main import ( "fmt" "log" "github.com/fredbi/go-patterns/iterators" "github.com/fredbi/go-patterns/iterators/internal/testdb" "github.com/jmoiron/sqlx" ) func main() { dbName := testdb.UniqueDBName() // create a DB and fill a table with some data db, err := testdb.CreateDBAndData(dbName) if err != nil { log.Fatalf("could not create test DB: %v", err) } // open a cursor selecting over the test data rows, err := testdb.OpenDBCursor(db) if err != nil { log.Fatalf("could not query DB: %v", err) } iterator := iterators.NewRowsIterator[*sqlx.Rows, testdb.DummyRow](rows) defer func() { _ = iterator.Close() }() count := 0 for iterator.Next() { item, err := iterator.Item() if err != nil { fmt.Printf("err: %v\n", err) } count++ fmt.Printf("item: %#v\n", item) } fmt.Printf("count: %d\n", count) }
Output: item: testdb.DummyRow{A:1, B:"x"} item: testdb.DummyRow{A:2, B:"y"} count: 2
type RowsIteratorOption ¶
type RowsIteratorOption func(*rowsIteratorOptions)
RowsIteratorOption provides options to the RowsIterator
func WithRowsPreallocatedItems ¶
func WithRowsPreallocatedItems(n int) RowsIteratorOption
WithRowsPreallocatedItems preallocates n items in the returned slice when using the Collect and CollectPtr methods.
The default value is 1000.
type ScannableIterator ¶
ScannableIterator is an iterator over DB records that can be scanned.
type SliceIterator ¶
type SliceIterator[T any] struct { // contains filtered or unexported fields }
SliceIterator constructs an iterator based on a slice of items.
This very simple iterator is essentially used for testing.
func NewSliceIterator ¶
func NewSliceIterator[T any](rows []T) *SliceIterator[T]
NewSliceIterator constructs a SliceIterator from a slice of items (rows).
func (*SliceIterator[T]) Close ¶
func (si *SliceIterator[T]) Close() error
func (*SliceIterator[T]) Collect ¶
func (si *SliceIterator[T]) Collect() ([]T, error)
Example ¶
iterator := iterators.NewSliceIterator[SampleStruct](testSlice()) items, err := iterator.Collect() if err != nil { fmt.Printf("err: %v\n", err) } fmt.Printf("items: %#v\n", items) fmt.Printf("count: %d\n", len(items))
Output: items: []iterators_test.SampleStruct{iterators_test.SampleStruct{A:1, B:"x"}, iterators_test.SampleStruct{A:2, B:"y"}} count: 2
func (*SliceIterator[T]) CollectPtr ¶
func (si *SliceIterator[T]) CollectPtr() ([]*T, error)
Example ¶
iterator := iterators.NewSliceIterator[SampleStruct](testSlice()) items, err := iterator.CollectPtr() if err != nil { fmt.Printf("err: %v\n", err) } fmt.Printf("count: %d\n", len(items))
Output: count: 2
func (*SliceIterator[T]) Item ¶
func (si *SliceIterator[T]) Item() (T, error)
func (*SliceIterator[T]) Next ¶
func (si *SliceIterator[T]) Next() bool
Example ¶
iterator := iterators.NewSliceIterator[SampleStruct](testSlice()) defer func() { _ = iterator.Close() }() count := 0 for iterator.Next() { item, err := iterator.Item() if err != nil { fmt.Printf("err: %v\n", err) } count++ fmt.Printf("item: %#v\n", item) } fmt.Printf("count: %d\n", count)
Output: item: iterators_test.SampleStruct{A:1, B:"x"} item: iterators_test.SampleStruct{A:2, B:"y"} count: 2
type SqlxIterator ¶
type SqlxIterator[T any] struct { *RowsIterator[*sqlx.Rows, T] }
SqlxIterator is a shorthand for RowsIterator[*sqlx.Rows, T].
func NewSqlxIterator ¶
func NewSqlxIterator[T any](rows *sqlx.Rows, opts ...RowsIteratorOption) *SqlxIterator[T]
NewSqlxIterator makes a SqlxIterator[T] producing items of type T from a github.com/jmoiron/sqlx.Rows cursor.
type StructIterator ¶
type StructIterator[T any] interface { Iterator // Item return the current iterated item. // // Next() must have been called at least once. Item() (T, error) // Collect returns all items in one slice, then closes the iterator Collect() ([]T, error) // CollectPtr returns all items in one slice of pointers, then closes the iterator CollectPtr() ([]*T, error) }
StructIterator is an iterator that delivers items of some type T.
type TransformIterator ¶
type TransformIterator[S, T any] struct { StructIterator[S] // contains filtered or unexported fields }
TransformIterator transforms any iterator into an iterator that transforms the input of type S into type T at every call to Item().
Example ¶
// this example interrupts the iterations after 1 iteration, using // a transformer and the iterator's context baseIterator := iterators.NewSliceIterator[SampleStruct](testSlice()) transformer := func(ctx context.Context, in SampleStruct) (SampleStruct, error) { ictx := iterators.GetIteratorContext(ctx) var index int // this retrieves the current iterated count from the context if ictx != nil { index = ictx.Iterated } if index > 1 { return SampleStruct{}, io.EOF } return in, nil } iterator := iterators.NewTransformIterator[SampleStruct, SampleStruct](context.Background(), baseIterator, transformer) defer func() { _ = iterator.Close() }() count := 0 for iterator.Next() { item, err := iterator.Item() if err != nil { if errors.Is(err, io.EOF) { break } fmt.Printf("err: %v\n", err) break } count++ fmt.Printf("item: %#v\n", item) } fmt.Printf("count: %d\n", count)
Output: item: iterators_test.SampleStruct{A:1, B:"x"} count: 1
func NewTransformIterator ¶
func NewTransformIterator[S, T any](ctx context.Context, iterator StructIterator[S], transformer TransformerCtx[S, T], opts ...RowsIteratorOption) *TransformIterator[S, T]
NewTransformIterator makes a StructIterator[T] from a StructIterator[S] and a transformer function from S to T.
The parent context provided allows the transformer to know about the current context of the iterator.
This is useful if the transformation depends on the currently iterated step.
Notice that the transformer may also perform some other things, e.g. logging, collecting some stats or traces.
func (*TransformIterator[S, T]) CollectPtr ¶
func (rt *TransformIterator[S, T]) CollectPtr() ([]*T, error)
func (*TransformIterator[S, T]) Next ¶
func (rt *TransformIterator[S, T]) Next() bool
Example ¶
baseIterator := iterators.NewSliceIterator[SampleStruct](testSlice()) transformer := func(ctx context.Context, in SampleStruct) (TransformedStruct, error) { ictx := iterators.GetIteratorContext(ctx) var index int // this retrieves the current iterated count from the context if ictx != nil { index = ictx.Iterated } fmt.Printf("transforming iteration %d\n", index) return TransformedStruct{ X: in.A + index, Y: strings.Repeat(in.B, 2), }, nil } iterator := iterators.NewTransformIterator[SampleStruct, TransformedStruct](context.Background(), baseIterator, transformer) defer func() { _ = iterator.Close() }() count := 0 for iterator.Next() { item, err := iterator.Item() if err != nil { fmt.Printf("err: %v\n", err) } count++ fmt.Printf("item: %#v\n", item) } fmt.Printf("count: %d\n", count)
Output: transforming iteration 1 item: iterators_test.TransformedStruct{X:2, Y:"xx"} transforming iteration 2 item: iterators_test.TransformedStruct{X:4, Y:"yy"} count: 2