kafka

package
v0.0.0-...-91902aa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2024 License: Apache-2.0 Imports: 29 Imported by: 6

Documentation

Index

Constants

View Source
// Defaults describing the mock topic.
const (
	// DefaultMockTopicName is the name of the default mock topic.
	DefaultMockTopicName = "mock_topic"
	// DefaultMockPartitionNum is the number of partitions of the default mock topic.
	DefaultMockPartitionNum = 3
)
View Source
// Names of the Kafka broker-level and topic-level configuration entries
// referenced by this package.
const (
	// BrokerMessageMaxBytesConfigName specifies the largest record batch size allowed by
	// Kafka brokers.
	// See: https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes
	BrokerMessageMaxBytesConfigName = "message.max.bytes"
	// TopicMaxMessageBytesConfigName specifies the largest record batch size allowed by
	// Kafka topics.
	// See: https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes
	TopicMaxMessageBytesConfigName = "max.message.bytes"
	// MinInsyncReplicasConfigName specifies the minimum number of replicas that must
	// acknowledge a write for the write to be considered successful.
	// It only takes effect if the producer's acks is "all" (or "-1").
	// See: https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas and
	// https://kafka.apache.org/documentation/#topicconfigs_min.insync.replicas
	MinInsyncReplicasConfigName = "min.insync.replicas"
)
View Source
// SASL mechanism names.
const (
	// SASLTypePlaintext represents the PLAIN mechanism.
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the GSSAPI mechanism.
	SASLTypeGSSAPI = "GSSAPI"
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+).
	SASLTypeOAuth = "OAUTHBEARER"
)
View Source
const (
	// RefreshMetricsInterval specifies how often the kafka client metrics are refreshed.
	RefreshMetricsInterval = 5 * time.Second
)

Variables

View Source
// Package-level values, each initialized from the corresponding unexported default.
var (
	// BrokerMessageMaxBytes is the broker's `message.max.bytes`.
	BrokerMessageMaxBytes = defaultMaxMessageBytes
	// TopicMaxMessageBytes is the topic's `max.message.bytes`.
	TopicMaxMessageBytes = defaultMaxMessageBytes
	// MinInSyncReplicas is the `min.insync.replicas`.
	MinInSyncReplicas = defaultMinInsyncReplicas
)
View Source
// Prometheus gauges exported for the kafka producer. Every metric is declared
// under the "ticdc" namespace and the "sink" subsystem.
var (
	// OutgoingByteRateGauge for outgoing events.
	// Meter mark for each request's size in bytes.
	OutgoingByteRateGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_outgoing_byte_rate",
			Help:      "Bytes/second written off all brokers.",
		}, []string{"namespace", "changefeed", "broker"})
	// RequestRateGauge Meter mark by 1 for each request.
	RequestRateGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_request_rate",
			Help:      "Requests/second sent to all brokers.",
		}, []string{"namespace", "changefeed", "broker"})
	// RequestLatencyGauge reports the request latency per broker. It is a gauge
	// with an extra "type" label.
	// NOTE(review): presumably "type" selects a statistic derived from the
	// client's `requestLatency` histogram — confirm against the collector.
	RequestLatencyGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_request_latency",
			Help:      "The request latency for all brokers.",
		}, []string{"namespace", "changefeed", "broker", "type"})

	// ClientRetryGauge only for kafka-go client to track internal retry count.
	ClientRetryGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_retry_count",
			Help:      "Kafka client send request retry count",
		}, []string{"namespace", "changefeed"})

	// ClientErrorGauge only for kafka-go client to track internal error count.
	ClientErrorGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_err_count",
			// The help text previously duplicated the retry gauge's
			// description; this metric counts errors.
			Help: "Kafka client send request error count",
		}, []string{"namespace", "changefeed"})

	// BatchDurationGauge only for kafka-go client to track internal batch duration.
	BatchDurationGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_batch_duration",
			Help:      "Kafka client internal average batch message time cost in milliseconds",
		}, []string{"namespace", "changefeed"})

	// BatchMessageCountGauge only for kafka-go client to track each batch's messages count.
	BatchMessageCountGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_batch_message_count",
			Help:      "Kafka client internal average batch message count",
		}, []string{"namespace", "changefeed"})

	// BatchSizeGauge only for kafka-go client to track each batch's size in bytes.
	BatchSizeGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sink",
			Name:      "kafka_producer_batch_size",
			Help:      "Kafka client internal average batch size in bytes",
		}, []string{"namespace", "changefeed"})
)

Functions

func AdjustOptions

func AdjustOptions(
	ctx context.Context,
	admin ClusterAdminClient,
	options *Options,
	topic string,
) error

AdjustOptions adjusts the `Options` and `sarama.Config` by condition.

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file.

func NewKafkaClientID

func NewKafkaClientID(captureAddr string,
	changefeedID model.ChangeFeedID,
	configuredClientID string,
) (clientID string, err error)

NewKafkaClientID generates kafka client id

func NewSaramaConfig

func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error)

NewSaramaConfig returns the default config and sets the corresponding version and metrics.

Types

type AsyncProducer

// AsyncProducer is the kafka async producer.
type AsyncProducer interface {
	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before process
	// shutting down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close()

	// AsyncSend submits a message that the caller wishes to send to the given
	// topic and partition.
	AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error

	// AsyncRunCallback processes the messages that have been sent to kafka
	// and runs the attached callbacks. The caller should call this
	// method in a background goroutine.
	AsyncRunCallback(ctx context.Context) error
}

AsyncProducer is the kafka async producer

type AutoCreateTopicConfig

// AutoCreateTopicConfig is used to create topic configuration.
// NOTE(review): the fields share names with fields of Options; presumably they
// carry the same meaning — confirm against Options.DeriveTopicConfig.
type AutoCreateTopicConfig struct {
	AutoCreate        bool
	PartitionNum      int32
	ReplicationFactor int16
}

AutoCreateTopicConfig is used to create topic configuration.

type Broker

// Broker represents a Kafka broker.
type Broker struct {
	// ID is the broker's identifier.
	ID int32
}

Broker represents a Kafka broker.

type ClusterAdminClient

// ClusterAdminClient is the administrative client for Kafka, which supports
// managing and inspecting topics, brokers, configurations and ACLs.
type ClusterAdminClient interface {
	// GetAllBrokers returns all brokers in the cluster.
	GetAllBrokers(ctx context.Context) ([]Broker, error)

	// GetBrokerConfig returns the broker-level configuration named `configName`.
	GetBrokerConfig(ctx context.Context, configName string) (string, error)

	// GetTopicConfig returns the topic-level configuration named `configName`.
	GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error)

	// GetTopicsMeta returns the metadata of all target topics.
	// If `ignoreTopicError` is true, topic errors are ignored and only the
	// metadata of valid topics is returned.
	GetTopicsMeta(ctx context.Context,
		topics []string, ignoreTopicError bool) (map[string]TopicDetail, error)

	// GetTopicsPartitionsNum returns the number of partitions of each topic.
	GetTopicsPartitionsNum(ctx context.Context, topics []string) (map[string]int32, error)

	// CreateTopic creates a new topic.
	CreateTopic(ctx context.Context, detail *TopicDetail, validateOnly bool) error

	// Close shuts down the admin client.
	Close()
}

ClusterAdminClient is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.

type ClusterAdminClientMockImpl

// ClusterAdminClientMockImpl is a mock implementation of the admin client interface.
type ClusterAdminClientMockImpl struct {
	// contains filtered or unexported fields
}

ClusterAdminClientMockImpl mock implements the admin client interface.

func NewClusterAdminClientMockImpl

func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl

NewClusterAdminClientMockImpl creates a ClusterAdminClientMockImpl struct with default configurations.

func (*ClusterAdminClientMockImpl) Close

func (c *ClusterAdminClientMockImpl) Close()

Close does nothing.

func (*ClusterAdminClientMockImpl) CreateTopic

func (c *ClusterAdminClientMockImpl) CreateTopic(
	_ context.Context,
	detail *TopicDetail,
	_ bool,
) error

CreateTopic adds topic into map.

func (*ClusterAdminClientMockImpl) DeleteTopic

func (c *ClusterAdminClientMockImpl) DeleteTopic(topicName string)

DeleteTopic deletes a topic, only used for testing.

func (*ClusterAdminClientMockImpl) DropBrokerConfig

func (c *ClusterAdminClientMockImpl) DropBrokerConfig(configName string)

DropBrokerConfig removes all broker level configuration for test purposes.

func (*ClusterAdminClientMockImpl) GetAllBrokers

func (c *ClusterAdminClientMockImpl) GetAllBrokers(context.Context) ([]Broker, error)

GetAllBrokers implements the ClusterAdminClient interface.

func (*ClusterAdminClientMockImpl) GetBrokerConfig

func (c *ClusterAdminClientMockImpl) GetBrokerConfig(
	_ context.Context,
	configName string,
) (string, error)

GetBrokerConfig implements the ClusterAdminClient interface.

func (*ClusterAdminClientMockImpl) GetBrokerMessageMaxBytes

func (c *ClusterAdminClientMockImpl) GetBrokerMessageMaxBytes() int

GetBrokerMessageMaxBytes returns broker's `message.max.bytes`

func (*ClusterAdminClientMockImpl) GetDefaultMockTopicName

func (c *ClusterAdminClientMockImpl) GetDefaultMockTopicName() string

GetDefaultMockTopicName returns the default topic name

func (*ClusterAdminClientMockImpl) GetTopicConfig

func (c *ClusterAdminClientMockImpl) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error)

GetTopicConfig implements the ClusterAdminClient interface.

func (*ClusterAdminClientMockImpl) GetTopicMaxMessageBytes

func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int

GetTopicMaxMessageBytes returns topic's `max.message.bytes`

func (*ClusterAdminClientMockImpl) GetTopicsMeta

func (c *ClusterAdminClientMockImpl) GetTopicsMeta(
	_ context.Context,
	topics []string,
	_ bool,
) (map[string]TopicDetail, error)

GetTopicsMeta implements the ClusterAdminClient interface.

func (*ClusterAdminClientMockImpl) GetTopicsPartitionsNum

func (c *ClusterAdminClientMockImpl) GetTopicsPartitionsNum(
	_ context.Context, topics []string,
) (map[string]int32, error)

GetTopicsPartitionsNum implements the ClusterAdminClient interface.

func (*ClusterAdminClientMockImpl) SetMinInsyncReplicas

func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string)

SetMinInsyncReplicas sets the MinInsyncReplicas for broker and default topic.

func (*ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible

func (c *ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible(
	topicName string,
	fetchesRemainingUntilVisible int,
) error

SetRemainingFetchesUntilTopicVisible is used to control the visibility of a specific topic. It is used to mock the topic creation delay.

type Factory

// Factory is used to produce all kafka components.
type Factory interface {
	// AdminClient returns a kafka cluster admin client.
	AdminClient(ctx context.Context) (ClusterAdminClient, error)
	// SyncProducer creates a sync producer to write messages to kafka.
	SyncProducer(ctx context.Context) (SyncProducer, error)
	// AsyncProducer creates an async producer to write messages to kafka.
	AsyncProducer(ctx context.Context, failpointCh chan error) (AsyncProducer, error)
	// MetricsCollector returns the kafka metrics collector.
	MetricsCollector(role util.Role, adminClient ClusterAdminClient) MetricsCollector
}

Factory is used to produce all kafka components.

func NewMockFactory

func NewMockFactory(
	o *Options, changefeedID model.ChangeFeedID,
) (Factory, error)

NewMockFactory constructs a Factory with mock implementation.

func NewSaramaFactory

func NewSaramaFactory(
	o *Options,
	changefeedID model.ChangeFeedID,
) (Factory, error)

NewSaramaFactory constructs a Factory with sarama implementation.

type FactoryCreator

// FactoryCreator defines the type of factory creator: a function that builds a
// Factory from Options and a changefeed ID.
type FactoryCreator func(*Options, model.ChangeFeedID) (Factory, error)

FactoryCreator defines the type of factory creator.

type MetricsCollector

// MetricsCollector is the interface for kafka metrics collector.
type MetricsCollector interface {
	// Run starts the collector.
	// NOTE(review): presumably runs until ctx is cancelled — confirm against implementations.
	Run(ctx context.Context)
}

MetricsCollector is the interface for kafka metrics collector.

func NewSaramaMetricsCollector

func NewSaramaMetricsCollector(
	changefeedID model.ChangeFeedID,
	role util.Role,
	adminClient ClusterAdminClient,
	registry metrics.Registry,
) MetricsCollector

NewSaramaMetricsCollector return a kafka metrics collector based on sarama library.

type MockFactory

// MockFactory is a mock implementation of the Factory interface.
type MockFactory struct {
	// contains filtered or unexported fields
}

MockFactory is a mock implementation of Factory interface.

func (*MockFactory) AdminClient

func (f *MockFactory) AdminClient(_ context.Context) (ClusterAdminClient, error)

AdminClient returns a mocked admin client.

func (*MockFactory) AsyncProducer

func (f *MockFactory) AsyncProducer(
	ctx context.Context,
	failpointCh chan error,
) (AsyncProducer, error)

AsyncProducer creates an async producer

func (*MockFactory) MetricsCollector

func (f *MockFactory) MetricsCollector(
	_ util.Role, _ ClusterAdminClient,
) MetricsCollector

MetricsCollector returns the metric collector

func (*MockFactory) SyncProducer

func (f *MockFactory) SyncProducer(ctx context.Context) (SyncProducer, error)

SyncProducer creates a sync producer

type MockSaramaAsyncProducer

// MockSaramaAsyncProducer is a mock implementation of the AsyncProducer interface.
type MockSaramaAsyncProducer struct {
	// AsyncProducer is the exported underlying mock producer.
	AsyncProducer *mocks.AsyncProducer
	// contains filtered or unexported fields
}

MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface.

func (*MockSaramaAsyncProducer) AsyncRunCallback

func (p *MockSaramaAsyncProducer) AsyncRunCallback(
	ctx context.Context,
) error

AsyncRunCallback implements the AsyncProducer interface.

func (*MockSaramaAsyncProducer) AsyncSend

func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error

AsyncSend implements the AsyncProducer interface.

func (*MockSaramaAsyncProducer) Close

func (p *MockSaramaAsyncProducer) Close()

Close implements the AsyncProducer interface.

type MockSaramaSyncProducer

// MockSaramaSyncProducer is a mock implementation of the SyncProducer interface.
type MockSaramaSyncProducer struct {
	// Producer is the exported underlying mock producer.
	Producer *mocks.SyncProducer
}

MockSaramaSyncProducer is a mock implementation of SyncProducer interface.

func (*MockSaramaSyncProducer) Close

func (m *MockSaramaSyncProducer) Close()

Close implements the SyncProducer interface.

func (*MockSaramaSyncProducer) SendMessage

func (m *MockSaramaSyncProducer) SendMessage(
	_ context.Context,
	topic string, partitionNum int32,
	message *common.Message,
) error

SendMessage implements the SyncProducer interface.

func (*MockSaramaSyncProducer) SendMessages

func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error

SendMessages implements the SyncProducer interface.

type Options

// Options stores user specified configurations.
type Options struct {
	// BrokerEndpoints holds the kafka broker endpoints.
	BrokerEndpoints []string

	// AutoCreate controls whether to create the topic.
	AutoCreate   bool
	PartitionNum int32
	// The user should make sure that `replication-factor` is not greater than
	// the number of kafka brokers.
	ReplicationFactor int16
	Version           string
	IsAssignedVersion bool
	RequestVersion    int16
	MaxMessageBytes   int
	Compression       string
	ClientID          string
	RequiredAcks      RequiredAcks
	// MaxMessages is only for tests; users cannot set this value.
	// The current prod default value is 0.
	MaxMessages int

	// Credential is used to connect to the kafka cluster.
	EnableTLS          bool
	Credential         *security.Credential
	InsecureSkipVerify bool
	SASL               *security.SASL

	// Timeouts for network configurations, default to `10s`.
	DialTimeout  time.Duration
	WriteTimeout time.Duration
	ReadTimeout  time.Duration
}

Options stores user specified configurations

func NewOptions

func NewOptions() *Options

NewOptions returns a default Kafka configuration

func (*Options) Apply

func (o *Options) Apply(changefeedID model.ChangeFeedID,
	sinkURI *url.URL, replicaConfig *config.ReplicaConfig,
) error

Apply updates the Options from the sinkURI.

func (*Options) DeriveTopicConfig

func (o *Options) DeriveTopicConfig() *AutoCreateTopicConfig

DeriveTopicConfig derives a `topicConfig` from the `Options`.

func (*Options) SetPartitionNum

func (o *Options) SetPartitionNum(realPartitionCount int32) error

SetPartitionNum sets the partition-num by the topic's partition count.

type RequiredAcks

// RequiredAcks is used in Produce Requests to tell the broker how many replica
// acknowledgements it must see before responding.
type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).

// Valid RequiredAcks values.
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
	// Unknown should never be used in a real config.
	Unknown RequiredAcks = 2
)

type SyncProducer

// SyncProducer is the kafka sync producer.
type SyncProducer interface {
	// SendMessage produces a given message, and returns only when it either has
	// succeeded or failed to produce. It returns an error if the message failed
	// to produce.
	SendMessage(ctx context.Context,
		topic string, partitionNum int32,
		message *common.Message) error

	// SendMessages produces a given set of messages, and returns only when all
	// messages in the set have either succeeded or failed. Note that messages
	// can succeed and fail individually; if some succeed and some fail,
	// SendMessages will return an error.
	// NOTE(review): the signature takes a single message plus partitionNum;
	// how the "set of messages" is formed is not visible here — confirm
	// against the implementations.
	SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error

	// Close shuts down the producer; you must call this function before a producer
	// object passes out of scope, as it may otherwise leak memory.
	// You must call this before calling Close on the underlying client.
	Close()
}

SyncProducer is the kafka sync producer

type TopicDetail

// TopicDetail represents a topic's detail information.
type TopicDetail struct {
	// Name is the topic name.
	Name string
	// NumPartitions is the topic's number of partitions.
	NumPartitions int32
	// ReplicationFactor is the topic's replication factor.
	ReplicationFactor int16
}

TopicDetail represent a topic's detail information.

Directories

Path Synopsis
v2
mock
Package mock_v2 is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL