Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConsumer ¶
type KafkaConsumer struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(ctx context.Context, conf *KafkaConsumerConfig) *KafkaConsumer
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close() error
func (*KafkaConsumer) Consume ¶
func (kc *KafkaConsumer) Consume(ctx context.Context, handler sarama.ConsumerGroupHandler) error
type KafkaConsumerConfig ¶
type KafkaConsumerConfig struct {
	Addrs []string `long:"addrs" env:"ADDRS" env-delim:"," description:"the addresses of Kafka servers" required:"true"`
	Topic string   `long:"topic" env:"TOPIC" description:"the topic for the Kafka consumer group to consume" required:"true"`
	Group string   `long:"group" env:"GROUP" description:"the ID of the Kafka consumer group" required:"true"`
}
type KafkaProducer ¶
type KafkaProducer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}
func NewKafkaProducer ¶
func NewKafkaProducer(ctx context.Context, conf *KafkaProducerConfig) *KafkaProducer
func (*KafkaProducer) Close ¶
func (kp *KafkaProducer) Close() error
func (*KafkaProducer) SendMessages ¶
func (kp *KafkaProducer) SendMessages(msgs []*ProducerMessage) error
type KafkaProducerConfig ¶
type KafkaProducerConfig struct {
	Addrs        []string `long:"addrs" env:"ADDRS" env-delim:"," description:"the addresses of Kafka servers" required:"true"`
	Topic        string   `long:"topic" env:"TOPIC" description:"the topic for the Kafka producer to send" required:"true"`
	RequiredAcks int16    `` /* 168-byte string literal not displayed */
}
type Producer ¶
type Producer interface {
SendMessages(msgs []*ProducerMessage) error
}
type ProducerMessage ¶
Click to show internal directories.
Click to hide internal directories.