Documentation
¶
Index ¶
- func CheckConsistency(topicConfig TopicConfig, clusterConfig ClusterConfig) error
- type ClusterConfig
- type ClusterMeta
- type ClusterSpec
- type PickerMethod
- type PlacementStrategy
- type SASLConfig
- type TLSConfig
- type TopicConfig
- type TopicMeta
- type TopicMigrationConfig
- type TopicPlacementConfig
- type TopicSettings
- func (t TopicSettings) ConfigMapDiffs(configMap map[string]string) ([]string, []string, error)
- func (t TopicSettings) Copy() TopicSettings
- func (t TopicSettings) GetValueStr(key string) (string, error)
- func (t TopicSettings) HasKey(key string) bool
- func (t TopicSettings) ReduceRetentionDrop(configMap map[string]string, retentionDropStepDuration time.Duration) (bool, error)
- func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)
- func (t TopicSettings) Validate() error
- type TopicSpec
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckConsistency ¶
func CheckConsistency(topicConfig TopicConfig, clusterConfig ClusterConfig) error
CheckConsistency verifies that the argument topic config is consistent with the argument cluster, e.g. has the same environment and region, etc.
Types ¶
type ClusterConfig ¶
type ClusterConfig struct { Meta ClusterMeta `json:"meta"` Spec ClusterSpec `json:"spec"` // RootDir is the root relative to which paths are evaluated. Set by loader. RootDir string `json:"-"` }
ClusterConfig stores information about a cluster that's referred to by one or more topic configs. These configs should reflect the reality of what's been set up externally; there's no way to "apply" these at the moment.
func LoadClusterBytes ¶
func LoadClusterBytes(contents []byte) (ClusterConfig, error)
LoadClusterBytes loads a ClusterConfig from YAML bytes.
func LoadClusterFile ¶
func LoadClusterFile(path string) (ClusterConfig, error)
LoadClusterFile loads a ClusterConfig from a path to a YAML file.
func (ClusterConfig) GetDefaultRetentionDropStepDuration ¶
func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error)
func (ClusterConfig) NewAdminClient ¶
func (c ClusterConfig) NewAdminClient( ctx context.Context, sess *session.Session, readOnly bool, usernameOverride string, passwordOverride string, ) (admin.Client, error)
NewAdminClient returns a new admin client using the parameters in the current cluster config.
func (ClusterConfig) Validate ¶
func (c ClusterConfig) Validate() error
Validate evaluates whether the cluster config is valid.
type ClusterMeta ¶
type ClusterMeta struct { Name string `json:"name"` Region string `json:"region"` Environment string `json:"environment"` Description string `json:"description"` }
ClusterMeta contains (mostly immutable) metadata about the cluster. Inspired by the meta fields in Kubernetes objects.
type ClusterSpec ¶
type ClusterSpec struct { // BootstrapAddrs is a list of one or more broker bootstrap addresses. These can use IPs // or DNS names. BootstrapAddrs []string `json:"bootstrapAddrs"` // ZKAddrs is a list of one or more zookeeper addresses. These can use IPs // or DNS names. If these are omitted, then the tool will use broker APIs exclusively. ZKAddrs []string `json:"zkAddrs"` // ZKPrefix is the prefix under which all zk nodes for the cluster are stored. If blank, // these are assumed to be under the zk root. ZKPrefix string `json:"zkPrefix"` // ZKLockPath indicates where locks are stored in zookeeper. If blank, then // no locking will be used on apply operations. ZKLockPath string `json:"zkLockPath"` // ClusterID is the value of the [prefix]/cluster/id node in zookeeper. If set, it's used // to validate that the cluster we're communicating with is the right one. If blank, // this check isn't done. ClusterID string `json:"clusterID"` // DefaultThrottleMB is the default broker throttle used for migrations in this // cluster. If unset, then a reasonable default is used instead. DefaultThrottleMB int64 `json:"defaultThrottleMB"` // DefaultRetentionDropStepDuration is the default amount of time that retention drops will be // limited by. If unset, no retention drop limiting will be applied. DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"` // TLS stores how we should use TLS with broker connections, if appropriate. Only // applies if using the broker admin. TLS TLSConfig `json:"tls"` // SASL stores how we should use SASL with broker connections, if appropriate. Only // applies if using the broker admin. SASL SASLConfig `json:"sasl"` }
ClusterSpec contains the details necessary to communicate with a kafka cluster.
type PickerMethod ¶
type PickerMethod string
PickerMethod is a string type that stores a picker method for breaking ties when choosing the replica placements for a topic.
const ( // PickerMethodClusterUse uses broker frequency in the topic, breaking ties by // looking at the total number of replicas across the entire cluster that each broker // appears in. PickerMethodClusterUse PickerMethod = "cluster-use" // PickerMethodLowestIndex uses broker frequency in the topic, breaking ties by // choosing the broker with the lowest index. PickerMethodLowestIndex PickerMethod = "lowest-index" // PickerMethodRandomized uses broker frequency in the topic, breaking ties by // using a repeatably random choice from the options. PickerMethodRandomized PickerMethod = "randomized" )
type PlacementStrategy ¶
type PlacementStrategy string
PlacementStrategy is a string type that stores a replica placement strategy for a topic.
const ( // PlacementStrategyAny allows any partition placement. PlacementStrategyAny PlacementStrategy = "any" // PlacementStrategyBalancedLeaders is a strategy that ensures the leaders of // each partition are balanced by rack, but does not care about the placements // of the non-leader replicas. PlacementStrategyBalancedLeaders PlacementStrategy = "balanced-leaders" // PlacementStrategyInRack is a strategy in which the leaders are balanced // and the replicas for each partition are in the same rack as the leader. PlacementStrategyInRack PlacementStrategy = "in-rack" // PlacementStrategyStatic uses a static placement defined in the config. This is for // testing only and should generally not be used in production. PlacementStrategyStatic PlacementStrategy = "static" // PlacementStrategyStaticInRack is a strategy in which the replicas in each partition // are chosen from the rack in a static list, but the specific replicas within each partition // aren't specified. PlacementStrategyStaticInRack PlacementStrategy = "static-in-rack" )
type SASLConfig ¶ added in v1.0.0
type TopicConfig ¶
TopicConfig represents the desired configuration of a topic.
func LoadTopicBytes ¶
func LoadTopicBytes(contents []byte) (TopicConfig, error)
LoadTopicBytes loads a TopicConfig from YAML bytes.
func LoadTopicsFile ¶ added in v0.0.3
func LoadTopicsFile(path string) ([]TopicConfig, error)
LoadTopicsFile loads one or more TopicConfigs from a path to a YAML file.
func TopicConfigFromTopicInfo ¶
func TopicConfigFromTopicInfo( clusterConfig ClusterConfig, topicInfo admin.TopicInfo, ) TopicConfig
TopicConfigFromTopicInfo generates a TopicConfig from a ClusterConfig and admin.TopicInfo struct generated from the cluster state.
func (*TopicConfig) SetDefaults ¶
func (t *TopicConfig) SetDefaults()
SetDefaults sets the default migration and placement settings in a topic config if these aren't set.
func (TopicConfig) ToNewTopicConfig ¶
func (t TopicConfig) ToNewTopicConfig() (kafka.TopicConfig, error)
ToNewTopicConfig converts a TopicConfig to a kafka.TopicConfig that can be used by kafka-go to create a new topic.
func (TopicConfig) ToYAML ¶
func (t TopicConfig) ToYAML() (string, error)
ToYAML converts the current TopicConfig to a YAML string.
func (TopicConfig) Validate ¶
func (t TopicConfig) Validate(numRacks int) error
Validate evaluates whether the topic config is valid.
type TopicMeta ¶
type TopicMeta struct { Name string `json:"name"` Cluster string `json:"cluster"` Region string `json:"region"` Environment string `json:"environment"` Description string `json:"description"` // Consumers is a list of consumers who are expected to consume from this // topic. Consumers []string `json:"consumers,omitempty"` }
TopicMeta stores the (mostly immutable) metadata associated with a topic. Inspired by the meta structs in Kubernetes objects.
type TopicMigrationConfig ¶
type TopicMigrationConfig struct { ThrottleMB int64 `json:"throttleMB"` PartitionBatchSize int `json:"partitionBatchSize"` }
TopicMigrationConfig configures the throttles and batch sizes used when running a partition migration. If these are left unset, resonable defaults will be used instead.
type TopicPlacementConfig ¶
type TopicPlacementConfig struct { Strategy PlacementStrategy `json:"strategy"` Picker PickerMethod `json:"picker,omitempty"` // StaticAssignments is a list of lists of desired replica assignments. It's used // for the "static" strategy only. StaticAssignments [][]int `json:"staticAssignments,omitempty"` // StaticRackAssignments is a list of list of desired replica assignments. It's used // for the "static-in-rack" strategy only. StaticRackAssignments []string `json:"staticRackAssignments,omitempty"` }
TopicPlacementConfig describes how the partition replicas in a topic should be chosen.
type TopicSettings ¶
type TopicSettings map[string]interface{}
TopicSettings is a map of key/value pairs that correspond to Kafka topic config settings.
func FromConfigMap ¶
func FromConfigMap(configMap map[string]string) TopicSettings
FromConfigMap converts a string map from a Kafka topic to a TopicSettings instance.
func (TopicSettings) ConfigMapDiffs ¶
ConfigMapDiffs compares these topic settings to a string map fetched from the cluster. It returns the keys that are set in the settings but different in the cluster and also the keys that are set in the cluster but not set in the settings.
func (TopicSettings) Copy ¶
func (t TopicSettings) Copy() TopicSettings
Copy returns a shallow copy of this settings instance.
func (TopicSettings) GetValueStr ¶
func (t TopicSettings) GetValueStr(key string) (string, error)
GetValueStr returns the string value for a key in this settings instance. It returns an error if the key is not found.
func (TopicSettings) HasKey ¶
func (t TopicSettings) HasKey(key string) bool
HasKey returns whether the current settings instance contains the argument key.
func (TopicSettings) ReduceRetentionDrop ¶
func (TopicSettings) ToConfigEntries ¶
func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)
ToConfigEntries converts the argument keys in the current settings into a slice of kafka-go config entries. If keys is nil, then all fields are converted.
func (TopicSettings) Validate ¶
func (t TopicSettings) Validate() error
Validate determines whether the given settings are valid. See https://kafka.apache.org/documentation/#topicconfigs for details.
type TopicSpec ¶
type TopicSpec struct { Partitions int `json:"partitions"` ReplicationFactor int `json:"replicationFactor"` RetentionMinutes int `json:"retentionMinutes,omitempty"` Settings TopicSettings `json:"settings,omitempty"` PlacementConfig TopicPlacementConfig `json:"placement"` MigrationConfig *TopicMigrationConfig `json:"migration,omitempty"` }
TopicSpec stores the (mutable) specification for a topic.