Documentation ¶
Index ¶
- Constants
- type AckOpts
- type AckableQueue
- type AcknowledgeableQueue
- func (q *AcknowledgeableQueue) Ack(id int64) error
- func (q *AcknowledgeableQueue) AckCtx(ctx context.Context, id int64) error
- func (q *AcknowledgeableQueue) Dequeue() (Msg, error)
- func (q *AcknowledgeableQueue) DequeueCtx(ctx context.Context) (Msg, error)
- func (q *AcknowledgeableQueue) ExpireAck(id int64) error
- func (q *AcknowledgeableQueue) Len() (int, error)
- func (q *AcknowledgeableQueue) Nack(id int64) error
- func (q *AcknowledgeableQueue) NackCtx(ctx context.Context, id int64) error
- func (q *AcknowledgeableQueue) RegisterBehaviourOnFailure(fn func(msg Msg) error)
- func (q *AcknowledgeableQueue) RegisterDeadLetterQueue(dlq Enqueuer)
- func (q *AcknowledgeableQueue) RegisterOnFailureCallback(fn func(msg Msg) error)
- func (q *AcknowledgeableQueue) TryAck(id int64) error
- func (q *AcknowledgeableQueue) TryAckCtx(ctx context.Context, id int64) error
- func (q *AcknowledgeableQueue) TryDequeue() (Msg, error)
- func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
- func (q *AcknowledgeableQueue) TryNack(id int64) error
- func (q *AcknowledgeableQueue) TryNackCtx(ctx context.Context, id int64) error
- type Dequeuer
- type Enqueuer
- type ErrDBLocked
- type ErrNoItemsWaiting
- type Msg
- type Queue
- func (q *Queue) Close() error
- func (q *Queue) Dequeue() (Msg, error)
- func (q *Queue) DequeueCtx(ctx context.Context) (Msg, error)
- func (q *Queue) Enqueue(item []byte) error
- func (q *Queue) EnqueueCtx(ctx context.Context, item []byte) error
- func (q *Queue) Len() (int, error)
- func (q *Queue) TryDequeue() (Msg, error)
- func (q *Queue) TryDequeueCtx(ctx context.Context) (Msg, error)
- func (q *Queue) TryEnqueue(item []byte) error
- func (q *Queue) TryEnqueueCtx(ctx context.Context, item []byte) error
- type Queuer
Constants ¶
const (
InfiniteRetries = -1
)
AckOpts represents the queue-level settings for how acknowledgement of messages is handled.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckableQueue ¶
type AckableQueue interface {
	Queuer

	// Ack acknowledges that an item has been successfully processed.
	// It takes the ID of the message to acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Ack(id int64) error

	// Nack indicates that an item processing has failed and should be requeued.
	// It takes the ID of the message to negative acknowledge.
	// Returns an error if the operation fails or the message doesn't exist.
	Nack(id int64) error
}
AckableQueue extends the Queuer interface with acknowledgement capabilities. It allows for explicit acknowledgement or negative acknowledgement of processed items.
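The acknowledge-or-requeue flow described above can be sketched as a small consumer loop. This is only an illustration: `ackSource`, `memQueue`, `errEmpty`, and `drain` are hypothetical names, and `memQueue` is an in-memory stand-in rather than the package's own queue. Only the `TryDequeue`, `Ack`, and `Nack` method signatures are taken from this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// ackSource is a hypothetical narrow interface holding only the documented
// methods this sketch calls.
type ackSource interface {
	TryDequeue() (Msg, error)
	Ack(id int64) error
	Nack(id int64) error
}

// errEmpty is a placeholder error for the stand-in queue.
var errEmpty = errors.New("no items waiting")

// memQueue is an in-memory stand-in, not the package's implementation.
type memQueue struct {
	ready    []Msg
	inFlight map[int64]Msg
}

func (q *memQueue) TryDequeue() (Msg, error) {
	if len(q.ready) == 0 {
		return Msg{}, errEmpty
	}
	m := q.ready[0]
	q.ready = q.ready[1:]
	q.inFlight[m.ID] = m
	return m, nil
}

func (q *memQueue) Ack(id int64) error {
	if _, ok := q.inFlight[id]; !ok {
		return fmt.Errorf("message %d is not in flight", id)
	}
	delete(q.inFlight, id)
	return nil
}

func (q *memQueue) Nack(id int64) error {
	m, ok := q.inFlight[id]
	if !ok {
		return fmt.Errorf("message %d is not in flight", id)
	}
	delete(q.inFlight, id)
	q.ready = append(q.ready, m) // requeue for another attempt
	return nil
}

// drain processes messages until the queue reports empty, acking successes
// and nacking failures. It returns the number of acked messages.
func drain(q ackSource, handle func(Msg) error) (int, error) {
	acked := 0
	for {
		msg, err := q.TryDequeue()
		if errors.Is(err, errEmpty) {
			return acked, nil
		}
		if err != nil {
			return acked, err
		}
		if herr := handle(msg); herr != nil {
			if err := q.Nack(msg.ID); err != nil {
				return acked, err
			}
			continue
		}
		if err := q.Ack(msg.ID); err != nil {
			return acked, err
		}
		acked++
	}
}

func main() {
	q := &memQueue{
		ready:    []Msg{{ID: 1, Item: []byte("a")}, {ID: 2, Item: []byte("b")}},
		inFlight: map[int64]Msg{},
	}
	failedOnce := false
	n, err := drain(q, func(m Msg) error {
		if m.ID == 1 && !failedOnce {
			failedOnce = true
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(n, err) // prints "2 <nil>"
}
```

In the stand-in, message 1 fails once, is requeued by `Nack`, and succeeds on its second attempt. The real queue additionally applies retry limits, which this sketch does not model.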
type AcknowledgeableQueue ¶
func NewAckQueue ¶
func NewAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)
NewAckQueue creates a new ack queue. If filePath is empty, the queue will be created in memory.
func NewUniqueAckQueue ¶
func NewUniqueAckQueue(filePath string, opts AckOpts) (*AcknowledgeableQueue, error)
NewUniqueAckQueue creates a new unique ack queue.
func (*AcknowledgeableQueue) Ack ¶
func (q *AcknowledgeableQueue) Ack(id int64) error
Ack acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. It uses a background context internally.
func (*AcknowledgeableQueue) AckCtx ¶ added in v0.2.1
func (q *AcknowledgeableQueue) AckCtx(ctx context.Context, id int64) error
AckCtx acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. If the database is locked, it blocks until the database is unlocked.
func (*AcknowledgeableQueue) Dequeue ¶
func (q *AcknowledgeableQueue) Dequeue() (Msg, error)
Dequeue removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available. It uses a background context internally.
func (*AcknowledgeableQueue) DequeueCtx ¶
func (q *AcknowledgeableQueue) DequeueCtx(ctx context.Context) (Msg, error)
DequeueCtx removes and returns the next item from the queue. It blocks if the queue is empty until an item becomes available or the context is cancelled.
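A common way to bound a blocking dequeue is to pass a context with a timeout. The sketch below assumes only the documented `DequeueCtx` signature; `ctxDequeuer`, `chanQueue`, and `dequeueWithTimeout` are hypothetical names, and `chanQueue` is a channel-backed stand-in rather than the real queue. Because this page does not say which error value is returned on cancellation, the helper inspects `ctx.Err()` instead of the returned error.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// ctxDequeuer holds the one documented method this sketch relies on.
type ctxDequeuer interface {
	DequeueCtx(ctx context.Context) (Msg, error)
}

// chanQueue is an in-memory stand-in backed by a channel.
type chanQueue struct{ items chan Msg }

func (q *chanQueue) DequeueCtx(ctx context.Context) (Msg, error) {
	select {
	case m := <-q.items:
		return m, nil
	case <-ctx.Done():
		return Msg{}, ctx.Err()
	}
}

// dequeueWithTimeout waits at most d for an item. The boolean reports
// whether an item arrived before the deadline.
func dequeueWithTimeout(q ctxDequeuer, d time.Duration) (Msg, bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	msg, err := q.DequeueCtx(ctx)
	if err != nil {
		if ctx.Err() != nil {
			return Msg{}, false, nil // timed out: treat as "nothing yet"
		}
		return Msg{}, false, err
	}
	return msg, true, nil
}

func main() {
	q := &chanQueue{items: make(chan Msg, 1)}
	q.items <- Msg{ID: 1, Item: []byte("hello")}

	msg, ok, err := dequeueWithTimeout(q, 50*time.Millisecond)
	fmt.Println(string(msg.Item), ok, err) // prints "hello true <nil>"

	_, ok, err = dequeueWithTimeout(q, 50*time.Millisecond)
	fmt.Println(ok, err) // prints "false <nil>"
}
```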
func (*AcknowledgeableQueue) ExpireAck ¶
func (q *AcknowledgeableQueue) ExpireAck(id int64) error
ExpireAck expires the acknowledgement deadline for an item, which requeues it to the front of the queue. It takes the ID of the message whose acknowledgement deadline should be expired and returns an error if the operation fails or the message doesn't exist.
func (*AcknowledgeableQueue) Len ¶
func (q *AcknowledgeableQueue) Len() (int, error)
Len returns the number of items in the queue. It returns the count and any error encountered during the operation.
func (*AcknowledgeableQueue) Nack ¶
func (q *AcknowledgeableQueue) Nack(id int64) error
Nack indicates that processing of an item has failed and that the item should be requeued. It takes the ID of the message to negatively acknowledge and returns an error if the operation fails or the message doesn't exist. It uses a background context internally.
func (*AcknowledgeableQueue) NackCtx ¶ added in v0.2.1
func (q *AcknowledgeableQueue) NackCtx(ctx context.Context, id int64) error
NackCtx indicates that processing of an item has failed and that the item should be requeued. It takes the ID of the message to negatively acknowledge and returns an error if the operation fails. If the database is locked, it blocks until the database is unlocked.
func (*AcknowledgeableQueue) RegisterBehaviourOnFailure ¶
func (q *AcknowledgeableQueue) RegisterBehaviourOnFailure(fn func(msg Msg) error)
RegisterBehaviourOnFailure sets the behaviour on failure for the queue, which is triggered when a message receives more Nacks than the maximum number of retries. It takes a function that receives the failed message and returns an error. The function can manually requeue the message, put it in a different queue, or handle it in any other way. The default behaviour is to drop the message.
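The "more Nacks than the max retries" condition can be read as the following check. This is a sketch of one plausible interpretation, not the package's code: `retriesExhausted` is a hypothetical helper, and treating `InfiniteRetries` as "never exhausted" is an assumption based on the constant's name.

```go
package main

import "fmt"

// InfiniteRetries mirrors the documented constant.
const InfiniteRetries = -1

// retriesExhausted is a hypothetical helper. It reports whether a message
// that has been nacked `nacks` times has exceeded `maxRetries`.
// Assumption: InfiniteRetries disables the limit entirely.
func retriesExhausted(nacks, maxRetries int) bool {
	if maxRetries == InfiniteRetries {
		return false
	}
	return nacks > maxRetries
}

func main() {
	fmt.Println(retriesExhausted(3, 3))                  // false
	fmt.Println(retriesExhausted(4, 3))                  // true
	fmt.Println(retriesExhausted(1000, InfiniteRetries)) // false
}
```

Under this reading, a limit of 3 allows three Nacks, and the fourth triggers the failure behaviour.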
func (*AcknowledgeableQueue) RegisterDeadLetterQueue ¶
func (q *AcknowledgeableQueue) RegisterDeadLetterQueue(dlq Enqueuer)
RegisterDeadLetterQueue sets the dead letter queue for this AcknowledgeableQueue. This is shorthand for registering a failure callback with RegisterOnFailureCallback that calls dlq.Enqueue.
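The shorthand can be pictured as a callback that forwards the failed payload to another queue. In this sketch, `itemEnqueuer`, `sliceQueue`, and `forwardToDLQ` are hypothetical names, and forwarding `msg.Item` (rather than some other representation of the message) is an assumption about what the shorthand does.

```go
package main

import "fmt"

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// itemEnqueuer is a hypothetical one-method subset of the documented
// Enqueuer interface.
type itemEnqueuer interface {
	Enqueue(item []byte) error
}

// sliceQueue is an in-memory stand-in for a dead letter queue.
type sliceQueue struct{ items [][]byte }

func (s *sliceQueue) Enqueue(item []byte) error {
	s.items = append(s.items, item)
	return nil
}

// forwardToDLQ builds a failure callback that enqueues the failed payload
// on dlq. Its signature matches the func(msg Msg) error callbacks
// documented on this page.
func forwardToDLQ(dlq itemEnqueuer) func(msg Msg) error {
	return func(msg Msg) error {
		return dlq.Enqueue(msg.Item)
	}
}

func main() {
	dlq := &sliceQueue{}
	onFailure := forwardToDLQ(dlq)

	// With the real package, a callback like this would be handed to
	// RegisterOnFailureCallback instead of being called directly.
	if err := onFailure(Msg{ID: 42, Item: []byte("poison")}); err != nil {
		fmt.Println("dead-lettering failed:", err)
		return
	}
	fmt.Println(len(dlq.items), string(dlq.items[0])) // prints "1 poison"
}
```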
func (*AcknowledgeableQueue) RegisterOnFailureCallback ¶
func (q *AcknowledgeableQueue) RegisterOnFailureCallback(fn func(msg Msg) error)
RegisterOnFailureCallback adds a callback to the queue that is called when a message fails to be acknowledged.
func (*AcknowledgeableQueue) TryAck ¶ added in v0.2.1
func (q *AcknowledgeableQueue) TryAck(id int64) error
TryAck acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails.
func (*AcknowledgeableQueue) TryAckCtx ¶ added in v0.2.1
func (q *AcknowledgeableQueue) TryAckCtx(ctx context.Context, id int64) error
TryAckCtx acknowledges that an item has been successfully processed. It takes the ID of the message to acknowledge and returns an error if the operation fails. This is non-blocking, and will return immediately.
func (*AcknowledgeableQueue) TryDequeue ¶
func (q *AcknowledgeableQueue) TryDequeue() (Msg, error)
TryDequeue attempts to remove and return the next item from the queue. It returns immediately, even if the queue is empty. It uses a background context internally.
func (*AcknowledgeableQueue) TryDequeueCtx ¶
func (q *AcknowledgeableQueue) TryDequeueCtx(ctx context.Context) (Msg, error)
TryDequeueCtx attempts to remove and return the next item from the queue. It returns immediately if an item is available, or waits until the context is cancelled.
func (*AcknowledgeableQueue) TryNack ¶ added in v0.2.1
func (q *AcknowledgeableQueue) TryNack(id int64) error
TryNack indicates that processing of an item has failed and that the item should be requeued. It takes the ID of the message to negatively acknowledge. It is non-blocking and returns immediately.
func (*AcknowledgeableQueue) TryNackCtx ¶ added in v0.2.1
func (q *AcknowledgeableQueue) TryNackCtx(ctx context.Context, id int64) error
TryNackCtx indicates that processing of an item has failed and that the item should be requeued. It takes the ID of the message to negatively acknowledge. It is non-blocking and returns immediately.
type Dequeuer ¶
type Dequeuer interface {
	// Dequeue removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available.
	// Returns an error if the operation fails.
	Dequeue() (Msg, error)

	// DequeueCtx removes and returns the next item from the queue.
	// It blocks if the queue is empty until an item becomes available or the context is cancelled.
	// Returns an error if the operation fails or the context is cancelled.
	DequeueCtx(ctx context.Context) (Msg, error)

	// TryDequeue attempts to remove and return the next item from the queue.
	// It returns immediately, even if the queue is empty.
	// Returns an error if the operation fails or the queue is empty.
	TryDequeue() (Msg, error)

	// TryDequeueCtx attempts to remove and return the next item from the queue.
	// It returns immediately if an item is available, or waits until the context is cancelled.
	// Returns an error if the operation fails, the queue is empty, or the context is cancelled.
	TryDequeueCtx(ctx context.Context) (Msg, error)
}
Dequeuer provides methods for dequeueing items from the queue.
type Enqueuer ¶
type Enqueuer interface {
	// Enqueue adds an item to the queue.
	// It returns an error if the operation fails.
	Enqueue(item []byte) error
	EnqueueCtx(ctx context.Context, item []byte) error
	TryEnqueue(item []byte) error
	TryEnqueueCtx(ctx context.Context, item []byte) error
}
Enqueuer provides methods for enqueueing items to the queue.
type ErrDBLocked ¶
type ErrDBLocked struct{}
func (*ErrDBLocked) Error ¶
func (e *ErrDBLocked) Error() string
type ErrNoItemsWaiting ¶
type ErrNoItemsWaiting struct{}
func (*ErrNoItemsWaiting) Error ¶
func (e *ErrNoItemsWaiting) Error() string
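Because both error types implement `Error` on a pointer receiver, callers would typically match them with `errors.As` and a pointer-typed target. The sketch below uses local mirrors of the two types with placeholder messages. `classify` is a hypothetical helper, and which methods actually return these errors is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented error types. The message strings are
// placeholders, not the package's actual text.
type ErrDBLocked struct{}

func (e *ErrDBLocked) Error() string { return "database is locked" }

type ErrNoItemsWaiting struct{}

func (e *ErrNoItemsWaiting) Error() string { return "no items waiting" }

// classify is a hypothetical helper that maps an error from a Try*
// call to a coarse action for the caller.
func classify(err error) string {
	var locked *ErrDBLocked
	var empty *ErrNoItemsWaiting
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &locked):
		return "retry later"
	case errors.As(err, &empty):
		return "queue empty"
	default:
		return "fatal"
	}
}

func main() {
	fmt.Println(classify(&ErrNoItemsWaiting{}))                      // queue empty
	fmt.Println(classify(fmt.Errorf("dequeue: %w", &ErrDBLocked{}))) // retry later
	fmt.Println(classify(errors.New("disk full")))                   // fatal
}
```

Using `errors.As` rather than a direct type assertion keeps the check working if the error has been wrapped with `%w` somewhere along the way.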
type Msg ¶
type Msg struct {
	// ID is a unique identifier for the message within the queue.
	ID int64

	// Item contains the actual message data.
	Item []byte
}
Msg represents a message in the queue. It contains the message ID and the actual data.
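Since Item is a raw byte slice, structured payloads have to be encoded before enqueueing and decoded after dequeueing. One possible approach is JSON, sketched below. The `job` type and the `encodeJob` and `decodeJob` helpers are hypothetical, and the package itself does not prescribe any encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg mirrors the documented Msg type.
type Msg struct {
	ID   int64
	Item []byte
}

// job is a hypothetical application payload.
type job struct {
	Kind string `json:"kind"`
	N    int    `json:"n"`
}

// encodeJob produces the bytes that would be passed to Enqueue.
func encodeJob(j job) ([]byte, error) {
	return json.Marshal(j)
}

// decodeJob recovers the payload from a dequeued message.
func decodeJob(msg Msg) (job, error) {
	var j job
	err := json.Unmarshal(msg.Item, &j)
	return j, err
}

func main() {
	item, err := encodeJob(job{Kind: "resize", N: 3})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	// Stand-in for a message returned by a dequeue call.
	msg := Msg{ID: 1, Item: item}

	got, err := decodeJob(msg)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", got) // prints "{Kind:resize N:3}"
}
```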
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents the basic queue structure. It contains the database connection, queue name, and other necessary fields for queue operations.
func NewSimpleQueue ¶
NewSimpleQueue creates a new simple queue. If filePath is empty, the queue will be created in memory.
func NewUniqueQueue ¶
NewUniqueQueue creates a new unique queue.
func (*Queue) Close ¶
Close closes the database connection associated with the queue. It should be called when the queue is no longer needed to free up resources.
func (*Queue) DequeueCtx ¶
DequeueCtx blocks until an item is available or the context is cancelled. If the context is cancelled, it returns an empty Msg and an error.
func (*Queue) Enqueue ¶
Enqueue adds an item to the queue. It returns an error if the operation fails.
func (*Queue) EnqueueCtx ¶
EnqueueCtx adds an item to the queue. It returns an error if the operation fails or the context is cancelled.
func (*Queue) Len ¶
Len returns the number of items in the queue. It returns the count and any error encountered during the operation.
func (*Queue) TryDequeue ¶
TryDequeue attempts to remove and return the next item from the queue. It returns immediately, even if the queue is empty. It uses a background context internally.
func (*Queue) TryDequeueCtx ¶
TryDequeueCtx attempts to remove and return the next item from the queue. This is non-blocking, and will return immediately.
func (*Queue) TryEnqueue ¶
TryEnqueue attempts to add an item to the queue. It returns immediately instead of blocking. It uses a background context internally.