Documentation ¶
Index ¶
- Constants
- Variables
- func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
- func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
- func BrokerIDs(brokers []BrokerInfo) []int
- func BrokerRacks(brokers []BrokerInfo) map[int]string
- func BrokersPerRack(brokers []BrokerInfo) map[string][]int
- func CheckAssignments(assignments []PartitionAssignment) error
- func DistinctRacks(brokers []BrokerInfo) []string
- func FormatACLInfo(a ACLInfo) string
- func FormatACLs(acls []ACLInfo) string
- func FormatAssignentDiffs(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerMaxPartitions(curr []PartitionAssignment, desired []PartitionAssignment, ...) string
- func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
- func FormatBrokers(brokers []BrokerInfo, full bool) string
- func FormatBrokersPerRack(brokers []BrokerInfo) string
- func FormatClusterID(clusterID string) string
- func FormatConfig(configMap map[string]string) string
- func FormatControllerID(brokerID int) string
- func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
- func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
- func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
- func FormatTopicsPartitions(topicsPartitionsStatusInfo map[string][]PartitionStatusInfo, ...) string
- func FormatTopicsPartitionsSummary(topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int) string
- func FormatUsers(users []UserInfo) string
- func GetAllTopicNamesFromMetadata(metadata *kafka.MetadataResponse) map[string]bool
- func GetKafkaCredentials(svc secretsmanageriface.SecretsManagerAPI, secretArn string) (credentials, error)
- func GetTopicsPartitionsStatusInfo(metadata *kafka.MetadataResponse, topics []string, status PartitionStatus) map[string][]PartitionStatusInfo
- func GetTopicsPartitionsStatusSummary(metadata *kafka.MetadataResponse, topics []string, status PartitionStatus) (map[string]map[PartitionStatus][]int, int, int, int)
- func GetValidTopicNamesFromMetadata(topics []string, metadata *kafka.MetadataResponse) map[string]bool
- func HasLeaders(topics []TopicInfo) bool
- func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
- func MaxPartitionsPerBroker(allAssignments ...[]PartitionAssignment) map[int]int
- func MaxReplication(topics []TopicInfo) int
- func NewLeaderPartitions(current []PartitionAssignment, desired []PartitionAssignment) []int
- func ParseBrokerThrottles(brokers []BrokerInfo) ([]BrokerThrottle, []BrokerThrottle, error)
- func ParsePartitionThrottles(topic TopicInfo) ([]PartitionThrottle, []PartitionThrottle, error)
- func PartitionIDs(partitions []PartitionInfo) []int
- func PartitionThrottleConfigEntries(leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle) []kafka.ConfigEntry
- func SameBrokers(a PartitionAssignment, b PartitionAssignment) bool
- func ThrottledBrokerIDs(brokers []BrokerInfo) []int
- func ThrottledTopicNames(topics []TopicInfo) []string
- type ACLInfo
- type ACLOperationType
- type ACLPermissionType
- type AssignmentDiff
- type BrokerAdminClient
- func (c *BrokerAdminClient) AcquireLock(ctx context.Context, path string) (zk.Lock, error)
- func (c *BrokerAdminClient) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error
- func (c *BrokerAdminClient) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error
- func (c *BrokerAdminClient) Close() error
- func (c *BrokerAdminClient) CreateACLs(ctx context.Context, acls []kafka.ACLEntry) error
- func (c *BrokerAdminClient) CreateTopic(ctx context.Context, config kafka.TopicConfig) error
- func (c *BrokerAdminClient) DeleteACLs(ctx context.Context, filters []kafka.DeleteACLsFilter) (*kafka.DeleteACLsResponse, error)
- func (c *BrokerAdminClient) GetACLs(ctx context.Context, filter kafka.ACLFilter) ([]ACLInfo, error)
- func (c *BrokerAdminClient) GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)
- func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
- func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
- func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)
- func (c *BrokerAdminClient) GetConnector() *Connector
- func (c *BrokerAdminClient) GetControllerID(ctx context.Context) (int, error)
- func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures
- func (c *BrokerAdminClient) GetTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error)
- func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
- func (c *BrokerAdminClient) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error)
- func (c *BrokerAdminClient) GetUsers(ctx context.Context, names []string) ([]UserInfo, error)
- func (c *BrokerAdminClient) LockHeld(ctx context.Context, path string) (bool, error)
- func (c *BrokerAdminClient) RunLeaderElection(ctx context.Context, topic string, partitions []int) error
- func (c *BrokerAdminClient) UpdateBrokerConfig(ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool) ([]string, error)
- func (c *BrokerAdminClient) UpdateTopicConfig(ctx context.Context, name string, configEntries []kafka.ConfigEntry, ...) ([]string, error)
- func (c *BrokerAdminClient) UpsertUser(ctx context.Context, user kafka.UserScramCredentialsUpsertion) error
- type BrokerAdminClientConfig
- type BrokerInfo
- type BrokerThrottle
- type Client
- type Connector
- type ConnectorConfig
- type CredentialInfo
- type PartitionAssignment
- type PartitionInfo
- type PartitionLeaderState
- type PartitionStatus
- type PartitionStatusInfo
- type PartitionThrottle
- type PatternType
- type ResourceType
- type SASLConfig
- type SASLMechanism
- type ScramMechanism
- type SupportedFeatures
- type TLSConfig
- type TopicInfo
- func (t TopicInfo) AllLeadersCorrect() bool
- func (t TopicInfo) AllReplicasInSync() bool
- func (t TopicInfo) IsThrottled() bool
- func (t TopicInfo) MaxISR() int
- func (t TopicInfo) MaxReplication() int
- func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
- func (t TopicInfo) PartitionIDs() []int
- func (t TopicInfo) RackCounts(brokerRacks map[int]string) (int, int, error)
- func (t TopicInfo) Retention() time.Duration
- func (t TopicInfo) ToAssignments() []PartitionAssignment
- func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
- type UserInfo
- type ZKAdminClient
- func (c *ZKAdminClient) AcquireLock(ctx context.Context, path string) (zk.Lock, error)
- func (c *ZKAdminClient) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error
- func (c *ZKAdminClient) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error
- func (c *ZKAdminClient) Close() error
- func (c *ZKAdminClient) CreateACLs(ctx context.Context, acls []kafka.ACLEntry) error
- func (c *ZKAdminClient) CreateTopic(ctx context.Context, config kafka.TopicConfig) error
- func (c *ZKAdminClient) DeleteACLs(ctx context.Context, filters []kafka.DeleteACLsFilter) (*kafka.DeleteACLsResponse, error)
- func (c *ZKAdminClient) GetACLs(ctx context.Context, filter kafka.ACLFilter) ([]ACLInfo, error)
- func (c *ZKAdminClient) GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error)
- func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
- func (c *ZKAdminClient) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error)
- func (c *ZKAdminClient) GetClusterID(ctx context.Context) (string, error)
- func (c *ZKAdminClient) GetConnector() *Connector
- func (c *ZKAdminClient) GetControllerID(ctx context.Context) (int, error)
- func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures
- func (c *ZKAdminClient) GetTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error)
- func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
- func (c *ZKAdminClient) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error)
- func (c *ZKAdminClient) GetUsers(ctx context.Context, names []string) ([]UserInfo, error)
- func (c *ZKAdminClient) LockHeld(ctx context.Context, path string) (bool, error)
- func (c *ZKAdminClient) RunLeaderElection(ctx context.Context, topic string, partitions []int) error
- func (c *ZKAdminClient) UpdateBrokerConfig(ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool) ([]string, error)
- func (c *ZKAdminClient) UpdateTopicConfig(ctx context.Context, name string, configEntries []kafka.ConfigEntry, ...) ([]string, error)
- func (c *ZKAdminClient) UpsertUser(ctx context.Context, user kafka.UserScramCredentialsUpsertion) error
- type ZKAdminClientConfig
Constants ¶
const ( // RetentionKey is the config key used for topic time retention. RetentionKey = "retention.ms" // LeaderThrottledKey is the config key for the leader throttle rate. LeaderThrottledKey = "leader.replication.throttled.rate" // FollowerThrottledKey is the config key for the follower throttle rate. FollowerThrottledKey = "follower.replication.throttled.rate" // LeaderReplicasThrottledKey is the config key for the list of leader replicas // that should be throttled. LeaderReplicasThrottledKey = "leader.replication.throttled.replicas" // FollowerReplicasThrottledKey is the config key for the list of follower replicas // that should be throttled. FollowerReplicasThrottledKey = "follower.replication.throttled.replicas" )
const (
ListenerNotFoundError kafka.Error = 72
)
Variables ¶
var ( // ErrTopicDoesNotExist is returned by admin functions when a topic that should exist // does not. ErrTopicDoesNotExist = errors.New("Topic does not exist") )
Functions ¶
func AssignmentsToReplicas ¶
func AssignmentsToReplicas(assignments []PartitionAssignment) ([][]int, error)
AssignmentsToReplicas is the inverse of ReplicasToAssignments. Used for unit tests.
func BrokerCountsPerRack ¶
func BrokerCountsPerRack(brokers []BrokerInfo) map[string]int
BrokerCountsPerRack returns a mapping of rack -> number of brokers.
func BrokerIDs ¶
func BrokerIDs(brokers []BrokerInfo) []int
BrokerIDs returns a slice of the IDs of the argument brokers.
func BrokerRacks ¶
func BrokerRacks(brokers []BrokerInfo) map[int]string
BrokerRacks returns a mapping of broker ID -> rack.
func BrokersPerRack ¶
func BrokersPerRack(brokers []BrokerInfo) map[string][]int
BrokersPerRack returns a mapping of rack -> broker IDs.
func CheckAssignments ¶
func CheckAssignments(assignments []PartitionAssignment) error
CheckAssignments does some basic sanity checks on the assignments that are passed into an Assigner or extender so that we can fail early if something is obviously wrong.
func DistinctRacks ¶
func DistinctRacks(brokers []BrokerInfo) []string
DistinctRacks returns a sorted slice of all the distinct racks in the cluster.
func FormatACLInfo ¶ added in v1.12.0
FormatACLInfo formats an ACLInfo struct as a string, using the string version of all the fields.
func FormatACLs ¶ added in v1.10.3
FormatACLs creates a pretty table that lists the details of the argument acls.
func FormatAssignentDiffs ¶
func FormatAssignentDiffs( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatAssignentDiffs generates a pretty table that shows the before and after states of a partition replica and/or leader update.
func FormatBrokerMaxPartitions ¶
func FormatBrokerMaxPartitions( curr []PartitionAssignment, desired []PartitionAssignment, brokers []BrokerInfo, ) string
FormatBrokerMaxPartitions generates a pretty table that shows the total number of partitions that each broker is involved in for a diff. It's used to evaluate the potential extra load that could occur on brokers during a migration.
func FormatBrokerRackReplicas ¶
func FormatBrokerRackReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerRackReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by rack across all topics. Useful for showing total-topic balance.
func FormatBrokerReplicas ¶
func FormatBrokerReplicas(brokers []BrokerInfo, topics []TopicInfo) string
FormatBrokerReplicas creates a pretty table that shows how many replicas are in each position (i.e., leader, second, third) by broker across all topics. Useful for showing total-topic balance.
func FormatBrokers ¶
func FormatBrokers(brokers []BrokerInfo, full bool) string
FormatBrokers creates a pretty table from a list of brokers.
func FormatBrokersPerRack ¶
func FormatBrokersPerRack(brokers []BrokerInfo) string
FormatBrokersPerRack creates a pretty table that shows the number of brokers per rack.
func FormatClusterID ¶ added in v1.14.0
FormatClusterID creates a pretty table for cluster ID.
func FormatConfig ¶
FormatConfig creates a pretty table with all of the keys and values in a topic or broker config.
func FormatControllerID ¶ added in v1.14.0
FormatControllerID creates a pretty table for controller broker.
func FormatTopicLeadersPerRack ¶
func FormatTopicLeadersPerRack(topic TopicInfo, brokers []BrokerInfo) string
FormatTopicLeadersPerRack creates a pretty table that shows the number of partitions with a leader in each rack.
func FormatTopicPartitions ¶
func FormatTopicPartitions(partitions []PartitionInfo, brokers []BrokerInfo) string
FormatTopicPartitions creates a pretty table with information on all of the partitions for a topic.
func FormatTopics ¶
func FormatTopics(topics []TopicInfo, brokers []BrokerInfo, full bool) string
FormatTopics creates a pretty table that lists the details of the argument topics.
func FormatTopicsPartitions ¶ added in v1.10.3
func FormatTopicsPartitions( topicsPartitionsStatusInfo map[string][]PartitionStatusInfo, brokers []BrokerInfo, ) string
FormatTopicsPartitions creates a pretty table with information on all of the partitions for topics.
func FormatTopicsPartitionsSummary ¶ added in v1.10.3
func FormatTopicsPartitionsSummary( topicsPartitionsStatusSummary map[string]map[PartitionStatus][]int, ) string
FormatTopicsPartitionsSummary creates a pretty table with summary of the partitions for topics.
func FormatUsers ¶ added in v1.10.3
FormatUsers creates a pretty table that lists the details of the argument users.
func GetAllTopicNamesFromMetadata ¶ added in v1.10.3
func GetKafkaCredentials ¶ added in v1.13.0
func GetKafkaCredentials(svc secretsmanageriface.SecretsManagerAPI, secretArn string) (credentials, error)
func GetTopicsPartitionsStatusInfo ¶ added in v1.10.3
func GetTopicsPartitionsStatusInfo( metadata *kafka.MetadataResponse, topics []string, status PartitionStatus, ) map[string][]PartitionStatusInfo
GetTopicsPartitionsStatusInfo gets the partition status info for the specified topics.
func GetTopicsPartitionsStatusSummary ¶ added in v1.10.3
func GetTopicsPartitionsStatusSummary( metadata *kafka.MetadataResponse, topics []string, status PartitionStatus, ) (map[string]map[PartitionStatus][]int, int, int, int)
GetTopicsPartitionsStatusSummary gets the partition status summary for the specified topics.
func GetValidTopicNamesFromMetadata ¶ added in v1.10.3
func GetValidTopicNamesFromMetadata( topics []string, metadata *kafka.MetadataResponse, ) map[string]bool
GetValidTopicNamesFromMetadata returns, for the given input topics, those that exist in the cluster.
func HasLeaders ¶
HasLeaders returns whether at least one partition in the argument topics has a non-zero leader set. Used for formatting purposes.
func LeadersPerRack ¶
func LeadersPerRack(brokers []BrokerInfo, topic TopicInfo) map[string]int
LeadersPerRack returns a mapping of rack -> number of partitions with a leader in that rack.
func MaxPartitionsPerBroker ¶
func MaxPartitionsPerBroker( allAssignments ...[]PartitionAssignment, ) map[int]int
MaxPartitionsPerBroker calculates the number of partitions that each broker may need to handle during a migration.
func MaxReplication ¶
MaxReplication returns the maximum amount of replication across all partitions in the argument topics.
func NewLeaderPartitions ¶
func NewLeaderPartitions( current []PartitionAssignment, desired []PartitionAssignment) []int
NewLeaderPartitions returns the partition IDs which will have new leaders given the current and desired assignments.
func ParseBrokerThrottles ¶
func ParseBrokerThrottles(brokers []BrokerInfo) ( []BrokerThrottle, []BrokerThrottle, error, )
ParseBrokerThrottles returns slices of the leader and follower throttles for the argument brokers.
func ParsePartitionThrottles ¶
func ParsePartitionThrottles(topic TopicInfo) ( []PartitionThrottle, []PartitionThrottle, error, )
ParsePartitionThrottles returns slices of the leader and follower partition throttles for the argument topic.
func PartitionIDs ¶
func PartitionIDs(partitions []PartitionInfo) []int
PartitionIDs returns the IDs from the argument partitions.
func PartitionThrottleConfigEntries ¶
func PartitionThrottleConfigEntries( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, ) []kafka.ConfigEntry
PartitionThrottleConfigEntries generates the topic config entries for the provided leader and follower throttles.
func SameBrokers ¶
func SameBrokers( a PartitionAssignment, b PartitionAssignment, ) bool
SameBrokers returns whether two PartitionAssignments have the same brokers.
func ThrottledBrokerIDs ¶
func ThrottledBrokerIDs(brokers []BrokerInfo) []int
ThrottledBrokerIDs returns a slice of the IDs of the subset of argument brokers that have throttles on them.
func ThrottledTopicNames ¶
ThrottledTopicNames returns the names of topics in the argument slice that have throttles on them.
Types ¶
type ACLInfo ¶ added in v1.10.3
type ACLInfo struct { ResourceType ResourceType `json:"resourceType"` ResourceName string `json:"resourceName"` PatternType PatternType `json:"patternType"` Principal string `json:"principal"` Host string `json:"host"` Operation ACLOperationType `json:"operation"` PermissionType ACLPermissionType `json:"permissionType"` }
ACLInfo represents the information stored about an ACL in zookeeper.
type ACLOperationType ¶ added in v1.10.3
type ACLOperationType kafka.ACLOperationType
ACLOperationType represents the Kafka operation type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.
func (*ACLOperationType) Set ¶ added in v1.10.3
func (o *ACLOperationType) Set(v string) error
Set is used by Cobra to set the value of a variable from a Cobra flag.
func (*ACLOperationType) String ¶ added in v1.10.3
func (o *ACLOperationType) String() string
String is used both by fmt.Print and by Cobra in help text.
func (*ACLOperationType) Type ¶ added in v1.10.3
func (o *ACLOperationType) Type() string
Type is used by Cobra in help text.
type ACLPermissionType ¶ added in v1.10.3
type ACLPermissionType kafka.ACLPermissionType
ACLPermissionType represents the Kafka permission type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.
func (*ACLPermissionType) Set ¶ added in v1.10.3
func (p *ACLPermissionType) Set(v string) error
Set is used by Cobra to set the value of a variable from a Cobra flag.
func (*ACLPermissionType) String ¶ added in v1.10.3
func (p *ACLPermissionType) String() string
String is used both by fmt.Print and by Cobra in help text.
func (*ACLPermissionType) Type ¶ added in v1.10.3
func (p *ACLPermissionType) Type() string
Type is used by Cobra in help text.
type AssignmentDiff ¶
type AssignmentDiff struct { PartitionID int Old PartitionAssignment New PartitionAssignment }
AssignmentDiff represents the diff in a single partition reassignment.
func AssignmentDiffs ¶
func AssignmentDiffs( current []PartitionAssignment, desired []PartitionAssignment, ) []AssignmentDiff
AssignmentDiffs returns the diffs implied by the argument current and desired PartitionAssignments. Used for displaying diffs to user.
type BrokerAdminClient ¶ added in v1.0.0
type BrokerAdminClient struct {
// contains filtered or unexported fields
}
BrokerAdminClient is a Client implementation that only uses broker APIs, without any zookeeper access.
func NewBrokerAdminClient ¶ added in v1.0.0
func NewBrokerAdminClient( ctx context.Context, config BrokerAdminClientConfig, ) (*BrokerAdminClient, error)
NewBrokerAdminClient constructs a new BrokerAdminClient instance.
func (*BrokerAdminClient) AcquireLock ¶ added in v1.0.0
AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. NOTE: Not implemented for broker-based clients.
func (*BrokerAdminClient) AddPartitions ¶ added in v1.0.0
func (c *BrokerAdminClient) AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error
AddPartitions extends a topic by adding one or more new partitions to it.
func (*BrokerAdminClient) AssignPartitions ¶ added in v1.0.0
func (c *BrokerAdminClient) AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error
AssignPartitions sets the replica broker IDs for one or more partitions in a topic.
func (*BrokerAdminClient) Close ¶ added in v1.0.0
func (c *BrokerAdminClient) Close() error
Close closes the client.
func (*BrokerAdminClient) CreateACLs ¶ added in v1.10.3
func (c *BrokerAdminClient) CreateACLs( ctx context.Context, acls []kafka.ACLEntry, ) error
CreateACLs creates ACLs in the cluster.
func (*BrokerAdminClient) CreateTopic ¶ added in v1.0.0
func (c *BrokerAdminClient) CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error
CreateTopic creates a topic in the cluster.
func (*BrokerAdminClient) DeleteACLs ¶ added in v1.12.0
func (c *BrokerAdminClient) DeleteACLs( ctx context.Context, filters []kafka.DeleteACLsFilter, ) (*kafka.DeleteACLsResponse, error)
DeleteACLs deletes ACLs in the cluster.
func (*BrokerAdminClient) GetACLs ¶ added in v1.10.3
func (c *BrokerAdminClient) GetACLs( ctx context.Context, filter kafka.ACLFilter, ) ([]ACLInfo, error)
GetACLs gets full information about each ACL in the cluster.
func (*BrokerAdminClient) GetAllTopicsMetadata ¶ added in v1.10.3
func (c *BrokerAdminClient) GetAllTopicsMetadata( ctx context.Context, ) (*kafka.MetadataResponse, error)
func (*BrokerAdminClient) GetBrokerIDs ¶ added in v1.0.0
func (c *BrokerAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
GetBrokerIDs gets the IDs of all brokers in the cluster.
func (*BrokerAdminClient) GetBrokers ¶ added in v1.0.0
func (c *BrokerAdminClient) GetBrokers(ctx context.Context, ids []int) ( []BrokerInfo, error, )
GetBrokers gets information about all brokers in the cluster.
func (*BrokerAdminClient) GetClusterID ¶ added in v1.0.0
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error)
GetClusterID gets the ID of the cluster.
func (*BrokerAdminClient) GetConnector ¶ added in v1.0.0
func (c *BrokerAdminClient) GetConnector() *Connector
GetConnector gets the Connector instance for this cluster.
func (*BrokerAdminClient) GetControllerID ¶ added in v1.14.0
func (c *BrokerAdminClient) GetControllerID(ctx context.Context) ( int, error, )
GetControllerID gets the ID of the active controller broker.
func (*BrokerAdminClient) GetSupportedFeatures ¶ added in v1.0.0
func (c *BrokerAdminClient) GetSupportedFeatures() SupportedFeatures
GetSupportedFeatures gets the features supported by the cluster for this client.
func (*BrokerAdminClient) GetTopic ¶ added in v1.0.0
func (c *BrokerAdminClient) GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error)
GetTopic gets the details of a single topic in the cluster.
func (*BrokerAdminClient) GetTopicNames ¶ added in v1.0.0
func (c *BrokerAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
GetTopicNames gets just the names of each topic in the cluster.
func (*BrokerAdminClient) GetTopics ¶ added in v1.0.0
func (c *BrokerAdminClient) GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error)
GetTopics gets full information about each topic in the cluster.
func (*BrokerAdminClient) LockHeld ¶ added in v1.0.0
LockHeld returns whether a lock is currently held for the given path. NOTE: Not implemented for broker-based clients.
func (*BrokerAdminClient) RunLeaderElection ¶ added in v1.0.0
func (c *BrokerAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error
RunLeaderElection triggers a leader election for one or more partitions in a topic.
func (*BrokerAdminClient) UpdateBrokerConfig ¶ added in v1.0.0
func (c *BrokerAdminClient) UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateBrokerConfig updates the configuration for the argument broker. It returns the config keys that were updated.
func (*BrokerAdminClient) UpdateTopicConfig ¶ added in v1.0.0
func (c *BrokerAdminClient) UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateTopicConfig updates the configuration for the argument topic. It returns the config keys that were updated.
func (*BrokerAdminClient) UpsertUser ¶ added in v1.10.3
func (c *BrokerAdminClient) UpsertUser( ctx context.Context, user kafka.UserScramCredentialsUpsertion, ) error
type BrokerAdminClientConfig ¶ added in v1.0.0
type BrokerAdminClientConfig struct { ConnectorConfig ReadOnly bool ExpectedClusterID string }
BrokerAdminClientConfig contains the configuration settings to construct a BrokerAdminClient instance.
type BrokerInfo ¶
type BrokerInfo struct { ID int `json:"id"` Endpoints []string `json:"endpoints"` Host string `json:"host"` Port int32 `json:"port"` InstanceID string `json:"instanceID"` AvailabilityZone string `json:"availabilityZone"` Rack string `json:"rack"` InstanceType string `json:"instanceType"` Version int `json:"version"` Timestamp time.Time `json:"timestamp"` Config map[string]string `json:"config"` }
BrokerInfo represents the information stored about a broker in zookeeper.
func (BrokerInfo) Addr ¶
func (b BrokerInfo) Addr() string
Addr returns the address of the current BrokerInfo.
func (BrokerInfo) IsThrottled ¶
func (b BrokerInfo) IsThrottled() bool
IsThrottled determines whether the broker has any throttles in its config.
type BrokerThrottle ¶
BrokerThrottle represents a throttle being applied to a single broker.
func BrokerThrottles ¶
func BrokerThrottles( leaderThrottles []PartitionThrottle, followerThrottles []PartitionThrottle, throttleBytes int64, ) []BrokerThrottle
BrokerThrottles returns a slice of BrokerThrottles that we should apply. It's currently just set from the union of the leader and follower brokers (matching the behavior of bin/kafka-reassign-partitions.sh).
func (BrokerThrottle) ConfigEntries ¶
func (b BrokerThrottle) ConfigEntries() []kafka.ConfigEntry
ConfigEntries returns the kafka config entries associated with this broker throttle.
type Client ¶
type Client interface { // GetClusterID gets the ID of the cluster. GetClusterID(ctx context.Context) (string, error) // GetBrokers gets information about all brokers in the cluster. GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error) // GetControllerID get the active controller broker ID in the cluster. GetControllerID(ctx context.Context) (int, error) // GetBrokerIDs get the IDs of all brokers in the cluster. GetBrokerIDs(ctx context.Context) ([]int, error) // GetConnector gets the Connector instance for this cluster. GetConnector() *Connector // GetTopics gets full information about each topic in the cluster. GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error) // GetTopicNames gets just the names of each topic in the cluster. GetTopicNames(ctx context.Context) ([]string, error) // GetTopic gets the details of a single topic in the cluster. GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error) // GetACLs gets full information about each ACL in the cluster. GetACLs( ctx context.Context, filter kafka.ACLFilter, ) ([]ACLInfo, error) // GetAllTopicsMetadata performs kafka-go metadata call to get topic information GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error) // GetUsers gets information about users in the cluster. GetUsers( ctx context.Context, names []string, ) ([]UserInfo, error) // UpdateTopicConfig updates the configuration for the argument topic. It returns the config // keys that were updated. UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error) // UpdateBrokerConfig updates the configuration for the argument broker. It returns the config // keys that were updated. UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error) // CreateTopic creates a topic in the cluster. 
CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error // CreateACLs creates ACLs in the cluster. CreateACLs( ctx context.Context, acls []kafka.ACLEntry, ) error // DeleteACLs deletes ACLs in the cluster. DeleteACLs( ctx context.Context, filters []kafka.DeleteACLsFilter, ) (*kafka.DeleteACLsResponse, error) // UpsertUser creates or updates an user in zookeeper. UpsertUser( ctx context.Context, user kafka.UserScramCredentialsUpsertion, ) error // AssignPartitions sets the replica broker IDs for one or more partitions in a topic. AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error // AddPartitions extends a topic by adding one or more new partitions to it. AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error // RunLeaderElection triggers a leader election for one or more partitions in a topic. RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error // AcquireLock acquires a lock that can be used to prevent simultaneous changes to a topic. AcquireLock(ctx context.Context, path string) (zk.Lock, error) // LockHeld returns whether a lock is currently held for the given path. LockHeld(ctx context.Context, path string) (bool, error) // GetSupportedFeatures gets the features supported by the cluster for this client. GetSupportedFeatures() SupportedFeatures // Close closes the client. Close() error }
Client is an interface for interacting with a cluster for administrative tasks.
type Connector ¶ added in v1.0.0
type Connector struct { Config ConnectorConfig Dialer *kafka.Dialer KafkaClient *kafka.Client }
Connector is a wrapper around the low-level, kafka-go dialer and client.
func NewConnector ¶ added in v1.0.0
func NewConnector(config ConnectorConfig) (*Connector, error)
NewConnector constructs a new Connector instance given the argument config.
type ConnectorConfig ¶ added in v1.0.0
type ConnectorConfig struct { BrokerAddr string TLS TLSConfig SASL SASLConfig ConnTimeout time.Duration }
ConnectorConfig contains the configuration used to construct a connector.
type CredentialInfo ¶ added in v1.10.3
type CredentialInfo struct { ScramMechanism ScramMechanism `json:"scramMechanism"` Iterations int `json:"iterations"` }
CredentialInfo represents read-only information about a user's credentials in zookeeper.
type PartitionAssignment ¶
PartitionAssignment contains the actual or desired assignment of replicas in a topic partition.
func AssignmentsToUpdate ¶
func AssignmentsToUpdate( current []PartitionAssignment, desired []PartitionAssignment, ) []PartitionAssignment
AssignmentsToUpdate returns the subset of assignments that need to be updated given the current and desired states.
func CopyAssignments ¶
func CopyAssignments( curr []PartitionAssignment, ) []PartitionAssignment
CopyAssignments returns a deep copy of the argument PartitionAssignment slice.
func ReplicasToAssignments ¶
func ReplicasToAssignments( replicaSlices [][]int, ) []PartitionAssignment
ReplicasToAssignments converts a slice of slices to a slice of PartitionAssignments, assuming that the argument slices are in partition order. Used for unit tests.
func (PartitionAssignment) Copy ¶
func (a PartitionAssignment) Copy() PartitionAssignment
Copy returns a deep copy of this PartitionAssignment.
func (PartitionAssignment) DistinctRacks ¶
func (a PartitionAssignment) DistinctRacks( brokerRacks map[int]string, ) map[string]struct{}
DistinctRacks returns a map of the distinct racks in this PartitionAssignment.
func (PartitionAssignment) Index ¶
func (a PartitionAssignment) Index(replica int) int
Index returns the index of the argument replica, or -1 if it can't be found.
type PartitionInfo ¶
type PartitionInfo struct { Topic string `json:"topic"` ID int `json:"ID"` Leader int `json:"leader"` Version int `json:"version"` Replicas []int `json:"replicas"` ISR []int `json:"isr"` ControllerEpoch int `json:"controllerEpoch"` LeaderEpoch int `json:"leaderEpoch"` }
PartitionInfo represents the information stored about a topic partition in zookeeper.
type PartitionLeaderState ¶ added in v1.10.3
type PartitionLeaderState string
const ( CorrectLeader PartitionLeaderState = "OK" WrongLeader PartitionLeaderState = "Wrong" )
type PartitionStatus ¶ added in v1.10.3
type PartitionStatus string
const ( Ok PartitionStatus = "OK" Offline PartitionStatus = "Offline" UnderReplicated PartitionStatus = "Under-replicated" )
func GetPartitionStatus ¶ added in v1.10.3
func GetPartitionStatus(partition kafka.Partition) PartitionStatus
GetPartitionStatus returns the status of a partition: OK, offline, or under-replicated.
NOTE: a partition is 1. offline - if a ListenerNotFound error is observed for the leader partition 2. under-replicated - if the number of ISRs is less than the number of replicas
func StringToPartitionStatus ¶ added in v1.10.3
func StringToPartitionStatus(status string) (PartitionStatus, bool)
StringToPartitionStatus checks whether a string is a valid PartitionStatus.
func (*PartitionStatus) Set ¶ added in v1.10.3
func (p *PartitionStatus) Set(v string) error
Set is used by Cobra to set the value of a variable from a Cobra flag.
func (*PartitionStatus) String ¶ added in v1.10.3
func (p *PartitionStatus) String() string
String is used by Cobra in help text.
func (*PartitionStatus) Type ¶ added in v1.10.3
func (p *PartitionStatus) Type() string
Type is used by Cobra in help text.
type PartitionStatusInfo ¶ added in v1.10.3
type PartitionStatusInfo struct { Topic string Partition kafka.Partition Status PartitionStatus LeaderState PartitionLeaderState }
type PartitionThrottle ¶
PartitionThrottle represents a throttle being applied to a single partition, broker combination.
func FollowerPartitionThrottles ¶
func FollowerPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
FollowerPartitionThrottles returns a slice of PartitionThrottles that we should apply on the follower side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func LeaderPartitionThrottles ¶
func LeaderPartitionThrottles( curr []PartitionAssignment, desired []PartitionAssignment, ) []PartitionThrottle
LeaderPartitionThrottles returns a slice of PartitionThrottles that we should apply on the leader side.
See https://kafka.apache.org/0101/documentation.html for discussion on how these should be applied.
func ParsePartitionThrottleStr ¶
func ParsePartitionThrottleStr(valuesStr string) ([]PartitionThrottle, error)
ParsePartitionThrottleStr converts a throttle config string from zk into a slice of PartitionThrottle structs.
func (PartitionThrottle) String ¶
func (p PartitionThrottle) String() string
type PatternType ¶ added in v1.10.3
type PatternType kafka.PatternType
PatternType represents the Kafka pattern type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.
func (*PatternType) Set ¶ added in v1.10.3
func (p *PatternType) Set(v string) error
Set is used by Cobra to set the value of a variable from a Cobra flag.
func (*PatternType) String ¶ added in v1.10.3
func (p *PatternType) String() string
String is used both by fmt.Print and by Cobra in help text.
func (*PatternType) Type ¶ added in v1.10.3
func (r *PatternType) Type() string
Type is used by Cobra in help text.
type ResourceType ¶ added in v1.10.3
type ResourceType kafka.ResourceType
ResourceType represents the Kafka resource type. We need to subtype this to be able to define methods to satisfy the Value interface from Cobra so we can use it as a Cobra flag.
func (*ResourceType) Set ¶ added in v1.10.3
func (r *ResourceType) Set(v string) error
Set is used by Cobra to set the value of a variable from a Cobra flag.
func (*ResourceType) String ¶ added in v1.10.3
func (r *ResourceType) String() string
String is used both by fmt.Print and by Cobra in help text.
func (*ResourceType) Type ¶ added in v1.10.3
func (r *ResourceType) Type() string
Type is used by Cobra in help text.
type SASLConfig ¶ added in v1.0.0
type SASLConfig struct { Enabled bool Mechanism SASLMechanism Username string Password string SecretsManagerArn string }
SASLConfig stores the SASL-related configuration for a connection.
type SASLMechanism ¶ added in v1.3.0
type SASLMechanism string
SASLMechanism is the name of a SASL mechanism that will be used for client authentication.
const ( SASLMechanismAWSMSKIAM SASLMechanism = "aws-msk-iam" SASLMechanismPlain SASLMechanism = "plain" SASLMechanismScramSHA256 SASLMechanism = "scram-sha-256" SASLMechanismScramSHA512 SASLMechanism = "scram-sha-512" )
func SASLNameToMechanism ¶ added in v1.3.0
func SASLNameToMechanism(name string) (SASLMechanism, error)
SASLNameToMechanism converts the argument SASL mechanism name string to a valid instance of the SASLMechanism enum.
type ScramMechanism ¶ added in v1.10.3
type ScramMechanism kafka.ScramMechanism
ScramMechanism represents the ScramMechanism used for a user's credential in zookeeper.
func (*ScramMechanism) String ¶ added in v1.10.3
func (s *ScramMechanism) String() string
type SupportedFeatures ¶ added in v1.0.0
type SupportedFeatures struct { // Reads indicates whether the client supports reading basic cluster information // (metadata, configs, etc.). Reads bool // Applies indicates whether the client supports the functionality required for applying // (e.g., changing configs, electing leaders, etc.). Applies bool // Locks indicates whether the client supports locking. Locks bool // DynamicBrokerConfigs indicates whether the client can return dynamic broker configs // like leader.replication.throttled.rate. DynamicBrokerConfigs bool // ACLs indicates whether the client supports access control levels. ACLs bool // Users indicates whether the client supports SASL Users. Users bool }
SupportedFeatures provides a summary of what an admin client supports.
type TLSConfig ¶ added in v1.0.0
type TLSConfig struct { Enabled bool CertPath string KeyPath string CACertPath string ServerName string SkipVerify bool }
TLSConfig stores the TLS-related configuration for a connection.
type TopicInfo ¶
type TopicInfo struct { Name string `json:"name"` Config map[string]string `json:"config"` Partitions []PartitionInfo `json:"partitions"` Version int `json:"version"` }
TopicInfo represents the information stored about a topic in zookeeper.
func (TopicInfo) AllLeadersCorrect ¶
AllLeadersCorrect returns whether leader == replicas[0] for all partitions.
func (TopicInfo) AllReplicasInSync ¶
AllReplicasInSync returns whether all partitions have ISR == replicas (ignoring order).
func (TopicInfo) IsThrottled ¶
IsThrottled determines whether the topic has any throttles in its config.
func (TopicInfo) MaxISR ¶
MaxISR returns the maximum number of in-sync replicas across all partitions in a topic.
func (TopicInfo) MaxReplication ¶
MaxReplication returns the maximum number of replicas across all partitions in a topic.
func (TopicInfo) OutOfSyncPartitions ¶
func (t TopicInfo) OutOfSyncPartitions(subset []int) []PartitionInfo
OutOfSyncPartitions returns the partitions for which ISR != replicas (ignoring order).
func (TopicInfo) PartitionIDs ¶
PartitionIDs returns an ordered slice of partition IDs for a topic.
func (TopicInfo) RackCounts ¶
RackCounts returns the minimum and maximum distinct rack counts across all partitions in a topic.
func (TopicInfo) Retention ¶
Retention returns the retention duration implied by a topic config. If unset, it returns 0.
func (TopicInfo) ToAssignments ¶
func (t TopicInfo) ToAssignments() []PartitionAssignment
ToAssignments converts a topic to a slice of partition assignments.
func (TopicInfo) WrongLeaderPartitions ¶
func (t TopicInfo) WrongLeaderPartitions(subset []int) []PartitionInfo
WrongLeaderPartitions returns the partitions where leader != replicas[0].
type UserInfo ¶ added in v1.10.3
type UserInfo struct { Name string `json:"name"` CredentialInfos []CredentialInfo `json:"credentialInfos"` }
UserInfo represents the information stored about a user in zookeeper.
type ZKAdminClient ¶ added in v1.0.0
type ZKAdminClient struct { Connector *Connector // contains filtered or unexported fields }
ZKAdminClient is a general client for interacting with a kafka cluster that assumes zookeeper access. Most interactions are done via the latter, but a few (e.g., creating topics or getting the controller address) are done via the broker API instead.
func NewZKAdminClient ¶ added in v1.0.0
func NewZKAdminClient( ctx context.Context, config ZKAdminClientConfig, ) (*ZKAdminClient, error)
NewZKAdminClient creates and returns a new Client instance.
func (*ZKAdminClient) AcquireLock ¶ added in v1.0.0
AcquireLock acquires and returns a lock from the underlying zookeeper client. The Unlock method should be called on the lock when it's safe to release.
func (*ZKAdminClient) AddPartitions ¶ added in v1.0.0
func (c *ZKAdminClient) AddPartitions( ctx context.Context, topic string, newAssignments []PartitionAssignment, ) error
AddPartitions adds one or more partitions to an existing topic. Unlike AssignPartitions, this directly updates the topic's partition config in zookeeper.
func (*ZKAdminClient) AssignPartitions ¶ added in v1.0.0
func (c *ZKAdminClient) AssignPartitions( ctx context.Context, topic string, assignments []PartitionAssignment, ) error
AssignPartitions notifies the cluster to begin a partition reassignment. This should only be used for existing partitions; to create new partitions, use the AddPartitions method.
func (*ZKAdminClient) Close ¶ added in v1.0.0
func (c *ZKAdminClient) Close() error
Close closes the connections in the underlying zookeeper client.
func (*ZKAdminClient) CreateACLs ¶ added in v1.10.3
func (c *ZKAdminClient) CreateACLs( ctx context.Context, acls []kafka.ACLEntry, ) error
func (*ZKAdminClient) CreateTopic ¶ added in v1.0.0
func (c *ZKAdminClient) CreateTopic( ctx context.Context, config kafka.TopicConfig, ) error
CreateTopic creates a new topic with the argument config. It uses the topic creation API exposed on the controller broker.
func (*ZKAdminClient) DeleteACLs ¶ added in v1.12.0
func (c *ZKAdminClient) DeleteACLs( ctx context.Context, filters []kafka.DeleteACLsFilter, ) (*kafka.DeleteACLsResponse, error)
func (*ZKAdminClient) GetACLs ¶ added in v1.10.3
func (c *ZKAdminClient) GetACLs( ctx context.Context, filter kafka.ACLFilter, ) ([]ACLInfo, error)
func (*ZKAdminClient) GetAllTopicsMetadata ¶ added in v1.10.3
func (c *ZKAdminClient) GetAllTopicsMetadata( ctx context.Context, ) (*kafka.MetadataResponse, error)
func (*ZKAdminClient) GetBrokerIDs ¶ added in v1.0.0
func (c *ZKAdminClient) GetBrokerIDs(ctx context.Context) ([]int, error)
GetBrokerIDs returns a slice of all broker IDs.
func (*ZKAdminClient) GetBrokers ¶ added in v1.0.0
func (c *ZKAdminClient) GetBrokers( ctx context.Context, ids []int, ) ([]BrokerInfo, error)
GetBrokers gets information on one or more cluster brokers from zookeeper. If the argument ids is unset, then it fetches all brokers.
func (*ZKAdminClient) GetClusterID ¶ added in v1.0.0
func (c *ZKAdminClient) GetClusterID( ctx context.Context, ) (string, error)
GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is created and should be stable over the life of the cluster.
func (*ZKAdminClient) GetConnector ¶ added in v1.0.0
func (c *ZKAdminClient) GetConnector() *Connector
GetConnector returns the Connector instance associated with this client.
func (*ZKAdminClient) GetControllerID ¶ added in v1.14.0
func (c *ZKAdminClient) GetControllerID( ctx context.Context, ) (int, error)
GetControllerID gets the ID of the active controller broker.
func (*ZKAdminClient) GetSupportedFeatures ¶ added in v1.0.0
func (c *ZKAdminClient) GetSupportedFeatures() SupportedFeatures
GetSupportedFeatures returns the features that are supported by this client.
func (*ZKAdminClient) GetTopic ¶ added in v1.0.0
func (c *ZKAdminClient) GetTopic( ctx context.Context, name string, detailed bool, ) (TopicInfo, error)
GetTopic is a wrapper around GetTopics(...) for getting information about a single topic.
func (*ZKAdminClient) GetTopicNames ¶ added in v1.0.0
func (c *ZKAdminClient) GetTopicNames(ctx context.Context) ([]string, error)
GetTopicNames gets all topic names from zookeeper.
func (*ZKAdminClient) GetTopics ¶ added in v1.0.0
func (c *ZKAdminClient) GetTopics( ctx context.Context, names []string, detailed bool, ) ([]TopicInfo, error)
GetTopics gets information about one or more cluster topics from zookeeper. If the argument names is unset, then it fetches all topics. The detailed parameter determines whether the ISRs and leaders are fetched for each partition.
func (*ZKAdminClient) LockHeld ¶ added in v1.0.0
LockHeld determines whether the lock with the provided path is held (i.e., has children).
func (*ZKAdminClient) RunLeaderElection ¶ added in v1.0.0
func (c *ZKAdminClient) RunLeaderElection( ctx context.Context, topic string, partitions []int, ) error
RunLeaderElection triggers a leader election for the argument topic and partitions.
func (*ZKAdminClient) UpdateBrokerConfig ¶ added in v1.0.0
func (c *ZKAdminClient) UpdateBrokerConfig( ctx context.Context, id int, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateBrokerConfig updates the config JSON for a cluster broker and sets a change notification so the cluster brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.
func (*ZKAdminClient) UpdateTopicConfig ¶ added in v1.0.0
func (c *ZKAdminClient) UpdateTopicConfig( ctx context.Context, name string, configEntries []kafka.ConfigEntry, overwrite bool, ) ([]string, error)
UpdateTopicConfig updates the config JSON for a topic and sets a change notification so that the brokers are notified. If overwrite is true, then it will overwrite existing config entries.
The function returns the list of keys that were modified. If overwrite is set to false, this can be used to determine the subset of entries that were already set.
func (*ZKAdminClient) UpsertUser ¶ added in v1.10.3
func (c *ZKAdminClient) UpsertUser( ctx context.Context, user kafka.UserScramCredentialsUpsertion, ) error
type ZKAdminClientConfig ¶ added in v1.0.0
type ZKAdminClientConfig struct { ZKAddrs []string ZKPrefix string BootstrapAddrs []string ExpectedClusterID string Sess *session.Session ReadOnly bool KafkaConnTimeout time.Duration }
ZKAdminClientConfig contains all of the parameters necessary to create a kafka admin client.