kafka

package
v0.36.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Kafka broker class annotation value.
	BrokerClass           = "Kafka"
	NamespacedBrokerClass = "KafkaNamespaced"
)
View Source
const (
	NamespacedBrokerDataplaneLabelKey   = "eventing.knative.dev/namespaced"
	NamespacedBrokerDataplaneLabelValue = "true"
)
View Source
const (
	DefaultTopicNumPartitionConfigMapKey      = "default.topic.partitions"
	DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor"
	BootstrapServersConfigMapKey              = "bootstrap.servers"

	GroupIDConfigMapKey = "group.id"

	TopicAnnotation = "default.topic"
)

Variables

This section is empty.

Functions

func AreTopicsPresentAndValid added in v0.30.1

func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)

func BootstrapServersArray added in v0.30.1

func BootstrapServersArray(bootstrapServers string) []string

func BootstrapServersCommaSeparated added in v0.30.1

func BootstrapServersCommaSeparated(bootstrapServers []string) string

func BootstrapServersFromConfigMap added in v0.30.1

func BootstrapServersFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) ([]string, error)

func BrokerClassFilter added in v0.30.1

func BrokerClassFilter() func(interface{}) bool

func BrokerTopic added in v0.30.1

func BrokerTopic(prefix string, obj metav1.Object) string

BrokerTopic returns a topic name given a topic prefix and a Broker.

func ChannelTopic added in v0.30.1

func ChannelTopic(prefix string, obj metav1.Object) string

ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.

func CreateTopicIfDoesntExist added in v0.30.1

func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)

CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.

It returns the topic name or an error.

If the topic already exists, it will return no errors. TODO: what happens if the topic exists but it has a different config?

func DeleteTopic added in v0.30.1

func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)

func DisableOffsetAutoCommitConfigOption added in v0.30.1

func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error

func FilterAny added in v0.34.0

func FilterAny(funcs ...func(obj interface{}) bool) func(obj interface{}) bool

func FilterWithLabel added in v0.34.0

func FilterWithLabel(key, value string) func(obj interface{}) bool

func GetSaramaConfig added in v0.30.1

func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)

GetSaramaConfig returns Kafka Client configuration with the given options applied.

func IsOffsetLatest added in v0.35.0

func IsOffsetLatest(lister corelisters.ConfigMapLister, namespace, name, key string) (bool, error)

IsOffsetLatest returns whether the configured `auto.offset.reset` it set to latest in the given ConfigMap.

func NamespacedBrokerClassFilter added in v0.34.0

func NamespacedBrokerClassFilter() func(interface{}) bool

func NamespacedDataplaneLabelConfigmapOption added in v0.34.0

func NamespacedDataplaneLabelConfigmapOption(cm *corev1.ConfigMap)

func NoOpConfigOption added in v0.30.1

func NoOpConfigOption(*sarama.Config) error

NoOpConfigOption is a no-op ConfigOption.

func Options added in v0.30.1

func Options(config *sarama.Config, options ...ConfigOption) error

Types

type ConfigOption added in v0.30.1

type ConfigOption func(config *sarama.Config) error

type ConsumerGroupLag

type ConsumerGroupLag struct {
	Topic         string
	ConsumerGroup string
	ByPartition   []PartitionLag
}

ConsumerGroupLag contains partition lag of a topic.

func (ConsumerGroupLag) String

func (cgl ConsumerGroupLag) String() string

func (ConsumerGroupLag) Total

func (cgl ConsumerGroupLag) Total() uint64

Total returns the sum of each partition lag.

type ConsumerGroupLagProvider

type ConsumerGroupLagProvider interface {
	// GetLag returns consumer group lag for a given topic and a given consumer group.
	GetLag(topic, consumerGroup string) (ConsumerGroupLag, error)

	// Close closes the consumer group lag provider.
	Close() error
}

ConsumerGroupLagProvider provides consumer group lags.

func NewConsumerGroupLagProvider

func NewConsumerGroupLagProvider(client sarama.Client, adminFunc adminFunc, saramaOffsetStrategy int64) ConsumerGroupLagProvider

NewConsumerGroupLagProvider creates a new ConsumerGroupLagProvider.

type InitOffsetsFunc added in v0.30.1

type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)

InitOffsetsFunc initialize offsets for a provided set of topics and a provided consumer group id.

type InvalidOrNotPresentTopic added in v0.30.1

type InvalidOrNotPresentTopic struct {
	Topic string
}

func (InvalidOrNotPresentTopic) Error added in v0.30.1

func (it InvalidOrNotPresentTopic) Error() string

type NewClientFunc added in v0.30.1

type NewClientFunc func(addrs []string, config *sarama.Config) (sarama.Client, error)

NewClientFunc creates new sarama.Client.

type NewClusterAdminClientFunc added in v0.30.1

type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)

NewClusterAdminClientFunc creates new sarama.ClusterAdmin.

type PartitionLag

type PartitionLag struct {
	// Offset that will be produced next.
	LatestOffset int64
	// Offset that will be consumed next.
	ConsumerOffset int64
	// Signal whether a consumer made a fetch request or not to the leader of this partition.
	//
	// Note: a committed offset is the offset that will be consumed next.
	//
	// OffsetCommitted = false, no fetch request has been made by a consumer to the leader of this partition.
	// OffsetCommitted = true, a fetch request has been made by a consumer to the leader of this partition.
	OffsetCommitted bool
}

PartitionLag contains consumer lag information of a partition.

func (PartitionLag) Lag

func (pl PartitionLag) Lag() int64

Lag returns LatestOffset - ConsumerOffset.

func (PartitionLag) String

func (pl PartitionLag) String() string

type TopicConfig added in v0.30.1

type TopicConfig struct {
	TopicDetail      sarama.TopicDetail
	BootstrapServers []string
}

TopicConfig contains configurations for creating a topic.

func TopicConfigFromConfigMap added in v0.30.1

func TopicConfigFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*TopicConfig, error)

func (TopicConfig) GetBootstrapServers added in v0.30.1

func (c TopicConfig) GetBootstrapServers() string

GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL