postq

package
v1.0.644 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2024 License: Apache-2.0 Imports: 11 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncEventConsumer

type AsyncEventConsumer struct {

	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// Number of events to be fetched and processed at a time.
	BatchSize int

	// An async event handler that consumes events.
	Consumer AsyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetcherOption contains configuration on how the events should be fetched.
	EventFetcherOption *EventFetcherOption
	// contains filtered or unexported fields
}

func (AsyncEventConsumer) EventConsumer

func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (AsyncEventConsumer) GetRecords

func (t AsyncEventConsumer) GetRecords() ([]models.Event, error)

func (*AsyncEventConsumer) Handle

func (t *AsyncEventConsumer) Handle(ctx context.Context) (int, error)

func (*AsyncEventConsumer) RecordEvents

func (t *AsyncEventConsumer) RecordEvents(size int)

RecordEvents will record all the events fetched by the consumer in a ring buffer.

type AsyncEventHandlerFunc

type AsyncEventHandlerFunc func(context.Context, models.Events) models.Events

AsyncEventHandlerFunc processes multiple events and returns the failed ones

func AsyncHandler

func AsyncHandler(fn func(ctx context.Context, e models.Events) models.Events) AsyncEventHandlerFunc

AsyncHandler converts the given user defined handler into a async event handler.

type ConsumerFunc

type ConsumerFunc func(ctx context.Context) (count int, err error)

type ConsumerOption

type ConsumerOption struct {
	// Number of concurrent consumers.
	// 	default: 1
	NumConsumers int

	// Timeout is the timeout to call the consumer func in case no pg notification is received.
	// 	default: 1 minute
	Timeout time.Duration

	// handle errors when consuming.
	// returns whether to retry or not.
	// 	default: sleep for 1s and retry.
	ErrorHandler func(ctx context.Context, e error) bool
}

type EventFetcherOption

type EventFetcherOption struct {
	// MaxAttempts is the number of times an event is attempted to process
	// default: 3
	MaxAttempts int

	// BaseDelay is the base delay between retries
	// default: 60 seconds
	BaseDelay int

	// Exponent is the exponent of the base delay
	// default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes))
	Exponent int
}

type PGConsumer

type PGConsumer struct {
	// contains filtered or unexported fields
}

PGConsumer manages concurrent consumers to handle PostgreSQL NOTIFY events from a specific channel.

func NewPGConsumer

func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, error)

NewPGConsumer returns a new EventConsumer

func (*PGConsumer) ConsumeUntilEmpty

func (t *PGConsumer) ConsumeUntilEmpty(ctx context.Context)

ConsumeUntilEmpty consumes events in a loop until the event queue is empty.

func (*PGConsumer) Listen

func (e *PGConsumer) Listen(ctx context.Context, pgNotify <-chan string)

Listen starts consumers in the background

type SyncEventConsumer

type SyncEventConsumer struct {

	// Name of the events in the push queue to watch for.
	WatchEvents []string

	// List of sync event handlers that process a single event one after another in order.
	// All the handlers must succeed or else the event will be marked as failed.
	Consumers []SyncEventHandlerFunc

	// ConsumerOption is the configuration for the PGConsumer.
	ConsumerOption *ConsumerOption

	// EventFetcherOption contains configuration on how the events should be fetched.
	EventFetchOption *EventFetcherOption
	// contains filtered or unexported fields
}

func (SyncEventConsumer) EventConsumer

func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error)

func (SyncEventConsumer) GetRecords

func (t SyncEventConsumer) GetRecords() ([]models.Event, error)

func (*SyncEventConsumer) Handle

func (t *SyncEventConsumer) Handle(ctx context.Context) (int, error)

func (*SyncEventConsumer) RecordEvents

func (t *SyncEventConsumer) RecordEvents(size int)

RecordEvents will record all the events fetched by the consumer in a ring buffer.

type SyncEventHandlerFunc

type SyncEventHandlerFunc func(context.Context, models.Event) error

SyncEventHandlerFunc processes a single event and ONLY makes db changes.

func SyncHandlers

func SyncHandlers(fn ...func(ctx context.Context, e models.Event) error) []SyncEventHandlerFunc

SyncHandlers converts the given user defined handlers into sync event handlers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL