Documentation ¶
Index ¶
- Constants
- Variables
- func BuildConsumerGroupConfig(conf *KafkaConsumerConf, initial int64, autoCommitEnable bool) (*sarama.Config, error)
- func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)
- func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error
- func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error)
- func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error)
- func OperationIDGenerator() string
- func WrapperTracerHandler(aggregationID, triggerID string, msg *MsgDataToMQCtx, ...) error
- type BatchAggregationIdListHandlerF
- type BatchConsumerGroup
- func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *BatchConsumerGroup) Group() string
- func (c *BatchConsumerGroup) MessagesDistributionHandle()
- func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
- func (c *BatchConsumerGroup) Run(channelID int)
- func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroup) Start()
- func (c *BatchConsumerGroup) Stop()
- func (c *BatchConsumerGroup) Topics() []string
- type BatchConsumerGroupV2
- func (c *BatchConsumerGroupV2) Cleanup(_ sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroupV2) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *BatchConsumerGroupV2) Group() string
- func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)
- func (c *BatchConsumerGroupV2) Setup(_ sarama.ConsumerGroupSession) error
- func (c *BatchConsumerGroupV2) Start()
- func (c *BatchConsumerGroupV2) Stop()
- func (c *BatchConsumerGroupV2) Topics() []string
- type BatchMessageHandlerF
- type BatchMessageHandlerFV2
- type BatcherConsumerMessage
- type CMessage
- type Cmd2Value
- type Config
- type ConsumerGroup
- func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *ConsumerGroup) Group() string
- func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
- func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ConsumerGroup) Start()
- func (c *ConsumerGroup) Stop()
- func (c *ConsumerGroup) Topics() []string
- type KafkaBatchConsumerConf
- type KafkaConsumerConf
- type KafkaProducerConf
- type KafkaShardingConsumerConf
- type MessageHandlerF
- type MsgChannelValue
- type MsgConsumerMessage
- type MsgDataToMQCtx
- type None
- type Option
- type Producer
- func (p *Producer) Close() (err error)
- func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)
- func (p *Producer) SendMessageV2(ctx context.Context, method, key string, value []byte) (partition int32, offset int64, err error)
- func (p *Producer) Topic() string
- type ShardingConsumerGroup
- func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
- func (c *ShardingConsumerGroup) Group() string
- func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
- func (c *ShardingConsumerGroup) Run(channelID int)
- func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
- func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
- func (c *ShardingConsumerGroup) Start()
- func (c *ShardingConsumerGroup) Stop()
- func (c *ShardingConsumerGroup) Topics() []string
- type TLSConfig
- type TriggerChannelValue
Constants ¶
const ( ConsumerMsgs = 3 AggregationMessages = 4 ChannelNum = 100 )
Variables ¶
var ( DefaultDataChanSize = 1000 DefaultSize = 100 DefaultBuffer = 100 DefaultWorker = 5 DefaultInterval = time.Second )
Functions ¶
func BuildConsumerGroupConfig ¶ added in v0.1.21
func BuildProducerConfig ¶ added in v0.1.21
func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)
func Check ¶ added in v0.1.21
func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error
func NewConsumerGroup ¶ added in v0.1.21
func NewProducer ¶ added in v0.1.21
func OperationIDGenerator ¶ added in v0.1.14
func OperationIDGenerator() string
func WrapperTracerHandler ¶ added in v0.1.20
Types ¶
type BatchAggregationIdListHandlerF ¶ added in v0.1.14
type BatchConsumerGroup ¶ added in v0.1.14
type BatchConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
BatchConsumerGroup is a Kafka consumer.
func MustKafkaBatchConsumer ¶ added in v0.1.14
func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup
func (*BatchConsumerGroup) Cleanup ¶ added in v0.1.14
func (c *BatchConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*BatchConsumerGroup) ConsumeClaim ¶ added in v0.1.14
func (c *BatchConsumerGroup) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*BatchConsumerGroup) Group ¶ added in v0.1.14
func (c *BatchConsumerGroup) Group() string
func (*BatchConsumerGroup) MessagesDistributionHandle ¶ added in v0.1.14
func (c *BatchConsumerGroup) MessagesDistributionHandle()
func (*BatchConsumerGroup) RegisterHandlers ¶ added in v0.1.14
func (c *BatchConsumerGroup) RegisterHandlers(cb BatchAggregationIdListHandlerF, cb2 BatchMessageHandlerF)
func (*BatchConsumerGroup) Run ¶ added in v0.1.14
func (c *BatchConsumerGroup) Run(channelID int)
func (*BatchConsumerGroup) Setup ¶ added in v0.1.14
func (c *BatchConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*BatchConsumerGroup) Start ¶ added in v0.1.14
func (c *BatchConsumerGroup) Start()
Start starts consuming messages and watches signals.
func (*BatchConsumerGroup) Stop ¶ added in v0.1.14
func (c *BatchConsumerGroup) Stop()
Stop stops consuming messages and watching signals.
func (*BatchConsumerGroup) Topics ¶ added in v0.1.14
func (c *BatchConsumerGroup) Topics() []string
type BatchConsumerGroupV2 ¶ added in v0.1.21
type BatchConsumerGroupV2 struct { sarama.ConsumerGroup // contains filtered or unexported fields }
BatchConsumerGroupV2 is a Kafka consumer.
func MustKafkaBatchConsumerV2 ¶ added in v0.1.21
func MustKafkaBatchConsumerV2(c *KafkaConsumerConf, autoCommitEnable bool) *BatchConsumerGroupV2
func (*BatchConsumerGroupV2) Cleanup ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Cleanup(_ sarama.ConsumerGroupSession) error
func (*BatchConsumerGroupV2) ConsumeClaim ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*BatchConsumerGroupV2) Group ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Group() string
func (*BatchConsumerGroupV2) RegisterHandler ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)
func (*BatchConsumerGroupV2) Setup ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Setup(_ sarama.ConsumerGroupSession) error
func (*BatchConsumerGroupV2) Start ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Start()
Start starts consuming messages and watches signals.
func (*BatchConsumerGroupV2) Stop ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Stop()
Stop stops consuming messages and watching signals.
func (*BatchConsumerGroupV2) Topics ¶ added in v0.1.21
func (c *BatchConsumerGroupV2) Topics() []string
type BatchMessageHandlerF ¶ added in v0.1.14
type BatchMessageHandlerF func(msgs MsgChannelValue)
type BatchMessageHandlerFV2 ¶ added in v0.1.21
type BatchMessageHandlerFV2 func(channelID int, msg *MsgConsumerMessage)
type BatcherConsumerMessage ¶ added in v0.1.21
type BatcherConsumerMessage struct { Do func(channelID int, msg *MsgConsumerMessage) OnComplete func(lastMessage *sarama.ConsumerMessage, totalCount int) Sharding func(key string) int Key func(data *sarama.ConsumerMessage) string HookFunc func(triggerID string, messages map[string][]*sarama.ConsumerMessage, totalCount int, lastMessage *sarama.ConsumerMessage) // contains filtered or unexported fields }
func NewBatcherConsumerMessage ¶ added in v0.1.21
func NewBatcherConsumerMessage(opts ...Option) *BatcherConsumerMessage
func (*BatcherConsumerMessage) Close ¶ added in v0.1.21
func (b *BatcherConsumerMessage) Close()
func (*BatcherConsumerMessage) Put ¶ added in v0.1.21
func (b *BatcherConsumerMessage) Put(ctx context.Context, data *sarama.ConsumerMessage) error
func (*BatcherConsumerMessage) Start ¶ added in v0.1.21
func (b *BatcherConsumerMessage) Start() error
func (*BatcherConsumerMessage) Worker ¶ added in v0.1.21
func (b *BatcherConsumerMessage) Worker() int
type CMessage ¶ added in v0.1.17
type CMessage struct { Message *sarama.ConsumerMessage MarkMessage func() }
type ConsumerGroup ¶
type ConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ConsumerGroup is a Kafka consumer.
func MustKafkaConsumer ¶
func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup
func (*ConsumerGroup) Cleanup ¶
func (c *ConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroup) ConsumeClaim ¶
func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroup) Group ¶
func (c *ConsumerGroup) Group() string
func (*ConsumerGroup) RegisterHandlers ¶
func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)
func (*ConsumerGroup) Setup ¶
func (c *ConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ConsumerGroup) Start ¶
func (c *ConsumerGroup) Start()
Start starts consuming messages and watches signals.
func (*ConsumerGroup) Stop ¶
func (c *ConsumerGroup) Stop()
Stop stops consuming messages and watching signals.
func (*ConsumerGroup) Topics ¶
func (c *ConsumerGroup) Topics() []string
type KafkaBatchConsumerConf ¶ added in v0.1.14
type KafkaBatchConsumerConf struct { KafkaConsumerConf Duration int `json:",default=100"` ChannelNum int `json:"default=50"` }
KafkaBatchConsumerConf kafka client settings.
type KafkaConsumerConf ¶
type KafkaConsumerConf struct { Username string `json:",optional"` Password string `json:",optional"` ProducerAck string `json:",optional"` CompressType string `json:",optional"` TLS TLSConfig `json:",optional"` Brokers []string Topics []string Group string }
KafkaConsumerConf kafka client settings.
type KafkaProducerConf ¶
type KafkaProducerConf struct { Username string `json:",optional"` Password string `json:",optional"` ProducerAck string `json:",optional"` CompressType string `json:",optional"` TLS TLSConfig `json:",optional"` Brokers []string Topic string }
KafkaProducerConf kafka producer settings.
type KafkaShardingConsumerConf ¶ added in v0.1.17
type KafkaShardingConsumerConf struct { KafkaConsumerConf Concurrency int `json:",default=2048"` QueueBuffer int `json:",default=128"` ClientId string `json:",default=sarama"` }
type MessageHandlerF ¶
type MsgChannelValue ¶ added in v0.1.14
type MsgChannelValue struct { AggregationID string //maybe userID or super groupID TriggerID string MsgList []*MsgDataToMQCtx }
type MsgConsumerMessage ¶ added in v0.1.21
type MsgConsumerMessage struct { Key string TriggerID string MsgList []*MsgDataToMQCtx }
func (MsgConsumerMessage) String ¶ added in v0.1.21
func (m MsgConsumerMessage) String() string
type MsgDataToMQCtx ¶ added in v0.1.20
type Option ¶ added in v0.1.21
type Option func(c *Config)
func WithBuffer ¶ added in v0.1.21
func WithDataBuffer ¶ added in v0.1.21
func WithInterval ¶ added in v0.1.21
func WithSyncWait ¶ added in v0.1.21
func WithWorker ¶ added in v0.1.21
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func GetCachedMQClient ¶
func GetCachedMQClient(c *KafkaProducerConf) *Producer
func MustKafkaProducer ¶
func MustKafkaProducer(c *KafkaProducerConf) *Producer
func (*Producer) SendMessage ¶
func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)
SendMessage sends a message to Kafka. NOTE: If the producer failed to be created, the message will be lost.
func (*Producer) SendMessageV2 ¶ added in v0.1.20
type ShardingConsumerGroup ¶ added in v0.1.17
type ShardingConsumerGroup struct { sarama.ConsumerGroup // contains filtered or unexported fields }
ShardingConsumerGroup represents a Sarama consumer-group consumer.
func MustShardingConsumerGroup ¶ added in v0.1.17
func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup
func (*ShardingConsumerGroup) Cleanup ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ShardingConsumerGroup) ConsumeClaim ¶ added in v0.1.17
func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ShardingConsumerGroup) Group ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Group() string
func (*ShardingConsumerGroup) RegisterHandler ¶ added in v0.1.17
func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)
func (*ShardingConsumerGroup) Run ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Run(channelID int)
func (*ShardingConsumerGroup) Setup ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*ShardingConsumerGroup) Sharding ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int
func (*ShardingConsumerGroup) Start ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Start()
Start starts consuming messages and watches signals.
func (*ShardingConsumerGroup) Stop ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Stop()
Stop stops consuming messages and watching signals.
func (*ShardingConsumerGroup) Topics ¶ added in v0.1.17
func (c *ShardingConsumerGroup) Topics() []string
type TriggerChannelValue ¶ added in v0.1.14
type TriggerChannelValue struct {
// contains filtered or unexported fields
}