Documentation ¶
Overview ¶
Package kafkago
Package kafkago @Title message queue capability of Kafka @Description Kafka implementation of the message queue capability @Author Ryan Fan 2021-06-09 @Update Ryan Fan 2021-06-09
Index ¶
- Variables
- func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)
- type BaseClient
- type ConsumeOption
- type ConsumerClient
- type ConsumerConfig
- type Kafka
- func (k *Kafka) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, ...) (types.MqConsumer, error)
- func (k *Kafka) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
- func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
- func (k *Kafka) NewConsumerClient(name string, options map[types.MqOptionType]types.MqOptioner) (*ConsumerClient, error)
- type KafkaConsumer
- type KafkaProvider
- type MqConfig
- type MqProviderConfig
- type Producer
- type ProducerClient
- type ProducerConfig
- type PublishOption
Constants ¶
This section is empty.
Variables ¶
View Source
var (
ErrInvalidBalancer = errors.New("invalid balancer")
)
Functions ¶
Types ¶
type BaseClient ¶
type BaseClient struct { Logger types.LogProvider Name string // 客户端名字 Options map[types.MqOptionType]types.MqOptioner // contains filtered or unexported fields }
消息队列客户端维护connection和channel
type ConsumeOption ¶
type ConsumeOption struct { MinBytes int MaxBytes int CommitInterval int // flushes commits to Kafka every second }
func (ConsumeOption) GetType ¶
func (q ConsumeOption) GetType() types.MqOptionType
type ConsumerClient ¶
type ConsumerClient struct { *BaseClient Option *ConsumeOption Config *ConsumerConfig Reader *kafka.Reader }
type ConsumerConfig ¶
type ConsumerConfig struct { Name string `mapstructure:"name"` Topic string `mapstructure:"topic"` Partition int `mapstructure:"partition"` GroupId string `mapstructure:"group_id"` User string `mapstructure:"user"` Password string `mapstructure:"password"` }
ConsumerConfig 客户端配置
type Kafka ¶
type Kafka struct { Logger types.LogProvider Config *MqConfig }
func (*Kafka) CreateConsumer ¶
func (k *Kafka) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, args ...map[types.MqOptionType]types.MqOptioner) (types.MqConsumer, error)
func (*Kafka) CreateProducer ¶
func (k *Kafka) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
CreateProducer 创造一个生产者
func (*Kafka) GetDefaultOptions ¶
func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
func (*Kafka) NewConsumerClient ¶
func (k *Kafka) NewConsumerClient(name string, options map[types.MqOptionType]types.MqOptioner) (*ConsumerClient, error)
type KafkaConsumer ¶
type KafkaConsumer struct { Logger types.LogProvider Client *ConsumerClient Process types.MqMsgProcessFunc Buffers []kafka.Message }
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close()
type KafkaProvider ¶
type KafkaProvider struct {
mq.BaseMqProvider
}
func (*KafkaProvider) Init ¶
func (kp *KafkaProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error
Init implements the types.Provider interface and is used to initialize the capability. @author Ryan Fan (2021-06-09) @param rootConfiger types.Configer root config interface used to extract config info @return error
type MqConfig ¶
type MqConfig struct { Name string `mapstructure:"name"` Brokers []string `mapstructure:"brokers"` Consumers []*ConsumerConfig `mapstructure:"consumers"` Producers []*ProducerConfig `mapstructure:"producers"` }
MqConfig amqp://user:pass@host:10000/vhost
type MqProviderConfig ¶
type Producer ¶
type Producer struct { Logger types.LogProvider Option *PublishOption Client *ProducerClient }
func (*Producer) GetLastConfirmedId ¶
type ProducerClient ¶
type ProducerClient struct { *BaseClient Config *ProducerConfig Option *PublishOption Writer *kafka.Writer }
type ProducerConfig ¶
type ProducerConfig struct { Name string `mapstructure:"name"` Topics []string `mapstructure:"topics"` Balance string `mapstructure:"balance"` Compression string `mapstructure:"compression"` }
ProducerConfig 发送端配置
type PublishOption ¶
mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态
func (PublishOption) GetType ¶
func (q PublishOption) GetType() types.MqOptionType
Click to show internal directories.
Click to hide internal directories.