Documentation ¶
Index ¶
- Constants
- func FanIn(ctx context.Context, channels ...<-chan error) <-chan error
- func GetBackoffForNextSchedule(cronSchedule string) (next time.Duration)
- func ResolveClusterParameter(params string) map[int]string
- func SetDiscovery(protocol string, ...)
- func ValidateHandler(handler interface{}) error
- func ValidateSchedule(cronSchedule string) error
- type BaseHandler
- type DefaultRetryPolicy
- func (d *DefaultRetryPolicy) ComputeNextDelay(numAttempts int) time.Duration
- func (d *DefaultRetryPolicy) SetBackoffCoefficient(b float64) *DefaultRetryPolicy
- func (d *DefaultRetryPolicy) SetInitialInterval(t time.Duration) *DefaultRetryPolicy
- func (d *DefaultRetryPolicy) SetMaximumAttempts(ma int) *DefaultRetryPolicy
- func (d *DefaultRetryPolicy) SetMaximumInterval(t time.Duration) *DefaultRetryPolicy
- type DefaultWorkerInstance
- func (d *DefaultWorkerInstance) GetCreatedAt() int64
- func (d *DefaultWorkerInstance) GetExpiration() int64
- func (d *DefaultWorkerInstance) GetHealthStatus() HealthStatus
- func (d *DefaultWorkerInstance) GetId() string
- func (d *DefaultWorkerInstance) GetServiceName() string
- func (d *DefaultWorkerInstance) GetShardIndex() int
- func (d *DefaultWorkerInstance) UpdateExpiration(exp int64)
- func (d *DefaultWorkerInstance) UpdateHealthStatus(hs HealthStatus)
- func (d *DefaultWorkerInstance) UpdateShardIndex(i int)
- type DefaultWorkerInstanceSlice
- type HealthStatus
- type Resolver
- func (r *Resolver) Build() (err error)
- func (r *Resolver) Context() context.Context
- func (r *Resolver) Instance() DefaultWorkerInstance
- func (r *Resolver) KeepaliveInterval() time.Duration
- func (r *Resolver) UpdateShardIndex(i int)
- func (r *Resolver) UpdateState(s State)
- func (r *Resolver) WorkerRegister() (err error)
- func (r *Resolver) WorkerUnregister()
- type Retrier
- type RetryPolicy
- type State
- type Stater
- type WatchHandler
- type WatchInfo
- type Worker
- func (w *Worker) Cancel()
- func (w *Worker) ChangeState(state Stater)
- func (w *Worker) GetInstance() DefaultWorkerInstance
- func (w *Worker) Run() error
- func (w *Worker) UpdateInstance(ins DefaultWorkerInstance)
- func (w *Worker) UpdateShardIndex(index int)
- func (w *Worker) Validate() error
- func (w *Worker) Wait() error
- type WorkerConn
- type WorkerDiscovery
- type WorkerModel
- type WorkerOption
- type WorkerState
- type WorkerStateInitialized
- type WorkerStatePending
- type WorkerStateRunning
- type WorkerStateShutdown
Constants ¶
View Source
const ( HandleRunLabelRun = true HandleRunLabelUnRun = false )
View Source
const NoCronBackoff = time.Duration(-1)
NoCronBackoff 不存在cron重试
View Source
const (
NoCronExpress = ""
)
View Source
const RetryDone = time.Duration(-1)
Variables ¶
This section is empty.
Functions ¶
func GetBackoffForNextSchedule ¶
GetBackoffForNextSchedule 获取下一次任务执行时间
func ResolveClusterParameter ¶ added in v1.1.0
func SetDiscovery ¶ added in v1.1.0
func SetDiscovery(protocol string, creator func(*config.WorkerDiscoveryConfig) (WorkerDiscovery, error))
func ValidateHandler ¶
func ValidateHandler(handler interface{}) error
func ValidateSchedule ¶
ValidateSchedule 校验cron表达式格式是否准确
Types ¶
type BaseHandler ¶ added in v1.1.0
type BaseHandler interface {
	Before(ctx context.Context) error
	Do(ctx context.Context) error
	After(ctx context.Context) error
	ErrorMessage(ctx context.Context, err error)
}
BaseHandler 任务处理程序接口
type DefaultRetryPolicy ¶
type DefaultRetryPolicy struct {
// contains filtered or unexported fields
}
func GetDefaultRetryPolicy ¶
func GetDefaultRetryPolicy() *DefaultRetryPolicy
func (*DefaultRetryPolicy) ComputeNextDelay ¶
func (d *DefaultRetryPolicy) ComputeNextDelay(numAttempts int) time.Duration
func (*DefaultRetryPolicy) SetBackoffCoefficient ¶
func (d *DefaultRetryPolicy) SetBackoffCoefficient(b float64) *DefaultRetryPolicy
func (*DefaultRetryPolicy) SetInitialInterval ¶
func (d *DefaultRetryPolicy) SetInitialInterval(t time.Duration) *DefaultRetryPolicy
func (*DefaultRetryPolicy) SetMaximumAttempts ¶
func (d *DefaultRetryPolicy) SetMaximumAttempts(ma int) *DefaultRetryPolicy
func (*DefaultRetryPolicy) SetMaximumInterval ¶
func (d *DefaultRetryPolicy) SetMaximumInterval(t time.Duration) *DefaultRetryPolicy
type DefaultWorkerInstance ¶ added in v1.1.0
type DefaultWorkerInstance struct {
	Id           string       `json:"id"`
	ServiceName  string       `json:"serviceName"`
	CreatedAt    int64        `json:"createdAt"`
	ShardIndex   int          `json:"shardIndex"`
	HealthStatus HealthStatus `json:"healthStatus"`
	Expiration   int64        `json:"expiration"` // in seconds; service expiration time, currently used only in redis cluster mode
}
func (*DefaultWorkerInstance) GetCreatedAt ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetCreatedAt() int64
func (*DefaultWorkerInstance) GetExpiration ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetExpiration() int64
func (*DefaultWorkerInstance) GetHealthStatus ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetHealthStatus() HealthStatus
func (*DefaultWorkerInstance) GetId ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetId() string
func (*DefaultWorkerInstance) GetServiceName ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetServiceName() string
func (*DefaultWorkerInstance) GetShardIndex ¶ added in v1.1.0
func (d *DefaultWorkerInstance) GetShardIndex() int
func (*DefaultWorkerInstance) UpdateExpiration ¶ added in v1.1.0
func (d *DefaultWorkerInstance) UpdateExpiration(exp int64)
func (*DefaultWorkerInstance) UpdateHealthStatus ¶ added in v1.1.0
func (d *DefaultWorkerInstance) UpdateHealthStatus(hs HealthStatus)
func (*DefaultWorkerInstance) UpdateShardIndex ¶ added in v1.1.0
func (d *DefaultWorkerInstance) UpdateShardIndex(i int)
type DefaultWorkerInstanceSlice ¶ added in v1.1.0
type DefaultWorkerInstanceSlice []DefaultWorkerInstance
func (DefaultWorkerInstanceSlice) Len ¶ added in v1.1.0
func (s DefaultWorkerInstanceSlice) Len() int
func (DefaultWorkerInstanceSlice) Less ¶ added in v1.1.0
func (s DefaultWorkerInstanceSlice) Less(i, j int) bool
func (DefaultWorkerInstanceSlice) Swap ¶ added in v1.1.0
func (s DefaultWorkerInstanceSlice) Swap(i, j int)
type HealthStatus ¶ added in v1.1.0
type HealthStatus int
const ( Green HealthStatus = 0 Yellow HealthStatus = 1 Red HealthStatus = 2 )
type Resolver ¶ added in v1.1.0
type Resolver struct {
// contains filtered or unexported fields
}
func NewResolver ¶ added in v1.1.0
func NewResolver(worker *Worker, discovery WorkerDiscovery, shardingTotalCount int) *Resolver
func (*Resolver) Instance ¶ added in v1.1.0
func (r *Resolver) Instance() DefaultWorkerInstance
func (*Resolver) KeepaliveInterval ¶ added in v1.1.0
func (*Resolver) UpdateShardIndex ¶ added in v1.1.0
func (*Resolver) UpdateState ¶ added in v1.1.0
func (*Resolver) WorkerRegister ¶ added in v1.1.0
func (*Resolver) WorkerUnregister ¶ added in v1.1.0
func (r *Resolver) WorkerUnregister()
type RetryPolicy ¶
type State ¶ added in v1.1.0
type State struct {
Instances []DefaultWorkerInstance
}
type WatchHandler ¶ added in v1.1.0
type WatchHandler interface { BaseHandler Watch(info WatchInfo) }
type WatchInfo ¶ added in v1.1.0
type WatchInfo struct { CurrentInstance DefaultWorkerInstance Param string ClusterInstances []DefaultWorkerInstance }
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker 任务核心类
func NewWorker ¶
func NewWorker(ctx context.Context, cfg config.WorkerConfig, hd BaseHandler, opt ...WorkerOption) *Worker
NewWorker 获取任务类
func (*Worker) ChangeState ¶
func (*Worker) GetInstance ¶ added in v1.1.0
func (w *Worker) GetInstance() DefaultWorkerInstance
GetInstance returns the worker's current instance (a DefaultWorkerInstance).
func (*Worker) UpdateInstance ¶ added in v1.1.0
func (w *Worker) UpdateInstance(ins DefaultWorkerInstance)
UpdateInstance updates the worker's instance to ins.
func (*Worker) UpdateShardIndex ¶ added in v1.1.0
UpdateShardIndex updates the shard index of the worker's instance.
type WorkerConn ¶ added in v1.1.0
type WorkerDiscovery ¶ added in v1.1.0
type WorkerDiscovery interface {
	Register(instance DefaultWorkerInstance, w WorkerConn, shardingTotalCount int) error
	Reset(instance DefaultWorkerInstance, w WorkerConn, shardingTotalCount int) error
	Unregister(instance DefaultWorkerInstance) error
	Build(w WorkerConn) error
	Keepalive(w WorkerConn) (HealthStatus, error)
}
func GetDiscovery ¶ added in v1.1.0
func GetDiscovery(wdc *config.WorkerDiscoveryConfig) (WorkerDiscovery, error)
type WorkerModel ¶ added in v1.1.0
type WorkerModel int
const ( Local WorkerModel = 1 Cluster WorkerModel = 2 )
type WorkerOption ¶
type WorkerOption interface {
// contains filtered or unexported methods
}
func WithModel ¶ added in v1.1.0
func WithModel(model WorkerModel) WorkerOption
WithModel worker执行模式
func WithParams ¶ added in v1.1.0
func WithParams(params string) WorkerOption
WithParams handle执行时可传递的参数
func WithServiceName ¶ added in v1.1.0
func WithServiceName(sn string) WorkerOption
WithServiceName sets the worker's service name.
func WithShardingTotalCount ¶ added in v1.1.0
func WithShardingTotalCount(count int) WorkerOption
WithShardingTotalCount worker分片数
type WorkerState ¶
type WorkerState int
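const (
	// Initialized means the worker has finished initialization.
	Initialized WorkerState = iota
	// Pending means the worker has been created but has not yet started processing its handler.
	Pending
	// Running means the worker is bound to a process and is currently processing.
	Running
	// Shutdown means the worker has finished.
	Shutdown
)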
type WorkerStateInitialized ¶
type WorkerStateInitialized struct {
// contains filtered or unexported fields
}
func (*WorkerStateInitialized) Handle ¶
func (s *WorkerStateInitialized) Handle()
func (*WorkerStateInitialized) NextStage ¶
func (s *WorkerStateInitialized) NextStage()
type WorkerStatePending ¶
type WorkerStatePending struct {
// contains filtered or unexported fields
}
func (*WorkerStatePending) Handle ¶
func (s *WorkerStatePending) Handle()
func (*WorkerStatePending) NextStage ¶
func (s *WorkerStatePending) NextStage()
type WorkerStateRunning ¶
type WorkerStateRunning struct {
// contains filtered or unexported fields
}
func (*WorkerStateRunning) Handle ¶
func (s *WorkerStateRunning) Handle()
func (*WorkerStateRunning) NextStage ¶
func (s *WorkerStateRunning) NextStage()
type WorkerStateShutdown ¶
type WorkerStateShutdown struct {
// contains filtered or unexported fields
}
func (*WorkerStateShutdown) Handle ¶
func (s *WorkerStateShutdown) Handle()
func (*WorkerStateShutdown) NextStage ¶
func (s *WorkerStateShutdown) NextStage()
Source Files ¶
Click to show internal directories.
Click to hide internal directories.