sql

package
v1.0.5
Warning

This package is not in the latest version of its module.
Published: Nov 8, 2023 | License: Apache-2.0, MIT | Imports: 16 | Imported by: 0

README

Watermill SQL Pub/Sub

This is Pub/Sub for the Watermill project.

All Pub/Sub implementations can be found at https://watermill.io/pubsubs/.

Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas, and whatever else comes to mind. You can use conventional Pub/Sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Documentation: https://watermill.io/

Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

Contributing

All contributions are very much welcome. If you'd like to help with Watermill development, please see open issues and submit your pull request via GitHub.

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Gophers Slack. You can get an invite here.

License

MIT License

Documentation

Constants

This section is empty.

Variables

var ErrInvalidTopicName = errors.New("topic name should not contain characters matched by " + disallowedTopicCharacters.String())
var (
	ErrPublisherClosed = errors.New("publisher is closed")
)
var (
	ErrSubscriberClosed = errors.New("subscriber is closed")
)

Functions

func TxFromContext

func TxFromContext(ctx context.Context) (*sql.Tx, bool)

TxFromContext returns the transaction used by the subscriber to consume the message. The transaction will be committed if ack of the message is successful. When a nack is sent, the transaction will be rolled back.

It is useful when you want to ensure that data is updated only when the message is processed. Example usage: https://github.com/wfusion/gofusion/common/infra/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter
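To illustrate the mechanism, here is a minimal sketch of how a transaction can travel inside a context and be looked up again. The key type `txKey` and the helpers `withTx`, `txFromContext`, and `lookupDemo` are hypothetical names written for this example; they only mirror the shape of TxFromContext and are not the package's actual implementation.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// txKey is a hypothetical, unexported context key used only in this sketch.
type txKey struct{}

// withTx returns a copy of ctx that carries tx.
func withTx(ctx context.Context, tx *sql.Tx) context.Context {
	return context.WithValue(ctx, txKey{}, tx)
}

// txFromContext mirrors the signature of TxFromContext: it returns the
// transaction attached to ctx and reports whether one was present.
func txFromContext(ctx context.Context) (*sql.Tx, bool) {
	tx, ok := ctx.Value(txKey{}).(*sql.Tx)
	return tx, ok
}

// lookupDemo stores a placeholder transaction in a context and looks it up,
// then repeats the lookup on a context that carries no transaction.
func lookupDemo() (found, same, foundInEmpty bool) {
	// A zero-value *sql.Tx is enough to show the lookup.
	// It is not connected to a database and must not be used for queries.
	tx := &sql.Tx{}

	got, ok := txFromContext(withTx(context.Background(), tx))
	_, emptyOK := txFromContext(context.Background())
	return ok, got == tx, emptyOK
}

func main() {
	fmt.Println(lookupDemo()) // prints "true true false"
}
```

In a message handler, the second return value should always be checked before using the transaction, since a context that did not come from this subscriber carries none.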

Types

type BackoffManager

type BackoffManager interface {
	// HandleError handles the error, possibly logging it, and returns a backoff time depending on the error or the absence of a message.
	HandleError(logger watermill.LoggerAdapter, noMsg bool, err error) time.Duration
}

BackoffManager handles errors or empty result sets and computes the backoff time. You could for example create a stateful version that computes a backoff depending on the error frequency or make errors more or less persistent.
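As a rough sketch of the stateful variant mentioned above, the following type doubles the wait after each consecutive error and resets once a call succeeds. Everything here is an assumption made for illustration: the `logger` interface is a reduced stand-in for watermill.LoggerAdapter, and `growingBackoff`, `quietLogger`, and `demoWaits` are hypothetical names. The handling of `noMsg` (wait for the poll interval when no message was found and there was no error) is also an assumption about the intended semantics.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// logger is a hypothetical stand-in for watermill.LoggerAdapter, reduced to
// the single method this sketch needs.
type logger interface {
	Error(msg string, err error, fields map[string]any)
}

// quietLogger discards everything; it keeps the demonstration output short.
type quietLogger struct{}

func (quietLogger) Error(string, error, map[string]any) {}

// growingBackoff doubles the wait after each consecutive error, up to maxWait.
// It keeps state between calls, so it is not safe for concurrent use as written.
type growingBackoff struct {
	pollInterval time.Duration // wait when no message was found
	baseWait     time.Duration // wait after the first error
	maxWait      time.Duration // upper bound for the wait after errors
	failures     int           // consecutive errors seen so far
}

func (b *growingBackoff) HandleError(l logger, noMsg bool, err error) time.Duration {
	if err == nil {
		// A successful query ends the error streak.
		b.failures = 0
		if noMsg {
			return b.pollInterval
		}
		return 0
	}

	l.Error("error while querying for messages", err, map[string]any{"failures": b.failures + 1})

	// Double the base wait once per previous consecutive failure.
	wait := b.baseWait
	for i := 0; i < b.failures && wait < b.maxWait; i++ {
		wait *= 2
	}
	if wait > b.maxWait {
		wait = b.maxWait
	}
	b.failures++
	return wait
}

// demoWaits feeds four errors, one empty result, and one more error into the
// manager and returns the waits it computed.
func demoWaits() []string {
	b := &growingBackoff{
		pollInterval: time.Second,
		baseWait:     100 * time.Millisecond,
		maxWait:      400 * time.Millisecond,
	}
	queryErr := errors.New("connection refused")

	var waits []string
	for i := 0; i < 4; i++ {
		waits = append(waits, b.HandleError(quietLogger{}, false, queryErr).String())
	}
	waits = append(waits, b.HandleError(quietLogger{}, true, nil).String())
	waits = append(waits, b.HandleError(quietLogger{}, false, queryErr).String())
	return waits
}

func main() {
	fmt.Println(demoWaits()) // prints "[100ms 200ms 400ms 400ms 1s 100ms]"
}
```

To use such a type with this package, its HandleError method would need to accept the real watermill.LoggerAdapter instead of the stand-in interface.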

func NewDefaultBackoffManager

func NewDefaultBackoffManager(pollInterval, retryInterval time.Duration) BackoffManager

type Beginner

type Beginner interface {
	BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
	ContextExecutor
}

Beginner begins transactions.

type ContextExecutor

type ContextExecutor interface {
	Executor

	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

ContextExecutor can perform SQL queries with context.
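The following sketch checks which standard library handles satisfy these interfaces. The interface definitions are copied from this page (Executor, ContextExecutor, Beginner); the helpers `isContextExecutor`, `isBeginner`, and `handles` are hypothetical names added for the example. It suggests why NewPublisher, which takes a ContextExecutor, can work with either *sql.DB or *sql.Tx, while NewSubscriber, which takes a Beginner, needs a *sql.DB.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Executor, ContextExecutor, and Beginner are copied from the package documentation.
type Executor interface {
	Exec(query string, args ...any) (sql.Result, error)
	Query(query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
}

type ContextExecutor interface {
	Executor

	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

type Beginner interface {
	BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error)
	ContextExecutor
}

// isContextExecutor reports whether v's type has the ContextExecutor method set.
func isContextExecutor(v any) bool {
	_, ok := v.(ContextExecutor)
	return ok
}

// isBeginner reports whether v's type has the Beginner method set.
func isBeginner(v any) bool {
	_, ok := v.(Beginner)
	return ok
}

// handles returns typed nil pointers. Only their method sets matter here,
// so no database connection is opened.
func handles() (*sql.DB, *sql.Tx, *sql.Conn) {
	return nil, nil, nil
}

func main() {
	db, tx, conn := handles()

	fmt.Println("*sql.DB  ", isContextExecutor(db), isBeginner(db))
	fmt.Println("*sql.Tx  ", isContextExecutor(tx), isBeginner(tx))
	// *sql.Conn has only the context-aware methods, so it satisfies neither interface.
	fmt.Println("*sql.Conn", isContextExecutor(conn), isBeginner(conn))
}
```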

type DefaultMySQLOffsetsAdapter

type DefaultMySQLOffsetsAdapter struct {
	// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated.
	GenerateMessagesOffsetsTableName func(topic string) string
}

DefaultMySQLOffsetsAdapter is an adapter for storing offsets in MySQL (or MariaDB) databases.

DefaultMySQLOffsetsAdapter is designed to support multiple subscribers with exactly once delivery and guaranteed order.

It uses FOR UPDATE in NextOffsetQuery to lock the consumer group in the offsets table.

When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer will consume the next message.

func (DefaultMySQLOffsetsAdapter) AckMessageQuery

func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(topic string, row Row,
	consumerGroup string) (string, []any)

func (DefaultMySQLOffsetsAdapter) ConsumedMessageQuery

func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(topic string, row Row,
	consumerGroup string, consumerULID []byte) (string, []any)

func (DefaultMySQLOffsetsAdapter) MessagesOffsetsTable

func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string

func (DefaultMySQLOffsetsAdapter) NextOffsetQuery

func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []any)

func (DefaultMySQLOffsetsAdapter) SchemaInitializingQueries

func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(topic string) []string

type DefaultMySQLSchema

type DefaultMySQLSchema struct {
	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string

	// SubscribeBatchSize is the number of messages to be queried at once.
	//
	// A higher value increases the chance of message re-delivery in case of a crash or networking issues.
	// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
	//
	// Default value is 100.
	SubscribeBatchSize int
}

DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL. If you need some customization, you can use composition to change schema and method of unmarshaling.

type MyMessagesSchema struct {
	DefaultMySQLSchema
}

func (m MyMessagesSchema) SchemaInitializingQueries(topic string) []string {
	createMessagesTable := strings.Join([]string{
		"CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (",
		"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
		"`uuid` BINARY(16) NOT NULL,",
		"`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,",
		"`payload` JSON DEFAULT NULL,",
		"`metadata` JSON DEFAULT NULL",
		");",
	}, "\n")

	return []string{createMessagesTable}
}

func (m MyMessagesSchema) UnmarshalMessage(row Scanner) (Row, error) {
	// ...
}

For debugging your custom schema, we recommend injecting a logger with the trace logging level, which will print all SQL queries.

func (DefaultMySQLSchema) DeleteQuery

func (s DefaultMySQLSchema) DeleteQuery(topic string, offset int64) (string, []any)

func (DefaultMySQLSchema) InsertQuery

func (s DefaultMySQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []any, error)

func (DefaultMySQLSchema) MessagesTable

func (s DefaultMySQLSchema) MessagesTable(topic string) string

func (DefaultMySQLSchema) SchemaInitializingQueries

func (s DefaultMySQLSchema) SchemaInitializingQueries(topic string) []string

func (DefaultMySQLSchema) SelectQuery

func (s DefaultMySQLSchema) SelectQuery(topic string, consumerGroup string,
	offsetsAdapter OffsetsAdapter) (string, []any)

func (DefaultMySQLSchema) SubscribeIsolationLevel

func (s DefaultMySQLSchema) SubscribeIsolationLevel() sql.IsolationLevel

func (DefaultMySQLSchema) UnmarshalMessage

func (s DefaultMySQLSchema) UnmarshalMessage(row Scanner) (Row, error)

type DefaultPostgreSQLOffsetsAdapter

type DefaultPostgreSQLOffsetsAdapter struct {
	// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated.
	GenerateMessagesOffsetsTableName func(topic string) string
}

DefaultPostgreSQLOffsetsAdapter is an adapter for storing offsets in a PostgreSQL database.

DefaultPostgreSQLOffsetsAdapter is designed to support multiple subscribers with exactly once delivery and guaranteed order.

It uses FOR UPDATE in NextOffsetQuery to lock the consumer group in the offsets table.

When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer will consume the next message.

func (DefaultPostgreSQLOffsetsAdapter) AckMessageQuery

func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(topic string, row Row, consumerGroup string) (string, []any)

func (DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery

func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(topic string,
	row Row, consumerGroup string, consumerULID []byte) (string, []any)

func (DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable

func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string

func (DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery

func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) (string, []any)

func (DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries

func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string) []string

type DefaultPostgreSQLSchema

type DefaultPostgreSQLSchema struct {
	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string

	// SubscribeBatchSize is the number of messages to be queried at once.
	//
	// A higher value increases the chance of message re-delivery in case of a crash or networking issues.
	// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
	//
	// Default value is 100.
	SubscribeBatchSize int
}

DefaultPostgreSQLSchema is a default implementation of SchemaAdapter based on PostgreSQL.

func (DefaultPostgreSQLSchema) DeleteQuery

func (s DefaultPostgreSQLSchema) DeleteQuery(topic string, offset int64) (string, []any)

func (DefaultPostgreSQLSchema) InsertQuery

func (s DefaultPostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (string, []any, error)

func (DefaultPostgreSQLSchema) MessagesTable

func (s DefaultPostgreSQLSchema) MessagesTable(topic string) string

func (DefaultPostgreSQLSchema) SchemaInitializingQueries

func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []string

func (DefaultPostgreSQLSchema) SelectQuery

func (s DefaultPostgreSQLSchema) SelectQuery(topic string,
	consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []any)

func (DefaultPostgreSQLSchema) SubscribeIsolationLevel

func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel

func (DefaultPostgreSQLSchema) UnmarshalMessage

func (s DefaultPostgreSQLSchema) UnmarshalMessage(row Scanner) (Row, error)

type DefaultSchema deprecated

type DefaultSchema = DefaultMySQLSchema

Deprecated: Use DefaultMySQLSchema instead.

type Executor

type Executor interface {
	Exec(query string, args ...any) (sql.Result, error)
	Query(query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
}

Executor can perform SQL queries.

type OffsetsAdapter

type OffsetsAdapter interface {
	// AckMessageQuery returns the SQL query and arguments that will mark a message as read for a given consumer group.
	AckMessageQuery(topic string, row Row, consumerGroup string) (string, []any)

	// ConsumedMessageQuery returns the SQL query and arguments which will be executed after consuming a message,
	// but before ack.
	//
	// ConsumedMessageQuery is optional, and will not be executed if the query is empty.
	ConsumedMessageQuery(topic string, row Row, consumerGroup string, consumerULID []byte) (string, []any)

	// NextOffsetQuery returns the SQL query and arguments which should return offset of next message to consume.
	NextOffsetQuery(topic, consumerGroup string) (string, []any)

	// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
	// that the appropriate tables exist to write messages to the given topic.
	SchemaInitializingQueries(topic string) []string
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher inserts the Messages as rows into a SQL table.

func NewPublisher

func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher, which means that all the Publish calls called before are finished and no more Publish calls are accepted. Close is blocking until all the ongoing Publish calls have returned.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) (err error)

Publish inserts the messages as rows into the MessagesTable. Order is guaranteed for messages within one call. Publish is blocking until all rows have been added to the Publisher's transaction. Publisher doesn't guarantee publishing messages in a single transaction, but the constructor accepts both *sql.DB and *sql.Tx, so transactions may be handled upstream by the user.

type PublisherConfig

type PublisherConfig struct {
	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// AutoInitializeSchema enables initialization of schema database during publish.
	// Schema is initialized once per topic per publisher instance.
	// AutoInitializeSchema is forbidden if using an ongoing transaction as database handle;
	// That could result in an implicit commit of the transaction by a CREATE TABLE statement.
	AutoInitializeSchema bool

	AppID string
}
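The restriction on AutoInitializeSchema can be pictured with a small check like the one below. This is only a sketch under stated assumptions: `validateAutoInit`, `errAutoInitInTx`, `plainDB`, and `ongoingTx` are hypothetical names, and detecting the transaction with a type assertion on *sql.Tx is one possible approach, not necessarily what the package does.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// errAutoInitInTx is a hypothetical error for this sketch.
var errAutoInitInTx = errors.New("schema auto-initialization is not allowed inside an ongoing transaction")

// Typed nil handles used for illustration only; no database is opened.
var (
	plainDB   *sql.DB
	ongoingTx *sql.Tx
)

// validateAutoInit rejects the combination of a transaction handle and
// automatic schema initialization, because a CREATE TABLE statement could
// implicitly commit the transaction.
func validateAutoInit(db any, autoInitializeSchema bool) error {
	if _, isTx := db.(*sql.Tx); isTx && autoInitializeSchema {
		return errAutoInitInTx
	}
	return nil
}

func main() {
	fmt.Println(validateAutoInit(plainDB, true))    // allowed
	fmt.Println(validateAutoInit(ongoingTx, false)) // allowed
	fmt.Println(validateAutoInit(ongoingTx, true))  // rejected
}
```

When publishing inside a transaction, the schema would therefore have to be created beforehand, outside of that transaction.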

type Row

type Row struct {
	Offset   int64
	UUID     []byte
	Payload  []byte
	Metadata []byte

	Msg *message.Message

	ExtraData map[string]any
}

type Scanner

type Scanner interface {
	Scan(dest ...any) error
}
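Because Scanner only requires a Scan method, both *sql.Row and *sql.Rows from database/sql satisfy it, and so does a test double. The sketch below shows how an UnmarshalMessage-style function might read a row through a Scanner. The names `row`, `fakeScanner`, and `unmarshalRow` are hypothetical, `row` is a reduced version of Row, and the column order (offset, uuid, payload, metadata) with JSON-encoded metadata is an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Scanner is copied from the package documentation.
type Scanner interface {
	Scan(dest ...any) error
}

// row is a reduced, hypothetical version of Row with decoded metadata.
type row struct {
	Offset   int64
	UUID     []byte
	Payload  []byte
	Metadata map[string]string
}

// fakeScanner is a test double that copies preset column values into the
// destinations, in the column order assumed by this sketch.
type fakeScanner struct {
	offset                  int64
	uuid, payload, metadata []byte
}

func (f fakeScanner) Scan(dest ...any) error {
	if len(dest) != 4 {
		return errors.New("expected 4 destinations")
	}
	o, ok1 := dest[0].(*int64)
	u, ok2 := dest[1].(*[]byte)
	p, ok3 := dest[2].(*[]byte)
	m, ok4 := dest[3].(*[]byte)
	if !(ok1 && ok2 && ok3 && ok4) {
		return errors.New("unexpected destination types")
	}
	*o, *u, *p, *m = f.offset, f.uuid, f.payload, f.metadata
	return nil
}

// unmarshalRow scans one result row and decodes its JSON metadata.
func unmarshalRow(s Scanner) (row, error) {
	var r row
	var rawMetadata []byte
	if err := s.Scan(&r.Offset, &r.UUID, &r.Payload, &rawMetadata); err != nil {
		return row{}, fmt.Errorf("could not scan message row: %w", err)
	}
	if len(rawMetadata) > 0 {
		if err := json.Unmarshal(rawMetadata, &r.Metadata); err != nil {
			return row{}, fmt.Errorf("could not decode metadata: %w", err)
		}
	}
	return r, nil
}

func main() {
	r, err := unmarshalRow(fakeScanner{
		offset:   7,
		uuid:     []byte("0123456789abcdef"),
		payload:  []byte(`{"n":1}`),
		metadata: []byte(`{"k":"v"}`),
	})
	fmt.Println(r.Offset, string(r.Payload), r.Metadata, err)
}
```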

type SchemaAdapter

type SchemaAdapter interface {
	// InsertQuery returns the SQL query and arguments that will insert the Watermill message into the SQL storage.
	InsertQuery(topic string, msgs message.Messages) (string, []any, error)

	// SelectQuery returns the SQL query and arguments
	// that returns the next unread message for a given consumer group.
	SelectQuery(topic string, consumerGroup string, offsetsAdapter OffsetsAdapter) (string, []any)

	// DeleteQuery returns the SQL query and arguments that will delete the message with the given offset.
	DeleteQuery(topic string, offset int64) (string, []any)

	// UnmarshalMessage transforms the row obtained from SelectQuery into a Watermill message.
	// It also returns the offset of the last read message, for the purpose of acking.
	UnmarshalMessage(row Scanner) (Row, error)

	// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
	// that the appropriate tables exist to write messages to the given topic.
	SchemaInitializingQueries(topic string) []string

	// SubscribeIsolationLevel returns the isolation level that will be used when subscribing.
	SubscribeIsolationLevel() sql.IsolationLevel
}

SchemaAdapter produces the SQL queries and arguments appropriate for a specific schema and dialect. It also transforms sql.Rows into Watermill messages.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber makes SELECT queries on the chosen table with the interval defined in the config. The rows are unmarshaled into Watermill messages.

func NewSubscriber

func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *message.Message, err error)

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

type SubscriberConfig

type SubscriberConfig struct {
	ConsumerGroup string

	// PollInterval is the interval to wait between subsequent SELECT queries,
	// if no more messages were found in the database (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	PollInterval time.Duration

	// ResendInterval is the time to wait before resending a nacked message.
	// Must be non-negative. Defaults to 1s.
	ResendInterval time.Duration

	// RetryInterval is the time to wait before resuming querying for messages
	// after an error (Prefer using the BackoffManager instead).
	// Must be non-negative. Defaults to 1s.
	RetryInterval time.Duration

	// BackoffManager defines how much to backoff when receiving errors.
	BackoffManager BackoffManager

	// SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc.
	SchemaAdapter SchemaAdapter

	// OffsetsAdapter provides mechanism for saving acks and offsets of consumers.
	OffsetsAdapter OffsetsAdapter

	// InitializeSchema option enables initializing schema on making subscription.
	InitializeSchema bool

	// DisablePersistent option deletes the message after it has been consumed.
	DisablePersistent bool
}
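The rules stated for the three interval fields (non-negative, defaulting to 1s) can be sketched as follows. The `pollingConfig` type and its `setDefaults` and `validate` methods are hypothetical names for this illustration and cover only the interval fields. Treating a zero value as "unset" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollingConfig is a hypothetical reduction of SubscriberConfig to its
// three interval fields.
type pollingConfig struct {
	PollInterval   time.Duration
	ResendInterval time.Duration
	RetryInterval  time.Duration
}

// setDefaults fills every unset (zero) interval with the documented default of 1s.
func (c *pollingConfig) setDefaults() {
	if c.PollInterval == 0 {
		c.PollInterval = time.Second
	}
	if c.ResendInterval == 0 {
		c.ResendInterval = time.Second
	}
	if c.RetryInterval == 0 {
		c.RetryInterval = time.Second
	}
}

// validate enforces the documented "must be non-negative" rule.
func (c pollingConfig) validate() error {
	if c.PollInterval < 0 {
		return errors.New("poll interval must be non-negative")
	}
	if c.ResendInterval < 0 {
		return errors.New("resend interval must be non-negative")
	}
	if c.RetryInterval < 0 {
		return errors.New("retry interval must be non-negative")
	}
	return nil
}

func main() {
	c := pollingConfig{PollInterval: 250 * time.Millisecond}
	c.setDefaults()
	fmt.Println(c.PollInterval, c.ResendInterval, c.RetryInterval, c.validate())

	bad := pollingConfig{RetryInterval: -time.Second}
	fmt.Println(bad.validate())
}
```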
