Async

What is package async

Package async simplifies the implementation of orchestration patterns for concurrent systems. It is similar to Java's Future or JavaScript's Promise, which make life much easier when dealing with asynchronous operations and concurrent processing. Go is excellent at concurrent programming, but dealing with goroutines and channels can become a big headache when business logic gets complicated. Wrapping them into higher-level functions gives the code much better readability and gives developers an easier mental model to work with.

Currently, this package includes:

  • Asynchronous tasks with cancellations, context propagation and state.
  • Task chaining by using continuations.
  • Fork/join pattern - running a bunch of work and waiting for everything to finish.
  • Throttling pattern - throttling task execution on a specified rate.
  • Spread pattern - spreading tasks across time.
  • Partition pattern - partitioning data concurrently.
  • Repeat pattern - repeating a certain task at a specified interval.
  • Batch pattern - batching many tasks into a single one with individual continuations.

Concept

Task is the basic concept, similar to Future in Java. You can create a Task from an executable function which takes a context and returns a result and an error.

task := NewTask(func(context.Context) (interface{}, error) {
	// run the job, producing res and err
	return res, err
})
Get the result

The function will be evaluated asynchronously once the task is started (for example with Invoke, task.Run(), or one of the orchestration functions below). You can query whether it has completed by calling task.State(), which is non-blocking. Alternatively, you can wait for the result with task.Outcome(), which blocks until the job is done. These two functions are similar to Future.isDone() and Future.get().
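
A minimal sketch (using Invoke, documented below, to create and start the task; the value 42 is just a placeholder result):

task := Invoke(context.Background(), func(context.Context) (interface{}, error) {
	// simulate a slow job
	time.Sleep(100 * time.Millisecond)
	return 42, nil
})

// Non-blocking: check whether the task has finished.
if task.State() == IsCompleted {
	// the outcome is already available
}

// Blocking: wait for the job to finish and read its result.
res, err := task.Outcome()
fmt.Println(res, err)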

Cancelling

There may be cases where we no longer care about the result some time after the task started. In that case, the task can be aborted by invoking task.Cancel().
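
A minimal sketch, assuming the context passed to the work function is cancelled when task.Cancel() is called (per the context propagation listed above):

task := Invoke(context.Background(), func(ctx context.Context) (interface{}, error) {
	select {
	case <-time.After(10 * time.Second): // simulate slow work
		return "done", nil
	case <-ctx.Done(): // stop early when the task is cancelled
		return nil, ctx.Err()
	}
})

// Some time later we decide we no longer need the result.
task.Cancel()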

Chaining

To run a follow-up action after the task, we can simply call ContinueWith(). This is useful for building a chain of processing, or for running a teardown step after the job, as shown in the sketch below.
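
A minimal sketch (fetch and process are hypothetical helpers standing in for the real work; both return (interface{}, error)):

task := Invoke(context.Background(), func(ctx context.Context) (interface{}, error) {
	return fetch(ctx) // first step
})

chained := task.ContinueWith(context.Background(), func(result interface{}, err error) (interface{}, error) {
	if err != nil {
		return nil, err // propagate the failure
	}
	return process(result) // follow-up step
})

_, _ = chained.Outcome() // wait for the whole chain to finish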

Examples

For example, suppose we want to upload numerous files efficiently. There are multiple strategies we can take. Given a function that builds the upload work for a single file:

func upload(file string) Work {
	return func(context.Context) (interface{}, error) {
		// upload the given file, producing res and err
		return res, err
	}
}

Fork join

The main characteristic of a fork-join task is that it spawns new subtasks that run concurrently. They could be different parts of the main task that can run independently. The following code example illustrates how you can send files to S3 concurrently in a few lines of code.

func uploadFilesConcurrently(files []string) {
	tasks := []Task{}
	for _, file := range files {
		tasks = append(tasks, NewTask(upload(file)))
	}
	ForkJoin(context.Background(), tasks)
}
Invoke All

Fork-join may not suit every case, for instance when the number of tasks grows very large. The number of concurrently running tasks, goroutines and the resulting CPU utilisation could then overwhelm the node. One solution is to constrain the maximum concurrency. InvokeAll is introduced for this purpose: it behaves like a fixed-size goroutine pool that serves the given tasks as quickly as possible.

InvokeAll(context.Background(), concurrency, tasks)
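
For instance, the file uploads from the fork-join section could be bounded to at most 4 concurrent uploads (the concurrency value here is arbitrary; this is only a sketch):

func uploadFilesWithLimit(files []string) {
	tasks := []Task{}
	for _, file := range files {
		tasks = append(tasks, NewTask(upload(file)))
	}
	InvokeAll(context.Background(), 4, tasks)
	WaitAll(tasks) // wait for every upload to finish
}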
Spread

Sometimes we don't really care about the concurrency but just want to make sure the tasks finish within a certain time period. The Spread function is useful in this case: it spreads task execution evenly across the given period.

Spread(context.Background(), period, tasks)

For example, if we want to send 50 files within 10 seconds, the Spread function starts a task every 0.2 seconds. The assumption made here is that every task takes a similar amount of time. For a more sophisticated model, we would need to derive task durations adaptively from the characteristics or parameters of the individual tasks.
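
A sketch of that example, reusing the upload helper from the fork-join section (the numbers simply mirror the description above):

tasks := []Task{}
for _, file := range files { // 50 files
	tasks = append(tasks, NewTask(upload(file)))
}
task := Spread(context.Background(), 10*time.Second, tasks)
_, _ = task.Outcome() // wait until every spread task has finished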

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CancelAll

func CancelAll(tasks []Task)

CancelAll cancels all specified tasks.

func WaitAll

func WaitAll(tasks []Task)

WaitAll waits for all tasks to finish.

Types

type Batch

type Batch interface {
	Append(payload interface{}) Task
	Size() int
	Reduce()
}

Batch represents a batch where one can append to the batch and process it as a whole.

Example
var wg sync.WaitGroup
wg.Add(2)

r := NewBatch(context.Background(), func(input []interface{}) []interface{} {
	fmt.Println(input)
	return input
})

r.Append(1).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) {
	wg.Done()
	return nil, nil
})
r.Append(2).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) {
	wg.Done()
	return nil, nil
})
r.Reduce()
wg.Wait()
Output:

[1 2]

func NewBatch

func NewBatch(ctx context.Context, process func([]interface{}) []interface{}) Batch

NewBatch creates a new batch

type PartitionFunc

type PartitionFunc func(data interface{}) (key string, ok bool)

PartitionFunc takes in data and outputs a key. If ok is false, the data doesn't fall into any partition.

type Partitioner

type Partitioner interface {
	// Append items to the queue which is pending partition
	Append(items interface{}) Task

	// Partition items and output the result
	Partition() map[string][]interface{}
}

Partitioner partitions events

Example
partitionFunc := func(data interface{}) (string, bool) {
	xevent, ok := data.(map[string]string)
	if !ok {
		return "", false
	}
	key, ok := xevent["pre"]
	return key, ok
}
p := NewPartitioner(context.Background(), partitionFunc)

input := []interface{}{
	map[string]string{"pre": "a", "val": "val1"},
	map[string]string{"pre": "b", "val": "val2"},
	map[string]string{"pre": "a", "val": "val4"},
	map[string]string{"pre": "c", "val": "val5"},
}
t := p.Append(input)
_, _ = t.Outcome()

res := p.Partition()
first := res["a"][0].(map[string]string)
fmt.Println(first["pre"])
fmt.Println(first["val"])
Output:

a
val1

func NewPartitioner

func NewPartitioner(ctx context.Context, partition PartitionFunc) Partitioner

NewPartitioner creates a new partitioner

type State

type State byte

State represents the state enumeration for a task.

const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)

Various task states

type Task

type Task interface {
	Run(ctx context.Context) Task
	Cancel()
	State() State
	Outcome() (interface{}, error)
	ContinueWith(ctx context.Context, nextAction func(interface{}, error) (interface{}, error)) Task
}

Task represents a unit of work to be done

func Consume

func Consume(ctx context.Context, concurrency int, tasks chan Task) Task

Consume runs the tasks with a specific max concurrency
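
A minimal sketch of feeding tasks to Consume through a channel; works is a hypothetical []Work, and WaitAll (documented above) is used to wait on the tasks themselves:

queue := make(chan Task, 16)
tasks := NewTasks(works...) // works is a hypothetical []Work
go func() {
	for _, t := range tasks {
		queue <- t
	}
	close(queue)
}()

Consume(context.Background(), 3, queue) // run at most 3 tasks at a time
WaitAll(tasks)                          // wait for all queued tasks to finish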

func ForkJoin

func ForkJoin(ctx context.Context, tasks []Task) Task

ForkJoin executes the input tasks in parallel and waits for ALL outcomes before returning.

Example
first := NewTask(func(context.Context) (interface{}, error) {
	return 1, nil
})

second := NewTask(func(context.Context) (interface{}, error) {
	return nil, errors.New("some error")
})

ForkJoin(context.Background(), []Task{first, second})

fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:

1 <nil>
<nil> some error

func Invoke

func Invoke(ctx context.Context, action Work) Task

Invoke creates a new task and runs it asynchronously.

func InvokeAll

func InvokeAll(ctx context.Context, concurrency int, tasks []Task) Task

InvokeAll runs the tasks with a specific max concurrency

Example
resChan := make(chan int, 6)
works := make([]Work, 6, 6)
for i := range works {
	j := i
	works[j] = func(context.Context) (interface{}, error) {
		fmt.Println(j / 2)
		time.Sleep(time.Millisecond * 10)
		return nil, nil
	}
}
tasks := NewTasks(works...)
InvokeAll(context.Background(), 2, tasks)
WaitAll(tasks)
close(resChan)
res := []int{}
for r := range resChan {
	res = append(res, r)
}
Output:

0
0
1
1
2
2

func NewTask

func NewTask(action Work) Task

NewTask creates a new task.

func NewTasks

func NewTasks(actions ...Work) []Task

NewTasks creates a set of new tasks.

func Repeat

func Repeat(ctx context.Context, interval time.Duration, action Work) Task

Repeat performs an action asynchronously on a predetermined interval.

Example
out := make(chan bool, 1)
task := Repeat(context.TODO(), time.Nanosecond*10, func(context.Context) (interface{}, error) {
	out <- true
	return nil, nil
})

<-out
v := <-out
fmt.Println(v)
task.Cancel()
Output:

true

func Spread

func Spread(ctx context.Context, within time.Duration, tasks []Task) Task

Spread evenly spreads the work within the specified duration.

Example
tasks := newTasks()
within := 200 * time.Millisecond

// Spread
task := Spread(context.Background(), within, tasks)
_, _ = task.Outcome() // Wait

// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:

1
1
1
1
1

func Throttle

func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) Task

Throttle runs the tasks with a specified rate limit.
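
A minimal sketch (works is a hypothetical []Work; a rate of 5 task starts per second is arbitrary):

tasks := NewTasks(works...)
Throttle(context.Background(), tasks, 5, time.Second) // start at most 5 tasks per second
WaitAll(tasks)                                        // wait for all of them to finish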

type Work

type Work func(context.Context) (interface{}, error)

Work represents a handler to execute

