Documentation ¶
Constants ¶
const (
	CLUSTER_NETWORK_NAME = "kafka-cluster"
	ZOOKEEPER_PORT       = "2181"
	KAFKA_BROKER_PORT    = "9092"
	KAFKA_CLIENT_PORT    = "9093"
	KAFKA_BROKER_ID      = "1"
	ZOOKEEPER_IMAGE      = "confluentinc/cp-zookeeper:5.5.2"
	KAFKA_IMAGE          = "confluentinc/cp-kafka:5.5.2"
)
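These constants parameterize the containerized test cluster. As a minimal sketch, they could feed a testcontainers-go container request like the one below; this is illustrative only, and the actual wiring inside NewTestingKafkaCluster may differ.

	req := testcontainers.ContainerRequest{
		Image:        ZOOKEEPER_IMAGE,
		ExposedPorts: []string{ZOOKEEPER_PORT + "/tcp"},
		Networks:     []string{CLUSTER_NETWORK_NAME},
		// ZOOKEEPER_CLIENT_PORT is the standard env var for the
		// confluentinc/cp-zookeeper image.
		Env: map[string]string{"ZOOKEEPER_CLIENT_PORT": ZOOKEEPER_PORT},
	}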
const (
	CleanupPolicy                        = "cleanup.policy"
	CompressionType                      = "compression.type"
	DeleteRetentionMs                    = "delete.retention.ms"
	FileDeleteDelayMs                    = "file.delete.delay.ms"
	FlushMessages                        = "flush.messages"
	FlushMs                              = "flush.ms"
	FollowerReplicationThrottledReplicas = "follower.replication.throttled.replicas"
	IndexIntervalBytes                   = "index.interval.bytes"
	LeaderReplicationThrottledReplicas   = "leader.replication.throttled.replicas"
	MaxMessageBytes                      = "max.message.bytes"
	MessageDownconversionEnable          = "message.downconversion.enable"
	MessageFormatVersion                 = "message.format.version"
	MessageTimestampDifferenceMaxMs      = "message.timestamp.difference.max.ms"
	MessageTimestampType                 = "message.timestamp.type"
	MinCleanableDirtyRatio               = "min.cleanable.dirty.ratio"
	MinCompactionLagMs                   = "min.compaction.lag.ms"
	MaxCompactionLagMs                   = "max.compaction.lag.ms"
	MinInsyncReplicas                    = "min.insync.replicas"
	Preallocate                          = "preallocate"
	RetentionBytes                       = "retention.bytes"
	RetentionMs                          = "retention.ms"
	SegmentBytes                         = "segment.bytes"
	SegmentIndexBytes                    = "segment.index.bytes"
	SegmentJitterMs                      = "segment.jitter.ms"
	SegmentMs                            = "segment.ms"
	UncleanLeaderElectionEnable          = "unclean.leader.election.enable"
)
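These constants are the raw Kafka topic-level config keys, which keeps string literals out of the translation code. A sketch of the kind of map they would key; strPtr is an illustrative helper, not part of this package.

	strPtr := func(s string) *string { return &s }
	// Raw topic config as Kafka expects it: key -> string value.
	rawConfig := map[string]*string{
		CleanupPolicy:     strPtr("compact"),
		RetentionMs:       strPtr("604800000"),
		MinInsyncReplicas: strPtr("2"),
	}
	_ = rawConfig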
Variables ¶
var KafkaTopicConfigTestData = map[string][]KafkaTopicConfigHolder{
	CleanupPolicy:                        {{infrav1beta1.CleanupPolicyCompact, infrav1beta1.CleanupPolicyCompact, func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{CleanupPolicy: &vv} }}},
	CompressionType:                      {{infrav1beta1.CompressionTypeSnappy, infrav1beta1.CompressionTypeSnappy, func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{CompressionType: &vv} }}},
	DeleteRetentionMs:                    {{int64(60000), "60000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{DeleteRetentionMs: &vv} }}},
	FileDeleteDelayMs:                    {{int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{FileDeleteDelayMs: &vv} }}},
	FlushMessages:                        {{int64(5), "5", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{FlushMessages: &vv} }}},
	FlushMs:                              {{int64(666), "666", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{FlushMs: &vv} }}},
	FollowerReplicationThrottledReplicas: {{"*", "*", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{FollowerReplicationThrottledReplicas: &vv} }}},
	IndexIntervalBytes:                   {{int64(2048), "2048", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{IndexIntervalBytes: &vv} }}},
	LeaderReplicationThrottledReplicas:   {{"*", "*", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{LeaderReplicationThrottledReplicas: &vv} }}},
	MaxMessageBytes:                      {{int64(999999), "999999", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{MaxMessageBytes: &vv} }}},
	MessageDownconversionEnable:          {{true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(bool); return &infrav1beta1.KafkaTopicConfig{MessageDownconversionEnable: &vv} }}},
	MessageFormatVersion:                 {{"0.10.0", "0.10.0", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{MessageFormatVersion: &vv} }}},
	MessageTimestampDifferenceMaxMs:      {{int64(10), "10", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{MessageTimestampDifferenceMaxMs: &vv} }}},
	MessageTimestampType:                 {{"LogAppendTime", "LogAppendTime", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(string); return &infrav1beta1.KafkaTopicConfig{MessageTimestampType: &vv} }}},
	MinCleanableDirtyRatio:               {{minCleanableDirtyRatio, "0.9", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(resource.Quantity); return &infrav1beta1.KafkaTopicConfig{MinCleanableDirtyRatio: &vv} }}},
	MinCompactionLagMs:                   {{int64(10000), "10000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{MinCompactionLagMs: &vv} }}},
	MaxCompactionLagMs:                   {{int64(10000), "10000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{MaxCompactionLagMs: &vv} }}},
	MinInsyncReplicas:                    {{int64(2), "2", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{MinInsyncReplicas: &vv} }}},
	Preallocate:                          {{true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(bool); return &infrav1beta1.KafkaTopicConfig{Preallocate: &vv} }}},
	RetentionBytes:                       {{int64(1000000), "1000000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{RetentionBytes: &vv} }}},
	RetentionMs:                          {{int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{RetentionMs: &vv} }}},
	SegmentBytes:                         {{int64(500000), "500000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{SegmentBytes: &vv} }}},
	SegmentIndexBytes:                    {{int64(250000), "250000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{SegmentIndexBytes: &vv} }}},
	SegmentJitterMs:                      {{int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{SegmentJitterMs: &vv} }}},
	SegmentMs:                            {{int64(2000), "2000", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(int64); return &infrav1beta1.KafkaTopicConfig{SegmentMs: &vv} }}},
	UncleanLeaderElectionEnable:          {{true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig { vv := v.(bool); return &infrav1beta1.KafkaTopicConfig{UncleanLeaderElectionEnable: &vv} }}},
}
KafkaTopicConfigTestData holds testing data for each topic configuration option.

It is a map of slices: each map key represents one configuration option, and the entries of its slice are the possible test variations for that option.

- To add a new test for an existing configuration option, append an entry to the slice for that option (see the sketch below).
- To add a completely new configuration option, add a new map key whose value is a slice of KafkaTopicConfigHolder.
- To change existing tests, simply change them :)
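For example, a second variation for retention.ms would be appended to that option's slice in the literal above; the value here is illustrative.

	RetentionMs: {
		// existing variation elided ...
		{
			int64(86400000), "86400000",
			func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{RetentionMs: &vv}
			},
		},
	},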
Functions ¶
func TranslateKafkaTopicV1Beta1 ¶
func TranslateKafkaTopicV1Beta1(topic v1beta1.KafkaTopic) *kafka.Topic
NOTE: Kafka internal config values are intentionally not put in the api.v1beta1 package, so as not to couple our API with Kafka internal values. If Kafka internals change between versions, we can create new translations without touching our API.
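A minimal usage sketch; only the signature above is from this package, and the comments describe an assumed flow.

	var apiTopic v1beta1.KafkaTopic // e.g. fetched by the reconciler via client.Get
	internalTopic := TranslateKafkaTopicV1Beta1(apiTopic)
	// internalTopic (*kafka.Topic) now carries the raw Kafka config keys and
	// can be handed to the kafka.KafkaClient for creation or update.
	_ = internalTopic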
Types ¶
type KafkaTopicConfigHolder ¶
type KafkaTopicConfigHolder struct {
	// contains filtered or unexported fields
}
KafkaTopicConfigHolder holds testing data.

	valueToSet              : the new value to set for the config option under test
	expectedValue           : the value expected as the result of the test
	createKafkaTopicObjectF : a function that creates the KafkaTopic k8s object from the given config value
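A sketch of how a table-driven test might consume the holders; applyAndAssert is a hypothetical helper, not part of this package.

	func TestTopicConfigOptions(t *testing.T) {
		for option, holders := range KafkaTopicConfigTestData {
			for _, h := range holders {
				topic := h.createKafkaTopicObjectF(h.valueToSet)
				// applyAndAssert is hypothetical: create/update the topic and
				// verify the broker reports h.expectedValue for this option.
				applyAndAssert(t, option, topic, h.expectedValue)
			}
		}
	}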
type KafkaTopicReconciler ¶
type KafkaTopicReconciler struct {
	client.Client
	Log         logr.Logger
	Scheme      *runtime.Scheme
	Recorder    record.EventRecorder
	KafkaClient kafka.KafkaClient
}
KafkaTopicReconciler reconciles a KafkaTopic object
func (*KafkaTopicReconciler) SetupWithManager ¶
func (r *KafkaTopicReconciler) SetupWithManager(mgr ctrl.Manager, opts KafkaTopicReconcilerOptions) error
type KafkaTopicReconcilerOptions ¶
type KafkaTopicReconcilerOptions struct {
	MaxConcurrentReconciles int
}
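A wiring sketch for a typical controller-runtime main, assuming the kafka.KafkaClient value (here kafkaClient) and the scheme are set up elsewhere.

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		// handle error
	}
	r := &KafkaTopicReconciler{
		Client:      mgr.GetClient(),
		Log:         ctrl.Log.WithName("controllers").WithName("KafkaTopic"),
		Scheme:      mgr.GetScheme(),
		Recorder:    mgr.GetEventRecorderFor("kafkatopic-controller"),
		KafkaClient: kafkaClient, // assumption: constructed elsewhere
	}
	if err := r.SetupWithManager(mgr, KafkaTopicReconcilerOptions{MaxConcurrentReconciles: 2}); err != nil {
		// handle error
	}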
type TestingKafkaCluster ¶
type TestingKafkaCluster struct {
	// contains filtered or unexported fields
}
func NewTestingKafkaCluster ¶
func NewTestingKafkaCluster() (*TestingKafkaCluster, error)
func (*TestingKafkaCluster) GetKafkaHost ¶
func (kc *TestingKafkaCluster) GetKafkaHost() (string, error)
func (*TestingKafkaCluster) IsAlive ¶
func (kc *TestingKafkaCluster) IsAlive() (bool, error)
func (*TestingKafkaCluster) StartCluster ¶
func (kc *TestingKafkaCluster) StartCluster() error
func (*TestingKafkaCluster) StopCluster ¶
func (kc *TestingKafkaCluster) StopCluster() error
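A lifecycle sketch for integration tests using the methods above; error handling is abbreviated.

	cluster, err := NewTestingKafkaCluster()
	if err != nil {
		// handle error
	}
	if err := cluster.StartCluster(); err != nil {
		// handle error
	}
	defer cluster.StopCluster()

	host, _ := cluster.GetKafkaHost() // broker address for test clients
	alive, _ := cluster.IsAlive()     // optional sanity check before the tests run
	_, _ = host, alive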