consumer

package
Version: v0.3.1 (not the latest version)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2023 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultBootstrapServers is the default bootstrap server address: the
	// loopback interface on port 9092.
	DefaultBootstrapServers = "127.0.0.1:9092"
	// DefaultGroupId is the default consumer group ID.
	DefaultGroupId = "test-group"
	// DefaultSubscribeTopics is the default topic name to subscribe to.
	DefaultSubscribeTopics = "test-topic"

	// DefaultAutoOffsetReset is the default consumer offset reset policy.
	DefaultAutoOffsetReset = "latest"

	// DefaultHeartbeatIntervalMs is the interval, in milliseconds, between
	// heartbeats sent to the consumer group coordinator to signal that the
	// consumer is alive and connected (3 seconds).
	DefaultHeartbeatIntervalMs = 3 * 1000
	// DefaultSessionTimeoutMs is the maximum time, in milliseconds, that a
	// consumer in a group may lose contact with the broker before it is
	// considered inactive (30 seconds).
	DefaultSessionTimeoutMs = 30 * 1000
	// DefaultMaxPollIntervalMs is the interval, in milliseconds, used to
	// check whether the consumer is still processing messages (5 minutes).
	DefaultMaxPollIntervalMs = 5 * 60 * 1000

	// DefaultFetchMaxBytes is the upper bound, in bytes, on the amount of
	// data fetched from the broker in one request (50 MiB).
	DefaultFetchMaxBytes = 50 * 1024 * 1024
	// DefaultMaxPartitionFetchBytes is the upper bound, in bytes, on the
	// amount of data returned for each partition (1 MiB).
	DefaultMaxPartitionFetchBytes = 1024 * 1024

	// DefaultSecurityProtocol is the default security protocol name.
	DefaultSecurityProtocol = "PLAINTEXT"
	// DefaultSaslMechanism is the default SASL mechanism name.
	DefaultSaslMechanism = "PLAIN"
	// DefaultSaslUsername is the default SASL user name.
	DefaultSaslUsername = "kafka"
	// DefaultSaslPassword is the default SASL password.
	// NOTE(review): this is a hard-coded placeholder credential; presumably
	// callers are expected to override it via the SaslPassword option —
	// confirm it is never relied on outside local testing.
	DefaultSaslPassword = "123456"
)

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(appName string)

Types

type KafkaConsumer

// KafkaConsumer holds a *kafka.Consumer handle and is the receiver of
// InitConsumer.
type KafkaConsumer struct {
	// Consumer is the underlying consumer handle.
	// NOTE(review): presumably created and assigned by InitConsumer, with
	// kafka being the confluent-kafka-go package — confirm against the source.
	Consumer *kafka.Consumer
}

func (*KafkaConsumer) InitConsumer

func (k *KafkaConsumer) InitConsumer(options ...Option) (err error)

InitConsumer

1. To build confluent-kafka-go, refer to docs/kafka.md.

type MsgConsumerData

// MsgConsumerData is the element type of the channel accepted by the
// ChanConsumerData option.
// NOTE(review): the field descriptions below are inferred from the field
// names only — confirm against the code that fills this struct.
type MsgConsumerData struct {
	Data      []byte // presumably the raw payload of one consumed message
	Partition int32  // presumably the partition the message was read from
	Offset    int64  // presumably the message's offset within that partition
}

type Option

// Option is a function that receives a pointer to Options. The exported
// constructors of this package (BootstrapServers, GroupId, SubscribeTopics,
// and the rest) each return an Option, and InitConsumer accepts any number
// of them.
type Option func(*Options)

func AutoOffsetReset

func AutoOffsetReset(autoOffsetReset string) Option

func BootstrapServers

func BootstrapServers(bootstrapServers string) Option

func ChanConsumerData

func ChanConsumerData(chanConsumerData chan MsgConsumerData) Option

func FetchMaxBytes

func FetchMaxBytes(fetchMaxBytes int) Option

func GroupId

func GroupId(groupId string) Option

func HeartbeatIntervalMs

func HeartbeatIntervalMs(heartbeatIntervalMs int) Option

func MaxPartitionFetchBytes

func MaxPartitionFetchBytes(maxPartitionFetchBytes int) Option

func MaxPollIntervalMs

func MaxPollIntervalMs(maxPollIntervalMs int) Option

func SaslMechanism

func SaslMechanism(saslMechanism string) Option

func SaslPassword

func SaslPassword(saslPassword string) Option

func SaslUsername

func SaslUsername(saslUsername string) Option

func SecurityProtocol

func SecurityProtocol(securityProtocol string) Option

func SessionTimeoutMs

func SessionTimeoutMs(sessionTimeoutMs int) Option

func SignChan

func SignChan(signChan chan os.Signal) Option

func SubscribeTopics

func SubscribeTopics(subscribeTopics []string) Option

type Options

// Options is the settings value that every Option function receives a
// pointer to. Its fields are not exported, so code outside this package can
// only influence it through Option values.
type Options struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL