Documentation ¶
Index ¶
- Variables
- type KafkaAuthentication
- type KafkaConfiguration
- type KafkaIngestor
- func (k *KafkaIngestor) Cleanup(sarama.ConsumerGroupSession) error
- func (k *KafkaIngestor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (k *KafkaIngestor) GetEvents() error
- func (k *KafkaIngestor) Initialize() error
- func (k *KafkaIngestor) Setup(sarama.ConsumerGroupSession) error
- func (k *KafkaIngestor) Shutdown(failure bool) error
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // MaxNumberOfWorkers is the maximum number of concurrent goroutines for uploading data MaxNumberOfWorkers = runtime.NumCPU() // FlushIntervalInSec is the amount of time, in seconds, to wait before executing the Flush operation in case the buffer is not full FlushIntervalInSec = 10 )
Functions ¶
This section is empty.
Types ¶
type KafkaAuthentication ¶
type KafkaConfiguration ¶
type KafkaConfiguration struct { // Brokers are the Kafka bootstrap brokers to connect to, as a comma-separated list Brokers string `json:"brokers"` // Topic is the Kafka topic to be consumed Topic string `json:"topic"` // ConsumerGroup is the Kafka consumer group definition ConsumerGroup string `json:"consumerGroup"` // Version is the Kafka cluster version Version string `json:"version,omitempty"` // Assignor is the consumer group partition assignment strategy (range, roundrobin, sticky) Assignor string `json:"assignor,omitempty"` // InitialOffset is the initial offset from which the Kafka consumer starts consuming (e.g. oldest) InitialOffset string `json:"offset,omitempty"` Authentication *KafkaAuthentication `json:"authentication,omitempty"` }
type KafkaIngestor ¶
type KafkaIngestor struct { // below are used to build the Kafka client Consumer sarama.ConsumerGroup Topic string Brokers []string // contains filtered or unexported fields }
func NewIngestor ¶
func NewIngestor(kcfg *KafkaConfiguration, manager *manager.TableManager) (*KafkaIngestor, error)
func (*KafkaIngestor) Cleanup ¶
func (k *KafkaIngestor) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*KafkaIngestor) ConsumeClaim ¶
func (k *KafkaIngestor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*KafkaIngestor) GetEvents ¶
func (k *KafkaIngestor) GetEvents() error
GetEvents needs to be called from a goroutine
func (*KafkaIngestor) Initialize ¶
func (k *KafkaIngestor) Initialize() error
func (*KafkaIngestor) Setup ¶
func (k *KafkaIngestor) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*KafkaIngestor) Shutdown ¶
func (k *KafkaIngestor) Shutdown(failure bool) error
Shutdown closes the consumer object
Click to show internal directories.
Click to hide internal directories.