goroutine_util

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2024 License: MulanPSL-2.0 Imports: 8 Imported by: 1

README

1. 说明

Go 协程工具函数库

2. 使用

go get gitee.com/ivfzhou/goroutine-util@latest
// RunConcurrently 并发运行fn,一旦有error发生终止运行。
func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)

// RunSequentially 依次运行fn,当有error发生时停止后续fn运行。
func RunSequentially(ctx context.Context, fn ...func (context.Context) error) error

// NewRunner 该函数提供同时最多运行max个协程fn,一旦fn发生error便终止fn运行。
//
// max小于等于0表示不限制协程数。
//
// 朝返回的run函数中添加fn,若block为true表示正在运行的任务数已达到max则会阻塞。
//
// run函数返回error为任务fn返回的第一个error,与wait函数返回的error为同一个。
//
// 注意请在add完所有任务后调用wait。
func NewRunner[T any](ctx context.Context, max int, fn func (context.Context, T) error) (run func (t T, block bool) error, wait func (fastExit bool) error)

// RunData 并发将jobs传递给fn函数运行,一旦发生error便立即返回该error,并结束其它协程。
func RunData[T any](ctx context.Context, fn func (context.Context, T) error, fastExit bool, jobs ...T) error

// RunPipeline 将每个jobs依次递给steps函数处理。一旦某个step发生error或者panic,立即返回该error,并及时结束其他协程。
// 除非stopWhenErr为false,则只是终止该job往下一个step投递。
//
// 一个job最多在一个step中运行一次,且一个job一定是依次序递给steps,前一个step处理完毕才会给下一个step处理。
//
// 每个step并发运行jobs。
//
// 等待所有jobs处理结束时会close successCh、errCh,或者ctx被cancel时也将及时结束开启的goroutine后返回。
//
// 从successCh和errCh中获取成功跑完所有step的job和是否发生error。
//
// 若steps中含有nil将会panic。
func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func (context.Context, T) error) (successCh <-chan T, errCh <-chan error)

// NewPipelineRunner 形同RunPipeline,不同在于使用push推送job。step返回true表示传递给下一个step处理。
func NewPipelineRunner[T any](ctx context.Context, steps ...func (context.Context, T) bool) (push func (T) bool, successCh <-chan T, endPush func ())

// ListenChan 监听chans,一旦有一个chan激活便立即将T发送给ch,并close ch。
//
// 若所有chans都未曾激活(chan是nil也认为未激活)且都close了,则ch被close。
//
// 若同时多个chans被激活,则随机将一个激活值发送给ch。
func ListenChan[T any](chans ...<-chan T) (ch <-chan T)

// RunPeriodically 依次运行fn,每个fn之间至少间隔period时间。
func RunPeriodically(period time.Duration) (run func (fn func ()))

3. 联系作者

电邮:ifzhou@126.com

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ListenChan

func ListenChan[T any](chans ...<-chan T) (ch <-chan T)

ListenChan 监听chans,一旦有一个chan激活便立即将T发送给ch,并close ch。

若所有chans都未曾激活(chan是nil也认为未激活)且都close了,则ch被close。

若同时多个chans被激活,则随机将一个激活值发送给ch。

func NewPipelineRunner

func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) (
	push func(T) bool, successCh <-chan T, endPush func())

NewPipelineRunner 形同RunPipeline,不同在于使用push推送job。step返回true表示传递给下一个step处理。

func NewRunner

func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) (
	run func(t T, block bool) error, wait func(fastExit bool) error)

NewRunner 该函数提供同时最多运行max个协程fn,一旦fn发生error便终止fn运行。

max小于等于0表示不限制协程数。

朝返回的run函数中添加fn,若block为true表示正在运行的任务数已达到max则会阻塞。

run函数返回error为任务fn返回的第一个error,与wait函数返回的error为同一个。

注意请在add完所有任务后调用wait。

Example
type product struct {
	// some stuff
}
ctx := context.Background()
op := func(ctx context.Context, data *product) error {
	// do something
	return nil
}
add, wait := goroutine_util.NewRunner[*product](ctx, 12, op)

// many products
var projects []*product
for _, v := range projects {
	// blocked since number of ops running simultaneously reaches 12
	if err := add(v, true); err != nil {
		// means having a op return err
	}

	// no block
	if err := add(v, false); err != nil {
		// means having a op return err
	}
}

// wait all op done and check err
if err := wait(true); err != nil {
	// op occur err
}
Output:

func RunConcurrently

func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)

RunConcurrently 并发运行fn,一旦有error发生终止运行。

Example
ctx := context.Background()
var order any
work1 := func(ctx context.Context) error {
	// op order
	order = nil
	return nil
}

var stock any
work2 := func(ctx context.Context) error {
	// op stock
	stock = nil
	return nil
}
err := goroutine_util.RunConcurrently(ctx, work1, work2)(false)
// check err
if err != nil {
	return
}

// do your want
_ = order
_ = stock
Output:

func RunData

func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, jobs ...T) error

RunData 并发将jobs传递给fn函数运行,一旦发生error便立即返回该error,并结束其它协程。

func RunPeriodically

func RunPeriodically(period time.Duration) (run func(fn func()))

RunPeriodically 依次运行fn,每个fn之间至少间隔period时间。

func RunPipeline

func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func(context.Context, T) error) (
	successCh <-chan T, errCh <-chan error)

RunPipeline 将每个jobs依次递给steps函数处理。一旦某个step发生error或者panic,立即返回该error,并及时结束其他协程。 除非stopWhenErr为false,则只是终止该job往下一个step投递。

一个job最多在一个step中运行一次,且一个job一定是依次序递给steps,前一个step处理完毕才会给下一个step处理。

每个step并发运行jobs。

等待所有jobs处理结束时会close successCh、errCh,或者ctx被cancel时也将及时结束开启的goroutine后返回。

从successCh和errCh中获取成功跑完所有step的job和是否发生error。

若steps中含有nil将会panic。

Example
type data struct{}
ctx := context.Background()

jobs := []*data{{}, {}}
work1 := func(ctx context.Context, d *data) error { return nil }
work2 := func(ctx context.Context, d *data) error { return nil }

succCh, errCh := goroutine_util.RunPipeline(ctx, jobs, false, work1, work2)
select {
case <-succCh:
case <-errCh:
	// return err
}
Output:

func RunSequentially

func RunSequentially(ctx context.Context, fn ...func(context.Context) error) error

RunSequentially 依次运行fn,当有error发生时停止后续fn运行。

Example
ctx := context.Background()
first := func(context.Context) error { return nil }
then := func(context.Context) error { return nil }
last := func(context.Context) error { return nil }
err := goroutine_util.RunSequentially(ctx, first, then, last)
if err != nil {
	// return err
}
Output:

Types

type Queue

type Queue[E any] struct {
	// contains filtered or unexported fields
}

func (*Queue[E]) Close

func (q *Queue[E]) Close()

Close 从GetFromChan中获取的chan将被close。

func (*Queue[E]) GetFromChan

func (q *Queue[E]) GetFromChan() <-chan E

GetFromChan 获取队列头元素。

func (*Queue[E]) Push

func (q *Queue[E]) Push(elem E) bool

Push 向队列尾部加元素,如果队列已被close则不会加元素。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL