Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GobMarshaler ¶
type GobMarshaler struct{}
GobMarshaler is marshaller which is using Gob to marshal Watermill messages.
type JSONMarshaler ¶ added in v0.0.4
type JSONMarshaler struct{}
JSONMarshaler uses encoding/json to marshal Watermill messages.
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface { Marshaler Unmarshaler }
type NATSMarshaler ¶ added in v0.0.4
type NATSMarshaler struct{}
NATSMarshaler uses NATS header to marshal directly between watermill and NATS formats. The watermill UUID is stored at _watermill_message_uuid
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher provides the jetstream implementation for watermill publish operations
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new Publisher.
func NewPublisherWithNatsConn ¶
func NewPublisherWithNatsConn(conn *nats.Conn, config PublisherPublishConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisherWithNatsConn creates a new Publisher with the provided nats connection.
type PublisherConfig ¶
type PublisherConfig struct { // URL is the NATS URL. URL string // NatsOptions are custom options for a connection. NatsOptions []nats.Option // JetstreamOptions are custom Jetstream options for a connection. JetstreamOptions []nats.JSOpt // Marshaler is marshaler used to marshal messages between watermill and wire formats Marshaler Marshaler // SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*") SubjectCalculator SubjectCalculator // AutoProvision bypasses client validation and provisioning of streams AutoProvision bool // PublishOptions are custom publish option to be used on all publication PublishOptions []nats.PubOpt // TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication TrackMsgId bool }
PublisherConfig is the configuration to create a publisher
func (PublisherConfig) GetPublisherPublishConfig ¶
func (c PublisherConfig) GetPublisherPublishConfig() PublisherPublishConfig
GetPublisherPublishConfig gets the configuration subset needed for individual publish calls once a connection has been established
func (PublisherConfig) Validate ¶
func (c PublisherConfig) Validate() error
Validate ensures configuration is valid before use
type PublisherPublishConfig ¶
type PublisherPublishConfig struct { // Marshaler is marshaler used to marshal messages between watermill and wire formats Marshaler Marshaler // SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*") SubjectCalculator SubjectCalculator // AutoProvision bypasses client validation and provisioning of streams AutoProvision bool // JetstreamOptions are custom Jetstream options for a connection. JetstreamOptions []nats.JSOpt // PublishOptions are custom publish option to be used on all publication PublishOptions []nats.PubOpt // TrackMsgId uses the Nats.MsgId option with the msg UUID to prevent duplication TrackMsgId bool }
PublisherPublishConfig is the configuration subset needed for an individual publish call
type SubjectCalculator ¶ added in v0.0.2
SubjectCalculator is a function used to calculate nats subject(s) for the given topic.
type Subjects ¶ added in v0.0.3
Subjects contains nats subject detail (primary + all additional) for a given watermill topic.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber provides the jetstream implementation for watermill subscribe operations
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new Subscriber.
func NewSubscriberWithNatsConn ¶
func NewSubscriberWithNatsConn(conn *nats.Conn, config SubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriberWithNatsConn creates a new Subscriber with the provided nats connection.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes the publisher and the underlying connection. It will attempt to wait for in-flight messages to complete.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize offers a way to ensure the stream for a topic exists prior to subscribe
type SubscriberConfig ¶
type SubscriberConfig struct { // URL is the URL to the broker URL string // QueueGroup is the JetStream queue group. // // All subscriptions with the same queue name (regardless of the connection they originate from) // will form a queue group. Each message will be delivered to only one subscriber per queue group, // using queuing semantics. // // It is recommended to set it with DurableName. // For non durable queue subscribers, when the last member leaves the group, // that group is removed. A durable queue group (DurableName) allows you to have all members leave // but still maintain state. When a member re-joins, it starts at the last position in that group. // // When QueueGroup is empty, subscribe without QueueGroup will be used. QueueGroup string // DurableName is the JetStream durable name. // // Subscriptions may also specify a “durable name” which will survive client restarts. // Durable subscriptions cause the server to track the last acknowledged message // sequence number for a client and durable name. When the client restarts/resubscribes, // and uses the same client ID and durable name, the server will resume delivery beginning // with the earliest unacknowledged message for this durable subscription. // // Doing this causes the JetStream server to track // the last acknowledged message for that ClientID + DurableName. DurableName string // SubscribersCount determines how many concurrent subscribers should be started. SubscribersCount int // CloseTimeout determines how long subscriber will wait for Ack/Nack on close. // When no Ack/Nack is received after CloseTimeout, subscriber will be closed. CloseTimeout time.Duration // How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered. // It is mapped to stan.AckWait option. AckWaitTimeout time.Duration // SubscribeTimeout determines how long subscriber will wait for a successful subscription SubscribeTimeout time.Duration // NatsOptions are custom []nats.Option passed to the connection. // It is also used to provide connection parameters, for example: // nats.URL("nats://localhost:4222") NatsOptions []nats.Option // JetstreamOptions are custom Jetstream options for a connection. JetstreamOptions []nats.JSOpt // Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format. Unmarshaler Unmarshaler // SubscribeOptions defines nats options to be used when subscribing SubscribeOptions []nats.SubOpt // SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*") SubjectCalculator SubjectCalculator // AutoProvision bypasses client validation and provisioning of streams AutoProvision bool // AckSync enables synchronous acknowledgement (needed for exactly once processing) AckSync bool }
SubscriberConfig is the configuration to create a subscriber
func (*SubscriberConfig) GetSubscriberSubscriptionConfig ¶ added in v0.0.2
func (c *SubscriberConfig) GetSubscriberSubscriptionConfig() SubscriberSubscriptionConfig
GetSubscriberSubscriptionConfig gets the configuration subset needed for individual subscribe calls once a connection has been established
type SubscriberSubscriptionConfig ¶
type SubscriberSubscriptionConfig struct { // Unmarshaler is an unmarshaler used to unmarshaling messages from NATS format to Watermill format. Unmarshaler Unmarshaler // QueueGroup is the JetStream queue group. // // All subscriptions with the same queue name (regardless of the connection they originate from) // will form a queue group. Each message will be delivered to only one subscriber per queue group, // using queuing semantics. // // It is recommended to set it with DurableName. // For non durable queue subscribers, when the last member leaves the group, // that group is removed. A durable queue group (DurableName) allows you to have all members leave // but still maintain state. When a member re-joins, it starts at the last position in that group. // // When QueueGroup is empty, subscribe without QueueGroup will be used. QueueGroup string // DurableName is the JetStream durable name. // // Subscriptions may also specify a “durable name” which will survive client restarts. // Durable subscriptions cause the server to track the last acknowledged message // sequence number for a client and durable name. When the client restarts/resubscribes, // and uses the same client ID and durable name, the server will resume delivery beginning // with the earliest unacknowledged message for this durable subscription. // // Doing this causes the JetStream server to track // the last acknowledged message for that ClientID + DurableName. DurableName string // SubscribersCount determines wow much concurrent subscribers should be started. SubscribersCount int // How long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered. // It is mapped to stan.AckWait option. AckWaitTimeout time.Duration // CloseTimeout determines how long subscriber will wait for Ack/Nack on close. // When no Ack/Nack is received after CloseTimeout, subscriber will be closed. CloseTimeout time.Duration // SubscribeTimeout determines how long subscriber will wait for a successful subscription SubscribeTimeout time.Duration // JetstreamOptions are custom Jetstream options for a connection. JetstreamOptions []nats.JSOpt // SubscribeOptions defines nats options to be used when subscribing SubscribeOptions []nats.SubOpt // SubjectCalculator is a function used to transform a topic to an array of subjects on creation (defaults to "{topic}.*") SubjectCalculator SubjectCalculator // AutoProvision bypasses client validation and provisioning of streams AutoProvision bool // AckSync enables synchronous acknowledgement (needed for exactly once processing) AckSync bool }
SubscriberSubscriptionConfig is the configurationz
func (*SubscriberSubscriptionConfig) Validate ¶
func (c *SubscriberSubscriptionConfig) Validate() error
Validate ensures configuration is valid before use