Documentation ¶
Overview ¶
Package kafka https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
Index ¶
- Variables
- type Publisher
- type PublisherConfig
- type Subscriber
- func (s *Subscriber) Cleanup(sarama.ConsumerGroupSession) error
- func (s *Subscriber) Connect() error
- func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Subscriber) Disconnect() error
- func (s *Subscriber) Setup(sarama.ConsumerGroupSession) error
- func (s *Subscriber) Sub(topic string, h bus.EventHandler) error
- type SubscriberConfig
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // SHA256 todo SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } // SHA512 todo SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } )
View Source
var ( // ErrNoBroker todo ErrNoBroker = errors.New("connect kafka error, no broker here") )
Functions ¶
This section is empty.
Types ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher kafka broker
func NewPublisher ¶
func NewPublisher(conf *PublisherConfig) (*Publisher, error)
NewPublisher kafka broker
type PublisherConfig ¶
type PublisherConfig struct { BulkMaxSize int `json:"bulk_max_size" yaml:"bulk_max_size" toml:"bulk_max_size" env:"BUS_KAFKA_PUBLISHER_BULK_MAX_SIZE"` PublishTimeout time.Duration `json:"publish_timeout" yaml:"publish_timeout" toml:"publish_timeout" env:"BUS_KAFKA_PUBLISHER_TIMEOUT"` BulkFlushFrequency time.Duration `` /* 130-byte string literal not displayed */ Partitioner string `json:"partitioner" yaml:"partitioner" toml:"partitioner" env:"BUS_KAFKA_PUBLISHER_PARTITIONER"` Compression string `json:"compression" yaml:"compression" toml:"compression" env:"BUS_KAFKA_PUBLISHER_COMPRESSION"` CompressionLevel int `json:"compression_level" yaml:"compression_level" toml:"compression_level" env:"BUS_KAFKA_PUBLISHER_COMPRESSION_LEVEL"` MaxRetries int `json:"max_retries" yaml:"max_retries" toml:"max_retries" env:"BUS_KAFKA_PUBLISHER_MAX_RETRIES"` MaxMessageBytes *int `json:"max_message_bytes" yaml:"max_message_bytes" toml:"max_message_bytes" env:"BUS_KAFKA_PUBLISHER_MAX_MESSAGE_BYTES"` RequiredACKs *int `json:"required_acks" yaml:"required_acks" toml:"required_acks" env:"BUS_KAFKA_PUBLISHER_REQUIRED_ACKS"` // contains filtered or unexported fields }
PublisherConfig todo
func DefaultPublisherConfig ¶
func DefaultPublisherConfig() *PublisherConfig
DefaultPublisherConfig 默认配置
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber kafka broker
func NewSubscriber ¶
func NewSubscriber(conf *SubscriberConfig) (*Subscriber, error)
NewSubscriber kafka broker
func (*Subscriber) Cleanup ¶
func (s *Subscriber) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Subscriber) ConsumeClaim ¶
func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*Subscriber) Setup ¶
func (s *Subscriber) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type SubscriberConfig ¶
type SubscriberConfig struct { GroupID string `json:"group_id" yaml:"group_id" toml:"group_id" env:"BUS_KAFKA_SUBSCRIBER_GROUP_ID"` Offset string `json:"offset" yaml:"offset" toml:"offset" env:"BUS_KAFKA_SUBSCRIBER_OFFSET"` BalanceStrategy string `json:"balance_strategy" yaml:"balance_strategy" toml:"balance_strategy" env:"BUS_KAFKA_SUBSCRIBER_BALANCE_STRATEGY"` // contains filtered or unexported fields }
SubscriberConfig todo
func DefaultSubscriberConfig ¶
func DefaultSubscriberConfig() *SubscriberConfig
DefaultSubscriberConfig 默认配置
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
XDGSCRAMClient todo
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin todo
Click to show internal directories.
Click to hide internal directories.