async

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 5, 2022 License: MIT Imports: 10 Imported by: 1

README

Async

Why you should consider this package

Package async simplifies the implementation of orchestration patterns for concurrent systems. It is similar to Java Future or JS Promise, which makes life much easier when dealing with asynchronous operation and concurrent processing. Golang is excellent in terms of parallel programming. However, dealing with goroutines and channels could be a big headache when business logic gets complicated. Wrapping them into higher-level functions improves code readability significantly and makes it easier for engineers to reason about the system's behaviours.

Currently, this package includes:

  • Asynchronous tasks with cancellations, context propagation and state.
  • Task chaining by using continuations.
  • Fork/join pattern - running a batch of tasks in parallel and blocking until all finish.
  • Throttling pattern - throttling task execution at a specified rate.
  • Spread pattern - spreading tasks within a specified duration.
  • Repeat pattern - repeating a task on a pre-determined interval.
  • Batch pattern - batching many tasks to be processed together with individual continuations.
  • Jitter pattern - adding a random jitter before executing a function to avoid thundering herds.

Concept

Task is a basic concept like Future in Java. You can create a Task using an executable function which takes in context.Context, then returns error and an optional result.

task := NewTask(func(context.Context) (interface{}, error) {
    // run the job
    return res, err
})

silentTask := NewSilentTask(func(context.Context) error {
    // run the job
    return err
})
Get the result

The function will be executed asynchronously. You can query whether it's completed by calling task.State(), which is a non-blocking function. Alternative, you can wait for the response using task.Outcome() or silentTask.Wait(), which will block the execution until the task is done. These functions are quite similar to the equivalents in Java Future.isDone() or Future.get().

Cancelling

There could be case that we don't care about the result anymore some time after execution. In this case, a task can be aborted by invoking task.Cancel().

Chaining

To have a follow-up action after a task is done, you can use the provided family of Continue functions. This could be very useful to create a chain of processing, or to have a teardown process at the end of a task.

Features

Fork join

ForkJoin is meant for running multiple subtasks concurrently. They could be different parts of the main task which can be executed independently. The following code example illustrates how you can send files to S3 concurrently with a few lines of code.

func uploadFilesConcurrently(files []string) {
    var tasks []Task
    for _, file := range files {
        f := file
        
        tasks = append(tasks, NewTask(func(ctx context.Context) (interface{}, error) {
            return upload(ctx, f)
        }))
    }

    ForkJoin(context.Background(), tasks)
}

func upload(ctx context.Context, file string) (interface{}, error){
    // do file uploading
    return "", nil
}
Throttle

Sometimes you don't really care about the concurrency level but just want to execute the tasks at a particular rate. The Throttle function would come in handy in this case.

// Throttle runs the given tasks at the specified rate.
func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) SilentTask

For example, if you want to send 4 files every 2 seconds, the Throttle function will start a task every 0.5 second.

Spread

Instead of starting all tasks at once with ForkJoin, you can also spread the starting points of our tasks evenly within a certain duration using the Spread function.

// Spread evenly spreads the tasks within the specified duration.
func Spread(ctx context.Context, tasks []Task, within time.Duration) SilentTask

For example, if you want to send 50 files within 10 seconds, the Spread function would start a task every 0.2s.

Repeat

In cases where you need to repeat a background task on a pre-determined interval, Repeat is your friend. The returned SilentTask can then be used to cancel the repeating task at any time.

// Repeat executes the given SilentWork asynchronously on a pre-determined interval.
func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask
Batch

Instead of executing a task immediately whenever you receive an input, sometimes, it might be more efficient to create a batch of inputs and process all in one go.

out := make(chan int, taskCount)

processBatch := func(nums []interface{}) error {
    for _, number := range nums {
        out <- number.(int) * 10
    }
    
    return nil
}

// Auto process batch every 100ms
periodicBatcher := NewBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
)

// Auto process batch when pending queue reaches 10
sizeBatcher := NewBatcher(
    processBatch,
    WithAutoProcessSize(10),
)

// Auto process batch every 100ms or when pending queue reaches 10
periodicSizeBatcher := NewBatcher(
    processBatch,
    WithAutoProcessInterval(100*time.Millisecond),
    WithAutoProcessSize(10),
)

// Auto process batch when pending queue reaches 10
manualBatcher := NewBatcher(
    processBatch,
)

manualBatcher.Process()

