Documentation ¶
Overview ¶
Package beanstalk provides a beanstalk client.
Producer ¶
The Producer is used to put jobs into tubes. It provides a connection pool:
    producer, err := beanstalk.NewProducer([]string{"localhost:11300"}, beanstalk.Config{
        // Multiply the list of URIs to create a larger pool of connections.
        Multiply: 3,
    })
    if err != nil {
        // handle error
    }
    defer producer.Stop()
Putting a job in a tube is done by calling Put, which will select a random connection for its operation:
    // Create the put parameters. These can be reused between Put calls.
    params := beanstalk.PutParams{Priority: 1024, Delay: 0, TTR: 30 * time.Second}

    // Put the "Hello World" message in the "mytube" tube.
    id, err := producer.Put(ctx, "mytube", []byte("Hello World"), params)
    if err != nil {
        // handle error
    }
If a Put operation fails on a connection, another connection in the pool will be selected for a retry.
Consumer ¶
The Consumer is used to reserve jobs from tubes. It provides a connection pool:
    consumer, err := beanstalk.NewConsumer([]string{"localhost:11300"}, []string{"mytube"}, beanstalk.Config{
        // Multiply the list of URIs to create a larger pool of connections.
        Multiply: 3,
        // NumGoroutines is the number of goroutines that the Receive method will
        // spin up to process jobs concurrently.
        NumGoroutines: 30,
    })
    if err != nil {
        // handle error
    }
The ratio of Multiply to NumGoroutines is important. Multiply determines the size of the connection pool and NumGoroutines determines how many reserved jobs you have in-flight. If you have a limited number of connections, but a high number of reserved jobs in-flight, your TCP connection pool might experience congestion and your processing speed will suffer. Although the ratio depends on the speed at which jobs are processed, a good rule of thumb is 1:10, as in the example above (Multiply: 3, NumGoroutines: 30).
Reserving jobs from the tubes specified in NewConsumer is done by calling Receive, which will reserve jobs on any of the connections in the pool:
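The arithmetic behind this rule of thumb can be checked with a few lines of Go. This is only a sketch: poolSize and jobsPerConn are hypothetical helpers, and the assumption that the pool holds one connection per URI per Multiply step is read from the description of Multiply rather than from the implementation.

```go
package main

import "fmt"

// poolSize returns the number of connections obtained by repeating a list of
// numURIs URIs multiply times. Values below 1 are treated as 1, the
// documented default for Multiply.
func poolSize(numURIs, multiply int) int {
	if multiply < 1 {
		multiply = 1
	}
	return numURIs * multiply
}

// jobsPerConn returns the average number of in-flight jobs that each
// connection carries, rounded up.
func jobsPerConn(conns, numGoroutines int) int {
	if conns <= 0 {
		return 0
	}
	return (numGoroutines + conns - 1) / conns
}

func main() {
	// One URI, Multiply: 3 and NumGoroutines: 30, as in the consumer example.
	conns := poolSize(1, 3)
	fmt.Printf("%d connections, about %d in-flight jobs each\n", conns, jobsPerConn(conns, 30))
}
```

With these numbers each connection serves about ten goroutines, which matches the 1:10 ratio.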
    // Call the inline function for every job that was reserved.
    consumer.Receive(ctx, func(ctx context.Context, job *beanstalk.Job) {
        // handle job

        if err := job.Delete(ctx); err != nil {
            // handle error
        }
    })
If the context passed to Receive is cancelled, Receive will finish processing the jobs it has reserved before returning.
Job ¶
When Receive offers a job, the goroutine is responsible for processing that job and finishing it up. A job can either be deleted, released or buried:
    // Delete a job, when processing was successful.
    err = job.Delete(ctx)

    // Release a job, putting it back in the queue for another worker to pick up.
    err = job.Release(ctx)

    // Release a job, but put it back with a custom priority and a delay before
    // it's offered to another worker.
    err = job.ReleaseWithParams(ctx, 512, 5 * time.Second)

    // Bury a job, when it doesn't need to be processed but needs to be kept
    // around for manual inspection or manual requeuing.
    err = job.Bury(ctx)
Conn ¶
If the Producer and Consumer abstractions are too high-level, then Conn provides the lower level abstraction of a single connection to a beanstalk server:
    conn, err := beanstalk.Dial("localhost:11300", beanstalk.Config{})
    if err != nil {
        // handle error
    }
    defer conn.Close()

    // conn.Put(...)
    // conn.Watch(...)
    // conn.Reserve(...)
Logging ¶
The Config structure offers hooks for info and error logs that allow hooking into a custom log solution.
    config := beanstalk.Config{
        InfoFunc: func(message string) {
            log.Info(message)
        },
        ErrorFunc: func(err error, message string) {
            log.WithError(err).Error(message)
        },
    }
URIs ¶
NewProducer, NewConsumer and Dial take a URI or a list of URIs as their first argument, which can be written in various formats. In the above examples the beanstalk server was referenced by the host:port notation. This package also supports URI formats like beanstalk:// for a plaintext connection, and beanstalks:// or tls:// for encrypted connections.
In the case of encrypted connections, if no port has been specified it will default to port 11400 as opposed to the default 11300 port.
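The rules for schemes and default ports can be summarised in a short sketch. It is not the package's parser: resolve is a hypothetical function, and accepting a bare host without a port is an assumption made here.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// resolve turns a URI in one of the notations described above into a
// host:port address and reports whether the connection should use TLS.
func resolve(uri string) (addr string, useTLS bool, err error) {
	// Treat the plain host:port notation as a plaintext beanstalk:// URI.
	if !strings.Contains(uri, "://") {
		uri = "beanstalk://" + uri
	}
	u, err := url.Parse(uri)
	if err != nil {
		return "", false, err
	}

	port := u.Port()
	switch u.Scheme {
	case "beanstalk":
		if port == "" {
			port = "11300" // default plaintext port
		}
	case "beanstalks", "tls":
		useTLS = true
		if port == "" {
			port = "11400" // default port for encrypted connections
		}
	default:
		return "", false, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	if u.Hostname() == "" {
		return "", false, fmt.Errorf("missing host in %q", uri)
	}
	return net.JoinHostPort(u.Hostname(), port), useTLS, nil
}

func main() {
	for _, uri := range []string{
		"localhost:11300",
		"beanstalk://queue.example.com",
		"beanstalks://queue.example.com",
		"tls://queue.example.com:11500",
	} {
		addr, useTLS, err := resolve(uri)
		fmt.Println(addr, useTLS, err)
	}
}
```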
Index ¶
- Variables
- func ValidURIs(uris []string) error
- type Config
- type Conn
- func (conn *Conn) Close() error
- func (conn *Conn) Delete(ctx context.Context, id uint64) error
- func (conn *Conn) Ignore(ctx context.Context, tube string) error
- func (conn *Conn) Kick(ctx context.Context, tube string, bound int) (int64, error)
- func (conn *Conn) ListTubes(ctx context.Context) ([]string, error)
- func (conn *Conn) PeekBuried(ctx context.Context, tube string) (*Job, error)
- func (conn *Conn) PeekDelayed(ctx context.Context, tube string) (*Job, error)
- func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)
- func (conn *Conn) Reserve(ctx context.Context) (*Job, error)
- func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error)
- func (conn *Conn) String() string
- func (conn *Conn) TubeStats(ctx context.Context, tube string) (TubeStats, error)
- func (conn *Conn) Watch(ctx context.Context, tube string) error
- type Consumer
- type Job
- func (job *Job) Bury(ctx context.Context) error
- func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error
- func (job *Job) Delete(ctx context.Context) error
- func (job *Job) Kick(ctx context.Context) error
- func (job *Job) Release(ctx context.Context) error
- func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error
- func (job *Job) Touch(ctx context.Context) error
- func (job *Job) TouchAfter() time.Duration
- type Producer
- type PutParams
- type TubeStats
Variables ¶
    var (
        ErrBuried       = errors.New("job was buried")
        ErrDeadlineSoon = errors.New("deadline soon")
        ErrDisconnected = errors.New("client disconnected")
        ErrDraining     = errors.New("server is draining")
        ErrNotFound     = errors.New("job not found")
        ErrTimedOut     = errors.New("reserve timed out")
        ErrTooBig       = errors.New("job too big")
        ErrNotIgnored   = errors.New("tube not ignored")
        ErrTubeTooLong  = errors.New("tube name too long")
        ErrUnexpected   = errors.New("unexpected response received")
    )
These errors may be returned by any of Conn's methods.
var ErrJobFinished = errors.New("job was already finished")
ErrJobFinished is returned when a job was already finished.
Functions ¶
Types ¶
type Config ¶ added in v1.3.0
    type Config struct {
        // Multiply the list of URIs to create a larger pool of connections.
        //
        // The default is to have 1.
        Multiply int
        // NumGoroutines is the number of goroutines that the Receive method will
        // spin up to process jobs concurrently.
        //
        // The default is to spin up 10 goroutines.
        NumGoroutines int
        // ConnTimeout configures the read and write timeout of the connection. This
        // can be overridden by a context deadline if its value is lower.
        //
        // Note that this does not work with Reserve() and might interfere with
        // ReserveWithTimeout() if configured incorrectly.
        //
        // The default is to have no timeout.
        ConnTimeout time.Duration
        // ReserveTimeout is the time a consumer connection waits between reserve
        // attempts if the last attempt failed to reserve a job.
        //
        // The default is to wait 1 second.
        ReserveTimeout time.Duration
        // ReconnectTimeout is the timeout between reconnects.
        //
        // The default is to wait 3 seconds.
        ReconnectTimeout time.Duration
        // TLSConfig describes the configuration that is used when Dial() makes a
        // TLS connection.
        TLSConfig *tls.Config
        // InfoFunc is called to log informational messages.
        InfoFunc func(message string)
        // ErrorFunc is called to log error messages.
        ErrorFunc func(err error, message string)
        // IgnoreURIValidation skips the call to ValidURIs() during initialization.
        IgnoreURIValidation bool
    }
Config is used to configure a Consumer, Producer or Conn.
type Conn ¶ added in v1.3.0
    type Conn struct {
        URI string
        // contains filtered or unexported fields
    }
Conn describes a connection to a beanstalk server.
func (*Conn) Kick ¶ added in v1.3.3
Kick one or more jobs in the specified tube. This function returns the number of jobs that were kicked.
func (*Conn) PeekBuried ¶ added in v1.3.3
PeekBuried peeks at a buried job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.
func (*Conn) PeekDelayed ¶ added in v1.4.2
PeekDelayed peeks at a delayed job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.
func (*Conn) Put ¶ added in v1.3.0
func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)
Put a job in the specified tube.
func (*Conn) ReserveWithTimeout ¶ added in v1.3.0
ReserveWithTimeout tries to reserve a job, blocking for at most timeout. If no job could be reserved, this function will return without a job or error.
type Consumer ¶
    type Consumer struct {
        // contains filtered or unexported fields
    }
Consumer maintains a pool of connections and allows workers to reserve jobs on those connections.
func NewConsumer ¶
NewConsumer returns a new Consumer.
type Job ¶
    type Job struct {
        ID         uint64
        Body       []byte
        ReservedAt time.Time
        Stats      struct {
            PutParams `yaml:",inline"`
            Tube      string        `yaml:"tube"`
            State     string        `yaml:"state"`
            Age       time.Duration `yaml:"age"`
            TimeLeft  time.Duration `yaml:"time-left"`
            File      int           `yaml:"file"`
            Reserves  int           `yaml:"reserves"`
            Timeouts  int           `yaml:"timeouts"`
            Releases  int           `yaml:"releases"`
            Buries    int           `yaml:"buries"`
            Kicks     int           `yaml:"kicks"`
        }
        // contains filtered or unexported fields
    }
Job describes a beanstalk job and its stats.
func (*Job) BuryWithPriority ¶
BuryWithPriority buries this job with the specified priority.
func (*Job) ReleaseWithParams ¶
ReleaseWithParams releases this job back with the specified priority and delay.
func (*Job) TouchAfter ¶ added in v1.3.0
TouchAfter returns the duration until this job needs to be touched for its reservation to be retained.
type Producer ¶
    type Producer struct {
        // contains filtered or unexported fields
    }
Producer maintains a pool of connections to beanstalk servers on which it inserts jobs.
func NewProducer ¶
NewProducer returns a new Producer.
func (*Producer) IsConnected ¶ added in v1.4.0
IsConnected returns true when at least one producer in the pool is connected.
type PutParams ¶
    type PutParams struct {
        Priority uint32        `yaml:"pri"`
        Delay    time.Duration `yaml:"delay"`
        TTR      time.Duration `yaml:"ttr"`
    }
PutParams are the parameters used to perform a Put operation.
type TubeStats ¶ added in v1.2.2
    type TubeStats struct {
        Name            string        `yaml:"name"`
        UrgentJobs      int64         `yaml:"current-jobs-urgent"`
        ReadyJobs       int64         `yaml:"current-jobs-ready"`
        ReservedJobs    int64         `yaml:"current-jobs-reserved"`
        DelayedJobs     int64         `yaml:"current-jobs-delayed"`
        BuriedJobs      int64         `yaml:"current-jobs-buried"`
        TotalJobs       int64         `yaml:"total-jobs"`
        CurrentUsing    int64         `yaml:"current-using"`
        CurrentWatching int64         `yaml:"current-watching"`
        CurrentWaiting  int64         `yaml:"current-waiting"`
        Deletes         int64         `yaml:"cmd-delete"`
        Pauses          int64         `yaml:"cmd-pause-tube"`
        Pause           time.Duration `yaml:"pause"`
        PauseLeft       time.Duration `yaml:"pause-time-left"`
    }
TubeStats describes the statistics of a beanstalk tube.