Documentation ¶
Overview ¶
Package kafka
Package kafka @Title log capability of zerolog @Description zerolog implementation of log capability @Author Ryan Fan 2021-06-09 @Update Ryan Fan 2021-06-09
Index ¶
- func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)
- type BaseClient
- type ConsumeOption
- type ConsumerClient
- type ConsumerConfig
- type ConsumerGroupHandler
- type Kafka
- func (k *Kafka) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, ...) (types.MqConsumer, error)
- func (k *Kafka) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
- func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
- type KafkaConsumer
- type KafkaProvider
- type MqConfig
- type MqProviderConfig
- type Producer
- type ProducerClient
- type ProducerConfig
- type PublishOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BaseClient ¶
type BaseClient struct {
// contains filtered or unexported fields
}
BaseClient 消息队列客户端维护connection和channel
type ConsumeOption ¶
type ConsumeOption struct { InitialOffset int64 // 如果之前没有offset提交,选择offset的策略 // broker等待Consumer.Fetch.Min的最长时间, // 如果没取到足够Consumer.Fetch.Min, 等待MaxWaitTime后也会返回 // MaxWaitTime time.Duration // 是否在消费时有任何错误都会返回到Errors通道 ReturnErrors bool // 是否自动提交 AutoCommit bool }
func (ConsumeOption) GetType ¶
func (q ConsumeOption) GetType() types.MqOptionType
type ConsumerClient ¶
type ConsumerClient struct { *BaseClient Option *ConsumeOption Config *ConsumerConfig // contains filtered or unexported fields }
type ConsumerConfig ¶
type ConsumerConfig struct { Name string `mapstructure:"name"` Topic string `mapstructure:"topic"` Partition int `mapstructure:"partition"` GroupId string `mapstructure:"group_id"` User string `mapstructure:"user"` Password string `mapstructure:"password"` }
ConsumerConfig 客户端配置
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct { Logger types.LogProvider Process types.MqMsgProcessFunc // contains filtered or unexported fields }
func (ConsumerGroupHandler) Cleanup ¶
func (cgh ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (ConsumerGroupHandler) ConsumeClaim ¶
func (cgh ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim 在ConsumeClaim()中必须执行不断取消息的循环:ConsumerGroupClaim.Messages()
func (ConsumerGroupHandler) Setup ¶
func (cgh ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup 消费组在执行Consume时会初始化consumer,并依次调用Setup()->ConsumeClaim(), 关闭之前会调用Cleanup() 消息的具体消费发生在ConsumeClaim()中,你可以在Setup()中初始化一些东西
type Kafka ¶
type Kafka struct { Logger types.LogProvider Config *MqConfig }
func (*Kafka) CreateConsumer ¶
func (k *Kafka) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, args ...map[types.MqOptionType]types.MqOptioner) (types.MqConsumer, error)
func (*Kafka) CreateProducer ¶
func (k *Kafka) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
CreateProducer 创造一个生产者
func (*Kafka) GetDefaultOptions ¶
func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
type KafkaConsumer ¶
type KafkaConsumer struct { Logger types.LogProvider Client *ConsumerClient // contains filtered or unexported fields }
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close()
type KafkaProvider ¶
type KafkaProvider struct {
mq.BaseMqProvider
}
func (*KafkaProvider) Init ¶
func (kp *KafkaProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error
Init implements types.Provider interface, used to initialize the capability @author Ryan Fan (2021-06-09) @param baseconf.Configer root config interface to extract config info @return error
type MqConfig ¶
type MqConfig struct { Name string `mapstructure:"name"` Brokers []string `mapstructure:"brokers"` Consumers []*ConsumerConfig `mapstructure:"consumers"` Producers []*ProducerConfig `mapstructure:"producers"` }
MqConfig amqp://user:pass@host:10000/vhost
type MqProviderConfig ¶
type Producer ¶
type Producer struct { Logger types.LogProvider Option *PublishOption Client *ProducerClient }
func (*Producer) GetLastConfirmedId ¶
type ProducerClient ¶
type ProducerClient struct { *BaseClient Config *ProducerConfig Option *PublishOption // contains filtered or unexported fields }
type ProducerConfig ¶
type ProducerConfig struct { Name string `mapstructure:"name"` Topics []string `mapstructure:"topics"` Balance string `mapstructure:"balance"` Compression string `mapstructure:"compression"` }
ProducerConfig 发送端配置
type PublishOption ¶
type PublishOption struct { // NoResponse:0, doesn't send any response, the TCP ACK is all you get // WaitForLocal:1, waits for only the local commit to succeed before responding // WaitForAll:-1, waits for all in-sync replicas to commit before responding RequiredAcks sarama.RequiredAcks // The total number of times to retry sending a message (default 3) RetryMax int // If enabled, successfully delivered messages will be returned on the successes channel ReturnSuccess bool // If enabled, successfully delivered messages will be returned on the successes channel ReturnError bool }
func (PublishOption) GetType ¶
func (q PublishOption) GetType() types.MqOptionType