Documentation ¶
Index ¶
- Constants
- Variables
- func BuildConsumerGroupConfig(conf *KafkaConsumerConf, initial int64, autoCommitEnable bool) (*sarama.Config, error)
- func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)
- func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error
- func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error)
- func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error)
- func OperationIDGenerator() string
- func WrapperTracerHandler(aggregationID, triggerID string, msg *MsgDataToMQCtx, ...) error
- type BatchAggregationIdListHandlerF
- type BatchConsumerGroup
- func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *BatchConsumerGroup) Group() string
- func (c *BatchConsumerGroup) MessagesDistributionHandle()
- func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
- func (c *BatchConsumerGroup) Run(channelID int)
- func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) Start()
- func (c *BatchConsumerGroup) Stop()
- func (c *BatchConsumerGroup) Topics() []string
- type BatchConsumerGroupV2
- func (c *BatchConsumerGroupV2) Cleanup(_ sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroupV2) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *BatchConsumerGroupV2) Group() string
- func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)
- func (c *BatchConsumerGroupV2) Setup(_ sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroupV2) Start()
- func (c *BatchConsumerGroupV2) Stop()
- func (c *BatchConsumerGroupV2) Topics() []string
- type BatchMessageHandlerF
- type BatchMessageHandlerFV2
- type BatcherConsumerMessage
- type CMessage
- type Cmd2Value
- type Config
- type ConsumerGroup
- func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *ConsumerGroup) Group() string
- func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
- func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) Start()
- func (c *ConsumerGroup) Stop()
- func (c *ConsumerGroup) Topics() []string
- type KafkaBatchConsumerConf
- type KafkaConsumerConf
- type KafkaProducerConf
- type KafkaShardingConsumerConf
- type MessageHandlerF
- type MsgChannelValue
- type MsgConsumerMessage
- type MsgDataToMQCtx
- type None
- type Option
- type Producer
- func (p *Producer) Close() (err error)
- func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)
- func (p *Producer) SendMessageV2(ctx context.Context, method, key string, value []byte) (partition int32, offset int64, err error)
- func (p *Producer) Topic() string
- type ShardingConsumerGroup
- func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
- func (c *ShardingConsumerGroup) Group() string
- func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
- func (c *ShardingConsumerGroup) Run(channelID int)
- func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
- func (c *ShardingConsumerGroup) Start()
- func (c *ShardingConsumerGroup) Stop()
- func (c *ShardingConsumerGroup) Topics() []string
- type TLSConfig
- type TriggerChannelValue
Constants ¶
const ( ConsumerMsgs = 3 AggregationMessages = 4 ChannelNum = 100 )
Variables ¶
var ( DefaultDataChanSize = 1000 DefaultSize = 100 DefaultBuffer = 100 DefaultWorker = 5 DefaultInterval = time.Second )
Functions ¶
func BuildProducerConfig ¶
func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)
func NewConsumerGroup ¶
func NewProducer ¶
func OperationIDGenerator ¶
func OperationIDGenerator() string
func WrapperTracerHandler ¶
Types ¶
type BatchConsumerGroup ¶
type BatchConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
BatchConsumerGroup kafka consumer
func MustKafkaBatchConsumer ¶
func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup
func (*BatchConsumerGroup) Cleanup ¶
func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*BatchConsumerGroup) ConsumeClaim ¶
func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*BatchConsumerGroup) Group ¶
func (c *BatchConsumerGroup) Group() string
func (*BatchConsumerGroup) MessagesDistributionHandle ¶
func (c *BatchConsumerGroup) MessagesDistributionHandle()
func (*BatchConsumerGroup) RegisterHandlers ¶
func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
func (*BatchConsumerGroup) Run ¶
func (c *BatchConsumerGroup) Run(channelID int)
func (*BatchConsumerGroup) Setup ¶
func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*BatchConsumerGroup) Start ¶
func (c *BatchConsumerGroup) Start()
Start start consume messages, watch signals
func (*BatchConsumerGroup) Stop ¶
func (c *BatchConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*BatchConsumerGroup) Topics ¶
func (c *BatchConsumerGroup) Topics() []string
type BatchConsumerGroupV2 ¶
type BatchConsumerGroupV2 struct { sarama.ConsumerGroup // contains filtered or unexported fields }
BatchConsumerGroupV2 kafka consumer
func MustKafkaBatchConsumerV2 ¶
func MustKafkaBatchConsumerV2(c *KafkaConsumerConf, autoCommitEnable bool) *BatchConsumerGroupV2
func (*BatchConsumerGroupV2) Cleanup ¶
func (c *BatchConsumerGroupV2) Cleanup(_ sarama.ConsumerGroupSession) error
func (*BatchConsumerGroupV2) ConsumeClaim ¶
func (c *BatchConsumerGroupV2) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*BatchConsumerGroupV2) Group ¶
func (c *BatchConsumerGroupV2) Group() string
func (*BatchConsumerGroupV2) RegisterHandler ¶
func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)
func (*BatchConsumerGroupV2) Setup ¶
func (c *BatchConsumerGroupV2) Setup(_ sarama.ConsumerGroupSession) error
func (*BatchConsumerGroupV2) Start ¶
func (c *BatchConsumerGroupV2) Start()
Start start consume messages, watch signals
func (*BatchConsumerGroupV2) Stop ¶
func (c *BatchConsumerGroupV2) Stop()
Stop Stop consume messages, watch signals
func (*BatchConsumerGroupV2) Topics ¶
func (c *BatchConsumerGroupV2) Topics() []string
type BatchMessageHandlerF ¶
type BatchMessageHandlerF func(msgs MsgChannelValue)
type BatchMessageHandlerFV2 ¶
type BatchMessageHandlerFV2 func(channelID int, msg *MsgConsumerMessage)
type BatcherConsumerMessage ¶
type BatcherConsumerMessage struct { Do func(channelID int, msg *MsgConsumerMessage) OnComplete func(lastMessage *sarama.ConsumerMessage, totalCount int) Sharding func(key string) int Key func(data *sarama.ConsumerMessage) string HookFunc func(triggerID string, messages map[string][]*sarama.ConsumerMessage, totalCount int, lastMessage *sarama.ConsumerMessage) // contains filtered or unexported fields }
func NewBatcherConsumerMessage ¶
func NewBatcherConsumerMessage(opts ...Option) *BatcherConsumerMessage
func (*BatcherConsumerMessage) Close ¶
func (b *BatcherConsumerMessage) Close()
func (*BatcherConsumerMessage) Put ¶
func (b *BatcherConsumerMessage) Put(ctx context.Context, data *sarama.ConsumerMessage) error
func (*BatcherConsumerMessage) Start ¶
func (b *BatcherConsumerMessage) Start() error
func (*BatcherConsumerMessage) Worker ¶
func (b *BatcherConsumerMessage) Worker() int
type CMessage ¶
type CMessage struct { Message *sarama.ConsumerMessage MarkMessage func() }
type ConsumerGroup ¶
type ConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ConsumerGroup kafka consumer
func MustKafkaConsumer ¶
func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup
func (*ConsumerGroup) Cleanup ¶
func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroup) ConsumeClaim ¶
func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroup) Group ¶
func (c *ConsumerGroup) Group() string
func (*ConsumerGroup) RegisterHandlers ¶
func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
func (*ConsumerGroup) Setup ¶
func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ConsumerGroup) Start ¶
func (c *ConsumerGroup) Start()
Start start consume messages, watch signals
func (*ConsumerGroup) Stop ¶
func (c *ConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*ConsumerGroup) Topics ¶
func (c *ConsumerGroup) Topics() []string
type KafkaBatchConsumerConf ¶
type KafkaBatchConsumerConf struct { KafkaConsumerConf Duration int `json:",default=100"` ChannelNum int `json:"default=50"` }
KafkaBatchConsumerConf kafka client settings.
type KafkaConsumerConf ¶
type KafkaConsumerConf struct { Username string `json:",optional"` Password string `json:",optional"` ProducerAck string `json:",optional"` CompressType string `json:",optional"` TLS TLSConfig `json:",optional"` Brokers []string Topics []string Group string }
KafkaConsumerConf kafka client settings.
type KafkaProducerConf ¶
type KafkaProducerConf struct { Username string `json:",optional"` Password string `json:",optional"` ProducerAck string `json:",optional"` CompressType string `json:",optional"` TLS TLSConfig `json:",optional"` Brokers []string Topic string }
KafkaProducerConf kafka producer settings.
type KafkaShardingConsumerConf ¶
type KafkaShardingConsumerConf struct { KafkaConsumerConf Concurrency int `json:",default=2048"` QueueBuffer int `json:",default=128"` ClientId string `json:",default=sarama"` }
type MessageHandlerF ¶
type MsgChannelValue ¶
type MsgChannelValue struct { AggregationID string //maybe userID or super groupID TriggerID string MsgList []*MsgDataToMQCtx }
type MsgConsumerMessage ¶
type MsgConsumerMessage struct { Key string TriggerID string MsgList []*MsgDataToMQCtx }
func (MsgConsumerMessage) String ¶
func (m MsgConsumerMessage) String() string
type MsgDataToMQCtx ¶
type Option ¶
type Option func(c *Config)
func WithBuffer ¶
func WithDataBuffer ¶
func WithInterval ¶
func WithSyncWait ¶
func WithWorker ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func GetCachedMQClient ¶
func GetCachedMQClient(c *KafkaProducerConf) *Producer
func MustKafkaProducer ¶
func MustKafkaProducer(c *KafkaProducerConf) *Producer
func (*Producer) SendMessage ¶
func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)
SendMessage Input send msg to kafka NOTE: If producer has beed created failed, the message will lose.
func (*Producer) SendMessageV2 ¶
type ShardingConsumerGroup ¶
type ShardingConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ShardingConsumerGroup represents a Sarama consumer GroupName consumer
func MustShardingConsumerGroup ¶
func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup
func (*ShardingConsumerGroup) Cleanup ¶
func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ShardingConsumerGroup) ConsumeClaim ¶
func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ShardingConsumerGroup) Group ¶
func (c *ShardingConsumerGroup) Group() string
func (*ShardingConsumerGroup) RegisterHandler ¶
func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
func (*ShardingConsumerGroup) Run ¶
func (c *ShardingConsumerGroup) Run(channelID int)
func (*ShardingConsumerGroup) Setup ¶
func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ShardingConsumerGroup) Sharding ¶
func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
func (*ShardingConsumerGroup) Start ¶
func (c *ShardingConsumerGroup) Start()
Start start consume messages, watch signals
func (*ShardingConsumerGroup) Stop ¶
func (c *ShardingConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*ShardingConsumerGroup) Topics ¶
func (c *ShardingConsumerGroup) Topics() []string
type TriggerChannelValue ¶
type TriggerChannelValue struct {
// contains filtered or unexported fields
}