Documentation ¶
Index ¶
- type DefaultSQLiteOffsetsAdapter
- func (a DefaultSQLiteOffsetsAdapter) AckMessageQuery(topic string, row sql.Row, consumerGroup string) sql.Query
- func (a DefaultSQLiteOffsetsAdapter) BeforeSubscribingQueries(topic, consumerGroup string) []sql.Query
- func (a DefaultSQLiteOffsetsAdapter) ConsumedMessageQuery(topic string, row sql.Row, consumerGroup string, consumerULID []byte) sql.Query
- func (a DefaultSQLiteOffsetsAdapter) MessagesOffsetsTable(topic string) string
- func (a DefaultSQLiteOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) sql.Query
- func (a DefaultSQLiteOffsetsAdapter) SchemaInitializingQueries(topic string) []sql.Query
- type DefaultSQLiteSchema
- func (s DefaultSQLiteSchema) InsertQuery(topic string, msgs message.Messages) (sql.Query, error)
- func (s DefaultSQLiteSchema) MessagesTable(topic string) string
- func (s DefaultSQLiteSchema) SchemaInitializingQueries(topic string) []sql.Query
- func (s DefaultSQLiteSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) sql.Query
- func (s DefaultSQLiteSchema) SubscribeIsolationLevel() stdSql.IsolationLevel
- func (s DefaultSQLiteSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultSQLiteOffsetsAdapter ¶
type DefaultSQLiteOffsetsAdapter struct {
	// GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated.
	GenerateMessagesOffsetsTableName func(topic string) string
}
DefaultSQLiteOffsetsAdapter is an adapter for storing offsets in SQLite databases.
DefaultSQLiteOffsetsAdapter is designed to support multiple subscribers with exactly-once delivery and guaranteed order.
We don't use FOR UPDATE in NextOffsetQuery because SQLite already locks the entire database. Use _txlock=immediate when opening the connection so that transactions take the write lock as soon as they begin.
When another consumer tries to consume the same message, a deadlock should occur in ConsumedMessageQuery. After the deadlock, the consumer will consume the next message.
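The _txlock=immediate setting mentioned above is usually passed as a query parameter in the data source name. The following is a minimal sketch of building such a DSN, assuming the "file:" URI form accepted by drivers such as mattn/go-sqlite3; the helper name immediateDSN and the file name are hypothetical, and other drivers may expect a different spelling.

```go
package main

import (
	"fmt"
	"net/url"
)

// immediateDSN builds a SQLite DSN in URI form with _txlock=immediate set.
// Assumption: the driver understands the "file:" prefix and the _txlock
// query parameter (mattn/go-sqlite3 conventions). The path is used as-is
// and is not escaped.
func immediateDSN(path string) string {
	q := url.Values{}
	q.Set("_txlock", "immediate")
	return "file:" + path + "?" + q.Encode()
}

func main() {
	// Prints the DSN that would be handed to database/sql's sql.Open.
	fmt.Println(immediateDSN("watermill.db"))
}
```

The resulting string would typically be passed to sql.Open together with the name under which the chosen SQLite driver registers itself.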
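func (DefaultSQLiteOffsetsAdapter) AckMessageQuery ¶
func (a DefaultSQLiteOffsetsAdapter) AckMessageQuery(topic string, row sql.Row, consumerGroup string) sql.Query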
func (DefaultSQLiteOffsetsAdapter) BeforeSubscribingQueries ¶
func (a DefaultSQLiteOffsetsAdapter) BeforeSubscribingQueries(topic, consumerGroup string) []sql.Query
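func (DefaultSQLiteOffsetsAdapter) ConsumedMessageQuery ¶
func (a DefaultSQLiteOffsetsAdapter) ConsumedMessageQuery(topic string, row sql.Row, consumerGroup string, consumerULID []byte) sql.Query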
func (DefaultSQLiteOffsetsAdapter) MessagesOffsetsTable ¶
func (a DefaultSQLiteOffsetsAdapter) MessagesOffsetsTable(topic string) string
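The table name can be customized through the GenerateMessagesOffsetsTableName field, which takes a func(topic string) string. Below is a sketch of one possible generator; the function name offsetsTableName, the "app_offsets_" prefix, and the sanitizing rule are all illustrative assumptions, not the adapter's default behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// offsetsTableName is a hypothetical generator with the same signature as
// the GenerateMessagesOffsetsTableName field: func(topic string) string.
// It replaces every character outside [A-Za-z0-9_] with '_' so that the
// topic can be embedded in a table name, then adds an illustrative prefix.
func offsetsTableName(topic string) string {
	safe := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
			return r
		}
		return '_'
	}, topic)
	return "app_offsets_" + safe
}

func main() {
	fmt.Println(offsetsTableName("orders.created")) // app_offsets_orders_created
}
```

A function like this would be assigned to the GenerateMessagesOffsetsTableName field when constructing the adapter. Note that two topics differing only in replaced characters would map to the same table under this rule.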
func (DefaultSQLiteOffsetsAdapter) NextOffsetQuery ¶
func (a DefaultSQLiteOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) sql.Query
func (DefaultSQLiteOffsetsAdapter) SchemaInitializingQueries ¶
func (a DefaultSQLiteOffsetsAdapter) SchemaInitializingQueries(topic string) []sql.Query
type DefaultSQLiteSchema ¶
type DefaultSQLiteSchema struct {
	// GenerateMessagesTableName may be used to override how the messages table name is generated.
	GenerateMessagesTableName func(topic string) string

	// SubscribeBatchSize is the number of messages to be queried at once.
	//
	// A higher value increases the chance of message re-delivery in case of a crash or networking issues.
	// 1 is the safest value, but it may have a negative impact on performance when consuming a lot of messages.
	//
	// Default value is 100.
	SubscribeBatchSize int
}
DefaultSQLiteSchema is a default implementation of SchemaAdapter based on SQLite.
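The SubscribeBatchSize comment states a default of 100. A minimal sketch of how such a fallback could be resolved is shown here; the helper effectiveBatchSize and the constant name are hypothetical, and treating any non-positive value as "unset" is an assumption rather than documented behavior.

```go
package main

import "fmt"

// defaultSubscribeBatchSize mirrors the documented default of 100.
const defaultSubscribeBatchSize = 100

// effectiveBatchSize returns the batch size a subscriber would use.
// Assumption: a zero (or negative) SubscribeBatchSize means "not set"
// and falls back to the default.
func effectiveBatchSize(configured int) int {
	if configured <= 0 {
		return defaultSubscribeBatchSize
	}
	return configured
}

func main() {
	fmt.Println(effectiveBatchSize(0))   // unset: falls back to 100
	fmt.Println(effectiveBatchSize(1))   // safest setting per the field comment
	fmt.Println(effectiveBatchSize(500)) // larger batches, more possible re-delivery
}
```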
func (DefaultSQLiteSchema) InsertQuery ¶
func (s DefaultSQLiteSchema) InsertQuery(topic string, msgs message.Messages) (sql.Query, error)
func (DefaultSQLiteSchema) MessagesTable ¶
func (s DefaultSQLiteSchema) MessagesTable(topic string) string
func (DefaultSQLiteSchema) SchemaInitializingQueries ¶
func (s DefaultSQLiteSchema) SchemaInitializingQueries(topic string) []sql.Query
func (DefaultSQLiteSchema) SelectQuery ¶
func (s DefaultSQLiteSchema) SelectQuery(topic string, consumerGroup string, offsetsAdapter sql.OffsetsAdapter) sql.Query
func (DefaultSQLiteSchema) SubscribeIsolationLevel ¶
func (s DefaultSQLiteSchema) SubscribeIsolationLevel() stdSql.IsolationLevel
func (DefaultSQLiteSchema) UnmarshalMessage ¶
func (s DefaultSQLiteSchema) UnmarshalMessage(row sql.Scanner) (sql.Row, error)