kafka

package
v0.30.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultTopicNumPartitionConfigMapKey      = "default.topic.partitions"
	DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor"
	BootstrapServersConfigMapKey              = "bootstrap.servers"
)
View Source
const (
	// Kafka broker class annotation value.
	BrokerClass = "Kafka"
)

Variables

This section is empty.

Functions

func AreTopicsPresentAndValid added in v0.27.0

func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)

func BootstrapServersArray added in v0.18.0

func BootstrapServersArray(bootstrapServers string) []string

func BootstrapServersCommaSeparated added in v0.18.0

func BootstrapServersCommaSeparated(bootstrapServers []string) string

func BootstrapServersFromConfigMap added in v0.28.0

func BootstrapServersFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) ([]string, error)

func BrokerClassFilter

func BrokerClassFilter() func(interface{}) bool

func BrokerTopic added in v0.28.0

func BrokerTopic(prefix string, obj metav1.Object) string

BrokerTopic returns a topic name given a topic prefix and a Broker.

func ChannelTopic added in v0.28.0

func ChannelTopic(prefix string, obj metav1.Object) string

ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.

func CreateTopicIfDoesntExist added in v0.26.0

func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)

CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.

It returns the topic name or an error.

If the topic already exists, it will return no errors. TODO: what happens if the topic exists but it has a different config?

func DeleteTopic added in v0.18.0

func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)

func DisableOffsetAutoCommitConfigOption added in v0.27.0

func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error

func GetSaramaConfig added in v0.27.0

func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)

GetSaramaConfig returns Kafka Client configuration with the given options applied.

func NoOpConfigOption added in v0.27.0

func NoOpConfigOption(*sarama.Config) error

NoOpConfigOption is a no-op ConfigOption.

func Options added in v0.27.0

func Options(config *sarama.Config, options ...ConfigOption) error

Types

type ConfigOption added in v0.27.0

type ConfigOption func(config *sarama.Config) error

type InitOffsetsFunc added in v0.28.0

type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)

InitOffsetsFunc initialize offsets for a provided set of topics and a provided consumer group id.

type InvalidOrNotPresentTopic added in v0.27.0

type InvalidOrNotPresentTopic struct {
	Topic string
}

func (InvalidOrNotPresentTopic) Error added in v0.27.0

func (it InvalidOrNotPresentTopic) Error() string

type NewClientFunc added in v0.27.0

type NewClientFunc func(addrs []string, config *sarama.Config) (sarama.Client, error)

NewClientFunc creates new sarama.Client.

type NewClusterAdminClientFunc added in v0.28.0

type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)

NewClusterAdminClientFunc creates new sarama.ClusterAdmin.

type TopicConfig added in v0.18.0

type TopicConfig struct {
	TopicDetail      sarama.TopicDetail
	BootstrapServers []string
}

TopicConfig contains configurations for creating a topic.

func TopicConfigFromConfigMap added in v0.28.0

func TopicConfigFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*TopicConfig, error)

func (TopicConfig) GetBootstrapServers added in v0.18.0

func (c TopicConfig) GetBootstrapServers() string

GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL