kafka

package module
v1.2.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 23 Imported by: 3

README

Kafka

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

Kafka的基本概念

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。

  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

Docker部署开发环境

docker pull bitnami/kafka:latest
docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka-exporter:latest

docker run -itd \
    --name zookeeper-test \
    -p 2181:2181 \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest

docker run -itd \
    --name kafka-standalone \
    --link zookeeper-test \
    -p 9092:9092 \
    -v /home/data/kafka:/bitnami/kafka \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    --user root \
    bitnami/kafka:latest

管理工具

参考资料

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateTopic added in v1.2.2

func CreateTopic(addr string, topic string, numPartitions, replicationFactor int) error

func DeleteTopic added in v1.2.2

func DeleteTopic(addr string, topics ...string) error

func NewBroker

func NewBroker(opts ...broker.Option) broker.Broker

func WithAllowPublishAutoTopicCreation added in v1.2.3

func WithAllowPublishAutoTopicCreation(enable bool) broker.Option

WithAllowPublishAutoTopicCreation .

func WithAsync

func WithAsync(enable bool) broker.Option

WithAsync 异步发送消息 default:true

func WithBatchBytes

func WithBatchBytes(by int64) broker.Option

WithBatchBytes

default:1048576 bytes

func WithBatchSize

func WithBatchSize(size int) broker.Option

WithBatchSize 发送批次大小 batch.size

default:100

func WithBatchTimeout

func WithBatchTimeout(timeout time.Duration) broker.Option

WithBatchTimeout linger.ms

default:10ms

func WithCommitInterval

func WithCommitInterval(interval time.Duration) broker.Option

WithCommitInterval .

func WithCompletion added in v1.2.9

func WithCompletion(completion func(messages []kafkaGo.Message, err error)) broker.Option

WithCompletion 消息发布完成回调

func WithCrc32Balancer added in v1.1.0

func WithCrc32Balancer(consistent bool) broker.PublishOption

WithCrc32Balancer CRC32负载均衡器

func WithDialer added in v1.0.2

func WithDialer(cfg *kafkaGo.Dialer) broker.Option

WithDialer .

func WithDialerTimeout added in v1.0.2

func WithDialerTimeout(tm time.Duration) broker.Option

WithDialerTimeout .

func WithEnableErrorLogger added in v1.1.0

func WithEnableErrorLogger(enable bool) broker.Option

WithEnableErrorLogger enable kratos error logger

func WithEnableLogger added in v1.1.0

func WithEnableLogger(enable bool) broker.Option

WithEnableLogger enable kratos info logger

func WithEnableOneTopicOneWriter added in v1.1.0

func WithEnableOneTopicOneWriter(enable bool) broker.Option

WithEnableOneTopicOneWriter .

func WithErrorLogger added in v1.1.0

func WithErrorLogger(l kafkaGo.Logger) broker.Option

WithErrorLogger inject error logger

func WithHashBalancer added in v1.1.0

func WithHashBalancer(hasher hash.Hash32) broker.PublishOption

WithHashBalancer Hash负载均衡器

func WithHeaders

func WithHeaders(headers map[string]interface{}) broker.PublishOption

WithHeaders 消息头

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) broker.Option

WithHeartbeatInterval .

func WithLeastBytesBalancer added in v1.1.0

func WithLeastBytesBalancer() broker.PublishOption

WithLeastBytesBalancer LeastBytes负载均衡器

func WithLogger added in v1.1.0

func WithLogger(l kafkaGo.Logger) broker.Option

WithLogger inject info logger

func WithMaxAttempts

func WithMaxAttempts(cnt int) broker.Option

WithMaxAttempts .

func WithMaxBytes

func WithMaxBytes(bytes int) broker.Option

WithMaxBytes .

func WithMaxWait

func WithMaxWait(time time.Duration) broker.Option

WithMaxWait fetch.max.wait.ms

func WithMessageKey

func WithMessageKey(key []byte) broker.PublishOption

WithMessageKey 消息键

func WithMessageOffset

func WithMessageOffset(offset int64) broker.PublishOption

WithMessageOffset 消息偏移

func WithMinBytes

func WithMinBytes(bytes int) broker.Option

WithMinBytes fetch.min.bytes

func WithMurmur2Balancer added in v1.1.0

func WithMurmur2Balancer(consistent bool) broker.PublishOption

WithMurmur2Balancer Murmur2负载均衡器

func WithPartitionWatchInterval

func WithPartitionWatchInterval(interval time.Duration) broker.Option

WithPartitionWatchInterval .

func WithPlainMechanism added in v1.0.2

func WithPlainMechanism(username, password string) broker.Option

WithPlainMechanism PLAIN认证信息

func WithPublishMaxAttempts

func WithPublishMaxAttempts(cnt int) broker.Option

WithPublishMaxAttempts .

func WithQueueCapacity

func WithQueueCapacity(cap int) broker.Option

WithQueueCapacity .

func WithReadLagInterval

func WithReadLagInterval(interval time.Duration) broker.Option

WithReadLagInterval .

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) broker.Option

WithReadTimeout 读取超时时间 default:10s

func WithReaderConfig added in v1.0.2

func WithReaderConfig(cfg kafkaGo.ReaderConfig) broker.Option

WithReaderConfig .

func WithRebalanceTimeout

func WithRebalanceTimeout(timeout time.Duration) broker.Option

WithRebalanceTimeout .

func WithReferenceHashBalancer added in v1.1.0

func WithReferenceHashBalancer(hasher hash.Hash32) broker.PublishOption

WithReferenceHashBalancer ReferenceHash负载均衡器

func WithRetentionTime

func WithRetentionTime(time time.Duration) broker.Option

WithRetentionTime .

func WithRetries

func WithRetries(cnt int) broker.Option

WithRetries 设置消息重发的次数

func WithRoundRobinBalancer added in v1.1.0

func WithRoundRobinBalancer() broker.PublishOption

WithRoundRobinBalancer RoundRobin负载均衡器,默认均衡器。

func WithScramMechanism added in v1.0.2

func WithScramMechanism(algoName ScramAlgorithm, username, password string) broker.Option

WithScramMechanism SCRAM认证信息

func WithSessionTimeout

func WithSessionTimeout(timeout time.Duration) broker.Option

WithSessionTimeout .

func WithStartOffset

func WithStartOffset(offset int64) broker.Option

WithStartOffset .

func WithSubscribeAutoCreateTopic added in v1.2.3

func WithSubscribeAutoCreateTopic(topic string, numPartitions, replicationFactor int) broker.SubscribeOption

func WithWatchPartitionChanges

func WithWatchPartitionChanges(enable bool) broker.Option

WithWatchPartitionChanges .

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) broker.Option

WithWriteTimeout 写入超时时间 default:10s

func WithWriterConfig added in v1.1.0

func WithWriterConfig(cfg WriterConfig) broker.Option

WithWriterConfig .

Types

type BalancerName added in v1.2.3

type BalancerName string
const (
	LeastBytesBalancer    BalancerName = "LeastBytes"
	RoundRobinBalancer    BalancerName = "RoundRobin"
	HashBalancer          BalancerName = "Hash"
	ReferenceHashBalancer BalancerName = "ReferenceHash"
	Crc32Balancer         BalancerName = "CRC32Balancer"
	Murmur2Balancer       BalancerName = "Murmur2Balancer"
)

type ErrorLogger added in v1.1.0

type ErrorLogger struct {
}

func (ErrorLogger) Printf added in v1.1.0

func (l ErrorLogger) Printf(msg string, args ...interface{})

type Logger added in v1.1.0

type Logger struct {
}

func (Logger) Printf added in v1.1.0

func (l Logger) Printf(msg string, args ...interface{})

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

type ScramAlgorithm added in v1.2.3

type ScramAlgorithm string
const (
	ScramAlgorithmSHA256 ScramAlgorithm = "SHA256"
	ScramAlgorithmSHA512 ScramAlgorithm = "SHA512"
)

type Writer added in v1.1.0

type Writer struct {
	Writer                  *kafkaGo.Writer
	Writers                 map[string]*kafkaGo.Writer
	EnableOneTopicOneWriter bool
}

func NewWriter added in v1.1.0

func NewWriter(enableOneTopicOneWriter bool) *Writer

func (*Writer) Close added in v1.1.0

func (w *Writer) Close()

func (*Writer) CreateProducer added in v1.1.0

func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer

CreateProducer create kafka-go Writer

type WriterConfig added in v1.1.0

type WriterConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer kafkaGo.Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int64

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operation performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// Number of acknowledges from partition replicas required before receiving
	// a response to a produce request. The default is -1, which means to wait for
	// all replicas, and a value above 0 is required to indicate how many replicas
	// should acknowledge a message to be considered successful.
	//
	// This version of kafka-go (v0.3) does not support 0 required acks, due to
	// some internal complexity implementing this with the Kafka protocol. If you
	// need that functionality specifically, you'll need to upgrade to v0.4.
	RequiredAcks kafkaGo.RequiredAcks

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// If not nil, specifies a logger used to report internal changes within the
	// Writer.
	Logger kafkaGo.Logger

	// ErrorLogger is the logger used to report errors. If nil, the Writer falls
	// back to using Logger instead.
	ErrorLogger kafkaGo.Logger

	// AllowAutoTopicCreation notifies Writer to create topic if missing.
	AllowAutoTopicCreation bool

	Completion func(messages []kafkaGo.Message, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL