Documentation ¶
Index ¶
- Constants
- type Connection
- type Delay
- type DurableCalculator
- type GobMarshaler
- type JSONMarshaler
- type JetStreamConfig
- type Marshaler
- type MarshalerUnmarshaler
- type MaxRetryDelay
- type NATSMarshaler
- type Publisher
- type PublisherConfig
- type PublisherPublishConfig
- type StaticDelay
- type SubjectCalculator
- type SubjectDetail
- type Subscriber
- type SubscriberConfig
- type SubscriberSubscriptionConfig
- type Unmarshaler
Constants ¶
const TermSignal = time.Duration(-1)
TermSignal is a sentinel value: if a delay returns this duration, the message is terminated (term'ed) instead of being redelivered.
const WatermillUUIDHdr = "_watermill_message_uuid"
WatermillUUIDHdr is the reserved header that NATSMarshaler uses to send the message UUID.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection interface {
	// QueueSubscribe subscribes to a NATS subject, equivalent to default Subscribe if queuegroup not supplied.
	QueueSubscribe(string, string, nats.MsgHandler) (*nats.Subscription, error)
	// PublishMsg sends the provided NATS message to the broker.
	PublishMsg(*nats.Msg) error
	// Drain will end all active subscription interest and attempt to wait for in-flight messages to process before closing.
	Drain() error
	// Close will close the connection
	Close()
}
type DurableCalculator ¶
type GobMarshaler ¶
type GobMarshaler struct{}
GobMarshaler is a marshaler that uses Gob (encoding/gob) to marshal Watermill messages.
type JSONMarshaler ¶
type JSONMarshaler struct{}
JSONMarshaler uses encoding/json to marshal Watermill messages.
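As a rough illustration of JSON-based marshaling, the sketch below round-trips a stand-in message through encoding/json. The envelope type, its field names, and the encode/decode helpers are hypothetical; the actual wire layout produced by JSONMarshaler is not described on this page and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical stand-in for a Watermill message.
// The real JSONMarshaler may lay out its payload differently.
type envelope struct {
	UUID     string            `json:"uuid"`
	Metadata map[string]string `json:"metadata"`
	Payload  []byte            `json:"payload"`
}

// encode serializes the envelope to JSON bytes.
func encode(e envelope) ([]byte, error) {
	return json.Marshal(e)
}

// decode restores an envelope from JSON bytes.
func decode(b []byte) (envelope, error) {
	var e envelope
	err := json.Unmarshal(b, &e)
	return e, err
}

func main() {
	in := envelope{
		UUID:     "1234",
		Metadata: map[string]string{"k": "v"},
		Payload:  []byte("hello"),
	}
	b, err := encode(in)
	if err != nil {
		panic(err)
	}
	out, err := decode(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["k"], string(out.Payload))
}
```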
type JetStreamConfig ¶
type JetStreamConfig struct {
	// Disabled controls whether JetStream semantics should be used
	Disabled bool
	// AutoProvision indicates the application should create the configured stream if missing on the broker
	AutoProvision bool
	// ConnectOptions contains JetStream-specific options to be used when establishing context
	ConnectOptions []nats.JSOpt
	// SubscribeOptions contains options to be used when establishing subscriptions
	SubscribeOptions []nats.SubOpt
	// PublishOptions contains options to be sent on every publish operation
	PublishOptions []nats.PubOpt
	// TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication (needed for exactly once processing)
	TrackMsgId bool
	// AckAsync enables asynchronous acknowledgement
	AckAsync bool
	// DurablePrefix is the prefix used to derive the durable name from the topic.
	//
	// By default the prefix will be used on its own to form the durable name. This only allows use
	// of a single subscription per configuration. For more flexibility provide a DurableCalculator
	// that will receive durable prefix + topic.
	//
	// Subscriptions may also specify a "durable name" which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the JetStream server to track
	// the last acknowledged message for that ClientID + Durable.
	DurablePrefix string
	// DurableCalculator is a custom function used to derive a durable name from a topic + durable prefix
	DurableCalculator DurableCalculator
}
JetStreamConfig contains configuration settings specific to running in JetStream mode
func (JetStreamConfig) CalculateDurableName ¶
func (c JetStreamConfig) CalculateDurableName(topic string) string
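The durable-name rules in the DurablePrefix comment can be sketched as follows. This is a minimal illustration, not the package's implementation: durableCalculator and durableName are hypothetical stand-ins, and the only behaviour assumed is what the comment states (prefix on its own by default, prefix plus topic handed to a custom calculator).

```go
package main

import "fmt"

// durableCalculator is a hypothetical stand-in for the package's
// DurableCalculator type: it receives the durable prefix and the topic.
type durableCalculator func(prefix, topic string) string

// durableName follows the behaviour described in the DurablePrefix comment:
// without a calculator the prefix is used on its own; with one, the
// calculator derives the name from the prefix and the topic.
func durableName(prefix, topic string, calc durableCalculator) string {
	if calc == nil {
		return prefix
	}
	return calc(prefix, topic)
}

func main() {
	// perTopic is an example calculator that yields one durable name per topic.
	perTopic := func(prefix, topic string) string { return prefix + "_" + topic }

	fmt.Println(durableName("billing", "orders", nil))
	fmt.Println(durableName("billing", "orders", perTopic))
}
```

A per-topic calculator like this is one way around the single-subscription limit mentioned above, since each topic then gets its own durable name.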
func (JetStreamConfig) ShouldAutoProvision ¶
func (c JetStreamConfig) ShouldAutoProvision() bool
type Marshaler ¶
type Marshaler interface {
	// Marshal transforms a watermill message into NATS wire format.
	Marshal(topic string, msg *message.Message) (*nats.Msg, error)
}
Marshaler provides transport encoding functions
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}
MarshalerUnmarshaler provides both Marshaler and Unmarshaler implementations
type MaxRetryDelay ¶
type MaxRetryDelay struct {
	StaticDelay
	// contains filtered or unexported fields
}
MaxRetryDelay is a delay that returns the same time.Duration until a maximum number of retries is reached, after which it returns the term signal.
func NewMaxRetryDelay ¶
func NewMaxRetryDelay(delay time.Duration, retryLimit uint64) MaxRetryDelay
type NATSMarshaler ¶
type NATSMarshaler struct{}
NATSMarshaler uses NATS headers to marshal directly between Watermill and NATS formats. The Watermill UUID is stored in the _watermill_message_uuid header.
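A header-based mapping of this kind might look like the sketch below. wireMsg, toWire, and fromWire are hypothetical stand-ins written with plain maps so the example has no dependencies; only the reserved header name comes from this page, and how the real NATSMarshaler treats metadata is an assumption.

```go
package main

import "fmt"

// uuidHeader mirrors the WatermillUUIDHdr constant.
const uuidHeader = "_watermill_message_uuid"

// wireMsg is a hypothetical stand-in for a NATS message with headers.
type wireMsg struct {
	Subject string
	Header  map[string][]string
	Data    []byte
}

// toWire copies metadata into headers, then stores the UUID under the
// reserved key so that metadata cannot overwrite it.
func toWire(topic, uuid string, metadata map[string]string, payload []byte) wireMsg {
	h := make(map[string][]string, len(metadata)+1)
	for k, v := range metadata {
		h[k] = []string{v}
	}
	h[uuidHeader] = []string{uuid}
	return wireMsg{Subject: topic, Header: h, Data: payload}
}

// fromWire recovers the UUID from the reserved header and treats every
// other header as metadata (first value only).
func fromWire(m wireMsg) (uuid string, metadata map[string]string) {
	metadata = make(map[string]string)
	for k, v := range m.Header {
		if len(v) == 0 {
			continue
		}
		if k == uuidHeader {
			uuid = v[0]
			continue
		}
		metadata[k] = v[0]
	}
	return uuid, metadata
}

func main() {
	m := toWire("orders", "abc-123", map[string]string{"tenant": "acme"}, []byte("payload"))
	uuid, md := fromWire(m)
	fmt.Println(uuid, md["tenant"], string(m.Data))
}
```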
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides the nats implementation for watermill publish operations
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new Publisher.
func NewPublisherWithNatsConn ¶
func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.
type PublisherConfig ¶
type PublisherConfig struct {
	// URL is the NATS URL.
	URL string
	// NatsOptions are custom options for a connection.
	NatsOptions []nats.Option
	// Marshaler is the marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler
	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator
	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}
PublisherConfig is the configuration to create a publisher
func (PublisherConfig) GetPublisherPublishConfig ¶
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig
GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
func (PublisherConfig) Validate ¶
func (c PublisherConfig) Validate() error
Validate ensures configuration is valid before use
type PublisherPublishConfig ¶
type PublisherPublishConfig struct {
	// Marshaler is the marshaler used to marshal messages between watermill and wire formats
	Marshaler Marshaler
	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator
	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}
PublisherPublishConfig is the configuration subset needed for an individual publish call
type StaticDelay ¶
StaticDelay is a delay that always returns the same time.Duration.
func NewStaticDelay ¶
func NewStaticDelay(delay time.Duration) StaticDelay
type SubjectCalculator ¶
type SubjectCalculator func(queueGroupPrefix, topic string) *SubjectDetail
SubjectCalculator is a function used to calculate nats subject(s) for the given topic.
type SubjectDetail ¶
SubjectDetail contains jetstream subject detail (primary + all additional) along with durable and queue group names for a given watermill topic.
func DefaultSubjectCalculator ¶
func DefaultSubjectCalculator(queueGroupPrefix, topic string) *SubjectDetail
func (*SubjectDetail) All ¶
func (s *SubjectDetail) All() []string
All combines the primary and all additional subjects for use by the jetstream client on creation.
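To make the subject calculation concrete, here is a small self-contained sketch. subjectDetail, its field names, and both calculator functions are hypothetical stand-ins; the default shown (topic as Primary, queueGroupPrefix as QueueGroup) follows the SubjectCalculator comments on this page, while the wildcard variant is purely illustrative.

```go
package main

import "fmt"

// subjectDetail is a hypothetical stand-in for SubjectDetail.
// The field names are assumptions based on the config comments.
type subjectDetail struct {
	Primary    string
	Additional []string
	QueueGroup string
}

// all returns the primary subject followed by every additional subject.
func (s *subjectDetail) all() []string {
	return append([]string{s.Primary}, s.Additional...)
}

// defaultSubjects follows the documented default: the topic becomes the
// primary subject and the queue group prefix becomes the queue group.
func defaultSubjects(queueGroupPrefix, topic string) *subjectDetail {
	return &subjectDetail{Primary: topic, QueueGroup: queueGroupPrefix}
}

// withWildcard is an illustrative custom calculator that also registers
// a wildcard subject beneath the topic.
func withWildcard(queueGroupPrefix, topic string) *subjectDetail {
	return &subjectDetail{
		Primary:    topic,
		Additional: []string{topic + ".>"},
		QueueGroup: queueGroupPrefix,
	}
}

func main() {
	fmt.Println(defaultSubjects("workers", "orders").all())
	fmt.Println(withWildcard("workers", "orders").all())
}
```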
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides the nats implementation for watermill subscribe operations
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new Subscriber.
func NewSubscriberWithNatsConn ¶
func NewSubscriberWithNatsConn(conn *nats.Conn, config SubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriberWithNatsConn creates a new Subscriber with the provided nats connection.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes the subscriber and the underlying connection. It will attempt to wait for in-flight messages to complete.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe
type SubscriberConfig ¶
type SubscriberConfig struct {
	// URL is the URL to the broker
	URL string
	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream it is recommended to set it together with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string
	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int
	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration
	// AckWaitTimeout determines how long subscriber should wait for Ack/Nack.
	// When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration
	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration
	// NatsOptions are custom []nats.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// nats.URL("nats://localhost:4222")
	NatsOptions []nats.Option
	// Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler
	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator
	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay
	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
}
SubscriberConfig is the configuration to create a subscriber
func (*SubscriberConfig) GetSubscriberSubscriptionConfig ¶
func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig
GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established
type SubscriberSubscriptionConfig ¶
type SubscriberSubscriptionConfig struct {
	// Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler
	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int
	// AckWaitTimeout determines how long subscriber should wait for Ack/Nack.
	// When no Ack/Nack was received, message will be redelivered.
	AckWaitTimeout time.Duration
	// CloseTimeout determines how long subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, subscriber will be closed.
	CloseTimeout time.Duration
	// SubscribeTimeout determines how long subscriber will wait for a successful subscription
	SubscribeTimeout time.Duration
	// SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to topic as Primary and queueGroupPrefix as QueueGroup)
	SubjectCalculator SubjectCalculator
	// NakDelay sets duration after which the NACKed message will be resent.
	// By default, it's NACKed without delay.
	NakDelay Delay
	// JetStream holds JetStream specific settings
	JetStream JetStreamConfig
	// QueueGroupPrefix is the prefix used by SubjectCalculator to derive queue group from the topic.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// For JetStream it is recommended to set it together with DurablePrefix.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurablePrefix) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	QueueGroupPrefix string
}
SubscriberSubscriptionConfig is the configuration subset needed for an individual subscribe call
func (*SubscriberSubscriptionConfig) Validate ¶
func (c *SubscriberSubscriptionConfig) Validate() error
Validate ensures configuration is valid before use
type Unmarshaler ¶
type Unmarshaler interface {
	// Unmarshal produces a watermill message from NATS wire format.
	Unmarshal(*nats.Msg) (*message.Message, error)
}
Unmarshaler provides transport decoding function