Documentation
¶
Index ¶
- Constants
- func InitMetrics(appName string)
- type KafkaProducer
- type MsgProducerData
- type Option
- func Acks(acks string) Option
- func BatchSize(batchSize int) Option
- func BootstrapServers(bootstrapServers string) Option
- func CompressionType(compressionType string) Option
- func LingerMs(lingerMs int) Option
- func MessageMaxBytes(messageMaxBytes int) Option
- func Retries(retries int) Option
- func RetryBackoffMs(retryBackoffMs int) Option
- func SaslMechanism(saslMechanism string) Option
- func SaslPassword(saslPassword string) Option
- func SaslUsername(saslUsername string) Option
- func SecurityProtocol(securityProtocol string) Option
- func SignChan(signChan chan os.Signal) Option
- func StickyPartitioningLingerMs(stickyPartitioningLingerMs int) Option
- type Options
Constants ¶
View Source
const ( DefaultBootstrapServers = "127.0.0.1:9092" // DefaultMessageMaxBytes 允许的最大记录批量大小 DefaultMessageMaxBytes = 1048588 // DefaultBatchSize 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和, 不是条数) DefaultBatchSize = 16384 // DefaultLingerMs 每条消息在缓存中的最长时间 DefaultLingerMs = 1000 // DefaultStickyPartitioningLingerMs 黏性分区策略每条消息在缓存中的最长时间 DefaultStickyPartitioningLingerMs = 1000 // DefaultRetries 重试次数 DefaultRetries = 3 // DefaultRetryBackoffMs 重试间隔 DefaultRetryBackoffMs = 1000 // DefaultAcks 确认机制 DefaultAcks = "1" // DefaultCompressionType 压缩方式 DefaultCompressionType = "snappy" DefaultSecurityProtocol = "PLAINTEXT" DefaultSaslMechanism = "PLAIN" DefaultSaslUsername = "kafka" DefaultSaslPassword = "123456" )
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(appName string)
Types ¶
type KafkaProducer ¶
func (*KafkaProducer) InitProducer ¶
func (k *KafkaProducer) InitProducer(fn func(), options ...Option) (err error)
InitProducer
1. confluent-kafka-go build refer to docs/kafka.md
2. (fn func()) examples:
type Handler struct { sourceChan chan MsgProducerData }
func (h *Handler) producerFunc(p *KafkaProducer) func() { return func() { select { case v := <-h.sourceChan: go func() { _ = p.SendMsg(&v) }() } } }
func (*KafkaProducer) SendMsg ¶
func (k *KafkaProducer) SendMsg(v *MsgProducerData) (err error)
type MsgProducerData ¶
type Option ¶
type Option func(*Options)
func BootstrapServers ¶
func CompressionType ¶
func MessageMaxBytes ¶
func RetryBackoffMs ¶
func SaslMechanism ¶
func SaslPassword ¶
func SaslUsername ¶
func SecurityProtocol ¶
Click to show internal directories.
Click to hide internal directories.