dispatcher

package
v0.22.7
Warning

This package is not in the latest version of its module.

Published: Sep 8, 2021 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	ConfigChanged(ctx context.Context, configMap *corev1.ConfigMap) Dispatcher
	SecretChanged(ctx context.Context, secret *corev1.Secret) Dispatcher
	Shutdown()
	UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) map[eventingduck.SubscriberSpec]error
}

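Dispatcher is the interface for managing a dispatcher: reacting to ConfigMap and Secret changes, updating subscriptions, and shutting down.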

func NewDispatcher

func NewDispatcher(dispatcherConfig DispatcherConfig) Dispatcher

NewDispatcher constructs a Dispatcher from the given DispatcherConfig.

type DispatcherConfig

type DispatcherConfig struct {
	Logger          *zap.Logger
	ClientId        string
	Brokers         []string
	Topic           string
	Username        string
	Password        string
	ChannelKey      string
	StatsReporter   metrics.StatsReporter
	MetricsRegistry gometrics.Registry
	SaramaConfig    *sarama.Config
	SubscriberSpecs []eventingduck.SubscriberSpec
}

DispatcherConfig holds the configuration for a Dispatcher.

type DispatcherImpl

type DispatcherImpl struct {
	DispatcherConfig

	MetricsStopChan    chan struct{}
	MetricsStoppedChan chan struct{}
	// contains filtered or unexported fields
}

DispatcherImpl combines the embedded DispatcherConfig with ConsumerGroup state.

func (*DispatcherImpl) ConfigChanged

func (d *DispatcherImpl) ConfigChanged(ctx context.Context, configMap *corev1.ConfigMap) Dispatcher

ConfigChanged is called by the configMapObserver handler function in main() so that settings specific to the dispatcher can be extracted and the ConsumerGroups restarted if necessary. The new ConfigMap could contain changes to the eventing-kafka section as well as the sarama section, but none of the eventing-kafka settings matter to a currently running Dispatcher, so they are ignored here (which avoids having to call env.GetEnvironment). If those settings are needed in the future, the environment will also need to be re-parsed here. If there are no consumer-specific differences between the current config and the new one, ConfigChanged logs that fact and moves on; it does not restart the ConsumerGroups unnecessarily.
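The "restart only when something consumer-specific changed" rule can be illustrated with a minimal sketch. Everything in it is hypothetical: `consumerSettings`, `dispatcher`, and `configChanged` are stand-ins invented for this example, and returning nil for "no change" is an assumption about the convention, not something this page states.

```go
package main

import "fmt"

// consumerSettings is a hypothetical stand-in for the consumer-specific
// settings extracted from the ConfigMap.
type consumerSettings struct {
	KafkaVersion          string
	SessionTimeoutSeconds int
}

// dispatcher is a hypothetical, much-reduced stand-in for DispatcherImpl.
type dispatcher struct {
	settings consumerSettings
	restarts int // number of ConsumerGroup restarts so far
}

// configChanged returns nil when the settings are unchanged (assumed
// convention) and otherwise a replacement dispatcher built from the
// updated settings.
func (d *dispatcher) configChanged(updated consumerSettings) *dispatcher {
	if d.settings == updated {
		fmt.Println("no consumer-specific changes; leaving ConsumerGroups running")
		return nil
	}
	fmt.Println("consumer-specific changes detected; restarting ConsumerGroups")
	return &dispatcher{settings: updated, restarts: d.restarts + 1}
}

func main() {
	current := &dispatcher{settings: consumerSettings{KafkaVersion: "2.0.0", SessionTimeoutSeconds: 10}}

	// Identical settings: nothing is restarted.
	if next := current.configChanged(current.settings); next == nil {
		fmt.Println("kept existing dispatcher")
	}

	// A changed timeout triggers a restart.
	changed := current.settings
	changed.SessionTimeoutSeconds = 30
	if next := current.configChanged(changed); next != nil {
		fmt.Println("restarts:", next.restarts)
	}
}
```

In this sketch the caller swaps in the returned value only when it is non-nil, so an unchanged configuration costs a comparison and a log line.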

func (*DispatcherImpl) ObserveMetrics added in v0.22.0

func (d *DispatcherImpl) ObserveMetrics(interval time.Duration)

ObserveMetrics starts an asynchronous process for observing Kafka metrics.

func (*DispatcherImpl) SecretChanged added in v0.21.0

func (d *DispatcherImpl) SecretChanged(ctx context.Context, secret *corev1.Secret) Dispatcher

SecretChanged is called by the secretObserver handler function in main() so that settings specific to the dispatcher may be extracted and the dispatcher restarted if necessary.

func (*DispatcherImpl) Shutdown

func (d *DispatcherImpl) Shutdown()

Shutdown shuts down the Dispatcher.

func (*DispatcherImpl) UpdateSubscriptions

func (d *DispatcherImpl) UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) map[eventingduck.SubscriberSpec]error

UpdateSubscriptions updates the Dispatcher's subscriptions to align with the new state.

type Handler

type Handler struct {
	Logger            *zap.Logger
	Subscriber        *eventingduck.SubscriberSpec
	MessageDispatcher channel.MessageDispatcher
}

Handler is a Sarama ConsumerGroupHandler implementation.

func NewHandler

func NewHandler(logger *zap.Logger, subscriber *eventingduck.SubscriberSpec) *Handler

NewHandler creates a new Handler.

func (*Handler) Cleanup

func (h *Handler) Cleanup(_ sarama.ConsumerGroupSession) error

Cleanup is a ConsumerGroupHandler lifecycle method. It runs after all ConsumeClaim calls have stopped but before the final offset commit.

func (*Handler) ConsumeClaim

func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim is a ConsumerGroupHandler lifecycle method. It is the main processing loop and must finish when the claim.Messages() channel closes.
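The "must finish when the channel closes" requirement is usually met by ranging over the message channel. The sketch below uses small local stand-ins (`message`, `session`, `claim`) in place of the Sarama types so that it runs without external modules. The `dispatch` callback and the policy of marking a message even when dispatch fails are assumptions made for illustration only.

```go
package main

import "fmt"

// message, session, and claim are hypothetical stand-ins for
// sarama.ConsumerMessage, sarama.ConsumerGroupSession, and
// sarama.ConsumerGroupClaim, reduced to what this sketch needs.
type message struct {
	Offset int64
	Value  []byte
}

type session interface {
	MarkMessage(msg *message, metadata string)
}

type claim interface {
	Messages() <-chan *message
}

// handler is a hypothetical stand-in for Handler.
type handler struct {
	dispatch func(*message) error // delivers one message to the subscriber
}

// consumeClaim processes messages until the claim's channel is closed,
// then returns. Ranging over the channel guarantees the loop ends on close.
func (h *handler) consumeClaim(s session, c claim) error {
	for msg := range c.Messages() {
		if err := h.dispatch(msg); err != nil {
			// Assumed policy: report the failure and keep consuming.
			fmt.Printf("dispatch failed at offset %d: %v\n", msg.Offset, err)
		}
		s.MarkMessage(msg, "")
	}
	return nil
}

// fakeSession records marked offsets; fakeClaim serves a fixed channel.
type fakeSession struct{ marked []int64 }

func (f *fakeSession) MarkMessage(msg *message, _ string) {
	f.marked = append(f.marked, msg.Offset)
}

type fakeClaim struct{ ch chan *message }

func (f *fakeClaim) Messages() <-chan *message { return f.ch }

func main() {
	ch := make(chan *message, 2)
	ch <- &message{Offset: 0, Value: []byte("first")}
	ch <- &message{Offset: 1, Value: []byte("second")}
	close(ch) // simulates the claim ending, for example on a rebalance

	h := &handler{dispatch: func(m *message) error {
		fmt.Printf("dispatching %s\n", m.Value)
		return nil
	}}
	s := &fakeSession{}
	if err := h.consumeClaim(s, &fakeClaim{ch: ch}); err != nil {
		fmt.Println("unexpected error:", err)
	}
	fmt.Println("marked offsets:", s.marked)
}
```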

func (*Handler) Setup

func (h *Handler) Setup(_ sarama.ConsumerGroupSession) error

Setup is a ConsumerGroupHandler lifecycle method. It runs before any ConsumeClaim call.

type SubscriberWrapper

type SubscriberWrapper struct {
	eventingduck.SubscriberSpec
	GroupId       string
	ConsumerGroup sarama.ConsumerGroup
	StopChan      chan struct{}
}

SubscriberWrapper wraps a Knative Eventing SubscriberSpec and adds a Sarama ConsumerGroup.

func NewSubscriberWrapper

func NewSubscriberWrapper(subscriberSpec eventingduck.SubscriberSpec, groupId string, consumerGroup sarama.ConsumerGroup) *SubscriberWrapper

NewSubscriberWrapper constructs a SubscriberWrapper.
