Documentation ¶
Index ¶
- Constants
- Variables
- func GetBrokerDriver[T any]() T
- func WfOptionMux(mux WFMux) func(workflow *Workflow)
- func WfOptionRecordErrorHandler(handler func(error)) func(workflow *Workflow)
- type BQClient
- func (b *BQClient) Dynamic(options ...DynamicOption) *BQClient
- func (b *BQClient) GetId() string
- func (b *BQClient) Priority(priority float64) *BQClient
- func (t BQClient) Publish(channel, topic string, payload []byte) error
- func (t BQClient) PublishAtTime(channel, topic string, payload []byte, atTime time.Time) error
- func (b *BQClient) PublishInSequential(channel, topic string, payload []byte) *SequentialCmd
- func (b *BQClient) SetId(id string) *BQClient
- func (t BQClient) Subscribe(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
- func (t BQClient) SubscribeDelay(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
- func (t BQClient) SubscribeSequential(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
- func (b *BQClient) WithContext(ctx context.Context) *BQClient
- type BaseTask
- func (t *BaseTask) Execute() error
- func (t *BaseTask) ID() string
- func (t *BaseTask) OnExecute(fn func(task Task) error) *BaseTask
- func (t *BaseTask) OnRollback(fn func(task Task) error) *BaseTask
- func (t *BaseTask) Rollback() error
- func (t *BaseTask) Statement() []byte
- func (t *BaseTask) WithRecordStatement(statement []byte) *BaseTask
- type BeanqConfig
- type Broker
- func (t *Broker) AddConsumer(moodType btype.MoodType, channel, topic string, subscribe IConsumeHandle) error
- func (t *Broker) Dequeue(ctx context.Context, channel, topic string, do public.CallBack)
- func (t *Broker) Enqueue(ctx context.Context, data map[string]any) error
- func (t *Broker) Migrate(ctx context.Context, data []map[string]any) error
- func (t *Broker) Start(ctx context.Context)
- func (t *Broker) Status(ctx context.Context, channel, topic, id string) (map[string]string, error)
- func (t *Broker) WaitSignal(cancel context.CancelFunc) <-chan bool
- type Client
- type ClientOption
- type DebugLog
- type DefaultHandle
- type DelayFunc
- type DynamicOption
- type Handler
- type Health
- type History
- type IBaseCmd
- type IBaseSubscribeCmd
- type IConsumeCancel
- type IConsumeError
- type IConsumeHandle
- type Message
- type MessageM
- type MessageS
- type Mutex
- func (m *Mutex) Eval(ctx context.Context, client redis.UniversalClient, script *Script, ...) (interface{}, error)
- func (m *Mutex) ExtendContext(ctx context.Context) (bool, error)
- func (m *Mutex) LockContext(ctx context.Context) error
- func (m *Mutex) Name() string
- func (m *Mutex) UnlockContext(ctx context.Context) (bool, error)
- func (m *Mutex) Until() time.Time
- func (m *Mutex) Value() string
- type MuxClient
- type MuxOption
- func WithDriftFactor(factor float64) MuxOption
- func WithExpiry(expiry time.Duration) MuxOption
- func WithFailFast(b bool) MuxOption
- func WithGenValueFunc(genValueFunc func() (string, error)) MuxOption
- func WithRetryDelay(delay time.Duration) MuxOption
- func WithRetryDelayFunc(delayFunc DelayFunc) MuxOption
- func WithSetNXOnExtend() MuxOption
- func WithShufflePools(b bool) MuxOption
- func WithTimeoutFactor(factor float64) MuxOption
- func WithTries(tries int) MuxOption
- func WithValue(v string) MuxOption
- type OptionFunc
- type Publish
- type Queue
- type Redis
- type Script
- type SequentialCmd
- type Subscribe
- type Task
- type TaskStatus
- type UI
- type WFMux
- type Workflow
- func (w *Workflow) CurrentTask() Task
- func (w *Workflow) GetGid() string
- func (w *Workflow) Init(opts ...func(workflow *Workflow))
- func (w *Workflow) Message() *Message
- func (w *Workflow) NewTask(ids ...string) *BaseTask
- func (w *Workflow) OnRollbackResult(handler func(taskID string, err error) error) *Workflow
- func (w *Workflow) Results() []error
- func (w *Workflow) Run() (err error)
- func (w *Workflow) SetGid(gid string)
- func (w *Workflow) TrackRecord(taskID string, status TaskStatus)
- type WorkflowHandler
- type WorkflowRecord
Constants ¶
const ( ExecuteSuccess = iota + 1 ExecuteFailed RollbackSuccess RollbackFailed RollbackResultProcessFailed )
Variables ¶
var ( NilHandle = errors.New("beanq:handle is nil") NilCancel = errors.New("beanq:cancel is nil") )
consumer...
var ErrExtendFailed = bstatus.BqError("failed to extend lock")
ErrExtendFailed is the error resulting if Redsync fails to extend the lock.
var ErrFailed = bstatus.BqError("failed to acquire lock")
ErrFailed is the error resulting if Redsync fails to acquire the lock after exhausting all retries.
var ErrLockAlreadyExpired = bstatus.BqError("failed to unlock, lock was already expired")
ErrLockAlreadyExpired is the error resulting if trying to unlock the lock which already expired.
Functions ¶
func GetBrokerDriver ¶
func GetBrokerDriver[T any]() T
func WfOptionMux ¶
Types ¶
type BQClient ¶
type BQClient struct {
// contains filtered or unexported fields
}
func (*BQClient) Dynamic ¶
func (b *BQClient) Dynamic(options ...DynamicOption) *BQClient
Dynamic only supports the Sequential type for now.
func (BQClient) PublishAtTime ¶
func (*BQClient) PublishInSequential ¶
func (b *BQClient) PublishInSequential(channel, topic string, payload []byte) *SequentialCmd
func (BQClient) Subscribe ¶
func (t BQClient) Subscribe(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
func (BQClient) SubscribeDelay ¶
func (t BQClient) SubscribeDelay(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
func (BQClient) SubscribeSequential ¶
func (t BQClient) SubscribeSequential(channel, topic string, handle IConsumeHandle) (IBaseSubscribeCmd, error)
type BaseTask ¶
type BaseTask struct {
// contains filtered or unexported fields
}
BaseTask ...
func (*BaseTask) WithRecordStatement ¶
type BeanqConfig ¶
type BeanqConfig struct { Health Health `json:"health"` Broker string `json:"broker"` UI `json:"ui"` DebugLog `json:"debugLog"` Queue History `json:"history"` Redis Redis `json:"redis"` DeadLetterIdleTime time.Duration `json:"deadLetterIdle"` DeadLetterTicker time.Duration `json:"deadLetterTicker"` KeepFailedJobsInHistory time.Duration `json:"keepFailedJobsInHistory"` KeepSuccessJobsInHistory time.Duration `json:"keepSuccessJobsInHistory"` PublishTimeOut time.Duration `json:"publishTimeOut"` ConsumeTimeOut time.Duration `json:"consumeTimeOut"` MinConsumers int64 `json:"minConsumers"` JobMaxRetries int `json:"jobMaxRetries"` ConsumerPoolSize int `json:"consumerPoolSize"` }
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func NewBroker ¶
func NewBroker(config *BeanqConfig) *Broker
func (*Broker) AddConsumer ¶
func (*Broker) WaitSignal ¶
func (t *Broker) WaitSignal(cancel context.CancelFunc) <-chan bool
type Client ¶
type Client struct { Topic string `json:"topic"` Channel string `json:"channel"` MaxLen int64 `json:"maxLen"` Retry int `json:"retry"` Priority float64 `json:"priority"` TimeToRun time.Duration `json:"timeToRun"` // contains filtered or unexported fields }
Client is beanq's client.
func New ¶
func New(config *BeanqConfig, options ...ClientOption) *Client
func (*Client) CheckAckStatus ¶
type ClientOption ¶
type ClientOption func(client *Client)
func WithCaptureExceptionOption ¶
func WithCaptureExceptionOption(handler func(ctx context.Context, err any)) ClientOption
type DefaultHandle ¶
type DefaultHandle struct { DoHandle func(ctx context.Context, message *Message) error DoCancel func(ctx context.Context, message *Message) error DoError func(ctx context.Context, err error) }
type DynamicOption ¶
type DynamicOption func(option *dynamicOption)
func DynamicKeyOpt ¶
func DynamicKeyOpt(key string) DynamicOption
type IBaseCmd ¶
type IBaseCmd interface {
// contains filtered or unexported methods
}
IBaseCmd BaseCmd public method
type IBaseSubscribeCmd ¶
IBaseSubscribeCmd BaseSubscribeCmd subscribe method
type IConsumeCancel ¶
type IConsumeError ¶
type IConsumeHandle ¶
type Message ¶
type Message struct { ExecuteTime time.Time `json:"executeTime"` EndTime time.Time `json:"endTime"` BeginTime time.Time `json:"beginTime"` Response any `json:"response"` Info bstatus.FlagInfo `json:"info"` Level bstatus.LevelMsg `json:"level"` Topic string `json:"topic"` Channel string `json:"channel"` Payload string `json:"payload"` AddTime string `json:"addTime"` Consumer string `json:"consumer"` RunTime string `json:"runTime"` MoodType btype.MoodType `json:"moodType"` Status bstatus.Status `json:"status"` Id string `json:"id"` Retry int `json:"retry"` TimeToRun time.Duration `json:"timeToRun"` MaxLen int64 `json:"maxLen"` Priority float64 `json:"priority"` PendingRetry int64 `json:"pendingRetry"` }
func (Message) MarshalBinary ¶
type Mutex ¶
type Mutex struct {
// contains filtered or unexported fields
}
A Mutex is a distributed mutual exclusion lock.
func (*Mutex) ExtendContext ¶
ExtendContext resets the mutex's expiry and returns the status of expiry extension.
func (*Mutex) LockContext ¶
LockContext locks m. In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (*Mutex) UnlockContext ¶
UnlockContext unlocks m and returns the status of unlock.
type MuxClient ¶
type MuxClient struct {
// contains filtered or unexported fields
}
func NewMuxClient ¶
func NewMuxClient(client redis.UniversalClient) *MuxClient
func (*MuxClient) SetExpireTime ¶
type MuxOption ¶
type MuxOption interface {
Apply(*Mutex)
}
An MuxOption configures a mutex.
func WithDriftFactor ¶
WithDriftFactor can be used to set the clock drift factor. The default value is 0.01.
func WithExpiry ¶
WithExpiry can be used to set the expiry of a mutex to the given value. The default is 8s.
func WithFailFast ¶
WithFailFast can be used to quickly acquire and release the lock. When some Redis servers are blocking, we do not need to wait for responses from all of the Redis servers. As long as the quorum is met, we can assume the lock is acquired. The effect of this parameter is to achieve low latency and to avoid Redis blocking causing Lock/Unlock to not return for a long time.
func WithGenValueFunc ¶
WithGenValueFunc can be used to set the custom value generator.
func WithRetryDelay ¶
WithRetryDelay can be used to set the amount of time to wait between retries. The default value is rand(50ms, 250ms).
func WithRetryDelayFunc ¶
WithRetryDelayFunc can be used to override default delay behavior.
func WithSetNXOnExtend ¶
func WithSetNXOnExtend() MuxOption
WithSetNXOnExtend improves the extending logic: it extends the key if it exists and, if not, tries to set a new key in Redis. Useful if your Redis servers restart often and you want to reduce the chances of losing the lock.
func WithShufflePools ¶
WithShufflePools can be used to shuffle Redis pools to reduce centralized access in concurrent scenarios.
func WithTimeoutFactor ¶
WithTimeoutFactor can be used to set the timeout factor. The default value is 0.05.
type Publish ¶
type Publish struct {
// contains filtered or unexported fields
}
Publish command:publish
type Redis ¶
type Redis struct { Host string `json:"host"` Port string `json:"port"` Password string `json:"password"` Prefix string `json:"prefix"` Database int `json:"database"` MaxLen int64 `json:"maxLen"` MinIdleConnections int `json:"minIdleConnections"` DialTimeout time.Duration `json:"dialTimeout"` ReadTimeout time.Duration `json:"readTimeout"` WriteTimeout time.Duration `json:"writeTimeout"` PoolTimeout time.Duration `json:"poolTimeout"` MaxRetries int `json:"maxRetries"` PoolSize int `json:"poolSize"` }
type Script ¶
Script encapsulates the source, hash and key count for a Lua script.
func NewScript ¶
NewScript returns a new script object. If keyCount is greater than or equal to zero, then the count is automatically inserted in the EVAL command argument list. If keyCount is less than zero, then the application supplies the count as the first value in the keysAndArgs argument to the Do, Send and SendHash methods.
type SequentialCmd ¶
type SequentialCmd struct {
// contains filtered or unexported fields
}
func (*SequentialCmd) Error ¶
func (s *SequentialCmd) Error() error
func (*SequentialCmd) WaitingAck ¶
func (s *SequentialCmd) WaitingAck() (*Message, error)
WaitingAck ...
type Subscribe ¶
type Subscribe struct {
// contains filtered or unexported fields
}
Subscribe command:subscribe
type TaskStatus ¶
type TaskStatus struct {
// contains filtered or unexported fields
}
func (*TaskStatus) Error ¶
func (t *TaskStatus) Error() error
func (*TaskStatus) Statement ¶
func (t *TaskStatus) Statement() string
func (*TaskStatus) Status ¶
func (t *TaskStatus) Status() string
func (*TaskStatus) String ¶
func (t *TaskStatus) String() string
type UI ¶
type UI struct { Stmt struct { Host string `json:"host"` Port string `json:"port"` User string `json:"user"` Password string `json:"password"` } GoogleAuth struct { ClientId string ClientSecret string CallbackUrl string } SendGrid struct { Key string FromName string FromAddress string } Root struct { UserName string `json:"username"` Password string `json:"password"` } `json:"root"` Issuer string `json:"issuer"` Subject string `json:"subject"` JwtKey string `json:"jwtKey"` Port string `json:"port"` ExpiresAt time.Duration `json:"expiresAt"` }
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
func (*Workflow) CurrentTask ¶
func (*Workflow) OnRollbackResult ¶
OnRollbackResult handles rollback errors.
func (*Workflow) TrackRecord ¶
func (w *Workflow) TrackRecord(taskID string, status TaskStatus)
type WorkflowHandler ¶
type WorkflowRecord ¶
type WorkflowRecord struct {
// contains filtered or unexported fields
}
func NewWorkflowRecord ¶
func NewWorkflowRecord() *WorkflowRecord