Versions in this module Expand all Collapse all v0 v0.0.1 Dec 29, 2024 Changes in this version + var ErrTableEmpty = errors.New("table is empty") + var ErrTxNil = errors.New("tx is nil") + type ForwardOption func(forwarder *forwarder) + func WithForwardFilter(filter types.MessageFilter) ForwardOption + type Forwarder interface + Forward func(ctx context.Context, limit int) (types.ForwardStats, error) + func NewForwarder(reader Reader, publisher Publisher, opts ...ForwardOption) (Forwarder, error) + func NewForwarderFromPool(table string, pool *pgxpool.Pool, publisher Publisher, opts ...ForwardOption) (Forwarder, error) + type Publisher interface + Publish func(ctx context.Context, message types.Message) error + type ReadOption func(*reader) + func WithReadFilter(filter types.MessageFilter) ReadOption + type Reader interface + Ack func(ctx context.Context, ids []int64) (int, error) + Read func(ctx context.Context, limit int) ([]types.Message, error) + func NewReader(table string, pool *pgxpool.Pool, opts ...ReadOption) (Reader, error) + type Tx interface + type WriteOption func(*writer) + func WithDisablePreparedBatch() WriteOption + type Writer interface + Write func(ctx context.Context, tx Tx, message types.Message) (int64, error) + WriteBatch func(ctx context.Context, tx pgx.Tx, messages []types.Message) ([]int64, error) + func NewWriter(table string, opts ...WriteOption) (Writer, error)