Documentation ¶
Index ¶
- func NewOffsetConsumer(messageHandler MessageHandler, client sarama.Client, topic string, db *bolt.DB, ...) consumer.Consumer
- func NewOffsetMessageHandler(db *bolt.DB, offsetBucketName []byte, messageHandler MessageHandler) consumer.MessageHandler
- type MessageHandler
- type MessageHandlerFunc
- type Offset
- type OffsetRegistry
- type Partition
- type Storage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewOffsetConsumer ¶
func NewOffsetConsumer(
	messageHandler MessageHandler,
	client sarama.Client,
	topic string,
	db *bolt.DB,
	offsetBucketName []byte,
) consumer.Consumer
NewOffsetConsumer returns a consumer with offset tracking.
func NewOffsetMessageHandler ¶
func NewOffsetMessageHandler(
	db *bolt.DB,
	offsetBucketName []byte,
	messageHandler MessageHandler,
) consumer.MessageHandler
Types ¶
type MessageHandler ¶
type MessageHandler interface {
	// ConsumeMessage handles a message with an open Bolt transaction.
	ConsumeMessage(ctx context.Context, tx *bolt.Tx, msg *sarama.ConsumerMessage) error
}
MessageHandler handles Kafka messages with an open database connection.
type MessageHandlerFunc ¶
MessageHandlerFunc allows a function to be used as a MessageHandler.
func (MessageHandlerFunc) ConsumeMessage ¶
func (m MessageHandlerFunc) ConsumeMessage(ctx context.Context, tx *bolt.Tx, msg *sarama.ConsumerMessage) error
ConsumeMessage forwards the call to the function.
type Offset ¶
type Offset int64
Offset in the Kafka topic.
func OffsetFromBytes ¶
OffsetFromBytes returns the offset for the given bytes.
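Bolt stores keys and values as byte slices, so an offset has to be serialized before it can be saved. The page does not state the encoding or the signature of OffsetFromBytes, so the following is only a sketch that assumes a fixed 8-byte big-endian layout. The Bytes method and the decodeOffset function are hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Offset in the Kafka topic, as documented above.
type Offset int64

// Bytes encodes the offset as 8 big-endian bytes.
// The layout is an assumption, not taken from the package.
func (o Offset) Bytes() []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(o))
	return buf
}

// decodeOffset is a hypothetical counterpart to OffsetFromBytes.
// It rejects input that is not exactly 8 bytes long.
func decodeOffset(b []byte) (Offset, error) {
	if len(b) != 8 {
		return 0, errors.New("offset must be exactly 8 bytes")
	}
	return Offset(int64(binary.BigEndian.Uint64(b))), nil
}

func main() {
	encoded := Offset(42).Bytes()
	decoded, err := decodeOffset(encoded)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(encoded, decoded)
}
```

One reason to prefer big-endian in a sketch like this: for non-negative values, the byte-wise ordering of the encoded form matches numeric ordering, which fits a store that sorts keys by bytes.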
type OffsetRegistry ¶
type OffsetRegistry interface {
	Get(partition int32) (int64, error)
	Set(partition int32, offset int64) error
}
OffsetRegistry saves the current offset to Bolt and loads it back.
func NewOffsetRegistry ¶
func NewOffsetRegistry(
	tx *bolt.Tx,
	bucketName []byte,
) OffsetRegistry
type Partition ¶
type Partition int32
Partition in Kafka.
func PartitionFromBytes ¶
PartitionFromBytes returns the partition for the given bytes.
type Storage ¶
type Storage interface {
	// Get returns the value for the given key.
	Get(ctx context.Context, key []byte) ([]byte, error)
	// Set stores a value for the given key.
	Set(ctx context.Context, key []byte, value []byte) error
	// Read fills the Bolt db until the context is canceled.
	Read(ctx context.Context) error
}
Storage saves all writes to a Kafka topic and caches reads in a local Bolt database.
func NewStorage ¶