distributed

package
v0.0.0-...-d3f65ed Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2024 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWorkerGracefullyQuit = errors.New("worker quit gracefully")
	ErrWorkerAbruptlyQuit   = errors.New("worker quit abruptly")
)

Functions

func SetConcurrency

func SetConcurrency(concurrency int64) options.Option

SetConcurrency 设置并发量

func SetConsumeQueue

func SetConsumeQueue(consumeQueue string) options.Option

SetConsumeQueue 设置消费队列

func SetDelayedQueue

func SetDelayedQueue(delayedQueue string) options.Option

SetDelayedQueue 设置延时队列

func SetNoUnixSignals

func SetNoUnixSignals(noUnixSignals bool) options.Option

SetNoUnixSignals 设置是否优雅关闭

func SetResultExpire

func SetResultExpire(expire int64) options.Option

SetResultExpire 设置 backend result 过期时间

Types

type Config

type Config struct {
	NoUnixSignals bool   `json:"no_unix_signals"`
	ResultExpire  int64  `json:"result_expire"`
	Concurrency   int64  `json:"concurrency"`
	ConsumeQueue  string `json:"consume_queue"`
	DelayedQueue  string `json:"delayed_queue"`
}

Config 为 distributed service 的配置

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(controller controller.Controller, backend backend.Backend, lock locker.Locker, helper *log.Helper, prePublishHandler func(task *task.Signature), options ...options.Option) *Server

NewServer 创建服务

func (*Server) EnforcementConf

func (s *Server) EnforcementConf()

func (*Server) GetBackend

func (s *Server) GetBackend() backend.Backend

GetBackend 获取 Backend

func (*Server) GetConfig

func (s *Server) GetConfig() *Config

GetConfig 获取配置文件

func (*Server) GetController

func (s *Server) GetController() controller.Controller

GetController 获取 Controller

func (*Server) GetLocker

func (s *Server) GetLocker() locker.Locker

GetLocker 获取 Locker

func (*Server) GetRegisteredTask

func (s *Server) GetRegisteredTask(name string) (interface{}, bool)

GetRegisteredTask 获取注册的任务

func (*Server) IsRegisteredTask

func (s *Server) IsRegisteredTask(name string) bool

IsRegisteredTask 判断任务是否注册

func (*Server) NewWorker

func (s *Server) NewWorker(consumerTag string, concurrency int, queue string) *Worker

func (*Server) RegisteredTask

func (s *Server) RegisteredTask(name string, fn interface{}) error

RegisteredTask 注册单个任务。fn 的规则: 必须是func且必须有返回参数,最后一个出参是error

func (*Server) RegisteredTasks

func (s *Server) RegisteredTasks(handelTaskMap map[string]interface{}) error

RegisteredTasks 注册多个任务。handelTaskMap 类型为 map[string]interface{},其中 interface 规则: 必须是func且必须有返回参数,最后一个出参是error

func (*Server) RegisteredTimedChain

func (s *Server) RegisteredTimedChain(spec, name string, signatures ...*task.Signature) error

RegisteredTimedChain 注册定时链式任务

func (*Server) RegisteredTimedGroup

func (s *Server) RegisteredTimedGroup(spec, name string, groupID string, concurrency int, signatures ...*task.Signature) error

RegisteredTimedGroup 注册定时任务组

func (*Server) RegisteredTimedGroupCallback

func (s *Server) RegisteredTimedGroupCallback(spec, name string, groupID string, concurrency int, callback *task.Signature, signatures ...*task.Signature) error

RegisteredTimedGroupCallback 注册具有回调的组任务

func (*Server) RegisteredTimedTask

func (s *Server) RegisteredTimedTask(spec, name string, signature *task.Signature) error

RegisteredTimedTask 注册定时任务

func (*Server) SendChain

func (s *Server) SendChain(chain *task.Chain) (*result.ChainAsyncResult, error)

SendChain 发送链式调用任务

func (*Server) SendGroup

func (s *Server) SendGroup(group *task.Group, concurrency int) ([]*result.AsyncResult, error)

SendGroup 发送并行任务组

func (*Server) SendGroupCallback

func (s *Server) SendGroupCallback(groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)

SendGroupCallback 发送具有回调任务的任务组

func (*Server) SendGroupCallbackWithContext

func (s *Server) SendGroupCallbackWithContext(ctx context.Context, groupCallback *task.GroupCallback, concurrency int) (*result.GroupCallbackAsyncResult, error)

SendGroupCallbackWithContext 发送具有回调任务的任务组,可以传入ctx

func (*Server) SendGroupWithContext

func (s *Server) SendGroupWithContext(ctx context.Context, group *task.Group, concurrency int) ([]*result.AsyncResult, error)

SendGroupWithContext 发送并行任务组,可以传入ctx

func (*Server) SendTask

func (s *Server) SendTask(signature *task.Signature) (*result.AsyncResult, error)

SendTask 发送任务

func (*Server) SendTaskWithContext

func (s *Server) SendTaskWithContext(ctx context.Context, signature *task.Signature) (*result.AsyncResult, error)

SendTaskWithContext 发送任务,可以传入ctx

type Worker

type Worker struct {
	NoUnixSignals bool `json:"no_unix_signals"` // 是否使用unix信号控制

	Concurrency int    `json:"concurrency"`  // 并发数
	ConsumerTag string `json:"consumer_tag"` // 消费者标签
	Queue       string `json:"queue"`        // 队列名称
	// contains filtered or unexported fields
}

Worker 任务处理

func (*Worker) ConsumeQueue

func (w *Worker) ConsumeQueue() string

func (*Worker) PreConsumeHandler

func (w *Worker) PreConsumeHandler() bool

func (*Worker) Process

func (w *Worker) Process(signature *task.Signature) error

Process 任务处理

func (*Worker) Quit

func (w *Worker) Quit()

Quit 触发强制退出

func (*Worker) SetAfterTaskHandler

func (w *Worker) SetAfterTaskHandler(f func(task *task.Signature))

SetAfterTaskHandler 设置执行任务后执行函数

func (*Worker) SetBeforeTaskHandler

func (w *Worker) SetBeforeTaskHandler(f func(task *task.Signature))

SetBeforeTaskHandler 设置执行任务前执行函数

func (*Worker) SetErrorHandler

func (w *Worker) SetErrorHandler(f func(err error))

SetErrorHandler 设置处理错误函数

func (*Worker) SetPreConsumeHandler

func (w *Worker) SetPreConsumeHandler(f func(worker *Worker) bool)

SetPreConsumeHandler 设置预处理函数

func (*Worker) Start

func (w *Worker) Start() error

Start 启动

func (*Worker) StartSync

func (w *Worker) StartSync(errChan chan<- error)

StartSync 异步启动

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL