engine

package
v0.0.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// KindNormal is the first entry of this iota block, so its value is 0.
	// NOTE(review): the constant is untyped rather than declared as Kind —
	// confirm that is intended.
	KindNormal = iota
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AddTask

// AddTask is the signature of a function that submits one or more tasks
// together with a context and a priority. Engine.AddOptionTasks has the same
// parameter types.
type AddTask[KEY Key] func(ctx context.Context, priority int, task ...*Task[KEY])

type Config

// Config collects the settings for an Engine; its NewEngine and
// NewEngineWithContext methods return a *Engine[KEY].
type Config[KEY Key] struct {
	WorkerCount     uint64
	WaitTaskCount   uint64
	MonitorInterval time.Duration // Interval of the global monitoring timer; stuck-task detection and worker panic recovery can both rely on this check.
	DoneCache       ristretto.Config[KEY, struct{}]
	EnableTelemetry bool
}

func NewConfig added in v0.0.19

func NewConfig[KEY Key]() *Config[KEY]

func (*Config[KEY]) Init added in v0.0.19

func (c *Config[KEY]) Init()

func (*Config[KEY]) NewEngine

func (c *Config[KEY]) NewEngine() *Engine[KEY]

func (*Config[KEY]) NewEngineWithContext added in v0.0.19

func (c *Config[KEY]) NewEngineWithContext(ctx context.Context) *Engine[KEY]

type Engine

