producer

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Default values for the producer settings; each name matches one of the
// Option constructors exported by this package.
// NOTE(review): presumably each value applies when the matching Option is not
// passed to InitProducer — confirm against the implementation.
// NOTE(review): DefaultSaslUsername and DefaultSaslPassword are hard-coded
// credentials; they look like placeholders — make sure real deployments
// override them.
const (
	// DefaultBootstrapServers is the default bootstrap server address.
	DefaultBootstrapServers = "127.0.0.1:9092"
	// DefaultMessageMaxBytes is the maximum allowed record batch size.
	DefaultMessageMaxBytes = 1048588
	// DefaultBatchSize is the amount of message data buffered per partition
	// (the sum of the message content bytes, not the number of messages).
	DefaultBatchSize = 16384
	// DefaultLingerMs is the maximum time each message may stay in the buffer.
	DefaultLingerMs = 1000
	// DefaultStickyPartitioningLingerMs is the maximum time each message may
	// stay in the buffer under the sticky partitioning strategy.
	DefaultStickyPartitioningLingerMs = 1000
	// DefaultRetries is the number of retries.
	DefaultRetries = 3
	// DefaultRetryBackoffMs is the interval between retries.
	DefaultRetryBackoffMs = 1000
	// DefaultAcks is the acknowledgement mode.
	DefaultAcks = "1"
	// DefaultCompressionType is the compression method.
	DefaultCompressionType  = "snappy"
	DefaultSecurityProtocol = "PLAINTEXT"
	DefaultSaslMechanism    = "PLAIN"
	DefaultSaslUsername     = "kafka"
	DefaultSaslPassword     = "123456"
)

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(appName string)

Types

type KafkaProducer

// KafkaProducer wraps a *kafka.Producer and carries the InitProducer and
// SendMsg methods.
type KafkaProducer struct {
	// Producer is the underlying client handle.
	// NOTE(review): presumably populated by InitProducer — confirm.
	Producer *kafka.Producer
}

func (*KafkaProducer) InitProducer

func (k *KafkaProducer) InitProducer(fn func(), options ...Option) (err error)

InitProducer

1. To build confluent-kafka-go, refer to docs/kafka.md.

2. Example of the fn func() argument:

// Handler is an example type holding the channel from which the messages to
// be produced are received.
type Handler struct {
	 sourceChan chan MsgProducerData
}
// producerFunc builds the callback that is passed to InitProducer as fn.
// Every invocation of the returned function blocks until a message arrives on
// h.sourceChan and then hands it to p.SendMsg on a new goroutine.
func (h *Handler) producerFunc(p *KafkaProducer) func() {
	return func() {
		// A plain receive behaves exactly like a select with a single case.
		v := <-h.sourceChan
		go func() {
			// The send error is deliberately discarded in this example;
			// real callers should log or otherwise handle it.
			_ = p.SendMsg(&v)
		}()
	}
}

func (*KafkaProducer) SendMsg

func (k *KafkaProducer) SendMsg(v *MsgProducerData) (err error)

type MsgProducerData

// MsgProducerData describes one message handed to KafkaProducer.SendMsg.
type MsgProducerData struct {
	Data  string // presumably the message payload — confirm against SendMsg
	Key   string // presumably the message key — confirm against SendMsg
	Topic string // presumably the destination topic — confirm against SendMsg
}

type Option

// Option is a function that receives a *Options. InitProducer accepts any
// number of them, and constructors such as Acks, BatchSize and
// BootstrapServers each return one.
type Option func(*Options)

func Acks

func Acks(acks string) Option

func BatchSize

func BatchSize(batchSize int) Option

func BootstrapServers

func BootstrapServers(bootstrapServers string) Option

func CompressionType

func CompressionType(compressionType string) Option

func LingerMs

func LingerMs(lingerMs int) Option

func MessageMaxBytes

func MessageMaxBytes(messageMaxBytes int) Option

func Retries

func Retries(retries int) Option

func RetryBackoffMs

func RetryBackoffMs(retryBackoffMs int) Option

func SaslMechanism

func SaslMechanism(saslMechanism string) Option

func SaslPassword

func SaslPassword(saslPassword string) Option

func SaslUsername

func SaslUsername(saslUsername string) Option

func SecurityProtocol

func SecurityProtocol(securityProtocol string) Option

func SignChan

func SignChan(signChan chan os.Signal) Option

func StickyPartitioningLingerMs

func StickyPartitioningLingerMs(stickyPartitioningLingerMs int) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL