Documentation ¶
Overview ¶
Package natspubsub provides a pubsub implementation for NATS.io. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. This package uses gob to encode and decode driver.Message to []byte.
URLs ¶
For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://sraphs.github.io/gdk/concepts/urls/ for background information.
Message Delivery Semantics ¶
NATS supports at-most-semantics; applications need not call Message.Ack, and must not call Message.Nack. See https://godoc.org/github.com/sraphs/gdk/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
As ¶
natspubsub exposes the following types for As:
- Topic: *nats.Conn
- Subscription: *nats.Subscription
- Message.BeforeSend: None.
- Message.AfterSend: None.
- Message: *nats.Msg
Example (OpenQueueSubscriptionFromURL) ¶
package main import ( "context" "log" "github.com/sraphs/gdk/pubsub" ) func main() { // PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub" // PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL. // This URL will Dial the NATS server at the URL in the environment variable // NATS_SERVER_URL and receive messages with subject "example.my-subject" // This URL will be parsed and the queue attribute will be used as the Queue parameter when creating the NATS Subscription. subscription, err := pubsub.OpenSubscription(ctx, "nats://example.my-subject?queue=myqueue") if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
Example (OpenSubscriptionFromURL) ¶
package main import ( "context" "log" "github.com/sraphs/gdk/pubsub" ) func main() { // PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub" // PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenSubscription creates a *pubsub.Subscription from a URL. // This URL will Dial the NATS server at the URL in the environment variable // NATS_SERVER_URL and receive messages with subject "example.my-subject". subscription, err := pubsub.OpenSubscription(ctx, "nats://example.my-subject") if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx) }
Output:
Example (OpenTopicFromURL) ¶
package main import ( "context" "log" "github.com/sraphs/gdk/pubsub" ) func main() { // PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On github.com/sraphs/gdk, add a blank import: _ "github.com/sraphs/gdk/pubsub/natspubsub" // PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line. ctx := context.Background() // pubsub.OpenTopic creates a *pubsub.Topic from a URL. // This URL will Dial the NATS server at the URL in the environment variable // NATS_SERVER_URL and send messages with subject "example.my-subject". topic, err := pubsub.OpenTopic(ctx, "nats://example.my-subject") if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx) }
Output:
Index ¶
Examples ¶
Constants ¶
const Scheme = "nats"
Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.
Variables ¶
This section is empty.
Functions ¶
func OpenSubscription ¶
func OpenSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error)
OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription. The subject is the NATS Subject to subscribe to; for more info, see https://nats.io/documentation/writing_applications/subjects.
Example ¶
// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line. ctx := context.Background() natsConn, err := nats.Connect("nats://nats.example.com") if err != nil { log.Fatal(err) } defer natsConn.Close() subscription, err := natspubsub.OpenSubscription( natsConn, "example.my-subject", nil) if err != nil { log.Fatal(err) } defer subscription.Shutdown(ctx)
Output:
func OpenTopic ¶
func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error)
OpenTopic returns a *pubsub.Topic for use with NATS. The subject is the NATS Subject; for more info, see https://nats.io/documentation/writing_applications/subjects.
Example ¶
// PRAGMA: This example is used on github.com/sraphs/gdk; PRAGMA comments adjust how it is shown and can be ignored. // PRAGMA: On github.com/sraphs/gdk, hide lines until the next blank line. ctx := context.Background() natsConn, err := nats.Connect("nats://nats.example.com") if err != nil { log.Fatal(err) } defer natsConn.Close() topic, err := natspubsub.OpenTopic(natsConn, "example.my-subject", nil) if err != nil { log.Fatal(err) } defer topic.Shutdown(ctx)
Output:
Types ¶
type SubscriptionOptions ¶
type SubscriptionOptions struct { // Queue sets the subscription as a QueueSubcription. // For more info, see https://docs.nats.io/nats-concepts/queue. Queue string }
SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.
type TopicOptions ¶
type TopicOptions struct{}
TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.
type URLOpener ¶
type URLOpener struct { // Connection to use for communication with the server. Connection *nats.Conn // TopicOptions specifies the options to pass to OpenTopic. TopicOptions TopicOptions // SubscriptionOptions specifies the options to pass to OpenSubscription. SubscriptionOptions SubscriptionOptions }
URLOpener opens NATS URLs like "nats://my-subject".
The URL host+path is used as the subject.
No query parameters are supported.
func (*URLOpener) OpenSubscriptionURL ¶
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)
OpenSubscriptionURL opens a pubsub.Subscription based on u.