dispatcher

package
v0.43.0
Published: Oct 23, 2024 License: Apache-2.0 Imports: 60 Imported by: 0

Documentation

Constants

const (
	ReasonJetstreamStreamCreated   = "JetstreamStreamCreated"
	ReasonJetstreamStreamFailed    = "JetstreamStreamFailed"
	ReasonJetstreamConsumerCreated = "JetstreamConsumerCreated"
	ReasonJetstreamConsumerFailed  = "JetstreamConsumerFailed"
)

Variables

var (
	// FetchBatchSize is the number of messages that will be fetched from JetStream in a single
	// request. This can be configured via the FETCH_BATCH_SIZE environment variable.
	//
	// If you expect to process a high volume of messages, you may want to increase this number to
	// reduce the number of requests made to JetStream.
	FetchBatchSize = 32

	FetchMaxWaitDefault = 200 * time.Millisecond
)
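
The comment above says FetchBatchSize can be set through the FETCH_BATCH_SIZE environment variable, but the page does not show how the value is parsed. The following is a minimal sketch of one way to do it, assuming that an unset, malformed, or non-positive value falls back to the documented default of 32. The helpers parseFetchBatchSize and fetchBatchSizeFromEnv are hypothetical names and are not part of this package.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultFetchBatchSize mirrors the documented default of FetchBatchSize.
const defaultFetchBatchSize = 32

// parseFetchBatchSize is a hypothetical helper. It converts raw to an int and
// falls back to the default when raw is malformed or not positive (an
// assumption; the package's real fallback behaviour is not documented here).
func parseFetchBatchSize(raw string) int {
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return defaultFetchBatchSize
	}
	return n
}

// fetchBatchSizeFromEnv is a hypothetical helper that reads FETCH_BATCH_SIZE
// and returns the default when the variable is unset.
func fetchBatchSizeFromEnv() int {
	raw, ok := os.LookupEnv("FETCH_BATCH_SIZE")
	if !ok {
		return defaultFetchBatchSize
	}
	return parseFetchBatchSize(raw)
}

func main() {
	fmt.Println("fetch batch size:", fetchBatchSizeFromEnv())
}
```

A larger batch size means fewer fetch requests per message, at the cost of holding more unacknowledged messages in flight at once.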
var (
	ErrConsumerClosed = errors.New("dispatcher consumer closed")
)
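
ErrConsumerClosed is a sentinel error, so callers would normally compare against it with errors.Is rather than by message. The sketch below shows that pattern using a local stand-in with the same message; fetchOnce and isClosed are hypothetical names, and the page does not say which functions in this package actually return the sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// errConsumerClosed is a local stand-in carrying the same message as the
// package's ErrConsumerClosed.
var errConsumerClosed = errors.New("dispatcher consumer closed")

// fetchOnce is a hypothetical operation that fails because the consumer was
// closed, wrapping the sentinel with extra context.
func fetchOnce() error {
	return fmt.Errorf("fetch failed: %w", errConsumerClosed)
}

// isClosed reports whether err is, or wraps, the sentinel.
func isClosed(err error) bool {
	return errors.Is(err, errConsumerClosed)
}

func main() {
	err := fetchOnce()
	if isClosed(err) {
		fmt.Println("consumer closed, stopping:", err)
	}
}
```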

Functions

func NewController

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl

func SendMessage added in v0.41.1

func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, options ...SendOption) (*kncloudevents.DispatchInfo, error)

Types

type ChannelConfig

type ChannelConfig struct {
	channel.ChannelReference
	HostName               string
	StreamName             string
	ConsumerConfigTemplate *v1alpha1.ConsumerConfigTemplate
	Subscriptions          []Subscription
}

func (ChannelConfig) SubscriptionsUIDs

func (cc ChannelConfig) SubscriptionsUIDs() []string

type Consumer

type Consumer interface {
	ConsumerType() ConsumerType
	Close() error
	UpdateSubscription(c *ChannelConfig, sub Subscription)
}

type ConsumerNameFunc

type ConsumerNameFunc func(subID string) string

type ConsumerSubjectFunc

type ConsumerSubjectFunc func(namespace, name, uid string) string
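
ConsumerNameFunc and ConsumerSubjectFunc let the caller decide how consumers and their subjects are named. As an illustration only, the sketch below declares local copies of both types and fills them with a made-up naming scheme; the "example-consumer-" prefix and the dot-separated subject layout are assumptions, not the scheme this module uses.

```go
package main

import "fmt"

// Local copies of the function types documented above.
type ConsumerNameFunc func(subID string) string
type ConsumerSubjectFunc func(namespace, name, uid string) string

// exampleConsumerName uses a hypothetical prefix to derive a consumer name
// from a subscription ID.
var exampleConsumerName ConsumerNameFunc = func(subID string) string {
	return "example-consumer-" + subID
}

// exampleConsumerSubject joins its inputs with dots, which is how subject
// tokens are separated in NATS. The layout itself is hypothetical.
var exampleConsumerSubject ConsumerSubjectFunc = func(namespace, name, uid string) string {
	return fmt.Sprintf("%s.%s.%s", namespace, name, uid)
}

func main() {
	fmt.Println(exampleConsumerName("abc"))
	fmt.Println(exampleConsumerSubject("default", "my-channel", "abc"))
}
```

Functions of these shapes are what NatsDispatcherArgs expects in its ConsumerNameFunc and ConsumerSubjectFunc fields.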

type ConsumerType added in v0.43.0

type ConsumerType string
const (
	PushConsumerType ConsumerType = "Push"
	PullConsumerType ConsumerType = "Pull"
)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is responsible for managing both directions of events over the NatsJetStreamChannel. It manages the lifecycle of the following components:

  - Stream per NatsJetStreamChannel
  - HTTP receiver which publishes to the desired Stream
  - Consumer per .spec.subscribers[] of a channel, forwarding events to the specified subscriber address.

func NewDispatcher

func NewDispatcher(ctx context.Context, args NatsDispatcherArgs) (*Dispatcher, error)

func (*Dispatcher) ReconcileConsumers

func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfig, isLeader bool) error

func (*Dispatcher) RegisterChannelHost

func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error

RegisterChannelHost registers the NatsDispatcher to accept HTTP events matching the specified HostName.

func (*Dispatcher) Start

func (d *Dispatcher) Start(ctx context.Context) error

type EnqueueFunc

type EnqueueFunc func(ref types.NamespacedName)

EnqueueFunc is passed to the Reconciler and is used when a follower instance attempts to sync on a Consumer that does not yet exist.

type NatsDispatcherArgs

type NatsDispatcherArgs struct {
	JetStream nats.JetStreamContext

	SubjectFunc         StreamSubjectFunc
	ConsumerNameFunc    ConsumerNameFunc
	ConsumerSubjectFunc ConsumerSubjectFunc

	PodName       string
	ContainerName string
}

type PullConsumer added in v0.43.0

type PullConsumer struct {
	// contains filtered or unexported fields
}

func NewPullConsumer added in v0.43.0

func NewPullConsumer(ctx context.Context, consumer *nats.Subscription, subscription Subscription, dispatcher *kncloudevents.Dispatcher, reporter eventingchannels.StatsReporter, channelConfig *ChannelConfig) (*PullConsumer, error)

func (*PullConsumer) Close added in v0.43.0

func (c *PullConsumer) Close() error

func (*PullConsumer) ConsumerType added in v0.43.0

func (c *PullConsumer) ConsumerType() ConsumerType

func (*PullConsumer) Start added in v0.43.0

func (c *PullConsumer) Start() error

Start begins the consumer and handles messages until Close is called. This method is blocking and will return an error if the consumer fails prematurely. A nil error will be returned upon being stopped by the Close method.
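
Because Start blocks, a caller would typically run it in its own goroutine and use Close to stop it. The sketch below illustrates the documented contract (nil when stopped by Close, an error on premature failure) with a hypothetical stand-in type, loopConsumer; it does not reproduce the internals of PullConsumer, which are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFetch is a hypothetical failure used to demonstrate a premature exit.
var errFetch = errors.New("fetch failed")

// loopConsumer is a hypothetical stand-in for a consumer whose Start blocks
// until Close is called or a failure occurs.
type loopConsumer struct {
	done      chan struct{}
	failures  chan error
	closeOnce sync.Once
}

func newLoopConsumer() *loopConsumer {
	return &loopConsumer{done: make(chan struct{}), failures: make(chan error, 1)}
}

// Start blocks. It returns nil once Close has been called, or the failure
// that ended the loop early.
func (c *loopConsumer) Start() error {
	select {
	case <-c.done:
		return nil
	case err := <-c.failures:
		return err
	}
}

// Close stops Start. It is safe to call more than once.
func (c *loopConsumer) Close() error {
	c.closeOnce.Do(func() { close(c.done) })
	return nil
}

// runUntilClosed starts the consumer in a goroutine, closes it, and returns
// whatever Start returned.
func runUntilClosed(c *loopConsumer) error {
	errCh := make(chan error, 1)
	go func() { errCh <- c.Start() }()
	_ = c.Close()
	return <-errCh
}

func main() {
	fmt.Println("clean stop:", runUntilClosed(newLoopConsumer()))

	failing := newLoopConsumer()
	failing.failures <- errFetch
	fmt.Println("premature failure:", failing.Start())
}
```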

func (*PullConsumer) UpdateSubscription added in v0.43.0

func (c *PullConsumer) UpdateSubscription(config *ChannelConfig, sub Subscription)

type PullSubscription added in v0.43.0

type PullSubscription struct {
	FetchMaxWait   time.Duration
	FetchBatchSize int
}
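
PullSubscription carries the same two settings that the package-level FetchBatchSize and FetchMaxWaitDefault variables provide defaults for. How the package combines them is not documented on this page; the sketch below assumes that zero-valued fields fall back to the documented defaults (32 and 200ms). The helper withDefaults is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// PullSubscription is a local copy of the struct documented above.
type PullSubscription struct {
	FetchMaxWait   time.Duration
	FetchBatchSize int
}

// Documented defaults of FetchBatchSize and FetchMaxWaitDefault.
const (
	defaultFetchBatchSize = 32
	defaultFetchMaxWait   = 200 * time.Millisecond
)

// withDefaults is a hypothetical helper. It returns a copy of p in which
// unset (zero or negative) fields are replaced by the defaults. A nil p
// yields the defaults for both fields.
func withDefaults(p *PullSubscription) PullSubscription {
	out := PullSubscription{
		FetchMaxWait:   defaultFetchMaxWait,
		FetchBatchSize: defaultFetchBatchSize,
	}
	if p == nil {
		return out
	}
	if p.FetchMaxWait > 0 {
		out.FetchMaxWait = p.FetchMaxWait
	}
	if p.FetchBatchSize > 0 {
		out.FetchBatchSize = p.FetchBatchSize
	}
	return out
}

func main() {
	fmt.Printf("%+v\n", withDefaults(nil))
	fmt.Printf("%+v\n", withDefaults(&PullSubscription{FetchBatchSize: 64}))
}
```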

type PushConsumer added in v0.43.0

type PushConsumer struct {
	// contains filtered or unexported fields
}

func (*PushConsumer) Close added in v0.43.0

func (c *PushConsumer) Close() error

func (*PushConsumer) ConsumerType added in v0.43.0

func (c *PushConsumer) ConsumerType() ConsumerType

func (*PushConsumer) MsgHandler added in v0.43.0

func (c *PushConsumer) MsgHandler(msg *nats.Msg)

func (*PushConsumer) UpdateSubscription added in v0.43.0

func (c *PushConsumer) UpdateSubscription(_ *ChannelConfig, sub Subscription)

type Reconciler

type Reconciler struct {
	// contains filtered or unexported fields
}

Reconciler reconciles incoming NatsJetstreamChannel CRDs by ensuring the following states:

  - Creates a JSM Stream for each channel
  - Creates an HTTP listener which publishes received events to the Stream
  - Creates a consumer for each .spec.subscribers[] and forwards events to the subscriber address

func (*Reconciler) FinalizeKind

func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)

FinalizeKind is invoked when the resource is set for deletion. This method is only called when the controller is the leader, so it unsubscribes all consumers and then deletes the stream.

func (*Reconciler) ObserveDeletion

func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error

ObserveDeletion is called on non-leader controllers after the actual resource is deleted. In this case we just unsubscribe from all consumers; the leader cleans up the actual JetStream resources for us via FinalizeKind.

func (*Reconciler) ObserveKind

ObserveKind is invoked when a NatsJetStreamChannel requires reconciliation but the dispatcher is not the leader. This will wait until the channel has been marked "ready" by the leader, then subscribe to the JSM stream to forward to Knative subscriptions; this enables us to scale dispatchers horizontally to cope with large message volumes. The only requirement to allow this to work is the use of Queue Subscribers.

func (*Reconciler) ReconcileKind

type SendOption added in v0.41.1

type SendOption func(*senderConfig) error

func WithDeadLetterSink added in v0.41.1

func WithDeadLetterSink(dls *duckv1.Addressable) SendOption

func WithHeader added in v0.41.1

func WithHeader(header http.Header) SendOption

func WithReply added in v0.41.1

func WithReply(reply *duckv1.Addressable) SendOption

func WithRetryConfig added in v0.41.1

func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption

func WithTransformers added in v0.41.1

func WithTransformers(transformers ...binding.Transformer) SendOption
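
SendOption follows the functional options pattern: each With... function returns a closure that mutates an unexported senderConfig and may return an error. The sketch below shows the general shape of that pattern with local stand-ins. The fields of senderConfig, the lowercase option constructors, and applyOptions are all hypothetical, since the real struct is unexported and its contents are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// senderConfig is a hypothetical stand-in for the package's unexported type.
type senderConfig struct {
	header   http.Header
	replyURL string
}

// SendOption has the same shape as the documented type.
type SendOption func(*senderConfig) error

// withHeader is a hypothetical option that stores a copy of the headers.
func withHeader(h http.Header) SendOption {
	return func(c *senderConfig) error {
		c.header = h.Clone()
		return nil
	}
}

// withReplyURL is a hypothetical option that validates its input, showing
// why the option signature returns an error.
func withReplyURL(url string) SendOption {
	return func(c *senderConfig) error {
		if url == "" {
			return errors.New("reply URL must not be empty")
		}
		c.replyURL = url
		return nil
	}
}

// applyOptions runs each option in order and stops at the first error.
func applyOptions(opts ...SendOption) (*senderConfig, error) {
	cfg := &senderConfig{}
	for _, opt := range opts {
		if err := opt(cfg); err != nil {
			return nil, fmt.Errorf("applying send option: %w", err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := applyOptions(
		withHeader(http.Header{"X-Trace": {"1"}}),
		withReplyURL("http://example.com/reply"),
	)
	fmt.Println(cfg.replyURL, cfg.header.Get("X-Trace"), err)
}
```

The variadic options parameter at the end of SendMessage fits this design: callers pass only the options they need, and new options can be added without changing the function signature.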

type StreamNameFunc

type StreamNameFunc func(nc *v1alpha1.NatsJetStreamChannel) string

type StreamSubjectFunc

type StreamSubjectFunc func(namespace, name string) string

type SubscriberStatus

type SubscriberStatus struct {
	UID   types.UID
	Type  SubscriberStatusType
	Error error
}

func NewSubscriberStatusCreated

func NewSubscriberStatusCreated(uid types.UID) SubscriberStatus

func NewSubscriberStatusError

func NewSubscriberStatusError(uid types.UID, err error) SubscriberStatus

func NewSubscriberStatusSkipped

func NewSubscriberStatusSkipped(uid types.UID) SubscriberStatus

func NewSubscriberStatusUpToDate

func NewSubscriberStatusUpToDate(uid types.UID) SubscriberStatus

type SubscriberStatusType

type SubscriberStatusType int
const (
	SubscriberStatusTypeCreated SubscriberStatusType = iota
	SubscriberStatusTypeSkipped
	SubscriberStatusTypeUpToDate
	SubscriberStatusTypeError
	SubscriberStatusTypeDeleted
)
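
SubscriberStatusType is an iota-based enumeration, so its values print as bare integers unless they are mapped to names. The sketch below redeclares the constants locally and adds a hypothetical statusName helper for logging; the page does not show a String method on the real type, so none is assumed here.

```go
package main

import "fmt"

// Local copy of the enumeration documented above.
type SubscriberStatusType int

const (
	SubscriberStatusTypeCreated SubscriberStatusType = iota
	SubscriberStatusTypeSkipped
	SubscriberStatusTypeUpToDate
	SubscriberStatusTypeError
	SubscriberStatusTypeDeleted
)

// statusName is a hypothetical helper that maps a status type to a readable
// label, with a fallback for values outside the declared range.
func statusName(t SubscriberStatusType) string {
	switch t {
	case SubscriberStatusTypeCreated:
		return "Created"
	case SubscriberStatusTypeSkipped:
		return "Skipped"
	case SubscriberStatusTypeUpToDate:
		return "UpToDate"
	case SubscriberStatusTypeError:
		return "Error"
	case SubscriberStatusTypeDeleted:
		return "Deleted"
	default:
		return fmt.Sprintf("Unknown(%d)", int(t))
	}
}

func main() {
	for t := SubscriberStatusTypeCreated; t <= SubscriberStatusTypeDeleted; t++ {
		fmt.Println(int(t), statusName(t))
	}
}
```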

type Subscription

type Subscription struct {
	UID types.UID
	fanout.Subscription
	PullSubscription *PullSubscription
}

type TypeExtractorTransformer added in v0.41.0

type TypeExtractorTransformer string

func (*TypeExtractorTransformer) Transform added in v0.41.0
