kafka

package
v0.1.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2023 License: Apache-2.0 Imports: 19 Imported by: 3

Documentation

Index

Constants

View Source
// Exported untyped integer constants of the kafka package.
// NOTE(review): ConsumerMsgs and AggregationMessages look like command codes
// (compare the Cmd field of Cmd2Value), and ChannelNum looks like a channel
// count; none of this is visible here, so confirm against the callers.
const (
	ConsumerMsgs        = 3
	AggregationMessages = 4
	ChannelNum          = 100
)

Variables

This section is empty.

Functions

func OperationIDGenerator added in v0.1.14

func OperationIDGenerator() string

Types

type BatchAggregationIdListHandlerF added in v0.1.14

// BatchAggregationIdListHandlerF is a callback type that receives a trigger ID
// and a list of string IDs and returns nothing.
type BatchAggregationIdListHandlerF func(triggerID string, idList []string)

type BatchConsumerGroup added in v0.1.14

// BatchConsumerGroup is a Kafka consumer that embeds sarama.ConsumerGroup, so
// the embedded interface's methods are promoted onto it. Its remaining fields
// are unexported and not shown in this listing.
type BatchConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroup is a Kafka consumer.

func MustKafkaBatchConsumer added in v0.1.14

func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup

func (*BatchConsumerGroup) Cleanup added in v0.1.14

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*BatchConsumerGroup) ConsumeClaim added in v0.1.14

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*BatchConsumerGroup) Group added in v0.1.14

func (c *BatchConsumerGroup) Group() string

func (*BatchConsumerGroup) MessagesDistributionHandle added in v0.1.14

func (c *BatchConsumerGroup) MessagesDistributionHandle()

func (*BatchConsumerGroup) RegisterHandlers added in v0.1.14

func (*BatchConsumerGroup) Run added in v0.1.14

func (c *BatchConsumerGroup) Run(channelID int)

func (*BatchConsumerGroup) Setup added in v0.1.14

Setup is run at the beginning of a new session, before ConsumeClaim

func (*BatchConsumerGroup) Start added in v0.1.14

func (c *BatchConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroup) Stop added in v0.1.14

func (c *BatchConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroup) Topics added in v0.1.14

func (c *BatchConsumerGroup) Topics() []string

type BatchMessageHandlerF added in v0.1.14

// BatchMessageHandlerF is a callback type that receives a context and one
// MsgChannelValue (a batch of messages) and returns nothing.
type BatchMessageHandlerF func(ctx context.Context, msgs MsgChannelValue)

type Cmd2Value added in v0.1.14

// Cmd2Value pairs an integer Cmd with an arbitrary Value.
// NOTE(review): Cmd presumably holds a command code that tells the receiver
// how to interpret Value — confirm against the code that produces and consumes it.
type Cmd2Value struct {
	Cmd   int
	Value interface{}
}

type ConsumerGroup

// ConsumerGroup is a Kafka consumer that embeds sarama.ConsumerGroup, so the
// embedded interface's methods are promoted onto it. Its remaining fields are
// unexported and not shown in this listing.
type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer.

func MustKafkaConsumer

func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup

func (*ConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerGroup) ConsumeClaim

func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroup) Group

func (c *ConsumerGroup) Group() string

func (*ConsumerGroup) RegisterHandlers

func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)

func (*ConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ConsumerGroup) Topics

func (c *ConsumerGroup) Topics() []string

type KafkaBatchConsumerConf added in v0.1.14

// KafkaBatchConsumerConf holds the Kafka client settings for a batch consumer.
// It embeds KafkaConsumerConf and adds two integer options whose defaults are
// declared in the struct tags: Duration (default 100) and ChannelNum
// (default 50).
//
// Both tags start with a comma so that the field name stays the key and
// "default=N" is parsed as an option. Without the comma the whole string
// "default=N" is taken as the key name and no default is applied.
//
// NOTE(review): the unit of Duration is not visible here — confirm with the
// code that reads it.
type KafkaBatchConsumerConf struct {
	KafkaConsumerConf
	Duration   int `json:",default=100"`
	ChannelNum int `json:",default=50"`
}

KafkaBatchConsumerConf holds the Kafka client settings.

type KafkaConsumerConf

// KafkaConsumerConf holds the Kafka client settings for a consumer: a list of
// brokers, a list of topics and a group name.
// NOTE(review): Brokers are presumably "host:port" addresses and Group the
// consumer group ID — confirm against the constructor that reads this struct.
type KafkaConsumerConf struct {
	Brokers []string
	Topics  []string
	Group   string
}

KafkaConsumerConf holds the Kafka client settings.

type KafkaProducerConf

// KafkaProducerConf holds the Kafka producer settings: a list of brokers and a
// single topic.
// NOTE(review): Brokers are presumably "host:port" addresses — confirm against
// the constructor that reads this struct.
type KafkaProducerConf struct {
	Brokers []string
	Topic   string
}

KafkaProducerConf holds the Kafka producer settings.

type MessageHandlerF

// MessageHandlerF is a callback type that receives a context, a string key and
// a raw byte-slice value, and returns nothing.
type MessageHandlerF func(ctx context.Context, key string, value []byte)

type MsgChannelValue added in v0.1.14

// MsgChannelValue groups a list of messages under an aggregation ID and a
// trigger ID. It is the payload type passed to BatchMessageHandlerF.
type MsgChannelValue struct {
	AggregationID string // may be a user ID or a super group ID
	TriggerID     string
	MsgList       []*MsgDataToMQ
}

type MsgDataToMQ added in v0.1.14

// MsgDataToMQ is a single message entry: a trace ID, a message type string and
// the raw message bytes. MsgChannelValue.MsgList holds pointers to this type.
// NOTE(review): the set of valid MsgType values is not visible here.
type MsgDataToMQ struct {
	TraceId string
	MsgType string
	MsgData []byte
}

type Producer

// Producer is the package's producer type. All of its fields are unexported
// and not shown in this listing.
type Producer struct {
	// contains filtered or unexported fields
}

func GetCachedMQClient

func GetCachedMQClient(c *KafkaProducerConf) *Producer

func MustKafkaProducer

func MustKafkaProducer(c *KafkaProducerConf) *Producer

func (*Producer) Close

func (p *Producer) Close() (err error)

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)

SendMessage sends a message to Kafka. NOTE: If the producer failed to be created, the message will be lost.

func (*Producer) Topic

func (p *Producer) Topic() string

type TriggerChannelValue added in v0.1.14

// TriggerChannelValue is an exported type whose fields are all unexported and
// not shown in this listing.
// NOTE(review): the name suggests it is the value sent on a trigger channel —
// confirm against the package source.
type TriggerChannelValue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL