Versions in this module Expand all Collapse all v2 v2.0.0 Jul 7, 2023 Changes in this version + var ErrInvalidTopicName = errors.New(...) + var ErrPublisherClosed = errors.New("publisher is closed") + var ErrSubscriberClosed = errors.New("subscriber is closed") + func TxFromContext(ctx context.Context) (*sql.Tx, bool) + type BackoffManager interface + HandleError func(logger watermill.LoggerAdapter, noMsg bool, err error) time.Duration + func NewDefaultBackoffManager(pollInterval, retryInterval time.Duration) BackoffManager + type Beginner interface + BeginTx func(context.Context, *sql.TxOptions) (*sql.Tx, error) + type ContextExecutor interface + ExecContext func(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + QueryContext func(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + QueryRowContext func(ctx context.Context, query string, args ...interface{}) *sql.Row + type DefaultMySQLOffsetsAdapter struct + GenerateMessagesOffsetsTableName func(topic string) string + func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(topic string, row Row, consumerGroup string) (string, []interface{}) + func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{}) + func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string + func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) + func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(topic string) []string + type DefaultMySQLSchema struct + GenerateMessagesTableName func(topic string) string + SubscribeBatchSize int + func (s DefaultMySQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) + func (s DefaultMySQLSchema) MessagesTable(topic string) string + func (s DefaultMySQLSchema) SchemaInitializingQueries(topic string) []string + func (s DefaultMySQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []interface{}) + func (s DefaultMySQLSchema) SubscribeIsolationLevel() sql.IsolationLevel + func (s DefaultMySQLSchema) UnmarshalMessage(row Scanner) (Row, error) + type DefaultPostgreSQLOffsetsAdapter struct + GenerateMessagesOffsetsTableName func(topic string) string + func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(topic string, row Row, consumerGroup string) (string, []interface{}) + func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{}) + func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string + func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []interface{}) + func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string) []string + type DefaultPostgreSQLSchema struct + GenerateMessagesTableName func(topic string) string + SubscribeBatchSize int + func (s DefaultPostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []interface{}, error) + func (s DefaultPostgreSQLSchema) MessagesTable(topic string) string + func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []string + func (s DefaultPostgreSQLSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []interface{}) + func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel + func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (Row, error) + type DefaultSchema = DefaultMySQLSchema + type Executor interface + Exec func(query string, args ...interface{}) (sql.Result, error) + Query func(query string, args ...interface{}) (*sql.Rows, error) + QueryRow func(query string, args ...interface{}) *sql.Row + type OffsetsAdapter interface + AckMessageQuery func(topic string, row Row, consumerGroup string) (string, []interface{}) + ConsumedMessageQuery func(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []interface{}) + NextOffsetQuery func(topic, consumerGroup string) (string, []interface{}) + SchemaInitializingQueries func(topic string) []string + type Publisher struct + func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error) + func (p *Publisher) Close() error + func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error) + type PublisherConfig struct + AutoInitializeSchema bool + SchemaAdapter SchemaAdapter + type Row struct + ExtraData map[string]any + Metadata []byte + Msg *message.Message + Offset int64 + Payload []byte + UUID []byte + type Scanner interface + Scan func(dest ...any) error + type SchemaAdapter interface + InsertQuery func(topic string, msgs message.Messages) (string, []interface{}, error) + SchemaInitializingQueries func(topic string) []string + SelectQuery func(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []interface{}) + SubscribeIsolationLevel func() sql.IsolationLevel + UnmarshalMessage func(row Scanner) (Row, error) + type Subscriber struct + func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) + func (s *Subscriber) Close() error + func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error) + func (s *Subscriber) SubscribeInitialize(topic string) error + type SubscriberConfig struct + BackoffManager BackoffManager + ConsumerGroup string + InitializeSchema bool + OffsetsAdapter OffsetsAdapter + PollInterval time.Duration + ResendInterval time.Duration + RetryInterval time.Duration + SchemaAdapter SchemaAdapter Other modules containing this package github.com/ThreeDotsLabs/watermill-sql github.com/ThreeDotsLabs/watermill-sql/v3 github.com/ThreeDotsLabs/watermill-sql/v4