Documentation ¶
Overview ¶
Package nats provides an event bus that uses NATS to publish and subscribe to events over a network with support for both NATS Core and NATS JetStream.
Index ¶
- Variables
- type Driver
- type EventBus
- func (bus *EventBus) Connect(ctx context.Context) error
- func (bus *EventBus) Connection() *nats.Conn
- func (bus *EventBus) Disconnect(ctx context.Context) error
- func (bus *EventBus) Publish(ctx context.Context, events ...event.Event) error
- func (bus *EventBus) Subscribe(ctx context.Context, names ...string) (<-chan event.Event, <-chan error, error)
- type EventBusOption
- func Conn(conn *nats.Conn) EventBusOption
- func EatErrors() EventBusOption
- func LoadBalancer(serviceName string) EventBusOption
- func PullTimeout(d time.Duration) EventBusOption
- func QueueGroup(fn func(eventName string) string) EventBusOption
- func SubjectFunc(fn func(eventName string) string) EventBusOption
- func SubjectPrefix(prefix string) EventBusOption
- func URL(url string) EventBusOption
- func Use(d Driver) EventBusOption
- type JetStreamOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStreamExists is returned when the JetStream driver tries to create a // stream that already exists with a different configuration. ErrStreamExists = errors.New("stream already exists with a different configuration") // ErrConsumerExists is returned when the JetStream driver tries to create a // consumer that already exists with a different configuration. ErrConsumerExists = errors.New("consumer already exists with a different configuration") )
var ( // DefaultStream is the default JetStream stream name to use/create if no // explicit name is provided using the StreamName() option. DefaultStream = "goes" )
var ( // ErrPullTimeout is raised by an eventBus when a subscriber doesn't pull an // event from the event channel within the specified PullTimeout. In such // case, the event is dropped to avoid blocking the application because of a // slow consumer. ErrPullTimeout = errors.New("pull timed out. slow consumer?") )
Functions ¶
This section is empty.
Types ¶
type Driver ¶
type Driver interface {
// contains filtered or unexported methods
}
A Driver provides the specific implementation for interacting with either NATS Core or NATS JetStream. Use the Core or JetStream functions to create a Driver.
func Core ¶
func Core() Driver
Core returns the NATS Core Driver (which is enabled by default):
bus := NewEventBus(enc, Use(Core())) // or bus := NewEventBus(enc)
func JetStream ¶
func JetStream(opts ...JetStreamOption) Driver
JetStream returns the NATS JetStream Driver:
bus := NewEventBus(enc, Use(JetStream()))
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is an event bus that uses NATS to publish and subscribe to events.
Drivers ¶
The event bus supports both NATS Core and NATS JetStream. By default, the Core driver is used, but you can create and specify the JetStream driver with the Use option:
var enc codec.Encoding bus := nats.NewEventBus(enc, nats.Use(nats.JetStream()))
func NewEventBus ¶
func NewEventBus(enc codec.Encoding, opts ...EventBusOption) *EventBus
NewEventBus returns a NATS event bus.
The provided Encoder is used to encode and decode event data when publishing and subscribing to events.
If no other specified, the returned event bus will use the NATS Core Driver. To use the NATS JetStream Driver instead, explicitly set the Driver:
NewEventBus(enc, Use(JetStream()))
func (*EventBus) Connect ¶
Connects connects to NATS.
It is not required to call Connect to use the eventBus because Connect is automatically called by Subscribe and Publish.
func (*EventBus) Connection ¶ added in v0.1.2
func (bus *EventBus) Connection() *nats.Conn
Connection returns the underlying *nats.Conn.
func (*EventBus) Disconnect ¶
Disconnect closes the underlying *nats.Conn. Should ctx be canceled before the connection is closed, ctx.Err() is returned.
type EventBusOption ¶
type EventBusOption func(*EventBus)
EventBusOption is an option for an eventBus.
func Conn ¶
func Conn(conn *nats.Conn) EventBusOption
Conn returns an option that provides the underlying *nats.Conn to the event bus. When providing a connection, the event bus does not try to connect to NATS but uses the provided connection instead.
func EatErrors ¶
func EatErrors() EventBusOption
EatErrors returns an option that discards any asynchronous errors of subscriptions. When subscribing to an event, you can safely ignore the returned error channel:
var bus *EventBus events, _, err := bus.Subscribe(context.TODO(), ...)
func LoadBalancer ¶ added in v0.1.2
func LoadBalancer(serviceName string) EventBusOption
LoadBalancer returns a QueueGroup option that enables load-blancing between event buses that share the same serviceName. The option applies the QueueGroup option so that the queue group for the subscription to an event is built in the following format:
fmt.Sprintf("%s:%s", <serviceName>, <eventName>)
Caution ¶
Providing a load-balanced event bus as the underlying bus to a command bus should be avoided, and providing it to a projection schedule should be done with caution.
To create a command bus, create another instance of the event bus with load- balancing disabled, and pass that bus to cmdbus.New().
When you create a projection schedule, you have to think about what makes sense in the context of your projection, because a load-balanced event bus will cause only a single instance of a replicated service to trigger a projection. Also, each event may be received by a different instance which can make the projection jobs fragmented and less efficient. A common example is some kind of lookup table that is projected from events and that your instances keep "live" in-memory. Each instance needs the lookup table to work, so it wouldn't make sense to load-balance the projection. In most cases where projections are not kept in memory but instead fetched from a database, updated and then saved back to the database, a load-balanced projection schedule is exactly what you want, but then again, context matters.
Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue
func PullTimeout ¶
func PullTimeout(d time.Duration) EventBusOption
PullTimeout returns an option that limits the Duration an event bus tries to push an event into a subscribed event channel. When the pull timeout is exceeded, the event gets dropped and a warning is logged. Default is the zero-Duration which means "no timeout".
func QueueGroup ¶
func QueueGroup(fn func(eventName string) string) EventBusOption
QueueGroup returns an option that specifies the NATS queue group for new subscriptions. When subscribing to an event, fn(eventName) is called to determine the queue group name for that subscription. If the returned queue group is an empty string, the queue group feature will not be used for the subscription.
This option has no effect if used with the "jetstream" driver in "pull" mode (default).
Use Case ¶
Queue groups can be used to load-balance between multiple subscribers of the same event. When multiple subscribers are subscribed to an event and use the same queue group, only one of the subscribers will receive the event. To load-balance between instances of a replicated (micro-)service, use a shared name (service name) that is the same between the replicated services and use that id as the queue group:
serviceName := "foo-service" bus := NewEventBus(enc, QueueGroup(func(eventName) string { return serviceName }), )
The example above can also be written as:
serviceName := "foo-service" bus := NewEventBus(enc, WithLoadBalancer(serviceName))
Queue groups are disabled by default.
Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue
func SubjectFunc ¶
func SubjectFunc(fn func(eventName string) string) EventBusOption
SubjectFunc returns an option that specifies how the NATS subjects for event names are generated. Any "." in the subject are replaced by "_".
By default, a subject is the event name with "." replaced by "_".
func SubjectPrefix ¶
func SubjectPrefix(prefix string) EventBusOption
SubjectFunc returns an option that specifies how the NATS subjects for event names are generated.
func URL ¶
func URL(url string) EventBusOption
URL returns an option that sets the connection URL to the NATS server. If no URL is specified, the environment variable `NATS_URL` will be used as the connection URL. If that is also not set, the default NATS URL (nats.DefaultURL) is used instead.
func Use ¶
func Use(d Driver) EventBusOption
Use returns the option to specify the Driver to use to communicate with NATS. By default, the "core" driver is used.
bus := NewEventBus(enc, Use(JetStream()))
type JetStreamOption ¶ added in v0.1.2
type JetStreamOption func(*jetStream)
JetStreamOption is an option for the JetStream driver.
func Durable ¶
func Durable(prefix string) JetStreamOption
Durable returns an option that makes JetStream consumers / subscriptions durable (see DurableFunc). The durable name is formatted like this:
fmt.Sprintf("%s:%s:%s", prefix, queue, event)
func DurableFunc ¶
func DurableFunc(fn func(event, queue string) string) JetStreamOption
DurableFunc returns an option that makes JetStream consumers / subscriptions durable. When creating a consumer, the provided function is called with the event name and queue group (see QueueGroup and LoadBalancer options) to generate the durable name. If the event is the wildcard "*", it is passed as "$all". Similarly, if the queue group is an empty string, it is passed as "$noqueue". Any ".", "*", or ">" characters in the returned durable name will be replaced by "_".
The JetStream driver creates one consumer / subscription per event.
Read more about durable subscriptions: https://docs.nats.io/nats-concepts/jetstream/consumers#durable-name
func StreamName ¶ added in v0.1.2
func StreamName(stream string) JetStreamOption
StreamName returns a JetStreamOption that specifies the stream name that is created by the JetStream driver. The default stream name is "goes".
func SubOpts ¶
func SubOpts(opts ...nats.SubOpt) JetStreamOption
SubOpts returns an option that adds custom nats.SubOpts when creating a JetStream subscription.