micro

package
v1.999.9999
Published: Jun 10, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

NATS micro

Overview

The micro package in the NATS.go library provides a simple way to create microservices that leverage NATS for scalability, load management and observability.

Basic usage

To start using the micro package, import it in your application:

import "github.com/nats-io/nats.go/micro"

The core of the micro package is the Service. A Service aggregates endpoints for handling application logic. Services are named and versioned. You create a Service using the micro.AddService() function, passing in the NATS connection and the Service configuration.

nc, _ := nats.Connect(nats.DefaultURL)

// request handler
echoHandler := func(req micro.Request) {
    req.Respond(req.Data())
}

srv, err := micro.AddService(nc, micro.Config{
    Name:        "EchoService",
    Version:     "1.0.0",
    // base handler
    Endpoint: &micro.EndpointConfig{
        Subject: "svc.echo",
        Handler: micro.HandlerFunc(echoHandler),
    },
})

Once the service is created, it can be reached by sending a request on the endpoint subject. For the configuration above, run:

nats req svc.echo "hello!"

To get:

17:37:32 Sending request on "svc.echo"
17:37:32 Received with rtt 365.875µs
hello!

Endpoints and groups

A base endpoint can optionally be configured on a service, and more endpoints can be added after the service is created.

srv, _ := micro.AddService(nc, config)

// endpoint will be registered under "svc.add" subject
err := srv.AddEndpoint("svc.add", micro.HandlerFunc(add))

In the above example, svc.add is both the endpoint name and the endpoint subject. It is possible to have an endpoint name that differs from the endpoint subject by passing the micro.WithEndpointSubject() option to AddEndpoint().

// endpoint will be registered under "svc.add" subject
err = srv.AddEndpoint("Adder", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("svc.add"))

Endpoints can also be aggregated using groups. A group represents a common subject prefix used by all endpoints associated with it.

srv, _ := micro.AddService(nc, config)

numbersGroup := srv.AddGroup("numbers")

// endpoint will be registered under "numbers.add" subject
_ = numbersGroup.AddEndpoint("add", micro.HandlerFunc(addHandler))
// endpoint will be registered under "numbers.multiply" subject
_ = numbersGroup.AddEndpoint("multiply", micro.HandlerFunc(multiplyHandler))

Discovery and Monitoring

Each service is assigned a unique ID on creation. A service instance is identified by service name and ID. Multiple services with the same name, but different IDs can be created.

Each service exposes 3 endpoints when created:

  • PING - used for service discovery and RTT calculation
  • INFO - returns service configuration details (used subjects, service metadata etc.)
  • STATS - service statistics

Each of those operations can be performed on 3 subjects:

  • all services: $SRV.<operation> - returns a response for each created service and service instance
  • by service name: $SRV.<operation>.<service_name> - returns a response for each service with given service_name
  • by service name and ID: $SRV.<operation>.<service_name>.<service_id> - returns a response for a service with given service_name and service_id
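
The three subject shapes listed above can be sketched with the standard library alone. The controlSubject helper below is hypothetical and only mirrors the documented patterns; it is not the library's own ControlSubject function, and the rule that an ID without a name is rejected is an assumption based on the ErrServiceNameRequired error in this package.

```go
package main

import (
	"errors"
	"fmt"
)

// apiPrefix mirrors the "$SRV" root used by the documented subject patterns.
const apiPrefix = "$SRV"

// controlSubject is a hypothetical helper (not the library function) that
// builds a monitoring subject from an operation, a service name and an ID.
func controlSubject(operation, name, id string) (string, error) {
	if name == "" && id != "" {
		// Assumption: an ID is only meaningful together with a service name.
		return "", errors.New("service name is required when an ID is given")
	}
	switch {
	case name == "":
		return fmt.Sprintf("%s.%s", apiPrefix, operation), nil
	case id == "":
		return fmt.Sprintf("%s.%s.%s", apiPrefix, operation, name), nil
	default:
		return fmt.Sprintf("%s.%s.%s.%s", apiPrefix, operation, name, id), nil
	}
}

func main() {
	targets := [][2]string{
		{"", ""},
		{"EchoService", ""},
		{"EchoService", "tNoopzL5Sp1M4qJZdhdxqC"},
	}
	for _, t := range targets {
		subject, err := controlSubject("INFO", t[0], t[1])
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(subject)
	}
}
```

In real code, micro.ControlSubject (documented further down) is the function to use; the sketch only makes the subject layout explicit.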

For the following configuration:

nc, _ := nats.Connect("nats://localhost:4222")
echoHandler := func(req micro.Request) {
    req.Respond(req.Data())
}

config := micro.Config{
    Name:    "EchoService",
    Version: "1.0.0",
    Endpoint: &micro.EndpointConfig{
        Subject: "svc.echo",
        Handler: micro.HandlerFunc(echoHandler),
    },
}
for i := 0; i < 3; i++ {
    srv, err := micro.AddService(nc, config)
    if err != nil {
        log.Fatal(err)
    }
    defer srv.Stop()
}

Service IDs can be discovered by:

nats req '$SRV.PING.EchoService' '' --replies=3

18:59:41 Sending request on "$SRV.PING.EchoService"
18:59:41 Received with rtt 688.042µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

18:59:41 Received with rtt 704.167µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxvO","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

18:59:41 Received with rtt 707.875µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdy0a","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}

Info for a specific service instance can be retrieved with:

nats req '$SRV.INFO.EchoService.tNoopzL5Sp1M4qJZdhdxqC' ''

19:40:06 Sending request on "$SRV.INFO.EchoService.tNoopzL5Sp1M4qJZdhdxqC"
19:40:06 Received with rtt 282.375µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.info_response","description":"","subjects":["svc.echo"]}

To get statistics for this service:

nats req '$SRV.STATS.EchoService.tNoopzL5Sp1M4qJZdhdxqC' ''

19:40:47 Sending request on "$SRV.STATS.EchoService.tNoopzL5Sp1M4qJZdhdxqC"
19:40:47 Received with rtt 421.666µs
{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.stats_response","started":"2023-05-22T16:59:39.938514Z","endpoints":[{"name":"default","subject":"svc.echo","metadata":null,"num_requests":0,"num_errors":0,"last_error":"","processing_time":0,"average_processing_time":0}]}

Examples

For more detailed examples, refer to the ./test/example_test.go file in this package.

Documentation

The complete documentation is available on GoDoc.

Documentation

Overview

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// endpoint handler - in this case, HandlerFunc is used,
// which is a built-in implementation of Handler interface
echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

// second endpoint
incrementHandler := func(req micro.Request) {
	val, err := strconv.Atoi(string(req.Data()))
	if err != nil {
		req.Error("400", "request data should be a number", nil)
		return
	}

	responseData := val + 1
	req.Respond([]byte(strconv.Itoa(responseData)))
}

// third endpoint
multiplyHandler := func(req micro.Request) {
	val, err := strconv.Atoi(string(req.Data()))
	if err != nil {
		req.Error("400", "request data should be a number", nil)
		return
	}

	responseData := val * 2
	req.Respond([]byte(strconv.Itoa(responseData)))
}

config := micro.Config{
	Name:        "IncrementService",
	Version:     "0.1.0",
	Description: "Increment numbers",

	// base handler - for simple services with single endpoints this is sufficient
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.HandlerFunc(echoHandler),
	},
}
svc, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}
defer svc.Stop()

// add a group to aggregate endpoints under common prefix
numbers := svc.AddGroup("numbers")

// register endpoints in a group
err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler))
if err != nil {
	log.Fatal(err)
}
err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler))
if err != nil {
	log.Fatal(err)
}

// send a request to a service
resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second)
if err != nil {
	log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
	log.Fatal(err)
}
fmt.Println(responseVal)
Output:

Constants

const (
	// Queue Group name used across all services
	QG = "q"

	// APIPrefix is the root of all control subjects
	APIPrefix = "$SRV"
)
const (
	ErrorHeader     = "Nats-Service-Error"
	ErrorCodeHeader = "Nats-Service-Error-Code"
)

Service Error headers
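
A client can use these two header names to tell an error reply from a normal one. The sketch below models response headers as a plain map[string][]string so it runs without a NATS connection; the serviceError helper is hypothetical, and it assumes that an error reply carries the code under Nats-Service-Error-Code and the description under Nats-Service-Error.

```go
package main

import "fmt"

// Local copies of the header names documented above.
const (
	errorHeader     = "Nats-Service-Error"
	errorCodeHeader = "Nats-Service-Error-Code"
)

// serviceError is a hypothetical client-side helper. It inspects response
// headers and reports whether the service signaled an error.
func serviceError(h map[string][]string) (code, description string, failed bool) {
	codes := h[errorCodeHeader]
	if len(codes) == 0 {
		return "", "", false
	}
	if descs := h[errorHeader]; len(descs) > 0 {
		description = descs[0]
	}
	return codes[0], description, true
}

func main() {
	reply := map[string][]string{
		errorCodeHeader: {"400"},
		errorHeader:     {"request data should be a number"},
	}
	if code, desc, failed := serviceError(reply); failed {
		fmt.Printf("service error %s: %s\n", code, desc)
	}
}
```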

const (
	InfoResponseType  = "io.nats.micro.v1.info_response"
	PingResponseType  = "io.nats.micro.v1.ping_response"
	StatsResponseType = "io.nats.micro.v1.stats_response"
)

Variables

var (
	ErrRespond         = errors.New("NATS error when sending response")
	ErrMarshalResponse = errors.New("marshaling response")
	ErrArgRequired     = errors.New("argument required")
)
var (
	// ErrConfigValidation is returned when service configuration is invalid
	ErrConfigValidation = errors.New("validation")

	// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, INFO, STATS)
	ErrVerbNotSupported = errors.New("unsupported verb")

	// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
	ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)

Common errors returned by the Service framework.

Functions

func ControlSubject

func ControlSubject(verb Verb, name, id string) (string, error)

ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Info or Stats). Depending on whether name and id are provided, ControlSubject will return one of the following:

  • verb only: subject used to monitor all available services
  • verb and name: subject used to monitor services with the provided name
  • verb, name and id: subject used to monitor an instance of a service with the provided ID
Example
package main

import (
	"fmt"

	"github.com/nats-io/nats.go/micro"
)

func main() {

	// subject used to get PING from all services
	subjectPINGAll, _ := micro.ControlSubject(micro.PingVerb, "", "")
	fmt.Println(subjectPINGAll)

	// subject used to get PING from services with provided name
	subjectPINGName, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "")
	fmt.Println(subjectPINGName)

	// subject used to get PING from a service with provided name and ID
	subjectPINGInstance, _ := micro.ControlSubject(micro.PingVerb, "CoolService", "123")
	fmt.Println(subjectPINGInstance)

}
Output:

$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123

Types

type Config

type Config struct {
	// Name represents the name of the service.
	Name string `json:"name"`

	// Endpoint is an optional endpoint configuration.
	// More complex, multi-endpoint services can be configured using
	// Service.AddGroup and Service.AddEndpoint methods.
	Endpoint *EndpointConfig `json:"endpoint"`

	// Version is a SemVer compatible version string.
	Version string `json:"version"`

	// Description of the service.
	Description string `json:"description"`

	// Metadata annotates the service
	Metadata map[string]string `json:"metadata,omitempty"`

	// StatsHandler is a user-defined custom function
	// used to calculate additional service stats.
	StatsHandler StatsHandler

	// DoneHandler is invoked when all service subscriptions are stopped.
	DoneHandler DoneHandler

	// ErrorHandler is invoked on any nats-related service error.
	ErrorHandler ErrHandler
}

Config is a configuration of a service.

type DoneHandler

type DoneHandler func(Service)

DoneHandler is a function used to configure a custom done handler for a service.

type Endpoint

type Endpoint struct {
	EndpointConfig
	Name string
	// contains filtered or unexported fields
}

Endpoint manages a service endpoint.

type EndpointConfig

type EndpointConfig struct {
	// Subject on which the endpoint is registered.
	Subject string

	// Handler used by the endpoint.
	Handler Handler

	// Metadata annotates the endpoint
	Metadata map[string]string `json:"metadata,omitempty"`
}

type EndpointInfo

type EndpointInfo struct {
	Name     string            `json:"name"`
	Subject  string            `json:"subject"`
	Metadata map[string]string `json:"metadata"`
}

type EndpointOpt

type EndpointOpt func(*endpointOpts) error

func WithEndpointMetadata

func WithEndpointMetadata(metadata map[string]string) EndpointOpt

func WithEndpointSubject

func WithEndpointSubject(subject string) EndpointOpt
Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

config := micro.Config{
	Name:    "EchoService",
	Version: "1.0.0",
}

srv, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}

// endpoint will be registered under "service.echo" subject
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo"))
if err != nil {
	log.Fatal(err)
}
Output:

type EndpointStats

type EndpointStats struct {
	Name                  string          `json:"name"`
	Subject               string          `json:"subject"`
	NumRequests           int             `json:"num_requests"`
	NumErrors             int             `json:"num_errors"`
	LastError             string          `json:"last_error"`
	ProcessingTime        time.Duration   `json:"processing_time"`
	AverageProcessingTime time.Duration   `json:"average_processing_time"`
	Data                  json.RawMessage `json:"data,omitempty"`
}

EndpointStats contains stats for a specific endpoint.

type ErrHandler

type ErrHandler func(Service, *NATSError)

ErrHandler is a function used to configure a custom error handler for a service.

type Group

type Group interface {
	// AddGroup creates a new group, prefixed by this group's prefix.
	AddGroup(string) Group

	// AddEndpoint registers new endpoints on a service.
	// The endpoint's subject will be prefixed with the group prefix.
	AddEndpoint(string, Handler, ...EndpointOpt) error
}

Group allows for grouping endpoints on a service.

Endpoints created using AddEndpoint will be grouped under a common prefix (the group name). New groups can also be derived from a group using AddGroup.
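
The prefixing rule can be illustrated without the library. The group type below is a hypothetical stand-in, not micro.Group; it assumes that prefixes and endpoint names are joined with a single ".", which matches the "numbers.add" subject shown in the README, and that nested groups extend the parent prefix in the same way.

```go
package main

import "fmt"

// group is a hypothetical stand-in for micro.Group that only tracks the
// subject prefix.
type group struct {
	prefix string
}

// join concatenates a prefix and a name with "." and skips empty prefixes.
func join(prefix, name string) string {
	if prefix == "" {
		return name
	}
	return prefix + "." + name
}

// addGroup derives a nested group whose prefix extends the parent's prefix.
func (g group) addGroup(name string) group {
	return group{prefix: join(g.prefix, name)}
}

// endpointSubject returns the subject an endpoint would be registered under.
func (g group) endpointSubject(name string) string {
	return join(g.prefix, name)
}

func main() {
	root := group{}
	numbers := root.addGroup("numbers")
	fmt.Println(numbers.endpointSubject("add"))

	// Hypothetical nested group, assuming prefixes simply accumulate.
	v2 := numbers.addGroup("v2")
	fmt.Println(v2.endpointSubject("multiply"))
}
```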

type Handler

type Handler interface {
	Handle(Request)
}

Handler is used to respond to service requests.

Example
package main

import (
	"log"
	"strconv"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/micro"
)

type rectangle struct {
	height int
	width  int
}

// Handle is an implementation of micro.Handler used to
// calculate the area of a rectangle
func (r rectangle) Handle(req micro.Request) {
	area := r.height * r.width
	req.Respond([]byte(strconv.Itoa(area)))
}

func main() {
	nc, err := nats.Connect("127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	rec := rectangle{10, 5}

	config := micro.Config{
		Name:    "RectangleAreaService",
		Version: "0.1.0",
		Endpoint: &micro.EndpointConfig{
			Subject: "area.rectangle",
			Handler: rec,
		},
	}
	svc, err := micro.AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer svc.Stop()
}
Output:

func ContextHandler

func ContextHandler(ctx context.Context, handler func(context.Context, Request)) Handler

ContextHandler is a helper function used to utilize context.Context in request handlers.
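
One plausible way to build such a helper is a closure that captures the context once and passes it to every invocation. The sketch below uses local stand-in types (request, handler, handlerFunc, recorder are all hypothetical) so that it runs without a NATS server; it is not the library's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// request is a trimmed-down stand-in for micro.Request.
type request interface {
	Respond([]byte) error
	Error(code, description string, data []byte) error
}

type handler interface{ Handle(request) }

type handlerFunc func(request)

func (fn handlerFunc) Handle(req request) { fn(req) }

// contextHandler binds ctx to fn, so every request sees the same context.
func contextHandler(ctx context.Context, fn func(context.Context, request)) handler {
	return handlerFunc(func(req request) { fn(ctx, req) })
}

// recorder stores what the handler did instead of publishing to NATS.
type recorder struct{ result string }

func (r *recorder) Respond(b []byte) error {
	r.result = "response: " + string(b)
	return nil
}

func (r *recorder) Error(code, description string, _ []byte) error {
	r.result = "error " + code + ": " + description
	return nil
}

// checkContext answers "ok" unless the bound context is already canceled.
func checkContext(ctx context.Context, req request) {
	select {
	case <-ctx.Done():
		_ = req.Error("400", "context canceled", nil)
	default:
		_ = req.Respond([]byte("ok"))
	}
}

// runBeforeAndAfterCancel handles one request before and one after cancel.
func runBeforeAndAfterCancel() (string, string) {
	ctx, cancel := context.WithCancel(context.Background())
	h := contextHandler(ctx, checkContext)

	before := &recorder{}
	h.Handle(before)
	cancel()
	after := &recorder{}
	h.Handle(after)
	return before.result, after.result
}

func main() {
	before, after := runBeforeAndAfterCancel()
	fmt.Println(before)
	fmt.Println(after)
}
```

Because the context is bound when the handler is constructed, canceling it affects every later request, which suits service-wide shutdown signals rather than per-request deadlines.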

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

handler := func(ctx context.Context, req micro.Request) {
	select {
	case <-ctx.Done():
		req.Error("400", "context canceled", nil)
	default:
		req.Respond([]byte("ok"))
	}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config := micro.Config{
	Name:    "EchoService",
	Version: "0.1.0",
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.ContextHandler(ctx, handler),
	},
}

srv, _ := micro.AddService(nc, config)
defer srv.Stop()
Output:

type HandlerFunc

type HandlerFunc func(Request)

HandlerFunc is a function implementing Handler. It allows using a function as a request handler, without having to implement Handle on a separate type.

func (HandlerFunc) Handle

func (fn HandlerFunc) Handle(req Request)
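
The adapter pattern behind HandlerFunc can be shown in isolation. In this sketch, request, handler, handlerFunc and fakeRequest are local, hypothetical stand-ins for the package types; the point is only that a function type with a Handle method satisfies the handler interface.

```go
package main

import "fmt"

// request is a trimmed-down stand-in for micro.Request.
type request interface {
	Data() []byte
	Respond([]byte) error
}

// handler mirrors the single-method shape of micro.Handler.
type handler interface {
	Handle(request)
}

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(request)

// Handle calls the underlying function.
func (fn handlerFunc) Handle(req request) { fn(req) }

// fakeRequest records the response so the example needs no NATS server.
type fakeRequest struct {
	data     []byte
	response []byte
}

func (r *fakeRequest) Data() []byte { return r.data }

func (r *fakeRequest) Respond(b []byte) error {
	r.response = b
	return nil
}

// echo sends the request payload back unchanged.
func echo(req request) {
	_ = req.Respond(req.Data())
}

func main() {
	// The conversion handlerFunc(echo) is all that is needed to get a handler.
	var h handler = handlerFunc(echo)

	req := &fakeRequest{data: []byte("hello!")}
	h.Handle(req)
	fmt.Println(string(req.response))
}
```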

type Headers

type Headers nats.Header

Headers is a wrapper around nats.Header

func (Headers) Get

func (h Headers) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Headers) Values

func (h Headers) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.
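
Case-sensitive lookup differs from what net/http users may expect. The headers type below is a local, hypothetical re-creation with the same underlying map shape, written only to make that difference visible; it is contrasted with http.Header, which canonicalizes keys.

```go
package main

import (
	"fmt"
	"net/http"
)

// headers is a local stand-in with the same map shape as nats.Header.
type headers map[string][]string

// Get returns the first value stored under exactly this key.
func (h headers) Get(key string) string {
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// Values returns every value stored under exactly this key.
func (h headers) Values(key string) []string {
	return h[key]
}

func main() {
	h := headers{"X-Trace-Id": {"abc", "def"}}

	fmt.Printf("exact key:       %q\n", h.Get("X-Trace-Id"))
	fmt.Printf("different case:  %q\n", h.Get("x-trace-id"))
	fmt.Printf("all values:      %q\n", h.Values("X-Trace-Id"))

	// For contrast, net/http canonicalizes keys on both Set and Get.
	std := http.Header{}
	std.Set("x-trace-id", "abc")
	fmt.Printf("net/http lookup: %q\n", std.Get("X-TRACE-ID"))
}
```

The practical consequence is that header keys should be written and read with identical casing, for example by reusing the exported constants such as ErrorHeader.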

type Info

type Info struct {
	ServiceIdentity
	Type        string         `json:"type"`
	Description string         `json:"description"`
	Endpoints   []EndpointInfo `json:"endpoints"`
}

Info is the basic information about a service type.

type NATSError

type NATSError struct {
	Subject     string
	Description string
}

NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.

func (*NATSError) Error

func (e *NATSError) Error() string

type Ping

type Ping struct {
	ServiceIdentity
	Type string `json:"type"`
}

Ping is the response type for PING monitoring endpoint.
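
A discovery client typically decodes each PING reply and keeps the instance IDs. The sketch below does this with encoding/json only; pingResponse is a local type that mirrors the JSON fields of the PING replies shown in the README and is not the library's Ping type. The sample payloads are copied from that README output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pingResponse mirrors the JSON fields visible in a PING reply.
type pingResponse struct {
	Name     string            `json:"name"`
	ID       string            `json:"id"`
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
	Type     string            `json:"type"`
}

// collectIDs decodes raw PING replies and returns the instance IDs in order.
func collectIDs(replies [][]byte) ([]string, error) {
	ids := make([]string, 0, len(replies))
	for _, raw := range replies {
		var p pingResponse
		if err := json.Unmarshal(raw, &p); err != nil {
			return nil, fmt.Errorf("decoding ping reply: %w", err)
		}
		ids = append(ids, p.ID)
	}
	return ids, nil
}

func main() {
	replies := [][]byte{
		[]byte(`{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}`),
		[]byte(`{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxvO","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.ping_response"}`),
	}
	ids, err := collectIDs(replies)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, id := range ids {
		fmt.Println(id)
	}
}
```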

type Request

type Request interface {
	// Respond sends the response for the request.
	// Additional headers can be passed using [WithHeaders] option.
	Respond([]byte, ...RespondOpt) error

	// RespondJSON marshals the given response value and responds to the request.
	// Additional headers can be passed using [WithHeaders] option.
	RespondJSON(interface{}, ...RespondOpt) error

	// Error prepares and publishes error response from a handler.
	// A response error should be set containing an error code and description.
	// Optionally, data can be set as response payload.
	Error(code, description string, data []byte, opts ...RespondOpt) error

	// Data returns request data.
	Data() []byte

	// Headers returns request headers.
	Headers() Headers

	// Subject returns underlying NATS message subject.
	Subject() string
}

Request represents a service request as seen by the service handler. It exposes methods for responding to the request and for reading the request data, headers and subject.

type RespondOpt

type RespondOpt func(*nats.Msg)

RespondOpt is a function used to configure [Request.Respond] and [Request.RespondJSON] methods.

func WithHeaders

func WithHeaders(headers Headers) RespondOpt

WithHeaders can be used to configure response with custom headers.

type Service

type Service interface {
	// AddEndpoint registers endpoint with given name on a specific subject.
	AddEndpoint(string, Handler, ...EndpointOpt) error

	// AddGroup returns a Group interface, allowing for more complex endpoint topologies.
	// A group can be used to register endpoints with given prefix.
	AddGroup(string) Group

	// Info returns the service info.
	Info() Info

	// Stats returns statistics for the service endpoint and all monitoring endpoints.
	Stats() Stats

	// Reset resets all statistics (for all endpoints) on a service instance.
	Reset()

	// Stop drains the endpoint subscriptions and marks the service as stopped.
	Stop() error

	// Stopped informs whether [Stop] was executed on the service.
	Stopped() bool
}

Service exposes methods to operate on a service instance.

func AddService

func AddService(nc *nats.Conn, config Config) (Service, error)

AddService adds a microservice. It will enable internal common services (PING, STATS and INFO). A service name and version are required to add a service; the Endpoint configuration is optional, and further request handlers can be registered using Service.AddEndpoint. AddService returns a Service interface, allowing service management. Each service is assigned a unique ID.

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
	req.Respond(req.Data())
}

config := micro.Config{
	Name:        "EchoService",
	Version:     "1.0.0",
	Description: "Send back what you receive",
	// DoneHandler can be set to customize behavior on stopping a service.
	DoneHandler: func(srv micro.Service) {
		info := srv.Info()
		fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
	},

	// ErrorHandler can be used to customize behavior on service execution error.
	ErrorHandler: func(srv micro.Service, err *micro.NATSError) {
		info := srv.Info()
		fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
	},

	// optional base handler
	Endpoint: &micro.EndpointConfig{
		Subject: "echo",
		Handler: micro.HandlerFunc(echoHandler),
	},
}

srv, err := micro.AddService(nc, config)
if err != nil {
	log.Fatal(err)
}
defer srv.Stop()
Output:

type ServiceIdentity

type ServiceIdentity struct {
	Name     string            `json:"name"`
	ID       string            `json:"id"`
	Version  string            `json:"version"`
	Metadata map[string]string `json:"metadata"`
}

ServiceIdentity contains fields helping to identify a service instance.

type Stats

type Stats struct {
	ServiceIdentity
	Type      string           `json:"type"`
	Started   time.Time        `json:"started"`
	Endpoints []*EndpointStats `json:"endpoints"`
}

Stats is the type returned by STATS monitoring endpoint. It contains stats of all registered endpoints.
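
A monitoring client can decode a STATS reply and derive simple health figures from it. The sketch below uses local mirror types (statsResponse and endpointStats are hypothetical names) and a hypothetical errorRate helper. It assumes the processing times are encoded as integer nanoseconds, which is how encoding/json serializes time.Duration. The sample payload is the STATS reply from the README.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// endpointStats mirrors the per-endpoint fields of a STATS reply.
type endpointStats struct {
	Name                  string        `json:"name"`
	Subject               string        `json:"subject"`
	NumRequests           int           `json:"num_requests"`
	NumErrors             int           `json:"num_errors"`
	LastError             string        `json:"last_error"`
	ProcessingTime        time.Duration `json:"processing_time"`
	AverageProcessingTime time.Duration `json:"average_processing_time"`
}

// statsResponse mirrors the top-level fields used by this example.
type statsResponse struct {
	Name      string          `json:"name"`
	ID        string          `json:"id"`
	Started   time.Time       `json:"started"`
	Endpoints []endpointStats `json:"endpoints"`
}

// parseStats decodes one raw STATS reply.
func parseStats(raw []byte) (statsResponse, error) {
	var s statsResponse
	if err := json.Unmarshal(raw, &s); err != nil {
		return statsResponse{}, fmt.Errorf("decoding stats reply: %w", err)
	}
	return s, nil
}

// errorRate returns the fraction of failed requests, or 0 with no traffic.
func errorRate(e endpointStats) float64 {
	if e.NumRequests == 0 {
		return 0
	}
	return float64(e.NumErrors) / float64(e.NumRequests)
}

func main() {
	raw := []byte(`{"name":"EchoService","id":"tNoopzL5Sp1M4qJZdhdxqC","version":"1.0.0","metadata":{},"type":"io.nats.micro.v1.stats_response","started":"2023-05-22T16:59:39.938514Z","endpoints":[{"name":"default","subject":"svc.echo","metadata":null,"num_requests":0,"num_errors":0,"last_error":"","processing_time":0,"average_processing_time":0}]}`)

	stats, err := parseStats(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, e := range stats.Endpoints {
		fmt.Printf("%s on %q: %d requests, error rate %.2f, average %s\n",
			e.Name, e.Subject, e.NumRequests, errorRate(e), e.AverageProcessingTime)
	}
}
```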

type StatsHandler

type StatsHandler func(*Endpoint) interface{}

StatsHandler is a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.
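
The JSON requirement matters because EndpointStats carries custom data in a json.RawMessage field. The sketch below shows one way a returned value could end up there. It assumes, without confirmation from this page, that the handler's return value is marshaled into that data field; endpointStats, cacheStats and withCustomData are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// endpointStats is a reduced local mirror of EndpointStats.
type endpointStats struct {
	Name        string          `json:"name"`
	NumRequests int             `json:"num_requests"`
	Data        json.RawMessage `json:"data,omitempty"`
}

// cacheStats is a hypothetical custom value a stats handler might return.
type cacheStats struct {
	Hits   int `json:"hits"`
	Misses int `json:"misses"`
}

// withCustomData marshals custom and stores it in the Data field.
func withCustomData(stats endpointStats, custom interface{}) (endpointStats, error) {
	raw, err := json.Marshal(custom)
	if err != nil {
		return stats, fmt.Errorf("custom stats are not JSON-serializable: %w", err)
	}
	stats.Data = raw
	return stats, nil
}

// render returns the JSON form of stats as a string.
func render(stats endpointStats) (string, error) {
	out, err := json.Marshal(stats)
	return string(out), err
}

func main() {
	base := endpointStats{Name: "default", NumRequests: 3}

	enriched, err := withCustomData(base, cacheStats{Hits: 2, Misses: 1})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	out, err := render(enriched)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Values that encoding/json cannot marshal, such as channels or functions, would fail at this step, which is why the handler should return plain data.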

type Verb

type Verb int64

Verb represents a name of the monitoring service.

const (
	PingVerb Verb = iota
	StatsVerb
	InfoVerb
)

Verbs being used to set up a specific control subject.

func (Verb) String

func (s Verb) String() string
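
The enum-plus-Stringer pattern used by Verb can be sketched as follows. The lower-case verb type and its constants are local re-creations in the same order as the declaration above. The returned strings match the PING, STATS and INFO operation names used in the control subjects; returning an empty string for unknown values is an assumption.

```go
package main

import "fmt"

// verb is a local re-creation of the Verb enum.
type verb int64

const (
	pingVerb verb = iota
	statsVerb
	infoVerb
)

// String maps each verb to the operation name used in control subjects.
func (v verb) String() string {
	switch v {
	case pingVerb:
		return "PING"
	case statsVerb:
		return "STATS"
	case infoVerb:
		return "INFO"
	default:
		// Assumption: unknown verbs have no textual form.
		return ""
	}
}

func main() {
	for _, v := range []verb{pingVerb, statsVerb, infoVerb} {
		// %s picks up the String method through fmt.Stringer.
		fmt.Printf("%d -> $SRV.%s\n", int64(v), v)
	}
}
```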
