nats

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2022 License: MIT Imports: 16 Imported by: 1

Documentation

Overview

Package nats provides an event bus that uses NATS to publish and subscribe to events over a network with support for both NATS Core and NATS JetStream.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrPullTimeout is raised by an EventBus when a subscriber doesn't pull an
	// event from the event channel within the specified PullTimeout. In such
	// case, the event is dropped to avoid blocking the application because of a
	// slow consumer.
	ErrPullTimeout = errors.New("pull timed out. slow consumer?")
)

Functions

This section is empty.

Types

type Driver

type Driver interface {
	// contains filtered or unexported methods
}

A Driver provides the specific implementation for interacting with either NATS Core or NATS JetStream. Use the Core or JetStream functions to create a Driver.

func Core

func Core() Driver

Core returns the NATS Core Driver (which is enabled by default):

bus := NewEventBus(enc, Use(Core())) // or
bus := NewEventBus(enc)

func JetStream

func JetStream() Driver

JetStream returns the NATS JetStream Driver:

bus := NewEventBus(enc, Use(JetStream()))

Consumer Subscriptions

Consumer subscriptions are given the following options by default:

  • DeliverPolicy: DeliverNew
  • AckPolicy: AckAll

You can add custom options using the SubOpts option:

bus := NewEventBus(enc, Use(JetStream()), SubOpts(nats.DeliverAll(), nats.AckNone()))

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is an event bus that uses NATS to publish and subscribe to events.

Drivers

The event bus supports both NATS Core and NATS JetStream. By default, the Core driver is used, but you can create and specify the JetStream driver with the Use option:

var enc codec.Encoding
bus := nats.NewEventBus(enc, nats.Use(nats.JetStream()))

func NewEventBus

func NewEventBus(enc codec.Encoding, opts ...EventBusOption) *EventBus

NewEventBus returns a NATS event bus.

The provided Encoder is used to encode and decode event data when publishing and subscribing to events.

If no other specified, the returned event bus will use the NATS Core Driver. To use the NATS JetStream Driver instead, explicitly set the Driver:

NewEventBus(enc, Use(JetStream()))

func (*EventBus) Connect

func (bus *EventBus) Connect(ctx context.Context) error

Connects connects to NATS.

It is not required to call Connect to use the EventBus because Connect is automatically called by Subscribe and Publish.

func (*EventBus) Disconnect

func (bus *EventBus) Disconnect(ctx context.Context) error

Disconnect closes the underlying *nats.Conn. Should ctx be canceled before the connection is closed, ctx.Err() is returned.

func (*EventBus) Publish

func (bus *EventBus) Publish(ctx context.Context, events ...event.Event) error

Publish publishes events.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(ctx context.Context, names ...string) (<-chan event.Event, <-chan error, error)

Subscribe subscribes to events.

type EventBusOption

type EventBusOption func(*EventBus)

EventBusOption is an option for an EventBus.

func Conn

func Conn(conn *nats.Conn) EventBusOption

Conn returns an option that provides the underlying *nats.Conn to the event bus. When providing a connection, the event bus does not try to connect to NATS but uses the provided connection instead.

func Durable

func Durable(name string) EventBusOption

Durable returns an option that specifies the durable name for new subscriptions when using the JetStream Driver.

Use the DurableFunc option if you need to know the subject or queue group to build the durable name.

This option is valid only for the JetStream Driver.

func DurableFunc

func DurableFunc(fn func(subject, queue string) string) EventBusOption

DurableFunc returns an option that specifies the durable name for new subscriptions when using the JetStream Driver. When subscribing to an event, the provided function is called with the event name and queue group (see QueueGroupByXXX and WithLoadBalancer options) and the returned string is used as the durable name for the subscription. If the durable name is an empty string, the subscription is not made durable.

Can also be set with the `NATS_DURABLE_NAME` environment variable. The following example generates the durable names by concatenating the subject together with the queue group using an underscore (the environment variable is executed using text/template, so you have access to the subject and queue group):

`NATS_DURABLE_NAME={{ .Subject }}_{{ .Queue }}`

This option is valid only for the JetStream Driver.

Use Case

The following example uses durable subscriptions while load-balancing between instances of a replicated (micro-)service:

serviceName := "foo-service"
bus := NewEventBus(enc,
	WithLoadBalancer(serviceName),
	Durable(serviceName),
)

Read more about durable subscriptions: https://docs.nats.io/nats-concepts/jetstream/consumers#durable-name

func EatErrors

func EatErrors() EventBusOption

EatErrors returns an option that discards any asynchronous errors of subscriptions. When subscribing to an event, you can safely ignore the returned error channel:

var bus *EventBus
events, _, err := bus.Subscribe(context.TODO(), ...)

