Documentation ¶
Index ¶
- Constants
- func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
- func BootstrapServersArray(bootstrapServers string) []string
- func BootstrapServersCommaSeparated(bootstrapServers []string) string
- func BootstrapServersFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) ([]string, error)
- func BrokerClassFilter() func(interface{}) bool
- func BrokerTopic(prefix string, obj metav1.Object) string
- func ChannelTopic(prefix string, obj metav1.Object) string
- func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, ...) (string, error)
- func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
- func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error
- func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
- func NoOpConfigOption(*sarama.Config) error
- func Options(config *sarama.Config, options ...ConfigOption) error
- type ConfigOption
- type InitOffsetsFunc
- type InvalidOrNotPresentTopic
- type NewClientFunc
- type NewClusterAdminClientFunc
- type TopicConfig
Constants ¶
const ( DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions" DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor" BootstrapServersConfigMapKey = "bootstrap.servers" )
const (
// Kafka broker class annotation value.
BrokerClass = "Kafka"
)
Variables ¶
This section is empty.
Functions ¶
func AreTopicsPresentAndValid ¶ added in v0.27.0
func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
func BootstrapServersArray ¶ added in v0.18.0
func BootstrapServersCommaSeparated ¶ added in v0.18.0
func BootstrapServersFromConfigMap ¶ added in v0.28.0
func BrokerClassFilter ¶
func BrokerClassFilter() func(interface{}) bool
func BrokerTopic ¶ added in v0.28.0
BrokerTopic returns a topic name given a topic prefix and a Broker.
func ChannelTopic ¶ added in v0.28.0
ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.
func CreateTopicIfDoesntExist ¶ added in v0.26.0
func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)
CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.
It returns the topic name or an error.
If the topic already exists, it will return no errors. TODO: what happens if the topic exists but it has a different config?
func DeleteTopic ¶ added in v0.18.0
func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
func DisableOffsetAutoCommitConfigOption ¶ added in v0.27.0
func GetSaramaConfig ¶ added in v0.27.0
func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
GetSaramaConfig returns Kafka Client configuration with the given options applied.
func NoOpConfigOption ¶ added in v0.27.0
NoOpConfigOption is a no-op ConfigOption.
Types ¶
type ConfigOption ¶ added in v0.27.0
type InitOffsetsFunc ¶ added in v0.28.0
type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)
InitOffsetsFunc initialize offsets for a provided set of topics and a provided consumer group id.
type InvalidOrNotPresentTopic ¶ added in v0.27.0
type InvalidOrNotPresentTopic struct {
Topic string
}
func (InvalidOrNotPresentTopic) Error ¶ added in v0.27.0
func (it InvalidOrNotPresentTopic) Error() string
type NewClientFunc ¶ added in v0.27.0
NewClientFunc creates new sarama.Client.
type NewClusterAdminClientFunc ¶ added in v0.28.0
type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)
NewClusterAdminClientFunc creates new sarama.ClusterAdmin.
type TopicConfig ¶ added in v0.18.0
type TopicConfig struct { TopicDetail sarama.TopicDetail BootstrapServers []string }
TopicConfig contains configurations for creating a topic.
func TopicConfigFromConfigMap ¶ added in v0.28.0
func (TopicConfig) GetBootstrapServers ¶ added in v0.18.0
func (c TopicConfig) GetBootstrapServers() string
GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers.