sql

v0.0.0-...-3fffaf9
Warning

This package is not in the latest version of its module.

Published: Mar 20, 2024 License: MIT Imports: 7 Imported by: 0

README

watermill-sqlite

WIP

Nothing to see here

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultSQLiteOffsetsAdapter

type DefaultSQLiteOffsetsAdapter struct {
	// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated.
	GenerateMessagesOffsetsTableName func(topic string) string
}

DefaultSQLiteOffsetsAdapter is an adapter for storing offsets in SQLite databases.

DefaultSQLiteOffsetsAdapter is designed to support multiple subscribers with exactly-once delivery and guaranteed ordering.

NextOffsetQuery does not use FOR UPDATE because SQLite already locks the entire database. Open the database with the _txlock=immediate connection parameter.

When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer moves on to the next message.
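
The _txlock=immediate setting mentioned above is normally passed through the connection string. The following is a minimal sketch of building such a DSN. It assumes a driver that understands the _txlock parameter (mattn/go-sqlite3 is one that does); the helper name immediateDSN and the file name watermill.db are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"net/url"
)

// immediateDSN is a hypothetical helper. It builds a SQLite DSN that asks the
// driver to start transactions with BEGIN IMMEDIATE via _txlock=immediate.
// Assumption: the driver in use recognizes the _txlock query parameter.
func immediateDSN(path string) string {
	q := url.Values{}
	q.Set("_txlock", "immediate")
	return "file:" + path + "?" + q.Encode()
}

func main() {
	// The resulting string would be handed to database/sql's Open together
	// with the name under which the chosen SQLite driver is registered.
	fmt.Println(immediateDSN("watermill.db"))
}
```

With BEGIN IMMEDIATE, a transaction takes SQLite's write lock when it starts instead of when it first writes, which is the behavior the adapter relies on in place of FOR UPDATE.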

func (DefaultSQLiteOffsetsAdapter) AckMessageQuery

func (a DefaultSQLiteOffsetsAdapter) AckMessageQuery(topic string, row sql.Row, consumerGroup string) sql.Query

func (DefaultSQLiteOffsetsAdapter) BeforeSubscribingQueries

func (a DefaultSQLiteOffsetsAdapter) BeforeSubscribingQueries(topic, consumerGroup string) []sql.Query

func (DefaultSQLiteOffsetsAdapter) ConsumedMessageQuery

func (a DefaultSQLiteOffsetsAdapter) ConsumedMessageQuery(topic string, row sql.Row, consumerGroup string, consumerULID []byte) sql.Query

func (DefaultSQLiteOffsetsAdapter) MessagesOffsetsTable

func (a DefaultSQLiteOffsetsAdapter) MessagesOffsetsTable(topic string) string
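
The GenerateMessagesOffsetsTableName field accepts any func(topic string) string. As a sketch of what such an override could look like, the hypothetical function below maps a topic to a table name that contains only ASCII letters, digits, and underscores. The offsets_ prefix and the sanitizing rule are assumptions made for illustration, not the package's default behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// offsetsTableName is a hypothetical generator with the signature expected by
// GenerateMessagesOffsetsTableName. It prefixes the topic with "offsets_" and
// replaces every character outside [A-Za-z0-9_] with an underscore.
func offsetsTableName(topic string) string {
	var b strings.Builder
	b.WriteString("offsets_")
	for _, r := range topic {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			b.WriteRune(r)
		default:
			b.WriteByte('_')
		}
	}
	return b.String()
}

func main() {
	fmt.Println(offsetsTableName("orders.created"))
}
```

A function like this could be assigned to the GenerateMessagesOffsetsTableName field when constructing a DefaultSQLiteOffsetsAdapter. Note that two distinct topics can collapse to the same name under this rule (for example, orders.created and orders-created), so a real override may need a stricter mapping.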

func (DefaultSQLiteOffsetsAdapter) NextOffsetQuery

func (a DefaultSQLiteOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) sql.Query

func (DefaultSQLiteOffsetsAdapter) SchemaInitializingQueries

func (a DefaultSQLiteOffsetsAdapter) SchemaInitializingQueries(topic string) []sql.Query

type DefaultSQLiteSchema

type DefaultSQLiteSchema struct {
	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string

	// SubscribeBatchSize is the number of messages to be queried at once.
	//
	// A higher value increases the chance of message redelivery after a crash or networking issue.
	// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
	//
	// Default value is 100.
	SubscribeBatchSize int
}

DefaultSQLiteSchema is a default implementation of SchemaAdapter based on SQLite.

func (DefaultSQLiteSchema) InsertQuery

func (s DefaultSQLiteSchema) InsertQuery(topic string, msgs message.Messages) (sql.Query, error)

func (DefaultSQLiteSchema) MessagesTable

func (s DefaultSQLiteSchema) MessagesTable(topic string) string

func (DefaultSQLiteSchema) SchemaInitializingQueries

func (s DefaultSQLiteSchema) SchemaInitializingQueries(topic string) []sql.Query

func (DefaultSQLiteSchema) SelectQuery

func (s DefaultSQLiteSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) sql.Query

func (DefaultSQLiteSchema) SubscribeIsolationLevel

func (s DefaultSQLiteSchema) SubscribeIsolationLevel() stdSql.IsolationLevel

func (DefaultSQLiteSchema) UnmarshalMessage

func (s DefaultSQLiteSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error)
