Documentation
¶
Index ¶
- Constants
- func ListQueues(ctx context.Context, querier Querier) ([]string, error)
- func ValidateQueueName(ctx context.Context, querier Querier, name string) error
- type Message
- type Querier
- type Queue
- func (q *Queue[T]) Archive(ctx context.Context, msgID int64) error
- func (q *Queue[T]) ArchiveBatch(ctx context.Context, msgIDs []int64) error
- func (q *Queue[T]) Create(ctx context.Context) error
- func (q *Queue[T]) CreatePartitioned(ctx context.Context, partitionInterval, retentionInterval int) error
- func (q *Queue[T]) CreateUnlogged(ctx context.Context) error
- func (q *Queue[T]) Delete(ctx context.Context, msgID int64) error
- func (q *Queue[T]) DeleteBatch(ctx context.Context, msgIDs []int64) error
- func (q *Queue[T]) DetachArchive(ctx context.Context) error
- func (q *Queue[T]) Drop(ctx context.Context) error
- func (q *Queue[T]) Metrics(ctx context.Context) (*QueueMetrics, error)
- func (q *Queue[T]) Pop(ctx context.Context) (*Message[T], error)
- func (q *Queue[T]) Purge(ctx context.Context) (int, error)
- func (q *Queue[T]) Read(ctx context.Context, visibilityTimeout time.Duration) (*Message[T], error)
- func (q *Queue[T]) ReadBatch(ctx context.Context, maxMessages int, visibilityTimeout time.Duration) ([]*Message[T], error)
- func (q *Queue[T]) ReadBatchWithPoll(ctx context.Context, maxMessages int, ...) ([]*Message[T], error)
- func (q *Queue[T]) ReadWithPoll(ctx context.Context, ...) (*Message[T], error)
- func (q *Queue[T]) Send(ctx context.Context, message *T) (int64, error)
- func (q *Queue[T]) SendBatch(ctx context.Context, messages []*T) ([]int64, error)
- func (q *Queue[T]) SendBatchDelayed(ctx context.Context, messages []*T, delay time.Duration) ([]int64, error)
- func (q *Queue[T]) SendDelayed(ctx context.Context, message *T, delay time.Duration) (int64, error)
- func (q *Queue[T]) SetVisibilityTimeout(ctx context.Context, msgID int64, timeout time.Duration) error
- func (q *Queue[T]) WithTx(tx pgx.Tx) *Queue[T]
- type QueueMetrics
Constants ¶
const (
	// ReadLimitDefault is the default maximum number of messages to read.
	ReadLimitDefault = 1
	// VisibilityTimeoutDefault is the default message visibility timeout.
	VisibilityTimeoutDefault = 30 * time.Second
	// PollTimeoutDefault is the default maximum time to wait for a message.
	PollTimeoutDefault = 5 * time.Second
	// PollIntervalDefault is the default time to wait between polling attempts.
	PollIntervalDefault = 250 * time.Millisecond
)
Variables ¶
This section is empty.
Functions ¶
func ListQueues ¶
ListQueues retrieves a list of all queues in the database.
Types ¶
type Message ¶
type Message[T any] struct {
	ID         int64     `json:"msg_id"`
	ReadCount  int       `json:"read_ct"`
	EnqueuedAt time.Time `json:"enqueued_at"`
	VisibleAt  time.Time `json:"vt"`
	Message    T         `json:"message"`
}
Message represents a single message in the queue with metadata.
type Querier ¶
type Querier interface {
	Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
	Query(context.Context, string, ...any) (pgx.Rows, error)
	QueryRow(context.Context, string, ...any) pgx.Row
}
Querier interface defines the required database operations.
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
Queue represents a typed message queue instance.
func New ¶
New creates a new Queue instance with the specified name. The pgmq extension is automatically created if it does not exist.
func (*Queue[T]) ArchiveBatch ¶
ArchiveBatch moves multiple messages to the archive table.
func (*Queue[T]) CreatePartitioned ¶
func (q *Queue[T]) CreatePartitioned(ctx context.Context, partitionInterval, retentionInterval int) error
CreatePartitioned initializes a new partitioned queue in the database.
func (*Queue[T]) CreateUnlogged ¶
CreateUnlogged initializes a new unlogged queue in the database.
func (*Queue[T]) DeleteBatch ¶
DeleteBatch permanently removes multiple messages from the queue.
func (*Queue[T]) DetachArchive ¶
DetachArchive detaches the archive table from the queue.
func (*Queue[T]) Metrics ¶
func (q *Queue[T]) Metrics(ctx context.Context) (*QueueMetrics, error)
Metrics retrieves the current queue metrics.
func (*Queue[T]) Read ¶
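Read retrieves a single message from the queue. visibilityTimeout is how long the message stays locked after it is read. Pass a time.Duration (for example 30*time.Second), not a bare number of seconds; the value is presumably applied at one-second granularity, so confirm against the implementation before relying on sub-second precision.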
func (*Queue[T]) ReadBatch ¶
func (q *Queue[T]) ReadBatch(ctx context.Context, maxMessages int, visibilityTimeout time.Duration) ([]*Message[T], error)
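ReadBatch retrieves multiple messages from the queue. maxMessages is the maximum number of messages to read. visibilityTimeout is how long the messages stay locked after they are read. Pass a time.Duration (for example 30*time.Second), not a bare number of seconds; the value is presumably applied at one-second granularity, so confirm against the implementation before relying on sub-second precision.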
func (*Queue[T]) ReadBatchWithPoll ¶
func (q *Queue[T]) ReadBatchWithPoll(ctx context.Context, maxMessages int, visibilityTimeout, pollTimeout, pollInterval time.Duration) ([]*Message[T], error)
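ReadBatchWithPoll retrieves a batch of messages with polling support. maxMessages is the maximum number of messages to read. visibilityTimeout is how long the messages stay locked after they are read. pollTimeout is the maximum time to wait for a message. pollInterval is the time to wait between polling attempts. All three are time.Duration values (for example 30*time.Second, 5*time.Second and 250*time.Millisecond), not bare numbers; visibilityTimeout and pollTimeout are presumably applied at one-second granularity and pollInterval at one-millisecond granularity, so confirm against the implementation before relying on finer precision.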
func (*Queue[T]) ReadWithPoll ¶
func (q *Queue[T]) ReadWithPoll(ctx context.Context, visibilityTimeout, pollTimeout, pollInterval time.Duration) (*Message[T], error)
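ReadWithPoll retrieves a single message with polling support. visibilityTimeout is how long the message stays locked after it is read. pollTimeout is the maximum time to wait for a message. pollInterval is the time to wait between polling attempts. All three are time.Duration values (for example 30*time.Second, 5*time.Second and 250*time.Millisecond), not bare numbers; visibilityTimeout and pollTimeout are presumably applied at one-second granularity and pollInterval at one-millisecond granularity, so confirm against the implementation before relying on finer precision.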
func (*Queue[T]) SendBatchDelayed ¶
func (q *Queue[T]) SendBatchDelayed(ctx context.Context, messages []*T, delay time.Duration) ([]int64, error)
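SendBatchDelayed adds multiple messages with a specified delay. delay is the time to wait before the messages become visible. Pass a time.Duration (for example 10*time.Second), not a bare number of seconds; the value is presumably applied at one-second granularity, so confirm against the implementation before relying on sub-second precision.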
func (*Queue[T]) SendDelayed ¶
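SendDelayed adds a new message with a specified delay. delay is the time to wait before the message becomes visible. It is a time.Duration per the index signature (for example 10*time.Second), not a bare number of seconds; the value is presumably applied at one-second granularity, so confirm against the implementation before relying on sub-second precision.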