postgresq

package
v0.6.9-dev Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// Names used to refer to the message queue table.
const (
	// MessageQueueTableName is the unqualified table name.
	MessageQueueTableName     = "message_queue"
	// MessageQueueSchemaName is the name of the schema that qualifies the table.
	MessageQueueSchemaName    = "notification"
	// MessageQueueTableFullName is the schema-qualified table name,
	// i.e. "notification.message_queue".
	MessageQueueTableFullName = MessageQueueSchemaName + "." + MessageQueueTableName
)

Variables

This section is empty.

Functions

This section is empty.

Types

type NotificationMessage

// NotificationMessage is the database-facing form of a notification message.
// Every field is mapped to a column through its `db` struct tag. Conversion
// to and from notification.Message is provided by ToDomain and FromDomain.
type NotificationMessage struct {
	// ID maps to the "id" column.
	ID     string `db:"id"`
	// Status maps to the "status" column.
	// NOTE(review): the set of valid status values is not visible here.
	Status string `db:"status"`

	// ReceiverType maps to the "receiver_type" column.
	ReceiverType string           `db:"receiver_type"`
	// Configs, Details and Metadata are string-keyed maps of arbitrary values
	// (pgc.StringAnyMap); presumably stored as JSON columns — confirm against pgc.
	Configs      pgc.StringAnyMap `db:"configs"`
	Details      pgc.StringAnyMap `db:"details"`
	Metadata     pgc.StringAnyMap `db:"metadata"`
	// LastError maps to the nullable "last_error" column.
	LastError    sql.NullString   `db:"last_error"`

	// MaxTries, TryCount and Retryable map to the retry-related columns.
	// NOTE(review): how they interact is decided by the queue implementation,
	// which is not visible here.
	MaxTries  int  `db:"max_tries"`
	TryCount  int  `db:"try_count"`
	Retryable bool `db:"retryable"`

	// ExpiredAt maps to the nullable "expired_at" column.
	ExpiredAt sql.NullTime `db:"expired_at"`
	// CreatedAt and UpdatedAt map to the "created_at" and "updated_at" columns.
	CreatedAt time.Time    `db:"created_at"`
	UpdatedAt time.Time    `db:"updated_at"`
}

func (*NotificationMessage) FromDomain

func (nm *NotificationMessage) FromDomain(domainMessage notification.Message)

func (*NotificationMessage) ToDomain

func (nm *NotificationMessage) ToDomain() notification.Message

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(logger log.Logger, dbConfig db.Config, opts ...QueueOption) (*Queue, error)

New creates a new queue instance

func (*Queue) Cleanup

func (q *Queue) Cleanup(ctx context.Context, filter queues.FilterCleanup) error

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []notification.Message) error) error

Dequeue pops messages from the queue based on specific filters (receiver types or batch size) and processes them with handlerFn. A message left in the pending state that has expired or was last updated a long time ago means there was a failure when transforming the row into a struct.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, ms ...notification.Message) error

Enqueue pushes messages to the queue

func (*Queue) ErrorCallback

func (q *Queue) ErrorCallback(ctx context.Context, ms notification.Message) error

ErrorCallback is a callback that is called when handlerFn fails to handle the message

func (*Queue) Stop

func (q *Queue) Stop(ctx context.Context) error

Stop will close the db

func (*Queue) SuccessCallback

func (q *Queue) SuccessCallback(ctx context.Context, ms notification.Message) error

SuccessCallback is a callback that is called once the message has been successfully handled by handlerFn

func (*Queue) Type

func (q *Queue) Type() string

type QueueOption

type QueueOption func(*Queue)

func WithStrategy

func WithStrategy(s Strategy) QueueOption

type Strategy

// Strategy names a queue strategy; a value can be supplied to a Queue
// through the WithStrategy option.
type Strategy string
// Known Strategy values.
const (
	// StrategyDefault is the "default" strategy.
	StrategyDefault Strategy = "default"
	// StrategyDLQ is the "dlq" strategy.
	// NOTE(review): presumably "dead-letter queue" — confirm against the implementation.
	StrategyDLQ     Strategy = "dlq"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL