Documentation ¶
Index ¶
- func NewSyncProducer(addrs []string, opts ...ConfigOption) (sarama.SyncProducer, error)
- func NewSyncProducerFromConfig(config config.Kafka, opts ...ConfigOption) (sarama.SyncProducer, error)
- type ConfigOption
- func SetConsumerFetchMax(n int32) ConfigOption
- func SetConsumerGroupBalanceStrategy(strategy ...sarama.BalanceStrategy) ConfigOption
- func SetConsumerOffsetInitial(offset int64) ConfigOption
- func SetNetSASL(user, password string) ConfigOption
- func SetProducerPartitioner(partitioner sarama.PartitionerConstructor) ConfigOption
- func SetVersion(version sarama.KafkaVersion) ConfigOption
- type ConsumerGroup
- func (r *ConsumerGroup) Cleanup(sess sarama.ConsumerGroupSession) error
- func (r *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (r *ConsumerGroup) Run(ctx context.Context, handler func(message *sarama.ConsumerMessage) error) error
- func (r *ConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error
- type ConsumerGroupOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSyncProducer ¶
func NewSyncProducer(addrs []string, opts ...ConfigOption) (sarama.SyncProducer, error)
func NewSyncProducerFromConfig ¶
func NewSyncProducerFromConfig(config config.Kafka, opts ...ConfigOption) (sarama.SyncProducer, error)
Types ¶
type ConfigOption ¶
func SetConsumerFetchMax ¶
func SetConsumerFetchMax(n int32) ConfigOption
func SetConsumerGroupBalanceStrategy ¶
func SetConsumerGroupBalanceStrategy(strategy ...sarama.BalanceStrategy) ConfigOption
func SetConsumerOffsetInitial ¶
func SetConsumerOffsetInitial(offset int64) ConfigOption
func SetNetSASL ¶
func SetNetSASL(user, password string) ConfigOption
func SetProducerPartitioner ¶
func SetProducerPartitioner(partitioner sarama.PartitionerConstructor) ConfigOption
func SetVersion ¶
func SetVersion(version sarama.KafkaVersion) ConfigOption
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(brokers []string, group string, topics []string, opts ...ConsumerGroupOption) *ConsumerGroup
func NewConsumerGroupFromConfig ¶
func NewConsumerGroupFromConfig(config config.Kafka, consumerGroup config.KafkaConsumerGroup, opts ...ConfigOption) *ConsumerGroup
func (*ConsumerGroup) Cleanup ¶
func (r *ConsumerGroup) Cleanup(sess sarama.ConsumerGroupSession) error
func (*ConsumerGroup) ConsumeClaim ¶
func (r *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroup) Run ¶
func (r *ConsumerGroup) Run(ctx context.Context, handler func(message *sarama.ConsumerMessage) error) error
func (*ConsumerGroup) Setup ¶
func (r *ConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error
type ConsumerGroupOption ¶
type ConsumerGroupOption func(cg *ConsumerGroup)
func SetConfigOptions ¶
func SetConfigOptions(opts ...ConfigOption) ConsumerGroupOption
func SetHandlerCleanup ¶
func SetHandlerCleanup(h func(s sarama.ConsumerGroupSession) error) ConsumerGroupOption
func SetHandlerSetup ¶
func SetHandlerSetup(h func(s sarama.ConsumerGroupSession) error) ConsumerGroupOption
func SetLogger ¶
func SetLogger(logger log.Logger) ConsumerGroupOption
Click to show internal directories.
Click to hide internal directories.