Documentation ¶
Index ¶
- Constants
- func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
- func BootstrapServersArray(bootstrapServers string) []string
- func BootstrapServersCommaSeparated(bootstrapServers []string) string
- func BrokerClassFilter() func(interface{}) bool
- func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, ...) (string, error)
- func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
- func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error
- func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
- func NoOpConfigOption(*sarama.Config) error
- func Options(config *sarama.Config, options ...ConfigOption) error
- func Topic(prefix string, obj metav1.Object) string
- type ConfigOption
- type InvalidOrNotPresentTopic
- type NewClientFunc
- type NewClusterAdminFunc
- type TopicConfig
Constants ¶
const (
// Kafka broker class annotation value.
BrokerClass = "Kafka"
)
Variables ¶
This section is empty.
Functions ¶
func AreTopicsPresentAndValid ¶ added in v0.27.0
func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
func BootstrapServersArray ¶ added in v0.18.0
func BootstrapServersCommaSeparated ¶ added in v0.18.0
func BrokerClassFilter ¶
func BrokerClassFilter() func(interface{}) bool
func CreateTopicIfDoesntExist ¶ added in v0.26.0
func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)
CreateTopicIfDoesntExist creates a topic named 'topic' using the TopicConfig passed as a parameter.
It returns the topic name or an error.
If the topic already exists, no error is returned. TODO: what happens if the topic exists but has a different config?
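The "already exists is not an error" behavior described above can be illustrated with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `topicCreator`, `fakeAdmin`, `errTopicAlreadyExists`, and `createTopicIfMissing` are hypothetical stand-ins and are not part of this package or of sarama. The sketch also leaves the TODO open: it does not compare an existing topic's configuration with the requested one.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicAlreadyExists is a hypothetical sentinel standing in for the
// "topic already exists" error that a Kafka admin client would report.
var errTopicAlreadyExists = errors.New("topic already exists")

// topicCreator is a hypothetical, minimal stand-in for the admin client.
type topicCreator interface {
	CreateTopic(name string, partitions int32) error
}

// fakeAdmin keeps topics in memory so the sketch runs without a broker.
type fakeAdmin struct {
	topics map[string]int32
}

// CreateTopic records the topic, or reports that it already exists.
func (a *fakeAdmin) CreateTopic(name string, partitions int32) error {
	if _, ok := a.topics[name]; ok {
		return errTopicAlreadyExists
	}
	a.topics[name] = partitions
	return nil
}

// createTopicIfMissing returns the topic name and treats "already exists"
// as success. It does not check whether the existing topic's configuration
// matches the requested one.
func createTopicIfMissing(admin topicCreator, name string, partitions int32) (string, error) {
	err := admin.CreateTopic(name, partitions)
	if err != nil && !errors.Is(err, errTopicAlreadyExists) {
		return name, fmt.Errorf("failed to create topic %s: %w", name, err)
	}
	return name, nil
}

func main() {
	admin := &fakeAdmin{topics: map[string]int32{}}
	// Calling twice shows that the second call is not treated as a failure.
	for i := 0; i < 2; i++ {
		name, err := createTopicIfMissing(admin, "orders", 10)
		fmt.Println(name, err)
	}
}
```

In this sketch a second call with a different partition count silently keeps the original topic, which is exactly the case the TODO asks about.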
func DeleteTopic ¶ added in v0.18.0
func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
func DisableOffsetAutoCommitConfigOption ¶ added in v0.27.0
func GetSaramaConfig ¶ added in v0.27.0
func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
GetSaramaConfig returns a Kafka client configuration with the given options applied.
func NoOpConfigOption ¶ added in v0.27.0
NoOpConfigOption is a no-op ConfigOption.
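The signatures of NoOpConfigOption, Options, and GetSaramaConfig suggest a functional-options pattern. The sketch below shows that pattern in isolation, assuming ConfigOption has the shape `func(*sarama.Config) error`. To stay runnable without external modules it uses a hypothetical `Config` struct in place of `sarama.Config`; `noOp`, `disableAutoCommit`, `applyOptions`, and `newConfig` are illustrative names, not this package's functions.

```go
package main

import "fmt"

// Config is a hypothetical stand-in for sarama.Config.
type Config struct {
	AutoCommitEnabled bool
}

// ConfigOption mirrors the shape implied by NoOpConfigOption's signature
// (an assumption): a function that mutates a config and may fail.
type ConfigOption func(*Config) error

// noOp leaves the configuration untouched.
func noOp(*Config) error { return nil }

// disableAutoCommit is an illustrative option that turns off offset auto commit.
func disableAutoCommit(c *Config) error {
	c.AutoCommitEnabled = false
	return nil
}

// applyOptions runs each option in order and stops at the first error.
func applyOptions(c *Config, opts ...ConfigOption) error {
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return err
		}
	}
	return nil
}

// newConfig builds a default config and applies the given options to it.
func newConfig(opts ...ConfigOption) (*Config, error) {
	c := &Config{AutoCommitEnabled: true}
	if err := applyOptions(c, opts...); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	c, err := newConfig(noOp, disableAutoCommit)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("auto commit enabled:", c.AutoCommitEnabled)
}
```

A no-op option is handy as a default in call sites that accept an optional ConfigOption, since it avoids nil checks when applying the options.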
Types ¶
type ConfigOption ¶ added in v0.27.0
type InvalidOrNotPresentTopic ¶ added in v0.27.0
type InvalidOrNotPresentTopic struct {
Topic string
}
func (InvalidOrNotPresentTopic) Error ¶ added in v0.27.0
func (it InvalidOrNotPresentTopic) Error() string
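Because InvalidOrNotPresentTopic implements the error interface and carries the topic name, a caller can recover that name with `errors.As`. The following is a sketch under stated assumptions: the error message format, `checkTopics`, and `missingTopic` are hypothetical, and whether AreTopicsPresentAndValid returns this error wrapped or unwrapped is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// InvalidOrNotPresentTopic mirrors the documented struct.
type InvalidOrNotPresentTopic struct {
	Topic string
}

// Error uses an illustrative message; the real wording may differ.
func (it InvalidOrNotPresentTopic) Error() string {
	return fmt.Sprintf("invalid or not present topic %q", it.Topic)
}

// checkTopics is a hypothetical stand-in for a topic validation call.
// It reports the first topic that is not in the known set.
func checkTopics(known map[string]bool, topics ...string) (bool, error) {
	for _, t := range topics {
		if !known[t] {
			return false, fmt.Errorf("validating topics: %w", InvalidOrNotPresentTopic{Topic: t})
		}
	}
	return true, nil
}

// missingTopic extracts the topic name if err is, or wraps, an
// InvalidOrNotPresentTopic.
func missingTopic(err error) (string, bool) {
	var it InvalidOrNotPresentTopic
	if errors.As(err, &it) {
		return it.Topic, true
	}
	return "", false
}

func main() {
	_, err := checkTopics(map[string]bool{"orders": true}, "orders", "payments")
	if topic, ok := missingTopic(err); ok {
		fmt.Println("missing or invalid topic:", topic)
	}
}
```

Using `errors.As` rather than a direct type assertion keeps the check working even if an intermediate layer wraps the error with `%w`.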
type NewClientFunc ¶ added in v0.27.0
NewClientFunc creates a new sarama.Client.
type NewClusterAdminFunc ¶ added in v0.18.0
NewClusterAdminFunc creates a new sarama.ClusterAdmin.
type TopicConfig ¶ added in v0.18.0
type TopicConfig struct {
TopicDetail sarama.TopicDetail
BootstrapServers []string
}
TopicConfig contains configurations for creating a topic.
func (TopicConfig) GetBootstrapServers ¶ added in v0.18.0
func (c TopicConfig) GetBootstrapServers() string
GetBootstrapServers returns TopicConfig.BootstrapServers as a comma-separated list.
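The conversion between a slice of bootstrap servers and a comma-separated string, which GetBootstrapServers, BootstrapServersArray, and BootstrapServersCommaSeparated deal with, can be sketched as follows. `serversArray` and `serversCommaSeparated` are hypothetical helpers; in particular, trimming whitespace and dropping empty entries is an assumption of this sketch, not documented behavior of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// serversArray splits a comma-separated list into individual addresses.
// Trimming whitespace and skipping empty entries is an assumption made here.
func serversArray(servers string) []string {
	var out []string
	for _, s := range strings.Split(servers, ",") {
		if s = strings.TrimSpace(s); s != "" {
			out = append(out, s)
		}
	}
	return out
}

// serversCommaSeparated joins addresses into a single comma-separated string.
func serversCommaSeparated(servers []string) string {
	return strings.Join(servers, ",")
}

func main() {
	servers := serversArray("kafka-0:9092, kafka-1:9092")
	fmt.Println(len(servers), serversCommaSeparated(servers))
}
```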