postq

package module
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 4, 2023 | License: Apache-2.0 | Imports: 7 | Imported by: 5

README

postq

A queuing library for PostgreSQL.

Documentation

Types

type AsyncEventConsumer

type AsyncEventConsumer struct {
	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// Number of events to be fetched and processed at a time.
	BatchSize int

	// An async event handler that consumes events.
	Consumer AsyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetcherOption contains configuration on how the events should be fetched.
	EventFetcherOption *EventFetcherOption
}

func (AsyncEventConsumer) EventConsumer

func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (*AsyncEventConsumer) Handle

func (t *AsyncEventConsumer) Handle(ctx Context) (int, error)

type AsyncEventHandlerFunc

type AsyncEventHandlerFunc func(Context, Events) Events

AsyncEventHandlerFunc processes a batch of events and returns the ones that failed.
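As a rough illustration of the "return the failed ones" contract, the sketch below uses simplified local stand-ins for Event and Events (the real ID is a uuid.UUID, and a real handler also receives a Context). The handler name notifyAll and the "recipient" property are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Event and Events are simplified local stand-ins for the postq types,
// reduced to the fields this sketch needs.
type Event struct {
	ID         string
	Name       string
	Error      *string
	Properties map[string]string
}

type Events []Event

// SetError mirrors the documented (*Event).SetError signature.
func (e *Event) SetError(err string) { e.Error = &err }

// notifyAll is a hypothetical batch handler. It returns only the events
// it could not process, with the failure reason recorded on each one.
func notifyAll(batch Events) Events {
	var failed Events
	for _, ev := range batch {
		if strings.TrimSpace(ev.Properties["recipient"]) == "" {
			ev.SetError("missing recipient")
			failed = append(failed, ev)
		}
	}
	return failed
}

func main() {
	batch := Events{
		{ID: "1", Name: "notification.send", Properties: map[string]string{"recipient": "ops"}},
		{ID: "2", Name: "notification.send", Properties: map[string]string{}},
	}
	for _, ev := range notifyAll(batch) {
		fmt.Println(ev.ID, *ev.Error)
	}
}
```

Events that are not returned are treated here as successfully processed; how the library retries the returned ones is governed by EventFetcherOption.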

func AsyncHandler added in v0.1.2

func AsyncHandler[T Context](fn func(ctx T, e Events) Events) AsyncEventHandlerFunc

AsyncHandler converts the given user-defined handler into an async event handler.

type ConsumerFunc

type ConsumerFunc func(ctx Context) (count int, err error)

type ConsumerOption

type ConsumerOption struct {
	// Number of concurrent consumers.
	// 	default: 1
	NumConsumers int

	// Timeout is how long to wait for a pg notification before calling the consumer func anyway.
	// 	default: 1 minute
	Timeout time.Duration

	// ErrorHandler handles errors raised while consuming.
	// It returns whether to retry or not.
	// 	default: sleep for 5 seconds and retry.
	ErrorHandler func(err error) bool
}
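A custom ErrorHandler only has to match the func(err error) bool shape. The sketch below is one possible policy, not the library default: give up on context cancellation, otherwise pause briefly and ask for a retry. ConsumerOption is copied locally so the example compiles on its own; retryUnlessCanceled and the two sample errors are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ConsumerOption is a local copy of the documented struct.
type ConsumerOption struct {
	NumConsumers int
	Timeout      time.Duration
	ErrorHandler func(err error) bool
}

// Sample errors used to exercise the handler.
var (
	errTransient = errors.New("connection reset")
	errCanceled  = fmt.Errorf("consume: %w", context.Canceled)
)

// retryUnlessCanceled builds a hypothetical ErrorHandler. It returns false
// (do not retry) when the error wraps context.Canceled; otherwise it sleeps
// for pause and returns true (retry).
func retryUnlessCanceled(pause time.Duration) func(error) bool {
	return func(err error) bool {
		if errors.Is(err, context.Canceled) {
			return false
		}
		time.Sleep(pause)
		return true
	}
}

func main() {
	opt := ConsumerOption{
		NumConsumers: 2,
		Timeout:      30 * time.Second,
		ErrorHandler: retryUnlessCanceled(10 * time.Millisecond),
	}
	fmt.Println(opt.ErrorHandler(errTransient)) // retry
	fmt.Println(opt.ErrorHandler(errCanceled))  // stop
}
```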

type Context

type Context interface {
	context.Context
	Pool() *pgxpool.Pool
}

type Event

type Event struct {
	ID          uuid.UUID  `json:"id"`
	Name        string     `json:"name"`
	Error       *string    `json:"error"`
	Attempts    int        `json:"attempts"`
	LastAttempt *time.Time `json:"last_attempt"`

	Properties map[string]string `json:"properties"`
	CreatedAt  time.Time         `json:"created_at"`
}

Event represents a row in the event queue table. The table must have the fields listed in the struct above.

func (*Event) Scan

func (t *Event) Scan(rows pgx.Row) error

Scan scans a pgx row into the Event.

func (*Event) SetError

func (t *Event) SetError(err string)

type EventFetcherOption

type EventFetcherOption struct {
	// MaxAttempts is the maximum number of times processing of an event is attempted
	// default: 3
	MaxAttempts int

	// BaseDelay is the base delay between retries
	// default: 60 seconds
	BaseDelay int

	// Exponent is the exponent of the base delay
	// default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
	Exponent int
}
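The retry times quoted in the Exponent comment (1, 6, 31, 156 minutes) can be reproduced with a small calculation. The sketch assumes the n-th delay is BaseDelay * Exponent^(n-1) and that the quoted figures are cumulative; this formula is inferred from the example only, and the library's actual query may compute the delay differently. retrySchedule is a hypothetical helper.

```go
package main

import "fmt"

// retrySchedule returns the cumulative time in seconds at which each retry
// would run, ASSUMING the n-th delay is baseDelay * exponent^(n-1).
// The formula is inferred from the documented example, not from the library.
func retrySchedule(baseDelay, exponent, retries int) []int {
	var schedule []int
	delay, elapsed := baseDelay, 0
	for i := 0; i < retries; i++ {
		elapsed += delay
		schedule = append(schedule, elapsed)
		delay *= exponent
	}
	return schedule
}

func main() {
	// Delays of 1, 5, 25 and 125 minutes add up to 1, 6, 31 and 156 minutes.
	for _, s := range retrySchedule(60, 5, 4) {
		fmt.Printf("%d min\n", s/60)
	}
}
```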

type Events

type Events []Event

func (Events) Update

func (events Events) Update(ctx Context, tx *pgx.Conn) error

Update updates the events in batches.

type PGConsumer

type PGConsumer struct {
	// contains filtered or unexported fields
}

PGConsumer manages concurrent consumers to handle PostgreSQL NOTIFY events from a specific channel.

func NewPGConsumer

func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, error)

NewPGConsumer returns a new PGConsumer.

func (*PGConsumer) ConsumeUntilEmpty

func (t *PGConsumer) ConsumeUntilEmpty(ctx Context)

ConsumeUntilEmpty consumes events in a loop until the event queue is empty.
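The drain loop can be pictured as repeatedly calling a ConsumerFunc until it reports zero events. The sketch below is an assumption about the general shape, not the library's code: it drops the Context argument, returns the total for inspection (the real method returns nothing), and stops on the first error instead of consulting an ErrorHandler. drain and batchesOf are hypothetical names.

```go
package main

import "fmt"

// consumerFunc mirrors the documented ConsumerFunc without the Context argument.
type consumerFunc func() (count int, err error)

// drain calls consume until it reports zero events or an error, and returns
// the total number of events consumed.
func drain(consume consumerFunc) (int, error) {
	total := 0
	for {
		n, err := consume()
		if err != nil {
			return total, err
		}
		if n == 0 {
			return total, nil
		}
		total += n
	}
}

// batchesOf simulates a queue holding pending events that hands out at most
// batchSize events per call.
func batchesOf(pending, batchSize int) consumerFunc {
	return func() (int, error) {
		n := batchSize
		if pending < n {
			n = pending
		}
		pending -= n
		return n, nil
	}
}

func main() {
	total, err := drain(batchesOf(7, 3))
	fmt.Println(total, err)
}
```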

func (*PGConsumer) Listen

func (e *PGConsumer) Listen(ctx Context, pgNotify <-chan string)

Listen starts the consumers in the background.
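Taken together with ConsumerOption.Timeout, a listening consumer presumably wakes up either on a notification or when the timeout elapses. A minimal sketch of that wait step, assuming a plain select over the notify channel and a timer, might look like this; waitForTrigger is a hypothetical helper and the meaning of the channel's string values is not specified by the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// waitForTrigger blocks until a value arrives on pgNotify or the timeout
// elapses, and reports which of the two happened.
func waitForTrigger(pgNotify <-chan string, timeout time.Duration) string {
	select {
	case payload := <-pgNotify:
		return "notify:" + payload
	case <-time.After(timeout):
		return "timeout"
	}
}

func main() {
	ch := make(chan string, 1)
	ch <- "event_queue"

	// The first call sees the buffered notification; the second one times out.
	fmt.Println(waitForTrigger(ch, time.Second))
	fmt.Println(waitForTrigger(ch, 10*time.Millisecond))
}
```

In either case the consumer func would be called afterwards, so events are still picked up when a notification is missed.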

type SyncEventConsumer

type SyncEventConsumer struct {
	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// List of sync event handlers that process a single event one after another in order.
	// All the handlers must succeed or else the event will be marked as failed.
	Consumers []SyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetchOption contains configuration on how the events should be fetched.
	EventFetchOption *EventFetcherOption
}
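The "all handlers must succeed" rule for Consumers can be sketched as a loop over the handlers. The example uses local stand-in types and omits the Context argument; stopping at the first failing handler is an assumption, since the documentation only says the event is marked as failed. runInOrder and errIndex are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a simplified stand-in for the postq Event type.
type Event struct {
	Name  string
	Error *string
}

// SetError mirrors the documented (*Event).SetError signature.
func (e *Event) SetError(err string) { e.Error = &err }

// handler stands in for SyncEventHandlerFunc without the Context argument.
type handler func(Event) error

var errIndex = errors.New("index unavailable")

// runInOrder calls each handler in order. On the first error it records the
// message on the event and reports failure; later handlers are not called.
func runInOrder(ev *Event, handlers []handler) bool {
	for _, h := range handlers {
		if err := h(*ev); err != nil {
			ev.SetError(err.Error())
			return false
		}
	}
	return true
}

func main() {
	var calls []string
	handlers := []handler{
		func(Event) error { calls = append(calls, "save"); return nil },
		func(Event) error { calls = append(calls, "index"); return errIndex },
		func(Event) error { calls = append(calls, "audit"); return nil },
	}
	ev := Event{Name: "team.update"}
	ok := runInOrder(&ev, handlers)
	fmt.Println(ok, calls, *ev.Error)
}
```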

func (SyncEventConsumer) EventConsumer

func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (*SyncEventConsumer) Handle

func (t *SyncEventConsumer) Handle(ctx Context) (int, error)

type SyncEventHandlerFunc

type SyncEventHandlerFunc func(Context, Event) error

SyncEventHandlerFunc processes a single event and ONLY makes db changes.

func SyncHandlers added in v0.1.2

func SyncHandlers[T Context](fn ...func(ctx T, e Event) error) []SyncEventHandlerFunc

SyncHandlers converts the given user-defined handlers into sync event handlers.
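The generic signature suggests that handlers can be written against an application-specific context type. The sketch below shows one way such an adapter could be built, by asserting the incoming Context to T; it is an assumption about the approach, not the library's implementation. All types are local stand-ins (the real Context also exposes Pool()), and appContext, newAppContext, and syncHandlers are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Context stands in for postq.Context.
type Context interface {
	context.Context
}

type Event struct{ Name string }

type SyncEventHandlerFunc func(Context, Event) error

// appContext is a hypothetical application context carrying extra state.
type appContext struct {
	context.Context
	Tenant string
}

func newAppContext(tenant string) appContext {
	return appContext{Context: context.Background(), Tenant: tenant}
}

// syncHandlers wraps typed handlers so they accept the generic Context.
// Each wrapper asserts the context to T and fails if the type does not match.
func syncHandlers[T Context](fns ...func(ctx T, e Event) error) []SyncEventHandlerFunc {
	out := make([]SyncEventHandlerFunc, 0, len(fns))
	for _, fn := range fns {
		fn := fn // capture per iteration for Go versions before 1.22
		out = append(out, func(ctx Context, e Event) error {
			typed, ok := any(ctx).(T)
			if !ok {
				return errors.New("unexpected context type")
			}
			return fn(typed, e)
		})
	}
	return out
}

func main() {
	handlers := syncHandlers[appContext](func(ctx appContext, e Event) error {
		fmt.Println(ctx.Tenant, e.Name)
		return nil
	})
	fmt.Println(handlers[0](newAppContext("acme"), Event{Name: "team.update"}))
	fmt.Println(handlers[0](context.Background(), Event{Name: "team.update"}))
}
```

The benefit of this shape is that handler bodies can use their own context fields directly, while the consumer only ever deals with the common interface.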
