Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AclPatternTypeMapping ¶
func AclPatternTypeMapping(patternType v1alpha1.KafkaPatternType) sarama.AclResourcePatternType
AclPatternTypeMapping maps patternType from v1alpha1.KafkaPatternType to sarama.AclResourcePatternType
Types ¶
type CreateTopicOptions ¶
type CreateTopicOptions struct { Name string Partitions int32 ReplicationFactor int16 Config map[string]*string }
CreateTopicOptions holds info about topic configuration
type KafkaClient ¶
type KafkaClient interface { NumBrokers() int ListTopics() (map[string]sarama.TopicDetail, error) CreateTopic(*CreateTopicOptions) error EnsurePartitionCount(string, int32) (bool, error) EnsureTopicConfig(string, map[string]*string) error DeleteTopic(string, bool) error GetTopic(string) (*sarama.TopicDetail, error) DescribeTopic(string) (*sarama.TopicMetadata, error) CreateUserACLs(v1alpha1.KafkaAccessType, v1alpha1.KafkaPatternType, string, string) error ListUserACLs() ([]sarama.ResourceAcls, error) DeleteUserACLs(string, v1alpha1.KafkaPatternType) error Brokers() map[int32]string DescribeCluster() ([]*sarama.Broker, int32, error) // AllOfflineReplicas returns the list of unique offline replica (broker) ids AllOfflineReplicas() ([]int32, error) // OutOfSyncReplicas returns the list of unique out of sync replica (broker) ids OutOfSyncReplicas() ([]int32, error) AlterPerBrokerConfig(int32, map[string]*string, bool) error DescribePerBrokerConfig(int32, []string) ([]*sarama.ConfigEntry, error) AlterClusterWideConfig(map[string]*string, bool) error DescribeClusterWideConfig() ([]sarama.ConfigEntry, error) TopicMetaToStatus(meta *sarama.TopicMetadata) *v1alpha1.KafkaTopicStatus Open() error Close() error }
KafkaClient is the exported interface for kafka operations
func New ¶
func New(opts *KafkaConfig) KafkaClient
func NewFromCluster ¶
func NewFromCluster(k8sclient client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
NewFromCluster is a convenience wrapper around New() and ClusterConfig()
func NewMockFromCluster ¶
func NewMockFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
type KafkaConfig ¶
type KafkaConfig struct { BrokerURI string UseSSL bool TLSConfig *tls.Config OperationTimeout int64 }
KafkaConfig are the options to creating a new ClusterAdmin client
func ClusterConfig ¶
func ClusterConfig(client client.Client, cluster *v1beta1.KafkaCluster) (*KafkaConfig, error)
ClusterConfig creates connection options from a KafkaCluster CR
type Provider ¶
type Provider interface {
NewFromCluster(client client.Client, cluster *v1beta1.KafkaCluster) (KafkaClient, func(), error)
}
func NewDefaultProvider ¶
func NewDefaultProvider() Provider
func NewMockProvider ¶
func NewMockProvider() Provider
Click to show internal directories.
Click to hide internal directories.