Documentation ¶
Index ¶
- func NewStanConnection(config *StanConnConfig) (stan.Conn, error)
- type GobMarshaler
- type Marshaler
- type MarshalerUnmarshaler
- type StanConnConfig
- type StreamingPublisher
- type StreamingPublisherConfig
- type StreamingPublisherPublishConfig
- type StreamingSubscriber
- type StreamingSubscriberConfig
- type StreamingSubscriberSubscriptionConfig
- type Unmarshaler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewStanConnection ¶ added in v0.4.0
func NewStanConnection(config *StanConnConfig) (stan.Conn, error)
Types ¶
type GobMarshaler ¶
type GobMarshaler struct{}
GobMarshaler is a marshaler that uses Gob to marshal Watermill messages.
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface { Marshaler Unmarshaler }
type StanConnConfig ¶ added in v0.4.0
type StanConnConfig struct { // ClusterID is the NATS Streaming cluster ID. ClusterID string // ClientID is the NATS Streaming client ID to connect with. // ClientID can contain only alphanumeric and `-` or `_` characters. // // Using DurableName causes the NATS Streaming server to track // the last acknowledged message for that ClientID + DurableName. ClientID string // StanOptions are custom []stan.Option passed to the connection. // It is also used to provide connection parameters, for example: // stan.NatsURL("nats://localhost:4222") StanOptions []stan.Option }
type StreamingPublisher ¶
type StreamingPublisher struct {
// contains filtered or unexported fields
}
func NewStreamingPublisher ¶
func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error)
NewStreamingPublisher creates a new StreamingPublisher.
When using a custom NATS hostname, pass it via the StreamingPublisherConfig.StanOptions option:
// ... StanOptions: []stan.Option{ stan.NatsURL("nats://your-nats-hostname:4222"), } // ...
func NewStreamingPublisherWithStanConn ¶ added in v0.4.0
func NewStreamingPublisherWithStanConn(conn stan.Conn, config StreamingPublisherPublishConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error)
func (StreamingPublisher) Close ¶
func (p StreamingPublisher) Close() error
type StreamingPublisherConfig ¶
type StreamingPublisherConfig struct { // ClusterID is the NATS Streaming cluster ID. ClusterID string // ClientID is the NATS Streaming client ID to connect with. // ClientID can contain only alphanumeric and `-` or `_` characters. ClientID string // StanOptions are custom options for a connection. StanOptions []stan.Option // Marshaler is marshaler used to marshal messages to stan format. Marshaler Marshaler }
func (StreamingPublisherConfig) GetStreamingPublisherPublishConfig ¶ added in v0.4.0
func (c StreamingPublisherConfig) GetStreamingPublisherPublishConfig() StreamingPublisherPublishConfig
func (StreamingPublisherConfig) Validate ¶
func (c StreamingPublisherConfig) Validate() error
type StreamingPublisherPublishConfig ¶ added in v0.4.0
type StreamingPublisherPublishConfig struct { // Marshaler is marshaler used to marshal messages to stan format. Marshaler Marshaler }
type StreamingSubscriber ¶
type StreamingSubscriber struct {
// contains filtered or unexported fields
}
func NewStreamingSubscriber ¶
func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error)
NewStreamingSubscriber creates a new StreamingSubscriber.
When using a custom NATS hostname, pass it via the StreamingSubscriberConfig.StanOptions option:
// ... StanOptions: []stan.Option{ stan.NatsURL("nats://your-nats-hostname:4222"), } // ...
func NewStreamingSubscriberWithStanConn ¶ added in v0.4.0
func NewStreamingSubscriberWithStanConn(conn stan.Conn, config StreamingSubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error)
func (*StreamingSubscriber) Close ¶
func (s *StreamingSubscriber) Close() error
func (*StreamingSubscriber) Subscribe ¶
func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
Subscribe subscribes to messages from NATS Streaming.
Subscribe will spawn SubscribersCount goroutines, each of which makes a subscription.
func (*StreamingSubscriber) SubscribeInitialize ¶ added in v0.3.0
func (s *StreamingSubscriber) SubscribeInitialize(topic string) (err error)
type StreamingSubscriberConfig ¶
type StreamingSubscriberConfig struct { // ClusterID is the NATS Streaming cluster ID. ClusterID string // ClientID is the NATS Streaming client ID to connect with. // ClientID can contain only alphanumeric and `-` or `_` characters. // // Using DurableName causes the NATS Streaming server to track // the last acknowledged message for that ClientID + DurableName. ClientID string // QueueGroup is the NATS Streaming queue group. // // All subscriptions with the same queue name (regardless of the connection they originate from) // will form a queue group. Each message will be delivered to only one subscriber per queue group, // using queuing semantics. // // It is recommended to set it with DurableName. // For non durable queue subscribers, when the last member leaves the group, // that group is removed. A durable queue group (DurableName) allows you to have all members leave // but still maintain state. When a member re-joins, it starts at the last position in that group. // // When QueueGroup is empty, subscribe without QueueGroup will be used. QueueGroup string // DurableName is the NATS streaming durable name. // // Subscriptions may also specify a “durable name” which will survive client restarts. // Durable subscriptions cause the server to track the last acknowledged message // sequence number for a client and durable name. When the client restarts/resubscribes, // and uses the same client ID and durable name, the server will resume delivery beginning // with the earliest unacknowledged message for this durable subscription. // // Doing this causes the NATS Streaming server to track // the last acknowledged message for that ClientID + DurableName. DurableName string // SubscribersCount determines how many concurrent subscribers should be started. SubscribersCount int // CloseTimeout determines how long subscriber will wait for Ack/Nack on close. // When no Ack/Nack is received after CloseTimeout, subscriber will be closed. 
CloseTimeout time.Duration // AckWaitTimeout determines how long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered. // It is mapped to stan.AckWait option. AckWaitTimeout time.Duration // StanOptions are custom []stan.Option passed to the connection. // It is also used to provide connection parameters, for example: // stan.NatsURL("nats://localhost:4222") StanOptions []stan.Option // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription. StanSubscriptionOptions []stan.SubscriptionOption // Unmarshaler is an unmarshaler used to unmarshal messages from NATS format to Watermill format. Unmarshaler Unmarshaler }
func (*StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig ¶ added in v0.4.0
func (c *StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig() StreamingSubscriberSubscriptionConfig
type StreamingSubscriberSubscriptionConfig ¶ added in v0.4.0
type StreamingSubscriberSubscriptionConfig struct { // StanSubscriptionOptions are custom []stan.SubscriptionOption passed to subscription. StanSubscriptionOptions []stan.SubscriptionOption // Unmarshaler is an unmarshaler used to unmarshal messages from NATS format to Watermill format. Unmarshaler Unmarshaler // QueueGroup is the NATS Streaming queue group. // // All subscriptions with the same queue name (regardless of the connection they originate from) // will form a queue group. Each message will be delivered to only one subscriber per queue group, // using queuing semantics. // // It is recommended to set it with DurableName. // For non durable queue subscribers, when the last member leaves the group, // that group is removed. A durable queue group (DurableName) allows you to have all members leave // but still maintain state. When a member re-joins, it starts at the last position in that group. // // When QueueGroup is empty, subscribe without QueueGroup will be used. QueueGroup string // DurableName is the NATS streaming durable name. // // Subscriptions may also specify a “durable name” which will survive client restarts. // Durable subscriptions cause the server to track the last acknowledged message // sequence number for a client and durable name. When the client restarts/resubscribes, // and uses the same client ID and durable name, the server will resume delivery beginning // with the earliest unacknowledged message for this durable subscription. // // Doing this causes the NATS Streaming server to track // the last acknowledged message for that ClientID + DurableName. DurableName string // SubscribersCount determines how many concurrent subscribers should be started. SubscribersCount int // AckWaitTimeout determines how long subscriber should wait for Ack/Nack. When no Ack/Nack was received, message will be redelivered. // It is mapped to stan.AckWait option. AckWaitTimeout time.Duration // CloseTimeout determines how long subscriber will wait for Ack/Nack on close. 
// When no Ack/Nack is received after CloseTimeout, subscriber will be closed. CloseTimeout time.Duration }
func (*StreamingSubscriberSubscriptionConfig) Validate ¶ added in v0.4.0
func (c *StreamingSubscriberSubscriptionConfig) Validate() error
type Unmarshaler ¶
Click to show internal directories.
Click to hide internal directories.