Documentation ¶
Overview ¶
Package pgqueue implements a durable, at-least-once, optionally ordered message queue on top of PostgreSQL.
Index ¶
- Constants
- Variables
- type AcceptFunc
- type AcceptQueriesFunc
- type Ack
- type ConsumeFunc
- type Delivery
- type DeliveryIterator
- type DeliveryRows
- type DeliveryRowsIterator
- type GetHandler
- type HandleFunc
- type NextRowFunc
- type OrderGuarantee
- type PQSubscriptionDriver
- func (drv PQSubscriptionDriver) FetchPendingDeliveries(ctx context.Context, yield func(Delivery)) error
- func (drv PQSubscriptionDriver) InsertSubscription(ctx context.Context) error
- func (drv PQSubscriptionDriver) ListenForDeliveries(ctx context.Context) (accept AcceptFunc, close func() error, err error)
- type Panic
- type QueryWithArgs
- type QueryWithArgsIterator
- type SubscriptionDriver
Constants ¶
const MarkAsDeliveredSQL string = `
deliveries = deliveries + 1,
last_delivered_at = NOW() AT TIME ZONE 'UTC'
`
const UpdateLastAckSQL string = `
last_ack_at = NOW() AT TIME ZONE 'UTC'
`
Variables ¶
var ( Ordered = OrderGuarantee{/* contains filtered or unexported fields */} Unordered = OrderGuarantee{/* contains filtered or unexported fields */} )
var ErrRequeued = errors.New("a message was requeued; redelivery not guaranteed unless consuming starts again")
Functions ¶
This section is empty.
Types ¶
type AcceptQueriesFunc ¶
type AcceptQueriesFunc = func(context.Context, func(QueryWithArgs)) error
type ConsumeFunc ¶
type ConsumeFunc = func(context.Context, GetHandler) error
func Subscribe ¶
func Subscribe(ctx context.Context, driver SubscriptionDriver) (consume ConsumeFunc, err error)
Subscribe creates a subscription and returns a function to consume from it.
A published message will be copied to all existing subscriptions at the time, even if they aren't any active consumers from it.
It depends on the provided SubscriptionDriver how message delivery for concurrent consumers to the same subscription behaves.
type Delivery ¶
type Delivery struct { // Unwrap unwraps the delivery as it comes from the queue into a value // that a handler can use. Unwrap func(into interface{}) error OK func(context.Context) error Requeue func(context.Context) error }
A Delivery is an attempted delivery of a message.
type DeliveryIterator ¶
func NewDeliveryIterator ¶
type DeliveryRows ¶
DeliveryRows is a row for a delivery with a transaction to ACK it.
type DeliveryRowsIterator ¶
type DeliveryRowsIterator struct { Next coro.Resume Yielded DeliveryRows Returned error }
func NewDeliveryRowsIterator ¶
func NewDeliveryRowsIterator(g func(func() error), f func(yield func(DeliveryRows)) error, options ...coro.SetOption) *DeliveryRowsIterator
type GetHandler ¶
type GetHandler = func() (unwrapInto interface{}, handle HandleFunc)
GetHandler is a function that is called with each incoming message. The function provides a value to unwrap the message into, and a handler function to then use this value.
When a message arrives as a Delivery from the ListenForDeliveries or FetchPendingDeliveries methods of the SubscriptionDriver, this function, provided to the consume function returned by Subscriber, is called. Then, the Delivery's UnwrapMessage is called with the returned unwrapInto value. If that doesn't fail, the handle function is called.
The handle function should return OK to acknowledge that the message has been processed and should be removed from the queue, or Requeue otherwise.
type NextRowFunc ¶
type OrderGuarantee ¶
type OrderGuarantee struct {
// contains filtered or unexported fields
}
func (OrderGuarantee) FetchIncomingRows ¶
type PQSubscriptionDriver ¶
type PQSubscriptionDriver struct { DB sqler.DB ExecInsertSubscription func(context.Context) error ListenForIncomingBaseQueries func(context.Context) (accept AcceptQueriesFunc, close func() error, err error) PendingBaseQuery QueryWithArgs RowsToDeliveries func(context.Context, *DeliveryRowsIterator) error Ordered OrderGuarantee }
func (PQSubscriptionDriver) FetchPendingDeliveries ¶
func (drv PQSubscriptionDriver) FetchPendingDeliveries(ctx context.Context, yield func(Delivery)) error
func (PQSubscriptionDriver) InsertSubscription ¶
func (drv PQSubscriptionDriver) InsertSubscription(ctx context.Context) error
func (PQSubscriptionDriver) ListenForDeliveries ¶
func (drv PQSubscriptionDriver) ListenForDeliveries(ctx context.Context) (accept AcceptFunc, close func() error, err error)
type Panic ¶
type Panic struct {
// contains filtered or unexported fields
}
A Panic is a panic captured as an error. Is is returned by the consume function returned by Subscribe when either the deliveries listener or the provided handler panic.
type QueryWithArgs ¶
type QueryWithArgs struct { Query string Args []interface{} }
func NewQueryWithArgs ¶
func NewQueryWithArgs(query string, args ...interface{}) QueryWithArgs
func (QueryWithArgs) PollIncomingDeliveries ¶
func (q QueryWithArgs) PollIncomingDeliveries(each time.Duration) func(context.Context) (accept AcceptQueriesFunc, close func() error, err error)
type QueryWithArgsIterator ¶
type QueryWithArgsIterator struct { Next coro.Resume Yielded QueryWithArgs Returned error }
func NewQueryWithArgsIterator ¶
func NewQueryWithArgsIterator(g func(func() error), f func(yield func(QueryWithArgs)) error, options ...coro.SetOption) *QueryWithArgsIterator
type SubscriptionDriver ¶
type SubscriptionDriver interface { InsertSubscription(context.Context) error ListenForDeliveries(context.Context) (accept AcceptFunc, close func() error, err error) FetchPendingDeliveries(context.Context, func(Delivery)) error }
A SubscriptionDriver is the abstract interface that
Directories ¶
Path | Synopsis |
---|---|
Package stopcontext extentds the standard Context interface with an additional Stopped signal, intended for gracefully stopping some iterative process associated with the Context.
|
Package stopcontext extentds the standard Context interface with an additional Stopped signal, intended for gracefully stopping some iterative process associated with the Context. |