kafka

package module
v2.0.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2023 License: MIT Imports: 20 Imported by: 1

Documentation

Index

Constants

View Source
// Balancer name constants. Each name has a matching With<Name>Balancer publish
// option in this package.
//
// NOTE(review): the first four values omit the "Balancer" suffix while the last
// two include it — confirm the asymmetry is intentional before depending on the
// raw strings.
const (
	LeastBytesBalancer    = "LeastBytes"
	RoundRobinBalancer    = "RoundRobin"
	HashBalancer          = "Hash"
	ReferenceHashBalancer = "ReferenceHash"
	Crc32Balancer         = "CRC32Balancer"
	Murmur2Balancer       = "Murmur2Balancer"
)

Variables

This section is empty.

Functions

func NewBroker

func NewBroker(opts ...broker.Option) broker.Broker

NewBroker returns a broker.Broker built from the supplied options.

func WithAllowAutoTopicCreation

func WithAllowAutoTopicCreation(enable bool) broker.Option

WithAllowAutoTopicCreation .

func WithAsync

func WithAsync(enable bool) broker.Option

WithAsync enables or disables asynchronous writes. Default: true.

func WithBatchBytes

func WithBatchBytes(by int64) broker.Option

WithBatchBytes sets the maximum size of a request in bytes. Default: 1048576.

func WithBatchSize

func WithBatchSize(size int) broker.Option

WithBatchSize sets the target batch size in messages (cf. batch.size). Default: 100.

func WithBatchTimeout

func WithBatchTimeout(timeout time.Duration) broker.Option

WithBatchTimeout sets how long incomplete batches wait before being flushed (cf. linger.ms). Default: 10ms.

func WithCommitInterval

func WithCommitInterval(interval time.Duration) broker.Option

WithCommitInterval .

func WithCrc32Balancer

func WithCrc32Balancer(consistent bool) broker.PublishOption

WithCrc32Balancer .

func WithDialer

func WithDialer(cfg *kafkaGo.Dialer) broker.Option

WithDialer .

func WithDialerTimeout

func WithDialerTimeout(tm time.Duration) broker.Option

WithDialerTimeout .

func WithEnableErrorLogger

func WithEnableErrorLogger(enable bool) broker.Option

WithEnableErrorLogger enables or disables the go-micro error logger.

func WithEnableLogger

func WithEnableLogger(enable bool) broker.Option

WithEnableLogger enables or disables the go-micro info logger.

func WithEnableOneTopicOneWriter

func WithEnableOneTopicOneWriter(enable bool) broker.Option

WithEnableOneTopicOneWriter .

func WithErrorLogger

func WithErrorLogger(l kafkaGo.Logger) broker.Option

WithErrorLogger injects the error logger.

func WithHashBalancer

func WithHashBalancer(hasher hash.Hash32) broker.PublishOption

WithHashBalancer .

func WithHeaders

func WithHeaders(headers map[string]interface{}) broker.PublishOption

WithHeaders .

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) broker.Option

WithHeartbeatInterval .

func WithLeastBytesBalancer

func WithLeastBytesBalancer() broker.PublishOption

WithLeastBytesBalancer .

func WithLogger

func WithLogger(l kafkaGo.Logger) broker.Option

WithLogger injects the info logger.

func WithMaxAttempts

func WithMaxAttempts(cnt int) broker.Option

WithMaxAttempts .

func WithMaxBytes

func WithMaxBytes(bytes int) broker.Option

WithMaxBytes .

func WithMaxWait

func WithMaxWait(time time.Duration) broker.Option

WithMaxWait sets the maximum fetch wait time (cf. fetch.max.wait.ms).

func WithMessageKey

func WithMessageKey(key []byte) broker.PublishOption

WithMessageKey .

func WithMessageOffset

func WithMessageOffset(offset int64) broker.PublishOption

WithMessageOffset .

func WithMinBytes

func WithMinBytes(bytes int) broker.Option

WithMinBytes sets the minimum fetch size in bytes (cf. fetch.min.bytes).

func WithMurmur2Balancer

func WithMurmur2Balancer(consistent bool) broker.PublishOption

WithMurmur2Balancer .

func WithPartitionWatchInterval

func WithPartitionWatchInterval(interval time.Duration) broker.Option

WithPartitionWatchInterval .

func WithPlainMechanism

func WithPlainMechanism(username, password string) broker.Option

WithPlainMechanism .

func WithPublishMaxAttempts

func WithPublishMaxAttempts(cnt int) broker.Option

WithPublishMaxAttempts .

func WithQueueCapacity

func WithQueueCapacity(cap int) broker.Option

WithQueueCapacity .

func WithReadLagInterval

func WithReadLagInterval(interval time.Duration) broker.Option

WithReadLagInterval .

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) broker.Option

WithReadTimeout sets the read timeout. Default: 10s.

func WithReaderConfig

func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option

WithReaderConfig .

func WithRebalanceTimeout

func WithRebalanceTimeout(timeout time.Duration) broker.Option

WithRebalanceTimeout .

func WithReferenceHashBalancer

func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption

WithReferenceHashBalancer .

func WithRetentionTime

func WithRetentionTime(time time.Duration) broker.Option

WithRetentionTime .

func WithRetries

func WithRetries(cnt int) broker.Option

WithRetries .

func WithRoundRobinBalancer

func WithRoundRobinBalancer() broker.PublishOption

WithRoundRobinBalancer .

func WithSessionTimeout

func WithSessionTimeout(timeout time.Duration) broker.Option

WithSessionTimeout .

func WithStartOffset

func WithStartOffset(offset int64) broker.Option

WithStartOffset .

func WithWatchPartitionChanges

func WithWatchPartitionChanges(enable bool) broker.Option

WithWatchPartitionChanges .

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) broker.Option

WithWriteTimeout sets the write timeout. Default: 10s.

func WithWriterConfig

func WithWriterConfig(cfg WriterConfig) broker.Option

WithWriterConfig .

Types

type ErrorLogger

// ErrorLogger is an empty struct that carries a Printf method.
// NOTE(review): presumably the adapter enabled by WithEnableErrorLogger — confirm.
type ErrorLogger struct {
}

func (ErrorLogger) Printf

func (l ErrorLogger) Printf(msg string, args ...interface{})

Printf .

type Logger

// Logger is an empty struct that carries a Printf method.
// NOTE(review): presumably the adapter enabled by WithEnableLogger — confirm.
type Logger struct {
}

func (Logger) Printf

func (l Logger) Printf(msg string, args ...interface{})

Printf .

type MessageCarrier

// MessageCarrier exposes Get, Set, and Keys over string keys and values; all of
// its fields are unexported.
// NOTE(review): presumably a header carrier over a kafka-go message (see
// NewMessageCarrier) — confirm.
type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier

NewMessageCarrier .

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

Get .

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

Keys .

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

Set .

type Writer

// Writer groups the kafka-go writers held by this package.
type Writer struct {
	// Writer is a single kafka-go writer.
	// NOTE(review): presumably the shared writer used when
	// EnableOneTopicOneWriter is false — confirm.
	Writer                  *kafkaGo.Writer
	// Writers maps a string key to a kafka-go writer.
	// NOTE(review): presumably keyed by topic name — confirm.
	Writers                 map[string]*kafkaGo.Writer
	// EnableOneTopicOneWriter is the flag passed to NewWriter.
	EnableOneTopicOneWriter bool
}

func NewWriter

func NewWriter(enableOneTopicOneWriter bool) *Writer

NewWriter creates a new Writer.

func (*Writer) Close

func (w *Writer) Close()

Close flushes pending writes and waits for all writes to complete before returning.

func (*Writer) CreateProducer

func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer

CreateProducer creates a kafka-go Writer.

type WriterConfig

// WriterConfig holds the settings for a kafka-go Writer; it is the
// configuration type accepted by WithWriterConfig and Writer.CreateProducer.
type WriterConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer kafkaGo.Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit on the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use the kafka default value of 1048576.
	BatchBytes int64

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	//
	// NOTE(review): WithBatchTimeout documents a default of 10ms; confirm
	// which default actually applies.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// Number of acknowledgements from partition replicas required before
	// receiving a response to a produce request. The default is -1, which means
	// to wait for all replicas, and a value above 0 is required to indicate how
	// many replicas should acknowledge a message to be considered successful.
	//
	// This version of kafka-go (v0.3) does not support 0 required acks, due to
	// some internal complexity implementing this with the Kafka protocol. If you
	// need that functionality specifically, you'll need to upgrade to v0.4.
	//
	// NOTE(review): the paragraph above appears to be carried over from
	// upstream kafka-go documentation and may be stale for the kafka-go
	// version this package imports — confirm.
	RequiredAcks kafkaGo.RequiredAcks

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// If not nil, specifies a logger used to report internal changes within the
	// Writer.
	Logger kafkaGo.Logger

	// ErrorLogger is the logger used to report errors. If nil, the Writer falls
	// back to using Logger instead.
	ErrorLogger kafkaGo.Logger

	// AllowAutoTopicCreation tells the Writer to create the topic if it is
	// missing.
	AllowAutoTopicCreation bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL