postq

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2023 License: Apache-2.0 Imports: 7 Imported by: 5

README

postq

A queuing library for postgres

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncEventConsumer

type AsyncEventConsumer struct {
	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// Number of events to be fetched and processed at a time.
	BatchSize int

	// An async event handler that consumes events.
	Consumer AsyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetcherOption contains configuration on how the events should be fetched.
	EventFetcherOption *EventFetcherOption
}

func (AsyncEventConsumer) EventConsumer

func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (*AsyncEventConsumer) Handle

func (t *AsyncEventConsumer) Handle(ctx Context) (int, error)

type AsyncEventHandlerFunc

type AsyncEventHandlerFunc func(Context, Events) Events

AsyncEventHandlerFunc processes multiple events and returns the failed ones

type ConsumerFunc

type ConsumerFunc func(ctx Context) (count int, err error)

type ConsumerOption

type ConsumerOption struct {
	// Number of concurrent consumers.
	// 	default: 1
	NumConsumers int

	// Timeout is the timeout to call the consumer func in case no pg notification is received.
	// 	default: 1 minute
	Timeout time.Duration

	// handle errors when consuming.
	// returns whether to retry or not.
	// 	default: sleep for 5 seconds and retry.
	ErrorHandler func(err error) bool
}

type Context

type Context interface {
	context.Context
	Pool() *pgxpool.Pool
}

type Event

type Event struct {
	ID          uuid.UUID  `json:"id"`
	Name        string     `json:"name"`
	Error       *string    `json:"error"`
	Attempts    int        `json:"attempts"`
	LastAttempt *time.Time `json:"last_attempt"`

	Properties map[string]string `json:"properties"`
	CreatedAt  time.Time         `json:"created_at"`
}

Event represents the event queue table. The table must have the following fields.

func (*Event) Scan

func (t *Event) Scan(rows pgx.Row) error

Scan scans pgx rows into Event

func (*Event) SetError

func (t *Event) SetError(err string)

type EventFetcherOption

type EventFetcherOption struct {
	// MaxAttempts is the number of times an event is attempted to process
	// default: 3
	MaxAttempts int

	// BaseDelay is the base delay between retries
	// default: 60 seconds
	BaseDelay int

	// Exponent is the exponent of the base delay
	// default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
	Exponent int
}

type Events

type Events []Event

func (Events) Update

func (events Events) Update(ctx Context, tx *pgx.Conn) error

Update updates the events in batches.

type PGConsumer

type PGConsumer struct {
	// contains filtered or unexported fields
}

PGConsumer manages concurrent consumers to handle PostgreSQL NOTIFY events from a specific channel.

func NewPGConsumer

func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, error)

NewPGConsumer returns a new EventConsumer

func (*PGConsumer) ConsumeUntilEmpty

func (t *PGConsumer) ConsumeUntilEmpty(ctx Context)

ConsumeUntilEmpty consumes events in a loop until the event queue is empty.

func (*PGConsumer) Listen

func (e *PGConsumer) Listen(ctx Context, pgNotify <-chan string)

Listen starts consumers in the background

type SyncEventConsumer

type SyncEventConsumer struct {
	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// List of sync event handlers that process a single event one after another in order.
	// All the handlers must succeed or else the event will be marked as failed.
	Consumers []SyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetcherOption contains configuration on how the events should be fetched.
	EventFetchOption *EventFetcherOption
}

func (SyncEventConsumer) EventConsumer

func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (*SyncEventConsumer) Handle

func (t *SyncEventConsumer) Handle(ctx Context) (int, error)

type SyncEventHandlerFunc

type SyncEventHandlerFunc func(Context, Event) error

SyncEventHandlerFunc processes a single event and ONLY makes db changes.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL