Documentation ¶
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the queue processing struct. Keep a handle to this struct for sending and receiving messages.
func NewQueue ¶
func NewQueue(dbName, collectionName, connectionUri, caFile string, timeout time.Duration) (*Queue, error)
NewQueue creates a new queue struct. This function panics if the timeout is negative or if any of the string parameters is empty.
func (*Queue) Dequeue ¶
func (q *Queue) Dequeue(ctx context.Context) (*QueueMessage, error)
Dequeue pulls the next item off the queue, if one is available. You must call Done on the message when you have finished processing it; otherwise its visibility timeout expires and the message is made visible again. If no entries are available, nil is returned.
func (*Queue) Enqueue ¶
Enqueue inserts a new item into the queue. This allows for an empty payload. If visibility is negative, this will panic.
func (*Queue) Listen ¶
func (q *Queue) Listen(count int) <-chan *QueueMessage
Listen returns a channel and polls the database for new messages in separate goroutines. The returned channel is unbuffered. If you call Listen, you must call StopListen on process shutdown, which closes the channel. The count parameter is the number of goroutines to spawn to query the database for new entries. If count is less than 1, this method panics.
func (*Queue) StopListen ¶
func (q *Queue) StopListen()
StopListen must be called when you are ready to shut down the Listen call. It closes the channel returned by Listen and terminates the goroutines that are calling Dequeue.
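The Listen and StopListen lifecycle can be illustrated with an in-memory imitation. Everything in this sketch is hypothetical: `msg` stands in for QueueMessage, and `fakeQueue` only mimics the documented behaviour (an unbuffered channel, `count` polling goroutines, a panic when `count` is below 1, and a channel that is closed by StopListen). It is not how the package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a hypothetical stand-in for the package's QueueMessage.
type msg struct{ Payload string }

// fakeQueue imitates the documented Listen/StopListen behaviour in memory.
type fakeQueue struct {
	mu      sync.Mutex
	pending []string
	out     chan *msg
	stop    chan struct{}
	wg      sync.WaitGroup
}

// next pops one pending payload, if any.
func (q *fakeQueue) next() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return "", false
	}
	p := q.pending[0]
	q.pending = q.pending[1:]
	return p, true
}

// Listen starts count goroutines that feed an unbuffered channel.
func (q *fakeQueue) Listen(count int) <-chan *msg {
	if count < 1 {
		panic("count must be at least 1")
	}
	q.out = make(chan *msg) // unbuffered, as documented
	q.stop = make(chan struct{})
	for i := 0; i < count; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for {
				p, ok := q.next()
				if !ok {
					<-q.stop // nothing left to poll in this fake; wait for shutdown
					return
				}
				select {
				case q.out <- &msg{Payload: p}:
				case <-q.stop:
					return
				}
			}
		}()
	}
	return q.out
}

// StopListen stops the pollers, then closes the channel.
func (q *fakeQueue) StopListen() {
	close(q.stop)
	q.wg.Wait()
	close(q.out)
}

// collect receives n messages, shuts the listener down, and reports
// whether the channel was closed afterwards.
func collect(q *fakeQueue, pollers, n int) (payloads []string, closed bool) {
	ch := q.Listen(pollers)
	for i := 0; i < n; i++ {
		m := <-ch
		payloads = append(payloads, m.Payload)
	}
	q.StopListen()
	_, open := <-ch
	return payloads, !open
}

func main() {
	q := &fakeQueue{pending: []string{"a", "b", "c"}}
	got, closed := collect(q, 2, 3)
	fmt.Println(len(got), "messages received; channel closed:", closed)
}
```

A long-running worker would more typically use `for m := range ch { ... }`, which ends on its own once StopListen closes the channel. Message order is not deterministic when more than one polling goroutine is used.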
type QueueMessage ¶
type QueueMessage struct {
	Id         *primitive.ObjectID `json:"id" bson:"_id"`
	Version    *primitive.ObjectID `json:"version" bson:"version"`
	Visibility int                 `json:"visibility" bson:"visibility"` // Visibility timeout is in seconds
	Created    *time.Time          `json:"created" bson:"created"`
	Payload    string              `json:"payload" bson:"payload"`
	Started    *time.Time          `json:"started" bson:"started"`
	Dequeued   *time.Time          `json:"dequeued" bson:"dequeued"`
	Expire     *time.Time          `json:"expire" bson:"expire"`
	// contains filtered or unexported fields
}
QueueMessage is the queue message structure.
func (*QueueMessage) Done ¶
func (m *QueueMessage) Done(ctx context.Context) error
Done tries to delete the message from the queue. If the visibility timeout expired and the entry was updated, the version will not match and this method returns an error. The error is purely informational, because the entry will be made available to another worker/processor. As a reminder, this queue is intended for idempotent workloads.