natspubsub

package
v0.43.0 Latest
Warning: This package is not in the latest version of its module.
Published: May 15, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package natspubsub provides a pubsub implementation for NATS.io. Use OpenTopic to construct a *pubsub.Topic, and/or OpenSubscription to construct a *pubsub.Subscription. This package uses gob to encode and decode driver.Message to []byte.
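The gob round trip mentioned above can be sketched with the standard library alone. The message type here is a hypothetical stand-in for driver.Message (the real type is not reproduced on this page), and encodeMessage and decodeMessage are illustrative names, not functions exported by natspubsub.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// message is a hypothetical stand-in for driver.Message; the real type
// may carry different or additional fields.
type message struct {
	Body     []byte
	Metadata map[string]string
}

// encodeMessage serializes m to a byte slice using gob.
func encodeMessage(m message) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeMessage reverses encodeMessage.
func decodeMessage(data []byte) (message, error) {
	var m message
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&m)
	return m, err
}

func main() {
	data, err := encodeMessage(message{
		Body:     []byte("hello"),
		Metadata: map[string]string{"origin": "example"},
	})
	if err != nil {
		log.Fatal(err)
	}
	m, err := decodeMessage(data)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s %v\n", m.Body, m.Metadata)
}
```

One likely consequence of a gob payload is that only consumers that decode gob can read what this package publishes; a plain NATS client would see the raw gob bytes.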

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, natspubsub registers for the scheme "nats". The default URL opener will connect to a default server based on the environment variable "NATS_SERVER_URL". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://github.com/eliben/gocdkx/concepts/urls/ for background information.
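As a rough illustration of the environment-variable lookup described above, the sketch below reads NATS_SERVER_URL through an injectable lookup function. serverURL is a hypothetical helper, and treating an unset variable as an error is an assumption; this page does not say how the default opener behaves in that case.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// serverURLEnv is the environment variable named in the package documentation.
const serverURLEnv = "NATS_SERVER_URL"

// serverURL is a hypothetical helper. It returns the server address found by
// lookup, or an error if the variable is unset or empty (an assumption made
// for this sketch, not documented behavior).
func serverURL(lookup func(string) (string, bool)) (string, error) {
	v, ok := lookup(serverURLEnv)
	if !ok || v == "" {
		return "", errors.New(serverURLEnv + " is not set")
	}
	return v, nil
}

func main() {
	addr, err := serverURL(os.LookupEnv)
	if err != nil {
		fmt.Println("cannot determine the default server:", err)
		return
	}
	fmt.Println("default server:", addr)
}
```

Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the helper easy to exercise without touching the process environment.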

Message Delivery Semantics

NATS supports at-most-once semantics; applications need not call Message.Ack, and must not call Message.Nack. See https://godoc.org/github.com/eliben/gocdkx/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
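To make the at-most-once idea concrete, here is an in-process analogy built on a Go channel. It is not how NATS is implemented; publish is a hypothetical function that makes one delivery attempt and never retries, so a message is either delivered once or lost.

```go
package main

import "fmt"

// publish makes a single, non-blocking delivery attempt. If the receiver has
// no room for the message right now, the message is dropped and false is
// returned. There is no retry and no acknowledgement.
func publish(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	inbox := make(chan string, 1) // a subscriber with room for one message

	fmt.Println(publish(inbox, "first"))  // delivered
	fmt.Println(publish(inbox, "second")) // dropped: the inbox is full
	fmt.Println(<-inbox)
}
```

Because nothing is redelivered, an acknowledgement would have no effect, which is why Ack carries no meaning under this delivery model.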

As

natspubsub exposes the following types for As:

  • Topic: *nats.Conn
  • Subscription: *nats.Subscription
  • Message.BeforeSend: None.
  • Message: *nats.Msg
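The As escape hatch listed above follows a pointer-to-pointer pattern: the caller passes the address of a variable of the expected type, and As fills it in if the types match. The sketch below shows that pattern using hypothetical stand-in types (conn and topic) rather than the real *nats.Conn and *pubsub.Topic, so it runs without a NATS dependency.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for nats.Conn.
type conn struct{ server string }

// topic is a hypothetical driver-side topic that holds its connection.
type topic struct{ nc *conn }

// As stores the underlying connection in i if i is a **conn, and reports
// whether it did so.
func (t *topic) As(i interface{}) bool {
	p, ok := i.(**conn)
	if !ok {
		return false
	}
	*p = t.nc
	return true
}

func main() {
	t := &topic{nc: &conn{server: "nats://nats.example.com"}}

	var nc *conn
	if t.As(&nc) {
		fmt.Println("underlying connection:", nc.server)
	}

	// A target of the wrong type is rejected rather than causing a panic.
	var wrong *string
	fmt.Println(t.As(&wrong))
}
```

With the real package, the equivalent call would presumably declare `var nc *nats.Conn` and pass `&nc` to the topic's As method.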
Example (OpenSubscription)
package main

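import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// Blank import registers the "nats" scheme with pubsub.
	_ "github.com/eliben/gocdkx/pubsub/natspubsub"
)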

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#nats

	// import _ "github.com/eliben/gocdkx/pubsub/natspubsub"

	// Variables set up elsewhere:
	ctx := context.Background()

	// OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and receive messages with subject "example.mysubject".
	subscription, err := pubsub.OpenSubscription(ctx,
		"nats://example.mysubject?ackfunc=panic")
	if err != nil {
		log.Fatal(err)
	}
	defer subscription.Shutdown(ctx)
}

Example (OpenTopic)
package main

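import (
	"context"
	"log"

	"github.com/eliben/gocdkx/pubsub"
	// Blank import registers the "nats" scheme with pubsub.
	_ "github.com/eliben/gocdkx/pubsub/natspubsub"
)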

func main() {
	// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#nats

	// import _ "github.com/eliben/gocdkx/pubsub/natspubsub"

	// Variables set up elsewhere:
	ctx := context.Background()

	// OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will Dial the NATS server at the URL in the environment variable
	// NATS_SERVER_URL and send messages with subject "example.mysubject".
	topic, err := pubsub.OpenTopic(ctx, "nats://example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)
}

Constants

const AckWarning = "" /* 188-byte string literal not displayed */

AckWarning is a message that may be used in ackFuncs.

const Scheme = "nats"

Scheme is the URL scheme natspubsub registers its URLOpeners under on pubsub.DefaultMux.

Variables

This section is empty.

Functions

func OpenSubscription

func OpenSubscription(nc *nats.Conn, subject string, ackFunc func(), _ *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription returns a *pubsub.Subscription representing a NATS subscription. The subject is the NATS Subject to subscribe to; for more info, see https://nats.io/documentation/writing_applications/subjects.

ackFunc is called when the application calls pubsub.Message.Ack on a received message; Ack is a no-op for NATS. Pass an empty function to keep it a no-op, or a function that panics or logs a warning if you do not expect Ack to be called. See https://godoc.org/github.com/eliben/gocdkx/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
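The three ackFunc behaviors described above (do nothing, log a warning, panic) can be sketched as a small selector. ackFuncFor and its mode names are hypothetical and not part of natspubsub; the only thing taken from this page is that ackFunc has the type func().

```go
package main

import (
	"fmt"
	"log"
)

// ackFuncFor is a hypothetical helper that maps a mode name to a function
// with the func() shape that OpenSubscription's ackFunc parameter expects.
func ackFuncFor(mode string) (func(), error) {
	switch mode {
	case "noop":
		// Leave Ack as a silent no-op.
		return func() {}, nil
	case "log":
		return func() { log.Print("Ack called on a NATS message; it has no effect") }, nil
	case "panic":
		return func() { panic("Ack called on a NATS message") }, nil
	default:
		return nil, fmt.Errorf("unknown ack mode %q", mode)
	}
}

func main() {
	ack, err := ackFuncFor("log")
	if err != nil {
		log.Fatal(err)
	}
	ack() // logs a warning instead of silently doing nothing
}
```

The panicking variant is useful during development to surface code paths that assume at-least-once delivery; the logging variant is a gentler choice for code shared across several pubsub providers.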

TODO(dlc) - Options for queue groups?

Example
// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/subscribe/#nats-ctor

// Variables set up elsewhere:
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

subscription, err := natspubsub.OpenSubscription(
	natsConn,
	"example.mysubject",
	func() { panic("nats does not have ack") },
	nil)
if err != nil {
	log.Fatal(err)
}
defer subscription.Shutdown(ctx)

func OpenTopic

func OpenTopic(nc *nats.Conn, subject string, _ *TopicOptions) (*pubsub.Topic, error)

OpenTopic returns a *pubsub.Topic for use with NATS. The subject is the NATS Subject; for more info, see https://nats.io/documentation/writing_applications/subjects.

Example
// This example is used in https://github.com/eliben/gocdkx/howto/pubsub/publish/#nats-ctor

// Variables set up elsewhere:
ctx := context.Background()

natsConn, err := nats.Connect("nats://nats.example.com")
if err != nil {
	log.Fatal(err)
}
defer natsConn.Close()

topic, err := natspubsub.OpenTopic(natsConn, "example.mysubject", nil)
if err != nil {
	log.Fatal(err)
}
defer topic.Shutdown(ctx)

Types

type SubscriptionOptions

type SubscriptionOptions struct{}

SubscriptionOptions sets options for constructing a *pubsub.Subscription backed by NATS.

type TopicOptions

type TopicOptions struct{}

TopicOptions sets options for constructing a *pubsub.Topic backed by NATS.

type URLOpener

type URLOpener struct {
	// Connection to use for communication with the server.
	Connection *nats.Conn
	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens NATS URLs like "nats://mysubject".

The URL host+path is used as the subject.

No query parameters are supported.
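The "host+path is the subject" rule can be illustrated with net/url. subjectFromURL is a hypothetical helper written for this sketch; joining the two parts with path.Join is an assumption about how they are combined, not something this page confirms.

```go
package main

import (
	"fmt"
	"log"
	"net/url"
	"path"
)

// subjectFromURL is a hypothetical helper that extracts a NATS subject from
// a "nats" URL by joining the URL's host and path. How the package itself
// combines the two is an assumption here.
func subjectFromURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme != "nats" {
		return "", fmt.Errorf("unexpected scheme %q, want %q", u.Scheme, "nats")
	}
	return path.Join(u.Host, u.Path), nil
}

func main() {
	subject, err := subjectFromURL("nats://example.mysubject")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(subject)
}
```

Note that the host part of the URL names the subject, not the server; the server address comes from the URLOpener's Connection or, for the default opener, from NATS_SERVER_URL.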

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
