Documentation ¶
Index ¶
- type Dispatcher
- type DispatcherConfig
- type DispatcherImpl
- func (d *DispatcherImpl) ConfigChanged(ctx context.Context, configMap *corev1.ConfigMap) Dispatcher
- func (d *DispatcherImpl) ObserveMetrics(interval time.Duration)
- func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secret) Dispatcher
- func (d *DispatcherImpl) Shutdown()
- func (d *DispatcherImpl) UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) map[eventingduck.SubscriberSpec]error
- type Handler
- type SubscriberWrapper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher interface { ConfigChanged(ctx context.Context, configMap *corev1.ConfigMap) Dispatcher SecretChanged(ctx context.Context, secret *corev1.Secret) Dispatcher Shutdown() UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) map[eventingduck.SubscriberSpec]error }
Dispatcher Interface
func NewDispatcher ¶
func NewDispatcher(dispatcherConfig DispatcherConfig) Dispatcher
Dispatcher Constructor
type DispatcherConfig ¶
type DispatcherConfig struct { Logger *zap.Logger ClientId string Brokers []string Topic string Username string Password string ChannelKey string StatsReporter metrics.StatsReporter MetricsRegistry gometrics.Registry SaramaConfig *sarama.Config SubscriberSpecs []eventingduck.SubscriberSpec }
Define A Dispatcher Config Struct To Hold Configuration
type DispatcherImpl ¶
type DispatcherImpl struct { DispatcherConfig MetricsStopChan chan struct{} MetricsStoppedChan chan struct{} // contains filtered or unexported fields }
Define A DispatcherImpl Struct With Configuration & ConsumerGroup State
func (*DispatcherImpl) ConfigChanged ¶
func (d *DispatcherImpl) ConfigChanged(ctx context.Context, configMap *corev1.ConfigMap) Dispatcher
ConfigChanged is called by the configMapObserver handler function in main() so that settings specific to the dispatcher may be extracted and the ConsumerGroups restarted if necessary. The new configmap could technically have changes to the eventing-kafka section as well as the sarama section, but none of those matter to a currently-running Dispatcher, so those are ignored here (which avoids the necessity of calling env.GetEnvironment). If those settings are needed in the future, the environment will also need to be re-parsed here. If there aren't any consumer-specific differences between the current config and the new one, then just log that and move on; do not restart the ConsumerGroups unnecessarily.
func (*DispatcherImpl) ObserveMetrics ¶ added in v0.22.0
func (d *DispatcherImpl) ObserveMetrics(interval time.Duration)
Async Process For Observing Kafka Metrics
func (*DispatcherImpl) SecretChanged ¶ added in v0.21.0
func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secret) Dispatcher
SecretChanged is called by the secretObserver handler function in main() so that settings specific to the dispatcher may be extracted and the dispatcher restarted if necessary.
func (*DispatcherImpl) UpdateSubscriptions ¶
func (d *DispatcherImpl) UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) map[eventingduck.SubscriberSpec]error
Update The Dispatcher's Subscriptions To Align With New State
type Handler ¶
type Handler struct { Logger *zap.Logger Subscriber *eventingduck.SubscriberSpec MessageDispatcher channel.MessageDispatcher }
Define A Sarama ConsumerGroupHandler Implementation
func NewHandler ¶
func NewHandler(logger *zap.Logger, subscriber *eventingduck.SubscriberSpec) *Handler
Create A New Handler
func (*Handler) Cleanup ¶
func (h *Handler) Cleanup(_ sarama.ConsumerGroupSession) error
ConsumerGroupHandler Lifecycle Method (Runs after all ConsumeClaims stop but before final offset commit)
func (*Handler) ConsumeClaim ¶
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumerGroupHandler Lifecycle Method (Main processing loop, must finish when claim.Messages() channel closes.)
type SubscriberWrapper ¶
type SubscriberWrapper struct { eventingduck.SubscriberSpec GroupId string ConsumerGroup sarama.ConsumerGroup StopChan chan struct{} }
Knative Eventing SubscriberSpec Wrapper Enhanced With Sarama ConsumerGroup
func NewSubscriberWrapper ¶
func NewSubscriberWrapper(subscriberSpec eventingduck.SubscriberSpec, groupId string, consumerGroup sarama.ConsumerGroup) *SubscriberWrapper
SubscriberWrapper Constructor