worker

package
v0.1.8 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Process type identifiers.
// NOTE(review): presumably the values carried in ProcessConfig.ProcessType — confirm against the implementation.
const (
	DefaultProcess   = "defaultProcess"   // default processing
	AggregateProcess = "aggregateProcess" // aggregate processing
	CronProcess      = "cronProcess"      // scheduled (cron) task
)

Variables

View Source
// Error values exported by the package. Each carries a fixed message;
// where they are returned is not visible from this listing.
var (
	ErrUuidNil                       = fmt.Errorf("uuid is empty")
	ErrRedisNil                      = fmt.Errorf("redis is empty")
	ErrRedisInvalid                  = fmt.Errorf("redis is invalid")
	ErrExprInvalid                   = fmt.Errorf("expr is invalid")
	ErrSaveCron                      = fmt.Errorf("save cron failed")
	ErrHttpCallbackInvalidStatusCode = fmt.Errorf("http callback invalid status code")
)

Functions

func WithCallback

func WithCallback(s string) func(*Options)

WithCallback 设置任务完成后的回调地址

func WithClearArchived

func WithClearArchived(second int) func(*Options)

WithClearArchived 清除已存档任务的间隔,默认为 300 秒

func WithConcurrencyNum

func WithConcurrencyNum(s int) func(*Options)

WithConcurrencyNum 设置任务处理器的并发数

func WithGroup

func WithGroup(s string) func(*Options)

WithGroup 设置任务处理器的组名

func WithGroupGracePeriod

func WithGroupGracePeriod(second int) func(*Options)

WithGroupGracePeriod 每多少秒聚合一次

func WithGroupMaxDelay

func WithGroupMaxDelay(second int) func(*Options)

WithGroupMaxDelay 最晚多少秒聚合一次

func WithGroupMaxSize

func WithGroupMaxSize(second int) func(*Options)

WithGroupMaxSize 多少个任务聚合一次

func WithHandler

func WithHandler(fun func(ctx context.Context, p Payload) error) func(*Options)

WithHandler 设置任务的回调处理器

func WithHandlerAggregator

func WithHandlerAggregator(fun func(ctx context.Context, task *asynq.Task) error) func(*Options)

WithHandlerAggregator 设置聚合任务的回调处理器

func WithHandlerNeedWorker

func WithHandlerNeedWorker(fun func(worker Worker, ctx context.Context, p Payload) error) func(*Options)

WithHandlerNeedWorker 设置需要Worker参数的任务处理函数

func WithMaxRetry

func WithMaxRetry(count int) func(*Options)

WithMaxRetry 任务出错时的最大重试次数,默认为 3

func WithRedisLinkMode

func WithRedisLinkMode(s string) func(*Options)

WithRedisLinkMode 设置redis连接模式

func WithRedisPeriodKey

func WithRedisPeriodKey(s string) func(*Options)

WithRedisPeriodKey 设置redis周期任务key

func WithRedisUri

func WithRedisUri(s string) func(*Options)

WithRedisUri 设置redis连接地址,默认值redis://127.0.0.1:6379/0

func WithRetention

func WithRetention(second int) func(*Options)

WithRetention 成功任务存储时间,默认 60 秒,如果提供此选项,任务将在成功处理后作为已完成任务存储

func WithRunAt

func WithRunAt(at time.Time) func(*RunOptions)

WithRunAt 运行任务的时间

func WithRunCtx

func WithRunCtx(ctx context.Context) func(*RunOptions)

WithRunCtx 任务上下文

func WithRunExpr

func WithRunExpr(s string) func(*RunOptions)

WithRunExpr Cron表达式, 最小单位1分钟, 参见gorhill/cronexpr

func WithRunGroup

func WithRunGroup(s string) func(*RunOptions)

WithRunGroup 组前缀,默认组

func WithRunIn

func WithRunIn(in time.Duration) func(*RunOptions)

WithRunIn 任务延迟执行,在指定的时间间隔(time.Duration)之后运行

func WithRunMaxRetry

func WithRunMaxRetry(count int) func(*RunOptions)

WithRunMaxRetry 最大重试次数, 任务回调发生error会重试,默认3次

func WithRunNow

func WithRunNow(flag bool) func(*RunOptions)

WithRunNow 立即运行任务

func WithRunPayload

func WithRunPayload(s []byte) func(*RunOptions)

WithRunPayload 任务负载,任务回调会使用

func WithRunReplace

func WithRunReplace(flag bool) func(*RunOptions)

WithRunReplace 当uid重复时,删除旧的并创建新的

func WithRunRetention

func WithRunRetention(second int) func(*RunOptions)

WithRunRetention 任务过期时间,默认60秒

func WithRunTimeout

func WithRunTimeout(second int) func(*RunOptions)

WithRunTimeout 任务超时,默认60秒

func WithRunUuid

func WithRunUuid(s string) func(*RunOptions)

WithRunUuid 任务唯一id

func WithTimeout

func WithTimeout(second int) func(*Options)

WithTimeout 任务超时时间,默认为 10 秒

func WithUseAggregator

func WithUseAggregator(s bool) func(*Options)

WithUseAggregator 是否使用聚合器

Types

type BaseProcess

type BaseProcess struct{}

func (*BaseProcess) GetProcessConfig

func (q *BaseProcess) GetProcessConfig() *ProcessConfig

func (*BaseProcess) GetTopic

func (q *BaseProcess) GetTopic() string

func (*BaseProcess) Handle

func (q *BaseProcess) Handle(ctx context.Context, p Payload) (err error)

func (*BaseProcess) HandleAggregate

func (q *BaseProcess) HandleAggregate(ctx context.Context, task *asynq.Task) (err error)

HandleAggregate 处理消息

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Payload

// Payload is the value passed to task handler functions (see the handler
// signatures accepted by WithHandler and Process.Handle). It serializes
// to JSON with the keys "group", "uid" and "payload".
type Payload struct {
	// Group is the task's group name.
	// NOTE(review): presumably the value set via WithRunGroup — confirm.
	Group string `json:"group"`
	// Uid identifies the task.
	// NOTE(review): presumably the unique id set via WithRunUuid — confirm.
	Uid string `json:"uid"`
	// Payload holds the raw task data as bytes.
	Payload []byte `json:"payload"`
}

func (Payload) String

func (p Payload) String() (str string)

type Process

type Process interface {
	GetTopic() string                                                  // returns the topic to consume
	GetProcessConfig() *ProcessConfig                                  // returns the process configuration
	Handle(ctx context.Context, p Payload) (err error)                 // handles a task
	HandleAggregate(ctx context.Context, task *asynq.Task) (err error) // handles an aggregated task
}

Process 任务具体处理过程接口,实现该接口即可加入到任务队列中

type ProcessConfig

// ProcessConfig is the configuration returned by Process.GetProcessConfig.
type ProcessConfig struct {
	// Topic is the topic name.
	Topic string
	// ProcessType is the kind of processing.
	// NOTE(review): presumably one of DefaultProcess, AggregateProcess or CronProcess — confirm.
	ProcessType string
}

type RunOptions

type RunOptions struct {
	// contains filtered or unexported fields
}

type Scheduled

type Scheduled struct {
	// contains filtered or unexported fields
}

Scheduled 任务调度器

func RegisterProcess

func RegisterProcess(p Process) (s *Scheduled)

func (*Scheduled) Cron

func (s *Scheduled) Cron(ctx context.Context, topic, cronExpr string, data []byte) (err error)

Cron 采用定时任务的方式执行任务

func (*Scheduled) Push

func (s *Scheduled) Push(ctx context.Context, topic string, dataId string, data []byte, timeout int) (err error)

Push 采用消息队列的方式执行任务

type Worker

type Worker struct {
	Error error
	// contains filtered or unexported fields
}

func New

func New(options ...func(*Options)) *Worker

New 创建一个新的任务处理器

func (*Worker) Cron

func (wk *Worker) Cron(options ...func(*RunOptions)) (err error)

Cron 设置周期性任务

func (*Worker) Once

func (wk *Worker) Once(options ...func(*RunOptions)) (err error)

func (*Worker) Remove

func (wk *Worker) Remove(ctx context.Context, uid string) (err error)

Remove 移除任务

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL