Documentation ¶
Overview ¶
Package yaqpg proivides a simple locking queue for PostgreSQL databases. It is intended for light, local workloads.
Index ¶
- Constants
- Variables
- func DefaultBackoffDelay(attempts int) time.Duration
- func DefaultStableDelay(attempts int) time.Duration
- func Schema(q *Queue) string
- type FunctionProcessor
- type Item
- type Logger
- type Processor
- type Queue
- func (q *Queue) Add(ident string, payload []byte) error
- func (q *Queue) AddBatch(items []*Item) error
- func (q *Queue) AddBatchDelayed(items []*Item, delay time.Duration) error
- func (q *Queue) AddDelayed(ident string, payload []byte, delay time.Duration) error
- func (q *Queue) Connect(max_connections int) error
- func (q *Queue) Count() (int, error)
- func (q *Queue) CountPending() (int, error)
- func (q *Queue) CountReady() (int, error)
- func (q *Queue) CreateSchema() error
- func (q *Queue) Fill(count int, payload_size int, delay_max time.Duration) error
- func (q *Queue) Log(v ...interface{})
- func (q *Queue) LogCounts()
- func (q *Queue) Logf(f string, v ...interface{})
- func (q *Queue) MustCount() int
- func (q *Queue) MustCountPending() int
- func (q *Queue) MustCountReady() int
- func (q *Queue) Process(limit int, proc Processor) error
- func (q *Queue) ProcessReady(limit int, proc Processor) error
Constants ¶
const MaxProcessLimit = 1000
Variables ¶
var DefaultMaxAttempts = 5
var DefaultMaxConnections = 8
var DefaultMaxReprocessDelay = 1 * time.Minute
var DefaultProcessContextTimeout = 1 * time.Minute
var DefaultQueueName = "jobs"
var DefaultReprocessDelay = 2 * time.Second
var DefaultTableName = "yaqpg_queue"
var Exit = os.Exit
var FillBatchSize = 100
var Now = time.Now
Overridable with custom "now" for testing.
var StartWithPlaceholder = true
Functions ¶
func DefaultBackoffDelay ¶
DefaultBackoffDelay returns DefaultReprocessDelay that doubles for every attempt over one, up to DefaultMaxReprocessDelay.
func DefaultStableDelay ¶
DefaultStableDelay returns DefaultReprocessDelay regardless of attempts.
Types ¶
type FunctionProcessor ¶
FunctionProcessor is a Processor that uses a simple function to process each item.
type Item ¶
type Item struct { BatchId ulid.ULID // *should* be included in logs by the Processor. Ident string Attempts int Payload []byte // contains filtered or unexported fields }
Item represents a queue item as used in Process and batch adding.
type Logger ¶
type Logger interface {
Println(...interface{})
}
Logger is the simplest logger used by Queue.
type Processor ¶
type Processor interface { // Process processes an item. Returning error will invoke the re-queueing // logic, subject to MaxAttempts in the Queue. Process(context.Context, *Item) error }
Processor is the thing that processes an item, returning an error if not completed successfully.
type Queue ¶
type Queue struct { Name string TableName string MaxAttempts int ProcessContextTimeout time.Duration ReprocessDelayFunc func(attempts int) time.Duration Pool *pgxpool.Pool Logger Logger Silent bool }
Queue represents a named queue in a specific database table.
func MustStartDefaultQueue ¶
func MustStartDefaultQueue() *Queue
MustStartDefaultQueue calls StartDefaultQueue and panics on error.
func MustStartNamedQueue ¶
MustStartNamedQueue calls StartNamedQueue and panics on error.
func NewDefaultQueue ¶
func NewDefaultQueue() *Queue
NewDefaultQueue returns a new queue with all defaults.
func NewNamedQueue ¶
NewNamedQueue returns a new queue with all defaults except Name.
func StartDefaultQueue ¶
StartDefaultQueue creates a NewDefaultQueue and calls Connect with DefaultMaxConnections.
func StartNamedQueue ¶
StartNamedQueue creates a NewNamedQueue and calls Connect with DefaultMaxConnections.
func (*Queue) AddBatch ¶
AddBatch adds a set of items in a transaction for immediate availability when finished. Every *Item in items will be given the BatchId unique to this call.
func (*Queue) AddBatchDelayed ¶
AddBatchDelayed adds a set of items in a transaction for the given delay. The delay will be the same for all items. Every *Item in items will be given the BatchId unique to this call.
func (*Queue) AddDelayed ¶
AddDelayed adds an item to the queue with the specified delay.
func (*Queue) Connect ¶
Connect creates connection pool for the database defined in the DATABASE_URL environment variable and stores the connection in q.Pool.
func (*Queue) Count ¶
Count returns the total number of items in the queue, regardless of ready_at values.
func (*Queue) CountPending ¶
CountPending returns the total number of items not yet ready in the queue, i.e. those with a ready_at later than the current time. For obvious reasons, this number may be inaccurate by the time you consume it.
func (*Queue) CountReady ¶
CountReady returns the total number of ready items in the queue, including items currently being processed. For obvious reasons, this number may be inaccurate by the time you consume it.
(Excluding items in flight at the db level is too inefficient and anyway none of this remains accurate. If we find a need to know the total number of items being processed, this can be handled on the code side easily enough, but is probably not worth the trouble.)
func (*Queue) CreateSchema ¶
CreateSchema connects and creates the queue table and indexes if needed. Existing relations are NOT dropped! The queue must already have connected to the database.
func (*Queue) Fill ¶
Fill fills the queue with count items of test data, with a randomized delay up to delay_max. The payload will be payload_size bytes of random data. Every item will have the same payload.
Fill concurrently adds batches of up to FillBatchSize items, all of which will have the same delay time. More randomness can be obtained by setting this to a lower value.
This is (arguably) useful for testing!
NOTE: variable payload size is not supported at this time.
func (*Queue) Log ¶
func (q *Queue) Log(v ...interface{})
Log writes v to the logger, with the queue [Name] as a prefix.
func (*Queue) LogCounts ¶
func (q *Queue) LogCounts()
LogCounts retrieves and logs the counts, and panics if any count returns an error.
func (*Queue) MustCountPending ¶
MustCountPending calls CountPending and panics on error.
func (*Queue) MustCountReady ¶
MustCountReady calls CountReady and panics on error.
func (*Queue) Process ¶
Process gets up to limit items from the queue and processes them in goroutines. The items are updated after processing completes, and the transaction (batch) is committed if there were no database errors, or rolled back if there were. The Processor's Process function is passed a ready_at context which it should respect. Returning an error from that function will result in the item being released back into the queue with its Attempts incremented and its ReadyAt pushed out by ReadyAtDelay; or deleted if there MaxAttempts are exceeded.
func (*Queue) ProcessReady ¶
ProcessReady calls Process with limit concurrently as many times as needed for the currently ready set of items. The number of ready items may not be zero after completion: pending items may become available, and new items may be added by other processes.
WARNING: this can be dangerous if you have a large queue!