Documentation ¶
Index ¶
- func Batch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A
- func Buffer[A any](in <-chan A, size int) <-chan A
- func Delay[A any](in <-chan A, delay time.Duration) <-chan A
- func Drain[A any](in <-chan A)
- func DrainNB[A any](in <-chan A)
- func FilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B
- func ForEach[A any](in <-chan A, n int, f func(A))
- func Loop[A, B any](in <-chan A, done chan<- B, n int, f func(A))
- func MapAndSplit[A, B any](in <-chan A, numOuts int, n int, f func(A) (B, int)) []<-chan B
- func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (K, V), nr int, reducer func(V, V) V) map[K]V
- func Merge[A any](ins ...<-chan A) <-chan A
- func OrderedFilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B
- func OrderedLoop[A, B any](in <-chan A, done chan<- B, n int, f func(a A, canWrite <-chan struct{}))
- func OrderedMapAndSplit[A, B any](in <-chan A, numOuts int, n int, f func(A) (B, int)) []<-chan B
- func Reduce[A any](in <-chan A, n int, f func(A, A) A) (A, bool)
- func Unbatch[A any](in <-chan []A) <-chan A
- type OnceWithWait
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Batch ¶
Batch groups items from an input channel into batches based on a maximum size and a timeout. A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes. This function never emits empty batches. The timeout countdown starts when the first item is added to a new batch. To emit batches only when full, set the timeout to -1. Zero timeout is not supported and will panic.
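The batching rules above can be sketched with the standard library alone. The lowercase `batch` below is a hypothetical, simplified stand-in written for illustration; it is not the package's implementation, and the helper `batchSizes` is likewise an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical stand-in that follows the documented rules:
// emit on full batch, on timeout, or on input close; never emit empty batches.
func batch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A {
	if timeout == 0 {
		panic("zero timeout is not supported")
	}
	out := make(chan []A)
	go func() {
		defer close(out)
		var cur []A
		var timer *time.Timer
		var expired <-chan time.Time // nil channel: no countdown is active

		flush := func() {
			if timer != nil {
				timer.Stop()
				timer = nil
			}
			expired = nil
			if len(cur) > 0 {
				out <- cur
				cur = nil
			}
		}

		for {
			select {
			case item, ok := <-in:
				if !ok {
					flush() // input closed: emit the partial batch, if any
					return
				}
				cur = append(cur, item)
				if len(cur) == 1 && timeout > 0 {
					// The countdown starts with the first item of a new batch.
					timer = time.NewTimer(timeout)
					expired = timer.C
				}
				if len(cur) >= size {
					flush()
				}
			case <-expired:
				flush()
			}
		}
	}()
	return out
}

// batchSizes feeds n integers through batch with timeout -1 and
// returns the length of every emitted batch.
func batchSizes(n, size int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()
	var sizes []int
	for b := range batch(in, size, -1) {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes(7, 3)) // prints [3 3 1]
}
```

With a timeout of -1 no timer is ever started, so the only partial batch is the one flushed when the input closes.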
func Delay ¶
Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order. Useful for adding delays in processing or simulating latency.
func ForEach ¶ added in v0.3.0
ForEach is a blocking function that processes items from the input channel concurrently using n goroutines.
func Loop ¶
Loop processes items from the input channel concurrently using n goroutines. If the done channel is not nil, it is closed after all items are processed.
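To illustrate the role of the done channel, here is a minimal sketch built on the standard library. The lowercase `loop` mirrors the documented signature but is a hypothetical stand-in, not the package's code; `sumSquares` is an illustrative helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// loop is a hypothetical stand-in: n workers consume in, and done
// (if not nil) is closed once every item has been processed.
func loop[A, B any](in <-chan A, done chan<- B, n int, f func(A)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range in {
				f(a)
			}
		}()
	}
	go func() {
		wg.Wait()
		if done != nil {
			close(done)
		}
	}()
}

// sumSquares adds up the squares of items using n concurrent workers.
func sumSquares(items []int, n int) int64 {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range items {
			in <- v
		}
	}()

	var sum int64
	done := make(chan struct{})
	loop(in, done, n, func(v int) {
		atomic.AddInt64(&sum, int64(v*v)) // f runs concurrently, so use atomics
	})
	<-done // unblocks when done is closed, i.e. after all items are processed
	return atomic.LoadInt64(&sum)
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 3)) // prints 30
}
```

In this sketch the call itself returns immediately, and the caller waits on done to learn that processing has finished.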
func MapAndSplit ¶
func MapReduce ¶ added in v0.2.0
func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (K, V), nr int, reducer func(V, V) V) map[K]V
MapReduce applies a map-reduce pattern to the input channel. First, the input is converted into key-value pairs using the mapper function and nm goroutines. If there are multiple values for the same key, they are reduced into a single value using the reducer function and nr goroutines. The result is a map where each key is associated with a single value.
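A word count is the classic use of this pattern. The sketch below is a simplified, hypothetical stand-in (`mapReduce`, `pair`, and `wordCount` are illustrative names): the map stage runs on nm goroutines as described, but the reduce stage is done sequentially for brevity, so the nr parameter is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// pair carries one mapper result to the reduce stage.
type pair[K comparable, V any] struct {
	key K
	val V
}

// mapReduce is a hypothetical stand-in: concurrent map stage,
// sequential reduce stage (an assumption made to keep the sketch short).
func mapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (K, V), reducer func(V, V) V) map[K]V {
	pairs := make(chan pair[K, V])
	var wg sync.WaitGroup
	for i := 0; i < nm; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range in {
				k, v := mapper(a)
				pairs <- pair[K, V]{key: k, val: v}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(pairs)
	}()

	res := make(map[K]V)
	for p := range pairs {
		if old, ok := res[p.key]; ok {
			res[p.key] = reducer(old, p.val) // merge values that share a key
		} else {
			res[p.key] = p.val
		}
	}
	return res
}

// wordCount counts occurrences of each word using nm mapper goroutines.
func wordCount(words []string, nm int) map[string]int {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, w := range words {
			in <- w
		}
	}()
	return mapReduce(in, nm,
		func(w string) (string, int) { return w, 1 },
		func(a, b int) int { return a + b },
	)
}

func main() {
	fmt.Println(wordCount([]string{"a", "b", "a", "c", "b", "a"}, 2)) // prints map[a:3 b:2 c:1]
}
```

Because pairs arrive in no particular order, the reducer should be associative and commutative, as addition is here.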
func OrderedFilterMap ¶ added in v0.3.0
func OrderedLoop ¶
func OrderedLoop[A, B any](in <-chan A, done chan<- B, n int, f func(a A, canWrite <-chan struct{}))
OrderedLoop is similar to Loop, but it allows results to be written to a channel in the same order as the items were read from the input. If the done channel is not nil, it is closed after all items are processed. A special canWrite channel is passed to the user's function f. A typical f looks like this:
- Do some processing (this part is executed concurrently).
- Read from the canWrite channel exactly once. This step is required; otherwise, the behavior is undefined.
- Write the result of the processing somewhere. This step is optional.
This way, processing is done concurrently, but results are written in order.
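The canWrite protocol can be pictured as a chain of turn tokens: each item may write only after the previous item's function has returned. The sketch below is one possible way to get that behavior with the standard library; `orderedLoop`, `job`, and `orderedSquares` are hypothetical names, and this is not claimed to be how the package implements it.

```go
package main

import (
	"fmt"
	"sync"
)

// job couples an item with its turn token and the token of its successor.
type job[A any] struct {
	item     A
	canWrite <-chan struct{}
	next     chan<- struct{}
}

// orderedLoop is a hypothetical stand-in built on a chain of turn tokens.
func orderedLoop[A, B any](in <-chan A, done chan<- B, n int, f func(a A, canWrite <-chan struct{})) {
	jobs := make(chan job[A])
	go func() {
		defer close(jobs)
		canWrite := make(chan struct{}, 1)
		canWrite <- struct{}{} // the first item may write immediately
		for a := range in {
			next := make(chan struct{}, 1)
			jobs <- job[A]{item: a, canWrite: canWrite, next: next}
			canWrite = next
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				f(j.item, j.canWrite)
				j.next <- struct{}{} // pass the write turn to the following item
			}
		}()
	}
	go func() {
		wg.Wait()
		if done != nil {
			close(done)
		}
	}()
}

// orderedSquares squares items concurrently but emits results in input order.
func orderedSquares(items []int, n int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range items {
			in <- v
		}
	}()

	out := make(chan int)
	done := make(chan struct{})
	orderedLoop(in, done, n, func(v int, canWrite <-chan struct{}) {
		sq := v * v // step 1: concurrent processing
		<-canWrite  // step 2: wait for this item's turn (exactly once)
		out <- sq   // step 3: ordered write
	})
	go func() {
		<-done
		close(out)
	}()

	var res []int
	for v := range out {
		res = append(res, v)
	}
	return res
}

func main() {
	fmt.Println(orderedSquares([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```

The write in step 3 happens before f returns, and the next token is released only after f returns, which is what keeps the output ordered regardless of how many workers run.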
func OrderedMapAndSplit ¶
Types ¶
type OnceWithWait ¶ added in v0.3.0
type OnceWithWait struct {
// contains filtered or unexported fields
}
OnceWithWait is like sync.Once, but also allows waiting until the first call is complete.
func (*OnceWithWait) Do ¶ added in v0.3.0
func (o *OnceWithWait) Do(f func())
Do executes the function f only once, no matter how many times Do is called. It also signals any goroutines waiting on Wait().
func (*OnceWithWait) Wait ¶ added in v0.3.0
func (o *OnceWithWait) Wait()
Wait blocks until the first call to Do is complete. It returns immediately if Do has already been called.
func (*OnceWithWait) WasCalled ¶ added in v0.3.0
func (o *OnceWithWait) WasCalled() bool
WasCalled returns true if Do has been called.
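The three methods above can be approximated by combining sync.Once with a channel that is closed when the first call finishes. The lowercase `onceWithWait` below is a hypothetical sketch of that idea, not the package's actual type; its unexported fields and the helper `runOnce` are assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// onceWithWait is a hypothetical stand-in; the zero value is ready to use.
type onceWithWait struct {
	once   sync.Once
	mu     sync.Mutex
	done   chan struct{} // created lazily, closed when the first Do completes
	called int32         // set to 1 as soon as Do is invoked
}

// doneCh lazily creates the completion channel.
func (o *onceWithWait) doneCh() chan struct{} {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done == nil {
		o.done = make(chan struct{})
	}
	return o.done
}

// Do runs f on the first call only and then releases all waiters.
func (o *onceWithWait) Do(f func()) {
	atomic.StoreInt32(&o.called, 1)
	o.once.Do(func() {
		defer close(o.doneCh())
		f()
	})
}

// Wait blocks until the first call to Do has completed.
func (o *onceWithWait) Wait() { <-o.doneCh() }

// WasCalled reports whether Do has been invoked at least once.
func (o *onceWithWait) WasCalled() bool { return atomic.LoadInt32(&o.called) == 1 }

// runOnce calls Do three times and reports how often f actually ran,
// as observed by a goroutine that was blocked in Wait.
func runOnce() (int, bool) {
	var o onceWithWait
	calls := 0
	seen := make(chan int)
	go func() {
		o.Wait() // returns only after the first Do has finished
		seen <- calls
	}()
	for i := 0; i < 3; i++ {
		o.Do(func() { calls++ })
	}
	return <-seen, o.WasCalled()
}

func main() {
	fmt.Println(runOnce()) // prints 1 true
}
```

Closing a channel rather than sending on it lets any number of goroutines wait, including ones that call Wait after Do has already completed.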