dispatcher

package
v0.30.6
Published: Jan 3, 2023 License: Apache-2.0 Imports: 45 Imported by: 0

Constants

const (
	ReasonJetstreamStreamCreated  = "JetstreamStreamCreated"
	ReasonJetstreamStreamFailed   = "JetstreamStreamFailed"
	ReasonJetstreamConsumerFailed = "JetstreamConsumerFailed"
)

Variables

var (
	ErrConsumerClosed = errors.New("dispatcher consumer closed")
)

Functions

func NewController

func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl

Types

type ChannelConfig

type ChannelConfig struct {
	channel.ChannelReference
	HostName               string
	StreamName             string
	ConsumerConfigTemplate *v1alpha1.ConsumerConfigTemplate
	Subscriptions          []Subscription
}

func (ChannelConfig) SubscriptionsUIDs

func (cc ChannelConfig) SubscriptionsUIDs() []string
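The page gives no description of SubscriptionsUIDs. Judging by its name and signature, it likely returns the UID of each entry in Subscriptions as a string. The sketch below shows that assumed behaviour using simplified stand-in types (UID replaces types.UID, and the embedded fields are omitted); it is not the package's actual implementation.

```go
package main

import "fmt"

// UID stands in for k8s.io/apimachinery/pkg/types.UID, which is a string type.
type UID string

// Subscription and ChannelConfig are trimmed-down stand-ins for the
// package's types, keeping only the fields this sketch needs.
type Subscription struct{ UID UID }

type ChannelConfig struct{ Subscriptions []Subscription }

// SubscriptionsUIDs collects the UID of every subscription, in order.
// This is the assumed behaviour, inferred from the method name.
func (cc ChannelConfig) SubscriptionsUIDs() []string {
	uids := make([]string, 0, len(cc.Subscriptions))
	for _, s := range cc.Subscriptions {
		uids = append(uids, string(s.UID))
	}
	return uids
}

func main() {
	cc := ChannelConfig{Subscriptions: []Subscription{{UID: "a1"}, {UID: "b2"}}}
	fmt.Println(cc.SubscriptionsUIDs())
}
```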

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) MsgHandler

func (c *Consumer) MsgHandler(msg *nats.Msg)

type ConsumerNameFunc

type ConsumerNameFunc func(subID string) string

type ConsumerSubjectFunc

type ConsumerSubjectFunc func(namespace, name, uid string) string
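ConsumerNameFunc and ConsumerSubjectFunc let the caller decide how consumers are named and which subject they use. The naming schemes below are purely hypothetical and only show the shape of functions that satisfy the two types; the schemes the module actually uses are not documented on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the two function types, so the sketch is self-contained.
type ConsumerNameFunc func(subID string) string

type ConsumerSubjectFunc func(namespace, name, uid string) string

// exampleConsumerName is a hypothetical scheme: prefix the subscription ID
// and replace hyphens with underscores.
func exampleConsumerName(subID string) string {
	return "sub_" + strings.ReplaceAll(subID, "-", "_")
}

// exampleConsumerSubject is a hypothetical scheme: join the three parts
// into a dot-separated subject.
func exampleConsumerSubject(namespace, name, uid string) string {
	return fmt.Sprintf("%s.%s.%s", namespace, name, uid)
}

func main() {
	var nameFn ConsumerNameFunc = exampleConsumerName
	var subjectFn ConsumerSubjectFunc = exampleConsumerSubject

	fmt.Println(nameFn("1f2e-3d4c"))
	fmt.Println(subjectFn("default", "orders", "1f2e-3d4c"))
}
```

Functions of this shape could then be supplied through the ConsumerNameFunc and ConsumerSubjectFunc fields of NatsDispatcherArgs.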

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is responsible for managing both directions of events over the NatsJetStreamChannel. It manages the lifecycle of the following components:
- Stream per NatsJetStreamChannel
- HTTP receiver which publishes to the desired Stream
- Consumer per .spec.subscribers[] of a channel, forwarding events to the specified subscriber address.

func NewDispatcher

func NewDispatcher(ctx context.Context, args NatsDispatcherArgs) (*Dispatcher, error)

func (*Dispatcher) ReconcileConsumers

func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfig, isLeader bool) error

func (*Dispatcher) RegisterChannelHost

func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error

RegisterChannelHost registers the Dispatcher to accept HTTP events matching the specified HostName
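Host-based registration can be pictured as a lookup table keyed by HostName that the HTTP receiver consults for each request. The sketch below is one way this might work, not the package's implementation: hostRegistry, register, and statusFor are hypothetical names, the stream is reduced to a plain string, and rejecting a conflicting registration is an assumption.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// hostRegistry is a hypothetical host-to-stream table guarded by a mutex.
type hostRegistry struct {
	mu    sync.RWMutex
	hosts map[string]string // HostName -> stream name
}

func newHostRegistry() *hostRegistry {
	return &hostRegistry{hosts: make(map[string]string)}
}

// register records the host. Rejecting a host that is already mapped to a
// different stream is an assumption made for this sketch.
func (h *hostRegistry) register(hostName, stream string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if existing, ok := h.hosts[hostName]; ok && existing != stream {
		return fmt.Errorf("host %q already registered for stream %q", hostName, existing)
	}
	h.hosts[hostName] = stream
	return nil
}

// ServeHTTP accepts requests for registered hosts and rejects the rest.
// A real receiver would publish the event to the stream at this point.
func (h *hostRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	h.mu.RLock()
	_, ok := h.hosts[r.Host]
	h.mu.RUnlock()
	if !ok {
		http.NotFound(w, r)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

// statusFor sends a test request for the given host and returns the status code.
func statusFor(h http.Handler, host string) int {
	req := httptest.NewRequest(http.MethodPost, "http://"+host+"/", nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	reg := newHostRegistry()
	if err := reg.register("orders.default.svc", "orders-stream"); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	fmt.Println(statusFor(reg, "orders.default.svc"))
	fmt.Println(statusFor(reg, "unknown.default.svc"))
}
```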

func (*Dispatcher) Start

func (d *Dispatcher) Start(ctx context.Context) error

type EnqueueFunc

type EnqueueFunc func(ref types.NamespacedName)

EnqueueFunc is passed to the Reconciler for when a follower instance attempts to sync on a Consumer which does not yet exist
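The description above suggests a retry pattern: when a follower finds that a consumer does not exist yet, it asks for the channel to be reconciled again later. A minimal sketch of that idea follows, assuming a stand-in NamespacedName type and a hypothetical syncConsumer helper; the real Reconciler's logic is not shown on this page.

```go
package main

import "fmt"

// NamespacedName stands in for k8s.io/apimachinery/pkg/types.NamespacedName.
type NamespacedName struct {
	Namespace string
	Name      string
}

// EnqueueFunc mirrors the package's type, using the stand-in key type.
type EnqueueFunc func(ref NamespacedName)

// syncConsumer is a hypothetical follower-side step. If the consumer does
// not exist yet, it re-enqueues the channel and reports that nothing was
// synced; otherwise it reports success.
func syncConsumer(consumerExists bool, ref NamespacedName, enqueue EnqueueFunc) bool {
	if !consumerExists {
		enqueue(ref)
		return false
	}
	return true
}

func main() {
	var requeued []NamespacedName
	enqueue := func(ref NamespacedName) { requeued = append(requeued, ref) }

	ref := NamespacedName{Namespace: "default", Name: "orders"}
	synced := syncConsumer(false, ref, enqueue)
	fmt.Println(synced, len(requeued))
}
```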

type NatsDispatcherArgs

type NatsDispatcherArgs struct {
	JetStream nats.JetStreamContext

	SubjectFunc         StreamSubjectFunc
	ConsumerNameFunc    ConsumerNameFunc
	ConsumerSubjectFunc ConsumerSubjectFunc

	PodName       string
	ContainerName string
}

type Reconciler

type Reconciler struct {
	// contains filtered or unexported fields
}

Reconciler reconciles incoming NatsJetstreamChannel CRDs by ensuring the following states:
- Creates a JSM Stream for each channel
- Creates an HTTP listener which publishes received events to the Stream
- Creates a consumer for each .spec.subscribers[] and forwards events to the subscriber address

func (*Reconciler) FinalizeKind

func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)

FinalizeKind is invoked when the resource is set for deletion. This method is only called when the controller is the leader, so it unsubscribes all consumers and then deletes the stream.

func (*Reconciler) ObserveDeletion

func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error

ObserveDeletion is called on non-leader controllers after the actual resource is deleted. In this case we just unsubscribe from all consumers; the leader cleans up the actual JetStream resources via FinalizeKind.

func (*Reconciler) ObserveKind

ObserveKind is invoked when a NatsJetStreamChannel requires reconciliation but the dispatcher is not the leader. This will wait until the channel has been marked "ready" by the leader, then subscribe to the JSM stream to forward to Knative subscriptions; this enables us to scale dispatchers horizontally to cope with large message volumes. The only requirement to allow this to work is the use of Queue Subscribers.
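The reliance on Queue Subscribers can be illustrated without NATS. In a queue group, each message is delivered to exactly one member, so adding dispatcher replicas spreads the load instead of duplicating deliveries. The sketch below simulates that property with goroutines sharing one Go channel; deliver is a hypothetical helper and no NATS API is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver hands the messages to a group of workers that share one queue.
// Every message is received by exactly one worker, which is the property a
// queue group provides. It returns how often each message was handled.
func deliver(msgs []string, workers int) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				mu.Lock()
				handled[m]++
				mu.Unlock()
			}
		}()
	}

	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	handled := deliver([]string{"a", "b", "c", "d"}, 3)
	for msg, count := range handled {
		fmt.Printf("%s handled %d time(s)\n", msg, count)
	}
}
```

If each replica subscribed independently instead, every subscriber address would receive each event once per replica.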

func (*Reconciler) ReconcileKind

type StreamNameFunc

type StreamNameFunc func(nc *v1alpha1.NatsJetStreamChannel) string

type StreamSubjectFunc

type StreamSubjectFunc func(namespace, name string) string

type SubscriberStatus

type SubscriberStatus struct {
	UID   types.UID
	Type  SubscriberStatusType
	Error error
}

func NewSubscriberStatusCreated

func NewSubscriberStatusCreated(uid types.UID) SubscriberStatus

func NewSubscriberStatusError

func NewSubscriberStatusError(uid types.UID, err error) SubscriberStatus

func NewSubscriberStatusSkipped

func NewSubscriberStatusSkipped(uid types.UID) SubscriberStatus

func NewSubscriberStatusUpToDate

func NewSubscriberStatusUpToDate(uid types.UID) SubscriberStatus

type SubscriberStatusType

type SubscriberStatusType int

const (
	SubscriberStatusTypeCreated SubscriberStatusType = iota
	SubscriberStatusTypeSkipped
	SubscriberStatusTypeUpToDate
	SubscriberStatusTypeError
	SubscriberStatusTypeDeleted
)
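Because the constants are declared with iota, they take the values 0 through 4 in declaration order. When logging statuses it can help to map them back to names. The String method below is a hypothetical addition for illustration, defined on a local copy of the type; this page does not show whether the package provides one.

```go
package main

import "fmt"

// Local copy of the type and constants, in the same order as the package.
type SubscriberStatusType int

const (
	SubscriberStatusTypeCreated SubscriberStatusType = iota
	SubscriberStatusTypeSkipped
	SubscriberStatusTypeUpToDate
	SubscriberStatusTypeError
	SubscriberStatusTypeDeleted
)

// String is a hypothetical helper that returns a readable name for the status.
func (t SubscriberStatusType) String() string {
	switch t {
	case SubscriberStatusTypeCreated:
		return "Created"
	case SubscriberStatusTypeSkipped:
		return "Skipped"
	case SubscriberStatusTypeUpToDate:
		return "UpToDate"
	case SubscriberStatusTypeError:
		return "Error"
	case SubscriberStatusTypeDeleted:
		return "Deleted"
	default:
		return fmt.Sprintf("Unknown(%d)", int(t))
	}
}

func main() {
	fmt.Println(SubscriberStatusTypeError, int(SubscriberStatusTypeError))
}
```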

type Subscription

type Subscription struct {
	UID types.UID
	fanout.Subscription
}
