Documentation ¶
Index ¶
- Constants
- Variables
- func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
- func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, ...) (*kncloudevents.DispatchInfo, error)
- type ChannelConfig
- type Consumer
- type ConsumerNameFunc
- type ConsumerSubjectFunc
- type Dispatcher
- type EnqueueFunc
- type NatsDispatcherArgs
- type Reconciler
- func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)
- func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error
- func (r *Reconciler) ObserveKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
- func (r *Reconciler) ReconcileKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
- type SendOption
- func WithDeadLetterSink(dls *duckv1.Addressable) SendOption
- func WithHeader(header http.Header) SendOption
- func WithReply(reply *duckv1.Addressable) SendOption
- func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption
- func WithTransformers(transformers ...binding.Transformer) SendOption
- type StreamNameFunc
- type StreamSubjectFunc
- type SubscriberStatus
- type SubscriberStatusType
- type Subscription
- type TypeExtractorTransformer
Constants ¶
const ( ReasonJetstreamStreamCreated = "JetstreamStreamCreated" ReasonJetstreamStreamFailed = "JetstreamStreamFailed" ReasonJetstreamConsumerCreated = "JetstreamConsumerCreated" ReasonJetstreamConsumerFailed = "JetstreamConsumerFailed" )
Variables ¶
var (
ErrConsumerClosed = errors.New("dispatcher consumer closed")
)
Functions ¶
func NewController ¶
func SendMessage ¶ added in v0.41.1
func SendMessage(dispatcher *kncloudevents.Dispatcher, ctx context.Context, message binding.Message, destination duckv1.Addressable, ackWait time.Duration, msg *nats.Msg, options ...SendOption) (*kncloudevents.DispatchInfo, error)
Types ¶
type ChannelConfig ¶
type ChannelConfig struct { channel.ChannelReference HostName string StreamName string ConsumerConfigTemplate *v1alpha1.ConsumerConfigTemplate Subscriptions []Subscription }
func (ChannelConfig) SubscriptionsUIDs ¶
func (cc ChannelConfig) SubscriptionsUIDs() []string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) MsgHandler ¶
func (c *Consumer) MsgHandler(msg *nats.Msg)
type ConsumerNameFunc ¶
type ConsumerSubjectFunc ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is responsible for managing both directions of events over the NatsJetStreamChannel. It manages the lifecycle of the following components: - Stream per NatsJetStreamChannel - HTTP receiver which publishes to the desired Stream - Consumer per .spec.subscribers[] of a channel, forwarding events to the specified subscriber address.
func NewDispatcher ¶
func NewDispatcher(ctx context.Context, args NatsDispatcherArgs) (*Dispatcher, error)
func (*Dispatcher) ReconcileConsumers ¶
func (d *Dispatcher) ReconcileConsumers(ctx context.Context, config ChannelConfig, isLeader bool) error
func (*Dispatcher) RegisterChannelHost ¶
func (d *Dispatcher) RegisterChannelHost(config ChannelConfig) error
RegisterChannelHost registers the NatsDispatcher to accept HTTP events matching the specified HostName
type EnqueueFunc ¶
type EnqueueFunc func(ref types.NamespacedName)
EnqueueFunc is passed to the Reconciler for when a follower instance attempts to sync on a Consumer which does not yet exist
type NatsDispatcherArgs ¶
type NatsDispatcherArgs struct { JetStream nats.JetStreamContext SubjectFunc StreamSubjectFunc ConsumerNameFunc ConsumerNameFunc ConsumerSubjectFunc ConsumerSubjectFunc PodName string ContainerName string }
type Reconciler ¶
type Reconciler struct {
// contains filtered or unexported fields
}
Reconciler reconciles incoming NatsJetstreamChannel CRDs by ensuring the following states: - Creates a JSM Stream for each channel - Creates a HTTP listener which publishes received events to the Stream - Creates a consumer for each .spec.subscribers[] and forwards events to the subscriber address
func (*Reconciler) FinalizeKind ¶
func (r *Reconciler) FinalizeKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) (err pkgreconciler.Event)
FinalizeKind is invoked when the resource is set for deletion. This method is only called when the controller is leader, so unsubscribe all consumers and then delete the stream.
func (*Reconciler) ObserveDeletion ¶
func (r *Reconciler) ObserveDeletion(ctx context.Context, key types.NamespacedName) error
ObserveDeletion is called on non-leader controllers after the actual resource is deleted. In this case we just unsubscribe from all consumers, the leader will clean the actual JetStream resources up for us via FinalizeKind.
func (*Reconciler) ObserveKind ¶
func (r *Reconciler) ObserveKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
ObserveKind is invoked when a NatsJetStreamChannel requires reconciliation but the dispatcher is not the leader. This will wait until the channel has been marked "ready" by the leader, then subscribe to the JSM stream to forward to Knative subscriptions; this enables us to scale dispatchers horizontally to cope with large message volumes. The only requirement to allow this to work is the use of Queue Subscribers.
func (*Reconciler) ReconcileKind ¶
func (r *Reconciler) ReconcileKind(ctx context.Context, nc *v1alpha1.NatsJetStreamChannel) pkgreconciler.Event
type SendOption ¶ added in v0.41.1
type SendOption func(*senderConfig) error
func WithDeadLetterSink ¶ added in v0.41.1
func WithDeadLetterSink(dls *duckv1.Addressable) SendOption
func WithHeader ¶ added in v0.41.1
func WithHeader(header http.Header) SendOption
func WithReply ¶ added in v0.41.1
func WithReply(reply *duckv1.Addressable) SendOption
func WithRetryConfig ¶ added in v0.41.1
func WithRetryConfig(retryConfig *kncloudevents.RetryConfig) SendOption
func WithTransformers ¶ added in v0.41.1
func WithTransformers(transformers ...binding.Transformer) SendOption
type StreamNameFunc ¶
type StreamNameFunc func(nc *v1alpha1.NatsJetStreamChannel) string
type StreamSubjectFunc ¶
type SubscriberStatus ¶
type SubscriberStatus struct { UID types.UID Type SubscriberStatusType Error error }
func NewSubscriberStatusCreated ¶
func NewSubscriberStatusCreated(uid types.UID) SubscriberStatus
func NewSubscriberStatusError ¶
func NewSubscriberStatusError(uid types.UID, err error) SubscriberStatus
func NewSubscriberStatusSkipped ¶
func NewSubscriberStatusSkipped(uid types.UID) SubscriberStatus
func NewSubscriberStatusUpToDate ¶
func NewSubscriberStatusUpToDate(uid types.UID) SubscriberStatus
type SubscriberStatusType ¶
type SubscriberStatusType int
const ( SubscriberStatusTypeCreated SubscriberStatusType = iota SubscriberStatusTypeSkipped SubscriberStatusTypeUpToDate SubscriberStatusTypeError SubscriberStatusTypeDeleted )
type Subscription ¶
type Subscription struct { UID types.UID fanout.Subscription }
type TypeExtractorTransformer ¶ added in v0.41.0
type TypeExtractorTransformer string
func (*TypeExtractorTransformer) Transform ¶ added in v0.41.0
func (a *TypeExtractorTransformer) Transform(reader binding.MessageMetadataReader, _ binding.MessageMetadataWriter) error