Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Offset ¶
type Offset struct {
// contains filtered or unexported fields
}
Offset consumes given topic-partition starting at an offset provided.
func NewOffset ¶
func NewOffset(cfg *OffsetConfig) (*Offset, error)
NewOffset creates a new Offset that immediately starts consuming and whose messages are available on the Messages() channel.
func (*Offset) CommitOffset ¶
CommitOffset writes the provided offset to kafka.
func (*Offset) Consume ¶
func (oc *Offset) Consume() <-chan *sarama.ConsumerMessage
Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.
func (*Offset) Err ¶
Err should be called after the Messages() channel closes to determine if there was an error during processing.
func (*Offset) HighWaterMarkOffset ¶
HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.
type OffsetConfig ¶
type OffsetConfig struct { CacheDuration time.Duration Client sarama.Client Context context.Context Coordinator *cg.Coordinator Offset int64 Partition int32 Topic string }
OffsetConfig is all the instantiated dependencies needed to run an Offset.
type Seek ¶
type Seek struct {
// contains filtered or unexported fields
}
Seek consumes given topic-partition starting at an offset determined by the provided Seek function.
func NewSeek ¶
func NewSeek(cfg *SeekConfig) (*Seek, error)
NewSeek creates a new Seek that immediately starts consuming and whose messages are available on the Messages() channel.
func (*Seek) CommitOffset ¶
CommitOffset writes the provided offset to kafka.
func (*Seek) Consume ¶
func (sk *Seek) Consume() <-chan *sarama.ConsumerMessage
Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.
func (*Seek) Err ¶
Err should be called after the Messages() channel closes to determine if there was an error during processing.
func (*Seek) HighWaterMarkOffset ¶
HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.
type SeekConfig ¶
type SeekConfig struct { CacheDuration time.Duration Client sarama.Client Context context.Context Coordinator *cg.Coordinator Partition int32 SeekFn SeekFn Topic string }
SeekConfig is needed to create a new Seek.
type SeekFn ¶
SeekFn returns an offset for the Seek to begin reading from. Seek is provided a topic, partition. This can be used for an application that needs a history of messages for context before the application can begin reading at the last committed offset.
type StartPosition ¶
type StartPosition int
StartPosition is where the TimeWindow should start at to seek back the Window duration.
const ( // OffsetGroup starts at the committed offset, if no offset is commited or the offset is out of range // of what is readable, the consumer will error upon reading. OffsetGroup StartPosition = iota // OffsetNewest starts at the newest message in Kafka and then seeks back Window amount. OffsetNewest )
type TimeWindow ¶
type TimeWindow struct {
// contains filtered or unexported fields
}
TimeWindow is a consumer that finds the current offset in the group for the given partition-topic, discovers what time that message happened, and then rewinds to past offsets until the provided Window of time is acheived.
func NewTimeWindow ¶
func NewTimeWindow(cfg *TimeWindowConfig) (*TimeWindow, error)
NewTimeWindow creates a new consumer that is ready to begin reading.
func (*TimeWindow) CommitOffset ¶
func (twc *TimeWindow) CommitOffset(offset int64) error
CommitOffset writes the provided offset to kafka.
func (*TimeWindow) Consume ¶
func (twc *TimeWindow) Consume() <-chan *sarama.ConsumerMessage
Consume returns a channel of Kafka messages on this topic-partition starting at the provided offset. This channel will close when there is a non-recoverable error, or the context provided at creation time closes.
func (*TimeWindow) Err ¶
func (twc *TimeWindow) Err() error
Err should be called after the Messages() channel closes to determine if there was an error during processing.
func (*TimeWindow) HighWaterMarkOffset ¶
func (twc *TimeWindow) HighWaterMarkOffset() int64
HighWaterMarkOffset returns the last reported highwatermark offset for the partition this consumer is reading.