natsby

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2020 License: MIT Imports: 11 Imported by: 1

README

Natsby

ci status Coverage Status Go Report Card

Natsby enables NATS-driven services.

It provides with a familiar API similar to HTTP Web frameworks like Gin.

NOTE: ⚠️ This project is experimental, very early and the API is not stable! Use with caution! ⚠️

Installation

Install the dependency:

go get -u github.com/jasonblanchard/natsby

Import in your code:

import "github.com/nats-io/nats.go"

Quickstart

Assuming a NATS server running at http://localhost:4222, initialize a NATS connection and pass it to Natsby:

package main

import (
	"github.com/jasonblanchard/natsby"
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	engine, err := natsby.New(nc)
	if err != nil {
		panic(err)
	}

	logger := natsby.DefaultLogger()
	engine.Use(natsby.WithLogger(logger)) // (1)

	engine.Subscribe("ping", natsby.WithByteReply(), func(c *natsby.Context) { // (2)
		c.ByteReplyPayload = []byte("pong")
	})

	engine.Run(func() { // (3)
		logger.Info().Msg("Ready 🚀")
	})
}

This will:

  1. Set up the default logger as global middleware. This will be run on each received message.
  2. Register a middleware chain to be called when messages are received on the "ping" subject. The handlers will be called right-to-left. WithByteReply is a built-in middleware that will publish the payload in ByteReplyPayload to the message reply subject if one exists.
  3. Start all the subscriptions. This is blocking.

See the examples directory for more sample code.

Middleware

Middleware is a chain of functions. Each are run when a message is received on a subscribed topic.

Middleware can be added globally using engine.Use() (i.e. it is called on every subscription) or per subscription as functions passed to engine.Subscribe().

Middleware functions are called with a Context instance unique to that subscription handler invocation. The Context instance can be mutated to provide data for subsequent middleware via specific setters on the Context struct or generic key/value pairs via Context.Get()/Context.Set().

Built-in Middleware
Logging

Logs information as messages are received and handled. This middleware uses zerolog (this may change in the future).

You can bring your own zerolog instance or use the default logger:

logger := natsby.DefaultLogger()
engine.Use(natsby.WithLogger(logger))
Replier

Publishes the payload in context.ByteReplyPayload to the context.Msg.Reply subject if it exists:

engine.Subscribe("ping", natsby.WithByteReply(), func(c *natsby.Context) {
	c.ByteReplyPayload = []byte("pong")
})
Recovery

Catches panic()s and logs an error so that the process does not crash. By default, it logs the error and stack trace:

engine.Use(natsby.WithRecovery())

You can also bring your own recovery handler:

	engine.Use(natsby.WithCustomRecovery(func(c *natsby.Context, err interface{}) {
		logger.Error().Msg(fmt.Sprintf("%v", err))

		if c.Msg.Reply != "" {
			c.NatsConnection.Publish(c.Msg.Reply, []byte("oops"))
		}
  }))
Metrics

Collect observability metrics for various aspects of Natsby. The "observers" (when metrics are collected) and "collectors" (where/how the metrics are sent) are intended to be pluggable. Natsby ships with a Prometheus collector which will start a metrics server on the port passed to natsby.NewPrometheusCollector:

collector := natsby.NewPrometheusCollector("2112")
observer := natsby.NewDefaultObserver(collector)
engine.Use(natsby.WithMetrics(observer))

FAQ

Why would I use this?

If you find yourself needing to write a service and you usually reach for something like Gin but want to use NATS instead of HTTP, Natsby might be for you.

Why would I use NATS instead of HTTP?

The creators of NATS have more to say on this, but it really boils down to a few key things:

  1. Decoupling message passing and identity - Services publishing messages don't need to know anything about who is handling those messages. No service discovery or DNS records to pass around. Services receiving messages can change over time without any changes required on the sending side.
  2. Multiple messaging semantics - Messages can be sent through your system in any combination of:
  • Consumed (intended for one receiver) or observed (intended for many receivers)
  • Synchronous (sender expects a reply immediately) or asynchronous (sending accepts a reply later or never)

NOTE: This terminology comes from The Tao of Microservices by Richard Roger.

You can replicate HTTP by using synchronous, consumed messages (i.e. request/reply), but any other combination is available. This can also change over time - two services might interact in a synchronous request/reply pattern, but you can also add other services that act as asynchronous observers listening on the same subjects. This is completely additive - nothing about the other collaborating services needs to change.

NOTE: Out of the box, NATS does not support durable queues and only guarantees "at most once" delivery. This means that if there are no subscribers for a given topic, messages sent to it will never be received. Other projects such as NATS Streaming Server, Liftbridge and Jetstream, are building persistent messageing on top of NATS, but are out of scope of this project for now.

Guiding Principles

  • The API should feel familiar to developers familiar with Gin, Martini or even Express, Sinatra, etc.
  • Logic not core to NATS subscriptions/replies should live in middleware and be as pluggable as possible.
  • Natsby should be able to do everything the nats-io Go client can do (this is a bit of a WIP).
  • Observability is first-class.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultLogger

func DefaultLogger() *zerolog.Logger

DefaultLogger sets up a simple zerolog instance

Types

type Collector

type Collector interface {
	CollectSubjectReceived(subject string)
	CollectLatency(subject string, latency time.Duration)
	CollectReply(subject string)
	CollectError(subject string)
	Collect() error
}

Collector metrics collector

type Context

type Context struct {
	Msg *nats.Msg

	ByteReplyPayload []byte
	JSONReplyPayload interface{}

	NatsConnection        *nats.Conn
	NatsEncodedConnection *nats.EncodedConn
	Err                   error
	Keys                  map[string]interface{}
	// contains filtered or unexported fields
}

Context context that's passed through handlers and middleware

func (*Context) Get

func (c *Context) Get(k string) interface{}

Get gets arbirary value from the context map

func (*Context) GetByteReplyPayload

func (c *Context) GetByteReplyPayload() ([]byte, bool)

GetByteReplyPayload getter for byte reply payload with metadata about if it was set

func (*Context) Next

func (c *Context) Next()

Next to be called in middleware to invoke the middleware chain

func (*Context) NextWithLatencyDuration

func (c *Context) NextWithLatencyDuration() time.Duration

NextWithLatencyDuration Calls Next() and returns duration of execution

func (*Context) Set

func (c *Context) Set(k string, v interface{})

Set sets arbitrary value that will be available in the context map

type DefaultObserver

type DefaultObserver struct {
	// contains filtered or unexported fields
}

DefaultObserver Default concrete Observer implementation

func NewDefaultObserver

func NewDefaultObserver(c Collector) *DefaultObserver

NewDefaultObserver creates a default observer instance

func (*DefaultObserver) Observe

func (o *DefaultObserver) Observe() error

Observe start observing and collecting metrics

func (*DefaultObserver) ObserveError

func (o *DefaultObserver) ObserveError(c *Context)

ObserveError collect metrics on replies

func (*DefaultObserver) ObserveLatency

func (o *DefaultObserver) ObserveLatency(c *Context, latency time.Duration)

ObserveLatency collect metrics on handler chain latency

func (*DefaultObserver) ObserveReply

func (o *DefaultObserver) ObserveReply(c *Context)

ObserveReply collect metrics on replies

func (*DefaultObserver) ObserveSubjectReceived

func (o *DefaultObserver) ObserveSubjectReceived(c *Context)

ObserveSubjectReceived collect metrics when subject is received

type Engine

type Engine struct {
	NatsConnection        *nats.Conn
	NatsEncodedConnection *nats.EncodedConn

	QueueGroup string
	OutWriter  io.ReadWriter
	ErrWriter  io.ReadWriter
	// contains filtered or unexported fields
}

Engine framework instance

func New

func New(nc *nats.Conn, options ...func(*Engine) error) (*Engine, error)

New creates a new Router object

func (*Engine) Run

func (e *Engine) Run(callbacks ...func()) error

Run starts all the subscribers and blocks

func (*Engine) Shutdown

func (e *Engine) Shutdown()

Shutdown terminates all listeners and drains connections

func (*Engine) Subscribe

func (e *Engine) Subscribe(subject string, handlers ...HandlerFunc)

Subscribe adds a subscriber to the NATS instance with middleware

func (*Engine) Use

func (e *Engine) Use(middleware ...HandlerFunc)

Use adds global middleware to the engine which will be called for every subscription

type HandlerFunc

type HandlerFunc func(*Context)

HandlerFunc defines handler used by middleware as return value

func WithByteReply

func WithByteReply() HandlerFunc

WithByteReply Checks for reply channel and sends back byte response

func WithCustomRecovery

func WithCustomRecovery(handle RecoveryFunc) HandlerFunc

WithCustomRecovery catches panics with custom handler

func WithJSONReply

func WithJSONReply() HandlerFunc

WithJSONReply Checks for reply channel and sends back JSON response

func WithLogger

func WithLogger(logger *zerolog.Logger) HandlerFunc

WithLogger wraps handler with logging

func WithMetrics

func WithMetrics(o Observer) HandlerFunc

WithMetrics Observes handler behavior and reports metrics

func WithRecovery

func WithRecovery() HandlerFunc

WithRecovery catches panics to prevent program crashes

type HandlersChain

type HandlersChain []HandlerFunc

HandlersChain HandlerFunc array

type Observer

type Observer interface {
	ObserveSubjectReceived(c *Context)
	ObserveLatency(c *Context, latency time.Duration)
	ObserveReply(c *Context)
	ObserveError(c *Context)
	Observe() error
}

Observer describes when collectors should collect data

type PrometheusCollector

type PrometheusCollector struct {
	// contains filtered or unexported fields
}

PrometheusCollector observer for sending metrics to Prometheus

func NewPrometheusCollector

func NewPrometheusCollector(port string) *PrometheusCollector

NewPrometheusCollector initialize new Prometheus observer

func (*PrometheusCollector) Collect

func (p *PrometheusCollector) Collect() error

Collect starts prometheus server on p.port in a new goroutine

func (*PrometheusCollector) CollectError

func (p *PrometheusCollector) CollectError(subject string)

CollectError increment counter for errors

func (*PrometheusCollector) CollectLatency

func (p *PrometheusCollector) CollectLatency(subject string, latency time.Duration)

CollectLatency set histogram value for latency

func (*PrometheusCollector) CollectReply

func (p *PrometheusCollector) CollectReply(subject string)

CollectReply increment counter for replies

func (*PrometheusCollector) CollectSubjectReceived

func (p *PrometheusCollector) CollectSubjectReceived(subject string)

CollectSubjectReceived increment counter when message is received

type RecoveryFunc

type RecoveryFunc func(c *Context, err interface{})

RecoveryFunc defines the function passable to CustomRecovery.

type Subscriber

type Subscriber struct {
	Subject  string
	Handlers HandlersChain
}

Subscriber respresents a subscriber to be set up in Run()

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL