kafka

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2021 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HandCommitConsumer

type HandCommitConsumer struct {
	HandleMsg func(msg *sarama.ConsumerMessage) bool // true,  msg means consume
}

func (*HandCommitConsumer) Cleanup

func (*HandCommitConsumer) ConsumeClaim

func (consumer *HandCommitConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*HandCommitConsumer) Setup

type KafkaMessageQueueConsumer

type KafkaMessageQueueConsumer struct {
	ConsumerGroup sarama.ConsumerGroup
	Ctx           context.Context
	// contains filtered or unexported fields
}

func NewDefaultKafkaMessageQueueConsumer

func NewDefaultKafkaMessageQueueConsumer(brokers []string, groupName string, autoCommit bool, ctx context.Context) (*KafkaMessageQueueConsumer, error)

func (*KafkaMessageQueueConsumer) Close

func (c *KafkaMessageQueueConsumer) Close() error

func (*KafkaMessageQueueConsumer) ConsumeWithHandCommit

func (c *KafkaMessageQueueConsumer) ConsumeWithHandCommit(topics []string, handleMsg func(msg *sarama.ConsumerMessage) bool)

handleMsg return true means will commit this msg after handle it

type KafkaMessageQueueProducer

type KafkaMessageQueueProducer struct {
	SyncProducer *sarama.SyncProducer
	ReqTryCfg    RetryConfig
}

func NewDefaultKafkaMessageQueueProducer

func NewDefaultKafkaMessageQueueProducer(brokersAddress []string, reqTryCfg RetryConfig) (*KafkaMessageQueueProducer, error)

func (*KafkaMessageQueueProducer) Close

func (k *KafkaMessageQueueProducer) Close()

func (*KafkaMessageQueueProducer) CloseKafkaProducer

func (k *KafkaMessageQueueProducer) CloseKafkaProducer()

func (*KafkaMessageQueueProducer) SendJsonMessage

func (k *KafkaMessageQueueProducer) SendJsonMessage(topic string, jsonByte []byte) error

single send

func (*KafkaMessageQueueProducer) SendJsonMessages

func (k *KafkaMessageQueueProducer) SendJsonMessages(topic string, jsonBytes ...[]byte) error

batch send

type RetryConfig

type RetryConfig struct {
	RetryTime int
	DelayTime time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL