Documentation ¶
Index ¶
- Variables
- func NewSyncProducer(cfg *ProducerConfig) (sarama.SyncProducer, error)
- type AsyncProducer
- type ConfluentConsumer
- type ConfluentDo
- type Consumer
- func (s *Consumer) Cleanup(sarama.ConsumerGroupSession) error
- func (s *Consumer) Close() error
- func (s *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Consumer) Setup(sarama.ConsumerGroupSession) error
- func (s *Consumer) Start(do Do) error
- type ConsumerConfig
- type Do
- type Log
- type MessageOption
- type Producer
- type ProducerConfig
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	SHA256 scram.HashGeneratorFcn = sha256.New
	SHA512 scram.HashGeneratorFcn = sha512.New
)
Functions ¶
func NewSyncProducer ¶
func NewSyncProducer(cfg *ProducerConfig) (sarama.SyncProducer, error)
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
// contains filtered or unexported fields
}
func NewAsyncProducer ¶
func NewAsyncProducer(cfg *ProducerConfig) (*AsyncProducer, error)
func (*AsyncProducer) AsyncMessage ¶
func (p *AsyncProducer) AsyncMessage(topic string, val sarama.Encoder, opts ...MessageOption)
func (*AsyncProducer) Close ¶
func (p *AsyncProducer) Close()
type ConfluentConsumer ¶
type ConfluentConsumer struct {
// contains filtered or unexported fields
}
func NewConfluentConsumer ¶
func NewConfluentConsumer(cfg *ConsumerConfig) (*ConfluentConsumer, error)
func (*ConfluentConsumer) Close ¶
func (s *ConfluentConsumer) Close() error
func (*ConfluentConsumer) Start ¶
func (s *ConfluentConsumer) Start(do ConfluentDo)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(cfg *ConsumerConfig) (*Consumer, error)
func (*Consumer) ConsumeClaim ¶
func (s *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type ConsumerConfig ¶
type ConsumerConfig struct {
	Brokers           []string      `json:"brokers" yaml:"brokers"`
	Topics            []string      `json:"topics" yaml:"topics"`
	GroupId           string        `json:"group_id" yaml:"group_id"`
	Username          string        `json:"username" yaml:"username"`
	Password          string        `json:"password" yaml:"password"`
	OffsetOldest      bool          `json:"offset_oldest" yaml:"offset_oldest"`
	EnableSASL        bool          `json:"enable_sasl" yaml:"enable_sasl"`
	EnableTLS         bool          `json:"enable_tls" yaml:"enable_tls"`
	Version           string        `json:"version" yaml:"version"`
	AutoCommit        bool          `json:"auto_commit" yaml:"auto_commit"`
	MaxProcessingTime time.Duration `json:"max_processing_time" yaml:"max_processing_time"`
}
type Do ¶
type Do func(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage)
type MessageOption ¶
type MessageOption func(o *messageOption)
func WithMessageHeaders ¶
func WithMessageHeaders(in []sarama.RecordHeader) MessageOption
func WithMessageKey ¶
func WithMessageKey(in sarama.Encoder) MessageOption
func WithMessageMetadata ¶
func WithMessageMetadata(in interface{}) MessageOption
func WithMessageOffset ¶
func WithMessageOffset(in int64) MessageOption
func WithMessagePartition ¶
func WithMessagePartition(in int32) MessageOption
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(cfg *ProducerConfig) (*Producer, error)
func (*Producer) AsyncMessage ¶
func (p *Producer) AsyncMessage(topic string, value sarama.Encoder, opts ...MessageOption)
func (*Producer) Borrow ¶
func (p *Producer) Borrow() (producer sarama.AsyncProducer)
func (*Producer) Release ¶
func (p *Producer) Release(producer sarama.AsyncProducer)
type ProducerConfig ¶
type ProducerConfig struct {
	Brokers      []string `json:"brokers" yaml:"brokers"`
	Username     string   `json:"username" yaml:"username"`
	Password     string   `json:"password" yaml:"password"`
	EnableSASL   bool     `json:"enable_sasl" yaml:"enableSASL"`
	EnableTLS    bool     `json:"enable_tls" yaml:"enableTLS"`
	Version      string   `json:"version" yaml:"version"`
	Topic        string   `json:"topic" yaml:"topic"`
	Algorithm    string   `json:"algorithm"`
	RequiredAcks int      `json:"required_acks"`
}
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.