kafka

package
v0.1.21 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2024 License: Apache-2.0 Imports: 31 Imported by: 3

Documentation

Index

Constants

View Source
const (
	ConsumerMsgs        = 3
	AggregationMessages = 4
	ChannelNum          = 100
)

Variables

View Source
var (
	DefaultDataChanSize = 1000
	DefaultSize         = 100
	DefaultBuffer       = 100
	DefaultWorker       = 5
	DefaultInterval     = time.Second
)

Functions

func BuildConsumerGroupConfig added in v0.1.21

func BuildConsumerGroupConfig(conf *KafkaConsumerConf, initial int64, autoCommitEnable bool) (*sarama.Config, error)

func BuildProducerConfig added in v0.1.21

func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)

func Check added in v0.1.21

func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error

func NewConsumerGroup added in v0.1.21

func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error)

func NewProducer added in v0.1.21

func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error)

func OperationIDGenerator added in v0.1.14

func OperationIDGenerator() string

func WrapperTracerHandler added in v0.1.20

func WrapperTracerHandler(aggregationID, triggerID string, msg *MsgDataToMQCtx, cb func(ctx context.Context, method, key string, value []byte) error) error

Types

type BatchAggregationIdListHandlerF added in v0.1.14

type BatchAggregationIdListHandlerF func(triggerID string, idList []string)

type BatchConsumerGroup added in v0.1.14

type BatchConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroup is a Kafka consumer.

func MustKafkaBatchConsumer added in v0.1.14

func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup

func (*BatchConsumerGroup) Cleanup added in v0.1.14

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*BatchConsumerGroup) ConsumeClaim added in v0.1.14

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*BatchConsumerGroup) Group added in v0.1.14

func (c *BatchConsumerGroup) Group() string

func (*BatchConsumerGroup) MessagesDistributionHandle added in v0.1.14

func (c *BatchConsumerGroup) MessagesDistributionHandle()

func (*BatchConsumerGroup) RegisterHandlers added in v0.1.14

func (*BatchConsumerGroup) Run added in v0.1.14

func (c *BatchConsumerGroup) Run(channelID int)

func (*BatchConsumerGroup) Setup added in v0.1.14

Setup is run at the beginning of a new session, before ConsumeClaim

func (*BatchConsumerGroup) Start added in v0.1.14

func (c *BatchConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroup) Stop added in v0.1.14

func (c *BatchConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroup) Topics added in v0.1.14

func (c *BatchConsumerGroup) Topics() []string

type BatchConsumerGroupV2 added in v0.1.21

type BatchConsumerGroupV2 struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroupV2 is a Kafka consumer.

func MustKafkaBatchConsumerV2 added in v0.1.21

func MustKafkaBatchConsumerV2(c *KafkaConsumerConf, autoCommitEnable bool) *BatchConsumerGroupV2

func (*BatchConsumerGroupV2) Cleanup added in v0.1.21

func (*BatchConsumerGroupV2) ConsumeClaim added in v0.1.21

func (*BatchConsumerGroupV2) Group added in v0.1.21

func (c *BatchConsumerGroupV2) Group() string

func (*BatchConsumerGroupV2) RegisterHandler added in v0.1.21

func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)

func (*BatchConsumerGroupV2) Setup added in v0.1.21

func (*BatchConsumerGroupV2) Start added in v0.1.21

func (c *BatchConsumerGroupV2) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroupV2) Stop added in v0.1.21

func (c *BatchConsumerGroupV2) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroupV2) Topics added in v0.1.21

func (c *BatchConsumerGroupV2) Topics() []string

type BatchMessageHandlerF added in v0.1.14

type BatchMessageHandlerF func(msgs MsgChannelValue)

type BatchMessageHandlerFV2 added in v0.1.21

type BatchMessageHandlerFV2 func(channelID int, msg *MsgConsumerMessage)

type BatcherConsumerMessage added in v0.1.21

type BatcherConsumerMessage struct {
	Do         func(channelID int, msg *MsgConsumerMessage)
	OnComplete func(lastMessage *sarama.ConsumerMessage, totalCount int)
	Sharding   func(key string) int
	Key        func(data *sarama.ConsumerMessage) string
	HookFunc   func(triggerID string, messages map[string][]*sarama.ConsumerMessage, totalCount int, lastMessage *sarama.ConsumerMessage)
	// contains filtered or unexported fields
}

func NewBatcherConsumerMessage added in v0.1.21

func NewBatcherConsumerMessage(opts ...Option) *BatcherConsumerMessage

func (*BatcherConsumerMessage) Close added in v0.1.21

func (b *BatcherConsumerMessage) Close()

func (*BatcherConsumerMessage) Put added in v0.1.21

func (*BatcherConsumerMessage) Start added in v0.1.21

func (b *BatcherConsumerMessage) Start() error

func (*BatcherConsumerMessage) Worker added in v0.1.21

func (b *BatcherConsumerMessage) Worker() int

type CMessage added in v0.1.17

type CMessage struct {
	Message     *sarama.ConsumerMessage
	MarkMessage func()
}

type Cmd2Value added in v0.1.14

type Cmd2Value struct {
	Cmd   int
	Value interface{}
}

type Config added in v0.1.21

type Config struct {
	// contains filtered or unexported fields
}

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer.

func MustKafkaConsumer

func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup

func (*ConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerGroup) ConsumeClaim

func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroup) Group

func (c *ConsumerGroup) Group() string

func (*ConsumerGroup) RegisterHandlers

func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)

func (*ConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ConsumerGroup) Topics

func (c *ConsumerGroup) Topics() []string

type KafkaBatchConsumerConf added in v0.1.14

type KafkaBatchConsumerConf struct {
	KafkaConsumerConf
	Duration   int `json:",default=100"`
	ChannelNum int `json:"default=50"`
}

KafkaBatchConsumerConf holds the Kafka client settings.

type KafkaConsumerConf

type KafkaConsumerConf struct {
	Username     string    `json:",optional"`
	Password     string    `json:",optional"`
	ProducerAck  string    `json:",optional"`
	CompressType string    `json:",optional"`
	TLS          TLSConfig `json:",optional"`
	Brokers      []string
	Topics       []string
	Group        string
}

KafkaConsumerConf holds the Kafka client settings.

type KafkaProducerConf

type KafkaProducerConf struct {
	Username     string    `json:",optional"`
	Password     string    `json:",optional"`
	ProducerAck  string    `json:",optional"`
	CompressType string    `json:",optional"`
	TLS          TLSConfig `json:",optional"`
	Brokers      []string
	Topic        string
}

KafkaProducerConf holds the Kafka producer settings.

type KafkaShardingConsumerConf added in v0.1.17

type KafkaShardingConsumerConf struct {
	KafkaConsumerConf
	Concurrency int    `json:",default=2048"`
	QueueBuffer int    `json:",default=128"`
	ClientId    string `json:",default=sarama"`
}

type MessageHandlerF

type MessageHandlerF func(ctx context.Context, method, key string, value []byte)

type MsgChannelValue added in v0.1.14

type MsgChannelValue struct {
	AggregationID string //maybe userID or super groupID
	TriggerID     string
	MsgList       []*MsgDataToMQCtx
}

type MsgConsumerMessage added in v0.1.21

type MsgConsumerMessage struct {
	Key       string
	TriggerID string
	MsgList   []*MsgDataToMQCtx
}

func (MsgConsumerMessage) String added in v0.1.21

func (m MsgConsumerMessage) String() string

type MsgDataToMQCtx added in v0.1.20

type MsgDataToMQCtx struct {
	Ctx     context.Context
	Method  string
	MsgData []byte
}

type None added in v0.1.17

type None struct{}

type Option added in v0.1.21

type Option func(c *Config)

func WithBuffer added in v0.1.21

func WithBuffer(b int) Option

func WithDataBuffer added in v0.1.21

func WithDataBuffer(size int) Option

func WithInterval added in v0.1.21

func WithInterval(i time.Duration) Option

func WithSize added in v0.1.21

func WithSize(s int) Option

func WithSyncWait added in v0.1.21

func WithSyncWait(wait bool) Option

func WithWorker added in v0.1.21

func WithWorker(w int) Option

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func GetCachedMQClient

func GetCachedMQClient(c *KafkaProducerConf) *Producer

func MustKafkaProducer

func MustKafkaProducer(c *KafkaProducerConf) *Producer

func (*Producer) Close

func (p *Producer) Close() (err error)

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)

SendMessage sends a message to Kafka. NOTE: If the producer failed to be created, the message will be lost.

func (*Producer) SendMessageV2 added in v0.1.20

func (p *Producer) SendMessageV2(ctx context.Context, method, key string, value []byte) (partition int32, offset int64, err error)

func (*Producer) Topic

func (p *Producer) Topic() string

type ShardingConsumerGroup added in v0.1.17

type ShardingConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ShardingConsumerGroup represents a Sarama consumer group consumer.

func MustShardingConsumerGroup added in v0.1.17

func MustShardingConsumerGroup(c *KafkaShardingConsumerConf) *ShardingConsumerGroup

func (*ShardingConsumerGroup) Cleanup added in v0.1.17

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ShardingConsumerGroup) ConsumeClaim added in v0.1.17

func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ShardingConsumerGroup) Group added in v0.1.17

func (c *ShardingConsumerGroup) Group() string

func (*ShardingConsumerGroup) RegisterHandler added in v0.1.17

func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)

func (*ShardingConsumerGroup) Run added in v0.1.17

func (c *ShardingConsumerGroup) Run(channelID int)

func (*ShardingConsumerGroup) Setup added in v0.1.17

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ShardingConsumerGroup) Sharding added in v0.1.17

func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int

func (*ShardingConsumerGroup) Start added in v0.1.17

func (c *ShardingConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ShardingConsumerGroup) Stop added in v0.1.17

func (c *ShardingConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ShardingConsumerGroup) Topics added in v0.1.17

func (c *ShardingConsumerGroup) Topics() []string

type TLSConfig added in v0.1.21

type TLSConfig struct {
	EnableTLS          bool   `json:",optional"`
	CACrt              string `json:",optional"`
	ClientCrt          string `json:",optional"`
	ClientKey          string `json:",optional"`
	ClientKeyPwd       string `json:",optional"`
	InsecureSkipVerify bool   `json:",optional"`
}

type TriggerChannelValue added in v0.1.14

type TriggerChannelValue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL