Documentation ¶
Index ¶
- Constants
- Variables
- func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
- func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, ...) (*kncloudevents.DispatchInfo, error)
- type ChannelConfig
- type Consumer
- type ConsumerNameFunc
- type ConsumerSubjectFunc
- type ConsumerType
- type Dispatcher
- type EnqueueFunc
- type NatsDispatcherArgs
- type PullConsumer
- type PullSubscription
- type PushConsumer
- type Reconciler
- func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)
- func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error
- func (r *Reconciler) ObserveKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
- func (r *Reconciler) ReconcileKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
- type SendOption
- func WithDeadLetterSink(dls *duckv1.Addressable) SendOption
- func WithHeader(header http.Header) SendOption
- func WithReply(reply *duckv1.Addressable) SendOption
- func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption
- func WithTransformers(transformers ...binding.Transformer) SendOption
- type StreamNameFunc
- type StreamSubjectFunc
- type SubscriberStatus
- type SubscriberStatusType
- type Subscription
- type TypeExtractorTransformer
Constants ¶
const ( ReasonJetstreamStreamCreated = "JetstreamStreamCreated" ReasonJetstreamStreamFailed = "JetstreamStreamFailed" ReasonJetstreamConsumerCreated = "JetstreamConsumerCreated" ReasonJetstreamConsumerFailed = "JetstreamConsumerFailed" )
Variables ¶
var ( // FetchBatchSize is the number of messages that will be fetched from JetStream in a single // request. This can be configured via the FETCH_BATCH_SIZE environment variable. // // If you expect to process a high volume of messages, you may want to increase this number to // reduce the number of requests made to JetStream. FetchBatchSize = 32 FetchMaxWaitDefault = 200 * time.Millisecond )
var (
ErrConsumerClosed = errors.New("dispatcher consumer closed")
)
Functions ¶
func NewController ¶
func SendMessage ¶ added in v0.41.1
func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, options ...SendOption) (*kncloudevents.DispatchInfo, error)
Types ¶
type ChannelConfig ¶
type ChannelConfig struct { channel.ChannelReference HostName string StreamName string ConsumerConfigTemplate *v1alpha1.ConsumerConfigTemplate Subscriptions []Subscription }
func (ChannelConfig) SubscriptionsUIDs ¶
func (cc ChannelConfig) SubscriptionsUIDs() []string
type Consumer ¶
type Consumer interface { ConsumerType() ConsumerType Close() error UpdateSubscription(c *ChannelConfig, sub Subscription) }
type ConsumerNameFunc ¶
type ConsumerSubjectFunc ¶
type ConsumerType ¶ added in v0.43.0
type ConsumerType string
const ( PushConsumerType ConsumerType = "Push" PullConsumerType ConsumerType = "Pull" )
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is responsible for managing both directions of events over the NatsJetStreamChannel. It manages the lifecycle of the following components: - Stream per NatsJetStreamChannel - HTTP receiver which publishes to the desired Stream - Consumer per .spec.subscribers[] of a channel, forwarding events to the specified subscriber address.
func NewDispatcher ¶
func NewDispatcher(ctx context.Context, args NatsDispatcherArgs) (*Dispatcher, error)
func (*Dispatcher) ReconcileConsumers ¶
func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfig, isLeader bool) error
func (*Dispatcher) RegisterChannelHost ¶
func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error
RegisterChannelHost registers the NatsDispatcher to accept HTTP events matching the specified HostName.
type EnqueueFunc ¶
type EnqueueFunc func(ref types.NamespacedName)
EnqueueFunc is passed to the Reconciler and is used when a follower instance attempts to sync on a Consumer that does not yet exist.
type NatsDispatcherArgs ¶
type NatsDispatcherArgs struct { JetStream nats.JetStreamContext SubjectFunc StreamSubjectFunc ConsumerNameFunc ConsumerNameFunc ConsumerSubjectFunc ConsumerSubjectFunc PodName string ContainerName string }
type PullConsumer ¶ added in v0.43.0
type PullConsumer struct {
// contains filtered or unexported fields
}
func NewPullConsumer ¶ added in v0.43.0
func NewPullConsumer(ctx context.Context, consumer *nats.Subscription, subscription Subscription, dispatcher *kncloudevents.Dispatcher, reporter eventingchannels.StatsReporter, channelConfig *ChannelConfig) (*PullConsumer, error)
func (*PullConsumer) Close ¶ added in v0.43.0
func (c *PullConsumer) Close() error
func (*PullConsumer) ConsumerType ¶ added in v0.43.0
func (c *PullConsumer) ConsumerType() ConsumerType
func (*PullConsumer) Start ¶ added in v0.43.0
func (c *PullConsumer) Start() error
Start begins the consumer and handles messages until Close is called. This method is blocking and will return an error if the consumer fails prematurely. A nil error will be returned upon being stopped by the Close method.
func (*PullConsumer) UpdateSubscription ¶ added in v0.43.0
func (c *PullConsumer) UpdateSubscription(config *ChannelConfig, sub Subscription)
type PullSubscription ¶ added in v0.43.0
type PushConsumer ¶ added in v0.43.0
type PushConsumer struct {
// contains filtered or unexported fields
}
func (*PushConsumer) Close ¶ added in v0.43.0
func (c *PushConsumer) Close() error
func (*PushConsumer) ConsumerType ¶ added in v0.43.0
func (c *PushConsumer) ConsumerType() ConsumerType
func (*PushConsumer) MsgHandler ¶ added in v0.43.0
func (c *PushConsumer) MsgHandler(msg *nats.Msg)
func (*PushConsumer) UpdateSubscription ¶ added in v0.43.0
func (c *PushConsumer) UpdateSubscription(_ *ChannelConfig, sub Subscription)
type Reconciler ¶
type Reconciler struct {
// contains filtered or unexported fields
}
Reconciler reconciles incoming NatsJetStreamChannel CRDs by ensuring the following states: - Creates a JSM Stream for each channel - Creates an HTTP listener which publishes received events to the Stream - Creates a consumer for each .spec.subscribers[] and forwards events to the subscriber address
func (*Reconciler) FinalizeKind ¶
func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)
FinalizeKind is invoked when the resource is set for deletion. This method is only called when the controller is the leader, so it unsubscribes all consumers and then deletes the stream.
func (*Reconciler) ObserveDeletion ¶
func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error
ObserveDeletion is called on non-leader controllers after the actual resource is deleted. In this case we just unsubscribe from all consumers; the leader will clean up the actual JetStream resources for us via FinalizeKind.
func (*Reconciler) ObserveKind ¶
func (r *Reconciler) ObserveKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
ObserveKind is invoked when a NatsJetStreamChannel requires reconciliation but the dispatcher is not the leader. This will wait until the channel has been marked "ready" by the leader, then subscribe to the JSM stream to forward to Knative subscriptions; this enables us to scale dispatchers horizontally to cope with large message volumes. The only requirement to allow this to work is the use of Queue Subscribers.
func (*Reconciler) ReconcileKind ¶
func (r *Reconciler) ReconcileKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
type SendOption ¶ added in v0.41.1
type SendOption func(*senderConfig) error
func WithDeadLetterSink ¶ added in v0.41.1
func WithDeadLetterSink(dls *duckv1.Addressable) SendOption
func WithHeader ¶ added in v0.41.1
func WithHeader(header http.Header) SendOption
func WithReply ¶ added in v0.41.1
func WithReply(reply *duckv1.Addressable) SendOption
func WithRetryConfig ¶ added in v0.41.1
func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption
func WithTransformers ¶ added in v0.41.1
func WithTransformers(transformers ...binding.Transformer) SendOption
type StreamNameFunc ¶
type StreamNameFunc func(nc *v1alpha1.NatsJetStreamChannel) string
type StreamSubjectFunc ¶
type SubscriberStatus ¶
type SubscriberStatus struct { UID types.UID Type SubscriberStatusType Error error }
func NewSubscriberStatusCreated ¶
func NewSubscriberStatusCreated(uid types.UID) SubscriberStatus
func NewSubscriberStatusError ¶
func NewSubscriberStatusError(uid types.UID, err error) SubscriberStatus
func NewSubscriberStatusSkipped ¶
func NewSubscriberStatusSkipped(uid types.UID) SubscriberStatus
func NewSubscriberStatusUpToDate ¶
func NewSubscriberStatusUpToDate(uid types.UID) SubscriberStatus
type SubscriberStatusType ¶
type SubscriberStatusType int
const ( SubscriberStatusTypeCreated SubscriberStatusType = iota SubscriberStatusTypeSkipped SubscriberStatusTypeUpToDate SubscriberStatusTypeError SubscriberStatusTypeDeleted )
type Subscription ¶
type Subscription struct { UID types.UID fanout.Subscription PullSubscription *PullSubscription }
type TypeExtractorTransformer ¶ added in v0.41.0
type TypeExtractorTransformer string
func (*TypeExtractorTransformer) Transform ¶ added in v0.41.0
func (a *TypeExtractorTransformer) Transform(reader binding.MessageMetadataReader, _ binding.MessageMetadataWriter) error