Documentation ¶
Index ¶
- Constants
- func StaticMetaInjector(m Metadata) func(context.Context, Metadata)
- func ValidateFields(ctx context.Context, db *sqlx.DB, queueName string) error
- func ValidateIndexes(ctx context.Context, db *sqlx.DB, queueName string) error
- type Consumer
- type ConsumerOption
- func WithAckTimeout(d time.Duration) ConsumerOption
- func WithHistoryLimit(d time.Duration) ConsumerOption
- func WithInvalidMessageCallback(fn InvalidMessageCallback) ConsumerOption
- func WithLockDuration(d time.Duration) ConsumerOption
- func WithLogger(logger *slog.Logger) ConsumerOption
- func WithMaxConsumeCount(max uint) ConsumerOption
- func WithMaxParallelMessages(n int) ConsumerOption
- func WithMessageProcessingReserveDuration(d time.Duration) ConsumerOption
- func WithMetadataFilter(filter *MetadataFilter) ConsumerOption
- func WithMetrics(m metric.Meter) ConsumerOption
- func WithPollingInterval(d time.Duration) ConsumerOption
- type InvalidMessage
- type InvalidMessageCallback
- type MessageHandler
- type MessageHandlerFunc
- type MessageIncoming
- type MessageOutgoing
- type Metadata
- type MetadataFilter
- type MetadataOperation
- type Publisher
- type PublisherOption
Examples ¶
Constants ¶
const ( // MessageProcessed signals that message was processed and shouldn't be processed // again. If processed with an error, it is expected permanent, and new run would // result in the same error. MessageProcessed = true // MessageNotProcessed signals that message wasn't processed and can be processed // again. The error interrupting processing is considered temporary. MessageNotProcessed = false )
Variables ¶
This section is empty.
Functions ¶
func StaticMetaInjector ¶
StaticMetaInjector returns a Metadata injector that injects given Metadata.
func ValidateFields ¶
ValidateFields checks if required fields exist
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the preconfigured subscriber of the write input messages
Example ¶
package main import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log" "os" "os/signal" "go.dataddo.com/pgq" ) type Handler struct{} func (h *Handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) (res bool, err error) { defer func() { r := recover() if r == nil { return } log.Println("Recovered in 'Handler.HandleMessage()'", r) // nack the message, it will be retried res = pgq.MessageNotProcessed if e, ok := r.(error); ok { err = e } else { err = fmt.Errorf("%v", r) } }() if msg.Metadata["heaviness"] == "heavy" { // nack the message, it will be retried // Message won't contain error detail in the database. return pgq.MessageNotProcessed, nil } var myPayload struct { Foo string `json:"foo"` } if err := json.Unmarshal(msg.Payload, &myPayload); err != nil { // discard the message, it will not be retried // Message will contain error detail in the database. return pgq.MessageProcessed, fmt.Errorf("invalid payload: %v", err) } // doSomethingWithThePayload(ctx, myPayload) return pgq.MessageProcessed, nil } func main() { db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") if err != nil { log.Fatal("Error opening database:", err) } defer db.Close() const queueName = "test_queue" c, err := pgq.NewConsumer(db, queueName, &Handler{}) if err != nil { log.Fatal("Error creating consumer:", err) } ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt) if err := c.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { log.Fatal("Error running consumer:", err) } }
Output:
func NewConsumer ¶
func NewConsumer(db *sql.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates Consumer with proper settings
Example ¶
slogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) c, err := pgq.NewConsumer(db, "queue_name", &Handler{}, pgq.WithLockDuration(10*time.Minute), pgq.WithPollingInterval(500*time.Millisecond), pgq.WithAckTimeout(5*time.Second), pgq.WithMessageProcessingReserveDuration(5*time.Second), pgq.WithMaxParallelMessages(42), pgq.WithMetrics(noop.Meter{}), pgq.WithHistoryLimit(24*time.Hour), pgq.WithLogger(slogger), pgq.WithInvalidMessageCallback(func(ctx context.Context, msg pgq.InvalidMessage, err error) { // message Payload and/or Metadata are not JSON object. // The message will be discarded. slogger.Warn("invalid message", "error", err, "msg.id", msg.ID, ) }), ) _, _ = c, err
Output:
func NewConsumerExt ¶
func NewConsumerExt(db *sqlx.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates Consumer with proper settings, using sqlx.DB (until refactored to use pgx directly)
type ConsumerOption ¶
type ConsumerOption func(c *consumerConfig)
ConsumerOption applies option to consumerConfig.
func WithAckTimeout ¶
func WithAckTimeout(d time.Duration) ConsumerOption
WithAckTimeout sets the timeout for updating the message status when message is processed.
func WithHistoryLimit ¶
func WithHistoryLimit(d time.Duration) ConsumerOption
WithHistoryLimit sets how long in history you want to search for unprocessed messages (default is no limit). If not set, it will look for message in the whole table. You may set this value when using partitioned table to search just in partitions you are interested in.
func WithInvalidMessageCallback ¶
func WithInvalidMessageCallback(fn InvalidMessageCallback) ConsumerOption
WithInvalidMessageCallback sets callback for invalid messages.
func WithLockDuration ¶
func WithLockDuration(d time.Duration) ConsumerOption
WithLockDuration sets the maximal duration for how long the message remains locked for other consumers.
func WithLogger ¶
func WithLogger(logger *slog.Logger) ConsumerOption
WithLogger sets logger. Default is no logging.
func WithMaxConsumeCount ¶
func WithMaxConsumeCount(max uint) ConsumerOption
WithMaxConsumeCount sets the maximal number of times a message can be consumed before it is ignored. Unhandled SIGKILL or uncaught panic, OOM error etc. could lead to consumer failure infinite loop. Setting this value to greater than 0 will prevent happening this loop. Setting this to value 0 disableS this safe mechanism.
func WithMaxParallelMessages ¶
func WithMaxParallelMessages(n int) ConsumerOption
WithMaxParallelMessages sets how many jobs can single consumer process simultaneously.
func WithMessageProcessingReserveDuration ¶
func WithMessageProcessingReserveDuration(d time.Duration) ConsumerOption
WithMessageProcessingReserveDuration sets the duration for which the message is reserved for handling result state.
func WithMetadataFilter ¶
func WithMetadataFilter(filter *MetadataFilter) ConsumerOption
func WithMetrics ¶
func WithMetrics(m metric.Meter) ConsumerOption
WithMetrics sets metrics meter. Default is noop.Meter{}.
func WithPollingInterval ¶
func WithPollingInterval(d time.Duration) ConsumerOption
WithPollingInterval sets how frequently consumer checks the queue for new messages.
type InvalidMessage ¶
type InvalidMessage struct { ID string Metadata json.RawMessage Payload json.RawMessage }
InvalidMessage is definition of invalid message used as argument for InvalidMessageCallback by Consumer.
type InvalidMessageCallback ¶
type InvalidMessageCallback func(ctx context.Context, msg InvalidMessage, err error)
InvalidMessageCallback defines what should happen to messages which are identified as invalid. Such messages usually have missing or malformed required fields.
type MessageHandler ¶
type MessageHandler interface {
HandleMessage(context.Context, *MessageIncoming) (processed bool, err error)
}
MessageHandler handles message received from queue. Returning false means message wasn't processed correctly and shouldn't be acknowledged. Error contains additional information.
Possible combinations:
// | processed | err | description | // | --------- | ---------- | ---------------------------------------------------- | // | false | <nil> | missing failure info, but the message can be retried | // | false | some error | not processed, because of some error, can be retried | // | true | <nil> | processed, no error. | // | true | some error | processed, ended with error. Don't retry! |
type MessageHandlerFunc ¶
type MessageHandlerFunc func(context.Context, *MessageIncoming) (processed bool, err error)
MessageHandlerFunc is MessageHandler implementation by simple function.
func (MessageHandlerFunc) HandleMessage ¶
func (fn MessageHandlerFunc) HandleMessage(ctx context.Context, msg *MessageIncoming) (processed bool, err error)
HandleMessage calls self. It also implements MessageHandler interface.
type MessageIncoming ¶
type MessageIncoming struct { // Metadata contains the message Metadata. Metadata Metadata // Payload is the message's Payload. Payload json.RawMessage // Attempt number, counts from 1. It is incremented every time the message is // consumed. Attempt int // Deadline is the time when the message will be returned to the queue if not // finished. It is set by the queue when the message is consumed. Deadline time.Time // contains filtered or unexported fields }
MessageIncoming is a record retrieved from table queue in Postgres
func NewMessage ¶
func NewMessage(meta Metadata, payload json.RawMessage, attempt int, maxConsumedCount uint) *MessageIncoming
NewMessage creates new message that satisfies Message interface.
func (*MessageIncoming) LastAttempt ¶
func (m *MessageIncoming) LastAttempt() bool
LastAttempt returns true if the message is consumed for the last time according to maxConsumedCount settings. If the Consumer is not configured to limit the number of attempts setting WithMaxConsumeCount to zero, it always returns false.
type MessageOutgoing ¶
type MessageOutgoing struct { // ScheduledFor is the time when the message should be processed. If nil, the messages // gets processed immediately. ScheduledFor *time.Time `db:"scheduled_for"` // Payload is the message's Payload. Payload json.RawMessage `db:"payload"` // Metadata contains the message Metadata. Metadata Metadata `db:"metadata"` }
MessageOutgoing is a record to be inserted into table queue in Postgres
type MetadataFilter ¶
type MetadataFilter struct { Key string Operation MetadataOperation Value string }
MetadataFilter is a filter for metadata. Right now support only direct matching of key/value
type MetadataOperation ¶
type MetadataOperation string
MetadataFilter is a filter for metadata. Right now support only direct matching of key/value
const ( OpEqual MetadataOperation = "=" OpNotEqual MetadataOperation = "<>" )
type Publisher ¶
type Publisher interface {
Publish(ctx context.Context, queue string, msg ...*MessageOutgoing) ([]uuid.UUID, error)
}
Publisher publishes messages to Postgres queue.
Example ¶
package main import ( "context" "database/sql" "encoding/json" "log" "time" "go.dataddo.com/pgq" ) type PayloadStruct struct { Foo string `json:"foo"` } func main() { db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres") if err != nil { log.Fatal("Error opening database:", err) } defer db.Close() const queueName = "test_queue" p := pgq.NewPublisher(db) payload, _ := json.Marshal(PayloadStruct{Foo: "bar"}) messages := []*pgq.MessageOutgoing{ { Metadata: pgq.Metadata{ "version": "1.0", }, Payload: json.RawMessage(payload), }, { Metadata: pgq.Metadata{ "version": "1.0", }, Payload: json.RawMessage(payload), }, { Metadata: pgq.Metadata{ "version": "1.0", }, Payload: json.RawMessage(payload), }, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() ids, err := p.Publish(ctx, queueName, messages...) if err != nil { log.Fatal("Error publishing message:", err) } log.Println("Published messages with ids:", ids) }
Output:
func NewPublisher ¶
func NewPublisher(db *sql.DB, opts ...PublisherOption) Publisher
NewPublisher initializes the publisher with given *sql.DB client.
Example ¶
package main import ( "database/sql" "os" "go.dataddo.com/pgq" ) var db *sql.DB func main() { hostname, _ := os.Hostname() p := pgq.NewPublisher(db, pgq.WithMetaInjectors( pgq.StaticMetaInjector(pgq.Metadata{"publisher-id": hostname}), ), ) _ = p }
Output:
func NewPublisherExt ¶
func NewPublisherExt(db *sqlx.DB, opts ...PublisherOption) Publisher
NewPublisher initializes the publisher with given *sqlx.DB client
type PublisherOption ¶
type PublisherOption func(*publisherConfig)
PublisherOption configures the publisher. Multiple options can be passed to NewPublisher. Options are applied in the order they are given. The last option overrides any previous ones. If no options are passed to NewPublisher, the default values are used.
func WithMetaInjectors ¶
func WithMetaInjectors(injectors ...func(context.Context, Metadata)) PublisherOption
WithMetaInjectors adds Metadata injectors to the publisher. Injectors are run in the order they are given.
Directories ¶
Path | Synopsis |
---|---|
internal
|
|
require
Package require is a minimal alternative to github.com/stretchr/testify/require.
|
Package require is a minimal alternative to github.com/stretchr/testify/require. |
x
|
|
schema
Package schema is a place where to put general functions and constants relevant to the postgres table schema and pgq setup
|
Package schema is a place where to put general functions and constants relevant to the postgres table schema and pgq setup |