Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncEventConsumer ¶
type AsyncEventConsumer struct { // Name of the events in the push queue to watch for. WatchEvents []string // Number of events to be fetched and processed at a time. BatchSize int // An async event handler that consumes events. Consumer AsyncEventHandlerFunc // ConsumerOption is the configuration for the PGConsumer. ConsumerOption *ConsumerOption // EventFetcherOption contains configuration on how the events should be fetched. EventFetcherOption *EventFetcherOption // contains filtered or unexported fields }
func (AsyncEventConsumer) EventConsumer ¶
func (t AsyncEventConsumer) EventConsumer() (*PGConsumer, error)
func (AsyncEventConsumer) GetRecords ¶
func (t AsyncEventConsumer) GetRecords() ([]models.Event, error)
func (*AsyncEventConsumer) Handle ¶
func (t *AsyncEventConsumer) Handle(ctx context.Context) (int, error)
func (*AsyncEventConsumer) RecordEvents ¶
func (t *AsyncEventConsumer) RecordEvents(size int)
RecordEvents will record all the events fetched by the consumer in a ring buffer.
type AsyncEventHandlerFunc ¶
AsyncEventHandlerFunc processes multiple events and returns the failed ones
func AsyncHandler ¶
func AsyncHandler(fn func(ctx context.Context, e models.Events) models.Events) AsyncEventHandlerFunc
AsyncHandler converts the given user defined handler into a async event handler.
type ConsumerOption ¶
type ConsumerOption struct { // Number of concurrent consumers. // default: 1 NumConsumers int // Timeout is the timeout to call the consumer func in case no pg notification is received. // default: 1 minute Timeout time.Duration // handle errors when consuming. // returns whether to retry or not. // default: sleep for 1s and retry. ErrorHandler func(ctx context.Context, e error) bool }
type EventFetcherOption ¶
type EventFetcherOption struct { // MaxAttempts is the number of times an event is attempted to process // default: 3 MaxAttempts int // BaseDelay is the base delay between retries // default: 60 seconds BaseDelay int // Exponent is the exponent of the base delay // default: 5 (along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes)) Exponent int }
type PGConsumer ¶
type PGConsumer struct {
// contains filtered or unexported fields
}
PGConsumer manages concurrent consumers to handle PostgreSQL NOTIFY events from a specific channel.
func NewPGConsumer ¶
func NewPGConsumer(consumerFunc ConsumerFunc, opt *ConsumerOption) (*PGConsumer, error)
NewPGConsumer returns a new EventConsumer
func (*PGConsumer) ConsumeUntilEmpty ¶
func (t *PGConsumer) ConsumeUntilEmpty(ctx context.Context)
ConsumeUntilEmpty consumes events in a loop until the event queue is empty.
type SyncEventConsumer ¶
type SyncEventConsumer struct { // Name of the events in the push queue to watch for. WatchEvents []string // List of sync event handlers that process a single event one after another in order. // All the handlers must succeed or else the event will be marked as failed. Consumers []SyncEventHandlerFunc // ConsumerOption is the configuration for the PGConsumer. ConsumerOption *ConsumerOption // EventFetcherOption contains configuration on how the events should be fetched. EventFetchOption *EventFetcherOption // contains filtered or unexported fields }
func (SyncEventConsumer) EventConsumer ¶
func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error)
func (SyncEventConsumer) GetRecords ¶
func (t SyncEventConsumer) GetRecords() ([]models.Event, error)
func (*SyncEventConsumer) Handle ¶
func (t *SyncEventConsumer) Handle(ctx context.Context) (int, error)
func (*SyncEventConsumer) RecordEvents ¶
func (t *SyncEventConsumer) RecordEvents(size int)
RecordEvents will record all the events fetched by the consumer in a ring buffer.
type SyncEventHandlerFunc ¶
SyncEventHandlerFunc processes a single event and ONLY makes db changes.
func SyncHandlers ¶
SyncHandlers converts the given user defined handlers into sync event handlers.