Versions in this module

Expand all Collapse all

v1

v1.0.37 Aug 12, 2023

Changes in this version

+ func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)
+ type BaseClient struct
+ type ConsumeOption struct
    + AutoCommit bool
    + InitialOffset int64
    + ReturnErrors bool
    + func (q ConsumeOption) GetType() types.MqOptionType
+ type ConsumerClient struct
    + Option *ConsumeOption
    + Parameter *ConsumerParameter
+ type ConsumerConfig struct
    + GroupId string
    + Name string
    + Partition int
    + Password string
    + Topic string
    + User string
+ type ConsumerGroupHandler struct
    + Logger types.LogProvider
    + Process types.MqMsgProcessFunc
    + func (cgh ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
    + func (cgh ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
    + func (cgh ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
+ type ConsumerParameter struct
    + GroupId string
    + Name string
    + Password string
    + Topic string
    + User string
+ type Kafka struct
    + Config *MqConfig
    + Logger types.LogProvider
    + func (k *Kafka) CreateConsumer(processFunc types.MqMsgProcessFunc, parameters map[string]interface{}, ...) (types.MqConsumer, error)
    + func (k *Kafka) CreateProducer(parameters map[string]interface{}, args ...types.MqOptioner) (types.MqProducer, error)
    + func (k *Kafka) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
+ type KafkaConsumer struct
    + Client *ConsumerClient
    + Logger types.LogProvider
    + func (kc *KafkaConsumer) Close()
    + func (kc *KafkaConsumer) Consume()
+ type KafkaProvider struct
    + func (kp *KafkaProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error
+ type MqConfig struct
    + Brokers []string
    + Consumers []*ConsumerConfig
    + Name string
    + Producers []*ProducerConfig
+ type MqProviderConfig struct
    + Default *MqConfig
    + Items []*MqConfig
+ type Producer struct
    + Client *ProducerClient
    + Logger types.LogProvider
    + Option *PublishOption
    + func (p *Producer) Close()
    + func (p *Producer) GetLastConfirmedId() uint64
    + func (p Producer) Publish(data []byte, args ...interface{}) error
    + func (p Producer) PublishDelay(data []byte, ttl int64, args ...interface{}) error
+ type ProducerClient struct
    + Option *PublishOption
    + Parameter *ProducerParameter
+ type ProducerConfig struct
    + Balance string
    + Compression string
    + Name string
    + Topics []string
+ type ProducerParameter struct
    + Topics []string
+ type PublishOption struct
    + RequiredAcks sarama.RequiredAcks
    + RetryMax int
    + ReturnError bool
    + ReturnSuccess bool
    + func GetPublishOption(options map[types.MqOptionType]types.MqOptioner) *PublishOption
    + func (q PublishOption) GetType() types.MqOptionType