Documentation ¶
Index ¶
- func CheckConsistency(resourceMeta ResourceMeta, clusterConfig ClusterConfig) error
- type ACL
- type ACLConfig
- type ACLResource
- type ACLSpec
- type AdminClientOpts
- type ClusterConfig
- type ClusterMeta
- type ClusterSpec
- type PickerMethod
- type PlacementStrategy
- type ResourceMeta
- type SASLConfig
- type TLSConfig
- type TopicConfig
- type TopicMigrationConfig
- type TopicPlacementConfig
- type TopicSettings
- func (t TopicSettings) ConfigMapDiffs(configMap map[string]string) ([]string, []string, error)
- func (t TopicSettings) Copy() TopicSettings
- func (t TopicSettings) GetValueStr(key string) (string, error)
- func (t TopicSettings) HasKey(key string) bool
- func (t TopicSettings) ReduceRetentionDrop(configMap map[string]string, retentionDropStepDuration time.Duration) (bool, error)
- func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)
- func (t TopicSettings) Validate() error
- type TopicSpec
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckConsistency ¶
func CheckConsistency(resourceMeta ResourceMeta, clusterConfig ClusterConfig) error
CheckConsistency verifies that the argument topic config is consistent with the argument cluster, e.g. has the same environment and region, etc.
Types ¶
type ACL ¶ added in v1.12.0
type ACL struct { Resource ACLResource `json:"resource"` Operations []kafka.ACLOperationType `json:"operations"` }
type ACLConfig ¶ added in v1.12.0
type ACLConfig struct { Meta ResourceMeta `json:"meta"` Spec ACLSpec `json:"spec"` }
func LoadACLBytes ¶ added in v1.12.0
LoadACLBytes loads an ACLConfig from YAML bytes.
func LoadACLsFile ¶ added in v1.12.0
LoadACLsFile loads one or more ACLConfigs from a path to a YAML file.
func (*ACLConfig) SetDefaults ¶ added in v1.12.0
func (a *ACLConfig) SetDefaults()
SetDefaults sets the default host and permission for each ACL in an ACL config if these aren't set
func (ACLConfig) ToNewACLEntries ¶ added in v1.12.0
func (a ACLConfig) ToNewACLEntries() []kafka.ACLEntry
type ACLResource ¶ added in v1.12.0
type AdminClientOpts ¶ added in v1.13.0
type ClusterConfig ¶
type ClusterConfig struct { Meta ClusterMeta `json:"meta"` Spec ClusterSpec `json:"spec"` // RootDir is the root relative to which paths are evaluated. Set by loader. RootDir string `json:"-"` }
ClusterConfig stores information about a cluster that's referred to by one or more topic configs. These configs should reflect the reality of what's been set up externally; there's no way to "apply" these at the moment.
func LoadClusterBytes ¶
func LoadClusterBytes(contents []byte) (ClusterConfig, error)
LoadClusterBytes loads a ClusterConfig from YAML bytes.
func LoadClusterFile ¶
func LoadClusterFile(path string, expandEnv bool) (ClusterConfig, error)
LoadClusterFile loads a ClusterConfig from a path to a YAML file.
func (ClusterConfig) GetDefaultRetentionDropStepDuration ¶
func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error)
GetDefaultRetentionDropStepDuration gets the default step size to use when reducing the message retention in a topic.
func (ClusterConfig) NewAdminClient ¶
func (c ClusterConfig) NewAdminClient( ctx context.Context, sess *session.Session, opts AdminClientOpts, ) (admin.Client, error)
NewAdminClient returns a new admin client using the parameters in the current cluster config.
func (ClusterConfig) Validate ¶
func (c ClusterConfig) Validate() error
Validate evaluates whether the cluster config is valid.
type ClusterMeta ¶
type ClusterMeta struct { Name string `json:"name"` Region string `json:"region"` Environment string `json:"environment"` Shard int `json:"shard"` Description string `json:"description"` Labels map[string]string `json:"labels"` }
ClusterMeta contains (mostly immutable) metadata about the cluster. Inspired by the meta fields in Kubernetes objects.
type ClusterSpec ¶
type ClusterSpec struct { // BootstrapAddrs is a list of one or more broker bootstrap addresses. These can use IPs // or DNS names. BootstrapAddrs []string `json:"bootstrapAddrs"` // ZKAddrs is a list of one or more zookeeper addresses. These can use IPs // or DNS names. If these are omitted, then the tool will use broker APIs exclusively. ZKAddrs []string `json:"zkAddrs"` // ZKPrefix is the prefix under which all zk nodes for the cluster are stored. If blank, // these are assumed to be under the zk root. ZKPrefix string `json:"zkPrefix"` // ZKLockPath indicates where locks are stored in zookeeper. If blank, then // no locking will be used on apply operations. ZKLockPath string `json:"zkLockPath"` // ClusterID is the value of the [prefix]/cluster/id node in zookeeper. If set, it's used // to validate that the cluster we're communicating with is the right one. If blank, // this check isn't done. ClusterID string `json:"clusterID"` // DefaultThrottleMB is the default broker throttle used for migrations in this // cluster. If unset, then a reasonable default is used instead. DefaultThrottleMB int64 `json:"defaultThrottleMB"` // DefaultRetentionDropStepDuration is the default amount of time that retention drops will be // limited by. If unset, no retention drop limiting will be applied. DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"` // TLS stores how we should use TLS with broker connections, if appropriate. Only // applies if using the broker admin. TLS TLSConfig `json:"tls"` // SASL stores how we should use SASL with broker connections, if appropriate. Only // applies if using the broker admin. SASL SASLConfig `json:"sasl"` }
ClusterSpec contains the details necessary to communicate with a kafka cluster.
type PickerMethod ¶
type PickerMethod string
PickerMethod is a string type that stores a picker method for breaking ties when choosing the replica placements for a topic.
const ( // PickerMethodClusterUse uses broker frequency in the topic, breaking ties by // looking at the total number of replicas across the entire cluster that each broker // appears in. PickerMethodClusterUse PickerMethod = "cluster-use" // PickerMethodLowestIndex uses broker frequency in the topic, breaking ties by // choosing the broker with the lowest index. PickerMethodLowestIndex PickerMethod = "lowest-index" // PickerMethodRandomized uses broker frequency in the topic, breaking ties by // using a repeatably random choice from the options. PickerMethodRandomized PickerMethod = "randomized" )
type PlacementStrategy ¶
type PlacementStrategy string
PlacementStrategy is a string type that stores a replica placement strategy for a topic.
const ( // PlacementStrategyAny allows any partition placement. PlacementStrategyAny PlacementStrategy = "any" // PlacementStrategyBalancedLeaders is a strategy that ensures the leaders of // each partition are balanced by rack, but does not care about the placements // of the non-leader replicas. PlacementStrategyBalancedLeaders PlacementStrategy = "balanced-leaders" // PlacementStrategyInRack is a strategy in which the leaders are balanced // and the replicas for each partition are in the same rack as the leader. PlacementStrategyInRack PlacementStrategy = "in-rack" // PlacementStrategyCrossRack is a strategy in which the leaders are balanced // and the replicas in each partition are spread to separate racks. PlacementStrategyCrossRack PlacementStrategy = "cross-rack" // PlacementStrategyStatic uses a static placement defined in the config. This is for // testing only and should generally not be used in production. PlacementStrategyStatic PlacementStrategy = "static" // PlacementStrategyStaticInRack is a strategy in which the replicas in each partition // are chosen from the rack in a static list, but the specific replicas within each partition // aren't specified. PlacementStrategyStaticInRack PlacementStrategy = "static-in-rack" )
type ResourceMeta ¶ added in v1.12.0
type ResourceMeta struct { Name string `json:"name"` Cluster string `json:"cluster"` Region string `json:"region"` Environment string `json:"environment"` Description string `json:"description"` Labels map[string]string `json:"labels"` // Consumers is a list of consumers who are expected to consume from this // topic. Consumers []string `json:"consumers,omitempty"` }
ResourceMeta stores the (mostly immutable) metadata associated with a resource. Inspired by the meta structs in Kubernetes objects.
func (*ResourceMeta) Validate ¶ added in v1.12.0
func (rm *ResourceMeta) Validate() error
Validate evalutes whether the ResourceMeta is valid.
type SASLConfig ¶ added in v1.0.0
type SASLConfig struct { // Enabled is whether SASL is enabled. Enabled bool `json:"enabled"` // Mechanism is the name of the SASL mechanism. Valid values are AWS-MSK-IAM, PLAIN, // SCRAM-SHA-256, and SCRAM-SHA-512 (case insensitive). Mechanism string `json:"mechanism"` // Username is the SASL username. Ignored if mechanism is AWS-MSK-IAM. Username string `json:"username"` // Password is the SASL password. Ignored if mechanism is AWS-MSK-IAM. Password string `json:"password"` // SecretsManagerArn is the ARN of the AWS Secrets Manager secret containing the SASL credentials. // Ignored if mechanism is AWS-MSK-IAM. Username and Password will be ignored if this is set. SecretsManagerArn string `json:"secretsManagerArn"` }
SASLConfig contains the details required to use SASL to authenticate cluster clients.
type TLSConfig ¶ added in v1.0.0
type TLSConfig struct { // Enabled is whether TLS is enabled. Enabled bool `json:"enabled"` // CACertPath is the path the CA certificate file CACertPath string `json:"caCertPath"` // CertPath is the path to the client certificate file CertPath string `json:"certPath"` // KeyPath is the path to the client secret key KeyPath string `json:"keyPath"` // ServerName is the name that should be used to validate the server certificate. Optional, // if not set defaults to the name in the broker address. ServerName string `json:"serverName"` // SkipVerify indicates whether we should skip all verification of the server TLS // certificate. SkipVerify bool `json:"skipVerify"` }
TLSConfig contains the details required to use TLS in communication with broker clients.
type TopicConfig ¶
type TopicConfig struct { Meta ResourceMeta `json:"meta"` Spec TopicSpec `json:"spec"` }
TopicConfig represents the desired configuration of a topic.
func LoadTopicBytes ¶
func LoadTopicBytes(contents []byte) (TopicConfig, error)
LoadTopicBytes loads a TopicConfig from YAML bytes.
func LoadTopicsFile ¶ added in v0.0.3
func LoadTopicsFile(path string) ([]TopicConfig, error)
LoadTopicsFile loads one or more TopicConfigs from a path to a YAML file.
func TopicConfigFromTopicInfo ¶
func TopicConfigFromTopicInfo( clusterConfig ClusterConfig, topicInfo admin.TopicInfo, ) TopicConfig
TopicConfigFromTopicInfo generates a TopicConfig from a ClusterConfig and admin.TopicInfo struct generated from the cluster state.
func (*TopicConfig) SetDefaults ¶
func (t *TopicConfig) SetDefaults()
SetDefaults sets the default migration and placement settings in a topic config if these aren't set.
func (TopicConfig) ToNewTopicConfig ¶
func (t TopicConfig) ToNewTopicConfig() (kafka.TopicConfig, error)
ToNewTopicConfig converts a TopicConfig to a kafka.TopicConfig that can be used by kafka-go to create a new topic.
func (TopicConfig) ToYAML ¶
func (t TopicConfig) ToYAML() (string, error)
ToYAML converts the current TopicConfig to a YAML string.
func (TopicConfig) Validate ¶
func (t TopicConfig) Validate(numRacks int) error
Validate evaluates whether the topic config is valid.
type TopicMigrationConfig ¶
type TopicMigrationConfig struct { ThrottleMB int64 `json:"throttleMB"` PartitionBatchSize int `json:"partitionBatchSize"` }
TopicMigrationConfig configures the throttles and batch sizes used when running a partition migration. If these are left unset, resonable defaults will be used instead.
type TopicPlacementConfig ¶
type TopicPlacementConfig struct { Strategy PlacementStrategy `json:"strategy"` Picker PickerMethod `json:"picker,omitempty"` // StaticAssignments is a list of lists of desired replica assignments. It's used // for the "static" strategy only. StaticAssignments [][]int `json:"staticAssignments,omitempty"` // StaticRackAssignments is a list of list of desired replica assignments. It's used // for the "static-in-rack" strategy only. StaticRackAssignments []string `json:"staticRackAssignments,omitempty"` }
TopicPlacementConfig describes how the partition replicas in a topic should be chosen.
type TopicSettings ¶
type TopicSettings map[string]interface{}
TopicSettings is a map of key/value pairs that correspond to Kafka topic config settings.
func FromConfigMap ¶
func FromConfigMap(configMap map[string]string) TopicSettings
FromConfigMap converts a string map from a Kafka topic to a TopicSettings instance.
func (TopicSettings) ConfigMapDiffs ¶
ConfigMapDiffs compares these topic settings to a string map fetched from the cluster. It returns the keys that are set in the settings but different in the cluster and also the keys that are set in the cluster but not set in the settings.
func (TopicSettings) Copy ¶
func (t TopicSettings) Copy() TopicSettings
Copy returns a shallow copy of this settings instance.
func (TopicSettings) GetValueStr ¶
func (t TopicSettings) GetValueStr(key string) (string, error)
GetValueStr returns the string value for a key in this settings instance. It returns an error if the key is not found.
func (TopicSettings) HasKey ¶
func (t TopicSettings) HasKey(key string) bool
HasKey returns whether the current settings instance contains the argument key.
func (TopicSettings) ReduceRetentionDrop ¶
func (t TopicSettings) ReduceRetentionDrop( configMap map[string]string, retentionDropStepDuration time.Duration, ) (bool, error)
ReduceRetentionDrop updates the retention in this TopicSettings instance so that it's dropping by no more than the argument retentionDropStepDuration.
func (TopicSettings) ToConfigEntries ¶
func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)
ToConfigEntries converts the argument keys in the current settings into a slice of kafka-go config entries. If keys is nil, then all fields are converted.
func (TopicSettings) Validate ¶
func (t TopicSettings) Validate() error
Validate determines whether the given settings are valid. See https://kafka.apache.org/documentation/#topicconfigs for details.
type TopicSpec ¶
type TopicSpec struct { Partitions int `json:"partitions"` ReplicationFactor int `json:"replicationFactor"` RetentionMinutes int `json:"retentionMinutes,omitempty"` Settings TopicSettings `json:"settings,omitempty"` PlacementConfig TopicPlacementConfig `json:"placement"` MigrationConfig *TopicMigrationConfig `json:"migration,omitempty"` }
TopicSpec stores the (mutable) specification for a topic.