Documentation ¶
Index ¶
- func CancelAll(tasks []Task)
- func WaitAll(tasks []Task)
- type Batch
- type PartitionFunc
- type Partitioner
- type State
- type Task
- func Consume(ctx context.Context, concurrency int, tasks chan Task) Task
- func ForkJoin(ctx context.Context, tasks []Task) Task
- func Invoke(ctx context.Context, action Work) Task
- func InvokeAll(ctx context.Context, concurrency int, tasks []Task) Task
- func NewTask(action Work) Task
- func NewTasks(actions ...Work) []Task
- func Repeat(ctx context.Context, interval time.Duration, action Work) Task
- func Spread(ctx context.Context, within time.Duration, tasks []Task) Task
- func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) Task
- type Work
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Batch ¶
Batch represents a batch where one can append to the batch and process it as a whole.
Example ¶
var wg sync.WaitGroup wg.Add(2) r := NewBatch(context.Background(), func(input []interface{}) []interface{} { fmt.Println(input) return input }) r.Append(1).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) { wg.Done() return nil, nil }) r.Append(2).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) { wg.Done() return nil, nil }) r.Reduce() wg.Wait()
Output: [1 2]
type PartitionFunc ¶
PartitionFunc takes in data and outputs key if ok is false, the data doesn't fall into and partition
type Partitioner ¶
type Partitioner interface { // Append items to the queue which is pending partition Append(items interface{}) Task // Partition items and output the result Partition() map[string][]interface{} }
Partitioner partitions events
Example ¶
partitionFunc := func(data interface{}) (string, bool) { xevent, ok := data.(map[string]string) if !ok { return "", false } key, ok := xevent["pre"] return key, ok } p := NewPartitioner(context.Background(), partitionFunc) input := []interface{}{ map[string]string{"pre": "a", "val": "val1"}, map[string]string{"pre": "b", "val": "val2"}, map[string]string{"pre": "a", "val": "val4"}, map[string]string{"pre": "c", "val": "val5"}, } t := p.Append(input) _, _ = t.Outcome() res := p.Partition() first := res["a"][0].(map[string]string) fmt.Println(first["pre"]) fmt.Println(first["val"])
Output: a val1
func NewPartitioner ¶
func NewPartitioner(ctx context.Context, partition PartitionFunc) Partitioner
NewPartitioner creates a new partitioner
type State ¶
type State byte
State represents the state enumeration for a task.
const ( IsCreated State = iota // IsCreated represents a newly created task IsRunning // IsRunning represents a task which is currently running IsCompleted // IsCompleted represents a task which was completed successfully or errored out IsCancelled // IsCancelled represents a task which was cancelled or has timed out )
Various task states
type Task ¶
type Task interface { Run(ctx context.Context) Task Cancel() State() State Outcome() (interface{}, error) ContinueWith(ctx context.Context, nextAction func(interface{}, error) (interface{}, error)) Task }
Task represents a unit of work to be done
func ForkJoin ¶
ForkJoin executes input task in parallel and waits for ALL outcomes before returning.
Example ¶
first := NewTask(func(context.Context) (interface{}, error) { return 1, nil }) second := NewTask(func(context.Context) (interface{}, error) { return nil, errors.New("some error") }) ForkJoin(context.Background(), []Task{first, second}) fmt.Println(first.Outcome()) fmt.Println(second.Outcome())
Output: 1 <nil> <nil> some error
func InvokeAll ¶
InvokeAll runs the tasks with a specific max concurrency
Example ¶
resChan := make(chan int, 6) works := make([]Work, 6, 6) for i := range works { j := i works[j] = func(context.Context) (interface{}, error) { fmt.Println(j / 2) time.Sleep(time.Millisecond * 10) return nil, nil } } tasks := NewTasks(works...) InvokeAll(context.Background(), 2, tasks) WaitAll(tasks) close(resChan) res := []int{} for r := range resChan { res = append(res, r) }
Output: 0 0 1 1 2 2
func Repeat ¶
Repeat performs an action asynchronously on a predetermined interval.
Example ¶
out := make(chan bool, 1) task := Repeat(context.TODO(), time.Nanosecond*10, func(context.Context) (interface{}, error) { out <- true return nil, nil }) <-out v := <-out fmt.Println(v) task.Cancel()
Output: true
func Spread ¶
Spread evenly spreads the work within the specified duration.
Example ¶
tasks := newTasks() within := 200 * time.Millisecond // Spread task := Spread(context.Background(), within, tasks) _, _ = task.Outcome() // Wait // Make sure all tasks are done for _, task := range tasks { v, _ := task.Outcome() fmt.Println(v) }
Output: 1 1 1 1 1
Source Files ¶
Click to show internal directories.
Click to hide internal directories.