func PullTimeout

func PullTimeout(d time.Duration) EventBusOption

PullTimeout returns an option that limits the Duration an EventBus tries to push an event into a subscribed event channel. When the pull timeout is exceeded, the event gets dropped and a warning is logged.

Default is no timeout.

func QueueGroup

func QueueGroup(queue string) EventBusOption

QueueGroupBy returns an option that specifies the NATS queue group for new subscriptions.

Can also be set with the `NATS_QUEUE_GROUP=foo` environment variable.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

func QueueGroupByEvent

func QueueGroupByEvent() EventBusOption

QueueGroupByFunc returns an option that specifies the NATS queue group for new subscriptions. When subscribing to an event, the event name is used as the name of the queue group.

Use this option if you want events to be received only by a single subscriber.

Can also be set with the `NATS_QUEUE_GROUP_BY_EVENT=1` environment variable.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

func QueueGroupByFunc

func QueueGroupByFunc(fn func(eventName string) string) EventBusOption

QueueGroupByFunc returns an option that specifies the NATS queue group for new subscriptions. When subscribing to an event, fn(eventName) is called to determine the queue group name for that subscription. If the returned queue group is an empty string, the queue group feature will not be used for the subscription.

Use Case

Queue groups can be used to load-balance between multiple subscribers of the same event. When multiple subscribers are subscribed to an event and use the same queue group, only one of the subscribers will receive the event. To load-balance between instances of a replicated (micro-)service, use a shared name (service name) that is the same between the replicated services and use that id as the queue group:

serviceName := "foo-service"
bus := NewEventBus(enc,
	QueueGroupByFunc(func(eventName) string {
		return serviceName
	}),
)

The example above can also be written as:

serviceName := "foo-service"
bus := NewEventBus(enc, WithLoadBalancer(serviceName))

Queue groups are disabled by default.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

func StreamNameFunc

func StreamNameFunc(fn func(subject, queue string) string) EventBusOption

StreamNameFunc returns an option that specifies the stream name for new subscriptions when using the JetStream Driver. When subscribing to an event, the provided fn is called with the generated subject and queue group to determine the JetStream stream name for the subscription.

If the StreamNameFunc option is not used, the provided DurableXXX option is used to generate the stream name instead. If the generated durable name is empty, the subscription falls back to using the default stream name function, which is the default durable name function.

This option is valid only for the JetStream Driver.

Read more about streams: https://docs.nats.io/nats-concepts/jetstream/streams

func SubOpts

func SubOpts(opts ...nats.SubOpt) EventBusOption

SubOpts returns an option that adds custom nats.SubOpts when creating a JetStream subscription.

This option is valid only for the JetStream Driver.

func SubjectFunc

func SubjectFunc(fn func(eventName string) string) EventBusOption

SubjectFunc returns an option that specifies how the NATS subjects for event names are generated.

By default, subjects are the event names with "." replaced with "_".

func SubjectPrefix

func SubjectPrefix(prefix string) EventBusOption

SubjectFunc returns an option that specifies how the NATS subjects for event names are generated.

Can also be set with the `NATS_SUBJECT_PREFIX` environment variable.

func URL

func URL(url string) EventBusOption

URL returns an option that sets the connection URL to the NATS server. If no URL is specified, the environment variable `NATS_URL` will be used as the connection URL. If that is also not set, the default NATS URL (nats.DefaultURL) is used instead.

func Use

func Use(d Driver) EventBusOption

Use returns the option to specify the Driver to use to communicate with NATS. By default, the Core Driver is used.

bus := NewEventBus(enc, Use(JetStream()))

func WithLoadBalancer

func WithLoadBalancer(serviceName string) EventBusOption

WithLoadBalancer returns a QueueGroupByFunc option that load-balances events between instances of a replicted (micro-)service. The provided serviceName is used as the queue group name. Any "." in serviceName are replaced with "_".

Can also be set with the `NATS_LOAD_BALANCER=foo` environment variable.

Caution

Providing a load-balanced event bus as the underlying bus to a command bus should be avoided and providing it to a projection schedule should be done with thought and caution. Create another instance of an event bus without this option and pass that to cmdbus.New() when creating the command bus. When you create a projection schedule, you have to think about what makes sense in the context of your projection, because a load-balanced event bus will cause only a single instance of a replicated service to trigger a projection. Also, each event may be received by a different instance which can make the projection jobs fragmented and less efficient. A common example is some kind of lookup table that is projected from events and that your instances keep "live" in-memory. Each instance needs the lookup table to work, so it wouldn't make sense to load-balance the projection. In most cases where projections are not kept in memory but instead fetched from a database, updated and then saved back to the database, a load-balanced projection schedule is exactly what you want, but then again, context matters.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL