Documentation ¶
Index ¶
- func LPOPRPUSH(cnx redis.Conn) *redis.Script
- type BaseQueue
- func (q *BaseQueue) Concat(src string) (moved int, err error)
- func (q *BaseQueue) Processor() Processor
- func (q *BaseQueue) Pull(timeout time.Duration) (payload []byte, err error)
- func (q *BaseQueue) Push(payload []byte) (err error)
- func (q *BaseQueue) SetProcessor(processor Processor)
- func (q *BaseQueue) Source() string
- type ByteQueue
- type DurableQueue
- type Processor
- type Queue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseQueue ¶
type BaseQueue struct {
// contains filtered or unexported fields
}
BaseQueue provides a basic implementation of the Queue interface. Its basic methodology is to preform updates using a Processor interface which in and of itself defines how updates can be handled.
func (*BaseQueue) Concat ¶
Takes all elements from the source queue and adds them to this one. This can be a long-running operation. If a persistent error is returned while moving things, then it will be returned and the concat will stop, though the concat operation can be safely resumed at any time.
func (*BaseQueue) Processor ¶
Source implements the Source method on the Queue interface. It functions by requesting a read-level lock from the guarding mutex and returning that value once obtained. If no processor is set, the the default FIFO implementation is returned.
func (*BaseQueue) Push ¶
Push pushes the given payload (a byte slice) into the specified keyspace by delegating into the `Processor`'s `func Push`. It obtains a connection to Redis using the pool, which is passed into the Processor, and recycles that connection after the function has returned.
If an error occurs during Pushing, it will be returned, and it can be assumed that the payload is not in Redis.
func (*BaseQueue) SetProcessor ¶
SetProcessor implements the SetProcessor method on the Queue interface. It functions by requesting write-level access from the guarding mutex and preforms the update atomically.
type ByteQueue ¶
type ByteQueue struct {
BaseQueue
}
ByteQueue represents either a FILO or FIFO queue contained in a particular Redis keyspace. It allows callers to push `[]byte` payloads, and receive them back over the `In() <-chan []byte`. It is typically used in a distributed setting, where the pusher may not always get the item back.
func NewByteQueue ¶
NewByteQueue allocates and returns a pointer to a new instance of a ByteQueue. It initializes itself using the given *redis.Pool, and the name, which refers to the keyspace wherein these values will be stored.
Internal channels are also initialized here.
type DurableQueue ¶
type DurableQueue struct { // DurableQueue extends a BaseQueue BaseQueue // contains filtered or unexported fields }
DurableQueue is an implementation of the Queue interface which takes items from a source queue and pushes them into the destination queue when Pull() is called.
func NewDurableQueue ¶
func NewDurableQueue(pool *redis.Pool, source, dest string) *DurableQueue
NewDurableQueue initializes and returns a new pointer to an instance of a DurableQueue. It is initialized with the given Redis pool, and the source and destination queues. By default the FIFO tactic is used, but a call to SetProcessor can change this in a safe fashion.
DurableQueues own no goroutines, so this method does not spwawn any goroutines or channels.
func (*DurableQueue) Dest ¶
func (q *DurableQueue) Dest() string
Dest returns the destination keyspace in Redis where pulled items end up. It first obtains a read-level lock on the member `dest` variable before returning.
func (*DurableQueue) Pull ¶
func (q *DurableQueue) Pull(timeout time.Duration) (payload []byte, err error)
Pull implements the Pull function on the Queue interface. Unlike common implementations of the Queue type, it mutates the Redis keyspace twice, by removing an item from one LIST and popping it onto another. It does so by delegating into the processor, thus blocking until the processor returns.
func (*DurableQueue) SetDest ¶
func (q *DurableQueue) SetDest(dest string) string
SetDest updates the destination where items are "pulled" to in a safe, blocking manner. It does this by first obtaining a write-level lock on the internal member variable wherein the destination is stored, updating, and then relinquishing the lock.
It returns the new destination that was just set.
type Processor ¶
type Processor interface { // Push pushes a given `payload` into the keyspace at `key` over the // given `redis.Conn`. This function should block until the item can // succesfully be confirmed to have been pushed. Push(conn redis.Conn, src string, payload []byte) (err error) // Pull pulls a given `payload` from the keyspace at `key` over the // given `redis.Conn`. This function should block until the given // timeout has elapsed, or an item is available. If the timeout has // passed, a redis.ErrNil will be returned. Pull(conn redis.Conn, src string, timeout time.Duration) (payload []byte, err error) // PullTo transfers a given payload from the source (src) keyspace to // the destination (dest) keyspace and returns the moved item in the // payload space. If an error was encountered, then it will be returned // immediately. Timeout semantisc are idential to those on Pull, unless // noted otherwise in implementation. PullTo(conn redis.Conn, src, dest string, timeout time.Duration) (payload []byte, err error) // Moves all elements from the src queue to the end of the destination // It should return a redis.ErrNil when the source queue is empty. Concat(conn redis.Conn, src, dest string) (err error) }
Processor is an interface to a type encapsulating the interaction between a queue.ByteQueue and a datastructure in Redis.
var FIFO Processor = &fifoProcessor{}
FIFO is a first in, first out implementation of the Processor interface.
var LIFO Processor = &lifoProcessor{}
FIFO is a last in, first out implementation of the Processor interface.
type Queue ¶
type Queue interface { // Source returns the keyspace in Redis from which this queue is // populated. Source() string // Push pushes the given payload (a byte slice) into the specified // keyspace by delegating into the `Processor`'s `func Push`. It obtains // a connection to Redis using the pool, which is passed into the // Processor, and recycles that connection after the function has // returned. // // If an error occurs during Pushing, it will be returned, and it can be // assumed that the payload is not in Redis. Push(payload []byte) (err error) // Pull returns the next available payload, blocking until data can be // returned. Pull(timeout time.Duration) (payload []byte, err error) // Takes all elements from the source queue and adds them to this one. This // can be a long-running operation. If a persistent error is returned while // moving things, then concat will stop, though the concat operation can // be safely resumed at any time. // // Returns the number of items successfully moved and any error that // occurred. Concat(src string) (moved int, err error) // Processor returns the processor that is being used to push and pull. // If no processor is specified, a first-in-first-out will be returned // by default. Processor() Processor // SetProcessor sets the current processor to the specified processor by // aquiring a write lock into the mutex guarding that field. The // processor will be switched over during the next iteration of a // Pull-cycle, or a call to Push. SetProcessor(processor Processor) }