Documentation
¶
Index ¶
- Constants
- func NewConsumerGroup(handler func(*sarama.ConsumerMessage)) sarama.ConsumerGroupHandler
- type AsyncProducer
- type AsyncProducerClient
- func (p *AsyncProducerClient) CloseProducer()
- func (p *AsyncProducerClient) IsRunning() bool
- func (p *AsyncProducerClient) Produce(topic string, value []byte, keys ...string) error
- func (p *AsyncProducerClient) ProducerErrors() <-chan *sarama.ProducerError
- func (p *AsyncProducerClient) RunAsyncProducer()
- type Cli
- type CliCfg
- type Config
- type Consumer
- type ConsumerClient
- type ConsumerGroup
Constants ¶
View Source
const LogDebug = "debug"
Variables ¶
This section is empty.
Functions ¶
func NewConsumerGroup ¶
func NewConsumerGroup(handler func(*sarama.ConsumerMessage)) sarama.ConsumerGroupHandler
Types ¶
type AsyncProducer ¶
type AsyncProducerClient ¶
type AsyncProducerClient struct {
// contains filtered or unexported fields
}
func (*AsyncProducerClient) CloseProducer ¶
func (p *AsyncProducerClient) CloseProducer()
func (*AsyncProducerClient) IsRunning ¶
func (p *AsyncProducerClient) IsRunning() bool
func (*AsyncProducerClient) Produce ¶
func (p *AsyncProducerClient) Produce(topic string, value []byte, keys ...string) error
Produce 发送消息到队列。仅当需要保证消息顺序时，才使用参数 keys，并且只允许传一个 key。
func (*AsyncProducerClient) ProducerErrors ¶
func (p *AsyncProducerClient) ProducerErrors() <-chan *sarama.ProducerError
func (*AsyncProducerClient) RunAsyncProducer ¶
func (p *AsyncProducerClient) RunAsyncProducer()
type Cli ¶
type Cli interface {
	Address() string // Address 返回kafka地址
	NewConsumer() (*ConsumerClient, error) // NewConsumer 新建消费者,kafka连接失败将返回error
	NewAsyncProducerClient() (AsyncProducer, error)
	NewSyncProducerClient() (sarama.SyncProducer, error)
}
type CliCfg ¶
type CliCfg struct {
// contains filtered or unexported fields
}
func (*CliCfg) NewAsyncProducerClient ¶
func (k *CliCfg) NewAsyncProducerClient() (AsyncProducer, error)
func (*CliCfg) NewConsumer ¶
func (k *CliCfg) NewConsumer() (*ConsumerClient, error)
func (*CliCfg) NewSyncProducerClient ¶
func (k *CliCfg) NewSyncProducerClient() (sarama.SyncProducer, error)
type Config ¶
type Config struct {
	Addr         string `yaml:"addr" env:"KafkaAddr" env-description:"address of kafka cluster"`
	QueueLength  int    `yaml:"queue_length"`
	KafkaVersion string `yaml:"kafka_version" env:"KafkaVersion" env-description:"version of kafka cluster"`
	EnableLog    bool   `yaml:"enable_log" env:"KafkaEnableLog" env-description:"enable kafka log or not"`
	LogLevel     string `yaml:"log_level" env:"KafkaLogLevel" env-description:"record logs in which level, only support debug/info"`
}
type ConsumerClient ¶
type ConsumerClient struct {
// contains filtered or unexported fields
}
func (*ConsumerClient) Close ¶
func (cc *ConsumerClient) Close()
func (*ConsumerClient) IsRunning ¶
func (cc *ConsumerClient) IsRunning() bool
func (*ConsumerClient) RunConsumer ¶
func (cc *ConsumerClient) RunConsumer(group string, topic []string, groupHandler sarama.ConsumerGroupHandler) error
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func (ConsumerGroup) Cleanup ¶
func (ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
func (ConsumerGroup) ConsumeClaim ¶
func (h ConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (ConsumerGroup) Setup ¶
func (ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Click to show internal directories.
Click to hide internal directories.