Documentation ¶
Index ¶
- Constants
- Variables
- func Enqueue(userFacingResource string, enqueue func(key types.NamespacedName)) func(obj interface{})
- func Filter(userFacingResource string) func(obj interface{}) bool
- func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl
- func ResyncOnStatefulSetChange(ctx context.Context, handle func(interface{}))
- type ConsumersPerPlacement
- type NetSpecSecretLocator
- type NoSchedulerFoundError
- type Reconciler
- type Scheduler
- type SchedulerConfig
- type SecretSpecSecretLocator
Constants ¶
View Source
const (
	// KafkaSourceScheduler is the key for the scheduler map for any KafkaSource.
	// Keep this constant lowercase.
	KafkaSourceScheduler = "kafkasource"
	// KafkaTriggerScheduler is the key for the scheduler map for any Kafka Trigger.
	// Keep this constant lowercase.
	KafkaTriggerScheduler = "trigger"
	// KafkaChannelScheduler is the key for the scheduler map for any KafkaChannel.
	// Keep this constant lowercase.
	KafkaChannelScheduler = "kafkachannel"
)
Variables ¶
View Source
var ( ErrNoSubscriberURI = errors.New("no subscriber URI resolved") ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved") )
View Source
var ( ConsumerNameTagKey = tag.MustNewKey("consumer_name") ConsumerKindTagKey = tag.MustNewKey("consumer_kind") )
Functions ¶
func Enqueue ¶ added in v0.29.0
func Enqueue(userFacingResource string, enqueue func(key types.NamespacedName)) func(obj interface{})
Enqueue returns a function that uses the provided enqueue function to enqueue the resource associated with a ConsumerGroup.
func Filter ¶ added in v0.29.0
Filter returns a filter function based on the user-facing resource that a controller is tracking. Usable by FilteringResourceEventHandler.
func NewController ¶ added in v0.29.0
func ResyncOnStatefulSetChange ¶ added in v0.30.0
Types ¶
type ConsumersPerPlacement ¶ added in v0.29.0
type ConsumersPerPlacement struct { Placement *eventingduckv1alpha1.Placement Consumers []*kafkainternals.Consumer }
type NetSpecSecretLocator ¶ added in v0.31.0
type NetSpecSecretLocator struct {
*kafkainternals.ConsumerGroup
}
func (*NetSpecSecretLocator) SecretName ¶ added in v0.31.0
func (sl *NetSpecSecretLocator) SecretName() (string, bool)
func (*NetSpecSecretLocator) SecretNamespace ¶ added in v0.31.0
func (sl *NetSpecSecretLocator) SecretNamespace() (string, bool)
type NoSchedulerFoundError ¶ added in v0.40.7
type NoSchedulerFoundError struct{}
func (NoSchedulerFoundError) Error ¶ added in v0.40.7
func (NoSchedulerFoundError) Error() string
type Reconciler ¶
type Reconciler struct {
	SchedulerFunc   schedulerFunc
	ConsumerLister  kafkainternalslisters.ConsumerLister
	InternalsClient internalv1alpha1.InternalV1alpha1Interface
	SecretLister    corelisters.SecretLister
	ConfigMapLister corelisters.ConfigMapLister
	PodLister       corelisters.PodLister
	KubeClient      kubernetes.Interface
	Resolver        *resolver.URIResolver
	NameGenerator   names.NameGenerator

	// GetKafkaClient creates a new sarama Client. It's convenient to add this as a Reconciler field so that we can
	// mock the function used during the reconciliation loop.
	GetKafkaClient clientpool.GetKafkaClientFunc

	// InitOffsetsFunc initializes offsets for a provided set of topics and a provided consumer group id.
	// It's convenient to add this as a Reconciler field so that we can mock the function used during the
	// reconciliation loop.
	InitOffsetsFunc kafka.InitOffsetsFunc

	SystemNamespace string

	// GetKafkaClusterAdmin creates a new sarama ClusterAdmin. It's convenient to add this as a Reconciler field so that we can
	// mock the function used during the reconciliation loop.
	GetKafkaClusterAdmin clientpool.GetKafkaClusterAdminFunc

	KafkaFeatureFlags *config.KafkaFeatureFlags
	KedaClient        kedaclientset.Interface
	AutoscalerConfig  string

	// DeleteConsumerGroupMetadataCounter is an in-memory counter to count how many times we have
	// tried to delete consumer group metadata from Kafka.
	DeleteConsumerGroupMetadataCounter *counter.Counter

	// InitOffsetLatestInitialOffsetCache is the cache for consumer group offset initialization.
	//
	// When there is high load and multiple consumer group schedule calls, we get many
	// `dial tcp 10.130.4.8:9092: i/o timeout` errors when trying to connect to Kafka.
	// This leads to increased "time to readiness" for consumer groups.
	InitOffsetLatestInitialOffsetCache prober.Cache[string, prober.Status, struct{}]

	EnqueueKey func(key string)
}
func (*Reconciler) FinalizeKind ¶ added in v0.32.0
func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event
func (*Reconciler) ReconcileKind ¶
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event
type Scheduler ¶ added in v0.35.4
type Scheduler struct { scheduler.Scheduler SchedulerConfig }
type SchedulerConfig ¶ added in v0.29.0
type SchedulerConfig struct { StatefulSetName string RefreshPeriod time.Duration Capacity int32 SchedulerPolicy *scheduler.SchedulerPolicy DeSchedulerPolicy *scheduler.SchedulerPolicy }
type SecretSpecSecretLocator ¶ added in v0.36.0
type SecretSpecSecretLocator struct {
*kafkainternals.ConsumerGroup
}
func (*SecretSpecSecretLocator) SecretName ¶ added in v0.36.0
func (sl *SecretSpecSecretLocator) SecretName() (string, bool)
func (*SecretSpecSecretLocator) SecretNamespace ¶ added in v0.36.0
func (sl *SecretSpecSecretLocator) SecretNamespace() (string, bool)
Click to show internal directories.
Click to hide internal directories.