Documentation ¶
Index ¶
- func SetSSLMode(enabled bool)
- type BufferedQueue
- type Consumer
- func (consumer *Consumer) Get() (*Package, error)
- func (consumer *Consumer) GetFailed() (*Package, error)
- func (consumer *Consumer) GetUnacked() (*Package, error)
- func (consumer *Consumer) GetUnackedLength() int64
- func (consumer *Consumer) HasUnacked() bool
- func (consumer *Consumer) MultiGet(length int) ([]*Package, error)
- func (consumer *Consumer) NoWaitGet() (*Package, error)
- func (consumer *Consumer) Quit()
- func (consumer *Consumer) RequeueWorking() error
- func (consumer *Consumer) ResetWorking() error
- type ConsumerStat
- type Observer
- type Package
- type Queue
- func (queue *Queue) AddConsumer(name string) (c *Consumer, err error)
- func (queue *Queue) Delete() error
- func (queue *Queue) GetFailedLength() int64
- func (queue *Queue) GetInputLength() int64
- func (queue *Queue) Put(payload string) error
- func (queue *Queue) RequeueFailed() error
- func (queue *Queue) ResetFailed() error
- func (queue *Queue) ResetInput() error
- type QueueStat
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetSSLMode ¶
func SetSSLMode(enabled bool)
Types ¶
type BufferedQueue ¶
type BufferedQueue struct {
	*Queue
	BufferSize int
	Buffer     chan *Package
	// contains filtered or unexported fields
}
BufferedQueue provides a queue with buffered writes for increased performance. Only one buffered queue (per name) can be started at a time. Before terminating, the queue should be flushed using FlushBuffer() to avoid package loss.
func CreateBufferedQueue ¶
func CreateBufferedQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string, bufferSize int) *BufferedQueue
CreateBufferedQueue returns a BufferedQueue. To start writing the buffer to redis, call Start(). The optimal BufferSize seems to be around 200. For existing queues it works like SelectBufferedQueue.
func SelectBufferedQueue ¶
func SelectBufferedQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string, bufferSize int) (queue *BufferedQueue, err error)
SelectBufferedQueue returns a BufferedQueue if a queue with the name exists
func (*BufferedQueue) FlushBuffer ¶
func (queue *BufferedQueue) FlushBuffer()
FlushBuffer tells the background writer to flush the buffer to redis
func (*BufferedQueue) Put ¶
func (queue *BufferedQueue) Put(payload string) error
Put writes the payload to the buffer
func (*BufferedQueue) Start ¶
func (queue *BufferedQueue) Start() error
Start dispatches the background writer that flushes the buffer. If there is already a BufferedQueue running it will return an error.
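The buffered-write behaviour described for BufferedQueue can be illustrated with a minimal in-memory sketch. Everything here is an assumption made for illustration: memoryBufferedQueue, its fields, and Len are hypothetical names, a slice stands in for redis, and the library's real writer may batch and flush differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// memoryBufferedQueue is a hypothetical in-memory stand-in, not the library's
// BufferedQueue. The stored slice plays the role of redis.
type memoryBufferedQueue struct {
	size   int                // batch size, like BufferSize
	buffer chan string        // pending payloads, like Buffer
	flush  chan chan struct{} // flush requests; each is closed once served

	mu      sync.Mutex
	running bool
	stored  []string
}

func newMemoryBufferedQueue(size int) *memoryBufferedQueue {
	return &memoryBufferedQueue{
		size:   size,
		buffer: make(chan string, size),
		flush:  make(chan chan struct{}),
	}
}

// Start launches the background writer. A second call returns an error.
func (q *memoryBufferedQueue) Start() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.running {
		return errors.New("buffered queue is already running")
	}
	q.running = true
	go q.writer()
	return nil
}

// Put only places the payload in the buffer.
func (q *memoryBufferedQueue) Put(payload string) error {
	q.buffer <- payload
	return nil
}

// FlushBuffer asks the writer to store everything buffered so far and waits.
func (q *memoryBufferedQueue) FlushBuffer() {
	done := make(chan struct{})
	q.flush <- done
	<-done
}

// Len reports how many payloads have reached the store.
func (q *memoryBufferedQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.stored)
}

// writer collects payloads and stores them in batches of size, or on flush.
func (q *memoryBufferedQueue) writer() {
	batch := make([]string, 0, q.size)
	store := func() {
		q.mu.Lock()
		q.stored = append(q.stored, batch...)
		q.mu.Unlock()
		batch = batch[:0]
	}
	for {
		select {
		case p := <-q.buffer:
			batch = append(batch, p)
			if len(batch) >= q.size {
				store()
			}
		case done := <-q.flush:
		drain:
			for {
				select {
				case p := <-q.buffer:
					batch = append(batch, p)
				default:
					break drain
				}
			}
			store()
			close(done)
		}
	}
}

func main() {
	q := newMemoryBufferedQueue(3)
	if err := q.Start(); err != nil {
		panic(err)
	}
	fmt.Println("second Start:", q.Start())
	for i := 0; i < 7; i++ {
		q.Put(fmt.Sprintf("payload-%d", i))
	}
	q.FlushBuffer() // without this, payloads still buffered would be lost on exit
	fmt.Println("stored:", q.Len())
}
```

The sketch shows why a final FlushBuffer() matters: Put returns as soon as the payload is in the buffer, so anything not yet written by the background writer exists only in memory.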
type Consumer ¶
Consumers are used for reading from queues.
func (*Consumer) GetFailed ¶
GetFailed returns a single package from the failed queue of this consumer
func (*Consumer) GetUnacked ¶
GetUnacked returns a single package from the working queue of this consumer
func (*Consumer) GetUnackedLength ¶
GetUnackedLength returns the number of packages in the unacked queue
func (*Consumer) HasUnacked ¶
HasUnacked returns true if the consumer has unacked packages
func (*Consumer) NoWaitGet ¶
NoWaitGet returns a single package from the queue (returns nil, nil if no package in queue)
func (*Consumer) RequeueWorking ¶
RequeueWorking requeues all packages from working to input
func (*Consumer) ResetWorking ¶
ResetWorking deletes all messages in the working queue of this consumer. The messages are discarded, not requeued.
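The relationship between the input queue and a consumer's working (unacked) queue can be modelled with a small in-memory sketch. memoryConsumer is a hypothetical type, not the library's Consumer. The sketch assumes that fetching a package moves it from input to working, and that requeued packages are appended to the end of input; neither detail is stated above.

```go
package main

import "fmt"

// memoryConsumer is a hypothetical in-memory model, not the library's Consumer.
type memoryConsumer struct {
	input   []string // input queue shared with the producer
	working []string // packages fetched but not yet acknowledged
}

// NoWaitGet moves one payload from input to working (assumed behaviour).
// It returns nil, nil when the input queue is empty, as documented above.
func (c *memoryConsumer) NoWaitGet() (*string, error) {
	if len(c.input) == 0 {
		return nil, nil
	}
	p := c.input[0]
	c.input = c.input[1:]
	c.working = append(c.working, p)
	return &p, nil
}

// HasUnacked reports whether any fetched payload is still in working.
func (c *memoryConsumer) HasUnacked() bool { return len(c.working) > 0 }

// GetUnackedLength returns the number of payloads in working.
func (c *memoryConsumer) GetUnackedLength() int64 { return int64(len(c.working)) }

// RequeueWorking moves every unacked payload back to the input queue.
func (c *memoryConsumer) RequeueWorking() error {
	c.input = append(c.input, c.working...)
	c.working = nil
	return nil
}

// ResetWorking drops every unacked payload without requeueing it.
func (c *memoryConsumer) ResetWorking() error {
	c.working = nil
	return nil
}

func main() {
	c := &memoryConsumer{input: []string{"a", "b"}}
	c.NoWaitGet()
	c.NoWaitGet()
	p, err := c.NoWaitGet() // input is empty now
	fmt.Println(p == nil, err == nil)
	fmt.Println("unacked:", c.GetUnackedLength())

	c.RequeueWorking()
	fmt.Println("input:", len(c.input), "has unacked:", c.HasUnacked())
}
```

In this model RequeueWorking is the recovery path after a crash, while ResetWorking is destructive and only appropriate when the unacked packages are no longer needed.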
type ConsumerStat ¶
ConsumerStat collects data about a queue's consumer
type Observer ¶
Observer is a very simple implementation of a statistics observer. Far more complex things could be implemented with the way stats are written. For now it allows basic access to throughput rates and queue size averaged over seconds, minutes and hours.
func NewObserver ¶
NewObserver returns an Observer to monitor different statistics from redis
func (*Observer) GetAllQueues ¶
GetAllQueues returns a list of all registered queues
func (*Observer) UpdateAllStats ¶
func (observer *Observer) UpdateAllStats()
UpdateAllStats fetches stats for all queues and all their consumers
func (*Observer) UpdateQueueStats ¶
UpdateQueueStats fetches stats for one specific queue and its consumers
type Package ¶
type Package struct {
	Payload    string
	CreatedAt  time.Time
	Queue      interface{} `json:"-"`
	Consumer   *Consumer   `json:"-"`
	Collection *[]*Package `json:"-"`
	Acked      bool        `json:"-"`
}
Package provides headers and handling functions around payloads
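The `json:"-"` tags in the struct above mean that encoding/json skips those fields, so only Payload and CreatedAt appear when a Package is marshalled. The sketch below demonstrates this with a local copy of the struct. The empty Consumer type and the helpers samplePackage and encode are hypothetical, and whether the library itself stores packages as JSON is an assumption suggested by the tags, not something stated above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Consumer is an empty placeholder so this sketch compiles on its own.
type Consumer struct{}

// Package is a local copy of the struct definition shown above.
type Package struct {
	Payload    string
	CreatedAt  time.Time
	Queue      interface{} `json:"-"`
	Consumer   *Consumer   `json:"-"`
	Collection *[]*Package `json:"-"`
	Acked      bool        `json:"-"`
}

// samplePackage is a hypothetical helper returning a package with a fixed timestamp.
func samplePackage() *Package {
	return &Package{
		Payload:   "hello",
		CreatedAt: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
		Consumer:  &Consumer{},
		Acked:     true,
	}
}

// encode marshals a package to a JSON string.
func encode(p *Package) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	s, err := encode(samplePackage())
	if err != nil {
		panic(err)
	}
	// prints {"Payload":"hello","CreatedAt":"2024-01-02T03:04:05Z"}
	fmt.Println(s)
}
```

Consumer and Acked are set in the sample but absent from the output, which is consistent with them being runtime bookkeeping rather than part of the stored message.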
type Queue ¶
type Queue struct {
	Name string
	// contains filtered or unexported fields
}
Queue is the central element of this library. Packages can be put into or fetched from the queue. To read from a queue you need a consumer.
func CreateQueue ¶
CreateQueue returns a queue that you can Put() to or AddConsumer() on. For existing queues it works like SelectQueue.
func SelectQueue ¶
func SelectQueue(redisHost, redisPort, redisPassword string, redisDB int64, name string) (queue *Queue, err error)
SelectQueue returns a Queue if a queue with the name exists
func (*Queue) AddConsumer ¶
AddConsumer returns a consumer that can read from the queue
func (*Queue) Delete ¶
Delete clears all input and failed queues as well as all consumers. It will not proceed as long as consumers are running.
func (*Queue) GetFailedLength ¶
GetFailedLength returns the number of packages in the failed queue
func (*Queue) GetInputLength ¶
GetInputLength returns the number of packages in the input queue
func (*Queue) RequeueFailed ¶
RequeueFailed moves all failed packages back to the input queue
func (*Queue) ResetFailed ¶
ResetFailed deletes all packages from the failed queue
func (*Queue) ResetInput ¶
ResetInput deletes all packages from the input queue
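The queue maintenance methods above can be summarised with a small in-memory model of the input and failed queues. memoryQueue is a hypothetical stand-in with the same method names as Queue. markFailed is an invented helper, because how a package reaches the failed queue is not described in this section, and the position at which requeued packages re-enter the input queue is likewise an assumption.

```go
package main

import "fmt"

// memoryQueue is a hypothetical in-memory model, not the library's Queue.
type memoryQueue struct {
	input  []string
	failed []string
}

// Put appends a payload to the input queue.
func (q *memoryQueue) Put(payload string) error {
	q.input = append(q.input, payload)
	return nil
}

// GetInputLength returns the number of payloads in the input queue.
func (q *memoryQueue) GetInputLength() int64 { return int64(len(q.input)) }

// GetFailedLength returns the number of payloads in the failed queue.
func (q *memoryQueue) GetFailedLength() int64 { return int64(len(q.failed)) }

// RequeueFailed moves all failed payloads back to the input queue.
func (q *memoryQueue) RequeueFailed() error {
	q.input = append(q.input, q.failed...)
	q.failed = nil
	return nil
}

// ResetFailed deletes all payloads from the failed queue.
func (q *memoryQueue) ResetFailed() error {
	q.failed = nil
	return nil
}

// ResetInput deletes all payloads from the input queue.
func (q *memoryQueue) ResetInput() error {
	q.input = nil
	return nil
}

// markFailed is an invented helper that moves the oldest input payload to failed.
func (q *memoryQueue) markFailed() {
	if len(q.input) == 0 {
		return
	}
	q.failed = append(q.failed, q.input[0])
	q.input = q.input[1:]
}

func main() {
	q := &memoryQueue{}
	q.Put("a")
	q.Put("b")
	q.markFailed()
	fmt.Println("input:", q.GetInputLength(), "failed:", q.GetFailedLength())

	q.RequeueFailed() // retry: failed payloads return to input
	fmt.Println("input:", q.GetInputLength(), "failed:", q.GetFailedLength())

	q.ResetInput() // destructive: input payloads are gone
	fmt.Println("input:", q.GetInputLength())
}
```

The distinction worth noting is that RequeueFailed preserves packages for another attempt, whereas ResetFailed and ResetInput discard them.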
type QueueStat ¶
type QueueStat struct {
	InputSizeSecond int64
	InputSizeMinute int64
	InputSizeHour   int64

	FailSizeSecond int64
	FailSizeMinute int64
	FailSizeHour   int64

	InputRateSecond int64
	InputRateMinute int64
	InputRateHour   int64

	WorkRateSecond int64
	WorkRateMinute int64
	WorkRateHour   int64

	ConsumerStats map[string]*ConsumerStat
}
QueueStat collects data about a queue