postgres

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultMessagesTable = "messages"

DefaultMessagesTable is the table name that will be used if no other table name provided.

Variables

View Source
var (
	ErrMissingSchemaName = errors.New("missing schema name")
)

errors.

Functions

This section is empty.

Types

type Executor

type Executor interface {
	Exec(ctx context.Context, sql string, args ...any) error
}

Executor knows how to run a sql query.

type Instance

type Instance interface {
	Executor
	Ping(ctx context.Context) error
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) Row
}

Instance defines the postgres interface to be used by the store.

type Option

type Option func(any)

Option is a function to set options to Publisher.

func WithJSONPayload added in v0.6.0

func WithJSONPayload() Option

WithJSONPayload creates payload column as JSONB.

func WithSchema

func WithSchema(s string) Option

WithSchema setups schema name.

func WithTableName

func WithTableName(t string) Option

WithTableName setups schema name.

func WithTransformer added in v0.8.0

func WithTransformer[M any, T Storer[M]](tr store.Transformer[M]) Option

WithTransformer applies sets a custom message transformer.

type Row

type Row interface {
	Scan(dest ...any) error
}

Row knows how to scan an query row.

type Rows

type Rows interface {
	Row
	// Close closes the rows, making the connection ready for use again. It is safe
	// to call Close after rows is already closed.
	Close()

	// Err returns any error that occurred while reading.
	Err() error

	// Next prepares the next row for reading. It returns true if there is another
	// row and false if no more rows are available. It automatically closes rows
	// when all rows are read.
	Next() bool
}

Rows knows how to scan multiple query rows.

type Storer

type Storer[T any] struct {
	// contains filtered or unexported fields
}

Storer is the implementation of messages store for postgres.

func New

func New[T any](ctx context.Context, db Instance, opts ...Option) (*Storer[T], error)

New returns a postgres store initialised with the given connection instance and config.

func (*Storer[T]) DeletePublishedByExpiration

func (s *Storer[T]) DeletePublishedByExpiration(ctx context.Context, d time.Duration) error

DeletePublishedByExpiration performs a hard delete of the messages with the column published to true and created at lower than the given duration.

func (Storer[T]) Find added in v0.7.0

func (s Storer[T]) Find(ctx context.Context, q *inspect.Query) (*inspect.Result, error)

Find returns a list of paginated messages filtered by the given query.

func (Storer[T]) Messages

func (s Storer[T]) Messages(ctx context.Context, batch int) ([]messenger.Message, error)

Messages returns a list of unpublished messages ordered by created at, first the oldest.

func (Storer[T]) Published

func (s Storer[T]) Published(ctx context.Context, msg messenger.Message) error

Published marks as published the given messages.

func (*Storer[T]) Republish added in v0.9.0

func (s *Storer[T]) Republish(ctx context.Context, msgID ...string) error

Republish given a list of message ids set published to FALSE. If the given message id does not exists it skips.

func (*Storer[T]) Store

func (s *Storer[T]) Store(ctx context.Context, tx Executor, msgs ...T) error

Store saves messages.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL