Documentation ¶
Index ¶
- Variables
- type Job
- type JobHandler
- type JobParam
- type JobPlan
- type JobProducer
- type Node
- func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, error)
- func (node *Node) Close(ctx context.Context) error
- func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error
- func (node *Node) IsClosed() bool
- func (node *Node) IsShutdown() bool
- func (node *Node) JobKeys() []string
- func (node *Node) JobPayload(key string) ([]byte, bool)
- func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, opts ...TickerOption) (*Ticker, error)
- func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error
- func (node *Node) PoolWorkers() []*Worker
- func (node *Node) RemoveWorker(ctx context.Context, w *Worker) error
- func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error
- func (node *Node) Shutdown(ctx context.Context) error
- func (node *Node) StopJob(ctx context.Context, key string) error
- func (node *Node) Workers() []*Worker
- type NodeOption
- func WithAckGracePeriod(ttl time.Duration) NodeOption
- func WithClientOnly() NodeOption
- func WithJobSinkBlockDuration(d time.Duration) NodeOption
- func WithLogger(logger pulse.Logger) NodeOption
- func WithMaxQueuedJobs(max int) NodeOption
- func WithPendingJobTTL(ttl time.Duration) NodeOption
- func WithWorkerShutdownTTL(ttl time.Duration) NodeOption
- func WithWorkerTTL(ttl time.Duration) NodeOption
- type NotificationHandler
- type Ticker
- type TickerOption
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrRequeue = errors.New("requeue")
ErrRequeue indicates that a worker failed to process a job's start or stop operation and requests the job to be requeued for another attempt.
var ErrScheduleStop = fmt.Errorf("stop")
ErrScheduleStop is returned by JobProducer.Plan to indicate that the corresponding schedule should be stopped.
Functions ¶
This section is empty.
Types ¶
type Job ¶
type Job struct { // Key is used to identify the worker that handles the job. Key string // Payload is the job payload. Payload []byte // CreatedAt is the time the job was created. CreatedAt time.Time // Worker is the worker that handles the job. Worker *Worker // NodeID is the ID of the node that created the job. NodeID string }
Job is a job that can be added to a worker.
type JobHandler ¶
type JobHandler interface { // Start starts a job. Start(job *Job) error // Stop stops a job with a given key. Stop(key string) error }
JobHandler starts and stops jobs.
type JobParam ¶ added in v0.0.3
type JobParam struct { // Key is the job key. Key string // Payload is the job payload. Payload []byte }
JobParam represents a job to start.
type JobPlan ¶ added in v0.0.3
type JobPlan struct { // Jobs to start. Start []*JobParam // Job keys to stop. Stop []string // StopAll indicates that all jobs not in Jobs should be // stopped. Stop is ignored if StopAll is true. StopAll bool }
JobPlan represents a list of jobs to start and job keys to stop.
type JobProducer ¶ added in v0.0.3
type JobProducer interface { // Name returns the name of the producer. Schedule calls Plan on // only one of the producers with identical names across all // nodes. Name() string // Plan computes the list of jobs to start and job keys to stop. // Returning ErrScheduleStop indicates that the recurring // schedule should be stopped. Plan() (*JobPlan, error) }
JobComputeFunc is the function called by the scheduler to compute jobs. It returns the list of jobs to start and job keys to stop.
type Node ¶
Node is a pool of workers.
func AddNode ¶
func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...NodeOption) (*Node, error)
AddNode adds a new node to the pool with the given name and returns it. The node can be used to dispatch jobs and add new workers. A node also routes dispatched jobs to the proper worker and acks the corresponding events once the worker acks the job.
The options WithClientOnly can be used to create a node that can only be used to dispatch jobs. Such a node does not route or process jobs in the background.
func (*Node) AddWorker ¶
AddWorker adds a new worker to the pool and returns it. The worker starts processing jobs immediately. handler can optionally implement the NotificationHandler interface to handle notifications.
func (*Node) Close ¶
Close stops the pool node workers and closes the Redis connection but does not stop workers running in other nodes. It requeues all the jobs run by workers of the node. One of Shutdown or Close should be called before the node is garbage collected unless it is client-only.
func (*Node) DispatchJob ¶
DispatchJob dispatches a job to the proper worker in the pool. It returns: - nil if the job is successfully dispatched and started by a worker - an error returned by the worker's start handler if the job fails to start - the context error if the context is canceled before the job is started - an error if the pool is closed or if there's a failure in adding the job
The method blocks until one of the above conditions is met.
func (*Node) IsShutdown ¶
IsShutdown returns true if the pool is shutdown.
func (*Node) JobKeys ¶ added in v1.0.1
JobKeys returns the list of keys of the jobs running in the pool.
func (*Node) JobPayload ¶ added in v1.0.1
JobPayload returns the payload of the job with the given key. It returns: - (payload, true) if the job exists and has a payload - (nil, true) if the job exists but has no payload (empty payload) - (nil, false) if the job does not exist
func (*Node) NewTicker ¶ added in v0.0.3
func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, opts ...TickerOption) (*Ticker, error)
NewTicker returns a new Ticker that behaves similarly to time.Ticker, but instead delivers the current time on the channel to only one of the nodes that invoked NewTicker with the same name.
func (*Node) NotifyWorker ¶
NotifyWorker notifies the worker that handles the job with the given key.
func (*Node) PoolWorkers ¶ added in v1.0.1
PoolWorkers returns the list of workers running in the pool.
func (*Node) RemoveWorker ¶
RemoveWorker stops the worker, removes it from the pool and requeues all its jobs.
func (*Node) Schedule ¶ added in v0.0.3
Schedule calls the producer Plan method on the given interval and starts and stops jobs accordingly. The schedule stops when the producer Plan method returns ErrScheduleStop. Plan is called on only one of the nodes that scheduled the same producer.
func (*Node) Shutdown ¶
Shutdown stops the pool workers gracefully across all nodes. It notifies all workers and waits until they are completed. Shutdown prevents the pool nodes from creating new workers and the pool workers from accepting new jobs.
type NodeOption ¶
type NodeOption func(*nodeOptions)
NodeOption is a worker creation option.
func WithAckGracePeriod ¶ added in v1.0.0
func WithAckGracePeriod(ttl time.Duration) NodeOption
WithAckGracePeriod sets the duration after which a job is made available to other workers if it wasn't started. The default is 20s.
func WithClientOnly ¶
func WithClientOnly() NodeOption
WithClientOnly sets the pool to be client only. A client-only pool only supports dispatching jobs to workers and does not start background goroutines to route jobs.
func WithJobSinkBlockDuration ¶
func WithJobSinkBlockDuration(d time.Duration) NodeOption
WithJobSinkBlockDuration sets the duration to block when reading from the job stream. The default is 5s. This option is mostly useful for testing.
func WithLogger ¶
func WithLogger(logger pulse.Logger) NodeOption
WithLogger sets the handler used to report temporary errors.
func WithMaxQueuedJobs ¶
func WithMaxQueuedJobs(max int) NodeOption
WithMaxQueuedJobs sets the maximum number of jobs that can be queued in the pool. The default is 1000.
func WithPendingJobTTL ¶
func WithPendingJobTTL(ttl time.Duration) NodeOption
WithPendingJobTTL sets the duration after which a job is made available to other workers if it wasn't started. The default is 20s.
func WithWorkerShutdownTTL ¶
func WithWorkerShutdownTTL(ttl time.Duration) NodeOption
WithWorkerShutdownTTL sets the maximum time to wait for workers to shutdown. The default is 2 minutes.
func WithWorkerTTL ¶
func WithWorkerTTL(ttl time.Duration) NodeOption
WithWorkerTTL sets the duration after which the worker is removed from the pool in case of network partitioning. The default is 10s. A lower number causes more frequent keep-alive updates from all workers.
type NotificationHandler ¶
type NotificationHandler interface { // HandleNotification handles a notification. HandleNotification(key string, payload []byte) error }
NotificationHandler handle job notifications.
type Ticker ¶ added in v0.0.3
Ticker represents a clock that periodically sends ticks to one of the pool nodes which created a ticker with the same name.
type TickerOption ¶ added in v0.0.3
type TickerOption func(*tickerOptions)
TickerOption is a worker creation option.
func WithTickerLogger ¶ added in v0.0.3
func WithTickerLogger(logger pulse.Logger) TickerOption
WithTickerLogger sets the handler used to report temporary errors.