Documentation ¶
Index ¶
- type MemoryDatabase
- type Naffka
- func (n *Naffka) Close() error
- func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (n *Naffka) HighWaterMarks() map[string]map[int32]int64
- func (n *Naffka) Partitions(_ string) ([]int32, error)
- func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
- func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error
- func (n *Naffka) Topics() ([]string, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MemoryDatabase ¶
type MemoryDatabase struct {
// contains filtered or unexported fields
}
A MemoryDatabase stores the message history as arrays in memory, which makes it suitable for unit tests. If the process stops, any messages that have not yet been processed by a consumer are lost permanently and all offsets become invalid.
func (*MemoryDatabase) FetchMessages ¶
func (m *MemoryDatabase) FetchMessages(topic string, startOffset, endOffset int64) ([]types.Message, error)
FetchMessages implements Database
func (*MemoryDatabase) MaxOffsets ¶
func (m *MemoryDatabase) MaxOffsets() (map[string]int64, error)
MaxOffsets implements Database
func (*MemoryDatabase) StoreMessages ¶
func (m *MemoryDatabase) StoreMessages(topic string, messages []types.Message) error
StoreMessages implements Database
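To make the three Database methods above more concrete, here is a minimal, standard-library-only sketch of an in-memory message store. It is not the library's MemoryDatabase: the Message struct, the memoryStore type, the half-open range `[startOffset, endOffset)` used by FetchMessages, and the choice to report the highest stored offset from MaxOffsets are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for types.Message, whose fields
// are not shown in this documentation.
type Message struct {
	Offset int64
	Value  []byte
}

// memoryStore is an illustrative store that keeps one slice per topic.
// Everything is lost when the process exits.
type memoryStore struct {
	mu     sync.Mutex
	topics map[string][]Message
}

func newMemoryStore() *memoryStore {
	return &memoryStore{topics: make(map[string][]Message)}
}

// StoreMessages appends messages to a topic.
// Assumption: offsets are contiguous and start at 0 for each topic.
func (s *memoryStore) StoreMessages(topic string, messages []Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	next := int64(len(s.topics[topic]))
	for i, m := range messages {
		if want := next + int64(i); m.Offset != want {
			return fmt.Errorf("topic %q: expected offset %d, got %d", topic, want, m.Offset)
		}
	}
	s.topics[topic] = append(s.topics[topic], messages...)
	return nil
}

// FetchMessages returns a copy of the messages in [startOffset, endOffset).
// Assumption: the range is half-open; the real semantics may differ.
func (s *memoryStore) FetchMessages(topic string, startOffset, endOffset int64) ([]Message, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	stored := s.topics[topic]
	if startOffset < 0 || startOffset > endOffset || endOffset > int64(len(stored)) {
		return nil, fmt.Errorf("topic %q: range [%d, %d) is outside [0, %d)",
			topic, startOffset, endOffset, len(stored))
	}
	out := make([]Message, endOffset-startOffset)
	copy(out, stored[startOffset:endOffset])
	return out, nil
}

// MaxOffsets maps each non-empty topic to its highest stored offset.
func (s *memoryStore) MaxOffsets() (map[string]int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]int64, len(s.topics))
	for topic, msgs := range s.topics {
		if len(msgs) > 0 {
			out[topic] = int64(len(msgs)) - 1
		}
	}
	return out, nil
}

func main() {
	store := newMemoryStore()
	err := store.StoreMessages("events", []Message{
		{Offset: 0, Value: []byte("a")},
		{Offset: 1, Value: []byte("b")},
		{Offset: 2, Value: []byte("c")},
	})
	if err != nil {
		panic(err)
	}
	msgs, err := store.FetchMessages("events", 1, 3)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Printf("%d=%s\n", m.Offset, m.Value)
	}
	offsets, err := store.MaxOffsets()
	if err != nil {
		panic(err)
	}
	fmt.Println("max offset for events:", offsets["events"])
}
```

Because the slices live only in the process heap, restarting the program starts every topic again from offset 0, which is why previously handed-out offsets cannot be trusted after a restart.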
type Naffka ¶
type Naffka struct {
// contains filtered or unexported fields
}
Naffka is an implementation of the sarama Kafka API designed to run within a single Go process. It implements both the sarama.SyncProducer and the sarama.Consumer interfaces, so it can act as a drop-in replacement for Kafka in tests or single-instance deployments. It does not support multiple partitions.
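The drop-in idea can be sketched without any external dependency: application code depends on a small producer interface, and an in-process log satisfies it. In the sketch below, the producer interface, inProcessLog, and publishGreeting are hypothetical names invented for illustration; real code would depend on sarama.SyncProducer and pass a *Naffka instead. The single partition is modelled by always returning partition 0.

```go
package main

import (
	"fmt"
	"sync"
)

// producer is a hypothetical, cut-down stand-in for sarama.SyncProducer.
type producer interface {
	Send(topic string, value []byte) (partition int32, offset int64, err error)
}

// inProcessLog is an illustrative log that lives entirely inside one
// process and keeps exactly one partition per topic.
type inProcessLog struct {
	mu     sync.Mutex
	topics map[string][][]byte
}

func newInProcessLog() *inProcessLog {
	return &inProcessLog{topics: make(map[string][][]byte)}
}

// Send appends the value and reports the offset it was stored at.
// The partition is always 0 because there is only one partition.
func (l *inProcessLog) Send(topic string, value []byte) (int32, int64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.topics[topic] = append(l.topics[topic], value)
	return 0, int64(len(l.topics[topic]) - 1), nil
}

// publishGreeting is application code. It only knows the interface, so
// the backing implementation can be swapped without touching it.
func publishGreeting(p producer, name string) (int64, error) {
	_, offset, err := p.Send("greetings", []byte("hello "+name))
	return offset, err
}

func main() {
	backend := newInProcessLog()
	for _, name := range []string{"alice", "bob"} {
		offset, err := publishGreeting(backend, name)
		if err != nil {
			panic(err)
		}
		fmt.Printf("stored greeting for %s at offset %d\n", name, offset)
	}
}
```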
func (*Naffka) ConsumePartition ¶
func (n *Naffka) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition implements sarama.Consumer. Note: offset is *inclusive*, i.e. the returned consumer starts with the message at that offset.
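The inclusive offset rule can be illustrated with a small standalone sketch. The record type and consumeFrom function are hypothetical and only model the rule itself; they do not use sarama or Naffka, and sarama's special offsets (sarama.OffsetNewest and sarama.OffsetOldest) are not modelled.

```go
package main

import "fmt"

// record is a hypothetical message with an explicit offset.
type record struct {
	Offset int64
	Value  string
}

// consumeFrom returns a channel that yields every record whose offset is
// greater than or equal to start, so the record at start itself is included.
func consumeFrom(log []record, start int64) <-chan record {
	out := make(chan record, len(log))
	for _, r := range log {
		if r.Offset >= start {
			out <- r
		}
	}
	close(out)
	return out
}

func main() {
	log := []record{{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}}
	for r := range consumeFrom(log, 2) {
		fmt.Println(r.Offset, r.Value)
	}
	// Prints:
	// 2 c
	// 3 d
}
```

A caller that has already processed the message at offset n would therefore resume from n+1 under this rule to avoid seeing that message twice.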
func (*Naffka) HighWaterMarks ¶
func (n *Naffka) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks implements sarama.Consumer
func (*Naffka) Partitions ¶
func (n *Naffka) Partitions(_ string) ([]int32, error)
Partitions implements sarama.Consumer
func (*Naffka) SendMessage ¶
func (n *Naffka) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
SendMessage implements sarama.SyncProducer
func (*Naffka) SendMessages ¶
func (n *Naffka) SendMessages(msgs []*sarama.ProducerMessage) error
SendMessages implements sarama.SyncProducer