micro

package
v1.37.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2024 License: Apache-2.0 Imports: 10 Imported by: 90

README

NATS micro

Overview

The micro package in the NATS.go library provides a simple way to create microservices that leverage NATS for scalability, load management and observability.

Basic usage

To start using the micro package, import it in your application:

import "github.com/nats-io/nats.go/micro"

The core of the micro package is the Service. A Service aggregates endpoints for handling application logic. Services are named and versioned. You create a Service using the micro.NewService() function, passing in the NATS connection and Service configuration.

nc, _ := nats.Connect(nats.DefaultURL)

// request handler
echoHandler := func(req micro.Request) {
    req.Respond(req.Data())
}

srv, err := micro.AddService(nc, micro.Config{
    Name:        "EchoService",
    Version:     "1.0.0",
    // base handler
    Endpoint: &micro.EndpointConfig{
        Subject: "svc.echo",
        Handler: micro.HandlerFunc(echoHandler),
    },
})

After creating the service, it can be accessed by publishing a request on endpoint subject. For given configuration, run:

nats req svc.echo "hello!"

To get:

17:37:32 Sending request on "svc.echo"
17:37:32 Received with rtt 365.875µs
hello!

Endpoints and groups

Base endpoint can be optionally configured on a service, but it is also possible to add more endpoints after the service is created.

srv, _ := micro.AddService(nc, config)

// endpoint will be registered under "svc.add" subject
err = srv.AddEndpoint("svc.add", micro.HandlerFunc(add))

In the above example svc.add is an endpoint name and subject. It is possible have a different endpoint name then the endpoint subject by using micro.WithEndpointSubject() option in AddEndpoint().

// endpoint will be registered under "svc.add" subject
err = srv.AddEndpoint("Adder", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("svc.add"))

Endpoints can also be aggregated using groups. A group represents a common subject prefix used by all endpoints associated with it.

srv, _ := micro.AddService(nc, config)

numbersGroup := srv.AddGroup("numbers")

// endpoint will be registered under "numbers.add" subject
_ = numbersGroup.AddEndpoint("add", micro.HandlerFunc(addHandler))
// endpoint will be registered under "numbers.multiply" subject
_ = numbersGroup.AddEndpoint("multiply", micro.HandlerFunc(multiplyHandler))

Customizing queue groups

For each service, group and endpoint the queue group used to gather responses can be customized. If not provided a default queue group will be used (q). Customizing queue groups can be useful to e.g. implement fanout request pattern or hedged request pattern (to reduce tail latencies by only waiting for the first response for multiple service instances).

Let's say we have multiple services listening on the same subject, but with different queue groups:

for i := 0; i < 5; i++ {
  srv, _ := micro.AddService(nc, micro.Config{
    Name:        "EchoService",
    Version:     "1.0.0",
    QueueGroup:  fmt.Sprintf("q-%d", i),
    // base handler
    Endpoint: &micro.EndpointConfig{
        Subject: "svc.echo",
        Handler: micro.HandlerFunc(echoHandler),
    },
  })
}

In the client, we can send request to svc.echo to receive responses from all services registered on this subject (or wait only for the first response):

sub, _ := nc.SubscribeSync("rply")
nc.PublishRequest("svc.echo", "rply", nil)
for start := time.Now(); time.Since(start) < 5*time.Second; {
  msg, err := sub.NextMsg(1 * time.Second)
  if err != nil {
    break
  }
  fmt.Println("Received ", string(msg.Data))
}

Queue groups can be overwritten by setting them on groups and endpoints as well:

  srv, _ := micro.AddService(nc, micro.Config{
    Name:        "EchoService",
    Version:     "1.0.0",
    QueueGroup:  "q1",
  })

  g := srv.AddGroup("g", micro.WithGroupQueueGroup("q2"))

  // will be registered with queue group 'q2' from parent group
  g.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}))

  // will be registered with queue group 'q3'
  g.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q3"))

Discovery and Monitoring

Each service is assigned a unique ID on creation. A service instance is identified by service name and ID. Multiple services with the same name, but different IDs can be created.

Each service exposes 3 endpoints when created:

  • PING - used for service discovery and RTT calculation
  • INFO - returns service configuration details (used subjects, service metadata etc.)
  • STATS - service statistics

Each of those operations can be performed on 3 subjects:

  • all services: $SRV.<operation> - returns a response for each created service and service instance
  • by service name: $SRV.<operation>.<service_name> - returns a response for each service with given service_name
  • by service name and ID: $SRV.<operation>.<service_name>.<service_id> - returns a response for a service with given service_name and service_id

For given configuration

nc, _ := nats.Connect("nats://localhost:4222")
echoHandler := func(req micro.Request) {
    req.Respond(req.Data())
}

config := micro.Config{
    Name:    "EchoService",
    Version: "1.0.0",
    Endpoint: &micro.EndpointConfig{
        Subject: "svc.echo",
        Handler: micro.HandlerFunc(echoHandler),
    },
}
for i := 0; i < 3; i++ {
    srv, err := micro.AddService(nc, config)
    if err != nil {
        log.Fatal(err)
    }
    defer srv.Stop()
}

Service IDs can be discovered by:

nats req '$SRV.PING.EchoService' '' --replies=3

8:59:41 Sending request on "$SRV.PING.EchoService"
18:59:41 Received with rtt 688.042µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

18:59:41 Received with rtt 704.167µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxvO","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

18:59:41 Received with rtt 707.875µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdy0a","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

A specific service instance info can be retrieved:

nats req '$SRV.INFO.EchoService.tNoopzL5Sp1M4qJZdhdxqC' ''

19:40:06 Sending request on "$SRV.INFO.EchoService.tNoopzL5Sp1M4qJZdhdxqC"
19:40:06 Received with rtt 282.375µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.info_response","description":"","subjects":["svc.echo"]}

To get statistics for this service:

nats req '$SRV.STATS.EchoService.tNoopzL5Sp1M4qJZdhdxqC' ''

19:40:47 Sending request on "$SRV.STATS.EchoService.tNoopzL5Sp1M4qJZdhdxqC"
19:40:47 Received with rtt 421.666µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.stats_response","started":"2023-05-22T16:59:39.938514Z","endpoints":[{"name":"default","subject":"svc.echo","metadata":null,"num_requests":0,"num_errors":0,"last_error":"","processing_time":0,"average_processing_time":0}]}

Examples

For more detailed examples, refer to the ./test/example_test.go directory in this package.

Documentation

The complete documentation is available on GoDoc.

Documentation

Overview

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// endpoint handler - in this case, HandlerFunc is used,
// which is a built-in implementation of Handler interface
echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

// second endpoint
incrementHandler := func(req micro.Request) {
	val, err := strconv.Atoi(string(req.Data()))
	if err != nil {
		req.Error("400", "request data should be a number", nil)
		return
	}

	responseData := val + 1
	req.Respond([]byte(strconv.Itoa(responseData)))
}

// third endpoint
multiplyHandler := func(req micro.Request) {
	val, err := strconv.Atoi(string(req.Data()))
	if err != nil {
		req.Error("400", "request data should be a number", nil)
		return
	}

	responseData := val * 2
	req.Respond([]byte(strconv.Itoa(responseData)))
}

config := micro.Config{
	Name:        "IncrementService",
	Version:     "0.1.0",
	Description: "Increment numbers",

	// base handler - for simple services with single endpoints this is sufficient
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.HandlerFunc(echoHandler),
	},
}
svc, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}
defer svc.Stop()

// add a group to aggregate endpoints under common prefix
numbers := svc.AddGroup("numbers")

// register endpoints in a group
err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler))
if err != nil {
	log.Fatal(err)
}
err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler))
if err != nil {
	log.Fatal(err)
}

// send a request to a service
resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second)
if err != nil {
	log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
	log.Fatal(err)
}
fmt.Println(responseVal)
Output:

Index

Examples

Constants

