Documentation ¶
Index ¶
- func CreateJobsFromStrings(values []string) []interface{} (deprecated)
- func ForEach(ctx context.Context, jobs []interface{}, concurrency int, ...) error (deprecated)
- func ForEachJob(ctx context.Context, jobs int, concurrency int, ...) error
- func ForEachJobMergeResults[J any, R any](ctx context.Context, jobs []J, concurrency int, ...) ([]R, error)
- func ForEachUser(ctx context.Context, userIDs []string, concurrency int, ...) error
- type LimitedConcurrencySingleFlight
- type ReusableGoroutinesPool
- type SyncBuffer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateJobsFromStrings
deprecated
func CreateJobsFromStrings(values []string) []interface{}
CreateJobsFromStrings is a utility to create jobs from a slice of strings.
Deprecated: will be removed as it's not needed when using ForEachJob.
func ForEach
deprecated
func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error
ForEach runs the provided jobFunc for each job, using up to concurrency concurrent workers. Execution stops at the first error encountered.
Deprecated: use ForEachJob instead.
func ForEachJob ¶
func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error
ForEachJob runs the provided jobFunc for each job index in [0, jobs), using up to concurrency concurrent workers. If concurrency is <= 0, all jobs are executed in parallel.
Execution stops at the first error encountered.
ForEachJob cancels the context.Context passed to each invocation of jobFunc before ForEachJob returns.
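The behavior described above can be approximated with the standard library alone. The following is a minimal sketch, not the package's implementation: forEachIndex, squares, and failingRun are hypothetical names introduced for illustration, and details such as how a cancelled caller context is reported are assumptions of the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// forEachIndex is a hypothetical stand-in, not the package's code. It calls
// jobFunc for every index in [0, jobs) with at most `concurrency` workers and
// stops handing out new indexes after the first error.
func forEachIndex(ctx context.Context, jobs, concurrency int, jobFunc func(ctx context.Context, idx int) error) error {
	if concurrency <= 0 || concurrency > jobs {
		concurrency = jobs // non-positive concurrency: one worker per job
	}

	// The derived context is cancelled on the first error and, through the
	// deferred call, always before forEachIndex returns.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		mu       sync.Mutex
		next     int
		firstErr error
		wg       sync.WaitGroup
	)

	// claim hands out the next index, or reports that no more work should start.
	claim := func() (int, bool) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr != nil || next >= jobs || ctx.Err() != nil {
			return 0, false
		}
		next++
		return next - 1, true
	}

	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx, ok := claim(); ok; idx, ok = claim() {
				if err := jobFunc(ctx, idx); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					cancel()
					return
				}
			}
		}()
	}
	wg.Wait()

	if firstErr != nil {
		return firstErr
	}
	return ctx.Err() // non-nil only if the caller's context ended early
}

// squares fills a plain slice by index, so no []interface{} conversion is needed.
func squares(n, concurrency int) ([]int, error) {
	out := make([]int, n)
	err := forEachIndex(context.Background(), n, concurrency, func(_ context.Context, idx int) error {
		out[idx] = idx * idx // each job writes only its own slot
		return nil
	})
	return out, err
}

// failingRun shows that a single failing job makes the whole call fail.
func failingRun() error {
	return forEachIndex(context.Background(), 10, 3, func(_ context.Context, idx int) error {
		if idx == 4 {
			return errors.New("job 4 failed")
		}
		return nil
	})
}

func main() {
	fmt.Println(squares(5, 2))
	fmt.Println(failingRun())
}
```

Because each job receives only an index, callers can close over a slice of any element type, which is presumably why the []interface{}-based helpers are no longer needed.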
func ForEachJobMergeResults ¶
func ForEachJobMergeResults[J any, R any](ctx context.Context, jobs []J, concurrency int, jobFunc func(ctx context.Context, job J) ([]R, error)) ([]R, error)
ForEachJobMergeResults is like ForEachJob but expects jobFunc to return a slice of results, which are then merged with the results from all other jobs. It returns no results if any jobFunc invocation returns an error.
ForEachJobMergeResults cancels the context.Context passed to each invocation of jobFunc before ForEachJobMergeResults returns.
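The merge semantics can be sketched with generics and the standard library. This is an illustration only, not the package's implementation: mergeJobResults and splitAll are hypothetical names, and the order of the merged slice in this sketch depends on goroutine scheduling, which is why the helper sorts before returning.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// mergeJobResults is a hypothetical stand-in: it runs jobFunc for each job with
// bounded concurrency and concatenates the returned slices. On any error it
// discards everything and returns only the first error.
func mergeJobResults[J any, R any](ctx context.Context, jobs []J, concurrency int, jobFunc func(ctx context.Context, job J) ([]R, error)) ([]R, error) {
	if concurrency <= 0 || concurrency > len(jobs) {
		concurrency = len(jobs)
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancelled before returning, as the documentation describes

	var (
		mu       sync.Mutex
		merged   []R
		firstErr error
		wg       sync.WaitGroup
	)
	slots := make(chan struct{}, concurrency) // one slot per running job

	for _, job := range jobs {
		slots <- struct{}{} // wait for a free slot
		mu.Lock()
		failed := firstErr != nil
		mu.Unlock()
		if failed {
			break // stop starting jobs after the first error
		}
		wg.Add(1)
		go func(job J) {
			defer wg.Done()
			defer func() { <-slots }()
			res, err := jobFunc(ctx, job)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
					cancel()
				}
				return
			}
			merged = append(merged, res...)
		}(job)
	}
	wg.Wait()

	if firstErr != nil {
		return nil, firstErr
	}
	return merged, nil
}

// splitAll splits every line into words and returns all words, sorted.
func splitAll(lines []string) ([]string, error) {
	words, err := mergeJobResults(context.Background(), lines, 2, func(_ context.Context, line string) ([]string, error) {
		if line == "" {
			return nil, errors.New("empty line")
		}
		return strings.Fields(line), nil
	})
	if err != nil {
		return nil, err
	}
	sort.Strings(words) // merge order is not deterministic in this sketch
	return words, nil
}

func main() {
	fmt.Println(splitAll([]string{"b a", "d", "c f e"}))
	fmt.Println(splitAll([]string{"a", ""}))
}
```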
func ForEachUser ¶
func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error
ForEachUser runs the provided userFunc for each user ID in userIDs, using up to concurrency concurrent workers. If userFunc returns an error, ForEachUser continues processing the remaining users and returns an error that combines all errors returned by userFunc.
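The difference from ForEachJob is that a failure does not stop the run. A rough standard-library sketch of that "collect every error" behavior follows. forEachUserSketch, runUsers, bothReported, and the two sentinel errors are hypothetical; combining errors with errors.Join (Go 1.20+) and clamping concurrency to at least 1 are assumptions of this sketch, not documented behavior of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Hypothetical sentinel errors used only by this example.
var (
	errQuota   = errors.New("quota exceeded")
	errMissing = errors.New("user not found")
)

// forEachUserSketch calls userFunc for every user ID and keeps going after
// failures, returning all collected errors joined together (nil if none).
func forEachUserSketch(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error {
	if concurrency < 1 {
		concurrency = 1 // assumption of this sketch only
	}
	var (
		mu   sync.Mutex
		errs []error
		wg   sync.WaitGroup
	)
	ids := make(chan string)

	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ids {
				if err := userFunc(ctx, id); err != nil {
					mu.Lock()
					errs = append(errs, fmt.Errorf("user %s: %w", id, err))
					mu.Unlock()
				}
			}
		}()
	}
	for _, id := range userIDs {
		ids <- id
	}
	close(ids)
	wg.Wait()

	return errors.Join(errs...)
}

// runUsers processes five users, two of which fail, and reports how many
// users were visited together with the combined error.
func runUsers() (processed int, err error) {
	var mu sync.Mutex
	users := []string{"u1", "u2", "u3", "u4", "u5"}
	err = forEachUserSketch(context.Background(), users, 2, func(_ context.Context, id string) error {
		mu.Lock()
		processed++
		mu.Unlock()
		switch id {
		case "u2":
			return errQuota
		case "u4":
			return errMissing
		}
		return nil
	})
	return processed, err
}

// bothReported checks that the combined error carries both failures.
func bothReported(err error) bool {
	return errors.Is(err, errQuota) && errors.Is(err, errMissing)
}

func main() {
	processed, err := runUsers()
	fmt.Println("processed:", processed)
	fmt.Println("both failures reported:", bothReported(err))
}
```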
Types ¶
type LimitedConcurrencySingleFlight ¶
type LimitedConcurrencySingleFlight struct {
// contains filtered or unexported fields
}
LimitedConcurrencySingleFlight ensures that across all concurrent calls to ForEachNotInFlight, only up to maxConcurrent tokens are running concurrently. See the docs of ForEachNotInFlight for the uniqueness semantics of tokens.
An example use case for LimitedConcurrencySingleFlight is running a periodic job concurrently for N users. Sometimes a single user's job takes longer than the period, which would block future scheduled jobs until that user's job completes. To avoid this, call LimitedConcurrencySingleFlight.ForEachNotInFlight in its own goroutine with user IDs as tokens. The jobs then run concurrently, and two jobs for the same user never run at the same time.
func NewLimitedConcurrencySingleFlight ¶
func NewLimitedConcurrencySingleFlight(maxConcurrent int) *LimitedConcurrencySingleFlight
func (*LimitedConcurrencySingleFlight) ForEachNotInFlight ¶
func (w *LimitedConcurrencySingleFlight) ForEachNotInFlight(ctx context.Context, tokens []string, f func(context.Context, string) error) error
ForEachNotInFlight invokes f for every token in tokens that is not in-flight (that is, not still being executed) in a different concurrent call to ForEachNotInFlight. It returns once the invocations of f for all such tokens have returned. On context cancellation, ForEachNotInFlight stops making new invocations of f and waits for all already started invocations to return. It returns the combined errors from all f invocations.
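The token semantics can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the package's implementation: singleFlight, its methods, and demoSkipsInFlight are hypothetical names, errors are combined with errors.Join, and waiting for a free slot here does not react to context cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// singleFlight is a hypothetical model: a set of in-flight tokens plus a
// channel used as a counting semaphore shared by all callers.
type singleFlight struct {
	mu       sync.Mutex
	inflight map[string]struct{}
	slots    chan struct{}
}

func newSingleFlight(maxConcurrent int) *singleFlight {
	return &singleFlight{
		inflight: make(map[string]struct{}),
		slots:    make(chan struct{}, maxConcurrent),
	}
}

// tryMark records tok as in-flight and reports false if it already was.
func (s *singleFlight) tryMark(tok string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, busy := s.inflight[tok]; busy {
		return false
	}
	s.inflight[tok] = struct{}{}
	return true
}

// release frees the concurrency slot and clears the in-flight mark.
func (s *singleFlight) release(tok string) {
	<-s.slots
	s.mu.Lock()
	delete(s.inflight, tok)
	s.mu.Unlock()
}

func (s *singleFlight) forEachNotInFlight(ctx context.Context, tokens []string, f func(context.Context, string) error) error {
	var (
		wg    sync.WaitGroup
		errMu sync.Mutex
		errs  []error
	)
	for _, tok := range tokens {
		if ctx.Err() != nil {
			break // cancelled: start nothing new, but still wait below
		}
		if !s.tryMark(tok) {
			continue // another call is already running this token
		}
		s.slots <- struct{}{} // blocks while maxConcurrent tokens are running
		wg.Add(1)
		go func(tok string) {
			defer wg.Done()
			defer s.release(tok)
			if err := f(ctx, tok); err != nil {
				errMu.Lock()
				errs = append(errs, err)
				errMu.Unlock()
			}
		}(tok)
	}
	wg.Wait()
	return errors.Join(errs...)
}

// demoSkipsInFlight keeps token "a" busy in one call, then issues a second
// call for "a" and "b" and returns the tokens the second call actually ran.
func demoSkipsInFlight() []string {
	sf := newSingleFlight(2)
	started, release, done := make(chan struct{}), make(chan struct{}), make(chan struct{})

	go func() {
		defer close(done)
		_ = sf.forEachNotInFlight(context.Background(), []string{"a"}, func(context.Context, string) error {
			close(started)
			<-release // simulate a slow job
			return nil
		})
	}()
	<-started // "a" is now in flight

	var (
		mu  sync.Mutex
		ran []string
	)
	_ = sf.forEachNotInFlight(context.Background(), []string{"a", "b"}, func(_ context.Context, tok string) error {
		mu.Lock()
		ran = append(ran, tok)
		mu.Unlock()
		return nil
	})

	close(release)
	<-done
	return ran
}

func main() {
	fmt.Println(demoSkipsInFlight())
}
```

In the demo, the second call returns without waiting for the slow job on "a", which matches the periodic-job use case: a slow user does not delay the next scheduled round for everyone else.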
func (*LimitedConcurrencySingleFlight) Wait ¶
func (w *LimitedConcurrencySingleFlight) Wait()
Wait returns when there are no in-flight calls to ForEachNotInFlight.
type ReusableGoroutinesPool ¶
type ReusableGoroutinesPool struct {
// contains filtered or unexported fields
}
func NewReusableGoroutinesPool ¶
func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool
NewReusableGoroutinesPool creates a new worker pool with the given size. These workers will run the workloads passed through Go() calls. If all workers are busy, Go() will spawn a new goroutine to run the workload.
func (*ReusableGoroutinesPool) Close ¶
func (p *ReusableGoroutinesPool) Close()
Close stops the workers of the pool. No new Go() calls should be performed after calling Close(). Close does NOT wait for all jobs to finish; it is the caller's responsibility to ensure that in the provided workloads. Close is intended to be used in tests to ensure that no goroutines are leaked.
func (*ReusableGoroutinesPool) Go ¶
func (p *ReusableGoroutinesPool) Go(f func())
Go will run the given function in a worker of the pool. If all workers are busy, Go() will spawn a new goroutine to run the workload.
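The "use an idle worker, otherwise spawn a goroutine" behavior maps naturally onto a non-blocking channel send. The sketch below shows one way to get that behavior with the standard library. It is not the package's implementation, and pool, newPool, and runAll are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical model of a reusable-goroutine pool: long-lived
// workers receive functions from an unbuffered channel.
type pool struct {
	jobs chan func()
}

func newPool(size int) *pool {
	p := &pool{jobs: make(chan func())}
	for i := 0; i < size; i++ {
		go func() {
			for f := range p.jobs { // exits when Close closes the channel
				f()
			}
		}()
	}
	return p
}

// Go hands f to an idle worker if one is currently waiting on the channel;
// otherwise it falls back to a fresh goroutine, so Go never blocks.
func (p *pool) Go(f func()) {
	select {
	case p.jobs <- f:
	default:
		go f()
	}
}

// Close stops the workers. It does not wait for running workloads, and Go
// must not be called afterwards (sending on a closed channel panics).
func (p *pool) Close() {
	close(p.jobs)
}

// runAll submits n small workloads and returns how many of them ran. The
// caller-side WaitGroup is what guarantees completion before Close.
func runAll(n, size int) int {
	p := newPool(size)
	defer p.Close()

	var (
		mu    sync.Mutex
		count int
		wg    sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		p.Go(func() {
			defer wg.Done()
			mu.Lock()
			count++
			mu.Unlock()
		})
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(runAll(100, 4))
}
```

The WaitGroup in runAll reflects the note on Close: the pool itself does not track completion, so the workloads have to signal it.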
type SyncBuffer ¶
type SyncBuffer struct {
// contains filtered or unexported fields
}
SyncBuffer is an io.Writer implementation with atomic writes. It keeps data only in memory.
func (*SyncBuffer) Reset ¶
func (sb *SyncBuffer) Reset()
func (*SyncBuffer) String ¶
func (sb *SyncBuffer) String() string
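A buffer with these properties can be modeled as a bytes.Buffer guarded by a mutex. The following sketch is only an illustration of that idea, not the package's code: lockedBuffer, writeConcurrently, and emptyAfterReset are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// lockedBuffer is a hypothetical in-memory writer whose Write, String, and
// Reset calls are serialized by a mutex, so each Write lands as one unit.
type lockedBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Compile-time check that lockedBuffer satisfies io.Writer.
var _ io.Writer = (*lockedBuffer)(nil)

func (b *lockedBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.Write(p)
}

func (b *lockedBuffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf.String()
}

func (b *lockedBuffer) Reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buf.Reset()
}

// writeConcurrently writes n lines from n goroutines and reports how many
// lines arrived and whether every line is intact (not interleaved).
func writeConcurrently(n int) (lines int, intact bool) {
	var b lockedBuffer
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Fprintf(&b, "line %d\n", i) // formatted first, then written in one Write call
		}(i)
	}
	wg.Wait()

	intact = true
	for _, l := range strings.Split(strings.TrimSuffix(b.String(), "\n"), "\n") {
		lines++
		if !strings.HasPrefix(l, "line ") {
			intact = false
		}
	}
	return lines, intact
}

// emptyAfterReset reports whether Reset discards previously written data.
func emptyAfterReset() bool {
	var b lockedBuffer
	_, _ = b.Write([]byte("x"))
	b.Reset()
	return b.String() == ""
}

func main() {
	fmt.Println(writeConcurrently(50))
	fmt.Println(emptyAfterReset())
}
```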