Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConf ¶
type KafkaConf struct { ConsumerGroupName string `json:"name"` ZkPath string `json:"zk_path"` Topic string `json:"topic"` ForceRestart bool `json:"force_restart"` ReadNewest bool `json:"read_newest"` KafkaVersion int `json:"kafka_version_major"` SASLEnabled bool `json:"sasl_enabled"` SASLUsername string `json:"username"` SASLPasswordKey string `json:"passwordKey"` }
KafkaConf holds configuration options for KafkaSource
type KafkaMsg ¶
type KafkaMsg interface { MarkDone() GetRawMsg() *sarama.ConsumerMessage IsProcessed() bool }
type KafkaMsgFactory ¶
type KafkaMsgFactory interface { //Create call to wrap consumer message inside KafkaMsg Create(msg *sarama.ConsumerMessage) KafkaMsg }
type KafkaOffsetTracker ¶
type KafkaOffsetTracker struct {
// contains filtered or unexported fields
}
KafkaOffsetTracker is an implementation of OffsetTracker that tracks offsets for KafkaSource, KafkaMessage
func (*KafkaOffsetTracker) TrackMe ¶
func (k *KafkaOffsetTracker) TrackMe(kmsg KafkaMsg)
TrackMe method ensures messages to track are enqueued for tracking
type KafkaSource ¶
type KafkaSource struct {
// contains filtered or unexported fields
}
KafkaSource is a Source implementation which reads from Kafka. This implementation uses the sarama library and the wvanbergen implementation of an HA Kafka consumer using ZooKeeper.
func GetKafkaSource ¶
func GetKafkaSource(conf KafkaConf, factory KafkaMsgFactory) *KafkaSource
GetKafkaSource method is used to get an instance of KafkaSource.
func (*KafkaSource) CommitOffsets ¶
func (k *KafkaSource) CommitOffsets(data KafkaMsg) error
CommitOffsets enables the client to explicitly commit the offset that has been processed.
func (*KafkaSource) Generate ¶
func (k *KafkaSource) Generate(out chan<- interface{})
Generate is the Source method implementation, which connects to Kafka and pushes KafkaMessage into the channel
func (*KafkaSource) RegisterHook ¶
func (k *KafkaSource) RegisterHook(hook KafkaSourceHook)
RegisterHook is used to register a hook with KafkaSource
func (*KafkaSource) Stop ¶
func (k *KafkaSource) Stop()
Stop implements the Source interface's stop method; it stops the KafkaConsumer
type KafkaSourceHook ¶
type KafkaSourceHook interface { //Pre called before passing the message to DMux Pre(k KafkaMsg) }
KafkaSourceHook to track messages coming out of the source in order
type OffsetTracker ¶
type OffsetTracker interface {
TrackMe(kmsg KafkaMsg)
}
OffsetTracker is an interface which defines methods to track messages which have been queued for processing
func GetKafkaOffsetTracker ¶
func GetKafkaOffsetTracker(size int, source *KafkaSource) OffsetTracker
GetKafkaOffsetTracker is a global function to get an instance of KafkaOffsetTracker