Documentation ¶
Overview ¶
Package kafkaadmin provides Kafka administrative functionality.
Index ¶
- Variables
- func NewConsumer(cfg Config) (*kafka.Consumer, error)
- type BrokerState
- type BrokerStates
- type BrokerThrottleConfig
- type Client
- func (c Client) Close()
- func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error
- func (c Client) DeleteTopic(ctx context.Context, name string) error
- func (c Client) DescribeBrokers(ctx context.Context, fullData bool) (BrokerStates, error)
- func (c Client) DescribeTopics(ctx context.Context, topics []string) (TopicStates, error)
- func (c Client) GetConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
- func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
- func (c Client) ListBrokers(ctx context.Context) ([]int, error)
- func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error
- func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error
- func (c Client) UnderReplicatedTopics(ctx context.Context) (TopicStates, error)
- type Config
- type CreateTopicConfig
- type ErrRemoveThrottle
- type ErrSetThrottle
- type ErrorFetchingMetadata
- type FactoryFunc
- type KafkaAdmin
- type PartitionState
- type RemoveThrottleConfig
- type ReplicaAssignment
- type ResourceConfigs
- type SetThrottleConfig
- type TopicState
- type TopicStates
Constants ¶
This section is empty.
Variables ¶
var (
	// SecurityProtocolSet is the set of protocols supported to communicate with brokers
	SecurityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	// SASLMechanismSet is the set of mechanisms supported for client to broker authentication
	SASLMechanismSet = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)
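Both sets use the Go idiom of a map with empty-struct values, so a membership test is a single map lookup. The following is a minimal sketch, assuming local copies of the two sets and assuming the unexported `empty` value is `struct{}{}`; `validateSecurity` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// empty stands in for the package's unexported placeholder value
// (assumed here to be struct{}{}).
var empty = struct{}{}

// Local copies of the documented sets so the sketch runs on its own.
var (
	securityProtocolSet = map[string]struct{}{"PLAINTEXT": empty, "SSL": empty, "SASL_PLAINTEXT": empty, "SASL_SSL": empty}
	saslMechanismSet    = map[string]struct{}{"PLAIN": empty, "SCRAM-SHA-256": empty, "SCRAM-SHA-512": empty}
)

// validateSecurity is a hypothetical helper: it returns an error when the
// protocol or the mechanism is not a member of the corresponding set.
func validateSecurity(protocol, mechanism string) error {
	if _, ok := securityProtocolSet[protocol]; !ok {
		return fmt.Errorf("unsupported security protocol: %q", protocol)
	}
	if _, ok := saslMechanismSet[mechanism]; !ok {
		return fmt.Errorf("unsupported SASL mechanism: %q", mechanism)
	}
	return nil
}

func main() {
	fmt.Println(validateSecurity("SASL_SSL", "SCRAM-SHA-512"))
	fmt.Println(validateSecurity("SASL_SSL", "GSSAPI"))
}
```

Whether the package itself validates Config.SecurityProtocol and Config.SASLMechanism this way is not stated here; the sketch only shows how such sets are typically consulted.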
var (
	// ErrNoData is a generic error for no data available to be returned.
	ErrNoData = fmt.Errorf("no data returned")
)
Functions ¶
Types ¶
type BrokerState ¶ added in v4.1.0
type BrokerState struct {
	// Key metadata from the Kafka cluster state.
	Host                       string
	Port                       int
	Rack                       string // broker.rack
	LogMessageFormat           string // log.message.format.version
	InterBrokerProtocolVersion string // inter.broker.protocol.version
	// All metadata.
	FullData map[string]string
}
BrokerState holds metadata that describes a broker.
type BrokerStates ¶ added in v4.1.0
type BrokerStates map[int]BrokerState
BrokerStates is a map of broker IDs to BrokerState.
func NewBrokerStates ¶ added in v4.1.0
func NewBrokerStates() BrokerStates
NewBrokerStates returns a BrokerStates.
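Because BrokerStates is a plain map keyed by broker ID, it can be ranged over directly. The sketch below groups broker IDs by rack. It assumes local mirrors of the documented types (FullData and the version fields are left out for brevity), made-up sample data, and a hypothetical helper named `brokersByRack`.

```go
package main

import (
	"fmt"
	"sort"
)

// BrokerState and BrokerStates mirror the documented types so the sketch
// runs without importing the package. Only some fields are reproduced.
type BrokerState struct {
	Host string
	Port int
	Rack string
}

type BrokerStates map[int]BrokerState

// brokersByRack is a hypothetical helper that groups broker IDs by rack.
// IDs are sorted because Go map iteration order is unspecified.
func brokersByRack(bs BrokerStates) map[string][]int {
	out := make(map[string][]int)
	for id, state := range bs {
		out[state.Rack] = append(out[state.Rack], id)
	}
	for rack := range out {
		sort.Ints(out[rack])
	}
	return out
}

func main() {
	// Hosts, ports, and racks are made-up sample data.
	bs := BrokerStates{
		1001: {Host: "kafka-1", Port: 9092, Rack: "a"},
		1002: {Host: "kafka-2", Port: 9092, Rack: "b"},
		1003: {Host: "kafka-3", Port: 9092, Rack: "a"},
	}
	fmt.Println(brokersByRack(bs))
}
```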
type BrokerThrottleConfig ¶
BrokerThrottleConfig defines an inbound and outbound throttle rate in bytes to be applied to a broker.
type Client ¶
type Client struct {
	DefaultTimeoutMs int
	// contains filtered or unexported fields
}
Client implements a KafkaAdmin.
func NewClientWithFactory ¶
func NewClientWithFactory(cfg Config, factory FactoryFunc) (*Client, error)
NewClientWithFactory returns a new admin Client, using the provided factory func to create the underlying kafka.AdminClient.
func (Client) CreateTopic ¶
func (c Client) CreateTopic(ctx context.Context, cfg CreateTopicConfig) error
CreateTopic creates a topic.
func (Client) DeleteTopic ¶
DeleteTopic deletes a topic.
func (Client) DescribeBrokers ¶ added in v4.1.0
DescribeBrokers returns a BrokerStates for all live brokers. By default, only key metadata is populated for each broker's BrokerState entry. If fullData is set to true, complete metadata is also included in the BrokerState.FullData field. This covers all broker configs found in the cluster state, including dynamic configs.
func (Client) DescribeTopics ¶
DescribeTopics takes a []string of topic names. Each entry can be either a literal topic name or a regex pattern. A TopicStates is returned for all matching topics.
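How the package tells literal names from regex patterns is not documented here. As an illustration only, the sketch below treats every entry as a regular expression anchored to the whole topic name, which lets plain literals match themselves. `matchTopics` and the topic names are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// matchTopics is a hypothetical helper. Every entry in patterns is compiled
// as a regular expression anchored to the full name, and every name in all
// that matches at least one pattern is returned in sorted order.
func matchTopics(patterns, all []string) ([]string, error) {
	var compiled []*regexp.Regexp
	for _, p := range patterns {
		re, err := regexp.Compile("^(?:" + p + ")$")
		if err != nil {
			return nil, fmt.Errorf("invalid topic pattern %q: %w", p, err)
		}
		compiled = append(compiled, re)
	}

	var matched []string
	for _, name := range all {
		for _, re := range compiled {
			if re.MatchString(name) {
				matched = append(matched, name)
				break // one match is enough for this name
			}
		}
	}
	sort.Strings(matched)
	return matched, nil
}

func main() {
	all := []string{"orders", "orders_v2", "payments"}
	byPattern, err := matchTopics([]string{"orders.*"}, all)
	fmt.Println(byPattern, err)
	byLiteral, err := matchTopics([]string{"payments"}, all)
	fmt.Println(byLiteral, err)
}
```

Note that under this assumption a literal name containing regex metacharacters such as `.` would match more than itself, so the real matching rules should be checked before relying on it.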
func (Client) GetConfigs ¶ added in v4.1.0
func (c Client) GetConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
GetConfigs takes a Kafka resource type (e.g. topic, broker) and a list of names, and returns a ResourceConfigs holding all configurations discovered for each named resource. Nil configs are excluded.
func (Client) GetDynamicConfigs ¶
func (c Client) GetDynamicConfigs(ctx context.Context, kind string, names []string) (ResourceConfigs, error)
GetDynamicConfigs takes a Kafka resource type (e.g. topic, broker) and a list of names, and returns a ResourceConfigs holding all dynamic configurations discovered for each named resource.
func (Client) ListBrokers ¶ added in v4.1.0
ListBrokers returns a []int of all live broker IDs.
func (Client) RemoveThrottle ¶
func (c Client) RemoveThrottle(ctx context.Context, cfg RemoveThrottleConfig) error
RemoveThrottle takes a RemoveThrottleConfig that includes an optionally specified list of brokers and topics to remove all throttle configurations from.
func (Client) SetThrottle ¶
func (c Client) SetThrottle(ctx context.Context, cfg SetThrottleConfig) error
SetThrottle takes a SetThrottleConfig and sets the underlying throttle configs accordingly. A throttle is a combination of topic throttled replicas configs and broker inbound/outbound throttle configs.
func (Client) UnderReplicatedTopics ¶ added in v4.1.0
func (c Client) UnderReplicatedTopics(ctx context.Context) (TopicStates, error)
UnderReplicatedTopics returns a TopicStates that only includes under-replicated topics.
type Config ¶
type Config struct {
	// Required.
	BootstrapServers string
	// Misc.
	DefaultTimeoutMs int
	GroupId          string
	SSLCALocation    string
	SecurityProtocol string
	SASLMechanism    string
	SASLUsername     string
	SASLPassword     string
}
Config holds Client configuration parameters.
type CreateTopicConfig ¶
type CreateTopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	Config            map[string]string
	ReplicaAssignment ReplicaAssignment
}
CreateTopicConfig holds CreateTopic parameters.
type ErrRemoveThrottle ¶
type ErrRemoveThrottle struct{ Message string }
ErrRemoveThrottle is a generic error for RemoveThrottle.
func (ErrRemoveThrottle) Error ¶
func (e ErrRemoveThrottle) Error() string
type ErrSetThrottle ¶
type ErrSetThrottle struct{ Message string }
ErrSetThrottle is a generic error for SetThrottle.
func (ErrSetThrottle) Error ¶
func (e ErrSetThrottle) Error() string
type ErrorFetchingMetadata ¶
type ErrorFetchingMetadata struct{ Message string }
ErrorFetchingMetadata is an error encountered fetching Kafka cluster metadata.
func (ErrorFetchingMetadata) Error ¶
func (e ErrorFetchingMetadata) Error() string
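The error types above are plain structs with a Message field, so callers can tell them apart with `errors.As`. The sketch below assumes a local mirror of ErrSetThrottle whose Error method simply returns Message (the real method body is not shown in this document), and uses a hypothetical `setThrottle` function standing in for a failing Client.SetThrottle call.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSetThrottle mirrors the documented type. The Error method body is an
// assumption: it returns Message unchanged.
type ErrSetThrottle struct{ Message string }

func (e ErrSetThrottle) Error() string { return e.Message }

// setThrottle is a hypothetical stand-in for Client.SetThrottle that always
// fails with a wrapped ErrSetThrottle. The message text is made up.
func setThrottle() error {
	return fmt.Errorf("applying throttle: %w", ErrSetThrottle{Message: "broker 1001 not found"})
}

// setThrottleMessage reports whether err wraps an ErrSetThrottle and, if so,
// returns its Message.
func setThrottleMessage(err error) (string, bool) {
	var e ErrSetThrottle
	if errors.As(err, &e) {
		return e.Message, true
	}
	return "", false
}

func main() {
	msg, ok := setThrottleMessage(setThrottle())
	fmt.Println(msg, ok)
}
```

The same pattern applies to ErrRemoveThrottle and ErrorFetchingMetadata, since all three share the same shape.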
type FactoryFunc ¶
type FactoryFunc func(conf *kafka.ConfigMap) (*kafka.AdminClient, error)
type KafkaAdmin ¶
type KafkaAdmin interface {
	Close()
	// Topics.
	CreateTopic(context.Context, CreateTopicConfig) error
	DeleteTopic(context.Context, string) error
	DescribeTopics(context.Context, []string) (TopicStates, error)
	UnderReplicatedTopics(context.Context) (TopicStates, error)
	// Brokers.
	ListBrokers(context.Context) ([]int, error)
	DescribeBrokers(context.Context, bool) (BrokerStates, error)
	// Cluster.
	SetThrottle(context.Context, SetThrottleConfig) error
	RemoveThrottle(context.Context, RemoveThrottleConfig) error
	GetConfigs(context.Context, string, []string) (ResourceConfigs, error)
	GetDynamicConfigs(context.Context, string, []string) (ResourceConfigs, error)
}
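KafkaAdmin is the interface for Kafka administrative operations on topics, brokers, and cluster configuration. Client implements it.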
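Code that depends on an interface rather than on Client can be exercised without a running cluster. The sketch below is one way to do that, assuming a hypothetical narrow interface `brokerLister` that contains only the ListBrokers method with the signature documented above, and a hypothetical in-memory `fakeAdmin`.

```go
package main

import (
	"context"
	"fmt"
)

// brokerLister is a hypothetical narrow interface holding just the
// ListBrokers method from KafkaAdmin.
type brokerLister interface {
	ListBrokers(context.Context) ([]int, error)
}

// fakeAdmin is an in-memory stand-in that returns a fixed set of IDs.
type fakeAdmin struct{ ids []int }

func (f fakeAdmin) ListBrokers(ctx context.Context) ([]int, error) {
	// Honor cancellation the way a real network call would be expected to.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return f.ids, nil
}

// countBrokers depends only on the narrow interface, so it accepts the fake
// as well as any type that provides the same method.
func countBrokers(ctx context.Context, bl brokerLister) (int, error) {
	ids, err := bl.ListBrokers(ctx)
	if err != nil {
		return 0, err
	}
	return len(ids), nil
}

// runDemo wires the fake into countBrokers with made-up broker IDs.
func runDemo() (int, error) {
	admin := fakeAdmin{ids: []int{1001, 1002, 1003}}
	return countBrokers(context.Background(), admin)
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err)
}
```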
type PartitionState ¶
PartitionState describes the state of a partition.
type RemoveThrottleConfig ¶
RemoveThrottleConfig holds lists of all topics and brokers to remove throttles from.
type ReplicaAssignment ¶
type ReplicaAssignment [][]int32
ReplicaAssignment is a [][]int32 of partition assignments. The outer slice index maps to the partition ID (e.g. index position 3 describes partition 3 of the referenced topic), and each inner slice is an []int32 of broker assignments.
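The index-to-partition mapping can be illustrated with a short sketch. It assumes a local mirror of the type, made-up broker IDs, and a hypothetical helper `brokersFor` that is not part of the package.

```go
package main

import "fmt"

// ReplicaAssignment mirrors the documented type.
type ReplicaAssignment [][]int32

// brokersFor is a hypothetical helper that returns the broker assignment for
// a partition ID, or nil when the partition is out of range.
func brokersFor(ra ReplicaAssignment, partition int) []int32 {
	if partition < 0 || partition >= len(ra) {
		return nil
	}
	return ra[partition]
}

func main() {
	// Three partitions, each assigned to two brokers (IDs are made up).
	ra := ReplicaAssignment{
		{1001, 1002}, // partition 0
		{1002, 1003}, // partition 1
		{1003, 1001}, // partition 2
	}
	for id, brokers := range ra {
		fmt.Printf("partition %d -> brokers %v\n", id, brokers)
	}
	fmt.Println(brokersFor(ra, 1))
}
```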
type ResourceConfigs ¶
ResourceConfigs is a map of resource name to a map of configuration name to configuration value. Example: map["my_topic"]map["retention.ms"] = "4000000"
func (ResourceConfigs) AddConfig ¶
func (rc ResourceConfigs) AddConfig(name, key, value string) error
AddConfig takes a resource name and populates the config key to the specified value.
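The underlying type of ResourceConfigs is not shown in this document. Going by the description above, the sketch below assumes a nested `map[string]map[string]string` and shows one way a method with this signature could populate it. The lowercase names `resourceConfigs` and `addConfig` are hypothetical, and rejecting empty names or keys is an assumption about when an error might be returned.

```go
package main

import (
	"errors"
	"fmt"
)

// resourceConfigs is an assumed shape: resource name -> config name -> value.
type resourceConfigs map[string]map[string]string

// addConfig sets key to value for the named resource, creating the inner map
// on first use. The empty-input check is an assumption for illustration.
func (rc resourceConfigs) addConfig(name, key, value string) error {
	if name == "" || key == "" {
		return errors.New("name and key must be non-empty")
	}
	if _, ok := rc[name]; !ok {
		rc[name] = make(map[string]string)
	}
	rc[name][key] = value
	return nil
}

func main() {
	rc := make(resourceConfigs)
	if err := rc.addConfig("my_topic", "retention.ms", "4000000"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rc["my_topic"]["retention.ms"])
}
```

Creating the inner map lazily matters because writing to a nil inner map would panic.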
func (ResourceConfigs) AddConfigEntry ¶
func (rc ResourceConfigs) AddConfigEntry(name string, config kafka.ConfigEntryResult) error
AddConfigEntry takes a resource name (e.g. a broker ID or topic name) and a kafka.ConfigEntryResult. It populates ResourceConfigs with the provided ConfigEntryResult for the named resource.
type SetThrottleConfig ¶
type SetThrottleConfig struct {
	// Topics is a list of all topics that require throttled replica configs.
	Topics []string
	// Brokers is a mapping of broker ID to BrokerThrottleConfig.
	Brokers map[int]BrokerThrottleConfig
}
SetThrottleConfig holds SetThrottle configs.
type TopicState ¶
type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}
TopicState describes the current state of a topic.
func NewTopicState ¶
func NewTopicState(name string) TopicState
NewTopicState initializes a TopicState.
func (TopicState) Brokers ¶ added in v4.1.0
func (t TopicState) Brokers() []int
Brokers returns a list of all brokers assigned to any partition in the TopicState.
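The fields of PartitionState are not shown in this document, so the following sketch assumes a hypothetical `Replicas []int32` field purely for illustration. It shows one way to collect the distinct brokers across a topic's partitions. Sorting the result is also an assumption, made so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// PartitionState here is hypothetical: the Replicas field is assumed, since
// the documented type's fields are not listed.
type PartitionState struct {
	Replicas []int32
}

// TopicState mirrors the documented struct.
type TopicState struct {
	Name              string
	Partitions        int32
	ReplicationFactor int32
	PartitionStates   map[int]PartitionState
}

// brokers collects the distinct broker IDs assigned to any partition and
// returns them sorted.
func brokers(t TopicState) []int {
	seen := make(map[int]struct{})
	for _, ps := range t.PartitionStates {
		for _, id := range ps.Replicas {
			seen[int(id)] = struct{}{}
		}
	}
	ids := make([]int, 0, len(seen))
	for id := range seen {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	return ids
}

func main() {
	// Topic name and broker IDs are made-up sample data.
	t := TopicState{
		Name:              "orders",
		Partitions:        2,
		ReplicationFactor: 2,
		PartitionStates: map[int]PartitionState{
			0: {Replicas: []int32{1001, 1002}},
			1: {Replicas: []int32{1002, 1003}},
		},
	}
	fmt.Println(brokers(t))
}
```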
type TopicStates ¶
type TopicStates map[string]TopicState
TopicStates is a map of topic names to TopicState.
func TopicStatesFromMetadata ¶
func TopicStatesFromMetadata(md *kafka.Metadata) (TopicStates, error)
func (TopicStates) List ¶ added in v4.1.0
func (t TopicStates) List() []string
List returns a []string of all topic names in the TopicStates.
func (TopicStates) UnderReplicated ¶ added in v4.1.0
func (ts TopicStates) UnderReplicated() TopicStates
UnderReplicated returns a TopicStates that only includes under-replicated topics.
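In Kafka, a partition is under-replicated when its in-sync replica set is smaller than its assigned replica set. The sketch below filters a TopicStates on that condition. Since PartitionState's fields are not shown here, the `Replicas` and `ISR` fields are hypothetical, as is the rule that a single under-replicated partition is enough to include the topic.

```go
package main

import "fmt"

// PartitionState here is hypothetical: Replicas and ISR are assumed fields.
type PartitionState struct {
	Replicas []int32
	ISR      []int32
}

// TopicState is reduced to the fields this sketch needs.
type TopicState struct {
	Name            string
	PartitionStates map[int]PartitionState
}

// TopicStates mirrors the documented map type.
type TopicStates map[string]TopicState

// underReplicated returns a new TopicStates holding only the topics that
// have at least one partition with fewer in-sync replicas than replicas.
func underReplicated(ts TopicStates) TopicStates {
	out := make(TopicStates)
	for name, state := range ts {
		for _, ps := range state.PartitionStates {
			if len(ps.ISR) < len(ps.Replicas) {
				out[name] = state
				break // one lagging partition is enough
			}
		}
	}
	return out
}

func main() {
	// Topic names and broker IDs are made-up sample data.
	ts := TopicStates{
		"healthy": {Name: "healthy", PartitionStates: map[int]PartitionState{
			0: {Replicas: []int32{1001, 1002}, ISR: []int32{1001, 1002}},
		}},
		"lagging": {Name: "lagging", PartitionStates: map[int]PartitionState{
			0: {Replicas: []int32{1001, 1002}, ISR: []int32{1001}},
		}},
	}
	for name := range underReplicated(ts) {
		fmt.Println(name)
	}
}
```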