natsbus

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Oct 1, 2022 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package natsbus provides an event.Bus implementation with support for both NATS Core and NATS Streaming as the backend.

Deprecated: Use github.com/modernice/goes/backend/nats instead.


Constants

This section is empty.

Variables

var (
	// ErrReceiveTimeout is returned when an Event is not received from a
	// subscriber Event channel after the configured ReceiveTimeout.
	ErrReceiveTimeout = errors.New("receive timed out")
)

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the NATS event.Bus implementation.

func New deprecated

func New(enc codec.Encoding, opts ...Option) *Bus

New returns an event bus that communicates over NATS or NATS Streaming.

New panics if enc is nil or if initialization fails because of a malformed environment variable.

Deprecated: Use github.com/modernice/goes/backend/nats instead.

func (*Bus) Connect

func (bus *Bus) Connect(ctx context.Context) error

Connect establishes the connection to the NATS server. If Connect is not called manually, the Bus will connect automatically on the first call to Publish or Subscribe.

func (*Bus) Disconnect

func (bus *Bus) Disconnect(ctx context.Context) error

func (*Bus) Publish

func (bus *Bus) Publish(ctx context.Context, events ...event.Event) error

Publish implements event.Bus.

func (*Bus) Subscribe

func (bus *Bus) Subscribe(ctx context.Context, names ...string) (<-chan event.Event, <-chan error, error)

Subscribe implements event.Bus.

Callers must range over the error channel unless the EatErrors Option is used. Otherwise the subscription blocks as soon as the first asynchronous error occurs, and no further Events are received.
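The requirement to drain both channels can be sketched with the standard library alone. This is a minimal illustration, not the package's code: fakeSubscribe and consume are hypothetical helpers, and plain strings stand in for event.Event values.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeSubscribe is a hypothetical stand-in for Bus.Subscribe. It emits two
// events with one asynchronous error in between, then closes both channels.
func fakeSubscribe() (<-chan string, <-chan error) {
	events := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(events)
		defer close(errs)
		events <- "first"
		errs <- errors.New("decode failed") // blocks until someone receives it
		events <- "second"
	}()
	return events, errs
}

// consume receives from both channels until both are closed and reports how
// many events and errors it saw.
func consume(events <-chan string, errs <-chan error) (nEvents, nErrs int) {
	for events != nil || errs != nil {
		select {
		case _, ok := <-events:
			if !ok {
				events = nil // a nil channel is never selected again
				continue
			}
			nEvents++
		case _, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			nErrs++
		}
	}
	return nEvents, nErrs
}

func main() {
	events, errs := fakeSubscribe()
	nEvents, nErrs := consume(events, errs)
	fmt.Println(nEvents, nErrs)
}
```

If consume only ranged over events, the producer in this sketch would stay blocked on the error send and "second" would never arrive, which is the situation the paragraph above warns about.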

type Driver

type Driver interface {
	// contains filtered or unexported methods
}

A Driver connects to a NATS cluster. Available Drivers:

  • Core() returns the NATS Core Driver (default)
  • Streaming() returns the NATS Streaming Driver

func Core

func Core(opts ...nats.Option) Driver

Core returns the NATS Core Driver (at-most-once delivery).

func Streaming

func Streaming(clusterID, clientID string, opts ...stan.Option) Driver

Streaming returns the NATS Streaming Driver (at-least-once delivery).

type Option

type Option func(*Bus)

Option is a Bus option.

func Conn

func Conn(conn *nats.Conn) Option

Conn returns an Option that provides the underlying *nats.Conn for the EventBus. When the Conn Option is used, the Use Option has no effect.

func Durable

func Durable() Option

Durable returns an Option that makes the NATS subscriptions durable.

If the queue group is not empty, the durable name is built by concatenating the subject and queue group with an underscore:

fmt.Sprintf("%s_%s", subject, queue)

If the queue group is an empty string, the durable name is set to the subject.

Can also be set with the "NATS_DURABLE_NAME" environment variable:

`NATS_DURABLE_NAME={{ .Subject }}_{{ .Queue }}`

Use DurableFunc instead to control how the durable name is built.
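The naming rule described above can be written as a small function. This is a sketch under the assumption that the rule is exactly as documented; durableName is a hypothetical helper, not an exported part of the package.

```go
package main

import "fmt"

// durableName mirrors the documented rule: subject and queue group joined by
// an underscore, or the subject alone when the queue group is empty.
// Hypothetical helper for illustration only.
func durableName(subject, queue string) string {
	if queue == "" {
		return subject
	}
	return fmt.Sprintf("%s_%s", subject, queue)
}

func main() {
	fmt.Println(durableName("order.placed", "billing"))
	fmt.Println(durableName("order.placed", ""))
}
```

A function with this signature, func(subject, queue string) string, is also the shape that DurableFunc accepts.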

func DurableFunc

func DurableFunc(fn func(subject, queue string) string) Option

DurableFunc returns an Option that sets fn as the function to build the DurableName for the NATS Streaming subscriptions. When fn returns an empty string, the subscription will not be made durable.

DurableFunc has no effect when using the NATS Core Driver because NATS Core doesn't support durable subscriptions.

Can also be set with the "NATS_DURABLE_NAME" environment variable:

`NATS_DURABLE_NAME={{ .Subject }}_{{ .Queue }}`

Read more about durable subscriptions: https://docs.nats.io/developing-with-nats-streaming/durables
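The example value of "NATS_DURABLE_NAME" looks like Go text/template syntax. Assuming that is how the value is interpreted (the source does not state it), rendering such a pattern might look like the sketch below. nameData and renderDurableName are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// nameData carries the fields referenced by the pattern. The field names are
// taken from the example value; the struct itself is an assumption.
type nameData struct {
	Subject string
	Queue   string
}

// renderDurableName parses pattern as a text/template and executes it with
// the given subject and queue group.
func renderDurableName(pattern, subject, queue string) (string, error) {
	tpl, err := template.New("durable").Parse(pattern)
	if err != nil {
		return "", fmt.Errorf("parse durable name template: %w", err)
	}
	var sb strings.Builder
	if err := tpl.Execute(&sb, nameData{Subject: subject, Queue: queue}); err != nil {
		return "", fmt.Errorf("execute durable name template: %w", err)
	}
	return sb.String(), nil
}

func main() {
	name, err := renderDurableName("{{ .Subject }}_{{ .Queue }}", "order.placed", "billing")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```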

func EatErrors

func EatErrors() Option

EatErrors returns an Option that makes the Bus start a goroutine that ranges over the returned error channel and discards every error, so that callers who have no interest in handling those errors do not have to receive them manually.
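The draining goroutine can be approximated as follows. This is a standard-library sketch of the technique, not the package's implementation; eatErrors and its done channel are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// eatErrors starts a goroutine that discards everything sent on errs.
// The returned channel is closed once errs has been closed and drained.
func eatErrors(errs <-chan error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range errs {
			// Intentionally ignore the error.
		}
	}()
	return done
}

func main() {
	errs := make(chan error)
	done := eatErrors(errs)

	for i := 0; i < 3; i++ {
		// These sends do not block for long because the goroutine receives them.
		errs <- errors.New("async failure")
	}
	close(errs)
	<-done
	fmt.Println("all errors discarded")
}
```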

func QueueGroupByEvent

func QueueGroupByEvent() Option

QueueGroupByEvent returns an Option that sets the NATS queue group for subscriptions to the name of the handled Event. This can be used to load-balance Events between subscribers of the same Event name.

Can also be set with the "NATS_QUEUE_GROUP_BY_EVENT" environment variable.

Read more about queue groups: https://docs.nats.io/nats-concepts/queue

func QueueGroupByFunc

func QueueGroupByFunc(fn func(eventName string) string) Option

QueueGroupByFunc returns an Option that sets the NATS queue group for subscriptions by calling fn with the name of the subscribed Event. This can be used to load-balance Events between subscribers.

Read more about queue groups: https://docs.nats.io/nats-concepts/queue
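A function suitable for QueueGroupByFunc only needs the signature func(eventName string) string. The sketch below builds one that scopes the queue group to a service, so that instances of the same service would share a group while different services would not. The naming scheme and the groupByService helper are assumptions made for illustration.

```go
package main

import "fmt"

// groupByService returns a function that derives a queue group name from the
// event name, prefixed with the given service name. Hypothetical naming scheme.
func groupByService(service string) func(eventName string) string {
	return func(eventName string) string {
		return service + "." + eventName
	}
}

func main() {
	queueGroup := groupByService("billing")
	fmt.Println(queueGroup("order.placed"))
}
```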

func ReceiveTimeout

func ReceiveTimeout(d time.Duration) Option

ReceiveTimeout returns an Option that limits how long the EventBus tries to send an Event into the channel returned by bus.Subscribe. If the Event cannot be sent within d, it is dropped. The default of 0 means no timeout.
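A send that gives up after a timeout is commonly written with select and a timer. The sketch below shows that pattern with standard-library types only. sendWithTimeout and errReceiveTimeout are hypothetical stand-ins, and strings replace event.Event values; the package's actual implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errReceiveTimeout stands in for the package's ErrReceiveTimeout.
var errReceiveTimeout = errors.New("receive timed out")

// sendWithTimeout tries to send evt on out. With d <= 0 it blocks until the
// send succeeds; otherwise it gives up after d and reports a timeout.
func sendWithTimeout(out chan<- string, evt string, d time.Duration) error {
	if d <= 0 {
		out <- evt
		return nil
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case out <- evt:
		return nil
	case <-timer.C:
		return errReceiveTimeout // the event is dropped
	}
}

func main() {
	blocked := make(chan string) // unbuffered and nobody receives
	fmt.Println(sendWithTimeout(blocked, "order.placed", 20*time.Millisecond))

	ready := make(chan string, 1) // buffered, so the send succeeds at once
	fmt.Println(sendWithTimeout(ready, "order.placed", 20*time.Millisecond))
}
```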

Can also be set with the "NATS_RECEIVE_TIMEOUT" environment variable in a format understood by time.ParseDuration. If the environment value is not parseable by time.ParseDuration, no timeout will be used.
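The fallback described in this paragraph can be sketched with time.ParseDuration. timeoutFromEnv is a hypothetical helper that follows the behavior stated here (an unparseable value yields no timeout); it is not taken from the package.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// timeoutFromEnv converts an environment value into a timeout. Values that
// time.ParseDuration rejects, including the empty string, yield 0, which
// means "no timeout".
func timeoutFromEnv(value string) time.Duration {
	d, err := time.ParseDuration(value)
	if err != nil {
		return 0
	}
	return d
}

func main() {
	fmt.Println(timeoutFromEnv(os.Getenv("NATS_RECEIVE_TIMEOUT")))
	fmt.Println(timeoutFromEnv("1500ms"))
	fmt.Println(timeoutFromEnv("soon"))
}
```

Accepted values use the usual Go duration syntax, for example "300ms", "5s" or "1m30s".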

func StreamingConn

func StreamingConn(conn stan.Conn) Option

StreamingConn returns an Option that provides the underlying stan.Conn for the EventBus. When the StreamingConn Option is used, the Use Option has no effect.

func SubjectFunc

func SubjectFunc(fn func(eventName string) string) Option

SubjectFunc returns an Option that sets the NATS subject for subscriptions and outgoing Events by calling fn with the name of the handled Event.

func SubjectPrefix

func SubjectPrefix(prefix string) Option

SubjectPrefix returns an Option that sets the NATS subject for subscriptions and outgoing Events by prepending prefix to the name of the handled Event.

Can also be set with the "NATS_SUBJECT_PREFIX" environment variable.
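SubjectPrefix can be read as a special case of SubjectFunc. The sketch below expresses the prefixing as a function with the signature SubjectFunc expects, assuming the prefix is prepended verbatim without an added separator. prefixSubject is a hypothetical helper.

```go
package main

import "fmt"

// prefixSubject returns a subject function that prepends prefix to the event
// name. No separator is inserted, so the prefix should include one if needed.
func prefixSubject(prefix string) func(eventName string) string {
	return func(eventName string) string {
		return prefix + eventName
	}
}

func main() {
	subject := prefixSubject("prod.")
	fmt.Println(subject("order.placed"))
}
```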

func URL

func URL(url string) Option

URL returns an Option that sets the connection URL to the NATS server. If no URL is specified, the "NATS_URL" environment variable is used as the connection URL.

func Use

func Use(d Driver) Option

Use returns an Option that specifies which Driver to use to communicate with NATS. Defaults to Core().
