Documentation
¶
Overview ¶
Disco is a flexible, idiomatic approach to a Go Disque client.
It attempts to provide two ways of using Disque: a low level API that allows more flexibility and control for users that need it and high level API for the most common usage patterns.
High Level Api ¶
The high level API attempts to provide a common usage pattern in a idiomatic Go manner, ideally it should simplify Disque usage by not having to deal with the nuts and bolts of the low level API.
Funnels ¶
Funnels are an abstraction on top of a `disco.Pool`: they provide Go channels that you can use to enqueue or receive jobs from Disque.
// See GoDoc for further details in connection Pool creation. pool, _ := NewPool(2, 5, 1, time.Second * 200) funnel := pool.NewFunnel("disco-test-queue", "other-queue") defer funnel.Close() // Enqueue jobs simply by directing them to the Outgoing channel. funnel.Outgoing <- Job{Queue: "disco-test-queue", Payload: []byte("this-is-the-payload")}: // Receive jobs from disque simply by leveraging the Incoming channel, you can leverage // common Go patterns such as a select statement to handle timeouts or other kinds of errors. select { case job, ok := <- funnel.Incoming: string(job.Payload) //=> "this-is-the-payload" { case <- time.Tick(time.Second): // Handle timeout (or not) }
A funnel will also manage the job's lifecycle for you: jobs received via the `Incoming` channel will be acknowledged in Disque automatically (you'll still have the option to put it back in the queue if need be) and jobs fetched from Disque after the funnel is closed will be automatically NAcked so as not to lose data.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connection ¶
A Disque connection.
func NewConnection ¶
func NewConnection(cycle int) (Connection, error)
Creates a new connection to the disque nodes specified in the `DISQUE_NODES` environment variable.
func NewConnectionToNodes ¶
func NewConnectionToNodes(cycle int, nodes ...string) (Connection, error)
Creates a new connection to an array of Disque nodes.
func NewConnectionToURLS ¶
func NewConnectionToURLS(cycle int, nodes string) (Connection, error)
Creates a new connection to a list of comma-separated disque node URLs.
func (*Connection) Ack ¶
func (c *Connection) Ack(jobID string) error
Wrapper around Disque's `ACKJOB` call
func (*Connection) NAck ¶
func (c *Connection) NAck(jobID string) error
Wrapper around Disque's `NACK` call
type Funnel ¶
type Funnel struct { Queues []string Incoming chan Job Outgoing chan Job Connections *Pool FetchTimeout time.Duration FetchCount int Closed bool }
A funnel is a high-level API for Disque usage: it acts as a bridge between Disque and Go native channels, allowing for idiomatic interaction with the datastore.
func NewFunnel ¶
Creates a new funnel with a specific queue configuration and starts the appropriate goroutines to keep it's go channels synchronized with Disque.
func (*Funnel) Close ¶
func (f *Funnel) Close()
Marks the funnel as closed, which in turn closes its internal go channels gracefully.
func (*Funnel) Dispatch ¶
func (f *Funnel) Dispatch()
Listens to the `Outgoing` channel in the funnel, and dispatches any messages received to it's appropriate queue taking a connection from the funnel's pool.
This is a blocking call which is launched on a goroutine when #NewFunnel is called, you won't reguarly call it directly, but it's left as a public method to allow more flexibility of use cases.
func (*Funnel) Listen ¶
func (f *Funnel) Listen()
Takes a connection from the funnel's Disque connection pool and uses it to fetch jobs from the funnel's configured queues.
This is a blocking call which is launched on a goroutine when #NewFunnel is called, you won't reguarly call it directly, but it's left as a public method to allow more flexibility of use cases.
type Pool ¶
A Disque connection pool.
func NewPool ¶
Creates a new Pool with connections to the disque nodes specified in the `DISQUE_NODES` environment variable.
func NewPoolToNodes ¶
func NewPoolToNodes(maxIdle, maxActive, cycle int, idleTimeout time.Duration, nodes ...string) (Pool, error)
Creates a new Pool with connections to an array of Disque nodes.
func NewPoolToURLS ¶
func NewPoolToURLS(maxIdle, maxActive, cycle int, idleTimeout time.Duration, urls string) (Pool, error)
Creates a new Pool with connections to a list of comma-separated disque node URLs.