Documentation ¶
Overview ¶
Example ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/bsm/pgpq"
	"github.com/google/uuid"
)

// must panics with err when it is non-nil; the example has no better way
// to react to a failure.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	ctx := context.Background()

	// Resolve the connection URL, preferring DATABASE_URL when it is set:
	//   - use `sslmode=verify-ca` for production
	//   - use `default_query_exec_mode=simple_protocol` in combination with proxies
	dsn := os.Getenv("DATABASE_URL")
	if dsn == "" {
		dsn = "postgres://localhost/pgpq_test?sslmode=disable"
	}

	// Open the queue client and make sure it is closed on exit.
	client, err := pgpq.Connect(ctx, dsn)
	must(err)
	defer client.Close()

	// Start from an empty queue; only do this in tests.
	must(client.Truncate(ctx))

	// Enqueue a task with an explicit priority.
	must(client.Push(ctx, &pgpq.Task{
		Priority: 3,
		Payload:  []byte(`{"foo":1}`),
	}))

	// Enqueue a task with a caller-chosen UUID and a higher priority.
	must(client.Push(ctx, &pgpq.Task{
		ID:       uuid.MustParse("28667ce4-1999-4af4-9ff2-1757b3844048"),
		Priority: 4,
		Payload:  []byte(`{"bar":2}`),
	}))

	// Enqueue a task without setting a priority.
	must(client.Push(ctx, &pgpq.Task{
		Payload: []byte(`{"baz":3}`),
	}))

	// Enqueue a task that is delayed by one minute.
	must(client.Push(ctx, &pgpq.Task{
		Payload:   []byte(`{"baz":4}`),
		NotBefore: time.Now().Add(time.Minute),
	}))

	// Take the highest-priority task off the queue.
	claim, err := client.Shift(ctx)
	must(err)
	defer claim.Release(ctx)

	// Show which task was claimed and what it carries.
	fmt.Println(claim.ID.String())
	fmt.Println(string(claim.Payload))

	// Mark the task as done, which removes it from the queue.
	must(claim.Done(ctx))
}
Output: 28667ce4-1999-4af4-9ff2-1757b3844048 {"bar": 2}
Index ¶
- Variables
- type Claim
- type Client
- func (c *Client) Claim(ctx context.Context, id uuid.UUID) (*Claim, error)
- func (c *Client) Close() error
- func (c *Client) Get(ctx context.Context, id uuid.UUID) (*TaskDetails, error)
- func (c *Client) Len(ctx context.Context, opts ...ScopeOption) (int64, error)
- func (c *Client) List(ctx context.Context, opts ...ListOption) ([]*TaskDetails, error)
- func (c *Client) MinCreatedAt(ctx context.Context, opts ...ScopeOption) (time.Time, error)
- func (c *Client) Push(ctx context.Context, task *Task) error
- func (c *Client) Shift(ctx context.Context, opts ...ScopeOption) (*Claim, error)
- func (c *Client) Truncate(ctx context.Context, opts ...ScopeOption) error
- type ListOption
- type NamespaceOption
- type ScopeOption
- type Task
- type TaskDetails
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrDuplicateID reports that a task with the same ID already exists.
	ErrDuplicateID = errors.New("duplicate ID")

	// ErrNoTask reports that no matching task could be found.
	ErrNoTask = errors.New("no task")
)
Functions ¶
This section is empty.
Types ¶
type Claim ¶
type Claim struct { TaskDetails // contains filtered or unexported fields }
Claim contains a claim on a task. The owner of the claim has an exclusive lock on the task. You must call either Release, Update or Done to release the claim.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements a queue client.
func Connect ¶
Connect connects to a PG instance using a URL. Example:
postgres://user:secret@test.host:5432/mydb?sslmode=verify-ca
func Wrap ¶
Wrap wraps an existing database/sql.DB instance. Please note that calling Close() will not close the underlying connection.
func (*Client) Claim ¶ added in v0.2.1
Claim locks and returns the task with the given ID. It may return ErrNoTask.
func (*Client) Len ¶ added in v0.2.0
Len returns the queue length. This counts all the non-delayed tasks.
func (*Client) List ¶
func (c *Client) List(ctx context.Context, opts ...ListOption) ([]*TaskDetails, error)
List lists all tasks (incl. delayed) in the queue.
func (*Client) MinCreatedAt ¶ added in v0.2.0
MinCreatedAt returns the creation timestamp of the oldest non-delayed task in the queue. It may return ErrNoTask.
type ListOption ¶
type ListOption interface {
// contains filtered or unexported methods
}
ListOption can be applied when listing tasks.
func WithLimit ¶
func WithLimit(v int64) ListOption
WithLimit applies a limit to the list. Default: 100.
type NamespaceOption ¶ added in v0.3.0
type NamespaceOption interface { ListOption ScopeOption }
NamespaceOption satisfies both ListOption and ScopeOption, so it can be passed to any method that accepts either kind of option.
func WithNamespace ¶ added in v0.3.0
func WithNamespace(ns string) NamespaceOption
WithNamespace restricts a client to a particular namespace. Namespaces must contain ASCII characters only.
type ScopeOption ¶ added in v0.3.0
type ScopeOption interface {
// contains filtered or unexported methods
}
ScopeOption can be applied when scoping results.