View Source
const (
	// Queue Group name used across all services
	DefaultQueueGroup = "q"

	// APIPrefix is the root of all control subjects
	APIPrefix = "$SRV"
)
View Source
const (
	ErrorHeader     = "Nats-Service-Error"
	ErrorCodeHeader = "Nats-Service-Error-Code"
)

Service Error headers

View Source
const (
	InfoResponseType  = "io.nats.micro.v1.info_response"
	PingResponseType  = "io.nats.micro.v1.ping_response"
	StatsResponseType = "io.nats.micro.v1.stats_response"
)

Variables

View Source
var (
	ErrRespond         = errors.New("NATS error when sending response")
	ErrMarshalResponse = errors.New("marshaling response")
	ErrArgRequired     = errors.New("argument required")
)
View Source
var (
	// ErrConfigValidation is returned when service configuration is invalid
	ErrConfigValidation = errors.New("validation")

	// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, INFO, STATS)
	ErrVerbNotSupported = errors.New("unsupported verb")

	// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
	ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)

Common errors returned by the Service framework.

Functions

func ControlSubject

func ControlSubject(verb Verb, name, id string) (string, error)

ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Info or Stats). Depending on whether kind and id are provided, ControlSubject will return one of the following:

  • verb only: subject used to monitor all available services
  • verb and kind: subject used to monitor services with the provided name
  • verb, name and id: subject used to monitor an instance of a service with the provided ID
Example
package main

import (
	"fmt"

	"github.com/nats-io/nats.go/micro"
)

func main() {

	// subject used to get PING from all services
	subjectPINGAll, _ := micro.ControlSubject(micro.PingVerb, "", "")
	fmt.Println(subjectPINGAll)

	// subject used to get PING from services with provided name
	subjectPINGName, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "")
	fmt.Println(subjectPINGName)

	// subject used to get PING from a service with provided name and ID
	subjectPINGInstance, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "123")
	fmt.Println(subjectPINGInstance)

}
Output:

$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123

Types

type Config

type Config struct {
	// Name represents the name of the service.
	Name string `json:"name"`

	// Endpoint is an optional endpoint configuration.
	// More complex, multi-endpoint services can be configured using
	// Service.AddGroup and Service.AddEndpoint methods.
	Endpoint *EndpointConfig `json:"endpoint"`

	// Version is a SemVer compatible version string.
	Version string `json:"version"`

	// Description of the service.
	Description string `json:"description"`

	// Metadata annotates the service
	Metadata map[string]string `json:"metadata,omitempty"`

	// QueueGroup can be used to override the default queue group name.
	QueueGroup string `json:"queue_group"`

	// StatsHandler is a user-defined custom function.
	// used to calculate additional service stats.
	StatsHandler StatsHandler

	// DoneHandler is invoked when all service subscription are stopped.
	DoneHandler DoneHandler

	// ErrorHandler is invoked on any nats-related service error.
	ErrorHandler ErrHandler
}

Config is a configuration of a service.

type DoneHandler

type DoneHandler func(Service)

DoneHandler is a function used to configure a custom done handler for a service.

type Endpoint

type Endpoint struct {
	EndpointConfig
	Name string
	// contains filtered or unexported fields
}

Endpoint manages a service endpoint.

type EndpointConfig added in v1.23.0

type EndpointConfig struct {
	// Subject on which the endpoint is registered.
	Subject string

	// Handler used by the endpoint.
	Handler Handler

	// Metadata annotates the service
	Metadata map[string]string `json:"metadata,omitempty"`

	// QueueGroup can be used to override the default queue group name.
	QueueGroup string `json:"queue_group"`
}

type EndpointInfo added in v1.27.0

type EndpointInfo struct {
	Name       string            `json:"name"`
	Subject    string            `json:"subject"`
	QueueGroup string            `json:"queue_group"`
	Metadata   map[string]string `json:"metadata"`
}

type EndpointOpt added in v1.23.0

type EndpointOpt func(*endpointOpts) error

func WithEndpointMetadata added in v1.25.0

