Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GobMarshaler ¶
type GobMarshaler struct{}
GobMarshaler is a marshaler that uses Gob to marshal Watermill messages.
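To illustrate what Gob marshaling involves, here is a minimal round-trip sketch using only the standard library's encoding/gob. The `envelope` type and the `encodeGob`/`decodeGob` helpers are hypothetical stand-ins for illustration; they are not Watermill's message type or GobMarshaler's actual implementation.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// envelope is a hypothetical message-like struct (an assumption for this
// sketch, not Watermill's message type).
type envelope struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// encodeGob serializes an envelope into bytes using encoding/gob.
func encodeGob(e envelope) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeGob restores an envelope from bytes produced by encodeGob.
func decodeGob(b []byte) (envelope, error) {
	var e envelope
	err := gob.NewDecoder(bytes.NewReader(b)).Decode(&e)
	return e, err
}

func main() {
	in := envelope{
		UUID:     "1",
		Metadata: map[string]string{"k": "v"},
		Payload:  []byte("hello"),
	}
	raw, err := encodeGob(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeGob(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.UUID, out.Metadata["k"], string(out.Payload))
}
```

Gob is a Go-specific binary format, so a sketch like this only suits setups where both publisher and subscriber are written in Go.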
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}
type StreamingPublisher ¶
type StreamingPublisher struct {
// contains filtered or unexported fields
}
func NewStreamingPublisher ¶
func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error)
NewStreamingPublisher creates a new StreamingPublisher.
When using a custom NATS hostname, pass it through StreamingPublisherConfig.StanOptions:
// ...
StanOptions: []stan.Option{
	stan.NatsURL("nats://your-nats-hostname:4222"),
}
// ...
func (StreamingPublisher) Close ¶
func (p StreamingPublisher) Close() error
type StreamingPublisherConfig ¶
type StreamingPublisherConfig struct {
	// ClusterID is the NATS Streaming cluster ID.
	ClusterID string

	// ClientID is the NATS Streaming client ID to connect with.
	// ClientID can contain only alphanumeric and `-` or `_` characters.
	ClientID string

	// StanOptions are custom options for a connection.
	StanOptions []stan.Option

	// Marshaler is the marshaler used to marshal messages to the stan format.
	Marshaler Marshaler
}
func (StreamingPublisherConfig) Validate ¶
func (c StreamingPublisherConfig) Validate() error
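This page does not show what Validate checks. As a rough sketch only, the following assumes a validation that rejects an empty ClusterID and a ClientID containing anything other than ASCII alphanumerics, `-`, or `_` (the constraint stated in the ClientID comment). The `publisherSettings` type, the `validate` method, and the error messages are hypothetical and not the library's code.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// clientIDPattern encodes the documented ClientID constraint, assuming
// "alphanumeric" means ASCII letters and digits.
var clientIDPattern = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

// publisherSettings is a hypothetical stand-in for the two string
// fields of StreamingPublisherConfig.
type publisherSettings struct {
	ClusterID string
	ClientID  string
}

// validate is an assumed set of checks, not the library's Validate.
func (s publisherSettings) validate() error {
	if s.ClusterID == "" {
		return errors.New("ClusterID is empty")
	}
	if !clientIDPattern.MatchString(s.ClientID) {
		return errors.New("ClientID must be non-empty and contain only alphanumerics, '-' or '_'")
	}
	return nil
}

func main() {
	good := publisherSettings{ClusterID: "test-cluster", ClientID: "publisher_1"}
	bad := publisherSettings{ClusterID: "test-cluster", ClientID: "publisher 1"}
	fmt.Println(good.validate() == nil)
	fmt.Println(bad.validate() != nil)
}
```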
type StreamingSubscriber ¶
type StreamingSubscriber struct {
// contains filtered or unexported fields
}
func NewStreamingSubscriber ¶
func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error)
NewStreamingSubscriber creates a new StreamingSubscriber.
When using a custom NATS hostname, pass it through StreamingSubscriberConfig.StanOptions:
// ...
StanOptions: []stan.Option{
	stan.NatsURL("nats://your-nats-hostname:4222"),
}
// ...
func (*StreamingSubscriber) Close ¶
func (s *StreamingSubscriber) Close() error
func (*StreamingSubscriber) Subscribe ¶
func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
Subscribe subscribes to messages from NATS Streaming.
Subscribe spawns SubscribersCount goroutines, each of which makes its own subscription.
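The pattern of several goroutines feeding one output channel can be sketched with the standard library alone. This is an illustration under assumptions, not the subscriber's implementation: `startWorkers` and `countDelivered` are hypothetical names, and a plain Go channel stands in for the NATS Streaming subscription.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// startWorkers launches count goroutines that read from source and
// forward each message to a single shared output channel. The output
// is closed once every worker has stopped.
func startWorkers(ctx context.Context, count int, source <-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-source:
					if !ok {
						return // source drained and closed
					}
					select {
					case out <- msg:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// countDelivered pushes msgs through the workers and counts how many
// messages arrive on the shared output channel.
func countDelivered(workers int, msgs []string) int {
	source := make(chan string, len(msgs))
	for _, m := range msgs {
		source <- m
	}
	close(source)
	n := 0
	for range startWorkers(context.Background(), workers, source) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countDelivered(3, []string{"a", "b", "c", "d", "e"}))
}
```

Because the workers run concurrently, the order of messages on the output channel is not guaranteed to match the order in which they were sent.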
func (*StreamingSubscriber) SubscribeInitialize ¶ added in v0.3.0
func (s *StreamingSubscriber) SubscribeInitialize(topic string) (err error)
type StreamingSubscriberConfig ¶
type StreamingSubscriberConfig struct {
	// ClusterID is the NATS Streaming cluster ID.
	ClusterID string

	// ClientID is the NATS Streaming client ID to connect with.
	// ClientID can contain only alphanumeric and `-` or `_` characters.
	//
	// Using DurableName causes the NATS Streaming server to track
	// the last acknowledged message for that ClientID + DurableName.
	ClientID string

	// QueueGroup is the NATS Streaming queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it together with DurableName.
	// For non-durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, a subscription without a queue group is used.
	QueueGroup string

	// DurableName is the NATS Streaming durable name.
	//
	// Subscriptions may also specify a “durable name” which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the NATS Streaming server to track
	// the last acknowledged message for that ClientID + DurableName.
	DurableName string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// CloseTimeout determines how long the subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, the subscriber will be closed.
	CloseTimeout time.Duration

	// AckWaitTimeout determines how long the subscriber should wait for Ack/Nack.
	// When no Ack/Nack is received, the message will be redelivered.
	// It is mapped to the stan.AckWait option.
	AckWaitTimeout time.Duration

	// StanOptions are custom []stan.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// stan.NatsURL("nats://localhost:4222")
	StanOptions []stan.Option

	// StanSubscriptionOptions are custom []stan.SubscriptionOption passed to the subscription.
	StanSubscriptionOptions []stan.SubscriptionOption

	// Unmarshaler is the unmarshaler used to unmarshal messages from NATS format to Watermill format.
	Unmarshaler Unmarshaler
}
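The CloseTimeout and AckWaitTimeout comments both describe waiting a bounded time for an Ack or Nack. A minimal sketch of that wait, using only the standard library, might look like the following. The `waitForOutcome` function and its string results are hypothetical; in the real setup, redelivery after AckWaitTimeout is handled by the NATS Streaming server rather than by code like this.

```go
package main

import (
	"fmt"
	"time"
)

// waitForOutcome blocks until an ack or nack signal arrives, or until
// timeout elapses, and reports which of the three happened.
// A nil channel never becomes ready, so it is simply ignored.
func waitForOutcome(ack, nack <-chan struct{}, timeout time.Duration) string {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ack:
		return "ack"
	case <-nack:
		return "nack"
	case <-timer.C:
		return "timeout"
	}
}

func main() {
	// An ack that is already pending is picked up immediately.
	ack := make(chan struct{}, 1)
	ack <- struct{}{}
	fmt.Println(waitForOutcome(ack, nil, 50*time.Millisecond))

	// With no ack or nack, the wait ends when the timeout elapses.
	fmt.Println(waitForOutcome(nil, nil, 10*time.Millisecond))
}
```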
func (*StreamingSubscriberConfig) Validate ¶
func (c *StreamingSubscriberConfig) Validate() error
type Unmarshaler ¶