Documentation ¶
Index ¶
- func ConcurrentTasksWithCtxTimeout(baseCtx context.Context, numWorker int, tasks []*ConcurrentTask) error
- func Map(size, numWorker int, f func(i int) error) error
- func MapWithContext(ctx context.Context, size, numWorker int, ...) error
- func MapWithCtxTimeout(baseCtx context.Context, size, numWorker int, timeout time.Duration, ...) error
- func MapWithTimeout(size, numWorker int, timeout time.Duration, ...) error
- func ProcessStream[T any, K comparable](ctx context.Context, numWorker int, inputChan <-chan T, outputChan chan<- K, ...) error
- func RunTasks(ctx context.Context, fs ...func(ctx context.Context) error) (err error)
- func Select[T any](ctx context.Context, src []<-chan T) <-chan *ValueType[T]
- func SuggestNum(i int) int
- func WithSignal(ctx context.Context) context.Context
- type BulkEmitter
- func (be *BulkEmitter[T, K]) Close()
- func (be *BulkEmitter[T, K]) CloseAndWait()
- func (be *BulkEmitter[T, K]) Commit(ctx context.Context, key string, request T, handler func(reply K, err error)) error
- func (be *BulkEmitter[T, K]) Do(ctx context.Context, key string, request T) (K, error)
- func (be *BulkEmitter[T, K]) Run(ctx context.Context) error
- type BulkEmitterHandler
- type BulkEmitterOption
- type BulkError
- type ConcurrentTask
- type ConcurrentTaskRunner
- type ProcessFunc
- type RunnerOption
- type Safe
- type ValueType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConcurrentTasksWithCtxTimeout ¶
func ConcurrentTasksWithCtxTimeout(baseCtx context.Context, numWorker int, tasks []*ConcurrentTask) error
ConcurrentTasksWithCtxTimeout processes task functions concurrently.
func Map ¶
Map is a helper function for operating on a slice in parallel. Example usage:
```
itemIDs := []int{1, 2, 3}
items := make([]*Item, len(itemIDs))
err := routine.Map(len(items), 10, func(i int) error {
	items[i] = getItemByID(itemIDs[i])
	return nil
})
```
func MapWithContext ¶
func MapWithCtxTimeout ¶
func MapWithTimeout ¶
func MapWithTimeout(size, numWorker int, timeout time.Duration, f func(ctx context.Context, i int) error) error
MapWithTimeout is the same as Map, except that each function must complete within timeout. Important: handle the provided context carefully. Check that `ctx.Err() == nil` before any write operation, because the function may be canceled at any moment and the underlying resource it refers to may already have been discarded.
func ProcessStream ¶
func ProcessStream[T any, K comparable](ctx context.Context, numWorker int, inputChan <-chan T, outputChan chan<- K, fn ProcessFunc[T, K]) error
ProcessStream processes stream data in multiple goroutines while preserving the original order. The caller only needs to provide a process func that implements the concrete processing logic. If the func returns a nil value or a non-nil error, the value is ignored. The parameters `inputChan` and `outputChan` accept any kind of channel. Note: the process func should honor the input context; otherwise goroutines will leak.
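One common way to get order-preserving parallel processing is to queue a one-element result slot per input, in arrival order, and drain the slots in that same order. The sketch below shows that technique under stated assumptions; it is not the package's implementation. `processOrdered` and `result` are hypothetical names, and "nil value" is interpreted here as the zero value of `K`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result carries one outcome of the process func.
type result[K any] struct {
	val K
	err error
}

// processOrdered is a hypothetical stand-in with the same shape as ProcessStream.
func processOrdered[T any, K comparable](
	ctx context.Context, numWorker int,
	in <-chan T, out chan<- K,
	fn func(context.Context, T) (K, error),
) error {
	if numWorker < 1 {
		numWorker = 1
	}
	// pending holds result slots in arrival order. Its capacity, plus the one
	// slot the collector is waiting on, bounds in-flight calls to numWorker.
	pending := make(chan chan result[K], numWorker-1)

	go func() {
		defer close(pending)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return
				}
				slot := make(chan result[K], 1)
				select {
				case pending <- slot:
				case <-ctx.Done():
					return
				}
				go func() {
					k, err := fn(ctx, v)
					slot <- result[K]{val: k, err: err}
				}()
			case <-ctx.Done():
				return
			}
		}
	}()

	var zero K
	for slot := range pending {
		r := <-slot
		if r.err != nil || r.val == zero {
			continue // errors and zero values are dropped
		}
		select {
		case out <- r.val:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return ctx.Err()
}

func demo() []int {
	in := make(chan int, 5)
	for v := 1; v <= 5; v++ {
		in <- v
	}
	close(in)
	out := make(chan int, 5)

	// Later inputs finish sooner, yet the output order follows the input order.
	double := func(ctx context.Context, v int) (int, error) {
		if v == 3 {
			return 0, errors.New("skip 3")
		}
		time.Sleep(time.Duration(6-v) * time.Millisecond)
		return v * 2, nil
	}
	if err := processOrdered[int, int](context.Background(), 3, in, out, double); err != nil {
		return nil
	}
	close(out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demo()) // prints [2 4 8 10]
}
```

The demo process func ignores its context for brevity; a real one should return promptly when the context ends, as the note above requires.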
func Select ¶
Select helps the caller read data from multiple channels within a limited time. The caller can tell which channel a result came from by mapping ValueType.Index to the input channels. Example:
    sources := make([]<-chan int32, 10)
    output := routine.Select(ctx, sources)
    for v := range output {
        fmt.Println(v.Index, v.Value)
    }
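To make the index mapping concrete, here is a minimal fan-in sketch that tags every value with the position of its source channel. `fanIn` and `indexed` are hypothetical names; only the `Index` and `Value` fields are taken from the example above, and the real Select may work differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// indexed mirrors the Index and Value fields read in the example above.
type indexed[T any] struct {
	Index int
	Value T
}

// fanIn forwards values from every source channel to one output channel,
// tagging each value with the index of the channel it came from.
func fanIn[T any](ctx context.Context, src []<-chan T) <-chan *indexed[T] {
	out := make(chan *indexed[T])
	var wg sync.WaitGroup
	for i, ch := range src {
		wg.Add(1)
		go func(i int, ch <-chan T) {
			defer wg.Done()
			for {
				select {
				case v, ok := <-ch:
					if !ok {
						return // this source is exhausted
					}
					select {
					case out <- &indexed[T]{Index: i, Value: v}:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}(i, ch)
	}
	// Close the output once every forwarder has stopped.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func demo() map[int][]int32 {
	a := make(chan int32, 2)
	b := make(chan int32, 1)
	a <- 1
	a <- 2
	b <- 10
	close(a)
	close(b)

	got := map[int][]int32{}
	for v := range fanIn(context.Background(), []<-chan int32{a, b}) {
		got[v.Index] = append(got[v.Index], v.Value)
	}
	return got
}

func main() {
	fmt.Println(demo()) // prints map[0:[1 2] 1:[10]]
}
```

Values from different channels may interleave in any order, but values from the same channel keep their relative order.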
func SuggestNum ¶
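SuggestNum returns a suitable number of concurrent goroutines based on the current runtime conditions. The current heuristic compares the number of tasks to process with twice the number of CPU cores and picks the smaller of the two.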
Types ¶
type BulkEmitter ¶
type BulkEmitter[T, K any] struct {
	// contains filtered or unexported fields
}
BulkEmitter is used to stack multiple requests to do batch requests. It's mostly used to invoke batch API of database or service.
func NewBulkEmitter ¶
func NewBulkEmitter[T, K any](handler BulkEmitterHandler[T, K], opts ...BulkEmitterOption[T, K]) *BulkEmitter[T, K]
NewBulkEmitter returns a BulkEmitter object.
func (*BulkEmitter[T, K]) Close ¶
func (be *BulkEmitter[T, K]) Close()
Close releases the resources used by the emitter. It panics when called more than once.
func (*BulkEmitter[T, K]) CloseAndWait ¶
func (be *BulkEmitter[T, K]) CloseAndWait()
CloseAndWait releases the resources used by the emitter and waits until the emitter finishes. It panics when called more than once.
func (*BulkEmitter[T, K]) Commit ¶
func (be *BulkEmitter[T, K]) Commit(ctx context.Context, key string, request T, handler func(reply K, err error)) error
Commit submits an individual request. Rather than waiting until the whole action is completed, it returns immediately after the request has been accepted by the bulk emitter, subject to the constraint of the processor limit. Key is used to identify and deduplicate requests. The response is passed to the `handler` argument after the reply has returned. If the handler is nil, the reply and error are ignored.
func (*BulkEmitter[T, K]) Do ¶
func (be *BulkEmitter[T, K]) Do(ctx context.Context, key string, request T) (K, error)
Do submits an individual request and waits until the action is completed. Key is used to identify and deduplicate requests. Requests with the same key receive the same response.
WARNING: It is dangerous to invoke Do() in an unconstrained goroutine, where goroutine leaks are inevitable. In such cases, it is strongly suggested to use Commit() instead of Do().
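The key-based deduplication shared by Commit and Do can be illustrated with a deliberately simplified, single-goroutine sketch. Everything here is hypothetical: the `batch` type, its `commit` and `flush` methods, and the map-based handler signature are assumptions for illustration, and the interval, size limit, and workers of the real BulkEmitter are left out.

```go
package main

import "fmt"

// batch collects requests by key until it is flushed.
type batch[T, K any] struct {
	requests  map[string]T
	callbacks map[string][]func(K, error)
}

func newBatch[T, K any]() *batch[T, K] {
	return &batch[T, K]{
		requests:  map[string]T{},
		callbacks: map[string][]func(K, error){},
	}
}

// commit records the request under key. A repeated key does not add a second
// request; it only registers another callback for the same reply.
func (b *batch[T, K]) commit(key string, req T, cb func(K, error)) {
	if _, seen := b.requests[key]; !seen {
		b.requests[key] = req
	}
	if cb != nil {
		b.callbacks[key] = append(b.callbacks[key], cb)
	}
}

// flush calls handler once with the deduplicated requests, then delivers each
// reply to every callback registered for its key.
func (b *batch[T, K]) flush(handler func(map[string]T) (map[string]K, error)) {
	replies, err := handler(b.requests)
	for key, cbs := range b.callbacks {
		for _, cb := range cbs {
			cb(replies[key], err)
		}
	}
	b.requests = map[string]T{}
	b.callbacks = map[string][]func(K, error){}
}

func demo() (batchSize, delivered int) {
	b := newBatch[int, string]()
	count := func(string, error) { delivered++ }
	b.commit("a", 1, count)
	b.commit("a", 1, count) // same key: no extra request is sent
	b.commit("b", 2, count)
	b.commit("c", 3, nil) // nil callback: reply and error are ignored
	b.flush(func(reqs map[string]int) (map[string]string, error) {
		batchSize = len(reqs)
		out := make(map[string]string, len(reqs))
		for k, v := range reqs {
			out[k] = fmt.Sprint("reply-", v)
		}
		return out, nil
	})
	return batchSize, delivered
}

func main() {
	fmt.Println(demo()) // prints 3 3
}
```

Four commits produce a batch of three requests, and the three non-nil callbacks each receive a reply.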
type BulkEmitterHandler ¶
BulkEmitterHandler is the callback invoked when a batch of requests has been grouped together and needs to be acted on.
type BulkEmitterOption ¶
type BulkEmitterOption[T, K any] func(be *BulkEmitter[T, K])
BulkEmitterOption helps set variables of BulkEmitter.
func WithInterval ¶
func WithInterval[T, K any](interval time.Duration) BulkEmitterOption[T, K]
WithInterval sets emitting interval.
func WithMaxSize ¶
func WithMaxSize[T, K any](size int) BulkEmitterOption[T, K]
WithMaxSize sets max size of a bulk.
func WithWorker ¶
func WithWorker[T, K any](num int) BulkEmitterOption[T, K]
WithWorker sets the number of workers that run concurrently.
type BulkError ¶
BulkError can be returned from a `BulkEmitterHandler` to indicate an error for each individual response.
type ConcurrentTask ¶
type ConcurrentTask struct {
	F                 func(ctx context.Context) error
	WithBlockSignal   bool
	IgnoreBlockSignal bool
	Timeout           time.Duration
}
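ConcurrentTask defines a function as a task. F is the concrete task that can run in parallel. WithBlockSignal indicates whether the task, if it fails, emits a block signal to try to stop more tasks in the task group from executing. (Whether a task that has already started stops depends on whether it handles the signal internally.) IgnoreBlockSignal indicates that the task does not care about the block signal and is only affected by its timeout. Timeout is the task's own timeout.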
type ConcurrentTaskRunner ¶
type ConcurrentTaskRunner struct {
// contains filtered or unexported fields
}
func NewConcurrentTaskRunner ¶
func NewConcurrentTaskRunner(ctx context.Context, opts ...RunnerOption) *ConcurrentTaskRunner
func (*ConcurrentTaskRunner) AddTasks ¶
func (c *ConcurrentTaskRunner) AddTasks(fs ...func(ctx context.Context) error) *ConcurrentTaskRunner
func (*ConcurrentTaskRunner) Run ¶
func (c *ConcurrentTaskRunner) Run() (err error)
type ProcessFunc ¶
type ProcessFunc[T any, K comparable] func(context.Context, T) (K, error)
type RunnerOption ¶
type RunnerOption func(c *ConcurrentTaskRunner)
func WithNumOfWorker ¶
func WithNumOfWorker(numOfWorker int) RunnerOption
func WithTimeout ¶
func WithTimeout(duration time.Duration) RunnerOption
type Safe ¶
type Safe struct {
// contains filtered or unexported fields
}
Safe helps users access struct fields in a concurrency-safe way. The error returned by the given function is passed through. Suggested usage:
```
type MyClass struct {
	routine.Safe
	... Other fields
}

err := myclass.View(func() error {
	got := myclass.somefield
	... DO NOT invoke other methods of the same instance, otherwise a deadlock may occur.
})
```
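The embedding pattern and the deadlock warning can be illustrated with a small stand-in built on `sync.RWMutex`. This is a sketch under assumptions, not the package's code: the `safe` type is hypothetical, and its `Update` method is invented here for the write path, since only `View` appears in the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// safe is a hypothetical stand-in for routine.Safe.
type safe struct {
	mu sync.RWMutex
}

// View runs f under a read lock and passes its error through.
func (s *safe) View(f func() error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return f()
}

// Update runs f under a write lock. Hypothetical: not taken from the source.
func (s *safe) Update(f func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return f()
}

// counter embeds safe the way MyClass embeds routine.Safe.
type counter struct {
	safe
	n int
}

func (c *counter) Incr() {
	_ = c.Update(func() error { c.n++; return nil })
}

func (c *counter) Get() (n int) {
	_ = c.View(func() error { n = c.n; return nil })
	return n
}

func demo() int {
	c := &counter{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Incr()
		}()
	}
	wg.Wait()
	return c.Get()
}

func main() {
	fmt.Println(demo()) // prints 100
}
```

With this design, calling `c.Incr()` from inside a `View` callback would block forever, because `sync.RWMutex` is not reentrant. That is the kind of accidental deadlock the warning above describes.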