Documentation ¶
Overview ¶
Package parallel is used to execute tasks in parallel.
Index ¶
- Constants
- func FetchAllPage(ctx context.Context, concurrency int, pageSize int, fetchFun FetchFunc) ([]interface{}, error)
- func PageRequest(ctx context.Context, logName string, concurrency int, pageSize int, ...) ([]interface{}, error)
- type FetchFunc
- type PageRequestFunc
- type ParallelOptions
- type ParallelTasks
- func (p *ParallelTasks) Add(tasks ...Task) *ParallelTasks
- func (p *ParallelTasks) Cancel(cancelReason error)
- func (p *ParallelTasks) Context(ctx context.Context) *ParallelTasks
- func (p *ParallelTasks) Do() *ParallelTasks
- func (p *ParallelTasks) FailFast() *ParallelTasks
- func (p *ParallelTasks) Name(name string) *ParallelTasks
- func (p *ParallelTasks) SetConcurrent(count int) *ParallelTasks
- func (p *ParallelTasks) SetMaxConcurrent(count int, max int) *ParallelTasks
- func (p *ParallelTasks) Wait() ([]interface{}, error)
- type Task
Constants ¶
const (
// DefaultConcurrentNum is the default number of tasks executed concurrently
DefaultConcurrentNum = 10
)
Functions ¶
func FetchAllPage ¶ added in v0.7.0
func FetchAllPage(ctx context.Context, concurrency int, pageSize int, fetchFun FetchFunc) ([]interface{}, error)
FetchAllPage fetches all pages of data concurrently.
func PageRequest ¶ added in v0.7.0
func PageRequest(ctx context.Context, logName string, concurrency int, pageSize int, f PageRequestFunc) ([]interface{}, error)
PageRequest requests paginated data concurrently.
Types ¶
type FetchFunc ¶ added in v0.7.0
type FetchFunc func(ctx context.Context, pageSize, page int) (total int, list interface{}, err error)
FetchFunc fetches one page of data. The return value `list` must be a slice; otherwise a panic will occur.
type PageRequestFunc ¶ added in v0.7.0
type PageRequestFunc struct {
	// RequestPage requests a single page of data
	RequestPage func(ctx context.Context, pageSize int, page int) (interface{}, error)
	// PageResult returns the paging information for a requested page
	PageResult func(items interface{}) (total int, currentPageLen int, err error)
}
PageRequestFunc is a tool for concurrent processing of pagination
type ParallelOptions ¶
type ParallelTasks ¶
type ParallelTasks struct {
	Options ParallelOptions
	Log     *zap.SugaredLogger
	// contains filtered or unexported fields
}
ParallelTasks holds a set of tasks that can be executed in parallel, e.g.

	result, err := P(log, "eg1", f1, f2, f3).Do().Wait()
	result, err := P(log, "eg2", f1, f2, f3).Add(f4).FailFast().Do().Wait()

	// times out after 0.5s
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	result, err := P(log, "eg3", f1, f2, f3).Context(ctx).Do().Wait()
func P ¶
func P(log *zap.SugaredLogger, name string, tasks ...Task) *ParallelTasks
P constructs a ParallelTasks. name is used in log output. Take care with any variables that the task closures reference.
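One way to respect the closure warning is to bind each value through a function parameter, so that every task owns its own copy. This matters most with Go versions before 1.22, where a loop variable is shared across iterations. In the sketch below, `task` is a hypothetical stand-in for the package's `Task` type (its definition is not shown in this document), and `squareTask` and `buildTasks` are illustrative names.

```go
package main

import "fmt"

// task is a hypothetical stand-in for the package's Task type; the real
// definition is not shown in this document.
type task func() (interface{}, error)

// squareTask binds n as a parameter, so the returned closure captures its
// own copy instead of a variable shared with the caller.
func squareTask(n int) task {
	return func() (interface{}, error) {
		return n * n, nil
	}
}

// buildTasks creates one task per input value.
func buildTasks(inputs []int) []task {
	tasks := make([]task, 0, len(inputs))
	for _, n := range inputs {
		tasks = append(tasks, squareTask(n))
	}
	return tasks
}

func main() {
	for _, t := range buildTasks([]int{1, 2, 3}) {
		v, _ := t()
		fmt.Println(v)
	}
}
```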
func (*ParallelTasks) Add ¶
func (p *ParallelTasks) Add(tasks ...Task) *ParallelTasks
func (*ParallelTasks) Cancel ¶
func (p *ParallelTasks) Cancel(cancelReason error)
func (*ParallelTasks) Context ¶
func (p *ParallelTasks) Context(ctx context.Context) *ParallelTasks
Context sets the context. For now, running tasks cannot be canceled; if the context is canceled, Wait returns immediately.
func (*ParallelTasks) Do ¶
func (p *ParallelTasks) Do() *ParallelTasks
Do starts executing all tasks in parallel.
func (*ParallelTasks) FailFast ¶
func (p *ParallelTasks) FailFast() *ParallelTasks
func (*ParallelTasks) Name ¶
func (p *ParallelTasks) Name(name string) *ParallelTasks
func (*ParallelTasks) SetConcurrent ¶
func (p *ParallelTasks) SetConcurrent(count int) *ParallelTasks
SetConcurrent sets the number of tasks that may run concurrently.
func (*ParallelTasks) SetMaxConcurrent ¶ added in v0.10.0
func (p *ParallelTasks) SetMaxConcurrent(count int, max int) *ParallelTasks
SetMaxConcurrent sets the number of tasks that may run concurrently. If count is greater than max, max is used.
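The capping rule stated above amounts to a simple clamp. The helper below, `effectiveConcurrency`, is a hypothetical illustration of that rule only; how the package treats zero or negative values is not documented here and is not modeled.

```go
package main

import "fmt"

// effectiveConcurrency applies the documented rule: use count unless it
// exceeds max, in which case max is used.
func effectiveConcurrency(count, max int) int {
	if count > max {
		return max
	}
	return count
}

func main() {
	fmt.Println(effectiveConcurrency(5, 10))  // count is within the cap
	fmt.Println(effectiveConcurrency(50, 10)) // count exceeds the cap
}
```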
func (*ParallelTasks) Wait ¶
func (p *ParallelTasks) Wait() ([]interface{}, error)
Wait waits for all tasks to finish. If fail fast is set, it returns immediately when any task returns an error. For now, running tasks cannot be canceled. You must invoke Do() before invoking Wait(). The results of the tasks are returned in the []interface{}. If fail fast is set and an error occurs, Wait returns that single error; if fail fast is not set and any errors occur, Wait returns the []error as MultiErrors.