kafka

package
v0.1.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
// NOTE(review): the meaning of these values is not documented on this page.
// ConsumerMsgs and AggregationMessages are presumably command codes carried
// in Cmd2Value.Cmd, and ChannelNum presumably a number of worker channels —
// confirm against the consumer implementation.
const (
	ConsumerMsgs        = 3
	AggregationMessages = 4
	ChannelNum          = 100
)

Variables

View Source
// Package-level default values. They are declared as variables rather than
// constants, so they can be reassigned at run time.
// NOTE(review): presumably the fallbacks applied by NewBatcherConsumerMessage
// when WithDataBuffer, WithSize, WithBuffer, WithWorker or WithInterval is
// not supplied — confirm against the implementation.
var (
	DefaultDataChanSize = 1000
	DefaultSize         = 100
	DefaultBuffer       = 100
	DefaultWorker       = 5
	DefaultInterval     = time.Second
)

Functions

func BuildConsumerGroupConfig

func BuildConsumerGroupConfig(conf *KafkaConsumerConf, initial int64, autoCommitEnable bool) (*sarama.Config, error)

func BuildProducerConfig

func BuildProducerConfig(conf KafkaProducerConf) (*sarama.Config, error)

func Check

func Check(ctx context.Context, conf *KafkaConsumerConf, topics []string) error

func NewConsumerGroup

func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error)

func NewProducer

func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error)

func OperationIDGenerator

func OperationIDGenerator() string

func WrapperTracerHandler

func WrapperTracerHandler(aggregationID, triggerID string, msg *MsgDataToMQCtx, cb func(ctx context.Context, method, key string, value []byte) error) error

Types

type BatchAggregationIdListHandlerF

// BatchAggregationIdListHandlerF is a callback that receives a trigger ID
// together with a list of IDs. It returns nothing, so it has no way to
// report an error to its caller.
type BatchAggregationIdListHandlerF func(triggerID string, idList []string)

type BatchConsumerGroup

type BatchConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroup is a Kafka batch consumer.

func MustKafkaBatchConsumer

func MustKafkaBatchConsumer(c *KafkaConsumerConf) *BatchConsumerGroup

func (*BatchConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*BatchConsumerGroup) ConsumeClaim

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*BatchConsumerGroup) Group

func (c *BatchConsumerGroup) Group() string

func (*BatchConsumerGroup) MessagesDistributionHandle

func (c *BatchConsumerGroup) MessagesDistributionHandle()

func (*BatchConsumerGroup) RegisterHandlers

func (*BatchConsumerGroup) Run

func (c *BatchConsumerGroup) Run(channelID int)

func (*BatchConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*BatchConsumerGroup) Start

func (c *BatchConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroup) Stop

func (c *BatchConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroup) Topics

func (c *BatchConsumerGroup) Topics() []string

type BatchConsumerGroupV2

type BatchConsumerGroupV2 struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

BatchConsumerGroupV2 is a Kafka batch consumer.

func MustKafkaBatchConsumerV2

func MustKafkaBatchConsumerV2(c *KafkaConsumerConf, autoCommitEnable bool) *BatchConsumerGroupV2

func (*BatchConsumerGroupV2) Cleanup

func (*BatchConsumerGroupV2) ConsumeClaim

func (*BatchConsumerGroupV2) Group

func (c *BatchConsumerGroupV2) Group() string

func (*BatchConsumerGroupV2) RegisterHandler

func (c *BatchConsumerGroupV2) RegisterHandler(cb2 BatchMessageHandlerFV2)

func (*BatchConsumerGroupV2) Setup

func (*BatchConsumerGroupV2) Start

func (c *BatchConsumerGroupV2) Start()

Start starts consuming messages and watches for signals.

func (*BatchConsumerGroupV2) Stop

func (c *BatchConsumerGroupV2) Stop()

Stop stops consuming messages and watching for signals.

func (*BatchConsumerGroupV2) Topics

func (c *BatchConsumerGroupV2) Topics() []string

type BatchMessageHandlerF

// BatchMessageHandlerF is a callback that receives one MsgChannelValue
// (an aggregation ID, a trigger ID and a list of messages) by value.
type BatchMessageHandlerF func(msgs MsgChannelValue)

type BatchMessageHandlerFV2

// BatchMessageHandlerFV2 is a callback that receives a channel ID and a
// pointer to a MsgConsumerMessage. It is the handler type accepted by
// BatchConsumerGroupV2.RegisterHandler.
type BatchMessageHandlerFV2 func(channelID int, msg *MsgConsumerMessage)

type BatcherConsumerMessage

type BatcherConsumerMessage struct {
	Do         func(channelID int, msg *MsgConsumerMessage)
	OnComplete func(lastMessage *sarama.ConsumerMessage, totalCount int)
	Sharding   func(key string) int
	Key        func(data *sarama.ConsumerMessage) string
	HookFunc   func(triggerID string, messages map[string][]*sarama.ConsumerMessage, totalCount int, lastMessage *sarama.ConsumerMessage)
	// contains filtered or unexported fields
}

