Documentation
¶
Index ¶
- Variables
- func BrokerHookProtect(ctx context.Context, f func(ctx context.Context) error) (err error)
- func Json(v any) string
- func Protect(f func() error) (err error)
- type BatchError
- type Broker
- type BrokerAfterExit
- type BrokerAfterStart
- type BrokerBeforeExit
- type BrokerBeforeStart
- type Callback
- type CommandTask
- type Hook
- func (h *Hook) OnComplete(v func(ctx context.Context, runtime *TaskRuntime) error)
- func (h *Hook) OnContext(v func(ctx context.Context, runtime *TaskRuntime) context.Context)
- func (h *Hook) OnPush(v func(ctx context.Context, msg ...*Message) ([]*Message, error))
- func (h *Hook) OnRun(v func(ctx context.Context, runtime *TaskRuntime) error)
- type HttpTask
- func (h *HttpTask) Load(ctx context.Context, msg *Message) (err error)
- func (h *HttpTask) Message() (msg *Message, err error)
- func (h *HttpTask) Run(ctx context.Context) (result string, err error)
- func (h *HttpTask) Scan(src []byte) (err error)
- func (h *HttpTask) SetBody(data []byte) *HttpTask
- func (h *HttpTask) SetHeader(k, v string) *HttpTask
- func (h *HttpTask) SetHeaders(headers map[string]string) *HttpTask
- func (h *HttpTask) SetMethod(method string) *HttpTask
- func (h *HttpTask) TaskName() string
- type Logger
- type Message
- func (m *Message) SetData(data any) *Message
- func (m *Message) SetDelay(delay time.Duration) *Message
- func (m *Message) SetExpire(d time.Duration) *Message
- func (m *Message) SetExpiredAt(t time.Time) *Message
- func (m *Message) SetMaxRetry(retry int) *Message
- func (m *Message) SetMeta(meta Meta) *Message
- func (m *Message) SetRawData(data json.RawMessage) *Message
- func (m *Message) SetRawTask(name string, data any) (msg *Message, err error)
- func (m *Message) SetRetryRule(rule []int) *Message
- func (m *Message) SetTask(task Task) (message *Message, err error)
- func (m *Message) SetTimeout(t time.Duration) *Message
- func (m *Message) SetTraceId(traceId string) *Message
- func (m *Message) String() string
- func (m *Message) TryRetry(delay time.Duration) *Message
- type Messages
- type Meta
- type OnComplete
- type OnFail
- type OnLoad
- type OnSuccess
- type Process
- type Producer
- type RedisBroker
- func (r *RedisBroker) AfterExit(ctx context.Context) error
- func (r *RedisBroker) AfterStart(ctx context.Context) error
- func (r *RedisBroker) BeforeExit(ctx context.Context) error
- func (r *RedisBroker) BeforeStart(ctx context.Context) error
- func (r *RedisBroker) Decode(bytes []byte) (msg *Message, err error)
- func (r *RedisBroker) Encode(msg *Message) ([]byte, error)
- func (r *RedisBroker) Pop(ctx context.Context) (msg *Message, err error)
- func (r *RedisBroker) Push(ctx context.Context, msg ...*Message) (err error)
- type RedisBrokerConfig
- type Register
- type Rmq
- func (q *Rmq) Exit()
- func (q *Rmq) Push(ctx context.Context, msg ...*Message) (err error)
- func (q *Rmq) Register(task ...Task)
- func (q *Rmq) RegisterFunc(name string, callback Callback)
- func (q *Rmq) SetBroker(b Broker)
- func (q *Rmq) SetProcess(p Process)
- func (q *Rmq) StartWorker(c *WorkerConfig)
- func (q *Rmq) Tasks() map[string]*TaskInfo
- func (q *Rmq) TryRetry(ctx context.Context, msg *Message) (err error)
- func (q *Rmq) TryRun(ctx context.Context, msg *Message)
- type Task
- type TaskInfo
- type TaskName
- type TaskRuntime
- type TaskScanner
- type TaskValuer
- type Timestamp
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultLog = newDefaultLog()
View Source
var DefaultMeta = Meta{ RetryRule: nil, Retry: [2]int{0, 0}, Delay: 0, Timeout: 30, TraceId: "", }
View Source
var DefaultRedisBrokerConfig = RedisBrokerConfig{ Key: "rmq:queue", WaitDuration: 1 * time.Second, DelayKey: "rmq:queue:delay", DelayWaitDuration: 4 * time.Second, }
View Source
var DefaultRetryRule = []int{
60 * 1,
60 * 2,
60 * 5,
60 * 15,
60 * 60,
60 * 60 * 3,
}
View Source
var RetryMeta = Meta{ RetryRule: DefaultRetryRule, Retry: [2]int{0, 7}, Delay: 0, Timeout: 30, TraceId: "", }
Functions ¶
func BrokerHookProtect ¶
Types ¶
type BatchError ¶
type BatchError struct {
// contains filtered or unexported fields
}
func (*BatchError) HasError ¶
func (b *BatchError) HasError() bool
type BrokerAfterExit ¶
type BrokerAfterStart ¶
type BrokerBeforeExit ¶
type BrokerBeforeStart ¶
type CommandTask ¶
func NewCommandTask ¶
func NewCommandTask(shell string, command ...string) *CommandTask
func (*CommandTask) TaskName ¶
func (c *CommandTask) TaskName() string
type Hook ¶
type Hook struct {
// contains filtered or unexported fields
}
func (*Hook) OnComplete ¶
func (h *Hook) OnComplete(v func(ctx context.Context, runtime *TaskRuntime) error)
type HttpTask ¶
type HttpTask struct { Url string `json:"url"` Method string `json:"method"` Header map[string]string `json:"header,omitempty"` Body json.RawMessage `json:"body,omitempty"` // 因为json序列化,保证消息好看一点 // contains filtered or unexported fields }
func NewHttpTaskGet ¶
func NewHttpTaskJsonPost ¶
type Message ¶
type Message struct { Id string `json:"id"` Task string `json:"task"` Data json.RawMessage `json:"data"` RunAt Timestamp `json:"run_at"` // 应执行时间 ExpiredAt Timestamp `json:"expired_at"` // 过期时间 CreatedAt Timestamp `json:"created_at"` // 创建时间 Meta Meta `json:"meta,omitempty"` }
func NewBlankMsg ¶
func NewBlankMsg() *Message
func (*Message) SetMaxRetry ¶
func (*Message) SetRawData ¶
func (m *Message) SetRawData(data json.RawMessage) *Message
func (*Message) SetRawTask ¶ added in v1.1.3
func (*Message) SetRetryRule ¶
func (*Message) SetTraceId ¶
type RedisBroker ¶
type RedisBroker struct {
// contains filtered or unexported fields
}
func NewRedisBroker ¶
func NewRedisBroker(rd *redis.Client, c RedisBrokerConfig, log Logger) *RedisBroker
func (*RedisBroker) AfterStart ¶
func (r *RedisBroker) AfterStart(ctx context.Context) error
func (*RedisBroker) BeforeExit ¶
func (r *RedisBroker) BeforeExit(ctx context.Context) error
func (*RedisBroker) BeforeStart ¶
func (r *RedisBroker) BeforeStart(ctx context.Context) error
type RedisBrokerConfig ¶
type RedisBrokerConfig struct { Key string `json:"key" toml:"key" yaml:"key"` WaitDuration time.Duration `json:"wait_duration" toml:"wait_duration" yaml:"wait_duration"` DelayKey string `json:"delay_key" toml:"delay_key" yaml:"delay_key"` DelayWaitDuration time.Duration `json:"delay_wait_duration" toml:"delay_wait_duration" yaml:"delay_wait_duration"` }
type Rmq ¶
type Rmq struct { // 钩子 Hook *Hook // contains filtered or unexported fields }
func (*Rmq) RegisterFunc ¶
func (*Rmq) StartWorker ¶
func (q *Rmq) StartWorker(c *WorkerConfig)
StartWorker 线上是多容器的,不用多个协程并发跑,只要加pod就行。目前理论上存在丢失消息的可能,所以只能用于不重要的任务。
type TaskRuntime ¶
type TaskRuntime struct { Msg *Message `json:"msg"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` Duration time.Duration `json:"duration"` TaskError error `json:"task_error,omitempty"` // 执行错误 Error error `json:"error,omitempty"` // 最后的错误 Result string `json:"result,omitempty"` // 结果 }
func (*TaskRuntime) IsSuccess ¶
func (a *TaskRuntime) IsSuccess() bool
type TaskScanner ¶
type TaskValuer ¶
type WorkerConfig ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.