pool

package
v1.0.5-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: MIT Imports: 23 Imported by: 0

README

Dedicated Worker Pool

The pool package builds on top of the Pulse rmap and streaming packages to provide scalable and reliable dedicated worker pools.

Overview

A dedicated worker pool uses a consistent hashing algorithm to assign long running jobs to workers. Each job is associated with a key and each worker with a range of hashed values. The pool hashes the job key when the job is dispatched to route the job to the proper worker.

Workers can be added or removed from the pool dynamically. Jobs get automatically re-assigned to workers when the pool grows or shrinks. This makes it possible to implement auto-scaling solutions, for example based on queueing delays.

Pulse uses the Jump Consistent Hash algorithm to assign jobs to workers which provides a good balance between load balancing and worker assignment stability.

%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    A[Job Producer]
    subgraph Pool["<span style='margin: 0 10px;'>Routing Pool Node</span>"]
        Sink["Job Sink"]
    end
    subgraph Worker[Worker Pool Node]
        Reader
        B[Worker]
    end
    A-->|Job+Key|Sink
    Sink-.->|Job|Reader
    Reader-.->|Job|B

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class A,B userCode;
    class Pool,Sink,Reader,Worker pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;

Usage

Pulse dedicated worker pools are generally valuable when workers require state which depends on the jobs they perform.

To illustrate, let's consider the scenario of a multitenant system that requires managing a collection of background tasks for each tenant. In this case, utilizing a Pulse worker pool proves to be highly beneficial. The system can create a dedicated worker pool and create one job per tenant, utilizing the unique tenant identifier as the job key. This approach ensures that only one worker handles the background task for a specific tenant at any given time. As new tenants are added or old ones are removed, jobs can be started or stopped accordingly. Similarly, workers can be added or removed based on performance requirements.

Pulse dedicated worker pools are not needed when workers are stateless and can be scaled horizontally. In such cases, any standard load balancing solution can be used.

Creating A Pool

The function AddNode is used to create a new pool node. It takes as input a name, a Redis client and a set of options.

Pool AddNode

The AddNode function returns a new pool node and an error. The pool node should be closed when it is no longer needed (see below).

The options are used to configure the pool node. The following options are available:

  • WithLogger - sets the logger to be used by the pool node.
  • WithWorkerTTL - sets the worker time-to-live (TTL) in seconds. The TTL defines the maximum delay between two health-checks before a worker is removed from the pool. The default value is 10 seconds.
  • WithPendingJobTTL - sets the pending job time-to-live (TTL) in seconds. The TTL defines the maximum delay between a worker picking up the job and successfully starting it. The default value is 20 seconds.
  • WithWorkerShutdownTTL - specifies the maximum time to wait for a worker to shutdown gracefully. The default value is 2 minutes.
  • WithMaxQueuedJobs - sets the maximum number of jobs that can be queued before the pool starts rejecting new jobs. The default value is 1000.
  • WithClientOnly - specifies that the pool node should not starts background goroutines to manage the pool and thus not allow creating workers. This option is useful when the pool is used only to dispatch jobs to workers that are created in other nodes.
  • WithJobSinkBlockDuration - sets the max poll duration for new jobs. This value is mostly used by tests to accelerate the pool shutdown process. The default value is 5 seconds.
Closing A Node

The Close method closes the pool node and releases all resources associated with it. It should be called when the node is no longer needed.

Pool Close

Note that closing a pool node does not stop remote workers. It only stops the local pool node. Remote workers can be stopped by calling the Shutdown method described below.

Shutting Down A Pool

The Shutdown method shuts down the entire pool by stopping all its workers gracefully. It should be called when the pool is no longer needed.

Pool Shutdown

See the Data Flows section below for more details on the shutdown process.

Creating A Worker

The function AddWorker is used to create a new worker. It takes as input a job handler object.

Worker AddWorker

The job handler must implement the Start and Stop methods used to start and stop jobs. The handler may also optionally implement a HandleNotification method to receive notifications.

Worker JobHandler

The AddWorker function returns a new worker and an error. Workers can be removed from pool nodes using the RemoveWorker method.

Dispatching A Job

The DispatchJob method is used to dispatch a new job to the pool. It takes as input a job key and a job payload.

Pool DispatchJob

The job key is used to route the job to the proper worker. The job payload is passed to the worker's Start method.

The DispatchJob method returns an error if the job could not be dispatched. This can happen if the pool is full or if the job key is invalid.

Notifications

Nodes can send notifications to workers using the NotifyWorker method. The method takes as input a job key and a notification payload. The notification payload is passed to the worker's HandleNotification method.

Stopping A Job

The StopJob method is used to stop a job. It takes a job key as input and returns an error if the job could not be stopped. This can happen if the job key is invalid, the node is closed or the pool shutdown.

Scheduling

The Schedule method of the Node struct can be used to schedule jobs to be dispatched or stopped on a recurring basis. The method takes as input a job producer and invokes it at the specified interval. The job producer returns a list of jobs to be started and stopped.

Schedule makes it possible to maintain a pool of jobs for example in a multi-tenant system. See the examples for more details.

Data Flows

The following sections provide additional details on the internal data flows involved in creating and using a Pulse worker pool. They are provided for informational purposes only and are not required reading for simply using the package.

Adding A New Job

The following diagram illustrates the data flow involved in adding a new job to a Pulse worker pool:

  • The producer calls DispatchJob which adds an event to the pool job stream.
  • The pool job stream is read by the pool sink running in one of the pool nodes. The routing node records the event so it can ack it later and routes the event to the proper worker stream using a consistent hashing algorithm.
  • The dedicated worker stream is read by the worker which starts the job by calling the Start method on the worker job handler. Once Start returns successfully the worker sends an event back to the original pool node.
  • Upon getting the event, the pool node acks the job with the pool job stream and removes it from its pending jobs map.
%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart TD
    subgraph w[Worker Node]
        r[Reader]
        u[User code]
    end
    subgraph rdb[Redis]
        js(["Pool Job Stream (shared)"])
        ws(["Worker Stream (dedicated)"])
        rs(["Routing Node Stream (dedicated)"])
    end
    subgraph p[Producer Node]
        pr[User code]
        no[Client Node]
    end
    subgraph ro[Routing Node]
        ps[Pool Sink]
        nr[Routing Node Reader]
    end
    pr --1. DispatchJob--> no
    no --2. Add Job--> js
    js --3. Job--> ps
    ps --4. Add Job--> ws
    ws --5. Job--> r
    r --6. Start Job--> u
    r --7. Add Ack--> rs
    rs --7. Ack--> nr
    nr --8. Ack Add Job Event--> js
    
    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
    classDef background fill:#7A7A7A, color:#F2F2F2;

    class pr,u userCode;
    class pj,js,ws,rs redis;
    class no,ps,r,c,nr producer;
    class p,w,rdb,ro background; 

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;

The worker pool uses a job stream so that jobs that do not get acknowledged in time are automatically re-queued. This is useful in case of worker failure or network partitioning. The pool sink applies the consistent hashing algorithm to the job key to determine which worker stream the job should be added to. This ensures that unhealthy workers are properly ignored when requeuing jobs.

Shutdown and Cleanup

The following diagram illustrates the data flow involved in shutting down a Pulse worker pool:

  • The producer calls Shutdown which adds a shutdown event to the pool stream.
  • Upon receving the shutdown event the pool node closes the pool stream to avoid accepting new jobs and sets a flag in the pool shutdown replicated map.
  • The pool nodes get notified and stop accepting new jobs (DispatchJob returns an error if called).
  • The pool nodes add a stop event to the worker streams for all the workers they own.
  • Upon receiving the event, the workers remove themselves from the pool workers replicated map, destroy their stream and exit. Note that any job that was enqueued before the shutdown event still gets processed.
  • Once the workers have stopped, the producer that initiated the shutdown cleans up the pool resources (jobs sink, jobs stream, replicated maps) and the pool nodes exit.
%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%

flowchart TD
    subgraph pn1[Pool Node 1]
        u[User code]
        po1[Pool 1]
        w1[Worker 1]
    end
    subgraph pn2[Pool Node 2]
        po2[Pool 2]
        w2[Worker 2]
    end
    subgraph rdb[Redis]
        sr[(Shutdown <br/> Replicated Map)]
        wr[(Worker </br/> Replicated Map)]
        ws1(["Worker 1 Stream"])
        ws2(["Worker 2 Stream"])
    end
    u[User code] --1. Shutdown--> po1[Pool 1]
    po1 --2. Set Shutdown Flag--> sr[(Shutdown <br/> Replicated Map)]
    sr --3. Shutdown Flag--> po1
    sr --3. Shutdown Flag--> po2
    po1 --4. Add Stop--> ws1
    po2 --4. Add Stop--> ws2
    ws1 --5. Stop--> w1
    ws2 --5. Stop--> w2
    w1 --6. Remove Worker--> wr
    w2 --6. Remove Worker--> wr
    w1 --7. Delete--> ws1
    w2 --7. Delete--> ws2
    wr --8. Workers Empty--> po1
    po1 --9. Delete --> sr
    po1 --10. Delete --> wr
    
    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
    classDef background fill:#7A7A7A, color:#F2F2F2;

    class u userCode;
    class wr,sr,ws1,ws2 redis;
    class po1,po2,w1,w2 producer;
    class rdb,pn1,pn2 background; 

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 8 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 9 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 10 stroke:#FF8888,color:#FF8888,stroke-width:3px;
    linkStyle 11 stroke:#FF8888,color:#FF8888,stroke-width:3px;
    linkStyle 12 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 13 stroke:#FF8888,color:#FF8888,stroke-width:3px;
    linkStyle 14 stroke:#FF8888,color:#FF8888,stroke-width:3px;

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrRequeue = errors.New("requeue")

ErrRequeue indicates that a worker failed to process a job's start or stop operation and requests the job to be requeued for another attempt.

View Source
var ErrScheduleStop = fmt.Errorf("stop")

ErrScheduleStop is returned by JobProducer.Plan to indicate that the corresponding schedule should be stopped.

Functions

This section is empty.

Types

type Job

type Job struct {
	// Key is used to identify the worker that handles the job.
	Key string
	// Payload is the job payload.
	Payload []byte
	// CreatedAt is the time the job was created.
	CreatedAt time.Time
	// Worker is the worker that handles the job.
	Worker *Worker
	// NodeID is the ID of the node that created the job.
	NodeID string
}

Job is a job that can be added to a worker.

type JobHandler

type JobHandler interface {
	// Start starts a job.
	Start(job *Job) error
	// Stop stops a job with a given key.
	Stop(key string) error
}

JobHandler starts and stops jobs.

type JobParam added in v0.0.3

type JobParam struct {
	// Key is the job key.
	Key string
	// Payload is the job payload.
	Payload []byte
}

JobParam represents a job to start.

type JobPlan added in v0.0.3

type JobPlan struct {
	// Jobs to start.
	Start []*JobParam
	// Job keys to stop.
	Stop []string
	// StopAll indicates that all jobs not in Jobs should be
	// stopped.  Stop is ignored if StopAll is true.
	StopAll bool
}

JobPlan represents a list of jobs to start and job keys to stop.

type JobProducer added in v0.0.3

type JobProducer interface {
	// Name returns the name of the producer. Schedule calls Plan on
	// only one of the producers with identical names across all
	// nodes.
	Name() string
	// Plan computes the list of jobs to start and job keys to stop.
	// Returning ErrScheduleStop indicates that the recurring
	// schedule should be stopped.
	Plan() (*JobPlan, error)
}

JobComputeFunc is the function called by the scheduler to compute jobs. It returns the list of jobs to start and job keys to stop.

type Node

type Node struct {
	NodeID   string
	PoolName string
	// contains filtered or unexported fields
}

Node is a pool of workers.

func AddNode

func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...NodeOption) (*Node, error)

AddNode adds a new node to the pool with the given name and returns it. The node can be used to dispatch jobs and add new workers. A node also routes dispatched jobs to the proper worker and acks the corresponding events once the worker acks the job.

The options WithClientOnly can be used to create a node that can only be used to dispatch jobs. Such a node does not route or process jobs in the background.

func (*Node) AddWorker

func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, error)

AddWorker adds a new worker to the pool and returns it. The worker starts processing jobs immediately. handler can optionally implement the NotificationHandler interface to handle notifications.

func (*Node) Close

func (node *Node) Close(ctx context.Context) error

Close stops the pool node workers and closes the Redis connection but does not stop workers running in other nodes. It requeues all the jobs run by workers of the node. One of Shutdown or Close should be called before the node is garbage collected unless it is client-only.

func (*Node) DispatchJob

func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error

DispatchJob dispatches a job to the proper worker in the pool. It returns: - nil if the job is successfully dispatched and started by a worker - an error returned by the worker's start handler if the job fails to start - the context error if the context is canceled before the job is started - an error if the pool is closed or if there's a failure in adding the job

The method blocks until one of the above conditions is met.

func (*Node) IsClosed

func (node *Node) IsClosed() bool

IsClosed returns true if the node is closed.

func (*Node) IsShutdown

func (node *Node) IsShutdown() bool

IsShutdown returns true if the pool is shutdown.

func (*Node) JobKeys added in v1.0.1

func (node *Node) JobKeys() []string

JobKeys returns the list of keys of the jobs running in the pool.

func (*Node) JobPayload added in v1.0.1

func (node *Node) JobPayload(key string) ([]byte, bool)

JobPayload returns the payload of the job with the given key. It returns: - (payload, true) if the job exists and has a payload - (nil, true) if the job exists but has no payload (empty payload) - (nil, false) if the job does not exist

func (*Node) NewTicker added in v0.0.3

func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, opts ...TickerOption) (*Ticker, error)

NewTicker returns a new Ticker that behaves similarly to time.Ticker, but instead delivers the current time on the channel to only one of the nodes that invoked NewTicker with the same name.

func (*Node) NotifyWorker

func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error

NotifyWorker notifies the worker that handles the job with the given key.

func (*Node) PoolWorkers added in v1.0.1

func (node *Node) PoolWorkers() []*Worker

PoolWorkers returns the list of workers running in the pool.

func (*Node) RemoveWorker

func (node *Node) RemoveWorker(ctx context.Context, w *Worker) error

RemoveWorker stops the worker, removes it from the pool and requeues all its jobs.

func (*Node) Schedule added in v0.0.3

func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error

Schedule calls the producer Plan method on the given interval and starts and stops jobs accordingly. The schedule stops when the producer Plan method returns ErrScheduleStop. Plan is called on only one of the nodes that scheduled the same producer.

func (*Node) Shutdown

func (node *Node) Shutdown(ctx context.Context) error

Shutdown stops the pool workers gracefully across all nodes. It notifies all workers and waits until they are completed. Shutdown prevents the pool nodes from creating new workers and the pool workers from accepting new jobs.

func (*Node) StopJob

func (node *Node) StopJob(ctx context.Context, key string) error

StopJob stops the job with the given key.

func (*Node) Workers

func (node *Node) Workers() []*Worker

Workers returns the list of workers running in the local node.

type NodeOption

type NodeOption func(*nodeOptions)

NodeOption is a worker creation option.

func WithAckGracePeriod added in v1.0.0

func WithAckGracePeriod(ttl time.Duration) NodeOption

WithAckGracePeriod sets the duration after which a job is made available to other workers if it wasn't started. The default is 20s.

func WithClientOnly

func WithClientOnly() NodeOption

WithClientOnly sets the pool to be client only. A client-only pool only supports dispatching jobs to workers and does not start background goroutines to route jobs.

func WithJobSinkBlockDuration

func WithJobSinkBlockDuration(d time.Duration) NodeOption

WithJobSinkBlockDuration sets the duration to block when reading from the job stream. The default is 5s. This option is mostly useful for testing.

func WithLogger

func WithLogger(logger pulse.Logger) NodeOption

WithLogger sets the handler used to report temporary errors.

func WithMaxQueuedJobs

func WithMaxQueuedJobs(max int) NodeOption

WithMaxQueuedJobs sets the maximum number of jobs that can be queued in the pool. The default is 1000.

func WithPendingJobTTL

func WithPendingJobTTL(ttl time.Duration) NodeOption

WithPendingJobTTL sets the duration after which a job is made available to other workers if it wasn't started. The default is 20s.

func WithWorkerShutdownTTL

func WithWorkerShutdownTTL(ttl time.Duration) NodeOption

WithWorkerShutdownTTL sets the maximum time to wait for workers to shutdown. The default is 2 minutes.

func WithWorkerTTL

func WithWorkerTTL(ttl time.Duration) NodeOption

WithWorkerTTL sets the duration after which the worker is removed from the pool in case of network partitioning. The default is 10s. A lower number causes more frequent keep-alive updates from all workers.

type NotificationHandler

type NotificationHandler interface {
	// HandleNotification handles a notification.
	HandleNotification(key string, payload []byte) error
}

NotificationHandler handle job notifications.

type Ticker added in v0.0.3

type Ticker struct {
	C <-chan time.Time
	// contains filtered or unexported fields
}

Ticker represents a clock that periodically sends ticks to one of the pool nodes which created a ticker with the same name.

func (*Ticker) Stop added in v0.0.3

func (t *Ticker) Stop()

Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".

type TickerOption added in v0.0.3

type TickerOption func(*tickerOptions)

TickerOption is a worker creation option.

func WithTickerLogger added in v0.0.3

func WithTickerLogger(logger pulse.Logger) TickerOption

WithTickerLogger sets the handler used to report temporary errors.

type Worker

type Worker struct {
	// Unique worker ID
	ID string
	// Worker pool node where worker is running.
	Node *Node
	// Time worker was created.
	CreatedAt time.Time
	// contains filtered or unexported fields
}

Worker is a worker that handles jobs with a given payload type.

func (*Worker) Jobs

func (w *Worker) Jobs() []*Job

Jobs returns the jobs handled by the worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL