Documentation ¶
Index ¶
- Constants
- Variables
- func AdjustOptions(ctx context.Context, admin ClusterAdminClient, options *Options, topic string) error
- func InitMetrics(registry *prometheus.Registry)
- func NewKafkaClientID(captureAddr string, changefeedID model.ChangeFeedID, configuredClientID string) (clientID string, err error)
- func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error)
- type AsyncProducer
- type AutoCreateTopicConfig
- type Broker
- type ClusterAdminClient
- type ClusterAdminClientMockImpl
- func (c *ClusterAdminClientMockImpl) Close()
- func (c *ClusterAdminClientMockImpl) CreateTopic(_ context.Context, detail *TopicDetail, _ bool) error
- func (c *ClusterAdminClientMockImpl) DeleteTopic(topicName string)
- func (c *ClusterAdminClientMockImpl) DropBrokerConfig(configName string)
- func (c *ClusterAdminClientMockImpl) GetAllBrokers(context.Context) ([]Broker, error)
- func (c *ClusterAdminClientMockImpl) GetBrokerConfig(_ context.Context, configName string) (string, error)
- func (c *ClusterAdminClientMockImpl) GetBrokerMessageMaxBytes() int
- func (c *ClusterAdminClientMockImpl) GetDefaultMockTopicName() string
- func (c *ClusterAdminClientMockImpl) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error)
- func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int
- func (c *ClusterAdminClientMockImpl) GetTopicsMeta(_ context.Context, topics []string, _ bool) (map[string]TopicDetail, error)
- func (c *ClusterAdminClientMockImpl) GetTopicsPartitionsNum(_ context.Context, topics []string) (map[string]int32, error)
- func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string)
- func (c *ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible(topicName string, fetchesRemainingUntilVisible int) error
- type Factory
- type FactoryCreator
- type MetricsCollector
- type MockFactory
- func (f *MockFactory) AdminClient(_ context.Context) (ClusterAdminClient, error)
- func (f *MockFactory) AsyncProducer(ctx context.Context, failpointCh chan error) (AsyncProducer, error)
- func (f *MockFactory) MetricsCollector(_ util.Role, _ ClusterAdminClient) MetricsCollector
- func (f *MockFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
- type MockSaramaAsyncProducer
- type MockSaramaSyncProducer
- func (m *MockSaramaSyncProducer) Close()
- func (m *MockSaramaSyncProducer) SendMessage(_ context.Context, topic string, partitionNum int32, message *common.Message) error
- func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error
- type Options
- type RequiredAcks
- type SyncProducer
- type TopicDetail
Constants ¶
const ( // DefaultMockTopicName specifies the default mock topic name. DefaultMockTopicName = "mock_topic" // DefaultMockPartitionNum is the default partition number of default mock topic. DefaultMockPartitionNum = 3 )
const ( // BrokerMessageMaxBytesConfigName specifies the largest record batch size allowed by // Kafka brokers. // See: https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes BrokerMessageMaxBytesConfigName = "message.max.bytes" // TopicMaxMessageBytesConfigName specifies the largest record batch size allowed by // Kafka topics. // See: https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes TopicMaxMessageBytesConfigName = "max.message.bytes" // MinInsyncReplicasConfigName the minimum number of replicas that must acknowledge a write // for the write to be considered successful. // Only works if the producer's acks is "all" (or "-1"). // See: https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas and // https://kafka.apache.org/documentation/#topicconfigs_min.insync.replicas MinInsyncReplicasConfigName = "min.insync.replicas" )
const ( // SASLTypePlaintext represents the plain mechanism SASLTypePlaintext = "PLAIN" // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256" // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512" // SASLTypeGSSAPI represents the gssapi mechanism. SASLTypeGSSAPI = "GSSAPI" // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+) SASLTypeOAuth = "OAUTHBEARER" )
const ( // RefreshMetricsInterval specifies the interval of refresh kafka client metrics. RefreshMetricsInterval = 5 * time.Second )
Variables ¶
var ( // BrokerMessageMaxBytes is the broker's `message.max.bytes` BrokerMessageMaxBytes = defaultMaxMessageBytes // TopicMaxMessageBytes is the topic's `max.message.bytes` TopicMaxMessageBytes = defaultMaxMessageBytes // MinInSyncReplicas is the `min.insync.replicas` MinInSyncReplicas = defaultMinInsyncReplicas )
var ( // OutgoingByteRateGauge for outgoing events. // Meter mark for each request's size in bytes. OutgoingByteRateGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_outgoing_byte_rate", Help: "Bytes/second written off all brokers.", }, []string{"namespace", "changefeed", "broker"}) // RequestRateGauge Meter mark by 1 for each request. RequestRateGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_request_rate", Help: "Requests/second sent to all brokers.", }, []string{"namespace", "changefeed", "broker"}) // RequestLatencyGauge Histogram update by `requestLatency`. RequestLatencyGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_request_latency", Help: "The request latency for all brokers.", }, []string{"namespace", "changefeed", "broker", "type"}) // ClientRetryGauge only for kafka-go client to track internal retry count. ClientRetryGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_retry_count", Help: "Kafka client send request retry count", }, []string{"namespace", "changefeed"}) // ClientErrorGauge only for kafka-go client to track internal error count. ClientErrorGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_err_count", Help: "Kafka client send request retry count", }, []string{"namespace", "changefeed"}) // BatchDurationGauge only for kafka-go client to track internal batch duration. BatchDurationGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_batch_duration", Help: "Kafka client internal average batch message time cost in milliseconds", }, []string{"namespace", "changefeed"}) // BatchMessageCountGauge only for kafka-go client to track each batch's messages count. 
BatchMessageCountGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_batch_message_count", Help: "Kafka client internal average batch message count", }, []string{"namespace", "changefeed"}) // BatchSizeGauge only for kafka-go client to track each batch's size in bytes. BatchSizeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sink", Name: "kafka_producer_batch_size", Help: "Kafka client internal average batch size in bytes", }, []string{"namespace", "changefeed"}) )
Functions ¶
func AdjustOptions ¶
func AdjustOptions( ctx context.Context, admin ClusterAdminClient, options *Options, topic string, ) error
AdjustOptions adjusts the `Options` and `sarama.Config` by condition.
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file.
func NewKafkaClientID ¶
func NewKafkaClientID(captureAddr string, changefeedID model.ChangeFeedID, configuredClientID string, ) (clientID string, err error)
NewKafkaClientID generates the Kafka client ID.
Types ¶
type AsyncProducer ¶
type AsyncProducer interface { // Close shuts down the producer and waits for any buffered messages to be // flushed. You must call this function before a producer object passes out of // scope, as it may otherwise leak memory. You must call this before process // shutting down, or you may lose messages. You must call this before calling // Close on the underlying client. Close() // AsyncSend is the input channel for the user to write messages to that they // wish to send. AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error // AsyncRunCallback processes the messages that have been sent to kafka, // and runs the attached callback. The caller should call this // method in a background goroutine. AsyncRunCallback(ctx context.Context) error }
AsyncProducer is the kafka async producer
type AutoCreateTopicConfig ¶
AutoCreateTopicConfig is used to create topic configuration.
type ClusterAdminClient ¶
type ClusterAdminClient interface { // GetAllBrokers returns all brokers among the cluster GetAllBrokers(ctx context.Context) ([]Broker, error) // GetBrokerConfig returns the broker level configuration with the `configName` GetBrokerConfig(ctx context.Context, configName string) (string, error) // GetTopicConfig returns the topic level configuration with the `configName` GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error) // GetTopicsMeta returns all target topics' metadata // if `ignoreTopicError` is true, ignore the topic error and return the metadata of valid topics GetTopicsMeta(ctx context.Context, topics []string, ignoreTopicError bool) (map[string]TopicDetail, error) // GetTopicsPartitionsNum returns the number of partitions of each topic. GetTopicsPartitionsNum(ctx context.Context, topics []string) (map[string]int32, error) // CreateTopic creates a new topic. CreateTopic(ctx context.Context, detail *TopicDetail, validateOnly bool) error // Close shuts down the admin client. Close() }
ClusterAdminClient is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
type ClusterAdminClientMockImpl ¶
type ClusterAdminClientMockImpl struct {
// contains filtered or unexported fields
}
ClusterAdminClientMockImpl mock implements the admin client interface.
func NewClusterAdminClientMockImpl ¶
func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl
NewClusterAdminClientMockImpl creates a ClusterAdminClientMockImpl struct with default configurations.
func (*ClusterAdminClientMockImpl) Close ¶
func (c *ClusterAdminClientMockImpl) Close()
Close does nothing.
func (*ClusterAdminClientMockImpl) CreateTopic ¶
func (c *ClusterAdminClientMockImpl) CreateTopic( _ context.Context, detail *TopicDetail, _ bool, ) error
CreateTopic adds topic into map.
func (*ClusterAdminClientMockImpl) DeleteTopic ¶
func (c *ClusterAdminClientMockImpl) DeleteTopic(topicName string)
DeleteTopic deletes a topic, only used for testing.
func (*ClusterAdminClientMockImpl) DropBrokerConfig ¶
func (c *ClusterAdminClientMockImpl) DropBrokerConfig(configName string)
DropBrokerConfig removes all broker level configuration for test purposes.
func (*ClusterAdminClientMockImpl) GetAllBrokers ¶
func (c *ClusterAdminClientMockImpl) GetAllBrokers(context.Context) ([]Broker, error)
GetAllBrokers implements the ClusterAdminClient interface.
func (*ClusterAdminClientMockImpl) GetBrokerConfig ¶
func (c *ClusterAdminClientMockImpl) GetBrokerConfig( _ context.Context, configName string, ) (string, error)
GetBrokerConfig implements the ClusterAdminClient interface.
func (*ClusterAdminClientMockImpl) GetBrokerMessageMaxBytes ¶
func (c *ClusterAdminClientMockImpl) GetBrokerMessageMaxBytes() int
GetBrokerMessageMaxBytes returns broker's `message.max.bytes`
func (*ClusterAdminClientMockImpl) GetDefaultMockTopicName ¶
func (c *ClusterAdminClientMockImpl) GetDefaultMockTopicName() string
GetDefaultMockTopicName returns the default topic name
func (*ClusterAdminClientMockImpl) GetTopicConfig ¶
func (c *ClusterAdminClientMockImpl) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error)
GetTopicConfig implements the ClusterAdminClient interface.
func (*ClusterAdminClientMockImpl) GetTopicMaxMessageBytes ¶
func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int
GetTopicMaxMessageBytes returns topic's `max.message.bytes`
func (*ClusterAdminClientMockImpl) GetTopicsMeta ¶
func (c *ClusterAdminClientMockImpl) GetTopicsMeta( _ context.Context, topics []string, _ bool, ) (map[string]TopicDetail, error)
GetTopicsMeta implements the ClusterAdminClient interface.
func (*ClusterAdminClientMockImpl) GetTopicsPartitionsNum ¶
func (c *ClusterAdminClientMockImpl) GetTopicsPartitionsNum( _ context.Context, topics []string, ) (map[string]int32, error)
GetTopicsPartitionsNum implements the ClusterAdminClient interface.
func (*ClusterAdminClientMockImpl) SetMinInsyncReplicas ¶
func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string)
SetMinInsyncReplicas sets the MinInsyncReplicas for broker and default topic.
func (*ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible ¶
func (c *ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible( topicName string, fetchesRemainingUntilVisible int, ) error
SetRemainingFetchesUntilTopicVisible is used to control the visibility of a specific topic. It is used to mock the topic creation delay.
type Factory ¶
type Factory interface { // AdminClient returns a kafka cluster admin client AdminClient(ctx context.Context) (ClusterAdminClient, error) // SyncProducer creates a sync producer to write messages to kafka SyncProducer(ctx context.Context) (SyncProducer, error) // AsyncProducer creates an async producer to write messages to kafka AsyncProducer(ctx context.Context, failpointCh chan error) (AsyncProducer, error) // MetricsCollector returns the kafka metrics collector MetricsCollector(role util.Role, adminClient ClusterAdminClient) MetricsCollector }
Factory is used to produce all kafka components.
func NewMockFactory ¶
func NewMockFactory( o *Options, changefeedID model.ChangeFeedID, ) (Factory, error)
NewMockFactory constructs a Factory with mock implementation.
func NewSaramaFactory ¶
func NewSaramaFactory( o *Options, changefeedID model.ChangeFeedID, ) (Factory, error)
NewSaramaFactory constructs a Factory with sarama implementation.
type FactoryCreator ¶
type FactoryCreator func(*Options, model.ChangeFeedID) (Factory, error)
FactoryCreator defines the type of factory creator.
type MetricsCollector ¶
MetricsCollector is the interface for kafka metrics collector.
func NewSaramaMetricsCollector ¶
func NewSaramaMetricsCollector( changefeedID model.ChangeFeedID, role util.Role, adminClient ClusterAdminClient, registry metrics.Registry, ) MetricsCollector
NewSaramaMetricsCollector returns a kafka metrics collector based on the sarama library.
type MockFactory ¶
type MockFactory struct {
// contains filtered or unexported fields
}
MockFactory is a mock implementation of Factory interface.
func (*MockFactory) AdminClient ¶
func (f *MockFactory) AdminClient(_ context.Context) (ClusterAdminClient, error)
AdminClient returns a mocked admin client.
func (*MockFactory) AsyncProducer ¶
func (f *MockFactory) AsyncProducer( ctx context.Context, failpointCh chan error, ) (AsyncProducer, error)
AsyncProducer creates an async producer
func (*MockFactory) MetricsCollector ¶
func (f *MockFactory) MetricsCollector( _ util.Role, _ ClusterAdminClient, ) MetricsCollector
MetricsCollector returns the metric collector
func (*MockFactory) SyncProducer ¶
func (f *MockFactory) SyncProducer(ctx context.Context) (SyncProducer, error)
SyncProducer creates a sync producer
type MockSaramaAsyncProducer ¶
type MockSaramaAsyncProducer struct { AsyncProducer *mocks.AsyncProducer // contains filtered or unexported fields }
MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface.
func (*MockSaramaAsyncProducer) AsyncRunCallback ¶
func (p *MockSaramaAsyncProducer) AsyncRunCallback( ctx context.Context, ) error
AsyncRunCallback implements the AsyncProducer interface.
func (*MockSaramaAsyncProducer) AsyncSend ¶
func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, partition int32, message *common.Message) error
AsyncSend implements the AsyncProducer interface.
func (*MockSaramaAsyncProducer) Close ¶
func (p *MockSaramaAsyncProducer) Close()
Close implements the AsyncProducer interface.
type MockSaramaSyncProducer ¶
type MockSaramaSyncProducer struct {
Producer *mocks.SyncProducer
}
MockSaramaSyncProducer is a mock implementation of SyncProducer interface.
func (*MockSaramaSyncProducer) Close ¶
func (m *MockSaramaSyncProducer) Close()
Close implements the SyncProducer interface.
func (*MockSaramaSyncProducer) SendMessage ¶
func (m *MockSaramaSyncProducer) SendMessage( _ context.Context, topic string, partitionNum int32, message *common.Message, ) error
SendMessage implements the SyncProducer interface.
func (*MockSaramaSyncProducer) SendMessages ¶
func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error
SendMessages implements the SyncProducer interface.
type Options ¶
type Options struct { BrokerEndpoints []string // control whether to create topic AutoCreate bool PartitionNum int32 // User should make sure that `replication-factor` not greater than the number of kafka brokers. ReplicationFactor int16 Version string IsAssignedVersion bool RequestVersion int16 MaxMessageBytes int Compression string ClientID string RequiredAcks RequiredAcks // Only for test. User can not set this value. // The current prod default value is 0. MaxMessages int // Credential is used to connect to kafka cluster. EnableTLS bool Credential *security.Credential InsecureSkipVerify bool SASL *security.SASL // Timeout for network configurations, default to `10s` DialTimeout time.Duration WriteTimeout time.Duration ReadTimeout time.Duration }
Options stores user specified configurations
func (*Options) Apply ¶
func (o *Options) Apply(changefeedID model.ChangeFeedID, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, ) error
Apply applies the sinkURI to update the Options.
func (*Options) DeriveTopicConfig ¶
func (o *Options) DeriveTopicConfig() *AutoCreateTopicConfig
DeriveTopicConfig derives a `topicConfig` from the `Options`.
func (*Options) SetPartitionNum ¶
SetPartitionNum sets the partition-num by the topic's partition count.
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLocal waits for only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 // Unknown should never be used in a real config. Unknown RequiredAcks = 2 )
type SyncProducer ¶
type SyncProducer interface { // SendMessage produces a given message, and returns only when it either has // succeeded or failed to produce. It will return the partition and the offset // of the produced message, or an error if the message failed to produce. SendMessage(ctx context.Context, topic string, partitionNum int32, message *common.Message) error // SendMessages produces a given set of messages, and returns only when all // messages in the set have either succeeded or failed. Note that messages // can succeed and fail individually; if some succeed and some fail, // SendMessages will return an error. SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client. Close() }
SyncProducer is the kafka sync producer
type TopicDetail ¶
TopicDetail represents a topic's detailed information.