Documentation ¶
Index ¶
- Constants
- func InitMetrics(appName string)
- type KafkaConsumer
- type MsgConsumerData
- type Option
- func AutoOffsetReset(autoOffsetReset string) Option
- func BootstrapServers(bootstrapServers string) Option
- func ChanConsumerData(chanConsumerData chan MsgConsumerData) Option
- func FetchMaxBytes(fetchMaxBytes int) Option
- func GroupId(groupId string) Option
- func HeartbeatIntervalMs(heartbeatIntervalMs int) Option
- func MaxPartitionFetchBytes(maxPartitionFetchBytes int) Option
- func MaxPollIntervalMs(maxPollIntervalMs int) Option
- func SaslMechanism(saslMechanism string) Option
- func SaslPassword(saslPassword string) Option
- func SaslUsername(saslUsername string) Option
- func SecurityProtocol(securityProtocol string) Option
- func SessionTimeoutMs(sessionTimeoutMs int) Option
- func SignChan(signChan chan os.Signal) Option
- func SubscribeTopics(subscribeTopics []string) Option
- type Options
Constants ¶
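const (
	DefaultBootstrapServers = "127.0.0.1:9092"
	DefaultGroupId          = "test-group"
	// DefaultAutoOffsetReset is the consumer offset reset policy.
	DefaultAutoOffsetReset = "latest"
	// DefaultHeartbeatIntervalMs specifies the interval, in milliseconds, between heartbeats
	// to the consumer group coordinator, which indicate that the consumer is alive and connected.
	DefaultHeartbeatIntervalMs = 3000
	// DefaultSessionTimeoutMs specifies the maximum time, in milliseconds, that a consumer in a
	// consumer group can be out of contact with the broker before it is considered inactive.
	DefaultSessionTimeoutMs = 30000
	// DefaultMaxPollIntervalMs sets the interval for checking whether the consumer is still
	// processing messages.
	DefaultMaxPollIntervalMs = 300000
	// DefaultFetchMaxBytes sets the maximum number of bytes fetched from the broker at one time.
	DefaultFetchMaxBytes = 52428800
	// DefaultMaxPartitionFetchBytes sets the maximum number of bytes returned for each partition.
	DefaultMaxPartitionFetchBytes = 1048576
	DefaultSecurityProtocol       = "PLAINTEXT"
	DefaultSaslMechanism          = "PLAIN"
	DefaultSaslUsername           = "kafka"
	DefaultSaslPassword           = "123456"
	DefaultSubscribeTopics        = "test-topic"
)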
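To make the numeric defaults easier to read, the following sketch converts the two byte limits to MiB and checks the heartbeat interval against the session timeout. The one-third ratio comes from general Kafka guidance for `heartbeat.interval.ms`; `checkTimeouts` is a hypothetical helper written for this illustration and is not part of the package.

```go
package main

import "fmt"

// Values copied from the Constants section above.
const (
	DefaultHeartbeatIntervalMs    = 3000
	DefaultSessionTimeoutMs       = 30000
	DefaultFetchMaxBytes          = 52428800
	DefaultMaxPartitionFetchBytes = 1048576
)

// checkTimeouts is a hypothetical helper (not part of this package). It
// reports an error when the heartbeat interval is more than one third of
// the session timeout.
func checkTimeouts(heartbeatMs, sessionMs int) error {
	if heartbeatMs*3 > sessionMs {
		return fmt.Errorf("heartbeat interval %d ms exceeds one third of session timeout %d ms",
			heartbeatMs, sessionMs)
	}
	return nil
}

func main() {
	const mib = 1024 * 1024
	fmt.Println(DefaultFetchMaxBytes/mib, "MiB per fetch")              // 50 MiB per fetch
	fmt.Println(DefaultMaxPartitionFetchBytes/mib, "MiB per partition") // 1 MiB per partition
	fmt.Println(checkTimeouts(DefaultHeartbeatIntervalMs, DefaultSessionTimeoutMs)) // <nil>
}
```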
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(appName string)
Types ¶
type KafkaConsumer ¶
func (*KafkaConsumer) InitConsumer ¶
func (k *KafkaConsumer) InitConsumer(options ...Option) (err error)
InitConsumer initializes the consumer with the given options.
1. To build confluent-kafka-go, refer to docs/kafka.md.
type MsgConsumerData ¶
type Option ¶
type Option func(*Options)
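The `Option` signature follows the functional options pattern. The sketch below shows how such options are typically applied on top of defaults. The fields of `Options` are not shown in this documentation, so the struct here is a stand-in with assumed field names, and `newOptions` is a hypothetical helper; the package's real implementation may differ.

```go
package main

import "fmt"

// Defaults copied from the Constants section of this package.
const (
	DefaultBootstrapServers = "127.0.0.1:9092"
	DefaultGroupId          = "test-group"
	DefaultSessionTimeoutMs = 30000
)

// Options is a stand-in: the real struct's fields are not documented here,
// so these field names are assumptions.
type Options struct {
	BootstrapServers string
	GroupId          string
	SessionTimeoutMs int
}

// Option matches the documented declaration: type Option func(*Options).
type Option func(*Options)

// The constructors below use the documented signatures; their bodies are assumed.
func BootstrapServers(bootstrapServers string) Option {
	return func(o *Options) { o.BootstrapServers = bootstrapServers }
}

func GroupId(groupId string) Option {
	return func(o *Options) { o.GroupId = groupId }
}

func SessionTimeoutMs(sessionTimeoutMs int) Option {
	return func(o *Options) { o.SessionTimeoutMs = sessionTimeoutMs }
}

// newOptions is a hypothetical helper: it starts from the defaults and
// applies each option in order, so later options override earlier ones.
func newOptions(options ...Option) Options {
	o := Options{
		BootstrapServers: DefaultBootstrapServers,
		GroupId:          DefaultGroupId,
		SessionTimeoutMs: DefaultSessionTimeoutMs,
	}
	for _, apply := range options {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(BootstrapServers("broker-1:9092"), GroupId("orders"))
	fmt.Printf("%+v\n", o)
	// {BootstrapServers:broker-1:9092 GroupId:orders SessionTimeoutMs:30000}
}
```

Under this pattern, a call such as `InitConsumer(GroupId("orders"))` would only need to name the settings that differ from the defaults.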
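func AutoOffsetReset ¶
func AutoOffsetReset(autoOffsetReset string) Option
func BootstrapServers ¶
func BootstrapServers(bootstrapServers string) Option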
func ChanConsumerData ¶
func ChanConsumerData(chanConsumerData chan MsgConsumerData) Option
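func FetchMaxBytes ¶
func FetchMaxBytes(fetchMaxBytes int) Option
func GroupId ¶
func GroupId(groupId string) Option
func HeartbeatIntervalMs ¶
func HeartbeatIntervalMs(heartbeatIntervalMs int) Option
func MaxPartitionFetchBytes ¶
func MaxPartitionFetchBytes(maxPartitionFetchBytes int) Option
func MaxPollIntervalMs ¶
func MaxPollIntervalMs(maxPollIntervalMs int) Option
func SaslMechanism ¶
func SaslMechanism(saslMechanism string) Option
func SaslPassword ¶
func SaslPassword(saslPassword string) Option
func SaslUsername ¶
func SaslUsername(saslUsername string) Option
func SecurityProtocol ¶
func SecurityProtocol(securityProtocol string) Option
func SessionTimeoutMs ¶
func SessionTimeoutMs(sessionTimeoutMs int) Option
func SignChan ¶
func SignChan(signChan chan os.Signal) Option
func SubscribeTopics ¶
func SubscribeTopics(subscribeTopics []string) Option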