Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrRunnerClosed = errors.New("runner closed")
)
var ErrTaskRunnerBusy = errors.New("task runner is busy")
ErrTaskRunnerBusy is the error that indicates the runner is busy.
Functions ¶
func GoSafe ¶
func GoSafe(fn func())
GoSafe runs the given fn using another goroutine, recovers if fn panics.
func GoSafeCtx ¶
GoSafeCtx runs the given fn using another goroutine, recovers if fn panics with ctx.
func RunSafeCtx ¶
RunSafeCtx runs the given fn, recovers if fn panics with ctx.
Types ¶
type RoutineGroup ¶
type RoutineGroup struct {
// contains filtered or unexported fields
}
A RoutineGroup is used to group goroutines together and all wait all goroutines to be done.
func (*RoutineGroup) Run ¶
func (g *RoutineGroup) Run(fn func())
Run runs the given fn in RoutineGroup. Don't reference the variables from outside, because outside variables can be changed by other goroutines
func (*RoutineGroup) RunSafe ¶
func (g *RoutineGroup) RunSafe(fn func())
RunSafe runs the given fn in RoutineGroup, and avoid panics. Don't reference the variables from outside, because outside variables can be changed by other goroutines
func (*RoutineGroup) Wait ¶
func (g *RoutineGroup) Wait()
Wait waits all running functions to be done.
type StableRunner ¶
type StableRunner[I, O any] struct { // contains filtered or unexported fields }
StableRunner is a runner that guarantees messages are taken out with the pushed order. This runner is typically useful for Kafka consumers with parallel processing.
func NewStableRunner ¶
func NewStableRunner[I, O any](fn func(I) O) *StableRunner[I, O]
NewStableRunner returns a new StableRunner with given message processor fn.
func (*StableRunner[I, O]) Get ¶
func (r *StableRunner[I, O]) Get() (O, error)
Get returns the next processed message in order. This method should be called in one goroutine.
func (*StableRunner[I, O]) Push ¶
func (r *StableRunner[I, O]) Push(v I) error
Push pushes the message v into the runner and to be processed concurrently, after processed, it will be cached to let caller take it in pushing order.
func (*StableRunner[I, O]) Wait ¶
func (r *StableRunner[I, O]) Wait()
Wait waits all the messages to be processed and taken from inner buffer.
type TaskRunner ¶
type TaskRunner struct {
// contains filtered or unexported fields
}
A TaskRunner is used to control the concurrency of goroutines.
func NewTaskRunner ¶
func NewTaskRunner(concurrency int) *TaskRunner
NewTaskRunner returns a TaskRunner.
func (*TaskRunner) Schedule ¶
func (rp *TaskRunner) Schedule(task func())
Schedule schedules a task to run under concurrency control.
func (*TaskRunner) ScheduleImmediately ¶
func (rp *TaskRunner) ScheduleImmediately(task func()) error
ScheduleImmediately schedules a task to run immediately under concurrency control. It returns ErrTaskRunnerBusy if the runner is busy.
type WorkerGroup ¶
type WorkerGroup struct {
// contains filtered or unexported fields
}
A WorkerGroup is used to run given number of workers to process jobs.
func NewWorkerGroup ¶
func NewWorkerGroup(job func(), workers int) WorkerGroup
NewWorkerGroup returns a WorkerGroup with given job and workers.