core

package
v0.5.0
Published: Oct 27, 2024 License: MIT Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Batch

func Batch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A

Batch groups items from an input channel into batches based on a maximum size and a timeout. A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes. This function never emits empty batches. The timeout countdown starts when the first item is added to a new batch. To emit batches only when full, set the timeout to -1. Zero timeout is not supported and will panic.
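The emission rules above can be illustrated with a minimal sketch. The function batchSketch is a hypothetical re-implementation written only for illustration; it is not the package's actual code, and details such as how a non-positive size is handled are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// batchSketch is a hypothetical illustration of the documented behavior.
// It is not the package's implementation.
func batchSketch[A any](in <-chan A, size int, timeout time.Duration) <-chan []A {
	if timeout == 0 {
		panic("zero timeout is not supported")
	}
	out := make(chan []A)
	go func() {
		defer close(out)
		var batch []A
		var timer *time.Timer
		var expired <-chan time.Time // nil while no countdown is running

		// flush stops the countdown and emits the batch if it is non-empty.
		flush := func() {
			if timer != nil {
				timer.Stop()
				timer = nil
			}
			expired = nil
			if len(batch) > 0 {
				out <- batch
				batch = nil
			}
		}

		for {
			select {
			case item, ok := <-in:
				if !ok {
					flush() // input closed: emit the partial batch, if any
					return
				}
				batch = append(batch, item)
				if len(batch) == 1 && timeout > 0 {
					// The countdown starts with the first item of a new batch.
					timer = time.NewTimer(timeout)
					expired = timer.C
				}
				if len(batch) >= size {
					flush()
				}
			case <-expired:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	// A negative timeout means batches are emitted only when full or on close.
	for b := range batchSketch(in, 2, -1) {
		fmt.Println(b) // prints [1 2], then [3 4], then [5]
	}
}
```

A fresh timer is created for every batch, so a tick left over from an earlier countdown cannot flush a later batch early.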

func Buffer

func Buffer[A any](in <-chan A, size int) <-chan A

func Delay

func Delay[A any](in <-chan A, delay time.Duration) <-chan A

Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order. Useful for adding delays in processing or simulating latency.

func Drain

func Drain[A any](in <-chan A)

func DrainNB

func DrainNB[A any](in <-chan A)

func FilterMap added in v0.3.0

func FilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B

func ForEach added in v0.3.0

func ForEach[A any](in <-chan A, n int, f func(A))

ForEach is a blocking function that processes items from the input channel concurrently using n goroutines.
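A minimal sketch of this pattern, assuming ForEach returns once the input channel is closed and every item has been handled. The names forEachSketch and sumConcurrently are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// forEachSketch is a hypothetical illustration, not the package's code.
// It blocks until in is closed and all n workers have finished.
func forEachSketch[A any](in <-chan A, n int, f func(A)) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for a := range in {
				f(a)
			}
		}()
	}
	wg.Wait()
}

// sumConcurrently adds up items using n workers. Because f runs on several
// goroutines at once, the shared total is updated atomically.
func sumConcurrently(items []int64, n int) int64 {
	in := make(chan int64)
	go func() {
		defer close(in)
		for _, v := range items {
			in <- v
		}
	}()

	var sum int64
	forEachSketch(in, n, func(v int64) { atomic.AddInt64(&sum, v) })
	return sum
}

func main() {
	fmt.Println(sumConcurrently([]int64{1, 2, 3, 4}, 3)) // prints 10
}
```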

func Loop

func Loop[A, B any](in <-chan A, done chan<- B, n int, f func(A))

Loop processes items from the input channel concurrently using n goroutines. If the done channel is not nil, it is closed after all items are processed.
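One way the done channel can be used is to pass the output channel itself, so that it closes once every worker has finished. The sketch below assumes Loop returns immediately and does its work in the background, which the text above does not state. The names loopSketch and squares are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// loopSketch is a hypothetical illustration, not the package's code.
// It starts n workers and returns without waiting for them.
func loopSketch[A, B any](in <-chan A, done chan<- B, n int, f func(A)) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for a := range in {
				f(a)
			}
		}()
	}
	go func() {
		wg.Wait()
		if done != nil {
			close(done) // closing a send-only channel is allowed
		}
	}()
}

// squares builds a map-like stage on top of loopSketch: out doubles as the
// done channel, so consumers can simply range over it.
func squares(in <-chan int, n int) <-chan int {
	out := make(chan int)
	loopSketch(in, out, n, func(x int) { out <- x * x })
	return out
}

func main() {
	in := make(chan int, 4)
	for i := 1; i <= 4; i++ {
		in <- i
	}
	close(in)

	sum := 0
	for v := range squares(in, 2) { // arrival order is not guaranteed
		sum += v
	}
	fmt.Println(sum) // prints 30
}
```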

func MapAndSplit

func MapAndSplit[A, B any](in <-chan A, numOuts int, n int, f func(A) (B, int)) []<-chan B

func MapReduce added in v0.2.0

func MapReduce[A any, K comparable, V any](in <-chan A, nm int, mapper func(A) (K, V), nr int, reducer func(V, V) V) map[K]V

MapReduce applies a map-reduce pattern to the input channel. First, each input item is converted into a key-value pair using the mapper function and nm goroutines. If there are multiple values for the same key, they are reduced into a single value using the reducer function and nr goroutines. The result is a map where each key is associated with a single value.
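The two phases can be sketched with a word count. This is a simplified, hypothetical illustration: mapReduceSketch runs the map phase on nm goroutines but reduces on a single goroutine, so it omits the nr parameter of the real signature. The helper names pair and countWords are also assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// pair carries one mapper result to the reduce phase.
type pair[K comparable, V any] struct {
	key K
	val V
}

// mapReduceSketch is a hypothetical, simplified illustration: concurrent map
// phase, single-goroutine reduce phase. It is not the package's code.
func mapReduceSketch[A any, K comparable, V any](
	in <-chan A, nm int, mapper func(A) (K, V), reducer func(V, V) V,
) map[K]V {
	pairs := make(chan pair[K, V])

	var wg sync.WaitGroup
	wg.Add(nm)
	for i := 0; i < nm; i++ {
		go func() {
			defer wg.Done()
			for a := range in {
				k, v := mapper(a)
				pairs <- pair[K, V]{key: k, val: v}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(pairs)
	}()

	res := make(map[K]V)
	for p := range pairs {
		if cur, ok := res[p.key]; ok {
			res[p.key] = reducer(cur, p.val) // merge values that share a key
		} else {
			res[p.key] = p.val
		}
	}
	return res
}

// countWords maps each word to (word, 1) and sums the counts per key.
func countWords(words []string) map[string]int {
	in := make(chan string, len(words))
	for _, w := range words {
		in <- w
	}
	close(in)
	return mapReduceSketch(in, 2,
		func(w string) (string, int) { return w, 1 },
		func(a, b int) int { return a + b },
	)
}

func main() {
	fmt.Println(countWords([]string{"a", "b", "a"})) // prints map[a:2 b:1]
}
```

Pairs arrive in no particular order, so a reducer used this way should be associative and commutative, as addition is here.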

func Merge

func Merge[A any](ins ...<-chan A) <-chan A

func OrderedFilterMap added in v0.3.0

func OrderedFilterMap[A, B any](in <-chan A, n int, f func(A) (B, bool)) <-chan B

func OrderedLoop

func OrderedLoop[A, B any](in <-chan A, done chan<- B, n int, f func(a A, canWrite <-chan struct{}))

OrderedLoop is similar to Loop, but it allows results to be written to a channel in the same order as the items were read from the input. If the done channel is not nil, it is closed after all items are processed.

A special canWrite channel is passed to the user's function f. A typical f looks like this:

  - Do some processing (this part is executed concurrently).
  - Read from the canWrite channel exactly once. This step is required; otherwise, behavior is undefined.
  - Write the result of the processing somewhere. This step is optional.

This way processing is done concurrently, but results are written in order.
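The canWrite protocol can be sketched as a chain of one-shot tokens, where each item may write only after the previous item's call to f has returned. The code below is a hypothetical illustration of that idea and is not the package's implementation. The names orderedLoopSketch, orderedJob and orderedSquares are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// orderedJob links an item to the token it waits on and the token it releases.
type orderedJob[A any] struct {
	item     A
	canWrite <-chan struct{}
	next     chan<- struct{}
}

// orderedLoopSketch is a hypothetical illustration, not the package's code.
func orderedLoopSketch[A, B any](in <-chan A, done chan<- B, n int, f func(a A, canWrite <-chan struct{})) {
	jobs := make(chan orderedJob[A])
	go func() {
		defer close(jobs)
		canWrite := make(chan struct{}, 1)
		canWrite <- struct{}{} // the first item may write immediately
		for a := range in {
			next := make(chan struct{}, 1)
			jobs <- orderedJob[A]{item: a, canWrite: canWrite, next: next}
			canWrite = next
		}
	}()

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for j := range jobs {
				f(j.item, j.canWrite)
				j.next <- struct{}{} // let the following item write
			}
		}()
	}
	go func() {
		wg.Wait()
		if done != nil {
			close(done)
		}
	}()
}

// orderedSquares follows the three steps: process, read canWrite, write.
func orderedSquares(in <-chan int, n int) <-chan int {
	out := make(chan int)
	orderedLoopSketch(in, out, n, func(x int, canWrite <-chan struct{}) {
		y := x * x // concurrent part
		<-canWrite // exactly one read
		out <- y   // ordered write
	})
	return out
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)
	for v := range orderedSquares(in, 3) {
		fmt.Println(v) // prints 1, 4, 9, 16, 25 in input order
	}
}
```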

func OrderedMapAndSplit

func OrderedMapAndSplit[A, B any](in <-chan A, numOuts int, n int, f func(A) (B, int)) []<-chan B

func Reduce added in v0.2.0

func Reduce[A any](in <-chan A, n int, f func(A, A) A) (A, bool)

Reduce combines all items from the input channel into a single value using the provided function f, running on n goroutines.
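A minimal sketch of a concurrent reduction follows. Each worker folds the items it receives into a partial result, and the partial results are then folded together. It assumes that the boolean result is false when the input channel yields no items, which the text above does not state. The name reduceSketch is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// reduceSketch is a hypothetical illustration, not the package's code.
// The second result reports whether at least one item was seen (assumed).
func reduceSketch[A any](in <-chan A, n int, f func(A, A) A) (A, bool) {
	partials := make(chan A, n) // each worker sends at most one value

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			var acc A
			has := false
			for a := range in {
				if has {
					acc = f(acc, a)
				} else {
					acc, has = a, true
				}
			}
			if has {
				partials <- acc
			}
		}()
	}
	wg.Wait()
	close(partials)

	// Fold the per-worker results into the final value.
	var res A
	ok := false
	for p := range partials {
		if ok {
			res = f(res, p)
		} else {
			res, ok = p, true
		}
	}
	return res, ok
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 100; i++ {
			in <- i
		}
	}()
	sum, ok := reduceSketch(in, 4, func(a, b int) int { return a + b })
	fmt.Println(sum, ok) // prints 5050 true
}
```

Items are combined in no fixed order, so f should be associative and commutative for the result to be deterministic.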

func Unbatch

func Unbatch[A any](in <-chan []A) <-chan A

Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
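Flattening batches back into items can be sketched as follows. The name unbatchSketch is hypothetical and the code is an illustration of the described behavior, not the package's implementation.

```go
package main

import "fmt"

// unbatchSketch is a hypothetical illustration, not the package's code.
// It emits the items of every incoming batch one by one, preserving order.
func unbatchSketch[A any](in <-chan []A) <-chan A {
	out := make(chan A)
	go func() {
		defer close(out)
		for batch := range in {
			for _, a := range batch {
				out <- a
			}
		}
	}()
	return out
}

func main() {
	in := make(chan []int, 2)
	in <- []int{1, 2}
	in <- []int{3}
	close(in)

	for v := range unbatchSketch(in) {
		fmt.Println(v) // prints 1, 2, 3
	}
}
```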

Types

type OnceWithWait added in v0.3.0

type OnceWithWait struct {
	// contains filtered or unexported fields
}

OnceWithWait is like sync.Once, but also allows waiting until the first call is complete.

func (*OnceWithWait) Do added in v0.3.0

func (o *OnceWithWait) Do(f func())

Do executes the function f only once, no matter how many times Do is called. It also signals any goroutines waiting on Wait().

func (*OnceWithWait) Wait added in v0.3.0

func (o *OnceWithWait) Wait()

Wait blocks until the first call to Do is complete. It returns immediately if Do has already been called.

func (*OnceWithWait) WasCalled added in v0.3.0

func (o *OnceWithWait) WasCalled() bool

WasCalled returns true if Do has been called.
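The three methods can be sketched on top of sync.Once and a channel that is closed when the first call finishes. The type onceWithWaitSketch is a hypothetical illustration; the real type's fields are unexported and may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// onceWithWaitSketch is a hypothetical illustration, not the package's code.
// Its zero value is ready to use.
type onceWithWaitSketch struct {
	initOnce sync.Once
	once     sync.Once
	done     chan struct{}
	called   int32
}

// init lazily creates the done channel so the zero value works.
func (o *onceWithWaitSketch) init() {
	o.initOnce.Do(func() { o.done = make(chan struct{}) })
}

// Do runs f at most once and releases waiters when f has returned.
func (o *onceWithWaitSketch) Do(f func()) {
	o.init()
	o.once.Do(func() {
		atomic.StoreInt32(&o.called, 1)
		defer close(o.done)
		f()
	})
}

// Wait blocks until the first call to Do is complete.
func (o *onceWithWaitSketch) Wait() {
	o.init()
	<-o.done
}

// WasCalled reports whether Do has been called.
func (o *onceWithWaitSketch) WasCalled() bool {
	return atomic.LoadInt32(&o.called) == 1
}

func main() {
	var o onceWithWaitSketch
	go o.Do(func() { fmt.Println("initialized") })
	o.Wait() // returns only after the function passed to Do has returned
	fmt.Println("called:", o.WasCalled()) // prints called: true
}
```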
