concurrency

package
v0.0.0-...-0450f2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 6 Imported by: 94

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateJobsFromStrings deprecated

func CreateJobsFromStrings(values []string) []interface{}

CreateJobsFromStrings is an utility to create jobs from an slice of strings.

Deprecated: will be removed as it's not needed when using ForEachJob.

func ForEach deprecated

func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error

ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. The execution breaks on first error encountered.

Deprecated: use ForEachJob instead.

func ForEachJob

func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error

ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. If the concurrency value is <= 0 all jobs will be executed in parallel.

The execution breaks on first error encountered.

ForEachJob cancels the context.Context passed to each invocation of jobFunc before ForEachJob returns.

func ForEachJobMergeResults

func ForEachJobMergeResults[J any, R any](ctx context.Context, jobs []J, concurrency int, jobFunc func(ctx context.Context, job J) ([]R, error)) ([]R, error)

ForEachJobMergeResults is like ForEachJob but expects jobFunc to return a slice of results which are then merged with results from all jobs. This function returns no results if an error occurred running any jobFunc.

ForEachJobMergeResults cancels the context.Context passed to each invocation of jobFunc before ForEachJobMergeResults returns.

func ForEachUser

func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFunc func(ctx context.Context, userID string) error) error

ForEachUser runs the provided userFunc for each userIDs up to concurrency concurrent workers. In case userFunc returns error, it will continue to process remaining users but returns an error with all errors userFunc has returned.

Types

type LimitedConcurrencySingleFlight

type LimitedConcurrencySingleFlight struct {
	// contains filtered or unexported fields
}

LimitedConcurrencySingleFlight ensures that across all concurrent calls to ForEachNotInFlight, only up to maxConcurrent tokens are running concurrently. See the docs of ForEachNotInFlight for the uniqueness semantics of tokens.

An example use case for LimitedConcurrencySingleFlight is to run a periodic job concurrently for N users. Sometimes a single user's job may take longer than the period. And this will block future scheduled jobs until the first job for that user completes. Call LimitedConcurrencySingleFlight.ForEachNotInFlight in its own goroutine with user IDs as tokens. This will run the jobs concurrently and make sure that two jobs for the same user don't run concurrently.

func NewLimitedConcurrencySingleFlight

func NewLimitedConcurrencySingleFlight(maxConcurrent int) *LimitedConcurrencySingleFlight

func (*LimitedConcurrencySingleFlight) ForEachNotInFlight

func (w *LimitedConcurrencySingleFlight) ForEachNotInFlight(ctx context.Context, tokens []string, f func(context.Context, string) error) error

ForEachNotInFlight invokes f for every token in tokens that is not in-flight (not still being executed) in a different concurrent call to ForEachNotInFlight. ForEachNotInFlight returns when invocations to f for all such tokens have returned. Upon context cancellation ForEachNotInFlight stops making new invocations of f for tokens and waits for all already started invocations of f to return. ForEachNotInFlight returns the combined errors from all f invocations.

func (*LimitedConcurrencySingleFlight) Wait

Wait returns when there are no in-flight calls to ForEachNotInFlight.

type ReusableGoroutinesPool

type ReusableGoroutinesPool struct {
	// contains filtered or unexported fields
}

func NewReusableGoroutinesPool

func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool

NewReusableGoroutinesPool creates a new worker pool with the given size. These workers will run the workloads passed through Go() calls. If all workers are busy, Go() will spawn a new goroutine to run the workload.

func (*ReusableGoroutinesPool) Close

func (p *ReusableGoroutinesPool) Close()

Close stops the workers of the pool. No new Go() calls should be performed after calling Close(). Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. Close is intended to be used in tests to ensure that no goroutines are leaked.

func (*ReusableGoroutinesPool) Go

func (p *ReusableGoroutinesPool) Go(f func())

Go will run the given function in a worker of the pool. If all workers are busy, Go() will spawn a new goroutine to run the workload.

type SyncBuffer

type SyncBuffer struct {
	// contains filtered or unexported fields
}

SyncBuffer is a io.writer implementation with atomic writes. It only keeps data in memory.

func (*SyncBuffer) Reset

func (sb *SyncBuffer) Reset()

func (*SyncBuffer) String

func (sb *SyncBuffer) String() string

func (*SyncBuffer) Write

func (sb *SyncBuffer) Write(p []byte) (n int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL