Documentation ¶
Index ¶
- Constants
- Variables
- func AddJSCleanEventTypesToStatus(sub *v1alpha2.Subscription, cleaner cleaner.Cleaner)
- func GetBackendJetStreamTypes(subscription *eventingv1alpha2.Subscription, jsSubjects []string) ([]eventingv1alpha2.JetStreamTypes, error)
- func GetCleanEventTypes(sub *eventingv1alpha2.Subscription, cleaner cleaner.Cleaner) []eventingv1alpha2.EventType
- func GetCleanEventTypesFromEventTypes(eventTypes []eventingv1alpha2.EventType) []string
- func NewNatsMessagePayload(data, id, source, eventTime, eventType string) string
- func SendCloudEventToJetStream(jetStreamClient *JetStream, subject, eventData, cetype string) error
- func SendEventToJetStream(jsClient *JetStream, data string) error
- func StartNATSServer(serverOpts ...evtesting.NatsServerOpt) (*server.Server, int, error)
- func Validate(natsConfig env.NATSConfig) error
- type Backend
- type Builder
- type ConnectionBuilder
- type ConnectionInterface
- type DefaultSubOpts
- type JetStream
- func (js *JetStream) DeleteInvalidConsumers(subscriptions []eventingv1alpha2.Subscription) error
- func (js *JetStream) DeleteSubscription(subscription *eventingv1alpha2.Subscription) error
- func (js *JetStream) DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subscription) error
- func (js *JetStream) GetConfig() env.NATSConfig
- func (js *JetStream) GetJetStreamContext() nats.JetStreamContext
- func (js *JetStream) GetJetStreamSubject(source, subject string, typeMatching eventingv1alpha2.TypeMatching) string
- func (js *JetStream) GetJetStreamSubjects(source string, subjects []string, typeMatching eventingv1alpha2.TypeMatching) []string
- func (js *JetStream) GetNATSSubscriptions() map[SubscriptionSubjectIdentifier]Subscriber
- func (js *JetStream) Initialize(connCloseHandler backendutils.ConnClosedHandler) error
- func (js *JetStream) SyncSubscription(subscription *eventingv1alpha2.Subscription) error
- type Subscriber
- type Subscription
- type SubscriptionSubjectIdentifier
- type TestEnvironment
Constants ¶
const ( DefaultStreamName = "kyma" DefaultJetStreamSubjectPrefix = "kyma" DefaultMaxReconnects = 10 DefaultMaxInFlights = 10 )
const ( StorageTypeMemory = "memory" StorageTypeFile = "file" RetentionPolicyLimits = "limits" RetentionPolicyInterest = "interest" DiscardPolicyNew = "new" DiscardPolicyOld = "old" ConsumerDeliverPolicyAll = "all" ConsumerDeliverPolicyLast = "last" ConsumerDeliverPolicyLastPerSubject = "last_per_subject" ConsumerDeliverPolicyNew = "new" )
Variables ¶
var ( ErrMissingSubscription = errors.New("failed to find a NATS subscription for a subject") ErrAddConsumer = errors.New("failed to add a consumer") ErrGetConsumer = errors.New("failed to get consumer info") ErrUpdateConsumer = errors.New("failed to update consumer") ErrDeleteConsumer = errors.New("failed to delete consumer") ErrFailedSubscribe = errors.New("failed to create NATS JetStream subscription") ErrFailedUnsubscribe = errors.New("failed to unsubscribe from NATS JetStream") ErrConnect = errors.New("failed to connect to NATS JetStream") ErrEmptyStreamName = errors.New("stream name cannot be empty") ErrStreamNameTooLong = fmt.Errorf("stream name should be max %d characters long", jsMaxStreamNameLength) )
var ErrInvalidDiscardPolicy = pkgerrors.NewArgumentError("invalid stream discard policy: %q")
var ErrInvalidRetentionPolicy = pkgerrors.NewArgumentError("invalid stream retention policy: %q")
var ErrInvalidStorageType = pkgerrors.NewArgumentError("invalid stream storage type: %q")
Functions ¶
func AddJSCleanEventTypesToStatus ¶
func AddJSCleanEventTypesToStatus(sub *v1alpha2.Subscription, cleaner cleaner.Cleaner)
func GetBackendJetStreamTypes ¶
func GetBackendJetStreamTypes(subscription *eventingv1alpha2.Subscription, jsSubjects []string) ([]eventingv1alpha2.JetStreamTypes, error)
GetBackendJetStreamTypes gets the original event type and the consumer name for all the subscriptions and this slice is set as the backend specific status for JetStream.
func GetCleanEventTypes ¶
func GetCleanEventTypes(sub *eventingv1alpha2.Subscription, cleaner cleaner.Cleaner) []eventingv1alpha2.EventType
GetCleanEventTypes returns a list of clean eventTypes from the unique types in the subscription.
func GetCleanEventTypesFromEventTypes ¶
func GetCleanEventTypesFromEventTypes(eventTypes []eventingv1alpha2.EventType) []string
func NewNatsMessagePayload ¶
func SendEventToJetStream ¶
func StartNATSServer ¶
func Validate ¶
func Validate(natsConfig env.NATSConfig) error
Validate ensures that the NatsConfig is valid and therefore can be used safely. TODO: as soon as backend/nats is gone, make this method a function of backendnats.Config.
Types ¶
type Backend ¶
type Backend interface { // Initialize should initialize the communication layer with the messaging backend system Initialize(connCloseHandler backendutilsv2.ConnClosedHandler) error // SyncSubscription should synchronize the Kyma eventing subscription with the subscriber infrastructure of JetStream. SyncSubscription(subscription *eventingv1alpha2.Subscription) error // DeleteSubscription should delete the corresponding subscriber data of messaging backend DeleteSubscription(subscription *eventingv1alpha2.Subscription) error // DeleteSubscriptionsOnly should delete the JetStream subscriptions only. // The corresponding JetStream consumers of the subscriptions must not be deleted. DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subscription) error // GetJetStreamSubjects returns a list of subjects appended with stream name and source as prefix if needed GetJetStreamSubjects(source string, subjects []string, typeMatching eventingv1alpha2.TypeMatching) []string // DeleteInvalidConsumers deletes all JetStream consumers having no subscription types in subscription resources DeleteInvalidConsumers(subscriptions []eventingv1alpha2.Subscription) error // GetJetStreamContext returns the current JetStreamContext GetJetStreamContext() nats.JetStreamContext // GetConfig returns the backends Configuration GetConfig() env.NATSConfig }
type Builder ¶
type Builder interface {
Build() (ConnectionInterface, error)
}
func NewConnectionBuilder ¶
func NewConnectionBuilder(config env.NATSConfig) Builder
type ConnectionBuilder ¶
type ConnectionBuilder struct {
// contains filtered or unexported fields
}
ConnectionBuilder helps in establishing a connection to NATS.
func (ConnectionBuilder) Build ¶
func (b ConnectionBuilder) Build() (ConnectionInterface, error)
Build connects to NATS and returns the connection. If an error occurs, ErrConnect is returned.
type ConnectionInterface ¶
type ConnectionInterface interface { IsConnected() bool SetClosedHandler(cb nats.ConnHandler) SetReconnectHandler(rcb nats.ConnHandler) JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error) }
ConnectionInterface is a contract for a NATS connection object.
type DefaultSubOpts ¶
type DefaultSubOpts []nats.SubOpt
type JetStream ¶
type JetStream struct { Config env.NATSConfig Conn *nats.Conn // contains filtered or unexported fields }
func NewJetStream ¶
func NewJetStream(config env.NATSConfig, metricsCollector *backendmetrics.Collector, cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger) *JetStream
func (*JetStream) DeleteInvalidConsumers ¶
func (js *JetStream) DeleteInvalidConsumers(subscriptions []eventingv1alpha2.Subscription) error
DeleteInvalidConsumers deletes all JetStream consumers having no subscription event types in subscription resources.
func (*JetStream) DeleteSubscription ¶
func (js *JetStream) DeleteSubscription(subscription *eventingv1alpha2.Subscription) error
func (*JetStream) DeleteSubscriptionsOnly ¶
func (js *JetStream) DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subscription) error
func (*JetStream) GetConfig ¶
func (js *JetStream) GetConfig() env.NATSConfig
func (*JetStream) GetJetStreamContext ¶
func (js *JetStream) GetJetStreamContext() nats.JetStreamContext
GetJetStreamContext returns the current JetStreamContext.
func (*JetStream) GetJetStreamSubject ¶
func (js *JetStream) GetJetStreamSubject(source, subject string, typeMatching eventingv1alpha2.TypeMatching) string
GetJetStreamSubject appends the prefix and the cleaned source to subject.
func (*JetStream) GetJetStreamSubjects ¶
func (js *JetStream) GetJetStreamSubjects(source string, subjects []string, typeMatching eventingv1alpha2.TypeMatching) []string
GetJetStreamSubjects returns a list of subjects appended with prefix if needed.
func (*JetStream) GetNATSSubscriptions ¶
func (js *JetStream) GetNATSSubscriptions() map[SubscriptionSubjectIdentifier]Subscriber
GetNATSSubscriptions returns the map which contains details of all NATS subscriptions and consumers. Use this only for testing purposes.
func (*JetStream) Initialize ¶
func (js *JetStream) Initialize(connCloseHandler backendutils.ConnClosedHandler) error
func (*JetStream) SyncSubscription ¶
func (js *JetStream) SyncSubscription(subscription *eventingv1alpha2.Subscription) error
type Subscriber ¶
type Subscription ¶
type Subscription struct {
*nats.Subscription
}
func (Subscription) SubscriptionSubject ¶
func (js Subscription) SubscriptionSubject() string
type SubscriptionSubjectIdentifier ¶
type SubscriptionSubjectIdentifier struct {
// contains filtered or unexported fields
}
SubscriptionSubjectIdentifier is used to uniquely identify a Subscription subject. It should be used only with JetStream backend.
func NewSubscriptionSubjectIdentifier ¶
func NewSubscriptionSubjectIdentifier(subscription *eventingv1alpha2.Subscription, subject string) SubscriptionSubjectIdentifier
NewSubscriptionSubjectIdentifier returns a new SubscriptionSubjectIdentifier instance.
func (SubscriptionSubjectIdentifier) ConsumerName ¶
func (s SubscriptionSubjectIdentifier) ConsumerName() string
ConsumerName returns the JetStream consumer name.
func (SubscriptionSubjectIdentifier) NamespacedName ¶
func (s SubscriptionSubjectIdentifier) NamespacedName() string
NamespacedName returns the Kubernetes namespaced name.
type TestEnvironment ¶
type TestEnvironment struct {
// contains filtered or unexported fields
}
TestEnvironment provides mocked resources for tests.