jetstream

package
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package jetstream is a beta release and should not be considered a stable API at this time. Targeting a first stable release in v2.1

Index

Constants

View Source
const TermSignal = time.Duration(-1)

TermSignal if this duration was returned, event will be term`ed

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerConfigurator

type ConsumerConfigurator func(string, string) jetstream.ConsumerConfig

type Delay

type Delay interface {
	// WaitTime return time.Duration that we need to wait.
	// retryNum is how many times WaitTime was called for
	// specific message
	WaitTime(retryNum uint64) time.Duration
}

type MaxRetryDelay

type MaxRetryDelay struct {
	StaticDelay
	// contains filtered or unexported fields
}

MaxRetryDelay delay that returns the same time.Duration up to a maximum before sending term

func NewMaxRetryDelay

func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelay

func (MaxRetryDelay) WaitTime

func (s MaxRetryDelay) WaitTime(retryNum uint64) time.Duration

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides a watermill publisher interface to NATS JetStream

func NewPublisher

func NewPublisher(config PublisherConfig) (*Publisher, error)

NewPublisher creates a new watermill JetStream publisher. This middleware is currently considered an experimental / beta release - for production use it is recommended to use watermill-nats/pkg/nats.Publisher with JetStream enabled.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher and its underlying connection

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) error

Publish sends provided watermill messages to the given topic.

type PublisherConfig

type PublisherConfig struct {
	// URL is the path to the NATS jetstream-enabled broker
	URL string
	// Conn is an optional *nats.Conn that can be provided instead of using a watermill-managed connection to URL
	Conn *nats.Conn
	// Logger is a watermill logger (defaults to stdout with debug / trace disabled)
	Logger watermill.LoggerAdapter
	// ConfigureStream is a custom function that can be used to define stream configuration from a topic.  Publisher uses it to calculate publish destination from topic.
	ConfigureStream StreamConfigurator
}

PublisherConfig defines the watermill configuration for a JetStream publisher

type ResourceInitializer

type ResourceInitializer func(ctx context.Context, js jetstream.JetStream, topic string) (
	jetstream.Consumer,
	func(context.Context, watermill.LoggerAdapter),
	error)

func EphemeralConsumer

func EphemeralConsumer() ResourceInitializer

EphemeralConsumer builds a callback to create a consumer, returning a function that will be used to delete the broker-managed consumer.

func ExistingConsumer

func ExistingConsumer(consumerNamer ConsumerConfigurator, group string) ResourceInitializer

ExistingConsumer is used to connect to a stream/consumer that already exist with the given topic name - it will not attempt creation of any broker-managed resources. It takes as an argument a function to transform the topic into a consumer name, passing nil will invoke the default behavior consumerName := fmt.Sprintf("watermill__%s", topic)

func GroupedConsumer

func GroupedConsumer(groupName string) ResourceInitializer

GroupedConsumer builds a callback to create a consumer in the given group. The closing function is not returned since a single subscription in the group cannot know when the backing consumer should be deleted.

type StaticDelay

type StaticDelay struct {
	Delay time.Duration
}

StaticDelay delay that always return the same time.Duration

func NewStaticDelay

func NewStaticDelay(delay time.Duration) StaticDelay

func (StaticDelay) WaitTime

func (s StaticDelay) WaitTime(retryNum uint64) time.Duration

type StreamConfigurator

type StreamConfigurator func(string) jetstream.StreamConfig

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber provides a watermill subscriber interface to NATS JetStream

func NewSubscriber

func NewSubscriber(config SubscriberConfig) (*Subscriber, error)

NewSubscriber creates a new watermill JetStream subscriber. This middleware is currently considered an experimental / beta release - for production use it is recommended to use watermill-nats/pkg/nats.Subscriber with JetStream enabled.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the subscriber and signals to close any subscriptions it created along with the underlying connection.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe establishes a JetStream subscription to the given topic.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe

type SubscriberConfig

type SubscriberConfig struct {
	// URL is the path to the NATS jetstream-enabled broker
	URL string
	// Conn is an optional *nats.Conn that can be provided instead of using a watermill-managed connection to URL
	// TODO: should we expose jetstream here?  Currently need the NATS conn for graceful subscription shutdown:
	// https://github.com/nats-io/nats.go/issues/1328
	Conn *nats.Conn
	// Logger is a watermill logger (defaults to stdout with debug / trace disabled)
	Logger watermill.LoggerAdapter
	// AckWaitTimeout is how long watermill should wait for your application to finish processing a given message
	AckWaitTimeout time.Duration
	// ResourceInitializer is a custom function to turn a topic and consumer group into the necessary jetstream resources
	ResourceInitializer ResourceInitializer
	// NakDelay provides a delay function that can be used to delay reprocessing and eventually terminate
	NakDelay Delay
	// ConfigureStream is a custom function that can be used to define stream configuration from a topic.  Publisher uses it to calculate publish destination from topic.
	ConfigureStream StreamConfigurator
	// ConfigureConsumer is a custom function that can be used to define consumer configuration from a topic.  Publisher uses it to calculate publish destination from topic.
	ConfigureConsumer ConsumerConfigurator
}

SubscriberConfig defines the watermill configuration for a JetStream subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL