Documentation ¶
Index ¶
- Constants
- Variables
- func ConfigureAPIServer(options APIOptions)
- func RegisterAPIEndpoints(mux *http.ServeMux)
- func StartAPIServer(port int)
- func StopAPIServer()
- type APIOptions
- type Args
- type EnqueueData
- type EnqueueOptions
- type Fetcher
- type JobFunc
- type JobStatus
- type Manager
- func (m *Manager) AddBeforeStartHooks(hooks ...func())
- func (m *Manager) AddDuringDrainHooks(hooks ...func())
- func (m *Manager) AddRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)
- func (m *Manager) AddWorker(queue string, concurrency int, job JobFunc, mids ...MiddlewareFunc)
- func (m *Manager) GetRedisClient() *redis.Client
- func (m *Manager) GetRetries(page uint64, pageSize int64, match string) (Retries, error)
- func (m *Manager) GetStats() (Stats, error)
- func (m *Manager) Producer() *Producer
- func (m *Manager) Run()
- func (m *Manager) SetRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)
- func (m *Manager) Stop()
- type MiddlewareFunc
- type Middlewares
- type Msg
- type Options
- type Producer
- func (p *Producer) Enqueue(queue, class string, args interface{}) (string, error)
- func (p *Producer) EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)
- func (p *Producer) EnqueueIn(queue, class string, in float64, args interface{}) (string, error)
- func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)
- func (p *Producer) GetRedisClient() *redis.Client
- type Retries
- type RetriesExhaustedFunc
- type RetryOptions
- type Stats
Constants ¶
const ( // DefaultRetryMax is default for max number of retries for a job DefaultRetryMax = 25 // RetryTimeFormat is default for retry time format RetryTimeFormat = "2006-01-02 15:04:05 MST" )
const (
// NanoSecondPrecision is a constant for the number of nanoseconds in a second
NanoSecondPrecision = 1000000000.0
)
Variables ¶
var Logger = log.New(os.Stdout, "go-workers2: ", log.Ldate|log.Lmicroseconds)
Logger is the default go-workers2 logger, only used here in this file. TODO: remove this
Functions ¶
func ConfigureAPIServer ¶
func ConfigureAPIServer(options APIOptions)
ConfigureAPIServer allows global API server configuration with the given options
func RegisterAPIEndpoints ¶
RegisterAPIEndpoints sets up API server endpoints
Types ¶
type APIOptions ¶
APIOptions contains the set of configuration options for the global api
type Args ¶
type Args struct {
// contains filtered or unexported fields
}
Args is the set of parameters for a message
type EnqueueData ¶
type EnqueueData struct { Queue string `json:"queue,omitempty"` Class string `json:"class"` Args interface{} `json:"args"` Jid string `json:"jid"` EnqueuedAt float64 `json:"enqueued_at"` EnqueueOptions }
EnqueueData stores data and configuration for new work
type EnqueueOptions ¶
type EnqueueOptions struct { RetryCount int `json:"retry_count,omitempty"` RetryMax int `json:"retry_max,omitempty"` Retry bool `json:"retry,omitempty"` At float64 `json:"at,omitempty"` RetryOptions RetryOptions `json:"retry_options,omitempty"` }
EnqueueOptions stores configuration for new work
type Fetcher ¶
type Fetcher interface { Queue() string Fetch() Acknowledge(*Msg) Ready() chan bool Messages() chan *Msg Close() Closed() bool }
Fetcher is an interface for managing work messages
type JobFunc ¶
JobFunc is a message processor
func LogMiddleware ¶
LogMiddleware is the default logging middleware
func NopMiddleware ¶
NopMiddleware does nothing
func RetryMiddleware ¶
RetryMiddleware middleware that allows retries for jobs failures
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates work, workers, and signaling needed for job processing
func NewManager ¶
NewManager creates a new manager with provide options
func NewManagerWithRedisClient ¶
NewManagerWithRedisClient creates a new manager with provide options and pre-configured Redis client
func (*Manager) AddBeforeStartHooks ¶
func (m *Manager) AddBeforeStartHooks(hooks ...func())
AddBeforeStartHooks adds functions to be executed before the manager starts
func (*Manager) AddDuringDrainHooks ¶
func (m *Manager) AddDuringDrainHooks(hooks ...func())
AddDuringDrainHooks adds function to be execute during a drain operation
func (*Manager) AddRetriesExhaustedHandlers ¶
func (m *Manager) AddRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)
AddRetriesExhaustedHandlers adds function(s) to be executed when retries are exhausted for a job.
func (*Manager) AddWorker ¶
func (m *Manager) AddWorker(queue string, concurrency int, job JobFunc, mids ...MiddlewareFunc)
AddWorker adds a new job processing worker
func (*Manager) GetRedisClient ¶
func (m *Manager) GetRedisClient() *redis.Client
GetRedisClient returns the Redis client used by the manager
func (*Manager) GetRetries ¶
GetRetries returns the set of retry jobs for the manager
func (*Manager) Producer ¶
Producer creates a new work producer with configuration identical to the manager
func (*Manager) Run ¶
func (m *Manager) Run()
Run starts all workers under this Manager and blocks until they exit.
func (*Manager) SetRetriesExhaustedHandlers ¶
func (m *Manager) SetRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)
SetRetriesExhaustedHandlers sets function(s) that will be sequentially executed when retries are exhausted for a job.
type MiddlewareFunc ¶
MiddlewareFunc is an extra function on the processing pipeline
type Middlewares ¶
type Middlewares []MiddlewareFunc
Middlewares contains the lists of all configured middleware functions
func DefaultMiddlewares ¶
func DefaultMiddlewares() Middlewares
DefaultMiddlewares creates the default middleware pipeline
func NewMiddlewares ¶
func NewMiddlewares(mids ...MiddlewareFunc) Middlewares
NewMiddlewares creates the processing pipeline given the list of middleware funcs
func (Middlewares) Append ¶
func (m Middlewares) Append(mid MiddlewareFunc) Middlewares
Append adds middleware to the end of the processing pipeline
func (Middlewares) Prepend ¶
func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares
Prepend adds middleware to the front of the processing pipeline
type Msg ¶
type Msg struct {
// contains filtered or unexported fields
}
Msg is the struct for job data (parameters and metadata)
func (*Msg) OriginalJson ¶
OriginalJson returns the original JSON message
type Options ¶
type Options struct { ProcessID string Namespace string PollInterval time.Duration Database int Password string PoolSize int // Provide one of ServerAddr or (SentinelAddrs + RedisMasterName) ServerAddr string SentinelAddrs string RedisMasterName string RedisTLSConfig *tls.Config // Optional display name used when displaying manager stats ManagerDisplayName string // Log Logger *log.Logger // contains filtered or unexported fields }
Options contains the set of configuration options for a manager and/or producer
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is used to enqueue new work
func NewProducer ¶
NewProducer creates a new producer with the given options
func NewProducerWithRedisClient ¶
NewProducerWithRedisClient creates a new producer with the given options and Redis client
func (*Producer) EnqueueWithOptions ¶
func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)
EnqueueWithOptions enqueues new work for processing with the given options
func (*Producer) GetRedisClient ¶
func (p *Producer) GetRedisClient() *redis.Client
GetRedisClient returns the Redis client used by the producer Deprecated: the Redis client is an internal implementation and access will be removed
type Retries ¶
type Retries struct { TotalRetryCount int64 `json:"total_retry_count"` RetryJobs []*Msg `json:"retry_jobs"` }
Retries stores retry information
type RetriesExhaustedFunc ¶
RetriesExhaustedFunc gets executed when retry attempts have been exhausted.
type RetryOptions ¶
type Stats ¶
type Stats struct { Name string `json:"manager_name"` Processed int64 `json:"processed"` Failed int64 `json:"failed"` Jobs map[string][]JobStatus `json:"jobs"` Enqueued map[string]int64 `json:"enqueued"` RetryCount int64 `json:"retry_count"` }
Stats containts current stats for a manager