stormrpc

Version: v0.3.0
Published: Oct 3, 2022 | License: MIT | Imports: 10 | Imported by: 1

README

StormRPC ⚡

StormRPC is an abstraction over NATS Request/Reply messaging.

It provides several convenience features, including:

  • Middleware

    Middleware are decorators around HandlerFuncs. Several are available within the package, including RequestID, Tracing (via OpenTelemetry), Logger, and Recoverer.

  • Body encoding and decoding

    Request and response bodies are marshalled from and unmarshalled to structs. JSON, Protobuf, and Msgpack are supported out of the box.

  • Deadline propagation

    Request deadlines are propagated from client to server so both ends will stop processing once the deadline has passed.

  • Error propagation

    Responses have an Err attribute, and errors are propagated across the wire without needing to tweak your request/response schemas.
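
Deadline propagation can be pictured with the standard library alone. The following is a minimal sketch, not StormRPC's implementation: the header name X-Deadline and the helpers encodeDeadline, contextFromHeader and run are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"time"
)

// deadlineHeader is a hypothetical header name chosen for this sketch.
const deadlineHeader = "X-Deadline"

// encodeDeadline copies the context deadline, if any, into the outgoing
// headers as Unix nanoseconds.
func encodeDeadline(ctx context.Context, headers map[string]string) {
	if dl, ok := ctx.Deadline(); ok {
		headers[deadlineHeader] = strconv.FormatInt(dl.UnixNano(), 10)
	}
}

// contextFromHeader rebuilds a context with the same deadline on the
// receiving side. Without a valid deadline header it returns a cancelable
// context that has no deadline.
func contextFromHeader(parent context.Context, headers map[string]string) (context.Context, context.CancelFunc) {
	raw, ok := headers[deadlineHeader]
	if !ok {
		return context.WithCancel(parent)
	}
	ns, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return context.WithCancel(parent)
	}
	return context.WithDeadline(parent, time.Unix(0, ns))
}

// run sends a 50ms client deadline through the headers and waits for the
// server-side context to expire.
func run() error {
	clientCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	headers := map[string]string{}
	encodeDeadline(clientCtx, headers)

	serverCtx, serverCancel := contextFromHeader(context.Background(), headers)
	defer serverCancel()

	<-serverCtx.Done()
	return serverCtx.Err()
}

func main() {
	fmt.Println(run()) // context deadline exceeded
}
```

Because the deadline travels as an absolute timestamp, this sketch assumes the clocks on both ends are reasonably in sync; sending the remaining duration instead would be another option.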

Installation

Runtime Library

The runtime library package github.com/actatum/stormrpc contains common types like stormrpc.Error, stormrpc.Client and stormrpc.Server. If you aren't generating servers and clients from protobuf definitions, you only need to import the stormrpc package.

$ go get github.com/actatum/stormrpc

Code Generator

You need to install Go and the protoc compiler on your system. Then install the protoc plugins protoc-gen-stormrpc and protoc-gen-go to generate Go code.

$ go install github.com/actatum/stormrpc/protoc-gen-stormrpc@latest
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

Code generation examples can be found in the repository's examples directory.

Basic Usage

Server

package main

import (
  "context"
  "log"
  "os"
  "os/signal"
  "syscall"
  "time"

  "github.com/actatum/stormrpc"
  "github.com/nats-io/nats.go"
)

func echo(ctx context.Context, req stormrpc.Request) stormrpc.Response {
  var b any
  if err := req.Decode(&b); err != nil {
    return stormrpc.NewErrorResponse(req.Reply, err)
  }

  resp, err := stormrpc.NewResponse(req.Reply, b)
  if err != nil {
    return stormrpc.NewErrorResponse(req.Reply, err)
  }

  return resp
}

func main() {
  srv, err := stormrpc.NewServer("echo", nats.DefaultURL)
  if err != nil {
    log.Fatal(err)
  }
  srv.Handle("echo", echo)

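  go func() {
    // Run is started in its own goroutine so main can wait for a signal.
    if err := srv.Run(); err != nil {
      log.Printf("server stopped: %v", err)
    }
  }()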
  log.Printf("👋 Listening on %v", srv.Subjects())

  done := make(chan os.Signal, 1)
  signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
  <-done
  log.Printf("💀 Shutting down")
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  if err = srv.Shutdown(ctx); err != nil {
    log.Fatal(err)
  }
}

Client

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/actatum/stormrpc"
  "github.com/nats-io/nats.go"
)

func main() {
  client, err := stormrpc.NewClient(nats.DefaultURL)
  if err != nil {
    log.Fatal(err)
  }
  defer client.Close()

  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  r, err := stormrpc.NewRequest("echo", map[string]string{"hello": "me"})
  if err != nil {
    log.Fatal(err)
  }

  resp := client.Do(ctx, r)
  if resp.Err != nil {
    log.Fatal(resp.Err)
  }

  fmt.Println(resp.Header)

  var result map[string]string
  if err = resp.Decode(&result); err != nil {
    log.Fatal(err)
  }

  fmt.Printf("Result: %v\n", result)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HeadersFromContext added in v0.2.0

func HeadersFromContext(ctx context.Context) nats.Header

HeadersFromContext retrieves RPC headers from the given context.

func MessageFromErr

func MessageFromErr(err error) string

MessageFromErr retrieves the message from a given error. If the error is not of type Error, "unknown error" is returned.

Types

type CallOption added in v0.2.0

type CallOption interface {
	// contains filtered or unexported methods
}

CallOption configures an RPC to perform actions before it starts or after the RPC has completed.

func WithHeaders added in v0.2.0

func WithHeaders(h map[string]string) CallOption

WithHeaders returns a CallOption that appends the given headers to the request.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a stormRPC client. It contains all functionality for making RPC requests to stormRPC servers.

func NewClient

func NewClient(natsURL string, opts ...ClientOption) (*Client, error)

NewClient returns a new instance of a Client.

func (*Client) Close

func (c *Client) Close()

Close closes the underlying nats connection.

func (*Client) Do

func (c *Client) Do(ctx context.Context, r Request, opts ...CallOption) Response

Do completes a request to a stormRPC Server.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

ClientOption represents functional options for configuring a stormRPC Client.

type Error

type Error struct {
	Code    ErrorCode
	Message string
}

Error represents an RPC error.

func Errorf

func Errorf(code ErrorCode, format string, args ...any) *Error

Errorf constructs a new RPC Error.

func (Error) Error

func (e Error) Error() string

Error allows for the Error type to conform to the built-in error interface.

type ErrorCode

type ErrorCode int

ErrorCode represents an enum type for stormRPC error codes.

const (
	ErrorCodeUnknown          ErrorCode = 0
	ErrorCodeInternal         ErrorCode = 1
	ErrorCodeNotFound         ErrorCode = 2
	ErrorCodeInvalidArgument  ErrorCode = 3
	ErrorCodeUnimplemented    ErrorCode = 4
	ErrorCodeUnauthenticated  ErrorCode = 5
	ErrorCodePermissionDenied ErrorCode = 6
	ErrorCodeAlreadyExists    ErrorCode = 7
	ErrorCodeDeadlineExceeded ErrorCode = 8
)

RPC ErrorCodes.

func CodeFromErr

func CodeFromErr(err error) ErrorCode

CodeFromErr retrieves the ErrorCode from a given error. If the error is not of type Error, ErrorCodeUnknown is returned.
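
The fallback behaviour of CodeFromErr and MessageFromErr can be illustrated with local stand-in types. This is only a sketch: errorCode, rpcError, codeFromErr and messageFromErr are hypothetical names that mirror the shapes documented here, and looking through wrapped errors with errors.As is an assumption rather than something this page states about the library.

```go
package main

import (
	"errors"
	"fmt"
)

// errorCode and rpcError are local stand-ins that mirror the shape of
// ErrorCode and Error; they are not the library's types.
type errorCode int

const (
	codeUnknown  errorCode = 0
	codeNotFound errorCode = 2
)

type rpcError struct {
	Code    errorCode
	Message string
}

func (e *rpcError) Error() string { return e.Message }

// codeFromErr returns the code of the first *rpcError in err's chain, or
// codeUnknown when there is none. Unwrapping via errors.As is an assumption
// of this sketch.
func codeFromErr(err error) errorCode {
	var re *rpcError
	if errors.As(err, &re) {
		return re.Code
	}
	return codeUnknown
}

// messageFromErr follows the same rule and falls back to "unknown error".
func messageFromErr(err error) string {
	var re *rpcError
	if errors.As(err, &re) {
		return re.Message
	}
	return "unknown error"
}

func main() {
	wrapped := fmt.Errorf("lookup failed: %w", &rpcError{Code: codeNotFound, Message: "user not found"})
	fmt.Println(codeFromErr(wrapped), messageFromErr(wrapped)) // 2 user not found

	plain := errors.New("boom")
	fmt.Println(codeFromErr(plain), messageFromErr(plain)) // 0 unknown error
}
```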

func (ErrorCode) String

func (c ErrorCode) String() string

type ErrorHandler

type ErrorHandler func(context.Context, error)

ErrorHandler is the function signature for handling server errors.

type HandlerFunc

type HandlerFunc func(ctx context.Context, r Request) Response

HandlerFunc is the function signature for handling of a single request to a stormRPC server.

type HeaderCallOption added in v0.2.0

type HeaderCallOption struct {
	Headers map[string]string
}

HeaderCallOption is used to configure which headers to append to the outgoing RPC.

type Middleware

type Middleware func(next HandlerFunc) HandlerFunc

Middleware is the function signature for wrapping HandlerFuncs to extend their functionality.
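
To show the decorator shape of a Middleware, here is a minimal, self-contained sketch. The types request, response, handlerFunc and middleware are simplified local stand-ins for the package's types, and recoverer, chain, upper and call are hypothetical helpers. The wrapping order used by chain is an assumption and may differ from what Server.Use does.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// request, response, handlerFunc and middleware are simplified local
// stand-ins for the package's Request, Response, HandlerFunc and Middleware.
type request struct{ Body string }

type response struct {
	Body string
	Err  error
}

type handlerFunc func(ctx context.Context, r request) response

type middleware func(next handlerFunc) handlerFunc

// recoverer converts a panic in the wrapped handler into an error response.
func recoverer(next handlerFunc) handlerFunc {
	return func(ctx context.Context, r request) (resp response) {
		defer func() {
			if v := recover(); v != nil {
				resp = response{Err: fmt.Errorf("panic: %v", v)}
			}
		}()
		return next(ctx, r)
	}
}

// chain wraps h so that the first middleware in mws runs outermost.
func chain(h handlerFunc, mws ...middleware) handlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// upper is a toy handler that panics on an empty body.
func upper(ctx context.Context, r request) response {
	if r.Body == "" {
		panic("empty body")
	}
	return response{Body: strings.ToUpper(r.Body)}
}

// call invokes h with a background context to keep the demo short.
func call(h handlerFunc, body string) response {
	return h(context.Background(), request{Body: body})
}

func main() {
	h := chain(upper, recoverer)
	fmt.Println(call(h, "hi").Body) // HI
	fmt.Println(call(h, "").Err)    // panic: empty body
}
```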

type Request

type Request struct {
	*nats.Msg
}

Request is stormRPC's wrapper around a nats.Msg and is used by both clients and servers.

func NewRequest

func NewRequest(subject string, body any, opts ...RequestOption) (Request, error)

NewRequest constructs a new request with the given parameters. It also handles encoding the request body.

func (*Request) Decode

func (r *Request) Decode(v any) error

Decode de-serializes the body into the passed-in object. The de-serialization method is based on the request's Content-Type header.
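
Choosing a decoder from the Content-Type header might look roughly like the sketch below. The message type and decode function are hypothetical stand-ins, the header value "application/json" is an assumption, and only JSON is wired up so the example stays within the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a local stand-in for a NATS message carrying headers and a body.
type message struct {
	Header map[string]string
	Data   []byte
}

// decode picks a decoder from the Content-Type header. Only JSON is handled
// here; the header values are assumptions of this sketch.
func decode(m message, v any) error {
	switch ct := m.Header["Content-Type"]; ct {
	case "application/json":
		return json.Unmarshal(m.Data, v)
	default:
		return fmt.Errorf("unsupported content type %q", ct)
	}
}

func main() {
	m := message{
		Header: map[string]string{"Content-Type": "application/json"},
		Data:   []byte(`{"hello":"me"}`),
	}

	var out map[string]string
	if err := decode(m, &out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(out["hello"]) // me
}
```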

func (*Request) Subject

func (r *Request) Subject() string

Subject returns the underlying nats.Msg subject.

type RequestOption

type RequestOption interface {
	// contains filtered or unexported methods
}

RequestOption represents functional options for configuring a request.

func WithEncodeMsgpack

func WithEncodeMsgpack() RequestOption

WithEncodeMsgpack is a RequestOption to encode the request body using the msgpack.Marshal method.

func WithEncodeProto

func WithEncodeProto() RequestOption

WithEncodeProto is a RequestOption to encode the request body using the proto.Marshal method.
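
RequestOption is an interface with unexported methods, which is a common way to implement functional options in Go. The sketch below shows that pattern in isolation. All names (requestConfig, requestOption, encodingOption, newRequestConfig and the lowercase option constructors) are hypothetical, and the JSON default is an assumption based on the basic usage example, which passes no options.

```go
package main

import "fmt"

// requestConfig holds the settings that options may change.
type requestConfig struct {
	encoding string
}

// requestOption has an unexported method, so only this package can
// define new options.
type requestOption interface {
	apply(*requestConfig)
}

// encodingOption is an option that selects the body encoding.
type encodingOption string

func (e encodingOption) apply(c *requestConfig) { c.encoding = string(e) }

// withEncodeMsgpack and withEncodeProto mirror the naming of the
// documented options but are local to this sketch.
func withEncodeMsgpack() requestOption { return encodingOption("msgpack") }
func withEncodeProto() requestOption   { return encodingOption("proto") }

// newRequestConfig starts from a default and applies each option in order.
func newRequestConfig(opts ...requestOption) requestConfig {
	c := requestConfig{encoding: "json"} // assumed default
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Println(newRequestConfig().encoding)                    // json
	fmt.Println(newRequestConfig(withEncodeMsgpack()).encoding) // msgpack
	fmt.Println(newRequestConfig(withEncodeProto()).encoding)   // proto
}
```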

type Response

type Response struct {
	*nats.Msg
	Err error
}

Response is stormRPC's wrapper around a nats.Msg and is used by both clients and servers.

func NewErrorResponse

func NewErrorResponse(reply string, err error) Response

NewErrorResponse constructs a new error response with the given parameters.

func NewResponse

func NewResponse(reply string, body any, opts ...ResponseOption) (Response, error)

NewResponse constructs a new response with the given parameters. It also handles encoding the response body.

func (*Response) Decode

func (r *Response) Decode(v any) error

Decode de-serializes the body into the passed-in object. The de-serialization method is based on the response's Content-Type header.

type ResponseOption

type ResponseOption RequestOption

ResponseOption represents functional options for configuring a response.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a stormRPC server. It contains all functionality for handling RPC requests.

func NewServer

func NewServer(name, natsURL string, opts ...ServerOption) (*Server, error)

NewServer returns a new instance of a Server.

func (*Server) Handle

func (s *Server) Handle(subject string, fn HandlerFunc)

Handle registers a new HandlerFunc on the server.

func (*Server) Run

func (s *Server) Run() error

Run listens on the configured subjects.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown stops the server.

func (*Server) Subjects

func (s *Server) Subjects() []string

Subjects returns a list of all subjects with registered handler funcs.

func (*Server) Use

func (s *Server) Use(mw ...Middleware)

Use applies all given middleware globally across all handlers.

type ServerOption

type ServerOption interface {
	// contains filtered or unexported methods
}

ServerOption represents functional options for configuring a stormRPC Server.

func WithErrorHandler

func WithErrorHandler(fn ErrorHandler) ServerOption

WithErrorHandler is a ServerOption that allows for registering a function for handling server errors.

Directories

Path Synopsis
examples
internal
gen
