Documentation ¶
Index ¶
- func ListenChan[T any](chans ...<-chan T) (ch <-chan T)
- func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) (push func(T) bool, successCh <-chan T, endPush func())
- func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) (run func(t T, block bool) error, wait func(fastExit bool) error)
- func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)
- func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, ...) error
- func RunPeriodically(period time.Duration) (run func(fn func()))
- func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, ...) (successCh <-chan T, errCh <-chan error)
- func RunSequentially(ctx context.Context, fn ...func(context.Context) error) error
- type Queue
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ListenChan ¶
func ListenChan[T any](chans ...<-chan T) (ch <-chan T)
ListenChan 监听chans,一旦有一个chan激活便立即将T发送给ch,并close ch。
若所有chans都未曾激活(chan是nil也认为未激活)且都close了,则ch被close。
若同时多个chans被激活,则随机将一个激活值发送给ch。
func NewPipelineRunner ¶
func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) ( push func(T) bool, successCh <-chan T, endPush func())
NewPipelineRunner 形同RunPipeline,不同在于使用push推送job。step返回true表示传递给下一个step处理。
func NewRunner ¶
func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) ( run func(t T, block bool) error, wait func(fastExit bool) error)
NewRunner 该函数提供同时最多运行max个协程fn,一旦fn发生error便终止fn运行。
max小于等于0表示不限制协程数。
朝返回的run函数中添加fn,若block为true表示正在运行的任务数已达到max则会阻塞。
run函数返回error为任务fn返回的第一个error,与wait函数返回的error为同一个。
注意请在add完所有任务后调用wait。
Example ¶
type product struct { // some stuff } ctx := context.Background() op := func(ctx context.Context, data *product) error { // do something return nil } add, wait := goroutine_util.NewRunner[*product](ctx, 12, op) // many products var projects []*product for _, v := range projects { // blocked since number of ops running simultaneously reaches 12 if err := add(v, true); err != nil { // means having a op return err } // no block if err := add(v, false); err != nil { // means having a op return err } } // wait all op done and check err if err := wait(true); err != nil { // op occur err }
Output:
func RunConcurrently ¶
func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)
RunConcurrently 并发运行fn,一旦有error发生终止运行。
Example ¶
ctx := context.Background() var order any work1 := func(ctx context.Context) error { // op order order = nil return nil } var stock any work2 := func(ctx context.Context) error { // op stock stock = nil return nil } err := goroutine_util.RunConcurrently(ctx, work1, work2)(false) // check err if err != nil { return } // do your want _ = order _ = stock
Output:
func RunData ¶
func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, jobs ...T) error
RunData 并发将jobs传递给fn函数运行,一旦发生error便立即返回该error,并结束其它协程。
func RunPeriodically ¶
RunPeriodically 依次运行fn,每个fn之间至少间隔period时间。
func RunPipeline ¶
func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func(context.Context, T) error) ( successCh <-chan T, errCh <-chan error)
RunPipeline 将每个jobs依次递给steps函数处理。一旦某个step发生error或者panic,立即返回该error,并及时结束其他协程。 除非stopWhenErr为false,则只是终止该job往下一个step投递。
一个job最多在一个step中运行一次,且一个job一定是依次序递给steps,前一个step处理完毕才会给下一个step处理。
每个step并发运行jobs。
等待所有jobs处理结束时会close successCh、errCh,或者ctx被cancel时也将及时结束开启的goroutine后返回。
从successCh和errCh中获取成功跑完所有step的job和是否发生error。
若steps中含有nil将会panic。
Example ¶
type data struct{} ctx := context.Background() jobs := []*data{{}, {}} work1 := func(ctx context.Context, d *data) error { return nil } work2 := func(ctx context.Context, d *data) error { return nil } succCh, errCh := goroutine_util.RunPipeline(ctx, jobs, false, work1, work2) select { case <-succCh: case <-errCh: // return err }
Output:
func RunSequentially ¶
RunSequentially 依次运行fn,当有error发生时停止后续fn运行。
Example ¶
ctx := context.Background() first := func(context.Context) error { return nil } then := func(context.Context) error { return nil } last := func(context.Context) error { return nil } err := goroutine_util.RunSequentially(ctx, first, then, last) if err != nil { // return err }
Output: