Documentation ¶
Index ¶
- Constants
- Variables
- func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
- func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
- func BrokerIDs(brokers []BrokerInfo) []int
- func BrokerRacks(brokers []BrokerInfo) map[int]string
- func BrokersPerRack(brokers []BrokerInfo) map[string][]int
- func CheckAssignments(assignments []PartitionAssignment) error
- func DistinctRacks(brokers []BrokerInfo) []string
- func FormatAssignentDiffs(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerMaxPartitions(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokers(brokers []BrokerInfo, full bool) string
- func FormatBrokersPerRack(brokers []BrokerInfo) string
- func FormatConfig(configMap map[string]string) string
- func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
- func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
- func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
- func HasLeaders(topics []TopicInfo) bool
- func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
- func MaxPartitionsPerBroker(allAssignments ...[]PartitionAssignment) map[int]int
- func MaxReplication(topics []TopicInfo) int
- func NewLeaderPartitions(current []PartitionAssignment, desired []PartitionAssignment) []int
- func ParseBrokerThrottles(brokers []BrokerInfo) ([]BrokerThrottle, []BrokerThrottle, error)
- func ParsePartitionThrottles(topic TopicInfo) ([]PartitionThrottle, []PartitionThrottle, error)
- func PartitionIDs(partitions []PartitionInfo) []int
- func PartitionThrottleConfigEntries(leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle) []kafka.ConfigEntry
- func SameBrokers(a PartitionAssignment, b PartitionAssignment) bool
- func ThrottledBrokerIDs(brokers []BrokerInfo) []int
- func ThrottledTopicNames(topics []TopicInfo) []string
- type AssignmentDiff
- type BrokerAdminClient
- func (c *BrokerAdminClient) AcquireLock(ctx context.Context, path string) (zk.Lock, error)
- func (c *BrokerAdminClient) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error
- func (c *BrokerAdminClient) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error
- func (c *BrokerAdminClient) Close() error
- func (c *BrokerAdminClient) CreateTopic(ctx context.Context, config kafka.TopicConfig) error
- func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
- func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
- func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)
- func (c *BrokerAdminClient) GetConnector() *Connector
- func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures
- func (c *BrokerAdminClient) GetTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error)
- func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
- func (c *BrokerAdminClient) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error)
- func (c *BrokerAdminClient) LockHeld(ctx context.Context, path string) (bool, error)
- func (c *BrokerAdminClient) RunLeaderElection(ctx context.Context, topic string, partitions []int) error
- func (c *BrokerAdminClient) UpdateBrokerConfig(ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool) ([]string, error)
- func (c *BrokerAdminClient) UpdateTopicConfig(ctx context.Context, name string, configEntries []kafka.ConfigEntry, ...) ([]string, error)
- type BrokerAdminClientConfig
- type BrokerInfo
- type BrokerThrottle
- type Client
- type Connector
- type ConnectorConfig
- type PartitionAssignment
- type PartitionInfo
- type PartitionThrottle
- type SASLConfig
- type SASLMechanism
- type SupportedFeatures
- type TLSConfig
- type TopicInfo
- func (t TopicInfo) AllLeadersCorrect() bool
- func (t TopicInfo) AllReplicasInSync() bool
- func (t TopicInfo) IsThrottled() bool
- func (t TopicInfo) MaxISR() int
- func (t TopicInfo) MaxReplication() int
- func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
- func (t TopicInfo) PartitionIDs() []int
- func (t TopicInfo) RackCounts(brokerRacks map[int]string) (int, int, error)
- func (t TopicInfo) Retention() time.Duration
- func (t TopicInfo) ToAssignments() []PartitionAssignment
- func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
- type ZKAdminClient
- func (c *ZKAdminClient) AcquireLock(ctx context.Context, path string) (zk.Lock, error)
- func (c *ZKAdminClient) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error
- func (c *ZKAdminClient) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error
- func (c *ZKAdminClient) Close() error
- func (c *ZKAdminClient) CreateTopic(ctx context.Context, config kafka.TopicConfig) error
- func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
- func (c *ZKAdminClient) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
- func (c *ZKAdminClient) GetClusterID(ctx context.Context) (string, error)
- func (c *ZKAdminClient) GetConnector() *Connector
- func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures
- func (c *ZKAdminClient) GetTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error)
- func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
- func (c *ZKAdminClient) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error)
- func (c *ZKAdminClient) LockHeld(ctx context.Context, path string) (bool, error)
- func (c *ZKAdminClient) RunLeaderElection(ctx context.Context, topic string, partitions []int) error
- func (c *ZKAdminClient) UpdateBrokerConfig(ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool) ([]string, error)
- func (c *ZKAdminClient) UpdateTopicConfig(ctx context.Context, name string, configEntries []kafka.ConfigEntry, ...) ([]string, error)
- type ZKAdminClientConfig
Constants ¶
const ( // RetentionKey is the config key used for topic time retention. RetentionKey = "retention.ms" // LeaderThrottledKey is the config key for the leader throttle rate. LeaderThrottledKey = "leader.replication.throttled.rate" // FollowerThrottledKey is the config key for the follower throttle rate. FollowerThrottledKey = "follower.replication.throttled.rate" // LeaderReplicasThrottledKey is the config key for the list of leader replicas // that should be throttled. LeaderReplicasThrottledKey = "leader.replication.throttled.replicas" // FollowerReplicasThrottledKey is the config key for the list of follower replicas // that should be throttled. FollowerReplicasThrottledKey = "follower.replication.throttled.replicas" )
Variables ¶
var ( // ErrTopicDoesNotExist is returned by admin functions when a topic that should exist // does not. ErrTopicDoesNotExist = errors.New("Topic does not exist") )
Functions ¶
func AssignmentsToReplicas ¶
func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
AssignmentsToReplicas is the inverse of ReplicasToAssignments. Used for unit tests.
func BrokerCountsPerRack ¶
func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
BrokerCountsPerRack returns a mapping of rack -> number of brokers.
func BrokerIDs ¶
func BrokerIDs(brokers []BrokerInfo) []int
BrokerIDs returns a slice of the IDs of the argument brokers.
func BrokerRacks ¶
func BrokerRacks(brokers []BrokerInfo) map[int]string
BrokerRacks returns a mapping of broker ID -> rack.
func BrokersPerRack ¶
func BrokersPerRack(brokers []BrokerInfo) map[string][]int
BrokersPerRack returns a mapping of rack -> broker IDs.
func CheckAssignments ¶
func CheckAssignments(assignments []PartitionAssignment) error
CheckAssignments does some basic sanity checks on the assignments that are passed into an Assigner or extender so that we can fail early if something is obviously wrong.
func DistinctRacks ¶
func DistinctRacks(brokers []BrokerInfo) []string
DistinctRacks returns a sorted slice of all the distinct racks in the cluster.
func FormatAssignentDiffs ¶
func FormatAssignentDiffs( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatAssignentDiffs generates a pretty table that shows the before and after states of a partition replica and/or leader update.
func FormatBrokerMaxPartitions ¶
func FormatBrokerMaxPartitions( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatBrokerMaxPartitions generates a pretty table that shows the total number of partitions that each broker is involved in for a diff. It's used to evaluate the potential extra load that could occur on brokers during a migration.
func FormatBrokerRackReplicas ¶
func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerRackReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by rack across all topics. Useful for showing total-topic balance.
func FormatBrokerReplicas ¶
func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by broker across all topics. Useful for showing total-topic balance.
func FormatBrokers ¶
func FormatBrokers(brokers []BrokerInfo, full bool) string
FormatBrokers creates a pretty table from a list of brokers.
func FormatBrokersPerRack ¶
func FormatBrokersPerRack(brokers []BrokerInfo) string
FormatBrokersPerRack creates a pretty table that shows the number of brokers per rack.
func FormatConfig ¶
FormatConfig creates a pretty table with all of the keys and values in a topic or broker config.
func FormatTopicLeadersPerRack ¶
func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
FormatTopicLeadersPerRack creates a pretty table that shows the number of partitions with a leader in each rack.
func FormatTopicPartitions ¶
func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
FormatTopicPartitions creates a pretty table with information on all of the partitions for a topic.
func FormatTopics ¶
func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
FormatTopics creates a pretty table that lists the details of the argument topics.
func HasLeaders ¶
HasLeaders returns whether at least one partition in the argument topics has a non-zero leader set. Used for formatting purposes.
func LeadersPerRack ¶
func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
LeadersPerRack returns a mapping of rack -> number of partitions with a leader in that rack.
func MaxPartitionsPerBroker ¶
func MaxPartitionsPerBroker( allAssignments ...[]PartitionAssignment, ) map[int]int
MaxPartitionsPerBroker calculates the number of partitions that each broker may need to handle during a migration.
func MaxReplication ¶
MaxReplication returns the maximum amount of replication across all partitions in the argument topics.
func NewLeaderPartitions ¶
func NewLeaderPartitions( current []PartitionAssignment, desired []PartitionAssignment) []int
NewLeaderPartitions returns the partition IDs which will have new leaders given the current and desired assignments.
func ParseBrokerThrottles ¶
func ParseBrokerThrottles(brokers []BrokerInfo) ( []BrokerThrottle, []BrokerThrottle, error, )
ParseBrokerThrottles returns slices of the leader and follower throttles for the argument brokers.
func ParsePartitionThrottles ¶
func ParsePartitionThrottles(topic TopicInfo) ( []PartitionThrottle, []PartitionThrottle, error, )
ParsePartitionThrottles returns slices of the leader and follower partition throttles for the argument topic.
func PartitionIDs ¶
func PartitionIDs(partitions []PartitionInfo) []int
PartitionIDs returns the IDs from the argument partitions.
func PartitionThrottleConfigEntries ¶
func PartitionThrottleConfigEntries( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, ) []kafka.ConfigEntry
PartitionThrottleConfigEntries generates the topic config entries for the provided leader and follower throttles.
func SameBrokers ¶
func SameBrokers( a PartitionAssignment, b PartitionAssignment, ) bool
SameBrokers returns whether two PartitionAssignments have the same brokers.
func ThrottledBrokerIDs ¶
func ThrottledBrokerIDs(brokers []BrokerInfo) []int
ThrottledBrokerIDs returns a slice of the IDs of the subset of argument brokers that have throttles on them.
func ThrottledTopicNames ¶
ThrottledTopicNames returns the names of topics in the argument slice that have throttles on them.
Types ¶
type AssignmentDiff ¶
type AssignmentDiff struct { PartitionID int Old PartitionAssignment New PartitionAssignment }
AssignmentDiff represents the diff in a single partition reassignment.
func AssignmentDiffs ¶
func AssignmentDiffs( current []PartitionAssignment, desired []PartitionAssignment, ) []AssignmentDiff
AssignmentDiffs returns the diffs implied by the argument current and desired PartitionAssignments. Used for displaying diffs to user.
type BrokerAdminClient ¶
type BrokerAdminClient struct {
// contains filtered or unexported fields
}
BrokerAdminClient is a Client implementation that only uses broker APIs, without any zookeeper access.
func NewBrokerAdminClient ¶
func NewBrokerAdminClient( ctx context.Context, config BrokerAdminClientConfig, ) (*BrokerAdminClient, error)
NewBrokerAdminClient constructs a new BrokerAdminClient instance.
func (*BrokerAdminClient) AcquireLock ¶
AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. NOTE: Not implemented for broker-based clients.
func (*BrokerAdminClient) AddPartitions ¶
func (c *BrokerAdminClient) AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error
AddPartitions extends a topic by adding one or more new partitions to it.
func (*BrokerAdminClient) AssignPartitions ¶
func (c *BrokerAdminClient) AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error
AssignPartitions sets the replica broker IDs for one or more partitions in a topic.
func (*BrokerAdminClient) Close ¶
func (c *BrokerAdminClient) Close() error
Close closes the client.
func (*BrokerAdminClient) CreateTopic ¶
func (c *BrokerAdminClient) CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error
CreateTopic creates a topic in the cluster.
func (*BrokerAdminClient) GetBrokerIDs ¶
func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
GetBrokerIDs gets the IDs of all brokers in the cluster.
func (*BrokerAdminClient) GetBrokers ¶
func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) ( []BrokerInfo, error, )
GetBrokers gets information about all brokers in the cluster.
func (*BrokerAdminClient) GetClusterID ¶
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)
GetClusterID gets the ID of the cluster.
func (*BrokerAdminClient) GetConnector ¶
func (c *BrokerAdminClient) GetConnector() *Connector
GetConnector gets the Connector instance for this cluster.
func (*BrokerAdminClient) GetSupportedFeatures ¶
func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures
GetSupportedFeatures gets the features supported by the cluster for this client.
func (*BrokerAdminClient) GetTopic ¶
func (c *BrokerAdminClient) GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error)
GetTopic gets the details of a single topic in the cluster.
func (*BrokerAdminClient) GetTopicNames ¶
func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
GetTopicNames gets just the names of each topic in the cluster.
func (*BrokerAdminClient) GetTopics ¶
func (c *BrokerAdminClient) GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error)
GetTopics gets full information about each topic in the cluster.
func (*BrokerAdminClient) LockHeld ¶
LockHeld returns whether a lock is currently held for the given path. NOTE: Not implemented for broker-based clients.
func (*BrokerAdminClient) RunLeaderElection ¶
func (c *BrokerAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error
RunLeaderElection triggers a leader election for one or more partitions in a topic.
func (*BrokerAdminClient) UpdateBrokerConfig ¶
func (c *BrokerAdminClient) UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateBrokerConfig updates the configuration for the argument broker. It returns the config keys that were updated.
func (*BrokerAdminClient) UpdateTopicConfig ¶
func (c *BrokerAdminClient) UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateTopicConfig updates the configuration for the argument topic. It returns the config keys that were updated.
type BrokerAdminClientConfig ¶
type BrokerAdminClientConfig struct { ConnectorConfig ReadOnly bool ExpectedClusterID string }
BrokerAdminClientConfig contains the configuration settings to construct a BrokerAdminClient instance.
type BrokerInfo ¶
type BrokerInfo struct { ID int `json:"id"` Endpoints []string `json:"endpoints"` Host string `json:"host"` Port int32 `json:"port"` InstanceID string `json:"instanceID"` AvailabilityZone string `json:"availabilityZone"` Rack string `json:"rack"` InstanceType string `json:"instanceType"` Version int `json:"version"` Timestamp time.Time `json:"timestamp"` Config map[string]string `json:"config"` }
BrokerInfo represents the information stored about a broker in zookeeper.
func (BrokerInfo) Addr ¶
func (b BrokerInfo) Addr() string
Addr returns the address of the current BrokerInfo.
func (BrokerInfo) IsThrottled ¶
func (b BrokerInfo) IsThrottled() bool
IsThrottled determines whether the broker has any throttles in its config.
type BrokerThrottle ¶
BrokerThrottle represents a throttle being applied to a single broker.
func BrokerThrottles ¶
func BrokerThrottles( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, throttleBytes int64, ) []BrokerThrottle
BrokerThrottles returns a slice of BrokerThrottles that we should apply. It's currently just set from the union of the leader and follower brokers (matching the behavior of bin/kafka-reassign-partitions.sh).
func (BrokerThrottle) ConfigEntries ¶
func (b BrokerThrottle) ConfigEntries() []kafka.ConfigEntry
ConfigEntries returns the kafka config entries associated with this broker throttle.
type Client ¶
type Client interface { // GetClusterID gets the ID of the cluster. GetClusterID(ctx context.Context) (string, error) // GetBrokers gets information about all brokers in the cluster. GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error) // GetBrokerIDs get the IDs of all brokers in the cluster. GetBrokerIDs(ctx context.Context) ([]int, error) // GetConnector gets the Connector instance for this cluster. GetConnector() *Connector // GetTopics gets full information about each topic in the cluster. GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error) // GetTopicNames gets just the names of each topic in the cluster. GetTopicNames(ctx context.Context) ([]string, error) // GetTopic gets the details of a single topic in the cluster. GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error) // UpdateTopicConfig updates the configuration for the argument topic. It returns the config // keys that were updated. UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error) // UpdateBrokerConfig updates the configuration for the argument broker. It returns the config // keys that were updated. UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error) // CreateTopic creates a topic in the cluster. CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error // AssignPartitions sets the replica broker IDs for one or more partitions in a topic. AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error // AddPartitions extends a topic by adding one or more new partitions to it. AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error // RunLeaderElection triggers a leader election for one or more partitions in a topic. 
RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error // AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. AcquireLock(ctx context.Context, path string) (zk.Lock, error) // LockHeld returns whether a lock is currently held for the given path. LockHeld(ctx context.Context, path string) (bool, error) // GetSupportedFeatures gets the features supported by the cluster for this client. GetSupportedFeatures() SupportedFeatures // Close closes the client. Close() error }
Client is an interface for interacting with a cluster for administrative tasks.
type Connector ¶
type Connector struct { Config ConnectorConfig Dialer *kafka.Dialer KafkaClient *kafka.Client }
Connector is a wrapper around the low-level, kafka-go dialer and client.
func NewConnector ¶
func NewConnector(config ConnectorConfig) (*Connector, error)
NewConnector constructs a new Connector instance given the argument config.
type ConnectorConfig ¶
type ConnectorConfig struct { BrokerAddr string TLS TLSConfig SASL SASLConfig }
ConnectorConfig contains the configuration used to construct a connector.
type PartitionAssignment ¶
PartitionAssignment contains the actual or desired assignment of replicas in a topic partition.
func AssignmentsToUpdate ¶
func AssignmentsToUpdate( current []PartitionAssignment, desired []PartitionAssignment, ) []PartitionAssignment
AssignmentsToUpdate returns the subset of assignments that need to be updated given the current and desired states.
func CopyAssignments ¶
func CopyAssignments( curr []PartitionAssignment, ) []PartitionAssignment
CopyAssignments returns a deep copy of the argument PartitionAssignment slice.
func ReplicasToAssignments ¶
func ReplicasToAssignments( replicaSlices [][]int, ) []PartitionAssignment
ReplicasToAssignments converts a slice of slices to a slice of PartitionAssignments, assuming that the argument slices are in partition order. Used for unit tests.
func (PartitionAssignment) Copy ¶
func (a PartitionAssignment) Copy() PartitionAssignment
Copy returns a deep copy of this PartitionAssignment.
func (PartitionAssignment) DistinctRacks ¶
func (a PartitionAssignment) DistinctRacks( brokerRacks map[int]string, ) map[string]struct{}
DistinctRacks returns a map of the distinct racks in this PartitionAssignment.
func (PartitionAssignment) Index ¶
func (a PartitionAssignment) Index(replica int) int
Index returns the index of the argument replica, or -1 if it can't be found.
type PartitionInfo ¶
type PartitionInfo struct { Topic string `json:"topic"` ID int `json:"ID"` Leader int `json:"leader"` Version int `json:"version"` Replicas []int `json:"replicas"` ISR []int `json:"isr"` ControllerEpoch int `json:"controllerEpoch"` LeaderEpoch int `json:"leaderEpoch"` }
PartitionInfo represents the information stored about a topic partition in zookeeper.
type PartitionThrottle ¶
PartitionThrottle represents a throttle being applied to a single partition, broker combination.
func FollowerPartitionThrottles ¶
func FollowerPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
FollowerPartitionThrottles returns a slice of PartitionThrottles that we should apply on the follower side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func LeaderPartitionThrottles ¶
func LeaderPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
LeaderPartitionThrottles returns a slice of PartitionThrottles that we should apply on the leader side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func ParsePartitionThrottleStr ¶
func ParsePartitionThrottleStr(valuesStr string) ([]PartitionThrottle, error)
ParsePartitionThrottleStr converts a throttle config string from zk into a slice of PartitionThrottle structs.
func (PartitionThrottle) String ¶
func (p PartitionThrottle) String() string
type SASLConfig ¶
type SASLConfig struct { Enabled bool Mechanism SASLMechanism Username string Password string }
SASLConfig stores the SASL-related configuration for a connection.
type SASLMechanism ¶
type SASLMechanism string
SASLMechanism is the name of a SASL mechanism that will be used for client authentication.
const ( SASLMechanismAWSMSKIAM SASLMechanism = "aws-msk-iam" SASLMechanismPlain SASLMechanism = "plain" SASLMechanismScramSHA256 SASLMechanism = "scram-sha-256" SASLMechanismScramSHA512 SASLMechanism = "scram-sha-512" )
func SASLNameToMechanism ¶
func SASLNameToMechanism(name string) (SASLMechanism, error)
SASLNameToMechanism converts the argument SASL mechanism name string to a valid instance of the SASLMechanism enum.
type SupportedFeatures ¶
type SupportedFeatures struct { // Reads indicates whether the client supports reading basic cluster information // (metadata, configs, etc.). Reads bool // Applies indicates whether the client supports the functionality required for applying // (e.g., changing configs, electing leaders, etc.). Applies bool // Locks indicates whether the client supports locking. Locks bool // DynamicBrokerConfigs indicates whether the client can return dynamic broker configs // like leader.replication.throttled.rate. DynamicBrokerConfigs bool }
SupportedFeatures provides a summary of what an admin client supports.
type TLSConfig ¶
type TLSConfig struct { Enabled bool CertPath string KeyPath string CACertPath string ServerName string SkipVerify bool }
TLSConfig stores the TLS-related configuration for a connection.
type TopicInfo ¶
type TopicInfo struct { Name string `json:"name"` Config map[string]string `json:"config"` Partitions []PartitionInfo `json:"partitions"` Version int `json:"version"` }
TopicInfo represents the information stored about a topic in zookeeper.
func (TopicInfo) AllLeadersCorrect ¶
AllLeadersCorrect returns whether leader == replicas[0] for all partitions.
func (TopicInfo) AllReplicasInSync ¶
AllReplicasInSync returns whether all partitions have ISR == replicas (ignoring order).
func (TopicInfo) IsThrottled ¶
IsThrottled determines whether the topic has any throttles in its config.
func (TopicInfo) MaxISR ¶
MaxISR returns the maximum number of in-sync replicas across all partitions in a topic.
func (TopicInfo) MaxReplication ¶
MaxReplication returns the maximum number of replicas across all partitions in a topic.
func (TopicInfo) OutOfSyncPartitions ¶
func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
OutOfSyncPartitions returns the partitions for which ISR != replicas (ignoring order).
func (TopicInfo) PartitionIDs ¶
PartitionIDs returns an ordered slice of partition IDs for a topic.
func (TopicInfo) RackCounts ¶
RackCounts returns the minimum and maximum distinct rack counts across all partitions in a topic.
func (TopicInfo) Retention ¶
Retention returns the retention duration implied by a topic config. If unset, it returns 0.
func (TopicInfo) ToAssignments ¶
func (t TopicInfo) ToAssignments() []PartitionAssignment
ToAssignments converts a topic to a slice of partition assignments.
func (TopicInfo) WrongLeaderPartitions ¶
func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
WrongLeaderPartitions returns the partitions where leader != replicas[0].
type ZKAdminClient ¶
type ZKAdminClient struct { Connector *Connector // contains filtered or unexported fields }
ZKAdminClient is a general client for interacting with a kafka cluster that assumes zookeeper access. Most interactions are done via the latter, but a few (e.g., creating topics or getting the controller address) are done via the broker API instead.
func NewZKAdminClient ¶
func NewZKAdminClient( ctx context.Context, config ZKAdminClientConfig, ) (*ZKAdminClient, error)
NewZKAdminClient creates and returns a new ZKAdminClient instance.
func (*ZKAdminClient) AcquireLock ¶
AcquireLock acquires and returns a lock from the underlying zookeeper client. The Unlock method should be called on the lock when it's safe to release.
func (*ZKAdminClient) AddPartitions ¶
func (c *ZKAdminClient) AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error
AddPartitions adds one or more partitions to an existing topic. Unlike AssignPartitions, this directly updates the topic's partition config in zookeeper.
func (*ZKAdminClient) AssignPartitions ¶
func (c *ZKAdminClient) AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error
AssignPartitions notifies the cluster to begin a partition reassignment. This should only be used for existing partitions; to create new partitions, use the AddPartitions method.
func (*ZKAdminClient) Close ¶
func (c *ZKAdminClient) Close() error
Close closes the connections in the underlying zookeeper client.
func (*ZKAdminClient) CreateTopic ¶
func (c *ZKAdminClient) CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error
CreateTopic creates a new topic with the argument config. It uses the topic creation API exposed on the controller broker.
func (*ZKAdminClient) GetBrokerIDs ¶
func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
GetBrokerIDs returns a slice of all broker IDs.
func (*ZKAdminClient) GetBrokers ¶
func (c *ZKAdminClient) GetBrokers( ctx context.Context, ids []int, ) ([]BrokerInfo, error)
GetBrokers gets information on one or more cluster brokers from zookeeper. If the argument ids is unset, then it fetches all brokers.
func (*ZKAdminClient) GetClusterID ¶
func (c *ZKAdminClient) GetClusterID( ctx context.Context, ) (string, error)
GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is created and should be stable over the life of the cluster.
func (*ZKAdminClient) GetConnector ¶
func (c *ZKAdminClient) GetConnector() *Connector
GetConnector returns the Connector instance associated with this client.
func (*ZKAdminClient) GetSupportedFeatures ¶
func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures
GetSupportedFeatures returns the features that are supported by this client.
func (*ZKAdminClient) GetTopic ¶
func (c *ZKAdminClient) GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error)
GetTopic is a wrapper around GetTopics(...) for getting information about a single topic.
func (*ZKAdminClient) GetTopicNames ¶
func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
GetTopicNames gets all topic names from zookeeper.
func (*ZKAdminClient) GetTopics ¶
func (c *ZKAdminClient) GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error)
GetTopics gets information about one or more cluster topics from zookeeper. If the argument names is unset, then it fetches all topics. The detailed parameter determines whether the ISRs and leaders are fetched for each partition.
func (*ZKAdminClient) LockHeld ¶
LockHeld determines whether the lock with the provided path is held (i.e., has children).
func (*ZKAdminClient) RunLeaderElection ¶
func (c *ZKAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error
RunLeaderElection triggers a leader election for the argument topic and partitions.
func (*ZKAdminClient) UpdateBrokerConfig ¶
func (c *ZKAdminClient) UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateBrokerConfig updates the config JSON for a cluster broker and sets a change notification so the cluster brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.
func (*ZKAdminClient) UpdateTopicConfig ¶
func (c *ZKAdminClient) UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateTopicConfig updates the config JSON for a topic and sets a change notification so that the brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.