config

package
v1.18.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 13, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckConsistency

func CheckConsistency(resourceMeta ResourceMeta, clusterConfig ClusterConfig) error

CheckConsistency verifies that the given resource metadata is consistent with the given cluster config, e.g. that both have the same environment and region.
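As a rough illustration of this kind of check, the sketch below compares local stand-in structs rather than the package's own types. The exact set of fields that CheckConsistency compares is not listed on this page, so the three comparisons here (cluster name, environment, region) are an assumption, and checkConsistency, resourceMeta, and clusterMeta are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// resourceMeta and clusterMeta are local stand-ins that mirror a subset of
// the ResourceMeta and ClusterMeta fields documented on this page.
type resourceMeta struct {
	Name        string
	Cluster     string
	Region      string
	Environment string
}

type clusterMeta struct {
	Name        string
	Region      string
	Environment string
}

// checkConsistency is a hypothetical re-implementation. It collects every
// mismatch between a resource and the cluster it refers to, and returns nil
// when there are none.
func checkConsistency(r resourceMeta, c clusterMeta) error {
	var problems []string
	if r.Cluster != c.Name {
		problems = append(problems, fmt.Sprintf("cluster %q != %q", r.Cluster, c.Name))
	}
	if r.Environment != c.Environment {
		problems = append(problems, fmt.Sprintf("environment %q != %q", r.Environment, c.Environment))
	}
	if r.Region != c.Region {
		problems = append(problems, fmt.Sprintf("region %q != %q", r.Region, c.Region))
	}
	if len(problems) > 0 {
		return errors.New("inconsistent metadata: " + strings.Join(problems, "; "))
	}
	return nil
}

func main() {
	cluster := clusterMeta{Name: "my-cluster", Region: "us-west-2", Environment: "staging"}
	topic := resourceMeta{Name: "events", Cluster: "my-cluster", Region: "us-east-1", Environment: "staging"}
	fmt.Println(checkConsistency(topic, cluster))
}
```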

Types

type ACL added in v1.12.0

type ACL struct {
	Resource   ACLResource              `json:"resource"`
	Operations []kafka.ACLOperationType `json:"operations"`
}

type ACLConfig added in v1.12.0

type ACLConfig struct {
	Meta ResourceMeta `json:"meta"`
	Spec ACLSpec      `json:"spec"`
}

func LoadACLBytes added in v1.12.0

func LoadACLBytes(contents []byte) (ACLConfig, error)

LoadACLBytes loads an ACLConfig from YAML bytes.

func LoadACLsFile added in v1.12.0

func LoadACLsFile(path string) ([]ACLConfig, error)

LoadACLsFile loads one or more ACLConfigs from a path to a YAML file.

func (*ACLConfig) SetDefaults added in v1.12.0

func (a *ACLConfig) SetDefaults()

SetDefaults sets the default host and permission for each ACL in an ACL config if these aren't set.

func (ACLConfig) ToNewACLEntries added in v1.12.0

func (a ACLConfig) ToNewACLEntries() []kafka.ACLEntry

func (*ACLConfig) Validate added in v1.12.0

func (a *ACLConfig) Validate() error

Validate evaluates whether the ACL config is valid.

type ACLResource added in v1.12.0

type ACLResource struct {
	Type        kafka.ResourceType      `json:"type"`
	Name        string                  `json:"name"`
	PatternType kafka.PatternType       `json:"patternType"`
	Principal   string                  `json:"principal"`
	Host        string                  `json:"host"`
	Permission  kafka.ACLPermissionType `json:"permission"`
}

type ACLSpec added in v1.12.0

type ACLSpec struct {
	ACLs []ACL `json:"acls"`
}

type AdminClientOpts added in v1.13.0

type AdminClientOpts struct {
	ReadOnly                  bool
	UsernameOverride          string
	PasswordOverride          string
	SecretsManagerArnOverride string
}

type ClusterConfig

type ClusterConfig struct {
	Meta ClusterMeta `json:"meta"`
	Spec ClusterSpec `json:"spec"`

	// RootDir is the root relative to which paths are evaluated. Set by loader.
	RootDir string `json:"-"`
}

ClusterConfig stores information about a cluster that's referred to by one or more topic configs. These configs should reflect the reality of what's been set up externally; there's no way to "apply" these at the moment.

func LoadClusterBytes

func LoadClusterBytes(contents []byte) (ClusterConfig, error)

LoadClusterBytes loads a ClusterConfig from YAML bytes.

func LoadClusterFile

func LoadClusterFile(path string, expandEnv bool) (ClusterConfig, error)

LoadClusterFile loads a ClusterConfig from a path to a YAML file.

func (ClusterConfig) GetDefaultRetentionDropStepDuration

func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error)

GetDefaultRetentionDropStepDuration gets the default step size to use when reducing the message retention in a topic.
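The step size is stored in the cluster spec as a string (DefaultRetentionDropStepDurationStr) and returned here as a time.Duration. A minimal sketch of that conversion follows, assuming the string uses Go's duration syntax (for example "12h") and that an empty string means no limit; neither assumption is confirmed by this page, and parseDropStep is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// parseDropStep is a hypothetical helper. An empty string is treated as
// "no limit configured" and reported as a zero duration (an assumption).
func parseDropStep(s string) (time.Duration, error) {
	if s == "" {
		return 0, nil
	}
	return time.ParseDuration(s)
}

func main() {
	for _, s := range []string{"", "12h", "90m", "not-a-duration"} {
		d, err := parseDropStep(s)
		fmt.Printf("%q -> %v (err: %v)\n", s, d, err)
	}
}
```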

func (ClusterConfig) NewAdminClient

func (c ClusterConfig) NewAdminClient(
	ctx context.Context,
	sess *session.Session,
	opts AdminClientOpts,
) (admin.Client, error)

NewAdminClient returns a new admin client using the parameters in the current cluster config.

func (ClusterConfig) Validate

func (c ClusterConfig) Validate() error

Validate evaluates whether the cluster config is valid.

type ClusterMeta

type ClusterMeta struct {
	Name        string            `json:"name"`
	Region      string            `json:"region"`
	Environment string            `json:"environment"`
	Shard       int               `json:"shard"`
	Description string            `json:"description"`
	Labels      map[string]string `json:"labels"`
}

ClusterMeta contains (mostly immutable) metadata about the cluster. Inspired by the meta fields in Kubernetes objects.

type ClusterSpec

type ClusterSpec struct {
	// BootstrapAddrs is a list of one or more broker bootstrap addresses. These can use IPs
	// or DNS names.
	BootstrapAddrs []string `json:"bootstrapAddrs"`

	// ZKAddrs is a list of one or more zookeeper addresses. These can use IPs
	// or DNS names. If these are omitted, then the tool will use broker APIs exclusively.
	ZKAddrs []string `json:"zkAddrs"`

	// ZKPrefix is the prefix under which all zk nodes for the cluster are stored. If blank,
	// these are assumed to be under the zk root.
	ZKPrefix string `json:"zkPrefix"`

	// ZKLockPath indicates where locks are stored in zookeeper. If blank, then
	// no locking will be used on apply operations.
	ZKLockPath string `json:"zkLockPath"`

	// ClusterID is the value of the [prefix]/cluster/id node in zookeeper. If set, it's used
	// to validate that the cluster we're communicating with is the right one. If blank,
	// this check isn't done.
	ClusterID string `json:"clusterID"`

	// DefaultThrottleMB is the default broker throttle used for migrations in this
	// cluster. If unset, then a reasonable default is used instead.
	DefaultThrottleMB int64 `json:"defaultThrottleMB"`

	// DefaultRetentionDropStepDurationStr is the default amount of time that retention drops
	// will be limited by. If unset, no retention drop limiting will be applied.
	DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`

	// TLS stores how we should use TLS with broker connections, if appropriate. Only
	// applies if using the broker admin.
	TLS TLSConfig `json:"tls"`

	// SASL stores how we should use SASL with broker connections, if appropriate. Only
	// applies if using the broker admin.
	SASL SASLConfig `json:"sasl"`
}

ClusterSpec contains the details necessary to communicate with a kafka cluster.
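To show how the field tags above map onto a config document, here is a small sketch that decodes into local stand-in structs. The package's loaders take YAML; this example uses JSON only because the tags are JSON tags and the standard library has no YAML parser. All names and values (clusterConfig, my-cluster, broker-1:9092, and so on) are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring a subset of ClusterMeta and ClusterSpec.
type clusterMeta struct {
	Name        string `json:"name"`
	Region      string `json:"region"`
	Environment string `json:"environment"`
}

type clusterSpec struct {
	BootstrapAddrs []string `json:"bootstrapAddrs"`
	ZKAddrs        []string `json:"zkAddrs"`
	ZKLockPath     string   `json:"zkLockPath"`
	ClusterID      string   `json:"clusterID"`
}

type clusterConfig struct {
	Meta clusterMeta `json:"meta"`
	Spec clusterSpec `json:"spec"`
}

// exampleCluster is made-up data for illustration only.
const exampleCluster = `{
  "meta": {"name": "my-cluster", "region": "us-west-2", "environment": "staging"},
  "spec": {"bootstrapAddrs": ["broker-1:9092", "broker-2:9092"], "clusterID": "abc123"}
}`

func decodeCluster(data []byte) (clusterConfig, error) {
	var c clusterConfig
	err := json.Unmarshal(data, &c)
	return c, err
}

// usesZooKeeper reflects the ZKAddrs comment: when no zookeeper addresses
// are given, broker APIs are used exclusively.
func usesZooKeeper(c clusterConfig) bool {
	return len(c.Spec.ZKAddrs) > 0
}

func main() {
	c, err := decodeCluster([]byte(exampleCluster))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(c.Meta.Name, c.Spec.BootstrapAddrs, "zookeeper:", usesZooKeeper(c))
}
```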

type PickerMethod

type PickerMethod string

PickerMethod is a string type that stores a picker method for breaking ties when choosing the replica placements for a topic.

const (
	// PickerMethodClusterUse uses broker frequency in the topic, breaking ties by
	// looking at the total number of replicas across the entire cluster that each broker
	// appears in.
	PickerMethodClusterUse PickerMethod = "cluster-use"

	// PickerMethodLowestIndex uses broker frequency in the topic, breaking ties by
	// choosing the broker with the lowest index.
	PickerMethodLowestIndex PickerMethod = "lowest-index"

	// PickerMethodRandomized uses broker frequency in the topic, breaking ties by
	// using a repeatably random choice from the options.
	PickerMethodRandomized PickerMethod = "randomized"
)
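The "lowest-index" tie-break can be sketched as follows. This is not the package's implementation: it assumes that a broker appearing less often in the topic is preferred and that "index" means the broker ID, and pickLowestIndex is a hypothetical name.

```go
package main

import (
	"errors"
	"fmt"
)

// pickLowestIndex chooses one broker from candidates. The broker with the
// lowest count in topicCounts wins (assumed preference); ties go to the
// lowest broker ID.
func pickLowestIndex(candidates []int, topicCounts map[int]int) (int, error) {
	if len(candidates) == 0 {
		return 0, errors.New("no candidate brokers")
	}
	best := candidates[0]
	for _, b := range candidates[1:] {
		switch {
		case topicCounts[b] < topicCounts[best]:
			best = b
		case topicCounts[b] == topicCounts[best] && b < best:
			best = b
		}
	}
	return best, nil
}

func main() {
	// Brokers 2 and 3 each appear once in the topic, broker 1 twice.
	counts := map[int]int{1: 2, 2: 1, 3: 1}
	choice, err := pickLowestIndex([]int{3, 1, 2}, counts)
	fmt.Println(choice, err)
}
```

The other two picker methods would differ only in the tie-break branch: one consults cluster-wide replica counts, the other a seeded random choice.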

type PlacementStrategy

type PlacementStrategy string

PlacementStrategy is a string type that stores a replica placement strategy for a topic.

const (
	// PlacementStrategyAny allows any partition placement.
	PlacementStrategyAny PlacementStrategy = "any"

	// PlacementStrategyBalancedLeaders is a strategy that ensures the leaders of
	// each partition are balanced by rack, but does not care about the placements
	// of the non-leader replicas.
	PlacementStrategyBalancedLeaders PlacementStrategy = "balanced-leaders"

	// PlacementStrategyInRack is a strategy in which the leaders are balanced
	// and the replicas for each partition are in the same rack as the leader.
	PlacementStrategyInRack PlacementStrategy = "in-rack"

	// PlacementStrategyCrossRack is a strategy in which the leaders are balanced
	// and the replicas in each partition are spread to separate racks.
	PlacementStrategyCrossRack PlacementStrategy = "cross-rack"

	// PlacementStrategyStatic uses a static placement defined in the config. This is for
	// testing only and should generally not be used in production.
	PlacementStrategyStatic PlacementStrategy = "static"

	// PlacementStrategyStaticInRack is a strategy in which the replicas in each partition
	// are chosen from the rack in a static list, but the specific replicas within each partition
	// aren't specified.
	PlacementStrategyStaticInRack PlacementStrategy = "static-in-rack"
)
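The replica rules behind "in-rack" and "cross-rack" can be expressed as simple checks over an assignment. The sketch below is illustrative only: it treats the first replica of each partition as the leader (the usual Kafka convention), does not check leader balance, and uses hypothetical names (isCrossRack, isInRack, rackOf).

```go
package main

import "fmt"

// isCrossRack reports whether the replicas of every partition are in
// distinct racks. Brokers missing from rackOf fail the check.
func isCrossRack(assignments [][]int, rackOf map[int]string) bool {
	for _, replicas := range assignments {
		seen := map[string]bool{}
		for _, broker := range replicas {
			rack, ok := rackOf[broker]
			if !ok || seen[rack] {
				return false
			}
			seen[rack] = true
		}
	}
	return true
}

// isInRack reports whether all replicas of every partition share the rack
// of that partition's first replica (taken here to be the leader).
func isInRack(assignments [][]int, rackOf map[int]string) bool {
	for _, replicas := range assignments {
		if len(replicas) == 0 {
			continue
		}
		leaderRack, ok := rackOf[replicas[0]]
		if !ok {
			return false
		}
		for _, broker := range replicas[1:] {
			if rack, found := rackOf[broker]; !found || rack != leaderRack {
				return false
			}
		}
	}
	return true
}

func main() {
	rackOf := map[int]string{1: "a", 2: "b", 3: "c", 4: "a"}
	spread := [][]int{{1, 2, 3}, {2, 3, 4}}
	packed := [][]int{{1, 4}, {4, 1}}
	fmt.Println(isCrossRack(spread, rackOf), isInRack(spread, rackOf))
	fmt.Println(isCrossRack(packed, rackOf), isInRack(packed, rackOf))
}
```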

type ResourceMeta added in v1.12.0

type ResourceMeta struct {
	Name        string            `json:"name"`
	Cluster     string            `json:"cluster"`
	Region      string            `json:"region"`
	Environment string            `json:"environment"`
	Description string            `json:"description"`
	Labels      map[string]string `json:"labels"`

	// Consumers is a list of consumers who are expected to consume from this
	// topic.
	Consumers []string `json:"consumers,omitempty"`
}

ResourceMeta stores the (mostly immutable) metadata associated with a resource. Inspired by the meta structs in Kubernetes objects.

func (*ResourceMeta) Validate added in v1.12.0

func (rm *ResourceMeta) Validate() error

Validate evaluates whether the ResourceMeta is valid.

type SASLConfig added in v1.0.0

type SASLConfig struct {
	// Enabled is whether SASL is enabled.
	Enabled bool `json:"enabled"`

	// Mechanism is the name of the SASL mechanism. Valid values are AWS-MSK-IAM, PLAIN,
	// SCRAM-SHA-256, and SCRAM-SHA-512 (case insensitive).
	Mechanism string `json:"mechanism"`

	// Username is the SASL username. Ignored if mechanism is AWS-MSK-IAM.
	Username string `json:"username"`

	// Password is the SASL password. Ignored if mechanism is AWS-MSK-IAM.
	Password string `json:"password"`

	// SecretsManagerArn is the ARN of the AWS Secrets Manager secret containing the SASL credentials.
	// Ignored if mechanism is AWS-MSK-IAM. Username and Password will be ignored if this is set.
	SecretsManagerArn string `json:"secretsManagerArn"`
}

SASLConfig contains the details required to use SASL to authenticate cluster clients.
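The field comments above imply two small rules: mechanism names are matched case-insensitively against four values, and the credential source depends on the mechanism and on whether SecretsManagerArn is set. A sketch of both rules, using hypothetical helper names and return labels, might look like this:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeMechanism upper-cases the mechanism and checks it against the
// four values listed in the Mechanism field comment.
func normalizeMechanism(m string) (string, error) {
	switch up := strings.ToUpper(m); up {
	case "AWS-MSK-IAM", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		return up, nil
	default:
		return "", fmt.Errorf("unsupported SASL mechanism %q", m)
	}
}

// credentialSource follows the field comments: AWS-MSK-IAM ignores username,
// password, and the secret ARN; otherwise a set ARN takes precedence over
// the username and password. The returned labels are illustrative only.
func credentialSource(mechanism, secretsManagerArn string) string {
	switch {
	case mechanism == "AWS-MSK-IAM":
		return "iam"
	case secretsManagerArn != "":
		return "secrets-manager"
	default:
		return "username-password"
	}
}

func main() {
	mech, err := normalizeMechanism("scram-sha-512")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(mech, credentialSource(mech, ""))
}
```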

type TLSConfig added in v1.0.0

type TLSConfig struct {
	// Enabled is whether TLS is enabled.
	Enabled bool `json:"enabled"`

	// CACertPath is the path to the CA certificate file
	CACertPath string `json:"caCertPath"`

	// CertPath is the path to the client certificate file
	CertPath string `json:"certPath"`

	// KeyPath is the path to the client secret key
	KeyPath string `json:"keyPath"`

	// ServerName is the name that should be used to validate the server certificate. Optional;
	// if not set, it defaults to the name in the broker address.
	ServerName string `json:"serverName"`

	// SkipVerify indicates whether we should skip all verification of the server TLS
	// certificate.
	SkipVerify bool `json:"skipVerify"`
}

TLSConfig contains the details required to use TLS in communication with broker clients.

type TopicConfig

type TopicConfig struct {
	Meta ResourceMeta `json:"meta"`
	Spec TopicSpec    `json:"spec"`
}

TopicConfig represents the desired configuration of a topic.

func LoadTopicBytes

func LoadTopicBytes(contents []byte) (TopicConfig, error)

LoadTopicBytes loads a TopicConfig from YAML bytes.

func LoadTopicsFile added in v0.0.3

func LoadTopicsFile(path string) ([]TopicConfig, error)

LoadTopicsFile loads one or more TopicConfigs from a path to a YAML file.

func TopicConfigFromTopicInfo

func TopicConfigFromTopicInfo(
	clusterConfig ClusterConfig,
	topicInfo admin.TopicInfo,
) TopicConfig

TopicConfigFromTopicInfo generates a TopicConfig from a ClusterConfig and admin.TopicInfo struct generated from the cluster state.

func (*TopicConfig) SetDefaults

func (t *TopicConfig) SetDefaults()

SetDefaults sets the default migration and placement settings in a topic config if these aren't set.

func (TopicConfig) ToNewTopicConfig

func (t TopicConfig) ToNewTopicConfig() (kafka.TopicConfig, error)

ToNewTopicConfig converts a TopicConfig to a kafka.TopicConfig that can be used by kafka-go to create a new topic.

func (TopicConfig) ToYAML

func (t TopicConfig) ToYAML() (string, error)

ToYAML converts the current TopicConfig to a YAML string.

func (TopicConfig) Validate

func (t TopicConfig) Validate(numRacks int) error

Validate evaluates whether the topic config is valid.

type TopicMigrationConfig

type TopicMigrationConfig struct {
	ThrottleMB         int64 `json:"throttleMB"`
	PartitionBatchSize int   `json:"partitionBatchSize"`
}

TopicMigrationConfig configures the throttles and batch sizes used when running a partition migration. If these are left unset, reasonable defaults will be used instead.

type TopicPlacementConfig

type TopicPlacementConfig struct {
	Strategy PlacementStrategy `json:"strategy"`
	Picker   PickerMethod      `json:"picker,omitempty"`

	// StaticAssignments is a list of lists of desired replica assignments. It's used
	// for the "static" strategy only.
	StaticAssignments [][]int `json:"staticAssignments,omitempty"`

	// StaticRackAssignments is a list of desired rack assignments. It's used
	// for the "static-in-rack" strategy only.
	StaticRackAssignments []string `json:"staticRackAssignments,omitempty"`
}

TopicPlacementConfig describes how the partition replicas in a topic should be chosen.

type TopicSettings

type TopicSettings map[string]interface{}

TopicSettings is a map of key/value pairs that correspond to Kafka topic config settings.

func FromConfigMap

func FromConfigMap(configMap map[string]string) TopicSettings

FromConfigMap converts a string map from a Kafka topic to a TopicSettings instance.

func (TopicSettings) ConfigMapDiffs

func (t TopicSettings) ConfigMapDiffs(
	configMap map[string]string,
) ([]string, []string, error)

ConfigMapDiffs compares these topic settings to a string map fetched from the cluster. It returns the keys that are set in the settings but different in the cluster and also the keys that are set in the cluster but not set in the settings.
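A two-way comparison of this shape can be sketched as below. This is a hypothetical re-implementation, not the package's code: it stringifies setting values with fmt's %v verb, counts a key that is absent from the cluster as "different", and sorts both results for stable output. All three choices are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// configMapDiffs returns the keys whose settings value differs from the
// cluster value (or is absent there), and the keys present in the cluster
// but not in the settings.
func configMapDiffs(settings map[string]interface{}, cluster map[string]string) (changed, missing []string) {
	for key, value := range settings {
		clusterValue, ok := cluster[key]
		if !ok || clusterValue != fmt.Sprintf("%v", value) {
			changed = append(changed, key)
		}
	}
	for key := range cluster {
		if _, ok := settings[key]; !ok {
			missing = append(missing, key)
		}
	}
	sort.Strings(changed)
	sort.Strings(missing)
	return changed, missing
}

func main() {
	settings := map[string]interface{}{
		"cleanup.policy":    "compact",
		"max.message.bytes": 1048576,
	}
	cluster := map[string]string{
		"cleanup.policy":    "delete",
		"max.message.bytes": "1048576",
		"retention.ms":      "3600000",
	}
	changed, missing := configMapDiffs(settings, cluster)
	fmt.Println("changed:", changed, "missing:", missing)
}
```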

func (TopicSettings) Copy

func (t TopicSettings) Copy() TopicSettings

Copy returns a shallow copy of this settings instance.
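Because the copy is shallow, top-level keys are independent but nested values such as slices are shared with the original. The sketch below demonstrates that behavior on a plain map[string]interface{}; shallowCopy is a hypothetical stand-in for the method.

```go
package main

import "fmt"

// shallowCopy copies the top-level key/value pairs only. Values that are
// reference types (slices, maps) still point at the same underlying data.
func shallowCopy(src map[string]interface{}) map[string]interface{} {
	dst := make(map[string]interface{}, len(src))
	for key, value := range src {
		dst[key] = value
	}
	return dst
}

func main() {
	original := map[string]interface{}{
		"cleanup.policy": "delete",
		"followers":      []string{"1", "2"},
	}
	clone := shallowCopy(original)

	// Replacing a top-level value affects only the clone.
	clone["cleanup.policy"] = "compact"

	// Mutating a nested slice is visible through both maps.
	clone["followers"].([]string)[0] = "9"

	fmt.Println(original["cleanup.policy"], original["followers"])
	fmt.Println(clone["cleanup.policy"], clone["followers"])
}
```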

func (TopicSettings) GetValueStr

func (t TopicSettings) GetValueStr(key string) (string, error)

GetValueStr returns the string value for a key in this settings instance. It returns an error if the key is not found.

func (TopicSettings) HasKey

func (t TopicSettings) HasKey(key string) bool

HasKey returns whether the current settings instance contains the argument key.

func (TopicSettings) ReduceRetentionDrop

func (t TopicSettings) ReduceRetentionDrop(
	configMap map[string]string,
	retentionDropStepDuration time.Duration,
) (bool, error)

ReduceRetentionDrop updates the retention in this TopicSettings instance so that it's dropping by no more than the argument retentionDropStepDuration.

func (TopicSettings) ToConfigEntries

func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)

ToConfigEntries converts the argument keys in the current settings into a slice of kafka-go config entries. If keys is nil, then all fields are converted.
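The "nil means all keys" behavior can be sketched with a local stand-in for the kafka-go entry type. The configEntry struct, the %v stringification, the sorted order for the nil case, and the error on an unknown key are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// configEntry is a local stand-in for a name/value config entry.
type configEntry struct {
	Name  string
	Value string
}

// toConfigEntries converts the requested keys into entries. A nil keys
// slice selects every key in settings, in sorted order.
func toConfigEntries(settings map[string]interface{}, keys []string) ([]configEntry, error) {
	if keys == nil {
		for key := range settings {
			keys = append(keys, key)
		}
		sort.Strings(keys)
	}
	entries := make([]configEntry, 0, len(keys))
	for _, key := range keys {
		value, ok := settings[key]
		if !ok {
			return nil, fmt.Errorf("key %q not found in settings", key)
		}
		entries = append(entries, configEntry{Name: key, Value: fmt.Sprintf("%v", value)})
	}
	return entries, nil
}

func main() {
	settings := map[string]interface{}{"cleanup.policy": "delete", "min.insync.replicas": 2}
	all, _ := toConfigEntries(settings, nil)
	some, _ := toConfigEntries(settings, []string{"min.insync.replicas"})
	fmt.Println(all)
	fmt.Println(some)
}
```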

func (TopicSettings) Validate

func (t TopicSettings) Validate() error

Validate determines whether the given settings are valid. See https://kafka.apache.org/documentation/#topicconfigs for details.

type TopicSpec

type TopicSpec struct {
	Partitions        int           `json:"partitions"`
	ReplicationFactor int           `json:"replicationFactor"`
	RetentionMinutes  int           `json:"retentionMinutes,omitempty"`
	Settings          TopicSettings `json:"settings,omitempty"`

	PlacementConfig TopicPlacementConfig  `json:"placement"`
	MigrationConfig *TopicMigrationConfig `json:"migration,omitempty"`
}

TopicSpec stores the (mutable) specification for a topic.
