stormrpc

package module
v0.4.2 Latest
Published: Jun 24, 2024 License: MIT Imports: 12 Imported by: 1

README

StormRPC ⚡

StormRPC is a wrapper around the Request/Reply messaging capabilities of NATS.

It provides some convenient features including:

  • Middleware

    Middleware are decorators around HandlerFuncs. The package ships with several, including RequestID, Tracing (via OpenTelemetry), Logger, and Recoverer.

  • Body encoding and decoding

    Marshalling and unmarshalling request bodies to structs. JSON, Protobuf, and Msgpack are supported out of the box.

  • Deadline propagation

    Request deadlines are propagated from client to server so both ends will stop processing once the deadline has passed.

  • Error propagation

    Responses carry an Err field, and errors are propagated across the wire without needing to tweak your request/response schemas.

Installation

Runtime Library

The runtime library package github.com/actatum/stormrpc contains common types like stormrpc.Error, stormrpc.Client and stormrpc.Server. If you aren't generating servers and clients from protobuf definitions, you only need to import the stormrpc package.

go get github.com/actatum/stormrpc

Code Generator

You need to install Go and the protoc compiler on your system. Then install the protoc plugins protoc-gen-stormrpc and protoc-gen-go to generate Go code.

go install github.com/actatum/stormrpc/cmd/protoc-gen-stormrpc@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

Using Buf

You'll need to initialize a buf module file (buf.yaml) alongside your protobuf definitions and a buf generation file (buf.gen.yaml) in the root of your project.

├── pb
│   ├── v1
│       ├── buf.yaml
│       ├── service.proto
├── main.go
├── buf.gen.yaml
├── go.mod
├── go.sum
└── .gitignore

buf.yaml

version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

buf.gen.yaml

version: v1
plugins:
  - plugin: go
    out: ./
    opt: paths=source_relative
  - plugin: stormrpc
    out: ./
    opt: paths=source_relative

To generate client and server stubs using buf, run the following command:

buf generate

Using Protoc

To generate client and server stubs using protoc, run the following command:

protoc --go_out=$output_dir --stormrpc_out=$output_dir $input_proto_file

Examples

Code generation examples can be found here

Basic Usage

Server

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/actatum/stormrpc"
	"github.com/nats-io/nats-server/v2/server"
)

func echo(ctx context.Context, req stormrpc.Request) stormrpc.Response {
	var b any
	if err := req.Decode(&b); err != nil {
		return stormrpc.NewErrorResponse(req.Reply, err)
	}

	resp, err := stormrpc.NewResponse(req.Reply, b)
	if err != nil {
		return stormrpc.NewErrorResponse(req.Reply, err)
	}

	return resp
}

func main() {
	ns, err := server.NewServer(&server.Options{
		Port: 40897,
	})
	if err != nil {
		log.Fatal(err)
	}
	ns.Start()
	defer func() {
		ns.Shutdown()
		ns.WaitForShutdown()
	}()

	if !ns.ReadyForConnections(1 * time.Second) {
		log.Fatal("timeout waiting for nats server")
	}

	srv, err := stormrpc.NewServer(&stormrpc.ServerConfig{
		NatsURL: ns.ClientURL(),
		Name:    "echo",
	})
	if err != nil {
		log.Fatal(err)
	}

	srv.Handle("echo", echo)

	go func() {
		_ = srv.Run()
	}()
	log.Printf("👋 Listening on %v", srv.Subjects())

	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
	<-done
	log.Printf("💀 Shutting down")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err = srv.Shutdown(ctx); err != nil {
		log.Fatal(err)
	}
}

Client

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/actatum/stormrpc"
	"github.com/nats-io/nats.go"
)

func main() {
	// nats.DefaultURL is nats://127.0.0.1:4222. To reach the embedded server
	// started in the Server example, use its address (port 40897) instead.
	client, err := stormrpc.NewClient(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// NewRequest encodes the body for the "echo" subject.
	r, err := stormrpc.NewRequest("echo", map[string]string{"hello": "me"})
	if err != nil {
		log.Fatal(err)
	}

	// Errors returned by the server arrive in resp.Err.
	resp := client.Do(ctx, r)
	if resp.Err != nil {
		log.Fatal(resp.Err)
	}

	fmt.Println(resp.Header)

	var result map[string]string
	if err = resp.Decode(&result); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Result: %v\n", result)
}

Documentation

Overview

Package stormrpc provides the functionality for creating RPC servers/clients that communicate via NATS.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HeadersFromContext added in v0.2.0

func HeadersFromContext(ctx context.Context) nats.Header

HeadersFromContext retrieves RPC headers from the given context.

func MessageFromErr

func MessageFromErr(err error) string

MessageFromErr retrieves the message from a given error. If the error is not of type Error, "unknown error" is returned.

Types

type CallOption added in v0.2.0

type CallOption interface {
	// contains filtered or unexported methods
}

CallOption configures an RPC to perform actions before it starts or after the RPC has completed.

func WithHeaders added in v0.2.0

func WithHeaders(h map[string]string) CallOption

WithHeaders returns a CallOption that appends the given headers to the request.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a stormRPC client. It contains all functionality for making RPC requests to stormRPC servers.

func NewClient

func NewClient(natsURL string, opts ...ClientOption) (*Client, error)

NewClient returns a new instance of a Client.

func (*Client) Close

func (c *Client) Close()

Close closes the underlying nats connection.

func (*Client) Do

func (c *Client) Do(ctx context.Context, r Request, opts ...CallOption) Response

Do completes a request to a stormRPC Server.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

ClientOption represents functional options for configuring a stormRPC Client.

type Error

type Error struct {
	Message string
	Code    ErrorCode
}

Error represents an RPC error.

func Errorf

func Errorf(code ErrorCode, format string, args ...any) *Error

Errorf constructs a new RPC Error.

func (Error) Error

func (e Error) Error() string

Error allows for the Error type to conform to the built-in error interface.

type ErrorCode

type ErrorCode int

ErrorCode represents an enum type for stormRPC error codes.

const (
	ErrorCodeUnknown          ErrorCode = 0
	ErrorCodeInternal         ErrorCode = 1
	ErrorCodeNotFound         ErrorCode = 2
	ErrorCodeInvalidArgument  ErrorCode = 3
	ErrorCodeUnimplemented    ErrorCode = 4
	ErrorCodeUnauthenticated  ErrorCode = 5
	ErrorCodePermissionDenied ErrorCode = 6
	ErrorCodeAlreadyExists    ErrorCode = 7
	ErrorCodeDeadlineExceeded ErrorCode = 8
)

RPC ErrorCodes.

func CodeFromErr

func CodeFromErr(err error) ErrorCode

CodeFromErr retrieves the ErrorCode from a given error. If the error is not of type Error, ErrorCodeUnknown is returned.

func (ErrorCode) String

func (c ErrorCode) String() string

type ErrorHandler

type ErrorHandler func(context.Context, error)

ErrorHandler is the function signature for handling server errors.

type HandlerFunc

type HandlerFunc func(ctx context.Context, r Request) Response

HandlerFunc is the function signature for handling of a single request to a stormRPC server.

type HeaderCallOption added in v0.2.0

type HeaderCallOption struct {
	Headers map[string]string
}

HeaderCallOption is used to configure which headers to append to the outgoing RPC.

type Middleware

type Middleware func(next HandlerFunc) HandlerFunc

Middleware is the function signature for wrapping HandlerFuncs to extend their functionality.

type Option added in v0.4.1

type Option interface {
	ClientOption
	ServerOption
}

Option represents functional options used to configure stormRPC clients and servers.

func WithNatsConn added in v0.4.1

func WithNatsConn(nc *nats.Conn) Option

WithNatsConn is an Option that allows for using an existing nats client connection.
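An option that satisfies both ClientOption and ServerOption can be modelled as a single value implementing two small interfaces with unexported methods. The following is a sketch of that general pattern only: the config structs, the method names applyClient and applyServer, and the helper withConn are all hypothetical, since the package's unexported methods are not shown in this documentation.

```go
package main

import "fmt"

// clientConfig and serverConfig are hypothetical internal settings.
type clientConfig struct{ conn string }

type serverConfig struct{ conn string }

// The unexported methods keep the option set closed to this package.
type ClientOption interface{ applyClient(*clientConfig) }

type ServerOption interface{ applyServer(*serverConfig) }

// Option can be passed wherever either kind of option is accepted.
type Option interface {
	ClientOption
	ServerOption
}

// connOption carries a shared connection (a string stands in for *nats.Conn).
type connOption struct{ conn string }

func (o connOption) applyClient(c *clientConfig) { c.conn = o.conn }
func (o connOption) applyServer(s *serverConfig) { s.conn = o.conn }

// withConn returns one option usable by both constructors.
func withConn(conn string) Option { return connOption{conn: conn} }

func newClientConfig(opts ...ClientOption) clientConfig {
	cfg := clientConfig{conn: "default"}
	for _, o := range opts {
		o.applyClient(&cfg)
	}
	return cfg
}

func newServerConfig(opts ...ServerOption) serverConfig {
	cfg := serverConfig{conn: "default"}
	for _, o := range opts {
		o.applyServer(&cfg)
	}
	return cfg
}

func main() {
	shared := withConn("shared-connection")

	fmt.Println(newClientConfig(shared).conn) // shared-connection
	fmt.Println(newServerConfig(shared).conn) // shared-connection
	fmt.Println(newClientConfig().conn)       // default
}
```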

type Request

type Request struct {
	*nats.Msg
}

Request is stormRPC's wrapper around a nats.Msg and is used by both clients and servers.

func NewRequest

func NewRequest(subject string, body any, opts ...RequestOption) (Request, error)

NewRequest constructs a new request with the given parameters. It also handles encoding the request body.

func (*Request) Decode

func (r *Request) Decode(v any) error

Decode de-serializes the body into the passed in object. The de-serialization method is based on the request's Content-Type header.
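Choosing a decoder from the Content-Type header can be sketched as a simple switch. The message type, its decode method and the "application/json" value below are assumptions for illustration. Only the JSON branch is shown because it needs nothing beyond the standard library; Protobuf and Msgpack branches would follow the same shape.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contentTypeJSON is the MIME value assumed for JSON bodies in this sketch.
const contentTypeJSON = "application/json"

// message is a simplified stand-in for a NATS message with headers.
type message struct {
	Header map[string]string
	Data   []byte
}

// decode picks an unmarshaller based on the Content-Type header.
func (m message) decode(v any) error {
	switch ct := m.Header["Content-Type"]; ct {
	case contentTypeJSON:
		return json.Unmarshal(m.Data, v)
	default:
		return fmt.Errorf("unsupported content type %q", ct)
	}
}

func main() {
	msg := message{
		Header: map[string]string{"Content-Type": contentTypeJSON},
		Data:   []byte(`{"hello":"me"}`),
	}

	var result map[string]string
	if err := msg.decode(&result); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(result["hello"]) // me

	// An unrecognized content type is reported as an error.
	msg.Header["Content-Type"] = "text/plain"
	fmt.Println(msg.decode(&result)) // unsupported content type "text/plain"
}
```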

func (*Request) Subject

func (r *Request) Subject() string

Subject returns the underlying nats.Msg subject.

type RequestOption

type RequestOption interface {
	// contains filtered or unexported methods
}

RequestOption represents functional options for configuring a request.

func WithEncodeMsgpack

func WithEncodeMsgpack() RequestOption

WithEncodeMsgpack is a RequestOption to encode the request body using the msgpack.Marshal method.

func WithEncodeProto

func WithEncodeProto() RequestOption

WithEncodeProto is a RequestOption to encode the request body using the proto.Marshal method.

type Response

type Response struct {
	*nats.Msg
	Err error
}

Response is stormRPC's wrapper around a nats.Msg and is used by both clients and servers.

func NewErrorResponse

func NewErrorResponse(reply string, err error) Response

NewErrorResponse constructs a new error response with the given parameters.

func NewResponse

func NewResponse(reply string, body any, opts ...ResponseOption) (Response, error)

NewResponse constructs a new response with the given parameters. It also handles encoding the response body.

func (*Response) Decode

func (r *Response) Decode(v any) error

Decode de-serializes the body into the passed in object. The de-serialization method is based on the response's Content-Type header.

type ResponseOption

type ResponseOption RequestOption

ResponseOption represents functional options for configuring a response.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a stormRPC server. It contains all functionality for handling RPC requests.

func NewServer

func NewServer(cfg *ServerConfig, opts ...ServerOption) (*Server, error)

NewServer returns a new instance of a Server.

func (*Server) Handle

func (s *Server) Handle(subject string, fn HandlerFunc)

Handle registers a new HandlerFunc on the server.

func (*Server) Run

func (s *Server) Run() error

Run listens on the configured subjects.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown stops the server.

func (*Server) Subjects

func (s *Server) Subjects() []string

Subjects returns a list of all subjects with registered handler funcs.

func (*Server) Use

func (s *Server) Use(mw ...Middleware)

Use applies all given middleware globally across all handlers.

type ServerConfig added in v0.4.0

type ServerConfig struct {
	NatsURL string
	Name    string
	Version string
	// contains filtered or unexported fields
}

ServerConfig is used to configure required fields for a StormRPC server. If any fields aren't present, a default value will be used.

type ServerOption

type ServerOption interface {
	// contains filtered or unexported methods
}

ServerOption represents functional options for configuring a stormRPC Server.

func WithErrorHandler

func WithErrorHandler(fn ErrorHandler) ServerOption

WithErrorHandler is a ServerOption that allows for registering a function for handling server errors.

Directories

Path                       Synopsis
cmd/protoc-gen-stormrpc    Package main provides the executable function for the protoc-gen-stormrpc binary.
examples
internal/gen               Package gen includes the internals for generating code based on protobuf definitions.
middleware                 Package middleware provides some useful and commonly implemented middleware functions for StormRPC servers.
