Documentation ¶
Index ¶
- func Bulk[T any](ctx context.Context, ch <-chan T, count int, ...) <-chan []T
- func CopyFirst[T any](ctx context.Context, input <-chan T) (first T, forward <-chan T, err error)
- func ErrgroupReceive(ctx context.Context, g *errgroup.Group, err <-chan error)
- func WaitAsync(ctx context.Context, w Waiter) <-chan error
- type BulkChunkSplitPolicy
- type BulkChunkSplitPolicyFactory
- type Bulker
- type Counter
- type ProcessBulk
- type Waiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Bulk ¶
func Bulk[T any]( ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], ) <-chan []T
Bulk reads all values from a channel and streams them in chunks into a returned channel.
func CopyFirst ¶
func CopyFirst[T any]( ctx context.Context, input <-chan T, ) (first T, forward <-chan T, err error)
CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.
func ErrgroupReceive ¶
ErrgroupReceive adds a goroutine to the specified group that returns the first non-nil error (if any) from the specified channel. If the channel is closed, it will return nil.
Types ¶
type BulkChunkSplitPolicy ¶
BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. A call takes an item for the current chunk into account. Output true indicates that the state machine was reset first and the bulker shall finish the current chunk now (not e.g. once $size is reached) without the given item.
func NeverSplit ¶
func NeverSplit[T any]() BulkChunkSplitPolicy[T]
NeverSplit returns a pseudo state machine which never demands splitting.
type BulkChunkSplitPolicyFactory ¶
type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T]
type Bulker ¶
type Bulker[T any] struct { // contains filtered or unexported fields }
Bulker reads all values from a channel and streams them in chunks into a Bulk channel.
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter implements an atomic counter.
type ProcessBulk ¶
func ForwardBulk ¶
func ForwardBulk[T any](ch chan<- T) ProcessBulk[T]