nrpc

Version: v0.0.2
Published: Sep 3, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

README

nRPC

nRPC is an RPC framework like gRPC, but for NATS.

It can generate a Go client and server from the same .proto file that you'd use to generate gRPC clients and servers. The server is generated as a NATS MsgHandler.

Why NATS?

Doing RPC over NATS' request-response model has some advantages over a gRPC model:

  • Minimal service discovery: The clients and servers only need to know the endpoints of a NATS cluster. The clients do not need to discover the endpoints of individual services they depend on.
  • Load balancing without load balancers: Stateless microservices can be hosted redundantly and connected to the same NATS cluster. The incoming requests can then be random-routed among these using NATS queueing. There is no need to set up a (high availability) load balancer per microservice.

The lunch is not always free, however. At scale, the NATS cluster itself can become a bottleneck. Features of gRPC like streaming and advanced auth are not available.

Still, NATS - and nRPC - offer much lower operational complexity if your scale and requirements fit.

At RapidLoop, we use this model for our OpsDash SaaS product in production and are quite happy with it. nRPC is the third iteration of an internal library.

Overview

nRPC comes with a protobuf compiler plugin protoc-gen-nrpc, which generates Go code from a .proto file.

Given a .proto file like helloworld.proto, the usage is like this:

$ ls
helloworld.proto
$ protoc --go_out=. --nrpc_out=. helloworld.proto
$ ls
helloworld.nrpc.go	helloworld.pb.go	helloworld.proto

The .pb.go file, which contains the definitions for the message classes, is generated by the standard Go plugin for protoc. The .nrpc.go file, which contains the definitions for a client, a server interface, and a NATS handler is generated by the nRPC plugin.

Have a look at the generated and example files:

How It Works

The .proto file defines messages (like HelloRequest and HelloReply in the example) and services (Greeter) that have methods (SayHello).

The messages are generated as Go structs by the regular Go protobuf compiler plugin and get written out to *.pb.go files.

For the rest, nRPC generates three logical pieces.

The first is a Go interface type (GreeterServer) which your actual microservice code should implement:

// This is what is contained in the .proto file
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// This is the generated interface which you have to implement
type GreeterServer interface {
    SayHello(ctx context.Context, req HelloRequest) (resp HelloReply, err error)
}

The second is a client (GreeterClient struct). This struct has methods with appropriate types, that correspond to the service definition. The client code will marshal and wrap the request object (HelloRequest) and do a NATS Request.

// The client is associated with a NATS connection.
func NewGreeterClient(nc *nats.Conn) *GreeterClient {...}

// And has properly typed methods that will marshal and perform a NATS request.
func (c *GreeterClient) SayHello(req HelloRequest) (resp HelloReply, err error) {...}

The third and final piece is the handler (GreeterHandler). Given a NATS connection and a server implementation, it can accept NATS requests in the format sent by the client above. It should be installed as a message handler for a particular NATS subject (defaults to the name of the service) using the NATS Subscribe() or QueueSubscribe() methods. It will invoke the appropriate method of the GreeterServer interface upon receiving the appropriate request.

// A handler is associated with a NATS connection and a server implementation.
func NewGreeterHandler(ctx context.Context, nc *nats.Conn, s GreeterServer) *GreeterHandler {...}

// It has a method that can (should) be used as a NATS message handler.
func (h *GreeterHandler) Handler(msg *nats.Msg) {...}
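
To make the relationship between the three pieces concrete, here is a minimal sketch that runs without NATS or protobuf. It is not nRPC's generated code: the `bus` map stands in for a NATS connection, JSON stands in for protobuf encoding, and the names `bus`, `greeterHandler`, `greeterClient` and `server` are hypothetical. Only the overall shape (server interface, typed client, subject-bound handler) follows the description above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// HelloRequest and HelloReply stand in for the protobuf-generated messages.
type HelloRequest struct{ Name string }
type HelloReply struct{ Message string }

// GreeterServer is the interface the service author implements.
type GreeterServer interface {
	SayHello(req HelloRequest) (HelloReply, error)
}

// bus is a hypothetical in-memory replacement for a NATS connection:
// it maps a subject to a single request handler.
type bus map[string]func(data []byte) ([]byte, error)

func (b bus) request(subject string, data []byte) ([]byte, error) {
	h, ok := b[subject]
	if !ok {
		return nil, errors.New("no responder for subject " + subject)
	}
	return h(data)
}

// greeterHandler decodes a request, calls the server, and encodes the reply.
func greeterHandler(s GreeterServer) func([]byte) ([]byte, error) {
	return func(data []byte) ([]byte, error) {
		var req HelloRequest
		if err := json.Unmarshal(data, &req); err != nil {
			return nil, err
		}
		resp, err := s.SayHello(req)
		if err != nil {
			return nil, err
		}
		return json.Marshal(resp)
	}
}

// greeterClient encodes a typed request and sends it over the bus.
type greeterClient struct{ b bus }

func (c greeterClient) SayHello(req HelloRequest) (HelloReply, error) {
	var resp HelloReply
	data, err := json.Marshal(req)
	if err != nil {
		return resp, err
	}
	out, err := c.b.request("Greeter", data)
	if err != nil {
		return resp, err
	}
	err = json.Unmarshal(out, &resp)
	return resp, err
}

// server is the application's implementation of GreeterServer.
type server struct{}

func (server) SayHello(req HelloRequest) (HelloReply, error) {
	return HelloReply{Message: "Hello " + req.Name}, nil
}

func main() {
	// The handler is installed under the service name, as the text describes.
	b := bus{"Greeter": greeterHandler(server{})}
	resp, err := greeterClient{b}.SayHello(HelloRequest{Name: "world"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("Greeting:", resp.Message)
}
```

In the real library the bus role is played by a NATS connection, and the handler is registered with Subscribe() or QueueSubscribe() rather than stored in a map.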

Standing up a microservice involves:

  • writing the .proto service definition file
  • generating the *.pb.go and *.nrpc.go files
  • implementing the server interface
  • writing a main app that will connect to NATS and start the handler (see example)

To call the service:

  • import the package that contains the generated *.nrpc.go files
  • in the client code, connect to NATS
  • create a client object (the generated GreeterClient in the example) and call the methods as necessary (see example)

Features

The following wiki pages describe nRPC features in more detail:

Installation

nRPC needs Go 1.7 or higher. $GOPATH/bin needs to be in $PATH for the protoc invocation to work. To generate code, you need the protobuf compiler (which you can install from here) and the nRPC protoc plugin.

To install the nRPC protoc plugin:

$ go get github.com/nats-rpc/nrpc/protoc-gen-nrpc

To build and run the example greeter_server:

$ go get github.com/nats-rpc/nrpc/examples/helloworld/greeter_server
$ greeter_server
server is running, ^C quits.

To build and run the example greeter_client:

$ go get github.com/nats-rpc/nrpc/examples/helloworld/greeter_client
$ greeter_client
Greeting: Hello world
$

Documentation

To learn more about describing gRPC services using .proto files, see here. To learn more about NATS, start with their website. To learn more about nRPC, um, read the source code.

Status

nRPC is in alpha. This means that it will work, but APIs may change without notice.

Currently there is support only for Go clients and servers.

Built by RapidLoop. Released under Apache 2.0 license.

Documentation

Variables

var (
	SubjectRule_name = map[int32]string{
		0: "COPY",
		1: "TOLOWER",
	}
	SubjectRule_value = map[string]int32{
		"COPY":    0,
		"TOLOWER": 1,
	}
)

Enum value maps for SubjectRule.

var (
	Error_Type_name = map[int32]string{
		0: "CLIENT",
		1: "SERVER",
		3: "EOS",
		4: "SERVERTOOBUSY",
	}
	Error_Type_value = map[string]int32{
		"CLIENT":        0,
		"SERVER":        1,
		"EOS":           3,
		"SERVERTOOBUSY": 4,
	}
)

Enum value maps for Error_Type.
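
As a small illustration of how such name/value maps are typically used, the sketch below copies the two Error_Type maps into local variables (so it runs without importing nrpc) and converts in both directions. The helper `nameOf` is hypothetical. Note that number 2 is not assigned in the maps above, so a lookup for it has to fall back to something.

```go
package main

import "fmt"

// Local copies of the maps shown above, so the sketch runs without the nrpc package.
var errorTypeName = map[int32]string{0: "CLIENT", 1: "SERVER", 3: "EOS", 4: "SERVERTOOBUSY"}
var errorTypeValue = map[string]int32{"CLIENT": 0, "SERVER": 1, "EOS": 3, "SERVERTOOBUSY": 4}

// nameOf returns the symbolic name for a number, or a fallback for unknown numbers.
func nameOf(n int32) string {
	if s, ok := errorTypeName[n]; ok {
		return s
	}
	return fmt.Sprintf("UNKNOWN(%d)", n)
}

func main() {
	fmt.Println(nameOf(4))             // SERVERTOOBUSY
	fmt.Println(nameOf(2))             // UNKNOWN(2)
	fmt.Println(errorTypeValue["EOS"]) // 3
}
```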

var (
	// A custom subject prefix to use instead of the package name
	//
	// optional string packageSubject = 50000;
	E_PackageSubject = &file_nrpc_proto_extTypes[0]
	// Parameters included in the subject at the package level
	//
	// repeated string packageSubjectParams = 50001;
	E_PackageSubjectParams = &file_nrpc_proto_extTypes[1]
	// Default rule to build a service subject from the service name
	//
	// optional nrpc.SubjectRule serviceSubjectRule = 50002;
	E_ServiceSubjectRule = &file_nrpc_proto_extTypes[2]
	// Default rule to build a method subject from its name
	//
	// optional nrpc.SubjectRule methodSubjectRule = 50003;
	E_MethodSubjectRule = &file_nrpc_proto_extTypes[3]
)

Extension fields to descriptor.FileOptions.

var (
	// A custom subject token to use instead of (service name + serviceSubjectRule)
	//
	// optional string serviceSubject = 51000;
	E_ServiceSubject = &file_nrpc_proto_extTypes[4]
	// Parameters included in the subject at the service level
	//
	// repeated string serviceSubjectParams = 51001;
	E_ServiceSubjectParams = &file_nrpc_proto_extTypes[5]
)

Extension fields to descriptor.ServiceOptions.

var (
	// A custom subject to use instead of (method name + methodSubjectRule)
	//
	// optional string methodSubject = 52000;
	E_MethodSubject = &file_nrpc_proto_extTypes[6]
	// Parameters included in the subject at the method level
	//
	// repeated string methodSubjectParams = 52001;
	E_MethodSubjectParams = &file_nrpc_proto_extTypes[7]
	// If true, the method returns a stream of reply messages instead of just one
	//
	// optional bool streamedReply = 52002;
	E_StreamedReply = &file_nrpc_proto_extTypes[8]
)

Extension fields to descriptor.MethodOptions.

var ErrCanceled = errors.New("Call canceled")

var ErrEOS = errors.New("End of stream")

var ErrStreamInvalidMsgCount = errors.New("Stream reply received an incorrect number of messages")

ErrStreamInvalidMsgCount indicates that a stream reply received the wrong number of messages.

Functions

func Call

func Call(req proto.Message, rep proto.Message, nc NatsConn, subject string, encoding string, timeout time.Duration) error

func Marshal

func Marshal(encoding string, msg proto.Message) ([]byte, error)

func MarshalErrorResponse

func MarshalErrorResponse(encoding string, repErr *Error) ([]byte, error)

func ParseSubject

func ParseSubject(
	packageSubject string, packageParamsCount int,
	serviceSubject string, serviceParamsCount int,
	subject string,
) (packageParams []string, serviceParams []string,
	name string, tail []string, err error,
)

func ParseSubjectTail

func ParseSubjectTail(
	methodParamsCount int,
	tail []string,
) (
	methodParams []string, encoding string, err error,
)
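
The signatures of ParseSubject and ParseSubjectTail suggest that a subject is a dot-separated sequence of tokens. The sketch below illustrates that idea only. It assumes a layout of package subject, package parameters, service subject, service parameters, method name, and then a tail. That layout is inferred from the parameter names and is not confirmed by this page. `splitSubject` is a hypothetical helper, not the library's implementation, and it handles only single-token package and service subjects.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitSubject is a hypothetical helper, not nrpc.ParseSubject.
// Assumed layout: <pkg>.<pkgParams...>.<svc>.<svcParams...>.<method>[.<tail...>]
func splitSubject(pkg string, nPkg int, svc string, nSvc int, subject string) (
	pkgParams, svcParams []string, name string, tail []string, err error,
) {
	tokens := strings.Split(subject, ".")
	// Minimum tokens: pkg + its params + svc + its params + method name.
	if len(tokens) < 3+nPkg+nSvc {
		return nil, nil, "", nil, errors.New("subject has too few tokens")
	}
	if tokens[0] != pkg {
		return nil, nil, "", nil, errors.New("unexpected package subject")
	}
	pkgParams = tokens[1 : 1+nPkg]
	tokens = tokens[1+nPkg:]
	if tokens[0] != svc {
		return nil, nil, "", nil, errors.New("unexpected service subject")
	}
	svcParams = tokens[1 : 1+nSvc]
	tokens = tokens[1+nSvc:]
	// The first remaining token is the method name; the rest is the tail.
	return pkgParams, svcParams, tokens[0], tokens[1:], nil
}

func main() {
	pp, sp, name, tail, err := splitSubject("demo", 1, "greeter", 0,
		"demo.tenant42.greeter.sayhello.json")
	fmt.Println(pp, sp, name, tail, err)
}
```

The subject string and the parameter counts in `main` are made up for the example.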

func Publish

func Publish(resp proto.Message, withError *Error, nc NatsConn, subject string, encoding string) error

func Unmarshal

func Unmarshal(encoding string, data []byte, msg proto.Message) error

func UnmarshalResponse

func UnmarshalResponse(encoding string, data []byte, msg proto.Message) error

Types

type ContextKey

type ContextKey int

ContextKey type for storing values into context.Context

const (
	// RequestContextKey is the key for storing the request into the context
	RequestContextKey ContextKey = iota
)
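
A dedicated key type is the usual Go idiom for storing values in a context.Context without colliding with keys from other packages. The sketch below shows the pattern with local stand-ins: `contextKey`, `request`, `withRequest` and `requestFrom` are hypothetical names, not part of nrpc. The lookup returning nil when the value is absent mirrors what GetRequest is documented to do.

```go
package main

import (
	"context"
	"fmt"
)

// contextKey is a distinct named type, so its values cannot collide
// with context keys defined by other packages.
type contextKey int

const requestKey contextKey = iota

// request is a stand-in for the library's Request type.
type request struct{ Subject string }

// withRequest returns a child context carrying r.
func withRequest(ctx context.Context, r *request) context.Context {
	return context.WithValue(ctx, requestKey, r)
}

// requestFrom returns the stored request, or nil if absent.
func requestFrom(ctx context.Context) *request {
	r, _ := ctx.Value(requestKey).(*request)
	return r
}

// demo stores a request, reads it back, and also probes an empty context.
func demo() (subject string, absent bool) {
	ctx := withRequest(context.Background(), &request{Subject: "Greeter"})
	return requestFrom(ctx).Subject, requestFrom(context.Background()) == nil
}

func main() {
	subject, absent := demo()
	fmt.Println(subject, absent) // Greeter true
}
```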

type Error

type Error struct {
	Type     Error_Type `protobuf:"varint,1,opt,name=type,proto3,enum=nrpc.Error_Type" json:"type,omitempty"`
	Message  string     `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	MsgCount uint32     `protobuf:"varint,3,opt,name=msgCount,proto3" json:"msgCount,omitempty"`
	// contains filtered or unexported fields
}

func CaptureErrors

func CaptureErrors(fn func() (proto.Message, error)) (msg proto.Message, replyError *Error)

CaptureErrors runs a handler and converts errors and panics into a proper Error.
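
The general technique behind a function like this is a deferred recover() combined with ordinary error handling. The sketch below shows that technique with simplified local types. `replyError` and `capture` are hypothetical and do not reproduce how nrpc classifies errors into Error_Type values.

```go
package main

import (
	"errors"
	"fmt"
)

// replyError is a simplified stand-in for the library's Error message.
type replyError struct {
	Message   string
	FromPanic bool
}

var errBoom = errors.New("boom")

// capture runs fn and turns a returned error or a panic into a *replyError,
// so the caller always gets either a result or a reportable error.
func capture(fn func() (string, error)) (out string, rerr *replyError) {
	defer func() {
		if r := recover(); r != nil {
			out, rerr = "", &replyError{Message: fmt.Sprint(r), FromPanic: true}
		}
	}()
	res, err := fn()
	if err != nil {
		return "", &replyError{Message: err.Error()}
	}
	return res, nil
}

func main() {
	_, e1 := capture(func() (string, error) { return "", errBoom })
	_, e2 := capture(func() (string, error) { panic("kaboom") })
	out, e3 := capture(func() (string, error) { return "ok", nil })
	fmt.Println(e1.Message, e1.FromPanic) // boom false
	fmt.Println(e2.Message, e2.FromPanic) // kaboom true
	fmt.Println(out, e3 == nil)           // ok true
}
```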

func (*Error) Descriptor deprecated

func (*Error) Descriptor() ([]byte, []int)

Deprecated: Use Error.ProtoReflect.Descriptor instead.

func (*Error) Error

func (e *Error) Error() string

func (*Error) GetMessage

func (x *Error) GetMessage() string

func (*Error) GetMsgCount

func (x *Error) GetMsgCount() uint32

func (*Error) GetType

func (x *Error) GetType() Error_Type

func (*Error) ProtoMessage

func (*Error) ProtoMessage()

func (*Error) ProtoReflect

func (x *Error) ProtoReflect() protoreflect.Message

func (*Error) Reset

func (x *Error) Reset()

func (*Error) String

func (x *Error) String() string

type Error_Type

type Error_Type int32
const (
	Error_CLIENT        Error_Type = 0
	Error_SERVER        Error_Type = 1
	Error_EOS           Error_Type = 3
	Error_SERVERTOOBUSY Error_Type = 4
)

func (Error_Type) Descriptor

func (Error_Type) Descriptor() protoreflect.EnumDescriptor

func (Error_Type) Enum

func (x Error_Type) Enum() *Error_Type

func (Error_Type) EnumDescriptor deprecated

func (Error_Type) EnumDescriptor() ([]byte, []int)

Deprecated: Use Error_Type.Descriptor instead.

func (Error_Type) Number

func (x Error_Type) Number() protoreflect.EnumNumber

func (Error_Type) String

func (x Error_Type) String() string

func (Error_Type) Type

func (Error_Type) Type() protoreflect.EnumType

type HeartBeat

type HeartBeat struct {
	Lastbeat bool `protobuf:"varint,1,opt,name=lastbeat,proto3" json:"lastbeat,omitempty"`
	// contains filtered or unexported fields
}

func (*HeartBeat) Descriptor deprecated

func (*HeartBeat) Descriptor() ([]byte, []int)

Deprecated: Use HeartBeat.ProtoReflect.Descriptor instead.

func (*HeartBeat) GetLastbeat

func (x *HeartBeat) GetLastbeat() bool

func (*HeartBeat) ProtoMessage

func (*HeartBeat) ProtoMessage()

func (*HeartBeat) ProtoReflect

func (x *HeartBeat) ProtoReflect() protoreflect.Message

func (*HeartBeat) Reset

func (x *HeartBeat) Reset()

func (*HeartBeat) String

func (x *HeartBeat) String() string

type KeepStreamAlive

type KeepStreamAlive struct {
	// contains filtered or unexported fields
}

func NewKeepStreamAlive

func NewKeepStreamAlive(nc NatsConn, subject string, encoding string, onError func()) *KeepStreamAlive

func (*KeepStreamAlive) Stop

func (k *KeepStreamAlive) Stop()

type NatsConn

type NatsConn interface {
	Publish(subj string, data []byte) error
	PublishRequest(subj, reply string, data []byte) error
	Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)

	Subscribe(subj string, handler nats.MsgHandler) (*nats.Subscription, error)
	ChanSubscribe(subj string, ch chan *nats.Msg) (*nats.Subscription, error)
	SubscribeSync(subj string) (*nats.Subscription, error)

	QueueSubscribe(subj, queue string, handler nats.MsgHandler) (*nats.Subscription, error)
	ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg) (*nats.Subscription, error)
	QueueSubscribeSync(subj, queue string) (*nats.Subscription, error)
}

type NoReply

type NoReply struct {
	// contains filtered or unexported fields
}

func (*NoReply) Descriptor deprecated

func (*NoReply) Descriptor() ([]byte, []int)

Deprecated: Use NoReply.ProtoReflect.Descriptor instead.

func (*NoReply) ProtoMessage

func (*NoReply) ProtoMessage()

func (*NoReply) ProtoReflect

func (x *NoReply) ProtoReflect() protoreflect.Message

func (*NoReply) Reset

func (x *NoReply) Reset()

func (*NoReply) String

func (x *NoReply) String() string

type NoRequest

type NoRequest struct {
	// contains filtered or unexported fields
}

func (*NoRequest) Descriptor deprecated

func (*NoRequest) Descriptor() ([]byte, []int)

Deprecated: Use NoRequest.ProtoReflect.Descriptor instead.

func (*NoRequest) ProtoMessage

func (*NoRequest) ProtoMessage()

func (*NoRequest) ProtoReflect

func (x *NoRequest) ProtoReflect() protoreflect.Message

func (*NoRequest) Reset

func (x *NoRequest) Reset()

func (*NoRequest) String

func (x *NoRequest) String() string

type ReplyInboxMaker

type ReplyInboxMaker func(NatsConn) string

ReplyInboxMaker returns a new inbox subject for a given nats connection.

var GetReplyInbox ReplyInboxMaker = func(NatsConn) string {
	return nats.NewInbox()
}

GetReplyInbox is used by StreamCall to get an inbox subject. It can be changed by a client library that needs custom inbox subjects.
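
Because GetReplyInbox is a package-level variable of function type, a caller can replace it with a wrapper. The sketch below shows that pattern with local stand-ins: `inboxMaker`, `getReplyInbox` and `prefixed` are hypothetical, the connection argument is dropped for brevity, and the "_INBOX.<n>" format is invented here rather than taken from NATS.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// inboxMaker mirrors the shape of ReplyInboxMaker, minus the connection argument.
type inboxMaker func() string

var counter uint64

// getReplyInbox is the replaceable default (a stand-in for GetReplyInbox).
var getReplyInbox inboxMaker = func() string {
	return fmt.Sprintf("_INBOX.%d", atomic.AddUint64(&counter, 1))
}

// prefixed wraps a maker so every inbox starts with the given prefix.
func prefixed(prefix string, next inboxMaker) inboxMaker {
	return func() string { return prefix + "." + next() }
}

func main() {
	// Swap the default for a wrapper that adds a custom prefix.
	getReplyInbox = prefixed("tenant42", getReplyInbox)
	fmt.Println(getReplyInbox())
}
```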

type Request

type Request struct {
	Context context.Context
	Conn    NatsConn

	KeepStreamAlive *KeepStreamAlive
	StreamContext   context.Context
	StreamCancel    func()
	StreamMsgCount  uint32

	Subject     string
	MethodName  string
	SubjectTail []string

	CreatedAt time.Time
	StartedAt time.Time

	Encoding     string
	NoReply      bool
	ReplySubject string

	PackageParams map[string]string
	ServiceParams map[string]string

	AfterReply func(r *Request, success bool, replySuccess bool)

	Handler func(context.Context) (proto.Message, error)
	// contains filtered or unexported fields
}

Request is a server-side incoming request

func GetRequest

func GetRequest(ctx context.Context) *Request

GetRequest returns the Request associated with a context, or nil if absent

func NewRequest

func NewRequest(ctx context.Context, conn NatsConn, subject string, replySubject string) *Request

NewRequest creates a Request instance

func (*Request) Elapsed

func (r *Request) Elapsed() time.Duration

Elapsed returns the duration since the request was started.

func (*Request) EnableStreamedReply

func (r *Request) EnableStreamedReply()

EnableStreamedReply enables the streamed reply mode

func (*Request) PackageParam

func (r *Request) PackageParam(key string) string

PackageParam returns a package parameter value, or "" if absent

func (*Request) Run

func (r *Request) Run() (msg proto.Message, replyError *Error)

Run runs the handler and captures any error. It returns the response or the error that should be returned to the caller.

func (*Request) RunAndReply

func (r *Request) RunAndReply()

RunAndReply calls Run() and sends the reply back to the caller.

func (*Request) SendErrorTooBusy

func (r *Request) SendErrorTooBusy(msg string) error

SendErrorTooBusy cancels the request with a 'SERVERTOOBUSY' error

func (*Request) SendReply

func (r *Request) SendReply(resp proto.Message, withError *Error) error

SendReply sends a reply to the caller

func (*Request) SendStreamReply

func (r *Request) SendStreamReply(msg proto.Message)

SendStreamReply sends a reply as part of a stream.

func (*Request) ServiceParam

func (r *Request) ServiceParam(key string) string

ServiceParam returns a service parameter value, or "" if absent

func (*Request) SetPackageParam

func (r *Request) SetPackageParam(key, value string)

SetPackageParam sets a package param value

func (*Request) SetServiceParam

func (r *Request) SetServiceParam(key, value string)

SetServiceParam sets a service param value

func (*Request) StreamedReply

func (r *Request) StreamedReply() bool

StreamedReply returns true if the request reply is streamed

type StreamCallSubscription

type StreamCallSubscription struct {
	// contains filtered or unexported fields
}

func NewStreamCallSubscription

func NewStreamCallSubscription(
	ctx context.Context, nc NatsConn, encoding string, subject string,
	timeout time.Duration,
) (*StreamCallSubscription, error)

func StreamCall

func StreamCall(ctx context.Context, nc NatsConn, subject string, req proto.Message, encoding string, timeout time.Duration) (*StreamCallSubscription, error)

func (*StreamCallSubscription) Next

func (sub *StreamCallSubscription) Next(rep proto.Message) error

type SubjectRule

type SubjectRule int32
const (
	SubjectRule_COPY    SubjectRule = 0
	SubjectRule_TOLOWER SubjectRule = 1
)
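
Going by the constant names and the option comments earlier on this page ("rule to build a service subject from the service name"), COPY presumably uses the name unchanged and TOLOWER lowercases it. The sketch below encodes that reading with local stand-ins. `subjectRule` and `apply` are hypothetical, and the behavior is an assumption rather than something this page states.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectRule is a local stand-in for nrpc.SubjectRule.
type subjectRule int32

const (
	ruleCopy    subjectRule = 0
	ruleToLower subjectRule = 1
)

// apply derives a subject token from a service or method name.
// Assumption: COPY keeps the name as-is, TOLOWER lowercases it.
func apply(rule subjectRule, name string) string {
	if rule == ruleToLower {
		return strings.ToLower(name)
	}
	return name
}

func main() {
	fmt.Println(apply(ruleCopy, "Greeter"))     // Greeter
	fmt.Println(apply(ruleToLower, "SayHello")) // sayhello
}
```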

func (SubjectRule) Descriptor

func (SubjectRule) Descriptor() protoreflect.EnumDescriptor

func (SubjectRule) Enum

func (x SubjectRule) Enum() *SubjectRule

func (SubjectRule) EnumDescriptor deprecated

func (SubjectRule) EnumDescriptor() ([]byte, []int)

Deprecated: Use SubjectRule.Descriptor instead.

func (SubjectRule) Number

func (x SubjectRule) Number() protoreflect.EnumNumber

func (SubjectRule) String

func (x SubjectRule) String() string

func (SubjectRule) Type

func (SubjectRule) Type() protoreflect.EnumType

type Void

type Void struct {
	// contains filtered or unexported fields
}

func (*Void) Descriptor deprecated

func (*Void) Descriptor() ([]byte, []int)

Deprecated: Use Void.ProtoReflect.Descriptor instead.

func (*Void) ProtoMessage

func (*Void) ProtoMessage()

func (*Void) ProtoReflect

func (x *Void) ProtoReflect() protoreflect.Message

func (*Void) Reset

func (x *Void) Reset()

func (*Void) String

func (x *Void) String() string

type WorkerPool

type WorkerPool struct {
	Context context.Context
	// contains filtered or unexported fields
}

WorkerPool is a pool of workers

func NewWorkerPool

func NewWorkerPool(
	ctx context.Context,
	size uint,
	maxPending uint,
	maxPendingDuration time.Duration,
) *WorkerPool

NewWorkerPool creates a pool of workers

func (*WorkerPool) Close

func (pool *WorkerPool) Close(timeout time.Duration)

Close stops all the workers and waits for their completion. If the workers do not stop before the timeout, their context is canceled. Close never returns if a request ignores the context.

func (*WorkerPool) QueueRequest

func (pool *WorkerPool) QueueRequest(request *Request) error

QueueRequest adds a request to the queue. It sends a SERVERTOOBUSY error to the client if the queue is full.
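
The "reject when the queue is full" behavior can be sketched with a buffered channel and a non-blocking send. This is a simplified illustration, not WorkerPool's implementation: `pool`, `newPool`, `submit`, `shutdown` and `errTooBusy` are hypothetical names, and the sketch leaves out the pending-duration limit and context handling that the real type exposes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errTooBusy = errors.New("server too busy")

// pool is a simplified stand-in for a worker pool with a bounded pending queue.
type pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// newPool starts the given number of workers reading from a queue
// that holds at most maxPending jobs.
func newPool(workers, maxPending int) *pool {
	p := &pool{queue: make(chan func(), maxPending)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.queue {
				job()
			}
		}()
	}
	return p
}

// submit enqueues job without blocking; it fails when the queue is full.
func (p *pool) submit(job func()) error {
	select {
	case p.queue <- job:
		return nil
	default:
		return errTooBusy
	}
}

// shutdown stops accepting jobs and waits for queued jobs to finish.
func (p *pool) shutdown() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(2, 4)
	var done int32
	for i := 0; i < 3; i++ {
		if err := p.submit(func() { atomic.AddInt32(&done, 1) }); err != nil {
			fmt.Println("rejected:", err)
		}
	}
	p.shutdown()
	fmt.Println("completed:", atomic.LoadInt32(&done))
}
```

Rejecting immediately instead of blocking keeps a slow service from silently accumulating requests that the caller has already timed out on.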

func (*WorkerPool) SetMaxPending

func (pool *WorkerPool) SetMaxPending(value uint)

SetMaxPending changes the queue size

func (*WorkerPool) SetMaxPendingDuration

func (pool *WorkerPool) SetMaxPendingDuration(value time.Duration)

SetMaxPendingDuration changes the max pending delay

func (*WorkerPool) SetSize

func (pool *WorkerPool) SetSize(size uint)

SetSize changes the number of workers

Directories

Path Synopsis
examples
alloptions
This code was autogenerated from alloptions.proto, do not edit.
helloworld/helloworld
This code was autogenerated from helloworld.proto, do not edit.
metrics_helloworld/helloworld
This code was autogenerated from helloworld.proto, do not edit.
nooption
This code was autogenerated from nooption.proto, do not edit.
