Documentation ¶
Index ¶
- Constants
- Variables
- func GetGeneratedDeepCopyFuncs() []conversion.GeneratedDeepCopyFunc (deprecated)
- func PrintCluster(cluster *Kafkacluster) string
- func Resource(resource string) schema.GroupResource
- type BrokerState
- type KafkaBrokerSpec
- type KafkaBrokerState
- type KafkaEventType
- type KafkaOption
- type KafkaOptions
- type KafkaPartition
- type KafkaReassignmentConfig
- type KafkaTopic
- type KafkaTopicSpec
- type Kafkacluster
- type KafkaclusterEvent
- type KafkaclusterList
- type KafkaclusterScale
- type KafkaclusterSpec
- type KafkaclusterState
- type KafkaclusterWatchEvent
- type ResourceSpec
Constants ¶
const ( CRDGroupName = "krallistic.github.com" CRDRessourcePlural = "kafkaclusters" CRDName = "kafkacluster" CRDVersion = "v1" )
Variables ¶
var ( SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) AddToScheme = SchemeBuilder.AddToScheme )
var (
CRDFullName = CRDRessourcePlural + "." + CRDGroupName
)
var SchemeGroupVersion = schema.GroupVersion{Group: CRDGroupName, Version: CRDVersion}
SchemeGroupVersion is the group version used to register these objects.
Functions ¶
func GetGeneratedDeepCopyFuncs
deprecated
func GetGeneratedDeepCopyFuncs() []conversion.GeneratedDeepCopyFunc
Deprecated: GetGeneratedDeepCopyFuncs returns the generated funcs, since we aren't registering them.
func Resource ¶
func Resource(resource string) schema.GroupResource
Resource takes an unqualified resource and returns a Group-qualified GroupResource.
Types ¶
type BrokerState ¶
BrokerState contains state about brokers
type KafkaBrokerSpec ¶
type KafkaBrokerState ¶
type KafkaBrokerState string
const ( EMPTY_BROKER KafkaBrokerState = "EMPTYING" REBALANCE_BROKER KafkaBrokerState = "REBALANCING" NORMAL_STATE KafkaBrokerState = "NORMAL" )
type KafkaEventType ¶
type KafkaEventType int32
const ( //Event when a new CR Object is detected and a cluster needs to be created. NEW_CLUSTER KafkaEventType = iota + 1 //Event when a deletion of a CR Object is detected. Deletion of that Cluster is initiated DELETE_CLUSTER //Event when a CR Object is changed and currentReplicas < DesiredReplicas. Creates a new node and triggers a rebalance. UPSIZE_CLUSTER //CR Object changed, current > desired. Initiating Rebalancing. DOWNSIZE_CLUSTER //Different Image of the Broker, Update Chain CHANGE_IMAGE //Different Broker Resource, Rolling Update CHANGE_BROKER_RESOURCES CHANGE_NAME CHANGE_ZOOKEEPER_CONNECT //Different Broker Config, Rolling Update. BROKER_CONFIG_CHANGE UNKNOWN_CHANGE DOWNSIZE__EVENT //Cleanup event which gets emitted after a Cluster Delete. //It ensures the deletion of the Statefulset after it has been scaled down. CLEANUP_EVENT KAKFA_EVENT STATE_CHANGE SCALE_CHANGE ERROR_STATE )
type KafkaOption ¶
type KafkaOptions ¶
type KafkaOptions struct { // Default: true AutoCreateTopicsEnable *bool `json:"autoCreateTopicsEnable,omitempty"` //Enables auto balancing of topic leaders. Done by a background thread. // Default: true AutoLeaderRebalanceEnable *bool `json:"autoLeaderRebalanceEnable,omitempty"` //Amount of threads for various background tasks // Default: 10 BackgroundThreads *int32 `json:"backgroudThreads,omitempty"` //Default compression type for a topic. Can be "gzip", "snappy", "lz4" // Default: "gzip" CompressionType *string `json:"compressionType,omitempty"` // Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off // Default: false DeleteTopicEnable *bool `json:"deleteTopicEnable,omitempty"` //The frequency with which the partition rebalance check is triggered by the controller // Default:300 LeaderImbalanceCheckIntervalSeconds *int32 `json:"leaderImbalanceCheckIntervalSeconds,omitempty"` // The ratio of leader imbalance allowed per broker. // The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage. // Default: 10 LeaderImbalanceBrokerPercentage *int32 `json:"leaderImbalanceBrokerPercentage,omitempty"` //The number of messages accumulated on a log partition before messages are flushed to disk // Default: 9223372036854775807 LogFlushIntervalMessages *int64 `json:"logFlushIntervalMessages,omitempty"` // The maximum time in ms that a message in any topic is kept in memory before flushed to disk. 
// If not set, the value in log.flush.scheduler.interval.ms is used // Default: null LogFlushIntervalMs *int64 `json:"logFlushIntervalMs,omitempty"` //The frequency with which we update the persistent record of the last flush which acts as the log recovery point // Default: 60000 LogFlushOffsetCheckpointIntervalMs *int32 `json:"logFlushOffsetCheckpointIntervalMs,omitempty"` //The frequency in ms that the log flusher checks whether any log needs to be flushed to disk // Default: 9223372036854775807 LogFlushSchedulerIntervalMs *int64 `json:"LogFlushSchedulerIntervalMs,omitempty"` // The maximum size of the log before deleting it // Default: -1 LogRetentionBytes *string `json:"logRetentionBytes,omitempty"` // The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property // Default: 168 LogRetentionHours *int32 `json:"logRetentionHours,omitempty"` //The maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms property // Default: 168 LogRollHours *int32 `json:"logRollHours,omitempty"` // The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to log.roll.jitter.ms property // Default: 0 LogRollJitterHours *int32 `json:"logRollJitterHours,omitempty"` //The maximum size of a single log file // Default: 1073741824 LogSegmentBytes *int32 `json:"logSegmentBytes,omitempty"` // The amount of time to wait before deleting a file from the filesystem // Default: 60000 LogSegmentDeleteDelayMS *int64 `json:"logSegmentDeleteDelayMS,omitempty"` // The maximum size of message that the server can receive // Default: 1000012 MessagesMaxBytes *int32 `json:"messagesMaxBytes,omitempty"` // When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge // a write for the write to be considered successful. 
// Can be overwritten at topic level // Default: 1 MinInsyncReplicas *int32 `json:"minInsyncReplicas,omitempty"` // The number of io threads that the server uses for carrying out network requests // Default: 8 NumIOThreads *int32 `json:"numIOThreads,omitempty"` // The number of network threads that the server uses for handling network requests // Default: 3 NumNetworkThreads *int32 `json:"numNetworkThreads,omitempty"` //The number of threads per data directory to be used for log recovery at startup and flushing at shutdown // Default: 1 NumRecoveryThreadsPerDataDir *int32 `json:"numRecoveryThreadsPerDataDir,omitempty"` // Number of fetcher threads used to replicate messages from a source broker. // Increasing this value can increase the degree of I/O parallelism in the follower broker. // Default: 1 NumReplicaFetchers *int32 `json:"numReplicaFetchers,omitempty"` // The maximum size for a metadata entry associated with an offset commit. // Default: 4096 OffsetMetadataMaxBytes *int32 `json:"offsetMetadataMaxBytes,omitempty"` //Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. // This is similar to the producer request timeout. // Default: 5000 OffsetCommitTimeoutMs *int32 `json:"offsetCommitTimeoutMs,omitempty"` // Batch size for reading from the offsets segments when loading offsets into the cache. 
// Default: 5242880 OffsetLoadBufferSize *int32 `json:"offsetLoadBufferSize,omitempty"` // Frequency at which to check for stale offsets // Default: 600000 OffsetRetentionCheckIntervalMs *int64 `json:"offsetRetentionCheckIntervalMs,omitempty"` // Log retention window in minutes for offsets topic // Default: 1440 OffsetRetentionMinutes *int32 `json:"offsetRetentionMinutes,omitempty"` // The number of partitions for the offset commit topic (should not change after deployment) // Default: 50 OffsetTopicNumPartitions *int32 `json:"offsetTopicNumPartitions,omitempty"` // The replication factor for the offsets topic (set higher to ensure availability). // To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. // If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor) // Default: 3 OffsetTopicReplicationFactor *int32 `json:"offsetTopicReplicationFactor,omitempty"` // The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads // Default: 104857600 OffsetTopicSegmentsBytes *int32 `json:"offsetTopicSegmentsBytes,omitempty"` // The number of queued requests allowed before blocking the network threads // Default: 100 QueuedMaxRequest *int32 `json:"queuedMaxRequest,omitempty"` // Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs // Default: 1 ReplicaFetchMinBytes *int32 `json:"replicaFetchMinBytes,omitempty"` //max wait time for each fetcher request issued by follower replicas. 
// This value should always be less than the replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics //Default: 500 ReplicaFetchWaitMaxMs *int32 `json:"replicaFetchWaitMaxMs,omitempty"` //The frequency with which the high watermark is saved out to disk // Default: 5000 ReplicaHighWatermarkCheckpointIntervalMs *int64 `json:"replicaHighWatermarkCheckpointIntervalMs,omitempty"` // If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, // the leader will remove the follower from isr // Defaut: 10000 ReplicaLagTimeMaxMs *int64 `json:"replicaLagTimeMaxMs,omitempty"` // The socket receive buffer for network requests // Default: 65536 ReplicaSocketReceiveBufferBytes *int32 `json:"replicaSocketReceiveBufferBytes,omitempty"` // The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms // Default: 30000 ReplicaSocketTimeoutMs *int32 `json:"replicaSocketTimeoutMs,omitempty"` // The configuration controls the maximum amount of time the client will wait for the response of a request. // If the response is not received before the timeout elapses the client will resend the request if necessary // or fail the request if retries are exhausted. // Default: 30000 RequestTimeoutMs *int32 `json:"requestTimeoutMs,omitempty"` // Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss // Default: true UncleanLeaderElectionEnable *bool `json:"uncleanLeaderElectionEnable,omitempty"` // The max time that the client waits to establish a connection to zookeeper. // If not set, the value in zookeeper.session.timeout.ms is used // Default: null ZookeeperConnectionTimeoutMs *int32 `json:"zookeeperConnectionTimeoutMs,omitempty"` // Zookeeper session timeout // Default: 6000 ZookeeperSessionTimeoutMs *int32 `json:"zookeeperSessionTimeoutMs,omitempty"` }
Unused
type KafkaPartition ¶
type KafkaReassignmentConfig ¶
type KafkaReassignmentConfig struct { Partition []KafkaPartition `json:"partition"` Version string `json:"version"` }
ReassignmentConfig
type KafkaTopic ¶
type KafkaTopic struct { Topic string `json:"topic"` PartitionFactor int32 `json:"partition_factor"` ReplicationFactor int32 `json:"replication_factor"` Partitions []KafkaPartition `json:"partitions"` }
type KafkaTopicSpec ¶
type Kafkacluster ¶
type Kafkacluster struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata"` Spec KafkaclusterSpec `json:"spec"` State KafkaclusterState `json:"state,omitempty"` Scale KafkaclusterScale `json:"scale,omitempty"` }
Main API Object
func (*Kafkacluster) DeepCopy ¶
func (x *Kafkacluster) DeepCopy() *Kafkacluster
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Kafkacluster.
func (*Kafkacluster) DeepCopyInto ¶
func (in *Kafkacluster) DeepCopyInto(out *Kafkacluster)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (*Kafkacluster) DeepCopyObject ¶
func (x *Kafkacluster) DeepCopyObject() runtime.Object
DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (*Kafkacluster) GetObjectKind ¶
func (e *Kafkacluster) GetObjectKind() schema.ObjectKind
Required to satisfy Object interface
type KafkaclusterEvent ¶
type KafkaclusterEvent struct { Type KafkaEventType Cluster Kafkacluster OldCluster Kafkacluster }
No JSON tags are needed since this is an internal event type.
type KafkaclusterList ¶
type KafkaclusterList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata"` Items []Kafkacluster `json:"items"` }
k8s API List Type
func (*KafkaclusterList) DeepCopy ¶
func (x *KafkaclusterList) DeepCopy() *KafkaclusterList
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaclusterList.
func (*KafkaclusterList) DeepCopyInto ¶
func (in *KafkaclusterList) DeepCopyInto(out *KafkaclusterList)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (*KafkaclusterList) DeepCopyObject ¶
func (x *KafkaclusterList) DeepCopyObject() runtime.Object
DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (*KafkaclusterList) GetObjectKind ¶
func (el *KafkaclusterList) GetObjectKind() schema.ObjectKind
Required to satisfy Object interface
type KafkaclusterScale ¶
type KafkaclusterScale struct { CurrentScale int32 `json:"currentScale,omitempty"` DesiredScale int32 `json:"desiredScale,omitempty"` }
KafkaclusterScale represents the `scale` field inside the CRD.
type KafkaclusterSpec ¶
type KafkaclusterSpec struct { //Amount of Broker Nodes Image string `json:"image"` BrokerCount int32 `json:"brokerCount"` Resources ResourceSpec `json:"resources"` KafkaOptions KafkaOptions `json:"kafkaOptions"` JmxSidecar bool `json:"jmxSidecar"` Topics []KafkaTopicSpec `json:"topics"` ZookeeperConnect string `json:"zookeeperConnect"` NodeSelector map[string]string `json:"nodeSelector,omitempty"` StorageClass string `json:"storageClass"` //TODO use k8s type? // Toleration time if node is down/unreachable/not ready before moving to a new net // Set to 0 to disable moving to all together. MinimumGracePeriod int64 `json:"minimumGracePeriod"` LeaderImbalanceRatio float32 `json:"leaderImbalanceRatio"` LeaderImbalanceInterval int32 `json:"leaderImbalanceInterval"` }
func (*KafkaclusterSpec) DeepCopy ¶
func (x *KafkaclusterSpec) DeepCopy() *KafkaclusterSpec
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaclusterSpec.
func (*KafkaclusterSpec) DeepCopyInto ¶
func (in *KafkaclusterSpec) DeepCopyInto(out *KafkaclusterSpec)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type KafkaclusterState ¶
type KafkaclusterState struct { Status string `json:"status,omitempty"` Topics []string `json:"topics,omitempty"` Brokers []BrokerState `json:"brokers,omitempty"` }
KafkaclusterState represents the State field inside the cluster; it is used to insert current state information.
type KafkaclusterWatchEvent ¶
type KafkaclusterWatchEvent struct { Type string `json:"type"` Object Kafkacluster `json:"object"` OldObject Kafkacluster `json:"oldObject"` }
type ResourceSpec ¶
type ResourceSpec struct { Memory string `json:"memory"` DiskSpace string `json:"diskSpace"` CPU string `json:"cpu"` }
TODO refactor to just use native k8s types