jetstream

package
v0.0.0-...-a7b4c27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultStreamName             = "kyma"
	DefaultJetStreamSubjectPrefix = "kyma"
	DefaultMaxReconnects          = 10
	DefaultMaxInFlights           = 10
)
View Source
const (
	StorageTypeMemory = "memory"
	StorageTypeFile   = "file"

	RetentionPolicyLimits   = "limits"
	RetentionPolicyInterest = "interest"

	DiscardPolicyNew = "new"
	DiscardPolicyOld = "old"

	ConsumerDeliverPolicyAll            = "all"
	ConsumerDeliverPolicyLast           = "last"
	ConsumerDeliverPolicyLastPerSubject = "last_per_subject"
	ConsumerDeliverPolicyNew            = "new"
)

Variables

View Source
var (
	ErrMissingSubscription = errors.New("failed to find a NATS subscription for a subject")
	ErrAddConsumer         = errors.New("failed to add a consumer")
	ErrGetConsumer         = errors.New("failed to get consumer info")
	ErrUpdateConsumer      = errors.New("failed to update consumer")
	ErrDeleteConsumer      = errors.New("failed to delete consumer")
	ErrFailedSubscribe     = errors.New("failed to create NATS JetStream subscription")
	ErrFailedUnsubscribe   = errors.New("failed to unsubscribe from NATS JetStream")

	ErrConnect           = errors.New("failed to connect to NATS JetStream")
	ErrEmptyStreamName   = errors.New("stream name cannot be empty")
	ErrStreamNameTooLong = fmt.Errorf("stream name should be max %d characters long", jsMaxStreamNameLength)
)
View Source
var (
	ErrInvalidStorageType     = errors.NewArgumentError("invalid stream storage type: %q")
	ErrInvalidRetentionPolicy = errors.NewArgumentError("invalid stream retention policy: %q")
	ErrInvalidDiscardPolicy   = errors.NewArgumentError("invalid stream discard policy: %q")
)

Functions

func AddJSCleanEventTypesToStatus

func AddJSCleanEventTypesToStatus(sub *v1alpha2.Subscription, cleaner cleaner.Cleaner)

func GetBackendJetStreamTypes

func GetBackendJetStreamTypes(subscription *eventingv1alpha2.Subscription,
	jsSubjects []string,
) ([]eventingv1alpha2.JetStreamTypes, error)

GetBackendJetStreamTypes gets the original event type and the consumer name for all the subscriptions and this slice is set as the backend specific status for JetStream.

func GetCleanEventTypes

func GetCleanEventTypes(sub *eventingv1alpha2.Subscription, cleaner cleaner.Cleaner) []eventingv1alpha2.EventType

GetCleanEventTypes returns a list of clean eventTypes from the unique types in the subscription.

func GetCleanEventTypesFromEventTypes

func GetCleanEventTypesFromEventTypes(eventTypes []eventingv1alpha2.EventType) []string

func NewNatsMessagePayload

func NewNatsMessagePayload(data, id, source, eventTime, eventType string) string

func SendCloudEventToJetStream

func SendCloudEventToJetStream(jetStreamClient *JetStream, subject, eventData, cetype string) error

func SendEventToJetStream

func SendEventToJetStream(jsClient *JetStream, data string) error

func StartNATSServer

func StartNATSServer(serverOpts ...eventingtesting.NatsServerOpt) (*server.Server, int, error)

func Validate

func Validate(natsConfig env.NATSConfig) error

Types

type Backend

type Backend interface {
	// Initialize should initialize the communication layer with the messaging backend system
	Initialize(connCloseHandler backendutils.ConnClosedHandler) error

	// Shutdown should stop all clients.
	Shutdown()

	// SyncSubscription should synchronize the Kyma eventing subscription with the subscriber infrastructure of JetStream.
	SyncSubscription(subscription *eventingv1alpha2.Subscription) error

	// DeleteSubscription should delete the corresponding subscriber data of messaging backend
	DeleteSubscription(subscription *eventingv1alpha2.Subscription) error

	// DeleteSubscriptionsOnly should delete the JetStream subscriptions only.
	// The corresponding JetStream consumers of the subscriptions must not be deleted.
	DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subscription) error

	// GetJetStreamSubjects returns a list of subjects appended with stream name and source as prefix if needed
	GetJetStreamSubjects(source string, subjects []string, typeMatching eventingv1alpha2.TypeMatching) []string

	// DeleteInvalidConsumers deletes all JetStream consumers having no subscription types in subscription resources
	DeleteInvalidConsumers(subscriptions []eventingv1alpha2.Subscription) error

	// GetJetStreamContext returns the current JetStreamContext
	GetJetStreamContext() nats.JetStreamContext

	// GetConfig returns the backends Configuration
	GetConfig() env.NATSConfig
}

type Builder

type Builder interface {
	Build() (ConnectionInterface, error)
}

func NewConnectionBuilder

func NewConnectionBuilder(config env.NATSConfig) Builder

type ConnectionBuilder

type ConnectionBuilder struct {
	// contains filtered or unexported fields
}

ConnectionBuilder helps in establishing a connection to NATS.

func (ConnectionBuilder) Build

Build connects to NATS and returns the connection. If an error occurs, ErrConnect is returned.

type ConnectionInterface

type ConnectionInterface interface {
	IsConnected() bool
	SetClosedHandler(cb nats.ConnHandler)
	SetReconnectHandler(rcb nats.ConnHandler)
	JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
}

ConnectionInterface is a contract for a NATS connection object.

type DefaultSubOpts

type DefaultSubOpts []nats.SubOpt

type JetStream

type JetStream struct {
	Config env.NATSConfig
	Conn   *nats.Conn
	// contains filtered or unexported fields
}

func NewJetStream

func NewJetStream(config env.NATSConfig, metricsCollector *backendmetrics.Collector,
	cleaner cleaner.Cleaner, subsConfig env.DefaultSubscriptionConfig, logger *logger.Logger,
) *JetStream

func (*JetStream) DeleteInvalidConsumers

func (js *JetStream) DeleteInvalidConsumers(subscriptions []eventingv1alpha2.Subscription) error

DeleteInvalidConsumers deletes all JetStream consumers having no subscription event types in subscription resources.

func (*JetStream) DeleteSubscription

func (js *JetStream) DeleteSubscription(subscription *eventingv1alpha2.Subscription) error

func (*JetStream) DeleteSubscriptionsOnly

func (js *JetStream) DeleteSubscriptionsOnly(subscription *eventingv1alpha2.Subscription) error

func (*JetStream) GetConfig

func (js *JetStream) GetConfig() env.NATSConfig

func (*JetStream) GetJetStreamContext

func (js *JetStream) GetJetStreamContext() nats.JetStreamContext

GetJetStreamContext returns the current JetStreamContext.

func (*JetStream) GetJetStreamSubject

func (js *JetStream) GetJetStreamSubject(source, subject string, typeMatching eventingv1alpha2.TypeMatching) string

GetJetStreamSubject appends the prefix and the cleaned source to subject.

func (*JetStream) GetJetStreamSubjects

func (js *JetStream) GetJetStreamSubjects(source string, subjects []string,
	typeMatching eventingv1alpha2.TypeMatching,
) []string

GetJetStreamSubjects returns a list of subjects appended with prefix if needed.

func (*JetStream) GetNATSSubscriptions

func (js *JetStream) GetNATSSubscriptions() map[SubscriptionSubjectIdentifier]Subscriber

GetNATSSubscriptions returns the map which contains details of all NATS subscriptions and consumers. Use this only for testing purposes.

func (*JetStream) Initialize

func (js *JetStream) Initialize(connCloseHandler backendutils.ConnClosedHandler) error

func (*JetStream) Shutdown

func (js *JetStream) Shutdown()

func (*JetStream) SyncSubscription

func (js *JetStream) SyncSubscription(subscription *eventingv1alpha2.Subscription) error

type Subscriber

type Subscriber interface {
	SubscriptionSubject() string
	ConsumerInfo() (*nats.ConsumerInfo, error)
	IsValid() bool
	Unsubscribe() error
	SetPendingLimits(msgLimit, byteLimit int) error
	PendingLimits() (int, int, error)
}

type Subscription

type Subscription struct {
	*nats.Subscription
}

func (Subscription) SubscriptionSubject

func (js Subscription) SubscriptionSubject() string

type SubscriptionSubjectIdentifier

type SubscriptionSubjectIdentifier struct {
	// contains filtered or unexported fields
}

SubscriptionSubjectIdentifier is used to uniquely identify a Subscription subject. It should be used only with JetStream backend.

func NewSubscriptionSubjectIdentifier

func NewSubscriptionSubjectIdentifier(subscription *eventingv1alpha2.Subscription,
	subject string,
) SubscriptionSubjectIdentifier

NewSubscriptionSubjectIdentifier returns a new SubscriptionSubjectIdentifier instance.

func (SubscriptionSubjectIdentifier) ConsumerName

func (s SubscriptionSubjectIdentifier) ConsumerName() string

ConsumerName returns the JetStream consumer name.

func (SubscriptionSubjectIdentifier) NamespacedName

func (s SubscriptionSubjectIdentifier) NamespacedName() string

NamespacedName returns the Kubernetes namespaced name.

type TestEnvironment

type TestEnvironment struct {
	// contains filtered or unexported fields
}

TestEnvironment provides mocked resources for tests.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL