Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConfigKafka ¶
type ConfigKafka struct { Zookeepers string `required:"true" desc:"Zookeeper nodes for offset storage"` // CFMR_KAFKA_ZOOKEEPERS Topics []string `required:"true" desc:"Topics to read events from"` // CFMR_KAFKA_TOPICS ConsumerGroup string `required:"true" desc:"Name of the Kafka consumer group"` // CFMR_KAFKA_CONSUMERGROUP ProcessingTimeout time.Duration `default:"1m" desc:"Time to wait for all the offsets for a partition to be processed after stopping to consume from it"` // CFMR_KAFKA_PROCESSINGTIMEOUT OffsetNewest bool `default:"false" desc:"If true start from the newest message in Kafka in case the offset in zookeeper does not exist"` // CFMR_KAFKA_OFFSETNEWEST }
type KafkaConsumer ¶
type KafkaConsumer struct { // FIXME: these should be private CG *consumergroup.ConsumerGroup Offsets map[string]map[int32]int64 }
func NewKafkaConsumer ¶
func NewKafkaConsumer(i *ConfigKafka) (*KafkaConsumer, error)
Create Kafka consumer group
func (*KafkaConsumer) Process ¶
func (c *KafkaConsumer) Process(message *sarama.ConsumerMessage) (*transformer.Envelope, error)
func (*KafkaConsumer) Read ¶
func (c *KafkaConsumer) Read() (*transformer.Envelope, error)
Read message from Kafka
type Reader ¶
type Reader interface {
Read() (*transformer.Envelope, error)
}
Click to show internal directories.
Click to hide internal directories.