Documentation ¶
Index ¶
- Constants
- func OperationIDGenerator() string
- type BatchAggregationIdListHandlerF
- type BatchConsumerGroup
- func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *BatchConsumerGroup) Group() string
- func (c *BatchConsumerGroup) MessagesDistributionHandle()
- func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
- func (c *BatchConsumerGroup) Run(channelID int)
- func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) Start()
- func (c *BatchConsumerGroup) Stop()
- func (c *BatchConsumerGroup) Topics() []string
- type BatchMessageHandlerF
- type CMessage
- type Cmd2Value
- type ConsumerGroup
- func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *ConsumerGroup) Group() string
- func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
- func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) Start()
- func (c *ConsumerGroup) Stop()
- func (c *ConsumerGroup) Topics() []string
- type KafkaBatchConsumerConf
- type KafkaConsumerConf
- type KafkaProducerConf
- type KafkaShardingConsumerConf
- type MessageHandlerF
- type MsgChannelValue
- type MsgDataToMQ
- type None
- type Producer
- type ShardingConsumerGroup
- func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
- func (c *ShardingConsumerGroup) Group() string
- func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
- func (c *ShardingConsumerGroup) Run(channelID int)
- func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
- func (c *ShardingConsumerGroup) Start()
- func (c *ShardingConsumerGroup) Stop()
- func (c *ShardingConsumerGroup) Topics() []string
- type TriggerChannelValue
Constants ¶
const ( ConsumerMsgs = 3 AggregationMessages = 4 ChannelNum = 100 )
Variables ¶
This section is empty.
Functions ¶
func OperationIDGenerator ¶ added in v0.1.14
func OperationIDGenerator() string
Types ¶
type BatchAggregationIdListHandlerF ¶ added in v0.1.14
type BatchConsumerGroup ¶ added in v0.1.14
type BatchConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
BatchConsumerGroup kafka consumer
func MustKafkaBatchConsumer ¶ added in v0.1.14
func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup
func (*BatchConsumerGroup) Cleanup ¶ added in v0.1.14
func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*BatchConsumerGroup) ConsumeClaim ¶ added in v0.1.14
func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*BatchConsumerGroup) Group ¶ added in v0.1.14
func (c *BatchConsumerGroup) Group() string
func (*BatchConsumerGroup) MessagesDistributionHandle ¶ added in v0.1.14
func (c *BatchConsumerGroup) MessagesDistributionHandle()
func (*BatchConsumerGroup) RegisterHandlers ¶ added in v0.1.14
func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
func (*BatchConsumerGroup) Run ¶ added in v0.1.14
func (c *BatchConsumerGroup) Run(channelID int)
func (*BatchConsumerGroup) Setup ¶ added in v0.1.14
func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*BatchConsumerGroup) Start ¶ added in v0.1.14
func (c *BatchConsumerGroup) Start()
Start start consume messages, watch signals
func (*BatchConsumerGroup) Stop ¶ added in v0.1.14
func (c *BatchConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*BatchConsumerGroup) Topics ¶ added in v0.1.14
func (c *BatchConsumerGroup) Topics() []string
type BatchMessageHandlerF ¶ added in v0.1.14
type BatchMessageHandlerF func(ctx context.Context, msgs MsgChannelValue)
type CMessage ¶ added in v0.1.17
type CMessage struct { Message *sarama.ConsumerMessage MarkMessage func() }
type ConsumerGroup ¶
type ConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ConsumerGroup kafka consumer
func MustKafkaConsumer ¶
func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup
func (*ConsumerGroup) Cleanup ¶
func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroup) ConsumeClaim ¶
func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroup) Group ¶
func (c *ConsumerGroup) Group() string
func (*ConsumerGroup) RegisterHandlers ¶
func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
func (*ConsumerGroup) Setup ¶
func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ConsumerGroup) Start ¶
func (c *ConsumerGroup) Start()
Start start consume messages, watch signals
func (*ConsumerGroup) Stop ¶
func (c *ConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*ConsumerGroup) Topics ¶
func (c *ConsumerGroup) Topics() []string
type KafkaBatchConsumerConf ¶ added in v0.1.14
type KafkaBatchConsumerConf struct { KafkaConsumerConf Duration int `json:",default=100"` ChannelNum int `json:"default=50"` }
KafkaBatchConsumerConf kafka client settings.
type KafkaConsumerConf ¶
KafkaConsumerConf kafka client settings.
type KafkaProducerConf ¶
KafkaProducerConf kafka producer settings.
type KafkaShardingConsumerConf ¶ added in v0.1.17
type KafkaShardingConsumerConf struct { KafkaConsumerConf Concurrency int `json:",default=2048"` QueueBuffer int `json:",default=128"` ClientId string `json:",default=sarama"` }
type MsgChannelValue ¶ added in v0.1.14
type MsgChannelValue struct { AggregationID string //maybe userID or super groupID TriggerID string MsgList []*MsgDataToMQ }
type MsgDataToMQ ¶ added in v0.1.14
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func GetCachedMQClient ¶
func GetCachedMQClient(c *KafkaProducerConf) *Producer
func MustKafkaProducer ¶
func MustKafkaProducer(c *KafkaProducerConf) *Producer
type ShardingConsumerGroup ¶ added in v0.1.17
type ShardingConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ShardingConsumerGroup represents a Sarama consumer GroupName consumer
func MustShardingConsumerGroup ¶ added in v0.1.17
func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup
func (*ShardingConsumerGroup) Cleanup ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ShardingConsumerGroup) ConsumeClaim ¶ added in v0.1.17
func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ShardingConsumerGroup) Group ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Group() string
func (*ShardingConsumerGroup) RegisterHandler ¶ added in v0.1.17
func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
func (*ShardingConsumerGroup) Run ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Run(channelID int)
func (*ShardingConsumerGroup) Setup ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ShardingConsumerGroup) Sharding ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
func (*ShardingConsumerGroup) Start ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Start()
Start start consume messages, watch signals
func (*ShardingConsumerGroup) Stop ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Stop()
Stop Stop consume messages, watch signals
func (*ShardingConsumerGroup) Topics ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Topics() []string
type TriggerChannelValue ¶ added in v0.1.14
type TriggerChannelValue struct {
// contains filtered or unexported fields
}