Documentation ¶
Index ¶
- Variables
- func CancelAll[T SilentTask](tasks []T)
- func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int
- func ForkJoin[T SilentTask](ctx context.Context, tasks []T)
- func ForkJoinFailFast[T SilentTask](ctx context.Context, tasks []T) error
- func WaitAll[T SilentTask](tasks []T)
- type Batcher
- type BatcherOption
- type PanicRecoverWork
- type PartitionFunc
- type Partitioner
- type SilentBatcher
- type SilentTask
- func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask
- func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask
- func ContinueWithNoResult[T any](currentTask Task[T], nextAction func(context.Context, T, error) error) SilentTask
- func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask
- func NewSilentTask(action SilentWork) SilentTask
- func NewSilentTasks(actions ...SilentWork) []SilentTask
- func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
- func RunWithConcurrencyLevelC[T SilentTask](ctx context.Context, concurrencyLevel int, tasks <-chan T) SilentTask
- func RunWithConcurrencyLevelS[T SilentTask](ctx context.Context, concurrencyLevel int, tasks []T) SilentTask
- func Spread[T SilentTask](ctx context.Context, tasks []T, within time.Duration) SilentTask
- func Throttle[T SilentTask](ctx context.Context, tasks []T, rateLimit int, every time.Duration) SilentTask
- type SilentWork
- type State
- type Task
- func AddJitterT[T any](t Task[T], maxJitterDurationInMilliseconds int) Task[T]
- func Completed[T any](result T, err error) Task[T]
- func ContinueWith[T any, S any](currentTask Task[T], nextAction func(context.Context, T, error) (S, error)) Task[S]
- func ContinueWithResult[T any](currentTask SilentTask, nextAction func(context.Context, error) (T, error)) Task[T]
- func Invoke[T any](ctx context.Context, action Work[T]) Task[T]
- func NewTask[T any](action Work[T]) Task[T]
- func NewTasks[T any](actions ...Work[T]) []Task[T]
- type Work
- type WorkerPool
- type WorkerPoolOption
Variables ¶
var (
ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)
var ErrCancelled = errors.New("task cancelled")
ErrCancelled is returned when a task gets cancelled.
Functions ¶
func CancelAll ¶
func CancelAll[T SilentTask](tasks []T)
CancelAll cancels all given tasks.
Note: task cannot be nil
func DoJitter ¶
func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int
DoJitter adds a random jitter before executing doFn, then returns the jitter duration. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
Example ¶
t1 := InvokeInSilence(
	context.Background(),
	func(ctx context.Context) error {
		DoJitter(
			func() {
				fmt.Println("do something after random jitter")
			},
			1000,
		)
		return nil
	},
)

t2 := AddJitterT(
	NewTask(
		func(ctx context.Context) (int, error) {
			fmt.Println("return 1 after random jitter")
			return 1, nil
		},
	),
	1000,
).Run(context.Background())

t3 := AddJitterST(
	NewSilentTask(
		func(ctx context.Context) error {
			fmt.Println("return nil after random jitter")
			return nil
		},
	),
	1000,
).Execute(context.Background())

WaitAll([]SilentTask{t1, t2, t3})
Output:
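The idea behind DoJitter can be sketched with the standard library alone. The helper below, `doWithJitter`, is a hypothetical stand-in and not the package's implementation: it assumes the jitter is drawn uniformly from [0, max) milliseconds and is returned in milliseconds.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// doWithJitter is a hypothetical stand-in for DoJitter. It sleeps for a
// random number of milliseconds in [0, maxJitterMs), runs doFn, and returns
// the jitter it applied. A non-positive maximum means no jitter (assumption).
func doWithJitter(doFn func(), maxJitterMs int) int {
	jitter := 0
	if maxJitterMs > 0 {
		jitter = rand.Intn(maxJitterMs)
	}
	time.Sleep(time.Duration(jitter) * time.Millisecond)
	doFn()
	return jitter
}

func main() {
	applied := doWithJitter(func() {
		fmt.Println("do something after random jitter")
	}, 20)
	fmt.Println(applied >= 0 && applied < 20) // prints "true"
}
```

Spreading start times this way keeps many callers from hitting a shared resource at the same instant.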
func ForkJoin ¶
func ForkJoin[T SilentTask](ctx context.Context, tasks []T)
ForkJoin executes given tasks in parallel and waits for ALL to complete before returning.
Note: task cannot be nil
Example ¶
first := NewTask(
	func(context.Context) (int, error) {
		return 1, nil
	},
)

second := NewTask(
	func(context.Context) (interface{}, error) {
		return nil, errors.New("some error")
	},
)

ForkJoin(context.Background(), []SilentTask{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:
1 <nil>
<nil> some error
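For readers who want to see the fork-join pattern itself, here is a minimal standard-library sketch. The `forkJoin` helper and the `errSome` variable are hypothetical names used for illustration; the package's ForkJoin works on tasks, not plain functions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSome = errors.New("some error")

// forkJoin is a hypothetical helper: it runs every work function in its own
// goroutine and returns only after ALL of them have finished. Errors are
// collected by index, so a failure in one work does not stop the others.
func forkJoin(works []func() error) []error {
	errs := make([]error, len(works))
	var wg sync.WaitGroup
	for i, work := range works {
		wg.Add(1)
		go func(i int, work func() error) {
			defer wg.Done()
			errs[i] = work()
		}(i, work)
	}
	wg.Wait()
	return errs
}

func main() {
	errs := forkJoin([]func() error{
		func() error { return nil },
		func() error { return errSome },
	})
	fmt.Println(errs[0]) // prints "<nil>"
	fmt.Println(errs[1]) // prints "some error"
}
```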
func ForkJoinFailFast ¶ added in v1.0.2
func ForkJoinFailFast[T SilentTask](ctx context.Context, tasks []T) error
ForkJoinFailFast executes the given tasks in parallel. It returns as soon as one task fails, or once ALL tasks have completed successfully.
Note: task cannot be nil
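The fail-fast behaviour can be illustrated with a buffered error channel. This is only a sketch under stated assumptions: `forkJoinFailFast`, `slowWork`, `runDemo`, and `errBoom` are hypothetical names, and cancelling a derived context on return is a design choice of this sketch, not something the documentation above states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errBoom = errors.New("boom")

// forkJoinFailFast is a hypothetical helper. It runs all works in parallel
// and returns the first error it sees without waiting for the rest, or nil
// once every work has succeeded. The derived context is cancelled on return
// so that well-behaved works can stop early (an assumption of this sketch).
func forkJoinFailFast(ctx context.Context, works []func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered so that late finishers never block after we have returned.
	results := make(chan error, len(works))
	for _, work := range works {
		go func(work func(context.Context) error) {
			results <- work(ctx)
		}(work)
	}

	for range works {
		if err := <-results; err != nil {
			return err
		}
	}
	return nil
}

// slowWork succeeds after two seconds unless its context is cancelled first.
func slowWork(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runDemo pairs a slow work with one that fails immediately and reports
// how long forkJoinFailFast took to return.
func runDemo() (time.Duration, error) {
	start := time.Now()
	err := forkJoinFailFast(context.Background(), []func(context.Context) error{
		slowWork,
		func(context.Context) error { return errBoom },
	})
	return time.Since(start), err
}

func main() {
	elapsed, err := runDemo()
	fmt.Println(err)                   // prints "boom"
	fmt.Println(elapsed < time.Second) // prints "true"
}
```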
func WaitAll ¶
func WaitAll[T SilentTask](tasks []T)
WaitAll waits for all executed tasks to finish.
Note: task cannot be nil
Types ¶
type Batcher ¶
type Batcher[P any, T any] interface {
	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(payload P) Task[T]
	// contains filtered or unexported methods
}
Batcher is a batch processor that sits in the background, collects payloads from callers, processes them in one go, and then returns an individual result to each caller.
Example ¶
b := NewBatcher(
	func(input []int) ([]int, error) {
		fmt.Println(input)

		result := make([]int, len(input))
		for idx, number := range input {
			result[idx] = number * 2
		}

		return result, nil
	},
	nil,
)
defer b.Shutdown()

t1 := b.Append(1)
t2 := b.Append(2)

b.Process(context.Background())

ContinueWithNoResult(
	t1,
	func(_ context.Context, v int, err error) error {
		fmt.Println(v)
		return nil
	},
).ExecuteSync(context.Background())

ContinueWithNoResult(
	t2,
	func(_ context.Context, v int, err error) error {
		fmt.Println(v)
		return nil
	},
).ExecuteSync(context.Background())
Output:
[1 2]
2
4
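The batching idea (collect payloads, process once, hand each caller its own result) can be sketched without the package. Everything below is hypothetical and single-goroutine for brevity: `miniBatcher`, `result`, `appendPayload`, `process`, and `double` are illustrative names, and the sketch assumes the i-th output belongs to the i-th payload.

```go
package main

import "fmt"

// result is the handle a caller keeps until the batch has been processed.
type result struct {
	value int
	err   error
}

// miniBatcher is a hypothetical stand-in for Batcher. It collects payloads
// and hands each caller a handle to its future result.
type miniBatcher struct {
	processFn func([]int) ([]int, error)
	payloads  []int
	pending   []*result
}

// appendPayload queues a payload and returns the handle that process fills in.
func (b *miniBatcher) appendPayload(p int) *result {
	r := &result{}
	b.payloads = append(b.payloads, p)
	b.pending = append(b.pending, r)
	return r
}

// process runs processFn once over everything queued so far and copies the
// i-th output (or the shared error) into the i-th handle.
func (b *miniBatcher) process() {
	out, err := b.processFn(b.payloads)
	for i, r := range b.pending {
		r.err = err
		if err == nil && i < len(out) {
			r.value = out[i]
		}
	}
	b.payloads, b.pending = nil, nil
}

// double is the batch function: it doubles every input in one call.
func double(in []int) ([]int, error) {
	out := make([]int, len(in))
	for i, n := range in {
		out[i] = n * 2
	}
	return out, nil
}

func main() {
	b := &miniBatcher{processFn: double}
	r1, r2 := b.appendPayload(1), b.appendPayload(2)
	b.process()
	fmt.Println(r1.value, r2.value) // prints "2 4"
}
```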
func NewBatcher ¶
func NewBatcher[P any, T any](
	processFn func([]P) ([]T, error),
	payloadKeyExtractor func(P) any,
	options ...BatcherOption,
) Batcher[P, T]
NewBatcher returns a new Batcher.
type BatcherOption ¶
type BatcherOption func(*batcherConfigs)
func WithAutoProcessInterval ¶
func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption
WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessIntervalInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.
Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() to clean up the timer goroutine properly in order to avoid a memory leak.
func WithAutoProcessSize ¶
func WithAutoProcessSize(autoProcessSize int) BatcherOption
WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.
func WithShutdownGraceDuration ¶
func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption
WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.
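BatcherOption follows the functional options pattern. The sketch below shows how such options are commonly applied; `configs`, `option`, `withAutoProcessSize`, `withAutoProcessInterval`, and `newConfigs` are hypothetical stand-ins, and the real `batcherConfigs` fields are not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// configs and option are hypothetical stand-ins for the unexported
// batcherConfigs type and the exported BatcherOption type.
type configs struct {
	autoProcessSize     int
	autoProcessInterval time.Duration
}

type option func(*configs)

// withAutoProcessSize mirrors the documented rule that a non-positive size
// leaves size-based auto processing switched off.
func withAutoProcessSize(size int) option {
	return func(c *configs) {
		if size > 0 {
			c.autoProcessSize = size
		}
	}
}

// withAutoProcessInterval mirrors the documented rule that a non-positive
// interval leaves periodic auto processing switched off.
func withAutoProcessInterval(interval time.Duration) option {
	return func(c *configs) {
		if interval > 0 {
			c.autoProcessInterval = interval
		}
	}
}

// newConfigs starts from the defaults (both features off) and applies each
// option in order.
func newConfigs(options ...option) configs {
	c := configs{}
	for _, apply := range options {
		apply(&c)
	}
	return c
}

func main() {
	c := newConfigs(withAutoProcessSize(10), withAutoProcessInterval(-1))
	fmt.Println(c.autoProcessSize, c.autoProcessInterval) // prints "10 0s"
}
```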
type PanicRecoverWork ¶ added in v1.0.2
type PanicRecoverWork func(any)
PanicRecoverWork represents a unit of work to be executed when a panic occurs.
type PartitionFunc ¶
type PartitionFunc[K comparable, V any] func(data V) (key K, ok bool)
PartitionFunc takes in data and then returns a key and whether the key can be used to route data into a partition.
type Partitioner ¶
type Partitioner[K comparable, V any] interface {
	// Take items and divide them into separate partitions asynchronously.
	Take(items ...V)
	// Outcome returns items divided into separate partitions.
	Outcome() map[K][]V
}
Partitioner divides items into separate partitions.
Example ¶
partitionFunc := func(a animal) (string, bool) {
	if a.species == "" {
		return "", false
	}

	return a.species, true
}

p := NewPartitioner(context.Background(), partitionFunc)

input := []animal{
	{"dog", "name1"},
	{"snail", "name2"},
	{"dog", "name4"},
	{"cat", "name5"},
}

p.Take(input...)

res := p.Outcome()
fmt.Println(res)

first := res["dog"]
fmt.Println(first[0])
fmt.Println(first[1])
Output:
map[cat:[{cat name5}] dog:[{dog name1} {dog name4}] snail:[{snail name2}]]
{dog name1}
{dog name4}
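The grouping step can be sketched synchronously with a generic helper. This is not the package's Partitioner, which divides items asynchronously; `partition` and `bySpecies` are hypothetical names, and the sketch assumes that items whose key is reported as unusable are simply skipped.

```go
package main

import "fmt"

type animal struct {
	species string
	name    string
}

// partition is a hypothetical, synchronous helper. It groups items by the
// key returned from keyOf and skips items for which keyOf reports ok=false
// (an assumption of this sketch).
func partition[K comparable, V any](items []V, keyOf func(V) (K, bool)) map[K][]V {
	out := make(map[K][]V)
	for _, item := range items {
		key, ok := keyOf(item)
		if !ok {
			continue
		}
		out[key] = append(out[key], item)
	}
	return out
}

// bySpecies has the same shape as a PartitionFunc[string, animal].
func bySpecies(a animal) (string, bool) {
	return a.species, a.species != ""
}

func main() {
	res := partition([]animal{
		{"dog", "name1"},
		{"", "name2"},
		{"dog", "name4"},
		{"cat", "name5"},
	}, bySpecies)
	fmt.Println(len(res))   // prints "2"
	fmt.Println(res["dog"]) // prints "[{dog name1} {dog name4}]"
}
```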
func NewPartitioner ¶
func NewPartitioner[K comparable, V any](ctx context.Context, partitionFn PartitionFunc[K, V]) Partitioner[K, V]
NewPartitioner creates a new partitioner.
type SilentBatcher ¶ added in v1.0.2
type SilentBatcher[P any] interface {
	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(payload P) SilentTask
	// contains filtered or unexported methods
}
SilentBatcher is a batch processor that sits in the background, accumulates payloads, and then processes them all in one go without returning any values.
Example ¶
var wg sync.WaitGroup
wg.Add(2)

b := NewSilentBatcher(
	func(input []interface{}) error {
		fmt.Println(input)
		return nil
	},
)

ContinueInSilence(
	b.Append(1),
	func(_ context.Context, err error) error {
		wg.Done()
		return nil
	},
).Execute(context.Background())

ContinueInSilence(
	b.Append(2),
	func(_ context.Context, err error) error {
		wg.Done()
		return nil
	},
).Execute(context.Background())

b.Process(context.Background())

wg.Wait()
Output:
[1 2]
func NewSilentBatcher ¶ added in v1.0.2
func NewSilentBatcher[P any](processFn func([]P) error, options ...BatcherOption) SilentBatcher[P]
NewSilentBatcher returns a new SilentBatcher
type SilentTask ¶
type SilentTask interface {
	// WithRecoverAction attaches the given recover action with task so that
	// it can be executed when a panic occurs.
	WithRecoverAction(recoverAction PanicRecoverWork)
	// Execute starts this task asynchronously.
	Execute(ctx context.Context) SilentTask
	// ExecuteSync starts this task synchronously.
	ExecuteSync(ctx context.Context) SilentTask
	// Wait waits for this task to complete.
	Wait()
	// Cancel changes the state of this task to `Cancelled`.
	Cancel()
	// Error returns the error that occurred when this task was executed.
	Error() error
	// State returns the current state of this task. This operation is non-blocking.
	State() State
	// Duration returns the duration of this task.
	Duration() time.Duration
}
SilentTask represents a unit of work to complete in silence, such as background work that returns no values.
func AddJitterST ¶
func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask
AddJitterST adds a random jitter before executing the given SilentTask. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
func ContinueInSilence ¶
func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask
ContinueInSilence proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
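The chain rule described above (executing C triggers A and B, executing A does not trigger B or C) falls out naturally when each continuation pulls on its predecessor. The sketch below is synchronous and hypothetical: `lazyTask`, `continueWith`, and `buildChain` are illustrative names, and running each task at most once is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyTask is a hypothetical stand-in for a task that runs at most once.
type lazyTask struct {
	once sync.Once
	run  func() error
	err  error
}

// execute runs the task on the first call and returns its error on every call.
func (t *lazyTask) execute() error {
	t.once.Do(func() { t.err = t.run() })
	return t.err
}

// continueWith returns a new task that first executes prev and then hands
// prev's error to next. Nothing runs until the returned task is executed.
func continueWith(prev *lazyTask, next func(error) error) *lazyTask {
	return &lazyTask{run: func() error { return next(prev.execute()) }}
}

// buildChain wires A -> B -> C and records the order in which steps run.
func buildChain(order *[]string) (a, c *lazyTask) {
	step := func(name string) func(error) error {
		return func(err error) error {
			*order = append(*order, name)
			return err
		}
	}
	a = &lazyTask{run: func() error { return step("A")(nil) }}
	b := continueWith(a, step("B"))
	c = continueWith(b, step("C"))
	return a, c
}

func main() {
	var order []string
	a, c := buildChain(&order)

	a.execute() // runs A only; B and C are not triggered
	fmt.Println(order) // prints "[A]"

	c.execute() // pulls on B, which finds A already done
	fmt.Println(order) // prints "[A B C]"
}
```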
func ContinueWithNoResult ¶
func ContinueWithNoResult[T any](currentTask Task[T], nextAction func(context.Context, T, error) error) SilentTask
ContinueWithNoResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
func InvokeInSilence ¶
func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask
InvokeInSilence creates a new SilentTask and runs it asynchronously.
func NewSilentTask ¶
func NewSilentTask(action SilentWork) SilentTask
NewSilentTask creates a new SilentTask.
func NewSilentTasks ¶
func NewSilentTasks(actions ...SilentWork) []SilentTask
NewSilentTasks creates a new SilentTask for each of the given actions.
func Repeat ¶
func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
Repeat executes the given SilentWork asynchronously on a pre-determined interval.
Example ¶
out := make(chan bool, 1)

task := Repeat(
	context.Background(),
	time.Nanosecond*10,
	func(context.Context) error {
		defer func() {
			recover()
		}()

		out <- true
		return nil
	},
)

<-out
v := <-out
fmt.Println(v)

task.Cancel()
close(out)
Output:
true
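A repeating action is typically built on a ticker plus a cancellable context. The sketch below shows that shape using only the standard library; `repeat` and `countTicks` are hypothetical names, and ignoring the action's error is a simplification of this sketch, since the documentation does not say how Repeat treats errors.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// repeat is a hypothetical helper that calls action once per interval in a
// background goroutine until ctx is cancelled. The returned channel is
// closed when the loop has exited, so callers can wait for a clean stop.
// The interval must be positive because time.NewTicker panics otherwise.
func repeat(ctx context.Context, interval time.Duration, action func(context.Context) error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_ = action(ctx) // errors are ignored in this sketch
			}
		}
	}()
	return done
}

// countTicks lets the action fire n times, then cancels and waits for the
// background loop to stop.
func countTicks(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan struct{})
	done := repeat(ctx, time.Millisecond, func(ctx context.Context) error {
		select {
		case ticks <- struct{}{}:
		case <-ctx.Done():
		}
		return nil
	})

	seen := 0
	for seen < n {
		<-ticks
		seen++
	}
	cancel()
	<-done
	return seen
}

func main() {
	fmt.Println(countTicks(3)) // prints "3"
}
```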
func RunWithConcurrencyLevelC ¶
func RunWithConcurrencyLevelC[T SilentTask](ctx context.Context, concurrencyLevel int, tasks <-chan T) SilentTask
RunWithConcurrencyLevelC runs the given tasks up to the max concurrency level.
Note: When `ctx` is cancelled, we spawn a new goroutine to cancel all remaining tasks in the given channel. To avoid a memory leak, clients MUST make sure new tasks eventually stop arriving once `ctx` is cancelled so that the new goroutine can return.
func RunWithConcurrencyLevelS ¶
func RunWithConcurrencyLevelS[T SilentTask](ctx context.Context, concurrencyLevel int, tasks []T) SilentTask
RunWithConcurrencyLevelS runs the given tasks up to the max concurrency level.
Example ¶
resChan := make(chan int, 6)

works := make([]Work[struct{}], 6, 6)
for i := range works {
	j := i
	works[j] = func(context.Context) (struct{}, error) {
		fmt.Println(j / 2)
		time.Sleep(time.Millisecond * 10)

		return struct{}{}, nil
	}
}

tasks := NewTasks(works...)
RunWithConcurrencyLevelS(context.Background(), 2, tasks)

WaitAll(tasks)
close(resChan)

var res []int
for r := range resChan {
	res = append(res, r)
}
Output:
0
0
1
1
2
2
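Capping the number of works in flight is commonly done with a buffered channel used as a counting semaphore. The sketch below illustrates that technique; `runWithLimit` and `peakConcurrency` are hypothetical names and say nothing about how RunWithConcurrencyLevelS is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWithLimit is a hypothetical helper that runs every work function while
// keeping at most `limit` of them in flight, using a buffered channel as a
// counting semaphore. It returns once all works have finished. The limit
// must be positive, otherwise the first send would block forever.
func runWithLimit(limit int, works []func()) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, work := range works {
		sem <- struct{}{} // blocks while `limit` works are running
		wg.Add(1)
		go func(work func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next work
			work()
		}(work)
	}
	wg.Wait()
}

// peakConcurrency runs n short works under the limit and reports the highest
// number that were observed running at the same time.
func peakConcurrency(limit, n int) int64 {
	var running, peak int64
	works := make([]func(), n)
	for i := range works {
		works[i] = func() {
			cur := atomic.AddInt64(&running, 1)
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt64(&running, -1)
		}
	}
	runWithLimit(limit, works)
	return peak
}

func main() {
	fmt.Println(peakConcurrency(2, 6) <= 2) // prints "true"
}
```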
func Spread ¶
func Spread[T SilentTask](ctx context.Context, tasks []T, within time.Duration) SilentTask
Spread evenly spreads the tasks within the specified duration.
Example ¶
tasks := newTasks()
within := 200 * time.Millisecond

// Spread
task := Spread(context.Background(), tasks, within)
task.Wait()

// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:
1
1
1
1
1
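One way to read "evenly spreads" is as a schedule of start offsets. The sketch below computes only that schedule and does not run anything; `startOffsets` is a hypothetical helper, and the assumption that the first task starts immediately with the rest spaced `within / n` apart may not match the package's exact spacing.

```go
package main

import (
	"fmt"
	"time"
)

// startOffsets is a hypothetical helper that computes when each of n tasks
// would start if their start times were spaced evenly across `within`.
// This sketch assumes the first task starts immediately.
func startOffsets(n int, within time.Duration) []time.Duration {
	if n <= 0 {
		return nil
	}
	step := within / time.Duration(n)
	offsets := make([]time.Duration, n)
	for i := range offsets {
		offsets[i] = time.Duration(i) * step
	}
	return offsets
}

func main() {
	fmt.Println(startOffsets(5, 200*time.Millisecond))
	// prints "[0s 40ms 80ms 120ms 160ms]"
}
```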
func Throttle ¶
func Throttle[T SilentTask](ctx context.Context, tasks []T, rateLimit int, every time.Duration) SilentTask
Throttle runs the given tasks at the specified rate.
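A simple way to picture the rate is as windows: start up to `rateLimit` works, then wait for the next `every` tick. The sketch below does that with a ticker; `throttle` and `timeThrottle` are hypothetical names, and running the works inline (rather than asynchronously, as tasks would be) is a simplification of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// throttle is a hypothetical helper that starts at most rateLimit works per
// `every` window, in order, and stops early if ctx is cancelled. Works are
// run inline to keep the sketch short. It returns how many works ran.
func throttle(ctx context.Context, works []func(), rateLimit int, every time.Duration) int {
	if rateLimit <= 0 || every <= 0 {
		return 0
	}
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	ran := 0
	for ran < len(works) {
		// Run one window's worth of works.
		for i := 0; i < rateLimit && ran < len(works); i++ {
			works[ran]()
			ran++
		}
		if ran == len(works) {
			break
		}
		// Wait for the next window or for cancellation.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ran
		}
	}
	return ran
}

// timeThrottle runs n no-op works and reports the count and elapsed time.
func timeThrottle(n, rateLimit int, every time.Duration) (int, time.Duration) {
	works := make([]func(), n)
	for i := range works {
		works[i] = func() {}
	}
	start := time.Now()
	ran := throttle(context.Background(), works, rateLimit, every)
	return ran, time.Since(start)
}

func main() {
	// Five works at two per 20ms need two ticks before the last one can run.
	ran, elapsed := timeThrottle(5, 2, 20*time.Millisecond)
	fmt.Println(ran)                            // prints "5"
	fmt.Println(elapsed >= 40*time.Millisecond) // prints "true"
}
```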
type SilentWork ¶
type SilentWork func(context.Context) error
SilentWork represents a unit of work to execute in silence, such as background work that returns no values.
type State ¶
type State byte
State represents the state enumeration for a task.
const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)
Various task states.
type Task ¶
type Task[T any] interface {
	SilentTask
	// Run starts this task asynchronously.
	Run(ctx context.Context) Task[T]
	// RunSync starts this task synchronously.
	RunSync(ctx context.Context) Task[T]
	// Outcome waits for this task to complete and returns the final result & error.
	Outcome() (T, error)
	// ResultOrDefault waits for this task to complete and returns the final result if
	// there's no error or the default result if there's an error.
	ResultOrDefault(T) T
}
Task represents a unit of work that is expected to return a value of a particular type.
func AddJitterT ¶
func AddJitterT[T any](t Task[T], maxJitterDurationInMilliseconds int) Task[T]
AddJitterT adds a random jitter before executing the given Task. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
func ContinueWith ¶
func ContinueWith[T any, S any](currentTask Task[T], nextAction func(context.Context, T, error) (S, error)) Task[S]
ContinueWith proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
func ContinueWithResult ¶
func ContinueWithResult[T any](currentTask SilentTask, nextAction func(context.Context, error) (T, error)) Task[T]
ContinueWithResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
type Work ¶
type Work[T any] func(context.Context) (T, error)
Work represents a unit of work to execute that is expected to return a value of a particular type.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is similar to a thread pool in Java, where the number of concurrent goroutines processing requests does not exceed the configured maximum.
func NewWorkerPool ¶
func NewWorkerPool(options ...WorkerPoolOption) *WorkerPool
NewWorkerPool creates and starts a pool of worker goroutines.
The maximum number of workers that can execute tasks concurrently is configured through WithMaxSize. When there are no incoming tasks, idle workers are killed one by one until none remain.
func (*WorkerPool) Pause ¶
func (p *WorkerPool) Pause(ctx context.Context)
Pause causes all workers to wait on the given context, thereby making them unavailable to run tasks. Pause returns when all workers are waiting. Tasks can still be queued but won't get executed until `ctx` is cancelled or times out.
Calling Pause when the worker pool is already paused causes Pause to wait until previous pauses are cancelled. This allows a goroutine to take control of pausing and un-pausing the pool as soon as other goroutines have un-paused it.
When this worker pool is stopped, workers are un-paused and queued tasks may be executed during StopWait.
func (*WorkerPool) Size ¶
func (p *WorkerPool) Size() int
Size returns the maximum number of concurrent workers.
func (*WorkerPool) Stop ¶
func (p *WorkerPool) Stop()
Stop stops this worker pool and waits for the running tasks to complete. Pending tasks that are still in the queue will get cancelled. New tasks must not be submitted to the worker pool after calling Stop().
Note: to avoid memory leak, clients MUST always call Stop() or StopWait() when this worker pool is no longer needed.
func (*WorkerPool) StopWait ¶
func (p *WorkerPool) StopWait()
StopWait stops this worker pool and waits for the running tasks and all queued tasks to complete. New tasks must not be submitted to the worker pool after calling StopWait().
Note: to avoid memory leak, clients MUST always call Stop() or StopWait() when this worker pool is no longer needed.
func (*WorkerPool) Stopped ¶
func (p *WorkerPool) Stopped() bool
Stopped returns true if this worker pool has been stopped.
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(ctx context.Context, task SilentTask)
Submit enqueues a task for a worker to execute.
Submit will not block regardless of the number of tasks submitted. Each task is given to an available worker or to a newly started worker. If there are no available workers, and no new workers can be created due to the configured maximum, then the task is put onto a waiting queue.
When the waiting queue is not empty, incoming tasks will go into the queue immediately. Tasks are removed from the waiting queue as workers become available.
As long as no new tasks arrive, one idle worker will get killed periodically until no more workers are left. Since starting new goroutines is cheap & quick, there's no need to retain idle workers indefinitely.
func (*WorkerPool) WaitingQueueSize ¶
func (p *WorkerPool) WaitingQueueSize() int
WaitingQueueSize returns the count of tasks in the waiting queue.
type WorkerPoolOption ¶
type WorkerPoolOption func(*workerPoolConfigs)
func WithBurst ¶
func WithBurst(burstQueueThreshold int, burstCapacity int) WorkerPoolOption
WithBurst sets the threshold for the waiting queue at which point the maximum size of the worker pool will be increased by the given capacity.
func WithIdleTimeout ¶
func WithIdleTimeout(idleTimeout time.Duration) WorkerPoolOption
WithIdleTimeout sets how long workers may stay idle before one of them gets killed.
func WithMaxSize ¶
func WithMaxSize(maxSize int) WorkerPoolOption
WithMaxSize sets the maximum size of the worker pool under normal conditions.