See batch_test.go for more detailed examples on how to use this feature.

Jitter

Using jitters to avoid thundering herds is a popular technique. We decided to make it simple for you.

t := InvokeInSilence(
    context.Background(), 
    func(ctx context.Context) error {
        DoJitter(func() {
            fmt.Println("do something after random jitter")
        }, 1000)

        return nil
    },
)

See jitter_test.go for a detailed example on how to use this feature.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)
View Source
var ErrCancelled = errors.New("task cancelled")

ErrCancelled is returned when a task gets cancelled.

Functions

func CancelAll

func CancelAll(tasks []Task)

CancelAll cancels all given tasks.

func CancelAllST

func CancelAllST(tasks []SilentTask)

CancelAllST cancels all given tasks.

func DoJitter

func DoJitter(doFn func(), maxJitterDurationInMilliseconds int) int

DoJitter adds a random jitter before executing doFn, then returns the jitter duration. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

Example
t1 := InvokeInSilence(
	context.Background(), func(ctx context.Context) error {
		DoJitter(
			func() {
				fmt.Println("do something after random jitter")
			}, 1000,
		)

		return nil
	},
)

t2 := AddJitterT(
	NewTask(
		func(ctx context.Context) (interface{}, error) {
			fmt.Println("return 1 after random jitter")
			return 1, nil
		},
	),
	1000,
).Run(context.Background())

t3 := AddJitterST(
	NewSilentTask(
		func(ctx context.Context) error {
			fmt.Println("return nil after random jitter")
			return nil
		},
	),
	1000,
).Execute(context.Background())

WaitAllST([]SilentTask{t1, t2, t3})
Output:

func ForkJoin

func ForkJoin(ctx context.Context, tasks []Task)

ForkJoin executes given tasks in parallel and waits for ALL to complete before returning.

Example
first := NewTask(
	func(context.Context) (interface{}, error) {
		return 1, nil
	},
)

second := NewTask(
	func(context.Context) (interface{}, error) {
		return nil, errors.New("some error")
	},
)

