Documentation ¶
Index ¶
- Variables
- func CancelAll(tasks []Task)
- func CancelAllST(tasks []SilentTask)
- func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int
- func ForkJoin(ctx context.Context, tasks []Task)
- func ForkJoinST(ctx context.Context, tasks []SilentTask)
- func WaitAll(tasks []Task)
- func WaitAllST(tasks []SilentTask)
- type Batcher
- type BatcherOption
- type SilentTask
- func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask
- func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask
- func ContinueWithNoResult(currentTask Task, nextAction func(context.Context, interface{}, error) error) SilentTask
- func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask
- func NewSilentTask(action SilentWork) SilentTask
- func NewSilentTasks(actions ...SilentWork) []SilentTask
- func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
- func Spread(ctx context.Context, tasks []Task, within time.Duration) SilentTask
- func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) SilentTask
- type SilentWork
- type State
- type Task
- func AddJitterT(t Task, maxJitterDurationInMilliseconds int) Task
- func Completed(result interface{}, err error) Task
- func ContinueWith(currentTask Task, ...) Task
- func ContinueWithResult(currentTask SilentTask, ...) Task
- func Invoke(ctx context.Context, action Work) Task
- func NewTask(action Work) Task
- func NewTasks(actions ...Work) []Task
- type Work
Variables ¶
var (
ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)
var ErrCancelled = errors.New("task cancelled")
ErrCancelled is returned when a task gets cancelled.
Functions ¶
func DoJitter ¶
func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int
DoJitter adds a random jitter before executing doFn, then returns the jitter duration. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
Example ¶
t1 := InvokeInSilence(
	context.Background(),
	func(ctx context.Context) error {
		DoJitter(
			func() {
				fmt.Println("do something after random jitter")
			},
			1000,
		)
		return nil
	},
)

t2 := AddJitterT(
	NewTask(
		func(ctx context.Context) (interface{}, error) {
			fmt.Println("return 1 after random jitter")
			return 1, nil
		},
	),
	1000,
).Run(context.Background())

t3 := AddJitterST(
	NewSilentTask(
		func(ctx context.Context) error {
			fmt.Println("return nil after random jitter")
			return nil
		},
	),
	1000,
).Execute(context.Background())

WaitAllST([]SilentTask{t1, t2, t3})
Output:
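The idea behind DoJitter can be sketched with the standard library alone. The helper name doJitter below is hypothetical and is not the package's implementation; it only assumes the behavior described above (wait for a random delay below the maximum, run the function, return the delay in milliseconds).

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// doJitter is a hypothetical stand-in for DoJitter: it sleeps for a random
// duration in [0, maxMs) milliseconds, runs fn, and returns the delay used.
func doJitter(fn func(), maxMs int) int {
	if maxMs <= 0 {
		fn() // assumption: a non-positive bound means no delay at all
		return 0
	}
	delay := rand.Intn(maxMs)
	time.Sleep(time.Duration(delay) * time.Millisecond)
	fn()
	return delay
}

func main() {
	ran := false
	delay := doJitter(func() { ran = true }, 20)
	fmt.Println(ran, delay < 20) // prints "true true"
}
```

Randomizing the start time keeps many callers that were scheduled together from hitting a shared resource at the same instant.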
func ForkJoin ¶
func ForkJoin(ctx context.Context, tasks []Task)
ForkJoin executes given tasks in parallel and waits for ALL to complete before returning.
Example ¶
first := NewTask(
	func(context.Context) (interface{}, error) {
		return 1, nil
	},
)
second := NewTask(
	func(context.Context) (interface{}, error) {
		return nil, errors.New("some error")
	},
)

ForkJoin(context.Background(), []Task{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:
1 <nil>
<nil> some error
func ForkJoinST ¶
func ForkJoinST(ctx context.Context, tasks []SilentTask)
ForkJoinST executes given tasks in parallel and waits for ALL to complete before returning.
func WaitAllST ¶
func WaitAllST(tasks []SilentTask)
WaitAllST waits for all executed tasks to finish.
Types ¶
type Batcher ¶
type Batcher interface {
	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(payload interface{}) SilentTask
	// Size returns the length of the pending queue.
	Size() int
	// Process executes all pending tasks in one go.
	Process(ctx context.Context)
	// Shutdown notifies this batch processor to complete its work gracefully. Future calls
	// to Append will return an error immediately and Process will be a no-op. This is a
	// blocking call which will wait up to the configured amount of time for the last batch
	// to complete.
	Shutdown()
}
Batcher is a batch processor that accumulates tasks in the background and then executes them all in one go.
Example ¶
var wg sync.WaitGroup
wg.Add(2)

b := NewBatcher(
	func(input []interface{}) error {
		fmt.Println(input)
		return nil
	},
)

ContinueInSilence(
	b.Append(1),
	func(_ context.Context, err error) error {
		wg.Done()
		return nil
	},
).Execute(context.Background())

ContinueInSilence(
	b.Append(2),
	func(_ context.Context, err error) error {
		wg.Done()
		return nil
	},
).Execute(context.Background())

b.Process(context.Background())

wg.Wait()
Output: [1 2]
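To make the accumulate-then-process idea concrete, here is a simplified sketch built on the standard library. The miniBatcher type is hypothetical: unlike the Batcher interface above, its Append returns nothing, and it has no Shutdown or auto processing.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBatcher is a hypothetical, simplified accumulator used only to
// illustrate batching; it is not the package's Batcher.
type miniBatcher struct {
	mu        sync.Mutex
	pending   []interface{}
	processFn func([]interface{}) error
}

// Append queues a payload for the next batch.
func (b *miniBatcher) Append(payload interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, payload)
}

// Size reports how many payloads are waiting.
func (b *miniBatcher) Size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.pending)
}

// Process hands every pending payload to processFn in a single call.
func (b *miniBatcher) Process() error {
	b.mu.Lock()
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(batch) == 0 {
		return nil
	}
	return b.processFn(batch)
}

func main() {
	b := &miniBatcher{processFn: func(in []interface{}) error {
		fmt.Println(in)
		return nil
	}}
	b.Append(1)
	b.Append(2)
	fmt.Println(b.Size()) // 2
	_ = b.Process()       // prints [1 2]
	fmt.Println(b.Size()) // 0
}
```

The pending slice is swapped out under the lock and processed outside it, so callers can keep appending while a batch is being handled.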
func NewBatcher ¶
func NewBatcher(processFn func([]interface{}) error, options ...BatcherOption) Batcher
NewBatcher returns a new Batcher.
type BatcherOption ¶
type BatcherOption func(*batcherConfigs)
func WithAutoProcessInterval ¶
func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption
WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessIntervalInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.
Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() to clean up the timer goroutine properly and avoid a memory leak.
func WithAutoProcessSize ¶
func WithAutoProcessSize(autoProcessSize int) BatcherOption
WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.
func WithShutdownGraceDuration ¶
func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption
WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.
type SilentTask ¶
type SilentTask interface {
	// Execute starts this task asynchronously.
	Execute(ctx context.Context) SilentTask
	// ExecuteSync starts this task synchronously.
	ExecuteSync(ctx context.Context) SilentTask
	// Wait waits for this task to complete.
	Wait()
	// Cancel changes the state of this task to `Cancelled`.
	Cancel()
	// Error returns the error that occurred when this task was executed.
	Error() error
	// State returns the current state of this task. This operation is non-blocking.
	State() State
	// Duration returns the duration of this task.
	Duration() time.Duration
}
SilentTask represents a unit of work that completes silently, such as background work that returns no value.
func AddJitterST ¶
func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask
AddJitterST adds a random jitter before executing the given SilentTask. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
func ContinueInSilence ¶
func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask
ContinueInSilence proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
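The chaining rule (executing C triggers A and B, while executing A leaves B and C untouched) can be modeled with a small synchronous sketch. The step type and its methods are hypothetical and ignore the asynchronous execution and context handling of the real tasks.

```go
package main

import "fmt"

// step is a hypothetical lazy unit of work that remembers its predecessor.
type step struct {
	prev *step
	run  func(prevErr error) error
	done bool
	err  error
}

// then returns a new step that runs after s and receives s's error.
func (s *step) then(next func(prevErr error) error) *step {
	return &step{prev: s, run: next}
}

// execute runs any predecessors that have not run yet, then this step.
// Steps chained after this one are left untouched.
func (s *step) execute() error {
	if s.done {
		return s.err
	}
	var prevErr error
	if s.prev != nil {
		prevErr = s.prev.execute()
	}
	s.err = s.run(prevErr)
	s.done = true
	return s.err
}

func main() {
	var order []string
	a := &step{run: func(error) error { order = append(order, "A"); return nil }}
	b := a.then(func(error) error { order = append(order, "B"); return nil })
	c := b.then(func(error) error { order = append(order, "C"); return nil })

	_ = a.execute()
	fmt.Println(order) // [A]: executing A does not trigger B or C
	_ = c.execute()
	fmt.Println(order) // [A B C]: executing C pulls in B; A is not repeated
}
```

Each continuation holds a reference to the task before it, never to the one after, which is why execution only propagates backwards along the chain.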
func ContinueWithNoResult ¶
func ContinueWithNoResult(currentTask Task, nextAction func(context.Context, interface{}, error) error) SilentTask
ContinueWithNoResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
func InvokeInSilence ¶
func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask
InvokeInSilence creates a new SilentTask and runs it asynchronously.
func NewSilentTask ¶
func NewSilentTask(action SilentWork) SilentTask
NewSilentTask creates a new SilentTask.
func NewSilentTasks ¶
func NewSilentTasks(actions ...SilentWork) []SilentTask
NewSilentTasks creates a group of new SilentTasks.
func Repeat ¶
func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
Repeat executes the given SilentWork asynchronously on a pre-determined interval.
Example ¶
out := make(chan bool, 1)
task := Repeat(
	context.Background(),
	time.Nanosecond*10,
	func(context.Context) error {
		defer func() {
			recover()
		}()
		out <- true
		return nil
	},
)

<-out
v := <-out
fmt.Println(v)
task.Cancel()
close(out)
Output: true
func Spread ¶
func Spread(ctx context.Context, tasks []Task, within time.Duration) SilentTask
Spread evenly spreads the tasks within the specified duration.
Example ¶
tasks := newTasks()
within := 200 * time.Millisecond

// Spread
task := Spread(context.Background(), tasks, within)
task.Wait()

// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:
1
1
1
1
1
type SilentWork ¶
SilentWork represents a handler that executes silently, such as background work that returns no value.
type State ¶
type State byte
State represents the state enumeration for a task.
const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)
Various task states.
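When logging task states, a readable name is often more useful than the raw byte. The sketch below re-declares the enumeration locally so that it compiles on its own; the String method is a hypothetical helper and is not part of the documented API.

```go
package main

import "fmt"

// State and its constants are re-declared here so the sketch compiles alone.
type State byte

const (
	IsCreated State = iota
	IsRunning
	IsCompleted
	IsCancelled
)

// String is a hypothetical helper that maps each state to a readable name.
func (s State) String() string {
	switch s {
	case IsCreated:
		return "created"
	case IsRunning:
		return "running"
	case IsCompleted:
		return "completed"
	case IsCancelled:
		return "cancelled"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(IsCreated, IsCancelled, State(42)) // created cancelled unknown
}
```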
type Task ¶
type Task interface {
	SilentTask
	// Run starts this task asynchronously.
	Run(ctx context.Context) Task
	// RunSync starts this task synchronously.
	RunSync(ctx context.Context) Task
	// Outcome waits for this task to complete and returns the final result & error.
	Outcome() (interface{}, error)
	// ResultOrDefault waits for this task to complete and returns the final result if
	// there's no error or the default result if there's an error.
	ResultOrDefault(interface{}) interface{}
}
Task represents a unit of work that is expected to return a value of a particular type.
func AddJitterT ¶
func AddJitterT(t Task, maxJitterDurationInMilliseconds int) Task
AddJitterT adds a random jitter before executing the given Task. Why jitter?
http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html
func ContinueWith ¶
func ContinueWith(currentTask Task, nextAction func(context.Context, interface{}, error) (interface{}, error)) Task
ContinueWith proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.
func ContinueWithResult ¶
func ContinueWithResult(currentTask SilentTask, nextAction func(context.Context, error) (interface{}, error)) Task
ContinueWithResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.