Documentation
Index ¶
- Constants
- func AdminConfig() *sarama.Config
- func BootstrapServersArray(bootstrapServers string) []string
- func BootstrapServersCommaSeparated(bootstrapServers []string) string
- func BrokerClassFilter() func(interface{}) bool
- func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, ...) (string, error)
- func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
- func GetClusterAdmin(adminFunc NewClusterAdminFunc, bootstrapServers []string, ...) (sarama.ClusterAdmin, error)
- func GetClusterAdminFromConfig(adminFunc NewClusterAdminFunc, config *sarama.Config, ...) (sarama.ClusterAdmin, error)
- func Topic(prefix string, obj metav1.Object) string
- type NewClusterAdminFunc
- func (f NewClusterAdminFunc) CreateTopicIfDoesntExist(logger *zap.Logger, topic string, config *TopicConfig, ...) (string, error)
- func (f NewClusterAdminFunc) DeleteTopic(topic string, bootstrapServers []string, secOptions security.ConfigOption) (string, error)
- func (f NewClusterAdminFunc) IsTopicPresentAndValid(topic string, bootstrapServers []string, secOptions security.ConfigOption) (bool, error)
- type TopicConfig
Constants ¶
const (
// Kafka broker class annotation value.
BrokerClass = "Kafka"
)
Variables ¶
This section is empty.
Functions ¶
func AdminConfig ¶ added in v0.18.0
func AdminConfig() *sarama.Config
AdminConfig returns the Kafka admin client configuration.
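func BootstrapServersArray ¶ added in v0.18.0
func BootstrapServersArray(bootstrapServers string) []string
func BootstrapServersCommaSeparated ¶ added in v0.18.0
func BootstrapServersCommaSeparated(bootstrapServers []string) string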
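The two bootstrap-server helpers are not described beyond their signatures. As a rough sketch of the conversion their names suggest, the following uses hypothetical stand-ins (`splitBootstrapServers`, `joinBootstrapServers`) and assumes a plain comma-separated list with no surrounding whitespace; the package's actual handling of edge cases such as empty strings may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBootstrapServers is a hypothetical stand-in for BootstrapServersArray.
// Assumption: the input is a plain comma-separated list of host:port pairs.
func splitBootstrapServers(bootstrapServers string) []string {
	return strings.Split(bootstrapServers, ",")
}

// joinBootstrapServers is a hypothetical stand-in for BootstrapServersCommaSeparated.
func joinBootstrapServers(bootstrapServers []string) string {
	return strings.Join(bootstrapServers, ",")
}

func main() {
	servers := splitBootstrapServers("kafka-0:9092,kafka-1:9092")
	// Joining the slice again yields the original string.
	fmt.Println(len(servers), joinBootstrapServers(servers))
}
```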
func BrokerClassFilter ¶
func BrokerClassFilter() func(interface{}) bool
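BrokerClassFilter returns a predicate over arbitrary objects. One plausible reading, given the BrokerClass constant, is that the predicate accepts objects whose broker class annotation equals "Kafka". The sketch below illustrates that reading only: the annotation key, the `annotated` interface, and `fakeBroker` are assumptions introduced for the example and do not come from this package.

```go
package main

import "fmt"

// brokerClassAnnotationKey is an assumed annotation key; this page only
// documents the annotation value.
const brokerClassAnnotationKey = "eventing.knative.dev/broker.class"

// brokerClass mirrors the BrokerClass constant documented above.
const brokerClass = "Kafka"

// annotated is a hypothetical minimal interface for objects carrying annotations.
type annotated interface {
	GetAnnotations() map[string]string
}

// fakeBroker is a stand-in object used only in this sketch.
type fakeBroker struct{ annotations map[string]string }

func (b fakeBroker) GetAnnotations() map[string]string { return b.annotations }

// brokerClassFilter returns a predicate with the same shape as the value
// returned by BrokerClassFilter: func(interface{}) bool.
func brokerClassFilter() func(interface{}) bool {
	return func(obj interface{}) bool {
		o, ok := obj.(annotated)
		if !ok {
			// Objects without annotations can never match.
			return false
		}
		return o.GetAnnotations()[brokerClassAnnotationKey] == brokerClass
	}
}

func main() {
	filter := brokerClassFilter()
	kafka := fakeBroker{annotations: map[string]string{brokerClassAnnotationKey: brokerClass}}
	other := fakeBroker{annotations: map[string]string{brokerClassAnnotationKey: "SomethingElse"}}
	fmt.Println(filter(kafka), filter(other), filter("not an object"))
}
```

A predicate of this shape can be plugged into any event handler that filters objects before processing them.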
func CreateTopicIfDoesntExist ¶ added in v0.26.0
func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)
CreateTopicIfDoesntExist creates a topic named topic according to the given TopicConfig.
It returns the topic name or an error.
If the topic already exists, no error is returned. TODO: what happens if the topic exists but has a different config?
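The "create if it doesn't exist" contract can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the package's implementation: `topicAdmin`, `inMemoryAdmin`, `errTopicAlreadyExists`, and `createTopicIfAbsent` are hypothetical names, and a real admin client would report the "already exists" condition through its own error type.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicAlreadyExists is a stand-in for the "already exists" error a real
// admin client would return.
var errTopicAlreadyExists = errors.New("topic already exists")

// topicAdmin is a hypothetical, minimal slice of an admin client.
type topicAdmin interface {
	CreateTopic(name string, partitions int32) error
}

// inMemoryAdmin is a fake admin that keeps topics in a map.
type inMemoryAdmin struct{ topics map[string]int32 }

func (a *inMemoryAdmin) CreateTopic(name string, partitions int32) error {
	if _, ok := a.topics[name]; ok {
		return errTopicAlreadyExists
	}
	a.topics[name] = partitions
	return nil
}

// createTopicIfAbsent treats "already exists" as success and returns the
// topic name, matching the documented contract. It does not compare the
// existing topic's configuration with the requested one.
func createTopicIfAbsent(admin topicAdmin, name string, partitions int32) (string, error) {
	err := admin.CreateTopic(name, partitions)
	if err != nil && !errors.Is(err, errTopicAlreadyExists) {
		return name, fmt.Errorf("failed to create topic %s: %w", name, err)
	}
	return name, nil
}

func main() {
	admin := &inMemoryAdmin{topics: map[string]int32{}}
	for i := 0; i < 2; i++ {
		// The second call hits the "already exists" path and still succeeds.
		name, err := createTopicIfAbsent(admin, "orders", 10)
		fmt.Println(name, err)
	}
}
```

In this sketch a second call with a different partition count succeeds and leaves the existing topic untouched; whether the real function behaves the same way is the open question raised by the TODO.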
func DeleteTopic ¶ added in v0.18.0
func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
func GetClusterAdmin ¶ added in v0.18.0
func GetClusterAdmin(adminFunc NewClusterAdminFunc, bootstrapServers []string, secOptions security.ConfigOption) (sarama.ClusterAdmin, error)
GetClusterAdmin creates a new sarama.ClusterAdmin.
The caller is responsible for closing the sarama.ClusterAdmin.
func GetClusterAdminFromConfig ¶ added in v0.18.0
func GetClusterAdminFromConfig(adminFunc NewClusterAdminFunc, config *sarama.Config, bootstrapServers []string, secOptions security.ConfigOption) (sarama.ClusterAdmin, error)
GetClusterAdminFromConfig creates a new sarama.ClusterAdmin.
The caller is responsible for closing the sarama.ClusterAdmin.
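Because the caller owns the returned admin, a common way to honor that contract is to close it with `defer` immediately after a successful construction. The sketch below shows the pattern with hypothetical stand-ins (`clusterAdmin`, `fakeAdmin`, `withAdmin`); only the `Close() error` method is assumed, which sarama.ClusterAdmin does provide.

```go
package main

import (
	"errors"
	"fmt"
)

// clusterAdmin is a hypothetical stand-in exposing only the Close method.
type clusterAdmin interface {
	Close() error
}

// fakeAdmin records whether Close was called.
type fakeAdmin struct{ closed bool }

func (a *fakeAdmin) Close() error {
	if a.closed {
		return errors.New("admin already closed")
	}
	a.closed = true
	return nil
}

// withAdmin obtains an admin from newAdmin, runs fn, and always closes the
// admin afterwards. A Close error is reported only if fn itself succeeded.
func withAdmin(newAdmin func() (clusterAdmin, error), fn func(clusterAdmin) error) (err error) {
	admin, err := newAdmin()
	if err != nil {
		return fmt.Errorf("failed to create cluster admin: %w", err)
	}
	defer func() {
		if cerr := admin.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	return fn(admin)
}

func main() {
	admin := &fakeAdmin{}
	err := withAdmin(
		func() (clusterAdmin, error) { return admin, nil },
		func(clusterAdmin) error { return nil },
	)
	fmt.Println(err, admin.closed)
}
```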
Types ¶
type NewClusterAdminFunc ¶ added in v0.18.0
NewClusterAdminFunc creates a new sarama.ClusterAdmin.
func (NewClusterAdminFunc) CreateTopicIfDoesntExist ¶ added in v0.26.0
func (f NewClusterAdminFunc) CreateTopicIfDoesntExist(logger *zap.Logger, topic string, config *TopicConfig, secOptions security.ConfigOption) (string, error)
func (NewClusterAdminFunc) DeleteTopic ¶ added in v0.18.0
func (f NewClusterAdminFunc) DeleteTopic(topic string, bootstrapServers []string, secOptions security.ConfigOption) (string, error)
func (NewClusterAdminFunc) IsTopicPresentAndValid ¶ added in v0.18.0
func (f NewClusterAdminFunc) IsTopicPresentAndValid(topic string, bootstrapServers []string, secOptions security.ConfigOption) (bool, error)
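The methods above are declared on a function type, a Go idiom that lets a constructor carry behavior and be swapped for a fake in tests. The following sketch shows the idiom only. The names `admin`, `newAdminFunc`, `deleteTopic`, and `recordingAdmin` are hypothetical, the constructor's parameter list is an assumption, and the security options from the real signatures are omitted.

```go
package main

import "fmt"

// admin is a hypothetical stand-in for the subset of an admin client used here.
type admin interface {
	DeleteTopic(topic string) error
	Close() error
}

// newAdminFunc mirrors the idea of NewClusterAdminFunc: a constructor
// expressed as a named function type. Its parameters are an assumption.
type newAdminFunc func(bootstrapServers []string) (admin, error)

// deleteTopic is a method on the function type. It builds an admin with f,
// deletes the topic, and closes the admin before returning.
func (f newAdminFunc) deleteTopic(topic string, bootstrapServers []string) (string, error) {
	a, err := f(bootstrapServers)
	if err != nil {
		return topic, fmt.Errorf("failed to create admin: %w", err)
	}
	defer a.Close()

	if err := a.DeleteTopic(topic); err != nil {
		return topic, fmt.Errorf("failed to delete topic %s: %w", topic, err)
	}
	return topic, nil
}

// recordingAdmin is a fake that records which topics were deleted.
type recordingAdmin struct{ deleted []string }

func (r *recordingAdmin) DeleteTopic(topic string) error {
	r.deleted = append(r.deleted, topic)
	return nil
}

func (r *recordingAdmin) Close() error { return nil }

func main() {
	fake := &recordingAdmin{}
	// Injecting a fake constructor avoids needing a running Kafka cluster.
	var newAdmin newAdminFunc = func([]string) (admin, error) { return fake, nil }
	topic, err := newAdmin.deleteTopic("orders", []string{"kafka-0:9092"})
	fmt.Println(topic, err, fake.deleted)
}
```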
type TopicConfig ¶ added in v0.18.0
type TopicConfig struct {
	TopicDetail      sarama.TopicDetail
	BootstrapServers []string
}
TopicConfig contains configurations for creating a topic.
func (TopicConfig) GetBootstrapServers ¶ added in v0.18.0
func (c TopicConfig) GetBootstrapServers() string
GetBootstrapServers returns TopicConfig.BootstrapServers as a comma-separated list of bootstrap servers.