ForkJoin(context.Background(), []Task{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:

1 <nil>
<nil> some error

func ForkJoinST

func ForkJoinST(ctx context.Context, tasks []SilentTask)

ForkJoinST executes given tasks in parallel and waits for ALL to complete before returning.

func WaitAll

func WaitAll(tasks []Task)

WaitAll waits for all executed tasks to finish.

func WaitAllST

func WaitAllST(tasks []SilentTask)

WaitAllST waits for all executed tasks to finish.

Types

type Batcher

type Batcher interface {
	// Append adds a new payload to the batch and returns a task for that particular payload.
	// Clients MUST execute the returned task before blocking and waiting for it to complete
	// to extract result.
	Append(payload interface{}) SilentTask
	// Size returns the length of the pending queue.
	Size() int
	// Process executes all pending tasks in one go.
	Process(ctx context.Context)
	// Shutdown notifies this batch processor to complete its work gracefully. Future calls
	// to Append will return an error immediately and Process will be a no-op. This is a
	// blocking call which will wait up to the configured amount of time for the last batch
	// to complete.
	Shutdown()
}

Batcher is a batch processor which is suitable for sitting in the back to accumulate tasks and then execute all in one go.

Example
var wg sync.WaitGroup
wg.Add(2)

b := NewBatcher(
	func(input []interface{}) error {
		fmt.Println(input)
		return nil
	},
)

ContinueInSilence(
	b.Append(1), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(context.Background())

ContinueInSilence(
	b.Append(2), func(_ context.Context, err error) error {
		wg.Done()

		return nil
	},
).Execute(context.Background())

b.Process(context.Background())

wg.Wait()
Output:

[1 2]

func NewBatcher

func NewBatcher(processFn func([]interface{}) error, options ...BatcherOption) Batcher

NewBatcher returns a new Batcher

type BatcherOption

type BatcherOption func(*batcherConfigs)

func WithAutoProcessInterval

func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption

WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessDurationInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.

Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() to clean up the timer goroutine properly in order to avoid memory leak.

func WithAutoProcessSize

func WithAutoProcessSize(autoProcessSize int) BatcherOption

WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.

func WithShutdownGraceDuration

func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption

WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.

type SilentTask

type SilentTask interface {
	// Execute starts this task asynchronously.
	Execute(ctx context.Context) SilentTask
	// ExecuteSync starts this task synchronously.
	ExecuteSync(ctx context.Context) SilentTask
	// Wait waits for this task to complete.
	Wait()
	// Cancel changes the state of this task to `Cancelled`.
	Cancel()
	// Error returns the error that occurred when this task was executed.
	Error() error
	// State returns the current state of this task. This operation is non-blocking.
	State() State
	// Duration returns the duration of this task.
	Duration() time.Duration
}

SilentTask represents a unit of work to complete in silence like background works that return no values.

func AddJitterST

func AddJitterST(t SilentTask, maxJitterDurationInMilliseconds int) SilentTask

AddJitterST adds a random jitter before executing the given SilentTask. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

func ContinueInSilence

func ContinueInSilence(currentTask SilentTask, nextAction func(context.Context, error) error) SilentTask

ContinueInSilence proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func ContinueWithNoResult

func ContinueWithNoResult(currentTask Task, nextAction func(context.Context, interface{}, error) error) SilentTask

ContinueWithNoResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func InvokeInSilence

func InvokeInSilence(ctx context.Context, action SilentWork) SilentTask

InvokeInSilence creates a new SilentTask and runs it asynchronously.

func NewSilentTask

func NewSilentTask(action SilentWork) SilentTask

NewSilentTask creates a new SilentTask.

func NewSilentTasks

func NewSilentTasks(actions ...SilentWork) []SilentTask

NewSilentTasks creates a group of new SilentTask.

func Repeat

func Repeat(ctx context.Context, interval time.Duration, action SilentWork) SilentTask

Repeat executes the given SilentWork asynchronously on a pre-determined interval.

Example
out := make(chan bool, 1)
task := Repeat(
	context.Background(), time.Nanosecond*10, func(context.Context) error {
		defer func() {
			recover()
		}()

		out <- true
		return nil
	},
)

<-out
v := <-out

fmt.Println(v)

task.Cancel()
close(out)
Output:

true

func Spread

func Spread(ctx context.Context, tasks []Task, within time.Duration) SilentTask

Spread evenly spreads the tasks within the specified duration.

Example
tasks := newTasks()
within := 200 * time.Millisecond

// Spread
task := Spread(context.Background(), tasks, within)
task.Wait()

// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:

1
1
1
1
1

func Throttle

func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) SilentTask

Throttle runs the given tasks at the specified rate.

type SilentWork

type SilentWork func(context.Context) error

SilentWork represents a handler to execute in silence like background works that return no values.

type State

type State byte

State represents the state enumeration for a task.

const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)

Various task states.

type Task

type Task interface {
	SilentTask
	// Run starts this task asynchronously.
	Run(ctx context.Context) Task
	// RunSync starts this task synchronously.
	RunSync(ctx context.Context) Task
	// Outcome waits for this task to complete and returns the final result & error.
	Outcome() (interface{}, error)
	// ResultOrDefault waits for this task to complete and returns the final result if
	// there's no error or the default result if there's an error.
	ResultOrDefault(interface{}) interface{}
}

Task represents a unit of work that is expected to return a value of a particular type.

func AddJitterT

func AddJitterT(t Task, maxJitterDurationInMilliseconds int) Task

AddJitterT adds a random jitter before executing the given Task. Why jitter?

http://highscalability.com/blog/2012/4/17/youtube-strategy-adding-jitter-isnt-a-bug.html

func Completed

func Completed(result interface{}, err error) Task

Completed returns a completed task with the given result and error.

func ContinueWith

func ContinueWith(currentTask Task, nextAction func(context.Context, interface{}, error) (interface{}, error)) Task

ContinueWith proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func ContinueWithResult

func ContinueWithResult(currentTask SilentTask, nextAction func(context.Context, error) (interface{}, error)) Task

ContinueWithResult proceeds with the next task once the current one is finished. When we have a chain of tasks like A -> B -> C, executing C will trigger A & B as well. However, executing A will NOT trigger B & C.

func Invoke

func Invoke(ctx context.Context, action Work) Task

Invoke creates a new Task and runs it asynchronously.

func NewTask

func NewTask(action Work) Task

NewTask creates a new Task.

func NewTasks

func NewTasks(actions ...Work) []Task

NewTasks creates a group of new Task.

type Work

type Work func(context.Context) (interface{}, error)

Work represents a handler to execute that is expected to return a value of a particular type.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL