Documentation ¶
Index ¶
- Variables
- func SetConcurrency(concurrency int64) options.Option
- func SetConsumeQueue(consumeQueue string) options.Option
- func SetDelayedQueue(delayedQueue string) options.Option
- func SetNoUnixSignals(noUnixSignals bool) options.Option
- func SetResultExpire(expire int64) options.Option
- type Config
- type Server
- func (s *Server) EnforcementConf()
- func (s *Server) GetBackend() backend.Backend
- func (s *Server) GetConfig() *Config
- func (s *Server) GetController() controller.Controller
- func (s *Server) GetLocker() locker.Locker
- func (s *Server) GetRegisteredTask(name string) (interface{}, bool)
- func (s *Server) IsRegisteredTask(name string) bool
- func (s *Server) NewWorker(consumerTag string, concurrency int, queue string) *Worker
- func (s *Server) RegisteredTask(name string, fn interface{}) error
- func (s *Server) RegisteredTasks(handelTaskMap map[string]interface{}) error
- func (s *Server) RegisteredTimedChain(spec, name string, signatures ...*task.Signature) error
- func (s *Server) RegisteredTimedGroup(spec, name string, groupID string, concurrency int, ...) error
- func (s *Server) RegisteredTimedGroupCallback(spec, name string, groupID string, concurrency int, callback *task.Signature, ...) error
- func (s *Server) RegisteredTimedTask(spec, name string, signature *task.Signature) error
- func (s *Server) SendChain(chain *task.Chain) (*result.ChainAsyncResult, error)
- func (s *Server) SendGroup(group *task.Group, concurrency int) ([]*result.AsyncResult, error)
- func (s *Server) SendGroupCallback(groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)
- func (s *Server) SendGroupCallbackWithContext(ctx context.Context, groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)
- func (s *Server) SendGroupWithContext(ctx context.Context, group *task.Group, concurrency int) ([]*result.AsyncResult, error)
- func (s *Server) SendTask(signature *task.Signature) (*result.AsyncResult, error)
- func (s *Server) SendTaskWithContext(ctx context.Context, signature *task.Signature) (*result.AsyncResult, error)
- type Worker
- func (w *Worker) ConsumeQueue() string
- func (w *Worker) PreConsumeHandler() bool
- func (w *Worker) Process(signature *task.Signature) error
- func (w *Worker) Quit()
- func (w *Worker) SetAfterTaskHandler(f func(task *task.Signature))
- func (w *Worker) SetBeforeTaskHandler(f func(task *task.Signature))
- func (w *Worker) SetErrorHandler(f func(err error))
- func (w *Worker) SetPreConsumeHandler(f func(worker *Worker) bool)
- func (w *Worker) Start() error
- func (w *Worker) StartSync(errChan chan<- error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrWorkerGracefullyQuit = errors.New("worker quit gracefully") ErrWorkerAbruptlyQuit = errors.New("worker quit abruptly") )
Functions ¶
func SetConsumeQueue ¶
SetConsumeQueue 设置消费队列
func SetDelayedQueue ¶
SetDelayedQueue 设置延时队列
func SetNoUnixSignals ¶
SetNoUnixSignals 设置是否优雅关闭
func SetResultExpire ¶
SetResultExpire 设置 backend result 过期时间
Types ¶
type Config ¶
type Config struct { NoUnixSignals bool `json:"no_unix_signals"` ResultExpire int64 `json:"result_expire"` Concurrency int64 `json:"concurrency"` ConsumeQueue string `json:"consume_queue"` DelayedQueue string `json:"delayed_queue"` }
Config distributed service配置文件
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(controller controller.Controller, backend backend.Backend, lock locker.Locker, helper *log.Helper, prePublishHandler func(task *task.Signature), options ...options.Option) *Server
NewServer 创建服务
func (*Server) EnforcementConf ¶
func (s *Server) EnforcementConf()
func (*Server) GetController ¶
func (s *Server) GetController() controller.Controller
GetController 获取 Controller
func (*Server) GetRegisteredTask ¶
GetRegisteredTask 获取注册的任务
func (*Server) IsRegisteredTask ¶
IsRegisteredTask 判断任务是否注册
func (*Server) RegisteredTask ¶
RegisteredTask 注册多个任务 interface 规则: 必须是func且必须有返回参数,最后一个出参是error
func (*Server) RegisteredTasks ¶
RegisteredTasks 注册多个任务 handelTaskMap map[string]interface{} interface 规则: 必须是func且必须有返回参数,最后一个出参是error
func (*Server) RegisteredTimedChain ¶
RegisteredTimedChain 注册定时链式任务
func (*Server) RegisteredTimedGroup ¶
func (s *Server) RegisteredTimedGroup(spec, name string, groupID string, concurrency int, signatures ...*task.Signature) error
RegisteredTimedGroup 注册定时任务组
func (*Server) RegisteredTimedGroupCallback ¶
func (s *Server) RegisteredTimedGroupCallback(spec, name string, groupID string, concurrency int, callback *task.Signature, signatures ...*task.Signature) error
RegisteredTimedGroupCallback 注册具有回调的组任务
func (*Server) RegisteredTimedTask ¶
RegisteredTimedTask 注册定时任务
func (*Server) SendGroupCallback ¶
func (s *Server) SendGroupCallback(groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)
SendGroupCallback 发送具有回调任务的任务组
func (*Server) SendGroupCallbackWithContext ¶
func (s *Server) SendGroupCallbackWithContext(ctx context.Context, groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)
SendGroupCallbackWithContext 发送具有回调任务的任务组
func (*Server) SendGroupWithContext ¶
func (s *Server) SendGroupWithContext(ctx context.Context, group *task.Group, concurrency int) ([]*result.AsyncResult, error)
SendGroupWithContext 发送并行执行的任务组
func (*Server) SendTaskWithContext ¶
func (s *Server) SendTaskWithContext(ctx context.Context, signature *task.Signature) (*result.AsyncResult, error)
SendTaskWithContext 发送任务,可以传入ctx
type Worker ¶
type Worker struct { NoUnixSignals bool `json:"no_unix_signals"` // 是否使用unix信号控制 Concurrency int `json:"concurrency"` // 并发数 ConsumerTag string `json:"consumer_tag"` // 消费者标签 Queue string `json:"queue"` // 队列名称 // contains filtered or unexported fields }
Worker 任务处理
func (*Worker) ConsumeQueue ¶
func (*Worker) PreConsumeHandler ¶
func (*Worker) SetAfterTaskHandler ¶
SetAfterTaskHandler 设置执行任务后执行函数
func (*Worker) SetBeforeTaskHandler ¶
SetBeforeTaskHandler 设置执行任务前执行函数
func (*Worker) SetErrorHandler ¶
SetErrorHandler 设置处理错误函数
func (*Worker) SetPreConsumeHandler ¶
SetPreConsumeHandler 设置预处理函数