Documentation ¶
Index ¶
- Variables
- type DB
- type Message
- type PGMQ
- func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (bool, error)
- func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error)
- func (p *PGMQ) Close()
- func (p *PGMQ) CreateQueue(ctx context.Context, queue string) error
- func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (bool, error)
- func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error)
- func (p *PGMQ) DropQueue(ctx context.Context, queue string) error
- func (p *PGMQ) Pop(ctx context.Context, queue string) (*Message, error)
- func (p *PGMQ) Read(ctx context.Context, queue string, vt int64) (*Message, error)
- func (p *PGMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64) ([]*Message, error)
- func (p *PGMQ) Send(ctx context.Context, queue string, msg map[string]any) (int64, error)
- func (p *PGMQ) SendBatch(ctx context.Context, queue string, msgs []map[string]any) ([]int64, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoRows = errors.New("pgmq: no rows in result set") ErrPing = errors.New("pgmq: failed to ping db") )
Functions ¶
This section is empty.
Types ¶
type PGMQ ¶
type PGMQ struct {
// contains filtered or unexported fields
}
func New ¶
New establishes a connection to Postgres given by the connString, checks connection, if check is failed, returns ErrPing, that can be retried, then creates the pgmq extension if it does not already exist.
func (*PGMQ) Archive ¶
Archive moves a message from the queue table to the archive table by its id. View messages on the archive table with sql:
select * from pgmq_<queue_name>_archive;
func (*PGMQ) ArchiveBatch ¶
ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. View messages on the archive table with sql:
select * from pgmq_<queue_name>_archive;
func (*PGMQ) CreateQueue ¶
CreateQueue creates a new queue. This sets up the queue's tables, indexes, and metadata.
func (*PGMQ) Delete ¶
Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.
func (*PGMQ) DeleteBatch ¶
DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.
func (*PGMQ) DropQueue ¶
DropQueue deletes the given queue. It deletes the queue's tables, indices, and metadata. It will return an error if the queue does not exist.
func (*PGMQ) Pop ¶
Pop reads single message from the queue and deletes it at the same time. Similar to Read and ReadBatch if no messages are available an ErrNoRows is returned. Unlike these methods, the visibility timeout does not apply. This is because the message is immediately deleted.
func (*PGMQ) Read ¶
Read a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows errors is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.
func (*PGMQ) ReadBatch ¶
func (p *PGMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64) ([]*Message, error)
ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0 it will be set to the default value, vtDefault.
If the queue is empty or all messages are invisible an ErrNoRows error is returned.