Documentation ¶
Index ¶
- Constants
- func AreConsumerGroupsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, consumerGroups ...string) (bool, error)
- func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
- func BootstrapServersArray(bootstrapServers string) []string
- func BootstrapServersCommaSeparated(bootstrapServers []string) string
- func BootstrapServersFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) ([]string, error)
- func BrokerClassFilter() func(interface{}) bool
- func BrokerTopic(prefix string, obj metav1.Object) string
- func ChannelTopic(prefix string, obj metav1.Object) string
- func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, ...) (string, error)
- func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
- func DisableOffsetAutoCommitConfigOption(config *sarama.Config) error
- func FilterAny(funcs ...func(obj interface{}) bool) func(obj interface{}) bool
- func FilterWithLabel(key, value string) func(obj interface{}) bool
- func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
- func IsOffsetLatest(lister corelisters.ConfigMapLister, namespace, name, key string) (bool, error)
- func NamespacedBrokerClassFilter() func(interface{}) bool
- func NamespacedDataplaneLabelConfigmapOption(cm *corev1.ConfigMap)
- func NoOpConfigOption(*sarama.Config) error
- func Options(config *sarama.Config, options ...ConfigOption) error
- type ConfigOption
- type ConsumerGroupLag
- type ConsumerGroupLagProvider
- type InitOffsetsFunc
- type InvalidOrNotPresentTopic
- type NewClientFunc
- type NewClusterAdminClientFunc
- type PartitionLag
- type TopicConfig
Constants ¶
const ( // Kafka broker class annotation value. BrokerClass = "Kafka" NamespacedBrokerClass = "KafkaNamespaced" )
const ( NamespacedBrokerDataplaneLabelKey = "eventing.knative.dev/namespaced" NamespacedBrokerDataplaneLabelValue = "true" )
const ( DefaultTopicNumPartitionConfigMapKey = "default.topic.partitions" DefaultTopicReplicationFactorConfigMapKey = "default.topic.replication.factor" BootstrapServersConfigMapKey = "bootstrap.servers" GroupIDConfigMapKey = "group.id" TopicAnnotation = "default.topic" )
const (
GroupIdAnnotation = "group.id"
)
Variables ¶
This section is empty.
Functions ¶
func AreConsumerGroupsPresentAndValid ¶ added in v0.37.0
func AreConsumerGroupsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, consumerGroups ...string) (bool, error)
func AreTopicsPresentAndValid ¶ added in v0.30.1
func AreTopicsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, topics ...string) (bool, error)
func BootstrapServersArray ¶ added in v0.30.1
func BootstrapServersCommaSeparated ¶ added in v0.30.1
func BootstrapServersFromConfigMap ¶ added in v0.30.1
func BrokerClassFilter ¶ added in v0.30.1
func BrokerClassFilter() func(interface{}) bool
func BrokerTopic ¶ added in v0.30.1
BrokerTopic returns a topic name given a topic prefix and a Broker.
func ChannelTopic ¶ added in v0.30.1
ChannelTopic returns a topic name given a topic prefix and a KafkaChannel.
func CreateTopicIfDoesntExist ¶ added in v0.30.1
func CreateTopicIfDoesntExist(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error)
CreateTopicIfDoesntExist creates a topic with name 'topic' following the TopicConfig configuration passed as parameter.
It returns the topic name or an error.
If the topic already exists, it will return no errors. TODO: what happens if the topic exists but it has a different config?
func DeleteTopic ¶ added in v0.30.1
func DeleteTopic(admin sarama.ClusterAdmin, topic string) (string, error)
func DisableOffsetAutoCommitConfigOption ¶ added in v0.30.1
func FilterWithLabel ¶ added in v0.34.0
func GetSaramaConfig ¶ added in v0.30.1
func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error)
GetSaramaConfig returns Kafka Client configuration with the given options applied.
func IsOffsetLatest ¶ added in v0.35.0
func IsOffsetLatest(lister corelisters.ConfigMapLister, namespace, name, key string) (bool, error)
IsOffsetLatest returns whether the configured `auto.offset.reset` it set to latest in the given ConfigMap.
func NamespacedBrokerClassFilter ¶ added in v0.34.0
func NamespacedBrokerClassFilter() func(interface{}) bool
func NamespacedDataplaneLabelConfigmapOption ¶ added in v0.34.0
func NoOpConfigOption ¶ added in v0.30.1
NoOpConfigOption is a no-op ConfigOption.
Types ¶
type ConfigOption ¶ added in v0.30.1
type ConsumerGroupLag ¶
type ConsumerGroupLag struct { Topic string ConsumerGroup string ByPartition []PartitionLag }
ConsumerGroupLag contains partition lag of a topic.
func (ConsumerGroupLag) String ¶
func (cgl ConsumerGroupLag) String() string
func (ConsumerGroupLag) Total ¶
func (cgl ConsumerGroupLag) Total() uint64
Total returns the sum of each partition lag.
type ConsumerGroupLagProvider ¶
type ConsumerGroupLagProvider interface { // GetLag returns consumer group lag for a given topic and a given consumer group. GetLag(topic, consumerGroup string) (ConsumerGroupLag, error) // Close closes the consumer group lag provider. Close() error }
ConsumerGroupLagProvider provides consumer group lags.
func NewConsumerGroupLagProvider ¶
func NewConsumerGroupLagProvider(client sarama.Client, adminFunc adminFunc, saramaOffsetStrategy int64) ConsumerGroupLagProvider
NewConsumerGroupLagProvider creates a new ConsumerGroupLagProvider.
type InitOffsetsFunc ¶ added in v0.30.1
type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)
InitOffsetsFunc initialize offsets for a provided set of topics and a provided consumer group id.
type InvalidOrNotPresentTopic ¶ added in v0.30.1
type InvalidOrNotPresentTopic struct {
Topic string
}
func (InvalidOrNotPresentTopic) Error ¶ added in v0.30.1
func (it InvalidOrNotPresentTopic) Error() string
type NewClientFunc ¶ added in v0.30.1
NewClientFunc creates new sarama.Client.
type NewClusterAdminClientFunc ¶ added in v0.30.1
type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)
NewClusterAdminClientFunc creates new sarama.ClusterAdmin.
type PartitionLag ¶
type PartitionLag struct { // Offset that will be produced next. LatestOffset int64 // Offset that will be consumed next. ConsumerOffset int64 // Signal whether a consumer made a fetch request or not to the leader of this partition. // // Note: a committed offset is the offset that will be consumed next. // // OffsetCommitted = false, no fetch request has been made by a consumer to the leader of this partition. // OffsetCommitted = true, a fetch request has been made by a consumer to the leader of this partition. OffsetCommitted bool }
PartitionLag contains consumer lag information of a partition.
func (PartitionLag) Lag ¶
func (pl PartitionLag) Lag() int64
Lag returns LatestOffset - ConsumerOffset.
func (PartitionLag) String ¶
func (pl PartitionLag) String() string
type TopicConfig ¶ added in v0.30.1
type TopicConfig struct { TopicDetail sarama.TopicDetail BootstrapServers []string }
TopicConfig contains configurations for creating a topic.
func TopicConfigFromConfigMap ¶ added in v0.30.1
func (TopicConfig) GetBootstrapServers ¶ added in v0.30.1
func (c TopicConfig) GetBootstrapServers() string
GetBootstrapServers returns TopicConfig.BootstrapServers as a comma separated list of bootstrap servers.