func NewBatcherConsumerMessage

func NewBatcherConsumerMessage(opts ...Option) *BatcherConsumerMessage

func (*BatcherConsumerMessage) Close

func (b *BatcherConsumerMessage) Close()

func (*BatcherConsumerMessage) Put

func (*BatcherConsumerMessage) Start

func (b *BatcherConsumerMessage) Start() error

func (*BatcherConsumerMessage) Worker

func (b *BatcherConsumerMessage) Worker() int

type CMessage

// CMessage pairs a consumed sarama message with a MarkMessage callback.
// NOTE(review): MarkMessage presumably marks Message as consumed in the
// consumer-group session — confirm against the code that builds CMessage.
type CMessage struct {
	Message     *sarama.ConsumerMessage
	MarkMessage func()
}

type Cmd2Value

// Cmd2Value couples an integer command code with an arbitrary payload.
// NOTE(review): Cmd presumably takes values such as ConsumerMsgs or
// AggregationMessages, and the dynamic type of Value presumably depends on
// Cmd — confirm against the code that sends and receives these values.
type Cmd2Value struct {
	Cmd   int
	Value interface{}
}

type Config

type Config struct {
	// contains filtered or unexported fields
}

type ConsumerGroup

type ConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ConsumerGroup is a Kafka consumer.

func MustKafkaConsumer

func MustKafkaConsumer(c *KafkaConsumerConf) *ConsumerGroup

func (*ConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerGroup) ConsumeClaim

func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroup) Group

func (c *ConsumerGroup) Group() string

func (*ConsumerGroup) RegisterHandlers

func (c *ConsumerGroup) RegisterHandlers(topic string, cb MessageHandlerF)

func (*ConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ConsumerGroup) Topics

func (c *ConsumerGroup) Topics() []string

type KafkaBatchConsumerConf

// KafkaBatchConsumerConf extends KafkaConsumerConf with two batching
// settings, Duration and ChannelNum.
//
// Both tags start with a comma so that the key name stays the field name and
// `default=...` is parsed as a tag option. Without the comma the whole string
// would be taken as the key name. With a config loader that honours the
// `default` option (NOTE(review): presumably go-zero conf — confirm),
// Duration falls back to 100 and ChannelNum to 50 when omitted.
//
// NOTE(review): the unit of Duration is not visible here (presumably
// milliseconds) — confirm against the batch consumer implementation.
type KafkaBatchConsumerConf struct {
	KafkaConsumerConf
	Duration   int `json:",default=100"`
	ChannelNum int `json:",default=50"`
}

KafkaBatchConsumerConf holds the Kafka batch consumer settings.

type KafkaConsumerConf

// KafkaConsumerConf holds the settings for a Kafka consumer: credentials,
// TLS options, broker addresses, the topics to consume and the group name.
//
// Fields tagged `json:",optional"` carry the `optional` tag option; Brokers,
// Topics and Group carry no tag.
// NOTE(review): presumably the config loader treats untagged fields as
// required — confirm against the loader in use.
// NOTE(review): ProducerAck and CompressType are producer-side names; whether
// the consumer reads them is not visible here.
type KafkaConsumerConf struct {
	Username     string    `json:",optional"`
	Password     string    `json:",optional"`
	ProducerAck  string    `json:",optional"`
	CompressType string    `json:",optional"`
	TLS          TLSConfig `json:",optional"`
	Brokers      []string
	Topics       []string
	Group        string
}

KafkaConsumerConf holds the Kafka consumer client settings.

type KafkaProducerConf

// KafkaProducerConf holds the settings for a Kafka producer: credentials,
// acknowledgement and compression options, TLS options, broker addresses and
// the single topic to produce to.
//
// Fields tagged `json:",optional"` carry the `optional` tag option; Brokers
// and Topic carry no tag.
// NOTE(review): the accepted values of ProducerAck and CompressType are not
// visible here — see BuildProducerConfig.
type KafkaProducerConf struct {
	Username     string    `json:",optional"`
	Password     string    `json:",optional"`
	ProducerAck  string    `json:",optional"`
	CompressType string    `json:",optional"`
	TLS          TLSConfig `json:",optional"`
	Brokers      []string
	Topic        string
}

KafkaProducerConf holds the Kafka producer settings.

type KafkaShardingConsumerConf

// KafkaShardingConsumerConf extends KafkaConsumerConf with settings for the
// sharding consumer. The tags declare defaults of 2048 for Concurrency, 128
// for QueueBuffer and "sarama" for ClientId.
// NOTE(review): Concurrency is presumably the number of shard workers and
// QueueBuffer the per-shard queue capacity — confirm against
// ShardingConsumerGroup.
type KafkaShardingConsumerConf struct {
	KafkaConsumerConf
	Concurrency int    `json:",default=2048"`
	QueueBuffer int    `json:",default=128"`
	ClientId    string `json:",default=sarama"`
}

type MessageHandlerF

// MessageHandlerF is a callback that receives a context, a method name, a
// message key and the raw message value. It returns nothing, so it has no
// way to report an error to its caller.
type MessageHandlerF func(ctx context.Context, method, key string, value []byte)

type MsgChannelValue

// MsgChannelValue groups a list of messages under an aggregation ID and a
// trigger ID. It is the argument type of BatchMessageHandlerF.
type MsgChannelValue struct {
	AggregationID string // may be a user ID or a super group ID
	TriggerID     string
	MsgList       []*MsgDataToMQCtx
}

type MsgConsumerMessage

// MsgConsumerMessage groups a list of messages under a key and a trigger ID.
// It is the argument type of BatchMessageHandlerFV2.
// NOTE(review): Key is presumably the Kafka message key shared by every
// entry in MsgList — confirm against the batcher.
type MsgConsumerMessage struct {
	Key       string
	TriggerID string
	MsgList   []*MsgDataToMQCtx
}

func (MsgConsumerMessage) String

func (m MsgConsumerMessage) String() string

type MsgDataToMQCtx

// MsgDataToMQCtx carries one message: its context, a method name and the raw
// payload bytes. The context is stored in the struct so it travels with the
// message.
type MsgDataToMQCtx struct {
	Ctx     context.Context
	Method  string
	MsgData []byte
}

type None

// None is an empty struct type.
// NOTE(review): presumably used as a zero-size placeholder value — confirm
// against its usages.
type None struct{}

type Option

// Option is a functional option that receives a *Config. Options are
// returned by the With* constructors and accepted by
// NewBatcherConsumerMessage.
type Option func(c *Config)

func WithBuffer

func WithBuffer(b int) Option

func WithDataBuffer

func WithDataBuffer(size int) Option

func WithInterval

func WithInterval(i time.Duration) Option

func WithSize

func WithSize(s int) Option

func WithSyncWait

func WithSyncWait(wait bool) Option

func WithWorker

func WithWorker(w int) Option

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func GetCachedMQClient

func GetCachedMQClient(c *KafkaProducerConf) *Producer

func MustKafkaProducer

func MustKafkaProducer(c *KafkaProducerConf) *Producer

func (*Producer) Close

func (p *Producer) Close() (err error)

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, value []byte) (partition int32, offset int64, err error)

SendMessage sends a message to Kafka. NOTE: if the producer failed to be created, the message will be lost.

func (*Producer) SendMessageV2

func (p *Producer) SendMessageV2(ctx context.Context, method, key string, value []byte) (partition int32, offset int64, err error)

func (*Producer) Topic

func (p *Producer) Topic() string

type ShardingConsumerGroup

type ShardingConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ShardingConsumerGroup represents a Sarama consumer-group consumer.

func (*ShardingConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ShardingConsumerGroup) ConsumeClaim

func (c *ShardingConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ShardingConsumerGroup) Group

func (c *ShardingConsumerGroup) Group() string

func (*ShardingConsumerGroup) RegisterHandler

func (c *ShardingConsumerGroup) RegisterHandler(cb MessageHandlerF)

func (*ShardingConsumerGroup) Run

func (c *ShardingConsumerGroup) Run(channelID int)

func (*ShardingConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*ShardingConsumerGroup) Sharding

func (c *ShardingConsumerGroup) Sharding(message *sarama.ConsumerMessage) int

func (*ShardingConsumerGroup) Start

func (c *ShardingConsumerGroup) Start()

Start starts consuming messages and watches for signals.

func (*ShardingConsumerGroup) Stop

func (c *ShardingConsumerGroup) Stop()

Stop stops consuming messages and watching for signals.

func (*ShardingConsumerGroup) Topics

func (c *ShardingConsumerGroup) Topics() []string

type TLSConfig

// TLSConfig holds the TLS settings embedded in the consumer and producer
// configurations. Every field carries the `optional` tag option.
// NOTE(review): CACrt, ClientCrt and ClientKey are presumably file paths (or
// PEM contents) and ClientKeyPwd the passphrase of an encrypted client key —
// confirm against the code that builds the tls.Config.
// NOTE(review): InsecureSkipVerify presumably disables server certificate
// verification, as the same-named crypto/tls field does — confirm.
type TLSConfig struct {
	EnableTLS          bool   `json:",optional"`
	CACrt              string `json:",optional"`
	ClientCrt          string `json:",optional"`
	ClientKey          string `json:",optional"`
	ClientKeyPwd       string `json:",optional"`
	InsecureSkipVerify bool   `json:",optional"`
}

type TriggerChannelValue

type TriggerChannelValue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL