kafka

package
v0.37.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Kafka broker class annotation value.
	BrokerClass           = "Kafka"
	NamespacedBrokerClass = "KafkaNamespaced"
)
View Source
const (
	NamespacedBrokerDataplaneLabelKey   = "eventing.knative.dev/namespaced"
	NamespacedBrokerDataplaneLabelValue = "true"
)
View Source
const (
	DefaultTopicNumPartitionConfigMapKey      = "default.topic.partitions"
	DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor"
	BootstrapServersConfigMapKey              = "bootstrap.servers"

	GroupIDConfigMapKey = "group.id"

	TopicAnnotation = "default.topic"
)
View Source
const (
	GroupIdAnnotation = "group.id"
)

Variables

This section is empty.

Functions

func AreConsumerGroupsPresentAndValid added in v0.37.0

func AreConsumerGroupsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, consumerGroups ...string) (bool, error)

func AreTopicsPresentAndValid added in v0.30.1

func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)

func BootstrapServersArray added in v0.30.1

func BootstrapServersArray(bootstrapServers string) []string

func BootstrapServersCommaSeparated added in v0.30.1

func BootstrapServersCommaSeparated(bootstrapServers []string) string

func BootstrapServersFromConfigMap added in v0.30.1

func BootstrapServersFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) ([]string, error)

func BrokerClassFilter added in v0.30.1

func BrokerClassFilter() func(interface{}) bool

func BrokerTopic added in v0.30.1

func BrokerTopic(prefix string, obj metav1.Object) string

BrokerTopic returns a topic name given a topic prefix and a Broker.

func ChannelTopic added in v0.30.1

func ChannelTopic(prefix string, obj metav1.Object) string

ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.

func CreateTopicIfDoesntExist added in v0.30.1

func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)

CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.

It returns the topic name or an error.

If the topic already exists, it will return no errors. TODO: what happens if the topic exists but it has a different config?

func DeleteTopic added in v0.30.1

func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)

func DisableOffsetAutoCommitConfigOption added in v0.30.1

func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error

func FilterAny added in v0.34.0

func FilterAny(funcs ...func(obj interface{}) bool) func(obj interface{}) bool

func FilterWithLabel added in v0.34.0

func FilterWithLabel(key, value string) func(obj interface{}) bool

func GetSaramaConfig added in v0.30.1

func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)

GetSaramaConfig returns Kafka Client configuration with the given options applied.

func IsOffsetLatest added in v0.35.0

func IsOffsetLatest(lister corelisters.ConfigMapLister, namespace, name, key string) (bool, error)

IsOffsetLatest returns whether the configured `auto.offset.reset` it set to latest in the given ConfigMap.

func NamespacedBrokerClassFilter added in v0.34.0

func NamespacedBrokerClassFilter() func(interface{}) bool

func NamespacedDataplaneLabelConfigmapOption added in v0.34.0

func NamespacedDataplaneLabelConfigmapOption(cm *corev1.ConfigMap)

func NoOpConfigOption added in v0.30.1

func NoOpConfigOption(*sarama.Config) error

NoOpConfigOption is a no-op ConfigOption.

func Options added in v0.30.1

func Options(config *sarama.Config, options ...ConfigOption) error

Types

type ConfigOption added in v0.30.1

type ConfigOption func(config *sarama.Config) error

type ConsumerGroupLag

type ConsumerGroupLag struct {
	Topic         string
	ConsumerGroup string
	ByPartition   []PartitionLag
}

ConsumerGroupLag contains partition lag of a topic.

func (ConsumerGroupLag) String

func (cgl ConsumerGroupLag) String() string

func (ConsumerGroupLag) Total

func (cgl ConsumerGroupLag) Total() uint64

Total returns the sum of each partition lag.

type ConsumerGroupLagProvider

type ConsumerGroupLagProvider interface {
	// GetLag returns consumer group lag for a given topic and a given consumer group.
	GetLag(topic, consumerGroup string) (ConsumerGroupLag, error)

	// Close closes the consumer group lag provider.
	Close() error
}

ConsumerGroupLagProvider provides consumer group lags.

func NewConsumerGroupLagProvider

func NewConsumerGroupLagProvider(client sarama.Client, adminFunc adminFunc, saramaOffsetStrategy int64) ConsumerGroupLagProvider

NewConsumerGroupLagProvider creates a new ConsumerGroupLagProvider.

type InitOffsetsFunc added in v0.30.1

type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)

InitOffsetsFunc initialize offsets for a provided set of topics and a provided consumer group id.

type InvalidOrNotPresentTopic added in v0.30.1

type InvalidOrNotPresentTopic struct {
	Topic string
}

func (InvalidOrNotPresentTopic) Error added in v0.30.1

func (it InvalidOrNotPresentTopic) Error() string

type NewClientFunc added in v0.30.1

type NewClientFunc func(addrs []string, config *sarama.Config) (sarama.Client, error)

NewClientFunc creates new sarama.Client.

type NewClusterAdminClientFunc added in v0.30.1

type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)

NewClusterAdminClientFunc creates new sarama.ClusterAdmin.

type PartitionLag

type PartitionLag struct {
	// Offset that will be produced next.
	LatestOffset int64
	// Offset that will be consumed next.
	ConsumerOffset int64
	// Signal whether a consumer made a fetch request or not to the leader of this partition.
	//
	// Note: a committed offset is the offset that will be consumed next.
	//
	// OffsetCommitted = false, no fetch request has been made by a consumer to the leader of this partition.
	// OffsetCommitted = true, a fetch request has been made by a consumer to the leader of this partition.
	OffsetCommitted bool
}

PartitionLag contains consumer lag information of a partition.

func (PartitionLag) Lag

func (pl PartitionLag) Lag() int64

Lag returns LatestOffset - ConsumerOffset.

func (PartitionLag) String

func (pl PartitionLag) String() string

type TopicConfig added in v0.30.1

type TopicConfig struct {
	TopicDetail      sarama.TopicDetail
	BootstrapServers []string
}

TopicConfig contains configurations for creating a topic.

func TopicConfigFromConfigMap added in v0.30.1

func TopicConfigFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*TopicConfig, error)

func (TopicConfig) GetBootstrapServers added in v0.30.1

func (c TopicConfig) GetBootstrapServers() string

GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL