Documentation ¶
Index ¶
- Variables
- func NewDelayedPostgreSQLPublisher(db *sql.DB, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error)
- func NewDelayedPostgreSQLSubscriber(db *sql.DB, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error)
- func TxFromContext(ctx context.Context) (*sql.Tx, bool)
- type AckMessageQueryParams
- type BackoffManager
- type BeforeSubscribingQueriesParams
- type Beginner
- type ConsumedMessageQueryParams
- type ContextExecutor
- type DefaultMySQLOffsetsAdapter
- func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
- func (a DefaultMySQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
- func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
- func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string
- func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
- func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
- type DefaultMySQLSchema
- func (s DefaultMySQLSchema) InsertQuery(params InsertQueryParams) (Query, error)
- func (s DefaultMySQLSchema) MessagesTable(topic string) string
- func (s DefaultMySQLSchema) PayloadColumnType(topic string) string
- func (s DefaultMySQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
- func (s DefaultMySQLSchema) SelectQuery(params SelectQueryParams) (Query, error)
- func (s DefaultMySQLSchema) SubscribeIsolationLevel() sql.IsolationLevel
- func (s DefaultMySQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
- type DefaultPostgreSQLOffsetsAdapter
- func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
- func (a DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
- func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
- func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string
- func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
- func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
- type DefaultPostgreSQLSchema
- func (s DefaultPostgreSQLSchema) InsertQuery(params InsertQueryParams) (Query, error)
- func (s DefaultPostgreSQLSchema) MessagesTable(topic string) string
- func (s DefaultPostgreSQLSchema) PayloadColumnType(topic string) string
- func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
- func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, error)
- func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel
- func (s DefaultPostgreSQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
- type DefaultSchema (deprecated)
- type DelayedPostgreSQLPublisherConfig
- type DelayedPostgreSQLSubscriberConfig
- type DelayedRequeuer
- type DelayedRequeuerConfig
- type Executor
- type GenerateWhereClauseParams
- type InsertQueryParams
- type NextOffsetQueryParams
- type OffsetsAdapter
- type OffsetsSchemaInitializingQueriesParams
- type PostgreSQLQueueOffsetsAdapter
- func (a PostgreSQLQueueOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
- func (a PostgreSQLQueueOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
- func (a PostgreSQLQueueOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
- func (a PostgreSQLQueueOffsetsAdapter) MessagesTable(topic string) string
- func (a PostgreSQLQueueOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
- func (a PostgreSQLQueueOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
- type PostgreSQLQueueSchema
- func (s PostgreSQLQueueSchema) InsertQuery(params InsertQueryParams) (Query, error)
- func (s PostgreSQLQueueSchema) MessagesTable(topic string) string
- func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
- func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) (Query, error)
- func (s PostgreSQLQueueSchema) SubscribeIsolationLevel() sql.IsolationLevel
- func (s PostgreSQLQueueSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
- type Publisher
- type PublisherConfig
- type Query
- type Row
- type Scanner
- type SchemaAdapter
- type SchemaInitializingQueriesParams
- type SelectQueryParams
- type Subscriber
- type SubscriberConfig
- type UnmarshalMessageParams
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidTopicName = errors.New("topic name should not contain characters matched by " + disallowedTopicCharacters.String())
var (
ErrPublisherClosed = errors.New("publisher is closed")
)
var (
ErrSubscriberClosed = errors.New("subscriber is closed")
)
Functions ¶
func NewDelayedPostgreSQLPublisher ¶
func NewDelayedPostgreSQLPublisher(db *sql.DB, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error)
NewDelayedPostgreSQLPublisher creates a new Publisher that stores messages in PostgreSQL with a delay. The delay can be set per message using Watermill's components/delay metadata.
func NewDelayedPostgreSQLSubscriber ¶
func NewDelayedPostgreSQLSubscriber(db *sql.DB, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error)
NewDelayedPostgreSQLSubscriber creates a new Subscriber that reads messages from PostgreSQL with a delay. The delay can be set per message using Watermill's components/delay metadata.
func TxFromContext ¶
TxFromContext returns the transaction used by the subscriber to consume the message. The transaction will be committed if ack of the message is successful. When a nack is sent, the transaction will be rolled back.
It is useful when you want to ensure that data is updated only when the message is processed. Example usage: https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter
Types ¶
type AckMessageQueryParams ¶
type BackoffManager ¶
type BackoffManager interface { // HandleError handles the error possibly logging it or returning a backoff time depending on the error or the absence of the message. HandleError(logger watermill.LoggerAdapter, noMsg bool, err error) time.Duration }
BackoffManager handles errors or empty result sets and computes the backoff time. You could for example create a stateful version that computes a backoff depending on the error frequency or make errors more or less persistent.
func NewDefaultBackoffManager ¶
func NewDefaultBackoffManager(pollInterval, retryInterval time.Duration) BackoffManager
type Beginner ¶
type Beginner interface { BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) ContextExecutor }
Beginner begins transactions.
type ContextExecutor ¶
type ContextExecutor interface { Executor ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row }
ContextExecutor can perform SQL queries with context.
type DefaultMySQLOffsetsAdapter ¶
type DefaultMySQLOffsetsAdapter struct { // GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated. GenerateMessagesOffsetsTableName func(topic string) string }
DefaultMySQLOffsetsAdapter is an adapter for storing offsets in MySQL (or MariaDB) databases.
DefaultMySQLOffsetsAdapter is designed to support multiple subscribers with exactly once delivery and guaranteed order.
We use FOR UPDATE in NextOffsetQuery to lock the consumer group in the offsets table.
When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer will consume the next message.
func (DefaultMySQLOffsetsAdapter) AckMessageQuery ¶
func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
func (DefaultMySQLOffsetsAdapter) BeforeSubscribingQueries ¶
func (a DefaultMySQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
func (DefaultMySQLOffsetsAdapter) ConsumedMessageQuery ¶
func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
func (DefaultMySQLOffsetsAdapter) MessagesOffsetsTable ¶
func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string
func (DefaultMySQLOffsetsAdapter) NextOffsetQuery ¶
func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
func (DefaultMySQLOffsetsAdapter) SchemaInitializingQueries ¶
func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
type DefaultMySQLSchema ¶
type DefaultMySQLSchema struct { // GenerateMessagesTableName may be used to override how the messages table name is generated. GenerateMessagesTableName func(topic string) string // GeneratePayloadType is the type of the payload column in the messages table. // By default, it's JSON. If your payload is not JSON, you can use a binary type such as BLOB. GeneratePayloadType func(topic string) string // SubscribeBatchSize is the number of messages to be queried at once. // // A higher value increases the chance of message re-delivery in case of a crash or networking issues. // 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages. // // Default value is 100. SubscribeBatchSize int }
DefaultMySQLSchema is a default implementation of SchemaAdapter based on MySQL. If you need some customization, you can use composition to change schema and method of unmarshaling.
type MyMessagesSchema struct { DefaultMySQLSchema } func (m MyMessagesSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []string { createMessagesTable := strings.Join([]string{ "CREATE TABLE IF NOT EXISTS " + m.MessagesTable(topic) + " (", "`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,", "`uuid` BINARY(16) NOT NULL,", "`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,", "`payload` JSON DEFAULT NULL,", "`metadata` JSON DEFAULT NULL", ");", }, "\n") return []string{createMessagesTable} } func (m MyMessagesSchema) UnmarshalMessage(params UnmarshalMessageParams) (offset int, msg *message.Message, err error) { // ...
For debugging your custom schema, we recommend injecting a logger with the trace logging level, which will print all SQL queries.
func (DefaultMySQLSchema) InsertQuery ¶
func (s DefaultMySQLSchema) InsertQuery(params InsertQueryParams) (Query, error)
func (DefaultMySQLSchema) MessagesTable ¶
func (s DefaultMySQLSchema) MessagesTable(topic string) string
func (DefaultMySQLSchema) PayloadColumnType ¶
func (s DefaultMySQLSchema) PayloadColumnType(topic string) string
func (DefaultMySQLSchema) SchemaInitializingQueries ¶
func (s DefaultMySQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
func (DefaultMySQLSchema) SelectQuery ¶
func (s DefaultMySQLSchema) SelectQuery(params SelectQueryParams) (Query, error)
func (DefaultMySQLSchema) SubscribeIsolationLevel ¶
func (s DefaultMySQLSchema) SubscribeIsolationLevel() sql.IsolationLevel
func (DefaultMySQLSchema) UnmarshalMessage ¶
func (s DefaultMySQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
type DefaultPostgreSQLOffsetsAdapter ¶
type DefaultPostgreSQLOffsetsAdapter struct { // GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated. GenerateMessagesOffsetsTableName func(topic string) string }
DefaultPostgreSQLOffsetsAdapter is an adapter for storing offsets in a PostgreSQL database.
DefaultPostgreSQLOffsetsAdapter is designed to support multiple subscribers with exactly once delivery and guaranteed order.
We use FOR UPDATE in NextOffsetQuery to lock the consumer group in the offsets table.
When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer will consume the next message.
func (DefaultPostgreSQLOffsetsAdapter) AckMessageQuery ¶
func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
func (DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries ¶
func (a DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
func (DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery ¶
func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
func (DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable ¶
func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string
func (DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery ¶
func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
func (DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries ¶
func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
type DefaultPostgreSQLSchema ¶
type DefaultPostgreSQLSchema struct { // GenerateMessagesTableName may be used to override how the messages table name is generated. GenerateMessagesTableName func(topic string) string // GeneratePayloadType is the type of the payload column in the messages table. // By default, it's JSON. If your payload is not JSON, you can use BYTEA. GeneratePayloadType func(topic string) string // SubscribeBatchSize is the number of messages to be queried at once. // // Higher value, increases a chance of message re-delivery in case of crash or networking issues. // 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages. // // Default value is 100. SubscribeBatchSize int }
DefaultPostgreSQLSchema is a default implementation of SchemaAdapter based on PostgreSQL.
func (DefaultPostgreSQLSchema) InsertQuery ¶
func (s DefaultPostgreSQLSchema) InsertQuery(params InsertQueryParams) (Query, error)
func (DefaultPostgreSQLSchema) MessagesTable ¶
func (s DefaultPostgreSQLSchema) MessagesTable(topic string) string
func (DefaultPostgreSQLSchema) PayloadColumnType ¶
func (s DefaultPostgreSQLSchema) PayloadColumnType(topic string) string
func (DefaultPostgreSQLSchema) SchemaInitializingQueries ¶
func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
func (DefaultPostgreSQLSchema) SelectQuery ¶
func (s DefaultPostgreSQLSchema) SelectQuery(params SelectQueryParams) (Query, error)
func (DefaultPostgreSQLSchema) SubscribeIsolationLevel ¶
func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel
func (DefaultPostgreSQLSchema) UnmarshalMessage ¶
func (s DefaultPostgreSQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
type DefaultSchema (deprecated) ¶
type DefaultSchema = DefaultMySQLSchema
Deprecated: Use DefaultMySQLSchema instead.
type DelayedPostgreSQLPublisherConfig ¶
type DelayedPostgreSQLPublisherConfig struct { // DelayPublisherConfig is a configuration for the delay.Publisher. DelayPublisherConfig delay.PublisherConfig // OverridePublisherConfig allows overriding the default PublisherConfig. OverridePublisherConfig func(config *PublisherConfig) error Logger watermill.LoggerAdapter }
type DelayedPostgreSQLSubscriberConfig ¶
type DelayedPostgreSQLSubscriberConfig struct { // OverrideSubscriberConfig allows overriding the default SubscriberConfig. OverrideSubscriberConfig func(config *SubscriberConfig) error // DeleteOnAck deletes the message from the queue when it's acknowledged. DeleteOnAck bool // AllowNoDelay allows receiving messages without the delay metadata. // By default, such messages will be skipped. // If set to true, messages without delay metadata will be received immediately. AllowNoDelay bool Logger watermill.LoggerAdapter }
type DelayedRequeuer ¶
type DelayedRequeuer struct {
// contains filtered or unexported fields
}
DelayedRequeuer is a requeuer that uses a delayed publisher and subscriber to requeue messages.
After creating it, you should: 1. Add the Middleware() to your router. 2. Run it with the Run method.
func NewPostgreSQLDelayedRequeuer ¶
func NewPostgreSQLDelayedRequeuer(config DelayedRequeuerConfig) (*DelayedRequeuer, error)
NewPostgreSQLDelayedRequeuer creates a new DelayedRequeuer that uses PostgreSQL as a storage.
func (DelayedRequeuer) Middleware ¶
func (q DelayedRequeuer) Middleware() []message.HandlerMiddleware
Middleware returns the middleware that should be added to the router.
type DelayedRequeuerConfig ¶
type DelayedRequeuerConfig struct { // DB is a database connection. Required. DB *sql.DB // Publisher is a publisher that will be used to publish requeued messages. Required. Publisher message.Publisher // RequeueTopic is a topic where requeued messages will be published. Defaults to "requeue". RequeueTopic string // GeneratePublishTopic is a function that generates the topic where the message should be published after requeue. // Defaults to getting the original topic from the message metadata (provided by the PoisonQueue middleware). GeneratePublishTopic func(params requeuer.GeneratePublishTopicParams) (string, error) // DelayOnError middleware. Optional DelayOnError *middleware.DelayOnError Logger watermill.LoggerAdapter }
DelayedRequeuerConfig is a configuration for DelayedRequeuer.
func (*DelayedRequeuerConfig) Validate ¶
func (c *DelayedRequeuerConfig) Validate() error
type Executor ¶
type Executor interface { Exec(query string, args ...interface{}) (sql.Result, error) Query(query string, args ...interface{}) (*sql.Rows, error) QueryRow(query string, args ...interface{}) *sql.Row }
Executor can perform SQL queries.
type GenerateWhereClauseParams ¶
type GenerateWhereClauseParams struct {
Topic string
}
type InsertQueryParams ¶
type NextOffsetQueryParams ¶
type OffsetsAdapter ¶
type OffsetsAdapter interface { // AckMessageQuery returns the SQL query and arguments that will mark a message as read for a given consumer group. AckMessageQuery(params AckMessageQueryParams) (Query, error) // ConsumedMessageQuery returns the SQL query and arguments which will be executed after consuming a message, // but before ack. // // ConsumedMessageQuery is optional, and will not be executed if the query is empty. ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error) // NextOffsetQuery returns the SQL query and arguments which should return the offset of the next message to consume. NextOffsetQuery(params NextOffsetQueryParams) (Query, error) // SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS) // that the appropriate tables exist to write messages to the given topic. SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error) // BeforeSubscribingQueries returns queries which will be executed before subscribing to a topic. // All queries will be executed in a single transaction. BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error) }
type OffsetsSchemaInitializingQueriesParams ¶
type OffsetsSchemaInitializingQueriesParams struct {
Topic string
}
type PostgreSQLQueueOffsetsAdapter ¶
type PostgreSQLQueueOffsetsAdapter struct { // DeleteOnAck determines whether the message should be deleted from the table when it is acknowledged. // If false, the message will be marked as acked. DeleteOnAck bool // GenerateMessagesTableName may be used to override how the messages table name is generated. GenerateMessagesTableName func(topic string) string }
PostgreSQLQueueOffsetsAdapter is an OffsetsAdapter for the PostgreSQLQueueSchema.
func (PostgreSQLQueueOffsetsAdapter) AckMessageQuery ¶
func (a PostgreSQLQueueOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error)
func (PostgreSQLQueueOffsetsAdapter) BeforeSubscribingQueries ¶
func (a PostgreSQLQueueOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
func (PostgreSQLQueueOffsetsAdapter) ConsumedMessageQuery ¶
func (a PostgreSQLQueueOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
func (PostgreSQLQueueOffsetsAdapter) MessagesTable ¶
func (a PostgreSQLQueueOffsetsAdapter) MessagesTable(topic string) string
func (PostgreSQLQueueOffsetsAdapter) NextOffsetQuery ¶
func (a PostgreSQLQueueOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
func (PostgreSQLQueueOffsetsAdapter) SchemaInitializingQueries ¶
func (a PostgreSQLQueueOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
type PostgreSQLQueueSchema ¶
type PostgreSQLQueueSchema struct { // GenerateWhereClause is a function that returns a where clause and arguments for the SELECT query. // It may be used to filter messages by some condition. // If empty, no where clause will be added. GenerateWhereClause func(params GenerateWhereClauseParams) (string, []any) // GeneratePayloadType is the type of the payload column in the messages table. // By default, it's JSON. If your payload is not JSON, you can use BYTEA. GeneratePayloadType func(topic string) string // GenerateMessagesTableName may be used to override how the messages table name is generated. GenerateMessagesTableName func(topic string) string // SubscribeBatchSize is the number of messages to be queried at once. // // Higher value, increases a chance of message re-delivery in case of crash or networking issues. // 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages. // // Default value is 100. SubscribeBatchSize int }
PostgreSQLQueueSchema is a schema adapter for PostgreSQL that allows filtering messages by some condition. It DOES NOT support consumer groups. It supports deleting messages on ack.
func (PostgreSQLQueueSchema) InsertQuery ¶
func (s PostgreSQLQueueSchema) InsertQuery(params InsertQueryParams) (Query, error)
func (PostgreSQLQueueSchema) MessagesTable ¶
func (s PostgreSQLQueueSchema) MessagesTable(topic string) string
func (PostgreSQLQueueSchema) SchemaInitializingQueries ¶
func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
func (PostgreSQLQueueSchema) SelectQuery ¶
func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) (Query, error)
func (PostgreSQLQueueSchema) SubscribeIsolationLevel ¶
func (s PostgreSQLQueueSchema) SubscribeIsolationLevel() sql.IsolationLevel
func (PostgreSQLQueueSchema) UnmarshalMessage ¶
func (s PostgreSQLQueueSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher inserts the Messages as rows into a SQL table.
func NewPublisher ¶
func NewPublisher(db ContextExecutor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
func (*Publisher) Close ¶
Close closes the publisher, which means that all the Publish calls called before are finished and no more Publish calls are accepted. Close is blocking until all the ongoing Publish calls have returned.
func (*Publisher) Publish ¶
Publish inserts the messages as rows into the MessagesTable. Order is guaranteed for messages within one call. Publish is blocking until all rows have been added to the Publisher's transaction. Publisher doesn't guarantee publishing messages in a single transaction, but the constructor accepts both *sql.DB and *sql.Tx, so transactions may be handled upstream by the user.
type PublisherConfig ¶
type PublisherConfig struct { // SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc. SchemaAdapter SchemaAdapter // AutoInitializeSchema enables initialization of schema database during publish. // Schema is initialized once per topic per publisher instance. // AutoInitializeSchema is forbidden if using an ongoing transaction as database handle; // That could result in an implicit commit of the transaction by a CREATE TABLE statement. AutoInitializeSchema bool }
type SchemaAdapter ¶
type SchemaAdapter interface { // InsertQuery returns the SQL query and arguments that will insert the Watermill message into the SQL storage. InsertQuery(params InsertQueryParams) (Query, error) // SelectQuery returns the SQL query and arguments // that returns the next unread message for a given consumer group. SelectQuery(params SelectQueryParams) (Query, error) // UnmarshalMessage transforms the Row obtained from SelectQuery into a Watermill message. // It also returns the offset of the last read message, for the purpose of acking. UnmarshalMessage(params UnmarshalMessageParams) (Row, error) // SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS) // that the appropriate tables exist to write messages to the given topic. SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) // SubscribeIsolationLevel returns the isolation level that will be used when subscribing. SubscribeIsolationLevel() sql.IsolationLevel }
SchemaAdapter produces the SQL queries and arguments appropriately for a specific schema and dialect. It also transforms sql.Rows into Watermill messages.
type SchemaInitializingQueriesParams ¶
type SchemaInitializingQueriesParams struct {
Topic string
}
type SelectQueryParams ¶
type SelectQueryParams struct { Topic string ConsumerGroup string OffsetsAdapter OffsetsAdapter }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber makes SELECT queries on the chosen table with the interval defined in the config. The rows are unmarshaled into Watermill messages.
func NewSubscriber ¶
func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
type SubscriberConfig ¶
type SubscriberConfig struct { ConsumerGroup string // AckDeadline is the time to wait for acking a message. // If message is not acked within this time, it will be nacked and re-delivered. // // When messages are read in bulk, this time is calculated for each message separately. // // If you want to disable ack deadline, set it to 0. // Warning: when ack deadline is disabled, messages which are not acked may block PostgreSQL subscriber from reading new messages // due to not increasing `pg_snapshot_xmin(pg_current_snapshot())` value. // // Must be non-negative. Nil value defaults to 30s. AckDeadline *time.Duration // PollInterval is the interval to wait between subsequent SELECT queries, if no more messages were found in the database (Prefer using the BackoffManager instead). // Must be non-negative. Defaults to 1s. PollInterval time.Duration // ResendInterval is the time to wait before resending a nacked message. // Must be non-negative. Defaults to 1s. ResendInterval time.Duration // RetryInterval is the time to wait before resuming querying for messages after an error (Prefer using the BackoffManager instead). // Must be non-negative. Defaults to 1s. RetryInterval time.Duration // BackoffManager defines how much to backoff when receiving errors. BackoffManager BackoffManager // SchemaAdapter provides the schema-dependent queries and arguments for them, based on topic/message etc. SchemaAdapter SchemaAdapter // OffsetsAdapter provides mechanism for saving acks and offsets of consumers. OffsetsAdapter OffsetsAdapter // InitializeSchema option enables initializing schema on making subscription. InitializeSchema bool }
type UnmarshalMessageParams ¶
type UnmarshalMessageParams struct {
Row Scanner
}
Source Files ¶
- backoff_manager.go
- context.go
- delayed_postgresql.go
- delayed_requeuer.go
- offsets_adapter.go
- offsets_adapter_mysql.go
- offsets_adapter_postgresql.go
- publisher.go
- queue_offsets_adapter_postgresql.go
- queue_schema_adapter_postgresql.go
- schema.go
- schema_adapter.go
- schema_adapter_mysql.go
- schema_adapter_postgresql.go
- sql.go
- subscriber.go
- topic.go
- tx.go