func WithEndpointMetadata(metadata map[string]string) EndpointOpt

func WithEndpointQueueGroup added in v1.30.0

func WithEndpointQueueGroup(queueGroup string) EndpointOpt

func WithEndpointSubject added in v1.23.0

func WithEndpointSubject(subject string) EndpointOpt
Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

config := micro.Config{
	Name:    "EchoService",
	Version: "1.0.0",
}

srv, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}

// endpoint will be registered under "service.echo" subject
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo"))
if err != nil {
	log.Fatal(err)
}
Output:

type EndpointStats added in v1.23.0

type EndpointStats struct {
	Name                  string          `json:"name"`
	Subject               string          `json:"subject"`
	QueueGroup            string          `json:"queue_group"`
	NumRequests           int             `json:"num_requests"`
	NumErrors             int             `json:"num_errors"`
	LastError             string          `json:"last_error"`
	ProcessingTime        time.Duration   `json:"processing_time"`
	AverageProcessingTime time.Duration   `json:"average_processing_time"`
	Data                  json.RawMessage `json:"data,omitempty"`
}

EndpointStats contains stats for a specific endpoint.

type ErrHandler

type ErrHandler func(Service, *NATSError)

ErrHandler is a function used to configure a custom error handler for a service,

type Group added in v1.23.0

type Group interface {
	// AddGroup creates a new group, prefixed by this group's prefix.
	AddGroup(string, ...GroupOpt) Group

	// AddEndpoint registers new endpoints on a service.
	// The endpoint's subject will be prefixed with the group prefix.
	AddEndpoint(string, Handler, ...EndpointOpt) error
}

Group allows for grouping endpoints on a service.

Endpoints created using AddEndpoint will be grouped under common prefix (group name) New groups can also be derived from a group using AddGroup.

type GroupOpt added in v1.30.0

type GroupOpt func(*groupOpts)

func WithGroupQueueGroup added in v1.30.0

func WithGroupQueueGroup(queueGroup string) GroupOpt

type Handler added in v1.23.0

type Handler interface {
	Handle(Request)
}

Handler is used to respond to service requests.

Example
package main

import (
	"log"
	"strconv"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/micro"
)

type rectangle struct {
	height int
	width  int
}

// Handle is an implementation of micro.Handler used to
// calculate the area of a rectangle
func (r rectangle) Handle(req micro.Request) {
	area := r.height * r.width
	req.Respond([]byte(strconv.Itoa(area)))
}

func main() {
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	rec := rectangle{10, 5}

	config := micro.Config{
		Name:    "RectangleAreaService",
		Version: "0.1.0",
		Endpoint: &micro.EndpointConfig{
			Subject: "area.rectangle",
			Handler: rec,
		},
	}
	svc, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer svc.Stop()
}
Output:

func ContextHandler added in v1.24.0

func ContextHandler(ctx context.Context, handler func(context.Context, Request)) Handler

ContextHandler is a helper function used to utilize context.Context in request handlers.

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

handler := func(ctx context.Context, req micro.Request) {
	select {
	case <-ctx.Done():
		req.Error("400", "context canceled", nil)
	default:
		req.Respond([]byte("ok"))
	}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := micro.Config{
	Name:    "EchoService",
	Version: "0.1.0",
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.ContextHandler(ctx, handler),
	},
}

srv, _ := micro.AddService(nc, config)
defer srv.Stop()
Output:

type HandlerFunc added in v1.23.0

type HandlerFunc func(Request)

HandlerFunc is a function implementing Handler. It allows using a function as a request handler, without having to implement Handle on a separate type.

func (HandlerFunc) Handle added in v1.23.0

func (fn HandlerFunc) Handle(req Request)

type Headers added in v1.22.1

type Headers nats.Header

Headers is a wrapper around *nats.Header

func (Headers) Get added in v1.22.1

func (h Headers) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Headers) Values added in v1.22.1

func (h Headers) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.

type Info

type Info struct {
	ServiceIdentity
	Type        string         `json:"type"`
	Description string         `json:"description"`
	Endpoints   []EndpointInfo `json:"endpoints"`
}

Info is the basic information about a service type.

type NATSError

type NATSError struct {
	Subject     string
	Description string
}

NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.

func (*NATSError) Error

func (e *NATSError) Error() string

type Ping

type Ping struct {
	ServiceIdentity
	Type string `json:"type"`
}

Ping is the response type for PING monitoring endpoint.

type Request

type Request interface {
	// Respond sends the response for the request.
	// Additional headers can be passed using [WithHeaders] option.
	Respond([]byte, ...RespondOpt) error

	// RespondJSON marshals the given response value and responds to the request.
	// Additional headers can be passed using [WithHeaders] option.
	RespondJSON(any, ...RespondOpt) error

	// Error prepares and publishes error response from a handler.
	// A response error should be set containing an error code and description.
	// Optionally, data can be set as response payload.
	Error(code, description string, data []byte, opts ...RespondOpt) error

	// Data returns request data.
	Data() []byte

	// Headers returns request headers.
	Headers() Headers

	// Subject returns underlying NATS message subject.
	Subject() string

	// Reply returns underlying NATS message reply subject.
	Reply() string
}

Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.

type RespondOpt added in v1.22.1

type RespondOpt func(*nats.Msg)

RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.

func WithHeaders added in v1.22.1

func WithHeaders(headers Headers) RespondOpt

WithHeaders can be used to configure response with custom headers.

type Service

type Service interface {
	// AddEndpoint registers endpoint with given name on a specific subject.
	AddEndpoint(string, Handler, ...EndpointOpt) error

	// AddGroup returns a Group interface, allowing for more complex endpoint topologies.
	// A group can be used to register endpoints with given prefix.
	AddGroup(string, ...GroupOpt) Group

	// Info returns the service info.
	Info() Info

	// Stats returns statistics for the service endpoint and all monitoring endpoints.
	Stats() Stats

	// Reset resets all statistics (for all endpoints) on a service instance.
	Reset()

	// Stop drains the endpoint subscriptions and marks the service as stopped.
	Stop() error

	// Stopped informs whether [Stop] was executed on the service.
	Stopped() bool
}

Service exposes methods to operate on a service instance.

func AddService

func AddService(nc *nats.Conn, config Config) (Service, error)

AddService adds a microservice. It will enable internal common services (PING, STATS and INFO). Request handlers have to be registered separately using Service.AddEndpoint. A service name, version and Endpoint configuration are required to add a service. AddService returns a Service interface, allowing service management. Each service is assigned a unique ID.

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

config := micro.Config{
	Name:        "EchoService",
	Version:     "1.0.0",
	Description: "Send back what you receive",
	// DoneHandler can be set to customize behavior on stopping a service.
	DoneHandler: func(srv micro.Service) {
		info := srv.Info()
		fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
	},

	// ErrorHandler can be used to customize behavior on service execution error.
	ErrorHandler: func(srv micro.Service, err *micro.NATSError) {
		info := srv.Info()
		fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
	},

	// optional base handler
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.HandlerFunc(echoHandler),
	},
}

srv, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}
defer srv.Stop()
Output:

type ServiceIdentity

type ServiceIdentity struct {
	Name     string            `json:"name"`
	ID       string            `json:"id"`
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
}

ServiceIdentity contains fields helping to identity a service instance.

type Stats

type Stats struct {
	ServiceIdentity
	Type      string           `json:"type"`
	Started   time.Time        `json:"started"`
	Endpoints []*EndpointStats `json:"endpoints"`
}

Stats is the type returned by STATS monitoring endpoint. It contains stats of all registered endpoints.

type StatsHandler

type StatsHandler func(*Endpoint) any

StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.

type Verb

type Verb int64

Verb represents a name of the monitoring service.

const (
	PingVerb Verb = iota
	StatsVerb
	InfoVerb
)

Verbs being used to set up a specific control subject.

func (Verb) String

func (s Verb) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL