Documentation ¶
Overview ¶
Package jetstream is a beta release and should not be considered a stable API at this time. A first stable release is targeted for v2.1.
Constants ¶
const TermSignal = time.Duration(-1)
TermSignal is a sentinel duration: if a delay returns it, the event will be term'ed.
Types ¶
type ConsumerConfigurator ¶
type ConsumerConfigurator func(string, string) jetstream.ConsumerConfig
type MaxRetryDelay ¶
type MaxRetryDelay struct {
	StaticDelay
	// contains filtered or unexported fields
}
MaxRetryDelay is a delay that returns the same time.Duration up to a maximum number of retries before sending term.
func NewMaxRetryDelay ¶
func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelay
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides a watermill publisher interface to NATS JetStream
func NewPublisher ¶
func NewPublisher(config PublisherConfig) (*Publisher, error)
NewPublisher creates a new watermill JetStream publisher. This middleware is currently considered an experimental / beta release - for production use it is recommended to use watermill-nats/pkg/nats.Publisher with JetStream enabled.
type PublisherConfig ¶
type PublisherConfig struct {
	// URL is the path to the NATS jetstream-enabled broker
	URL string

	// Conn is an optional *nats.Conn that can be provided instead of using a watermill-managed connection to URL
	Conn *nats.Conn

	// Logger is a watermill logger (defaults to stdout with debug / trace disabled)
	Logger watermill.LoggerAdapter

	// ConfigureStream is a custom function that can be used to define stream configuration from a topic. Publisher uses it to calculate publish destination from topic.
	ConfigureStream StreamConfigurator
}
PublisherConfig defines the watermill configuration for a JetStream publisher
type ResourceInitializer ¶
type ResourceInitializer func(ctx context.Context, js jetstream.JetStream, topic string) (jetstream.Consumer, func(context.Context, watermill.LoggerAdapter), error)
func EphemeralConsumer ¶
func EphemeralConsumer() ResourceInitializer
EphemeralConsumer builds a callback to create a consumer, returning a function that will be used to delete the broker-managed consumer.
func ExistingConsumer ¶
func ExistingConsumer(consumerNamer ConsumerConfigurator, group string) ResourceInitializer
ExistingConsumer connects to a stream/consumer that already exists for the given topic name; it will not attempt to create any broker-managed resources. It takes as an argument a function to transform the topic into a consumer name; passing nil invokes the default behavior: consumerName := fmt.Sprintf("watermill__%s", topic)
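The default naming rule quoted above can be reproduced in isolation. The helper name `defaultConsumerName` is hypothetical and exists only for this sketch; the format string is the one given in the documentation.

```go
package main

import "fmt"

// defaultConsumerName applies the documented default format
// "watermill__%s" to a topic. The function name is illustrative only.
func defaultConsumerName(topic string) string {
	return fmt.Sprintf("watermill__%s", topic)
}

func main() {
	fmt.Println(defaultConsumerName("orders")) // prints "watermill__orders"
}
```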
func GroupedConsumer ¶
func GroupedConsumer(groupName string) ResourceInitializer
GroupedConsumer builds a callback to create a consumer in the given group. The closing function is not returned since a single subscription in the group cannot know when the backing consumer should be deleted.
type StaticDelay ¶
StaticDelay is a delay that always returns the same time.Duration.
func NewStaticDelay ¶
func NewStaticDelay(delay time.Duration) StaticDelay
type StreamConfigurator ¶
type StreamConfigurator func(string) jetstream.StreamConfig
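A StreamConfigurator maps a topic to a stream configuration. The sketch below shows one possible mapping without importing the NATS client: `streamConfig` is a local stand-in carrying only the Name and Subjects fields, and `perTopicStream` is a hypothetical function. The one-stream-per-topic layout and the replacement of dots in the stream name are assumptions made for illustration, not the package's default behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// streamConfig is a local stand-in for jetstream.StreamConfig,
// reduced to the two fields this sketch needs.
type streamConfig struct {
	Name     string
	Subjects []string
}

// streamConfigurator mirrors the shape of StreamConfigurator.
type streamConfigurator func(topic string) streamConfig

// perTopicStream is a hypothetical configurator: one stream per topic,
// with dots replaced in the stream name and the topic used as the subject.
func perTopicStream(topic string) streamConfig {
	return streamConfig{
		Name:     strings.ReplaceAll(topic, ".", "_"),
		Subjects: []string{topic},
	}
}

func main() {
	var configure streamConfigurator = perTopicStream
	cfg := configure("orders.created")
	fmt.Println(cfg.Name, cfg.Subjects) // prints "orders_created [orders.created]"
}
```

With the real package, a function of this shape returning jetstream.StreamConfig would be assigned to the ConfigureStream field of PublisherConfig or SubscriberConfig.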
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides a watermill subscriber interface to NATS JetStream
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig) (*Subscriber, error)
NewSubscriber creates a new watermill JetStream subscriber. This middleware is currently considered an experimental / beta release - for production use it is recommended to use watermill-nats/pkg/nats.Subscriber with JetStream enabled.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes the subscriber and signals to close any subscriptions it created along with the underlying connection.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribing.
type SubscriberConfig ¶
type SubscriberConfig struct {
	// URL is the path to the NATS jetstream-enabled broker
	URL string

	// Conn is an optional *nats.Conn that can be provided instead of using a watermill-managed connection to URL
	// TODO: should we expose jetstream here? Currently need the NATS conn for graceful subscription shutdown:
	// https://github.com/nats-io/nats.go/issues/1328
	Conn *nats.Conn

	// Logger is a watermill logger (defaults to stdout with debug / trace disabled)
	Logger watermill.LoggerAdapter

	// AckWaitTimeout is how long watermill should wait for your application to finish processing a given message
	AckWaitTimeout time.Duration

	// ResourceInitializer is a custom function to turn a topic and consumer group into the necessary jetstream resources
	ResourceInitializer ResourceInitializer

	// NakDelay provides a delay function that can be used to delay reprocessing and eventually terminate
	NakDelay Delay

	// ConfigureStream is a custom function that can be used to define stream configuration from a topic. Publisher uses it to calculate publish destination from topic.
	ConfigureStream StreamConfigurator

	// ConfigureConsumer is a custom function that can be used to define consumer configuration from a topic. Publisher uses it to calculate publish destination from topic.
	ConfigureConsumer ConsumerConfigurator
}
SubscriberConfig defines the watermill configuration for a JetStream subscriber