// Engine is the task engine, parameterized by the task key type KEY.
// It embeds EngineStatistics; its remaining fields are unexported.
type Engine[KEY Key] struct {
	EngineStatistics
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine[KEY Key](workerCount uint64) *Engine[KEY]

func NewEngineWithContext

func NewEngineWithContext[KEY Key](workerCount uint64, ctx context.Context) *Engine[KEY]

func (*Engine[KEY]) AddFixedTasks

func (e *Engine[KEY]) AddFixedTasks(workerId int, generation int, tasks ...*Task[KEY]) error

func (*Engine[KEY]) AddOptionTasks

func (e *Engine[KEY]) AddOptionTasks(ctx context.Context, priority int, tasks ...*Task[KEY])

func (*Engine[KEY]) AddTasks

func (e *Engine[KEY]) AddTasks(tasks ...*Task[KEY])

func (*Engine[KEY]) AddWorker

func (e *Engine[KEY]) AddWorker(num int)

func (*Engine[KEY]) AsyncAddOptionTasks

func (e *Engine[KEY]) AsyncAddOptionTasks(ctx context.Context, priority int, tasks ...*Task[KEY])

func (*Engine[KEY]) AsyncAddTasks

func (e *Engine[KEY]) AsyncAddTasks(tasks ...*Task[KEY])

func (*Engine[KEY]) Context

func (e *Engine[KEY]) Context() context.Context

func (*Engine[KEY]) ErrHandler

func (e *Engine[KEY]) ErrHandler(errHandler func(task *Task[KEY])) *Engine[KEY]

func (*Engine[KEY]) ErrHandlerRetryTimes

func (e *Engine[KEY]) ErrHandlerRetryTimes(times int) *Engine[KEY]

func (*Engine[KEY]) ErrHandlerUtilSuccess

func (e *Engine[KEY]) ErrHandlerUtilSuccess() *Engine[KEY]

func (*Engine[KEY]) ErrHandlerWriteToFile

func (e *Engine[KEY]) ErrHandlerWriteToFile(path string) *Engine[KEY]

func (*Engine[KEY]) ExecTask

func (e *Engine[KEY]) ExecTask(worker *Worker[KEY], task *Task[KEY])

func (*Engine[KEY]) KindGroupRandSpeedLimit

func (e *Engine[KEY]) KindGroupRandSpeedLimit(minInterval, maxInterval time.Duration, kinds ...Kind) *Engine[KEY]

func (*Engine[KEY]) KindGroupSpeedLimit

func (e *Engine[KEY]) KindGroupSpeedLimit(interval time.Duration, kinds ...Kind) *Engine[KEY]

Multiple kinds share a single timer.

func (*Engine[KEY]) KindLimiter

func (e *Engine[KEY]) KindLimiter(kind Kind, r rate.Limit, b int) *Engine[KEY]

func (*Engine[KEY]) KindRandSpeedLimit

func (e *Engine[KEY]) KindRandSpeedLimit(kind Kind, minInterval, maxInterval time.Duration) *Engine[KEY]

func (*Engine[KEY]) KindSpeedLimit

func (e *Engine[KEY]) KindSpeedLimit(kind Kind, interval time.Duration) *Engine[KEY]

func (*Engine[KEY]) Limiter

func (e *Engine[KEY]) Limiter(r rate.Limit, b int) *Engine[KEY]

func (*Engine[KEY]) MonitorInterval

func (e *Engine[KEY]) MonitorInterval(interval time.Duration)

func (*Engine[KEY]) NewFixedWorker

func (e *Engine[KEY]) NewFixedWorker(interval time.Duration) int

func (*Engine[KEY]) OnStop added in v0.0.19

func (e *Engine[KEY]) OnStop(callBack func(context.Context)) *Engine[KEY]

func (*Engine[KEY]) RandSpeedLimited

func (e *Engine[KEY]) RandSpeedLimited(minInterval, maxInterval time.Duration) *Engine[KEY]

func (*Engine[KEY]) Run

func (e *Engine[KEY]) Run(tasks ...*Task[KEY])

func (*Engine[KEY]) RunSingleWorker

func (e *Engine[KEY]) RunSingleWorker(tasks ...*Task[KEY])

func (*Engine[KEY]) SkipKind

func (e *Engine[KEY]) SkipKind(kinds ...Kind) *Engine[KEY]

func (*Engine[KEY]) SpeedLimited

func (e *Engine[KEY]) SpeedLimited(interval time.Duration) *Engine[KEY]

func (*Engine[KEY]) Stop

func (e *Engine[KEY]) Stop()

func (*Engine[KEY]) StopAfter

func (e *Engine[KEY]) StopAfter(interval time.Duration) *Engine[KEY]

func (*Engine[KEY]) TaskSource

func (e *Engine[KEY]) TaskSource(taskSource func(addTask *Engine[KEY]))

TaskSource takes a function that adds tasks; the engine does not check whether the tasks have finished until that function returns.

type EngineStatistics

// EngineStatistics is embedded in Engine; all of its fields are unexported.
type EngineStatistics struct {
	// contains filtered or unexported fields
}

EngineStatistics holds basic engine statistics.

type ErrHandle

// ErrHandle is a callback type that receives a context and an error.
type ErrHandle func(context.Context, error)

type Key

// Key is the type constraint for the KEY type parameter used throughout this
// package (for example Task.Key). Only the exact types listed are permitted:
// the union has no ~ terms, so named types derived from them do not satisfy it.
type Key interface {
	uint64 | string | byte | int | int32 | uint32 | int64
}

Currently constrained by the generic type limits of ristretto.Cache; consider removing it and introducing an LRU cache or a Bloom filter instead.

type Kind

// Kind is the uint32 tag stored in Task.Kind and accepted by the Engine's
// Kind* configuration methods and SkipKind.
type Kind uint32

type KindHandler

// KindHandler carries a Skip flag and a TaskFunc handler.
// NOTE(review): presumably one KindHandler is kept per Kind — confirm against
// the unexported fields and the Engine implementation.
type KindHandler[KEY Key] struct {
	Skip bool

	// TODO: handler for a specific Kind.
	HandleFun TaskFunc[KEY]
	// contains filtered or unexported fields
}

type Task

// Task is a unit of work for an Engine, created from a TaskFunc by NewTask.
// It embeds a context.Context, TaskStatistics, and TaskFunc[KEY], so the
// methods of those types (including TaskFunc's Do) are promoted onto Task.
// Kind, Key, Priority, and Describe can also be set through the chainable
// SetKind, SetKey, SetPriority, and SetDescribe methods.
type Task[KEY Key] struct {
	context.Context
	Kind     Kind
	Key      KEY
	Priority int
	Describe string

	TaskStatistics
	TaskFunc[KEY]
	// contains filtered or unexported fields
}

func NewTask

func NewTask[KEY Key](task TaskFunc[KEY]) *Task[KEY]

func (*Task[KEY]) Compare

func (t *Task[KEY]) Compare(t2 *Task[KEY]) int

func (*Task[KEY]) ErrLog

func (t *Task[KEY]) ErrLog()

func (*Task[KEY]) Errs

func (t *Task[KEY]) Errs() []error

func (*Task[KEY]) Id

func (t *Task[KEY]) Id() uint64

func (*Task[KEY]) SetDescribe

func (t *Task[KEY]) SetDescribe(describe string) *Task[KEY]

func (*Task[KEY]) SetKey

func (t *Task[KEY]) SetKey(key KEY) *Task[KEY]

func (*Task[KEY]) SetKind

func (t *Task[KEY]) SetKind(k Kind) *Task[KEY]

func (*Task[KEY]) SetPriority

func (t *Task[KEY]) SetPriority(priority int) *Task[KEY]

type TaskFunc

// TaskFunc is the function form of a task: given a context it returns a slice
// of *Task[KEY] and an error.
// NOTE(review): the returned tasks are presumably follow-up work for the
// engine to schedule — confirm.
type TaskFunc[KEY Key] func(ctx context.Context) ([]*Task[KEY], error)

func (TaskFunc[KEY]) Do

func (t TaskFunc[KEY]) Do(ctx context.Context) ([]*Task[KEY], error)

type TaskInterface

// TaskInterface is satisfied by any type with this Do method; TaskFunc[KEY]
// declares a Do method with the same signature.
type TaskInterface[KEY Key] interface {
	Do(ctx context.Context) ([]*Task[KEY], error)
}

type TaskStatistics

// TaskStatistics holds per-task counters and is embedded in Task.
type TaskStatistics struct {
	ReExecTimes int // presumably the number of times the task was re-executed — TODO confirm
	ErrTimes    int // presumably the number of times the task returned an error — TODO confirm
}

type Tasks

// Tasks is a slice of task pointers; it has a Less(i, j int) bool method.
type Tasks[KEY Key] []*Task[KEY]

func (Tasks[KEY]) Less

func (tasks Tasks[KEY]) Less(i, j int) bool

type Type

// Type is a named uint32 type.
// NOTE(review): no use of Type appears in this listing — confirm its purpose.
type Type uint32

type Worker

// Worker is the type passed to Engine.ExecTask alongside the task to execute;
// all of its fields are unexported.
type Worker[KEY Key] struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL