natspubsub

Version: v0.40.0
Published: Oct 10, 2024 License: Apache-2.0 Imports: 18 Imported by: 33

Documentation

Overview

Package natspubsub provides a pubsub implementation for NATS.io. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. By default, this package uses gob to encode and decode driver.Message to []byte; see the "natsv2" URL parameter below for the native NATS encoding.
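
As an illustration of the gob round trip described above, here is a minimal sketch using only the standard library. The wireMessage type is a hypothetical stand-in for driver.Message (the real type has more fields), and encode and decode are illustrative names, not functions of this package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// wireMessage is a hypothetical stand-in for driver.Message.
type wireMessage struct {
	Body     []byte
	Metadata map[string]string
}

// encode serializes m into a single byte slice using gob.
func encode(m wireMessage) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode reverses encode. Only a gob-aware (Go) client can do this,
// which is why gob-encoded payloads are opaque to non-Go subscribers.
func decode(data []byte) (wireMessage, error) {
	var m wireMessage
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&m)
	return m, err
}

func main() {
	data, err := encode(wireMessage{
		Body:     []byte("hello"),
		Metadata: map[string]string{"k": "v"},
	})
	if err != nil {
		panic(err)
	}
	m, err := decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %v\n", m.Body, m.Metadata)
}
```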

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL".
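
The environment lookup can be sketched as follows. This is an illustration, not the package's code: resolveServerURL is a hypothetical helper, and treating an empty variable as an error is an assumption made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// resolveServerURL is a hypothetical helper. It returns the value of
// NATS_SERVER_URL as reported by getenv, or an error if it is empty
// (an assumption of this sketch).
func resolveServerURL(getenv func(string) string) (string, error) {
	serverURL := getenv("NATS_SERVER_URL")
	if serverURL == "" {
		return "", errors.New("NATS_SERVER_URL is not set")
	}
	return serverURL, nil
}

func main() {
	serverURL, err := resolveServerURL(os.Getenv)
	if err != nil {
		fmt.Println("cannot open nats:// URLs:", err)
		return
	}
	fmt.Println("would dial", serverURL)
}
```

Passing the lookup function as a parameter keeps the helper easy to exercise without touching the process environment.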

For servers that support it (NATS Server 2.2.0 or later), messages can be encoded using native NATS message headers and native message content, which provides full support for non-Go clients. Versions prior to 2.2.0 use gob.Encoder to encode the message headers and content, which limits subscribers to Go clients. To use this feature, set the query parameter "natsv2" in the URL. If no value is provided, the value is assumed to be true; otherwise, the value must be parsable as a boolean. For example:

  • nats://mysubject?natsv2
  • nats://mysubject?natsv2=true

This feature can also be enabled by setting the UseV2 field in the URLOpener. If the server does not support this feature, any attempt to use it will result in an error. Using native NATS message headers and content is more efficient than using gob.Encoder, and allows non-Go clients to subscribe to the topic and receive messages. It is recommended to use this feature if the server supports it.
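
The "natsv2" rule described above (no value means true, otherwise the value must parse as a boolean) can be sketched with the standard library. useV2FromURL is a hypothetical helper, not part of this package, and returning an error for an unparsable value is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// useV2FromURL is a hypothetical helper that reports whether rawURL
// requests the native (v2) encoding via the "natsv2" query parameter.
func useV2FromURL(rawURL string) (bool, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return false, err
	}
	values, ok := u.Query()["natsv2"]
	if !ok {
		return false, nil // parameter absent: keep the gob encoding
	}
	if len(values) == 0 || values[0] == "" {
		return true, nil // "?natsv2" with no value is treated as true
	}
	return strconv.ParseBool(values[0])
}

func main() {
	for _, raw := range []string{
		"nats://mysubject",
		"nats://mysubject?natsv2",
		"nats://mysubject?natsv2=true",
		"nats://mysubject?natsv2=false",
	} {
		v2, err := useV2FromURL(raw)
		fmt.Println(raw, v2, err)
	}
}
```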

To customize the URL opener, or for more details on the URL format, see URLOpener. See https://gocloud.dev/concepts/urls/ for background information.

Message Delivery Semantics

NATS supports at-most-once semantics; applications need not call Message.Ack, and must not call Message.Nack. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
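
To make the delivery guarantee concrete, here is a conceptual sketch of at-most-once delivery using a Go channel. It is a toy model, not how NATS or this package is implemented; offer is a hypothetical name. Each message gets a single delivery attempt and is never retried, which is why there is nothing to acknowledge.

```go
package main

import "fmt"

// offer makes one non-blocking attempt to hand msg to the subscriber.
// If the subscriber cannot accept it right now, the message is dropped
// and never redelivered.
func offer(sub chan<- string, msg string) bool {
	select {
	case sub <- msg:
		return true
	default:
		return false
	}
}

func main() {
	sub := make(chan string, 1) // subscriber with room for one message

	fmt.Println(offer(sub, "first"))  // accepted
	fmt.Println(offer(sub, "second")) // subscriber is full: dropped
	fmt.Println(<-sub)                // only the accepted message arrives
}
```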

As

natspubsub exposes the following types for As:

  • Topic: *nats.Conn
  • Subscription: *nats.Subscription
  • Message.BeforeSend: None for v1, *nats.Msg for v2.
  • Message.AfterSend: None.
  • Message: *nats.Msg
Example (OpenQueueSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub" // registers the "nats" URL scheme
)

func main() {
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	// The "queue" query parameter is used as the Queue option when creating
	// the NATS subscription.
	subscription, err := pubsub.OpenSubscription(ctx, "nats://example.mysubject?queue=myqueue")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenSubscriptionFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub" // registers the "nats" URL scheme
)

func main() {
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	subscription, err := pubsub.OpenSubscription(ctx, "nats://example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenSubscriptionV2FromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub" // registers the "nats" URL scheme
)

func main() {
	ctx := context.Background()

	// pubsub.OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	subscription, err := pubsub.OpenSubscription(ctx, "nats://example.mysubject?natsv2")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenTopicFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub" // registers the "nats" URL scheme
)

func main() {
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.mysubject".
	topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Example (OpenTopicV2FromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/natspubsub" // registers the "nats" URL scheme
)

func main() {
	ctx := context.Background()

	// pubsub.OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.mysubject".
	// This URL will be parsed and the natsv2 attribute will be used to
	// use NATS v2.2.0+ native message headers as the message metadata.
	topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject?natsv2")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Constants

const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription. The subject is the NATS Subject to subscribe to; for more info, see https://nats.io/documentation/writing_applications/subjects.

Example
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

subscription, err := natspubsub.OpenSubscription(
	natsConn,
	"example.mysubject",
	nil)
if err != nil {
	log.Fatal(err)
}
defer subscription.Shutdown(ctx)

func OpenSubscriptionV2 added in v0.33.0

func OpenSubscriptionV2(nc *nats.Conn, subject string, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscriptionV2 returns a *pubsub.Subscription representing a NATS subscription or NATS queue subscription for use with NATS Server 2.2.0 or later. NATS supports native message headers starting with version 2.2.0, so this function uses a different message encoding than OpenSubscription. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits subscribers to Go clients. This implementation uses native NATS message headers and native message content, which provides full support for non-Go clients.
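
The idea of carrying metadata in native headers rather than inside a gob payload can be sketched as below. The header type is a local stand-in with the same shape as a multi-valued header map (map[string][]string); metadataToHeader and headerToMetadata are hypothetical helpers, and any key or value escaping the package may apply is deliberately left out.

```go
package main

import "fmt"

// header is a local stand-in for a multi-valued message header map.
type header map[string][]string

// metadataToHeader copies each metadata entry into its own header,
// leaving the message body untouched.
func metadataToHeader(md map[string]string) header {
	h := make(header, len(md))
	for k, v := range md {
		h[k] = []string{v}
	}
	return h
}

// headerToMetadata takes the first value of each header as the
// metadata value for that key.
func headerToMetadata(h header) map[string]string {
	md := make(map[string]string, len(h))
	for k, vs := range h {
		if len(vs) > 0 {
			md[k] = vs[0]
		}
	}
	return md
}

func main() {
	h := metadataToHeader(map[string]string{"content-type": "text/plain"})
	fmt.Println(h)
	fmt.Println(headerToMetadata(h))
}
```

Because the body stays as plain bytes and the metadata travels as ordinary headers, a client in any language can read both without a gob decoder.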

Example
ctx := context.Background()
natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

subscription, err := natspubsub.OpenSubscriptionV2(
	natsConn,
	"example.mysubject",
	nil)
if err != nil {
	log.Fatal(err)
}
defer subscription.Shutdown(ctx)

func OpenTopic

func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error)

OpenTopic returns a *pubsub.Topic for use with NATS. The subject is the NATS Subject; for more info, see https://nats.io/documentation/writing_applications/subjects.

Example
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopic(natsConn, "example.mysubject", nil)
if err != nil {
	log.Fatal(err)
}
defer topic.Shutdown(ctx)

func OpenTopicV2 added in v0.33.0

func OpenTopicV2(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error)

OpenTopicV2 returns a *pubsub.Topic for use with NATS Server 2.2.0 or later. NATS supports native message headers starting with version 2.2.0, so this function uses a different message encoding than OpenTopic. In previous versions the message headers were encoded along with the message content using gob.Encoder, which limits subscribers to Go clients. This implementation uses native NATS message headers and native message content, which provides full support for non-Go clients.

Example
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopicV2(natsConn, "example.mysubject", nil)
if err != nil {
	log.Fatal(err)
}
defer topic.Shutdown(ctx)

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// Queue sets the subscription as a QueueSubscription.
	// For more info, see https://docs.nats.io/nats-concepts/queue.
	Queue string
}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.
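
As a rough illustration of what a queue subscription means, the sketch below hands each message to exactly one member of a group instead of to every subscriber. It is a toy model: distribute is a hypothetical function, and the round-robin choice is only a deterministic stand-in, since the NATS server decides which member receives each message.

```go
package main

import "fmt"

// distribute assigns each message to exactly one of the group members
// and returns the messages received by each member. It returns nil if
// members is not positive.
func distribute(messages []string, members int) [][]string {
	if members <= 0 {
		return nil
	}
	received := make([][]string, members)
	for i, msg := range messages {
		idx := i % members // stand-in for the server's member selection
		received[idx] = append(received[idx], msg)
	}
	return received
}

func main() {
	received := distribute([]string{"m1", "m2", "m3", "m4", "m5"}, 2)
	for member, msgs := range received {
		fmt.Println("member", member, "got", msgs)
	}
}
```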

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.

type URLOpener

type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *nats.Conn
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
	// UseV2 indicates whether the NATS Server is at least version 2.2.0.
	UseV2 bool
}

URLOpener opens NATS URLs like "nats://mysubject?natsv2=true".

The URL host+path is used as the subject.

The supported query parameters are "natsv2" and, for subscriptions only, "queue".
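
The host+path rule can be sketched with the standard library, together with the "queue" parameter used in the OpenQueueSubscriptionFromURL example. subjectAndQueue is a hypothetical helper, and joining host and path with path.Join is an assumption of this sketch rather than a statement about the package's code.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// subjectAndQueue is a hypothetical helper that extracts the NATS
// subject (URL host plus path) and the optional queue name.
func subjectAndQueue(rawURL string) (subject, queue string, err error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "nats" {
		return "", "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	return path.Join(u.Host, u.Path), u.Query().Get("queue"), nil
}

func main() {
	subject, queue, err := subjectAndQueue("nats://example.mysubject?queue=myqueue")
	if err != nil {
		panic(err)
	}
	fmt.Println(subject, queue)
}
```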

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
