Versions in this module Expand all Collapse all v0 v0.1.22 Jul 22, 2024 v0.1.21 Jul 22, 2024 Changes in this version + const AggregationMessages + const ChannelNum + const ConsumerMsgs + var DefaultBuffer = 100 + var DefaultDataChanSize = 1000 + var DefaultInterval = time.Second + var DefaultSize = 100 + var DefaultWorker = 5 + func BuildConsumerGroupConfig(conf *KafkaConsumerConf, initial int64, autoCommitEnable bool) (*sarama.Config, error) + func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error) + func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error + func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) + func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) + func OperationIDGenerator() string + func WrapperTracerHandler(aggregationID, triggerID string, msg *MsgDataToMQCtx, ...) error + type BatchAggregationIdListHandlerF func(triggerID string, idList []string) + type BatchConsumerGroup struct + func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup + func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error + func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (c *BatchConsumerGroup) Group() string + func (c *BatchConsumerGroup) MessagesDistributionHandle() + func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF) + func (c *BatchConsumerGroup) Run(channelID int) + func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error + func (c *BatchConsumerGroup) Start() + func (c *BatchConsumerGroup) Stop() + func (c *BatchConsumerGroup) Topics() []string + type BatchConsumerGroupV2 struct + func MustKafkaBatchConsumerV2(c *KafkaConsumerConf, autoCommitEnable bool) *BatchConsumerGroupV2 + func (c *BatchConsumerGroupV2) Cleanup(_ sarama.ConsumerGroupSession) error + func (c *BatchConsumerGroupV2) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (c *BatchConsumerGroupV2) Group() string + func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2) + func (c *BatchConsumerGroupV2) Setup(_ sarama.ConsumerGroupSession) error + func (c *BatchConsumerGroupV2) Start() + func (c *BatchConsumerGroupV2) Stop() + func (c *BatchConsumerGroupV2) Topics() []string + type BatchMessageHandlerF func(msgs MsgChannelValue) + type BatchMessageHandlerFV2 func(channelID int, msg *MsgConsumerMessage) + type BatcherConsumerMessage struct + Do func(channelID int, msg *MsgConsumerMessage) + HookFunc func(triggerID string, messages map[string][]*sarama.ConsumerMessage, ...) + Key func(data *sarama.ConsumerMessage) string + OnComplete func(lastMessage *sarama.ConsumerMessage, totalCount int) + Sharding func(key string) int + func NewBatcherConsumerMessage(opts ...Option) *BatcherConsumerMessage + func (b *BatcherConsumerMessage) Close() + func (b *BatcherConsumerMessage) Put(ctx context.Context, data *sarama.ConsumerMessage) error + func (b *BatcherConsumerMessage) Start() error + func (b *BatcherConsumerMessage) Worker() int + type CMessage struct + MarkMessage func() + Message *sarama.ConsumerMessage + type Cmd2Value struct + Cmd int + Value interface{} + type Config struct + type ConsumerGroup struct + func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup + func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error + func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error + func (c *ConsumerGroup) Group() string + func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF) + func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error + func (c *ConsumerGroup) Start() + func (c *ConsumerGroup) Stop() + func (c *ConsumerGroup) Topics() []string + type KafkaBatchConsumerConf struct + ChannelNum int + Duration int + type KafkaConsumerConf struct + Brokers []string + CompressType string + Group string + Password string + ProducerAck string + TLS TLSConfig + Topics []string + Username string + type KafkaProducerConf struct + Brokers []string + CompressType string + Password string + ProducerAck string + TLS TLSConfig + Topic string + Username string + type KafkaShardingConsumerConf struct + ClientId string + Concurrency int + QueueBuffer int + type MessageHandlerF func(ctx context.Context, method, key string, value []byte) + type MsgChannelValue struct + AggregationID string + MsgList []*MsgDataToMQCtx + TriggerID string + type MsgConsumerMessage struct + Key string + MsgList []*MsgDataToMQCtx + TriggerID string + func (m MsgConsumerMessage) String() string + type MsgDataToMQCtx struct + Ctx context.Context + Method string + MsgData []byte + type None struct + type Option func(c *Config) + func WithBuffer(b int) Option + func WithDataBuffer(size int) Option + func WithInterval(i time.Duration) Option + func WithSize(s int) Option + func WithSyncWait(wait bool) Option + func WithWorker(w int) Option + type Producer struct + func GetCachedMQClient(c *KafkaProducerConf) *Producer + func MustKafkaProducer(c *KafkaProducerConf) *Producer + func (p *Producer) Close() (err error) + func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error) + func (p *Producer) SendMessageV2(ctx context.Context, method, key string, value []byte) (partition int32, offset int64, err error) + func (p *Producer) Topic() string + type ShardingConsumerGroup struct + func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup + func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error + func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error) + func (c *ShardingConsumerGroup) Group() string + func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF) + func (c *ShardingConsumerGroup) Run(channelID int) + func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error + func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int + func (c *ShardingConsumerGroup) Start() + func (c *ShardingConsumerGroup) Stop() + func (c *ShardingConsumerGroup) Topics() []string + type TLSConfig struct + CACrt string + ClientCrt string + ClientKey string + ClientKeyPwd string + EnableTLS bool + InsecureSkipVerify bool + type TriggerChannelValue struct