Documentation ¶
Overview ¶
Package singu defines queue message struct and queue API.
Index ¶
- Constants
- Variables
- func UniqueId() string
- type IQueue
- type InmemQueue
- func (q *InmemQueue) Destroy()
- func (q *InmemQueue) EphemeralSize() (int, error)
- func (q *InmemQueue) EphemeralStorageCapacity() (int, error)
- func (q *InmemQueue) Finish(id string) error
- func (q *InmemQueue) Init() error
- func (q *InmemQueue) IsEphemeralStorageEnabled() bool
- func (q *InmemQueue) Name() string
- func (q *InmemQueue) OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error)
- func (q *InmemQueue) Queue(msg *QueueMessage) (*QueueMessage, error)
- func (q *InmemQueue) QueueSize() (int, error)
- func (q *InmemQueue) QueueStorageCapacity() (int, error)
- func (q *InmemQueue) Requeue(id string, silent bool) (*QueueMessage, error)
- func (q *InmemQueue) Take() (*QueueMessage, error)
- type QueueMessage
Constants ¶
const (
// SizeNotSupported is returned if queue implementation does not support counting number of messages in storage
SizeNotSupported = -1
)
const (
// Version of singu
Version = "0.1.1"
)
Variables ¶
var (
// ErrorOperationNotSupported is returned when the queue implementation does not support the invoked operation
ErrorOperationNotSupported = errors.New("operation not supported")

// ErrorQueueIsFull is returned when queue storage is full and can not accept any more message
ErrorQueueIsFull = errors.New("queue storage is full")

// ErrorEphemeralIsFull is returned when ephemeral storage is full and can not accept any more message
ErrorEphemeralIsFull = errors.New("ephemeral storage is full")
)
Functions ¶
Types ¶
type IQueue ¶
type IQueue interface {
// Name returns queue's name.
Name() string

// QueueStorageCapacity returns max number of messages queue storage can hold, or SizeNotSupported if queue storage has unlimited capacity.
QueueStorageCapacity() (int, error)

// EphemeralStorageCapacity returns max number of messages ephemeral storage can hold, or SizeNotSupported if ephemeral storage has unlimited capacity.
EphemeralStorageCapacity() (int, error)

// IsEphemeralStorageEnabled returns true if ephemeral storage is supported, false otherwise.
IsEphemeralStorageEnabled() bool

// Queue enqueues a message: put the message to the tail of queue storage.
// This function returns the enqueued QueueMessage with Id and QueueTimestamp fields filled.
Queue(msg *QueueMessage) (*QueueMessage, error)

// Requeue moves the enqueued message from ephemeral back to queue storage.
// - id: id of the message to be re-queued
// - silent: if true, message's requeue count and queue timestamp will not be updated; if false, message's requeue count is increased and queue timestamp is updated
//
// This function returns the enqueued QueueMessage with Id and QueueTimestamp fields filled.
//
// Notes:
// - message is put to head or tail of queue storage depending on queue implementation
Requeue(id string, silent bool) (*QueueMessage, error)

// Finish is called to signal that the message can now be removed from ephemeral storage.
Finish(id string) error

// Take dequeues a message: move a message from the head of queue storage to ephemeral storage and return the message.
// Nil is returned if queue storage is empty.
Take() (*QueueMessage, error)

// OrphanMessages returns all messages that have been staying in ephemeral storage for more than a specific number of seconds.
// - numSeconds: messages older than <numSeconds> will be returned
// - numMessages: limit number of returned messages, value less than or equal to zero means 'no limit'
//
// Note: order of returned messages depends on queue implementation
OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error)

// QueueSize returns number of messages currently in queue storage.
QueueSize() (int, error)

// EphemeralSize returns number of messages currently in ephemeral storage.
EphemeralSize() (int, error)
}
IQueue defines API to access queue messages.
Queue implementation:
- Queue storage to store queue messages. Messages are put to the tail and taken from the head of queue storage in FIFO manner.
- Messages taken from queue storage are temporarily stored in ephemeral storage until Finish or Requeue is called.
- Ephemeral storage is optional, depends on queue implementation.
Queue usage flow:
- Create an IQueue instance.
- Call IQueue.Queue(msg) to put messages to the queue.
- Call IQueue.Take() to take messages from the queue.
- Do something with the message.
- When done, call IQueue.Finish(id).
- If not done and the message needs to be re-queued, call IQueue.Requeue(id, true/false) to put the message back to the queue.
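The flow above can be sketched with a toy, single-goroutine queue. Everything in this block is a stand-in written for illustration: Message, toyQueue and newToyQueue are hypothetical names, not singu types, and the behavior on an unknown id in Requeue is an assumption. Only the method names and the queue/ephemeral storage semantics follow the IQueue description.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"time"
)

// Message mirrors a few fields of QueueMessage (hypothetical stand-in).
type Message struct {
	Id             string
	QueueTimestamp time.Time
	TakenTimestamp time.Time
	NumRequeues    int
	Payload        []byte
}

// toyQueue is a hypothetical, non-thread-safe stand-in for an IQueue implementation.
type toyQueue struct {
	nextId    int
	storage   []*Message          // queue storage, head at index 0
	ephemeral map[string]*Message // messages taken but not yet finished
}

func newToyQueue() *toyQueue {
	return &toyQueue{ephemeral: map[string]*Message{}}
}

// Queue puts msg at the tail of queue storage, assigning an id if none is set.
func (q *toyQueue) Queue(msg *Message) (*Message, error) {
	if msg.Id == "" {
		q.nextId++
		msg.Id = strconv.Itoa(q.nextId)
	}
	msg.QueueTimestamp = time.Now()
	q.storage = append(q.storage, msg)
	return msg, nil
}

// Take moves the head of queue storage to ephemeral storage; nil if empty.
func (q *toyQueue) Take() (*Message, error) {
	if len(q.storage) == 0 {
		return nil, nil
	}
	msg := q.storage[0]
	q.storage = q.storage[1:]
	msg.TakenTimestamp = time.Now()
	q.ephemeral[msg.Id] = msg
	return msg, nil
}

// Finish removes the message from ephemeral storage.
func (q *toyQueue) Finish(id string) error {
	delete(q.ephemeral, id)
	return nil
}

// Requeue moves a message from ephemeral storage back to the tail of queue storage.
func (q *toyQueue) Requeue(id string, silent bool) (*Message, error) {
	msg, ok := q.ephemeral[id]
	if !ok {
		return nil, errors.New("message not in ephemeral storage") // assumed behavior
	}
	delete(q.ephemeral, id)
	if !silent {
		msg.NumRequeues++
		msg.QueueTimestamp = time.Now()
	}
	q.storage = append(q.storage, msg)
	return msg, nil
}

func main() {
	q := newToyQueue()
	q.Queue(&Message{Payload: []byte("first")})
	q.Queue(&Message{Payload: []byte("second")})

	msg, _ := q.Take()
	q.Requeue(msg.Id, false) // processing failed: put the message back
	msg, _ = q.Take()
	q.Finish(msg.Id) // processing succeeded: drop it from ephemeral storage
	fmt.Println(string(msg.Payload), len(q.storage), len(q.ephemeral))
}
```

This toy version always re-queues to the tail. As the interface comments note, a real implementation may put a re-queued message at the head instead.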
func NewInmemQueue ¶
func NewInmemQueue(name string, queueCapacity int, ephemeralDisabled bool, ephemeralCapacity int) IQueue
NewInmemQueue creates a new InmemQueue instance.
- name: queue's name
- queueCapacity: if zero or negative, queue storage has unlimited capacity; otherwise the number of messages that can be stored in queue storage is capped at the specified number
- ephemeralDisabled: if true, ephemeral storage is disabled
- ephemeralCapacity: if zero or negative, ephemeral storage has unlimited capacity; otherwise the number of messages that can be stored in ephemeral storage is capped at the specified number
type InmemQueue ¶
type InmemQueue struct {
// contains filtered or unexported fields
}
InmemQueue is in-memory queue implementation.
- If queue message's id is not set, this queue implementation will assign one. Otherwise, the pre-set message id is used.
func (*InmemQueue) EphemeralSize ¶
func (q *InmemQueue) EphemeralSize() (int, error)
EphemeralSize implements IQueue.EphemeralSize
func (*InmemQueue) EphemeralStorageCapacity ¶
func (q *InmemQueue) EphemeralStorageCapacity() (int, error)
EphemeralStorageCapacity implements IQueue.EphemeralStorageCapacity
func (*InmemQueue) Finish ¶
func (q *InmemQueue) Finish(id string) error
Finish implements IQueue.Finish
func (*InmemQueue) IsEphemeralStorageEnabled ¶
func (q *InmemQueue) IsEphemeralStorageEnabled() bool
IsEphemeralStorageEnabled implements IQueue.IsEphemeralStorageEnabled
func (*InmemQueue) OrphanMessages ¶
func (q *InmemQueue) OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error)
OrphanMessages implements IQueue.OrphanMessages
func (*InmemQueue) Queue ¶
func (q *InmemQueue) Queue(msg *QueueMessage) (*QueueMessage, error)
Queue implements IQueue.Queue
func (*InmemQueue) QueueSize ¶
func (q *InmemQueue) QueueSize() (int, error)
QueueSize implements IQueue.QueueSize
func (*InmemQueue) QueueStorageCapacity ¶
func (q *InmemQueue) QueueStorageCapacity() (int, error)
QueueStorageCapacity implements IQueue.QueueStorageCapacity
func (*InmemQueue) Requeue ¶
func (q *InmemQueue) Requeue(id string, silent bool) (*QueueMessage, error)
Requeue implements IQueue.Requeue
func (*InmemQueue) Take ¶
func (q *InmemQueue) Take() (*QueueMessage, error)
Take implements IQueue.Take
type QueueMessage ¶
type QueueMessage struct {
Id string `json:"id"` // message's unique id
Timestamp time.Time `json:"time"` // message's creation timestamp
QueueTimestamp time.Time `json:"qtime"` // message's last-queued timestamp, maintained by queue implementation
TakenTimestamp time.Time `json:"ttime"` // message's taken timestamp, maintained by queue implementation
NumRequeues int `json:"num_requeues"` // how many times message has been re-queued?, maintained by queue implementations
Payload []byte `json:"payload"` // message's payload
}
QueueMessage represents a queue message.
func CloneQueueMessage ¶
func CloneQueueMessage(msg QueueMessage) QueueMessage
CloneQueueMessage clones a QueueMessage instance
func NewQueueMessage ¶
func NewQueueMessage(payload []byte) *QueueMessage
NewQueueMessage creates a new QueueMessage instance with provided payload