natsrpc

Version: v0.18.1
Published: Jul 16, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

  _   _       _______ _____   _____  _____   _____ 
 | \ | |   /\|__   __/ ____| |  __ \|  __ \ / ____|
 |  \| |  /  \  | | | (___   | |__) | |__) | |     
 | . ` | / /\ \ | |  \___ \  |  _  /|  ___/| |     
 | |\  |/ ____ \| |  ____) | | | \ \| |    | |____ 
 |_| \_/_/    \_\_| |_____/  |_|  \_\_|     \_____|

What is NATSRPC

NATSRPC is an RPC framework that uses NATS for message transport and gRPC-style definitions for its interfaces.


Motivation

With plain NATS, sending and receiving messages requires hand-written, error-prone code for subjects, requests, replies, handlers, and so on. gRPC requires the client to connect to a specific server endpoint before it can send a request. NATSRPC aims to define interfaces the way gRPC does while, like NATS, staying independent of network location: a service only needs to listen and a client only needs to send to complete an RPC call.

Features

  • Methods are defined with gRPC-style interfaces, which is simple to use, and code is generated in one step
  • Namespace isolation; messages can also be sent to a specific id
  • Load balancing across multiple services (random selection within the same NATS queue group)
  • Headers and returned errors
  • Single-goroutine and multi-goroutine handling
  • Middleware
  • Delayed replies to messages
  • Custom encoders

How It Works

At the upper layer, Server, Service, and Client wrap the NATS Conn and Subscription. At the lower layer, messages are transmitted with NATS request and publish. A service creates a subscription whose subject is the service name; if the service has a publish method, an additional subscription is created to receive published messages. When the client sends a request, the subject is the service name and the method name is passed in the header of the NATS message. When the service receives the message, it reads the method name and calls the corresponding handler. The handler's result is returned to the client through the reply subject of the NATS message.
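
The dispatch flow described above can be sketched with an in-memory stand-in for NATS. This is only an illustration under stated assumptions, not the library's implementation: the `bus`, `msg`, and `service` types and the `method` header key are hypothetical names chosen for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for a NATS message: subject, header, payload.
type msg struct {
	Subject string
	Header  map[string]string
	Data    []byte
}

// handler processes a request payload and returns a reply payload.
type handler func(data []byte) ([]byte, error)

// service maps method names to handlers, as a subscription callback would.
type service struct {
	name    string
	methods map[string]handler
}

// bus is an in-memory stand-in for NATS with one subscriber per subject.
type bus struct {
	subs map[string]*service
}

// subscribe registers the service under its own name as the subject.
func (b *bus) subscribe(s *service) { b.subs[s.name] = s }

// request delivers m to the subscriber of m.Subject, picks the handler named
// in the (assumed) "method" header, and returns the handler's reply.
func (b *bus) request(m msg) ([]byte, error) {
	s, ok := b.subs[m.Subject]
	if !ok {
		return nil, errors.New("no subscriber for subject")
	}
	h, ok := s.methods[m.Header["method"]]
	if !ok {
		return nil, errors.New("no method")
	}
	return h(m.Data)
}

func main() {
	b := &bus{subs: map[string]*service{}}
	b.subscribe(&service{
		name: "example.Greeter",
		methods: map[string]handler{
			"Hello": func(data []byte) ([]byte, error) {
				return append([]byte("hello "), data...), nil
			},
		},
	})

	rep, err := b.request(msg{
		Subject: "example.Greeter",
		Header:  map[string]string{"method": "Hello"},
		Data:    []byte("natsrpc"),
	})
	fmt.Println(string(rep), err)
}
```

In the real framework the subject lookup and reply delivery are done by the NATS server; only the method lookup happens inside the service.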

Install Tools

  1. protoc(v3.17.3)

  2. protoc-gen-gogo

    go install github.com/gogo/protobuf/protoc-gen-gogo@v1.3.2
    
  3. protoc-gen-natsrpc

    go install github.com/LeKovr/natsrpc/cmd/protoc-gen-natsrpc@v0.7.0
    

Quick Start

  1. Add the package to your module

    go get github.com/LeKovr/natsrpc
    
  2. Define the service interface in example.proto

    syntax = "proto3";
    
    package example;
    option go_package = "github.com/LeKovr/natsrpc/example;example";
    
    message HelloRequest {
      string name = 1;
    }
    
    message HelloReply {
      string message = 1;
    }
    
    service Greeter {
      rpc Hello (HelloRequest) returns (HelloReply) {}
    }
    
  3. Generate client and server code

    protoc --proto_path=. \
    --gogo_out=paths=source_relative:. \
    --natsrpc_out=paths=source_relative:. \
    *.proto
    
  4. Implement the interface on the server side and create a service

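    package main

    import (
        "context"
        "flag"
        "log"

        "github.com/LeKovr/natsrpc"
        "github.com/LeKovr/natsrpc/example"
        "github.com/nats-io/nats.go"
    )

    var natsURL = flag.String("url", nats.DefaultURL, "NATS server URL")

    type HelloSvc struct{}

    func (s *HelloSvc) Hello(ctx context.Context, req *example.HelloRequest) (*example.HelloReply, error) {
        return &example.HelloReply{
            Message: "hello " + req.Name,
        }, nil
    }

    func main() {
        flag.Parse()
        conn, err := nats.Connect(*natsURL)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        server, err := natsrpc.NewServer(conn)
        if err != nil {
            log.Fatal(err)
        }
        defer server.Close(context.Background())

        svc, err := example.RegisterGreeterNRServer(server, &HelloSvc{})
        if err != nil {
            log.Fatal(err)
        }
        defer svc.Close()
        select {} // block until the process is killed
    }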
    
    
  5. Client calls rpc

    client := natsrpc.NewClient(conn)

    cli := example.NewGreeterNRClient(client)
    rsp, err := cli.Hello(context.Background(), &example.HelloRequest{Name: "natsrpc"})
    if err != nil {
        log.Fatal(err)
    }
    log.Println(rsp.Message)
    

Examples

here

Bench Tool

  1. Request: go run ./example/tool/request_bench -url=nats://127.0.0.1:4222
  2. Broadcast: go run ./example/tool/publish_bench -url=nats://127.0.0.1:4222

TODO

  • The service definition file is changed to the gRPC standard
  • Support return error
  • Support Header
  • Generate Client interface
  • Support middleware
  • Concurrent handling by default, with optional single-goroutine handling
  • Support goroutine pool
  • Support byte pool

Documentation

Index

Constants

const (
	SupportVersion_0_7_0 = true
)
const (
	Version = "v0.7.0"
)

Variables

var (
	ErrHeaderFormat     = errors.New("natsrpc: header format error")
	ErrDuplicateService = errors.New("natsrpc: duplicate service")
	ErrNoMethod         = errors.New("natsrpc: no method")
	ErrNoMeta           = errors.New("natsrpc: no meta data")
	ErrEmptyReply       = errors.New("natsrpc: reply is empty")

	// ErrReplyLater
	// It's not an error, when you want to reply message later, then return this.
	ErrReplyLater = errors.New("natsrpc: reply later")
)
var DefaultClientOptions = ClientOptions{
	// contains filtered or unexported fields
}

DefaultClientOptions holds the default client options.

var DefaultServerOptions = ServerOptions{
	// contains filtered or unexported fields
}

DefaultServerOptions holds the default server options.

var DefaultServiceOptions = ServiceOptions{
	// contains filtered or unexported fields
}

DefaultServiceOptions holds the default service options.

var (
	// optional bool publish = 2360;
	E_Publish = &file_natsrpc_proto_extTypes[0] // true means broadcast (no return value needed), false means request (return value needed)
)

Extension fields to descriptorpb.MethodOptions.

var File_natsrpc_proto protoreflect.FileDescriptor

Functions

func CallHeader

func CallHeader(ctx context.Context) map[string]string

CallHeader returns the call header.

func MakeReplyFunc

func MakeReplyFunc[T any](ctx context.Context) (replay func(T, error) error)

MakeReplyFunc constructs a deferred reply function.

func Reply

func Reply(ctx context.Context, rep interface{}, repErr error) error

Reply replies to a message manually. When a handler wants to deliver its result later, it can return nil, ErrReplyLater from the current handler function and then call Reply somewhere else.

For example:

func XXHandle(ctx context.Context, req *XXReq) (*XXRep, error) {
	go func() {
		time.Sleep(time.Second)
		Reply(ctx, &XXRep{}, nil)
	}()
	return nil, ErrReplyLater
}
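
The delayed-reply pattern can be tried out without a NATS server using the following minimal sketch. It rests on an assumption: that the context carries whatever is needed to send the reply. The names `errReplyLater`, `replyKey`, `reply`, `slowHandle`, and `dispatch` are hypothetical stand-ins, not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errReplyLater plays the role of ErrReplyLater: a sentinel, not a failure.
var errReplyLater = errors.New("reply later")

// replyKey is a hypothetical context key under which the reply callback is stored.
type replyKey struct{}

// reply looks up the callback stored in ctx and sends the result through it.
func reply(ctx context.Context, rep string, repErr error) error {
	fn, ok := ctx.Value(replyKey{}).(func(string, error))
	if !ok {
		return errors.New("no reply callback in context")
	}
	fn(rep, repErr)
	return nil
}

// slowHandle returns immediately and replies from another goroutine.
func slowHandle(ctx context.Context, name string) (string, error) {
	go func() {
		time.Sleep(10 * time.Millisecond)
		_ = reply(ctx, "hello "+name, nil)
	}()
	return "", errReplyLater
}

// dispatch calls the handler and waits for an immediate or a deferred reply.
func dispatch(name string) (string, error) {
	type result struct {
		rep string
		err error
	}
	done := make(chan result, 1)
	ctx := context.WithValue(context.Background(), replyKey{}, func(rep string, err error) {
		done <- result{rep, err}
	})
	rep, err := slowHandle(ctx, name)
	if !errors.Is(err, errReplyLater) {
		return rep, err // the handler answered immediately
	}
	r := <-done // wait for the deferred reply
	return r.rep, r.err
}

func main() {
	fmt.Println(dispatch("natsrpc"))
}
```

The sentinel error lets the dispatcher tell "no answer yet" apart from a real failure, which is why ErrReplyLater is documented as not being an error.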

Types

type CallOption

type CallOption func(options *CallOptions)

CallOption call option

func WithCallHeader

func WithCallHeader(hd map[string]string) CallOption

WithCallHeader header

type CallOptions

type CallOptions struct {
	// contains filtered or unexported fields
}

CallOptions holds the call options.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client RPC client

func NewClient

func NewClient(conn *nats.Conn, opts ...ClientOption) *Client

NewClient constructs a Client.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, service, method string, req interface{}, opt ...CallOption) error

Publish publishes a message.

func (*Client) Request

func (c *Client) Request(ctx context.Context, service, method string, req interface{}, rep interface{}, opt ...CallOption) error

Request sends a request.

type ClientInterface

type ClientInterface interface {
	// Publish publishes a message.
	Publish(ctx context.Context, service, method string, req interface{}, opt ...CallOption) error

	// Request sends a request.
	Request(ctx context.Context, service, method string, req interface{}, rep interface{}, opt ...CallOption) error
}

ClientInterface is the client interface.

type ClientOption

type ClientOption func(options *ClientOptions)

func WithClientEncoder

func WithClientEncoder(encoder Encoder) ClientOption

WithClientEncoder sets the encoder.

func WithClientID

func WithClientID(id string) ClientOption

WithClientID sets the call id (it does not override clientOptions.id; it only identifies this call).

func WithClientNamespace

func WithClientNamespace(namespace string) ClientOption

WithClientNamespace sets the namespace (cluster).

type ClientOptions

type ClientOptions struct {
	// contains filtered or unexported fields
}

ClientOptions holds the client options.

type Encoder

type Encoder interface {
	// Encode encodes v.
	Encode(v interface{}) ([]byte, error)

	// Decode decodes data into vPtr.
	Decode(data []byte, vPtr interface{}) error
}

Encoder is the encoder interface.
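
As a minimal sketch of a custom encoder, the following type satisfies the Encoder interface using encoding/json. The `jsonEncoder` and `helloRequest` names are hypothetical, and the interface is copied locally so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder is copied from the interface shown above so the sketch is self-contained.
type Encoder interface {
	Encode(v interface{}) ([]byte, error)
	Decode(data []byte, vPtr interface{}) error
}

// jsonEncoder is a hypothetical Encoder backed by encoding/json.
type jsonEncoder struct{}

func (jsonEncoder) Encode(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonEncoder) Decode(data []byte, vPtr interface{}) error { return json.Unmarshal(data, vPtr) }

// Compile-time check that jsonEncoder satisfies Encoder.
var _ Encoder = jsonEncoder{}

// helloRequest stands in for a request message.
type helloRequest struct {
	Name string `json:"name"`
}

func main() {
	var enc Encoder = jsonEncoder{}

	data, err := enc.Encode(helloRequest{Name: "natsrpc"})
	if err != nil {
		panic(err)
	}

	var req helloRequest
	if err := enc.Decode(data, &req); err != nil {
		panic(err)
	}
	fmt.Println(string(data), req.Name)
}
```

With the real package, such a value would presumably be passed to both WithServerEncoder and WithClientEncoder so that both sides agree on the wire format.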

type Handler

type Handler func(svc interface{}, ctx context.Context, req interface{}) (interface{}, error)

type Interceptor

type Interceptor func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error)

type Invoker

type Invoker func(ctx context.Context, req interface{}) (interface{}, error)
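
An interceptor wraps the invoker, so it can run code before and after the handler. The sketch below copies the two signatures locally and defines a hypothetical `logging` interceptor and `callHello` helper; neither is part of the package.

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// Invoker and Interceptor are copied from the signatures above.
type Invoker func(ctx context.Context, req interface{}) (interface{}, error)

type Interceptor func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error)

// logging is a hypothetical interceptor that logs before and after the handler runs.
func logging(logf func(format string, args ...interface{})) Interceptor {
	return func(ctx context.Context, method string, req interface{}, invoker Invoker) (interface{}, error) {
		logf("-> %s", method)
		rep, err := invoker(ctx, req) // run the actual handler
		logf("<- %s err=%v", method, err)
		return rep, err
	}
}

// callHello runs a trivial handler through the interceptor, as a service would.
func callHello(ic Interceptor, name string) (interface{}, error) {
	hello := func(ctx context.Context, req interface{}) (interface{}, error) {
		return "hello " + req.(string), nil
	}
	return ic(context.Background(), "Hello", name, hello)
}

func main() {
	rep, err := callHello(logging(log.Printf), "natsrpc")
	fmt.Println(rep, err)
}
```

With the real package, an interceptor of this shape would be installed through WithServiceInterceptor.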

type MethodDesc

type MethodDesc struct {
	MethodName  string       // method name
	Handler     Handler      // method handler function
	IsPublish   bool         // whether this is a publish method
	RequestType reflect.Type // request type
}

MethodDesc describes a method.

func (MethodDesc) NewRequest

func (md MethodDesc) NewRequest() any

NewRequest creates a request. It is to be optimized into a static form later.
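
Because MethodDesc stores the request type as a reflect.Type, a fresh request value has to be allocated through reflection. The following is a sketch of one plausible way to do that, not the package's actual code; `newRequest`, `helloRequest`, and `helloRequestType` are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// helloRequest stands in for a generated request message.
type helloRequest struct {
	Name string
}

// helloRequestType is the pointer type a method descriptor might carry.
var helloRequestType = reflect.TypeOf((*helloRequest)(nil))

// newRequest allocates a fresh, zero-valued request for t.
// For a pointer type *T it returns a new *T; for a non-pointer type T it also
// returns a *T so the result can be passed to a decoder.
// This is an assumed approach; the library may build requests differently.
func newRequest(t reflect.Type) interface{} {
	if t.Kind() == reflect.Ptr {
		return reflect.New(t.Elem()).Interface()
	}
	return reflect.New(t).Interface()
}

func main() {
	req := newRequest(helloRequestType).(*helloRequest)
	req.Name = "natsrpc"
	fmt.Printf("%T %+v\n", req, *req)
}
```

Reflection on every call has a cost, which is likely what the note about a static form refers to.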

type Server

type Server struct {
	Encoder
	// contains filtered or unexported fields
}

Server RPC server

func NewServer

func NewServer(conn *nats.Conn, option ...ServerOption) (*Server, error)

NewServer constructs a Server.

func (*Server) Close

func (s *Server) Close(ctx context.Context) (err error)

Close closes the server.

func (*Server) Register

func (s *Server) Register(sd ServiceDesc, val interface{}, opts ...ServiceOption) (ServiceInterface, error)

Register registers a service.

func (*Server) Remove

func (s *Server) Remove(name string) bool

Remove removes a service.

func (*Server) UnSubscribeAll

func (s *Server) UnSubscribeAll() error

UnSubscribeAll cancels all subscriptions.

type ServerInterface

type ServerInterface interface {
	Encoder
	Remove(string) bool
}

type ServerOption

type ServerOption func(options *ServerOptions)

ServerOption server option

func WithErrorHandler

func WithErrorHandler(h func(interface{})) ServerOption

WithErrorHandler error handler

func WithServerEncoder

func WithServerEncoder(encoder Encoder) ServerOption

WithServerEncoder sets the encoder.

func WithServerRecovery

func WithServerRecovery(h func(interface{})) ServerOption

WithServerRecovery recover handler
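
A recover handler of type func(interface{}) receives the value passed to panic. The sketch below shows the general Go pattern such an option relies on; `safeCall` is a hypothetical helper and says nothing about how the package invokes the handler internally.

```go
package main

import "fmt"

// safeCall runs fn and passes any panic value to h instead of crashing the process.
func safeCall(h func(interface{}), fn func()) {
	defer func() {
		if r := recover(); r != nil {
			h(r)
		}
	}()
	fn()
}

func main() {
	h := func(v interface{}) { fmt.Println("recovered:", v) }

	safeCall(h, func() { panic("handler blew up") })
	fmt.Println("still serving")
}
```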

type ServerOptions

type ServerOptions struct {
	// contains filtered or unexported fields
}

ServerOptions holds the server options.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a service.

func NewService

func NewService(server ServerInterface, sd ServiceDesc, i interface{}, options ServiceOptions) (*Service, error)

NewService creates a service.

func (*Service) Call

func (s *Service) Call(ctx context.Context, methodName string, b []byte, interceptor Interceptor) ([]byte, error)

func (*Service) Close

func (s *Service) Close() bool

Close closes the service and cancels all of its subscriptions.

func (*Service) Name

func (s *Service) Name() string

Name returns the service name.

type ServiceDesc

type ServiceDesc struct {
	ServiceName string       // service name
	Methods     []MethodDesc // method list
	Metadata    string       // metadata
}

ServiceDesc describes a service.

type ServiceInterface

type ServiceInterface interface {
	// Name returns the service name.
	Name() string

	// Close closes the service.
	Close() bool
}

ServiceInterface is the service interface.

type ServiceOption

type ServiceOption func(options *ServiceOptions)

ServiceOption Service option

func WithServiceID

func WithServiceID(id string) ServiceOption

WithServiceID id

func WithServiceInterceptor

func WithServiceInterceptor(i Interceptor) ServiceOption

WithServiceInterceptor sets the handler interceptor.

func WithServiceNamespace

func WithServiceNamespace(namespace string) ServiceOption

WithServiceNamespace sets the namespace (cluster).

func WithServiceSingleGoroutine

func WithServiceSingleGoroutine() ServiceOption

WithServiceSingleGoroutine handles messages in a single goroutine, without concurrency, for cases where messages must be processed in order.
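
Why a single goroutine preserves order can be seen in a small stdlib-only sketch. `processInOrder` is a hypothetical helper that illustrates the idea; it is not how the package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// processInOrder feeds msgs through one worker goroutine, so handle sees
// them strictly in arrival order and never runs concurrently with itself.
func processInOrder(msgs []int, handle func(int)) {
	queue := make(chan int)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // the single worker
		defer wg.Done()
		for m := range queue {
			handle(m)
		}
	}()
	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	wg.Wait() // all messages handled once this returns
}

func main() {
	var seen []int
	processInOrder([]int{1, 2, 3, 4}, func(m int) { seen = append(seen, m) })
	fmt.Println(seen)
}
```

Starting one goroutine per message instead would give higher throughput but no ordering guarantee, which is the trade-off this option exposes.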

func WithServiceTimeout

func WithServiceTimeout(timeout time.Duration) ServiceOption

WithServiceTimeout sets the timeout.

type ServiceOptions

type ServiceOptions struct {
	// contains filtered or unexported fields
}

ServiceOptions holds the service options.

type ServiceRegistrar

type ServiceRegistrar interface {
	// Register registers a service.
	Register(sd ServiceDesc, svc any, opt ...ServiceOption) (ServiceInterface, error)
}

ServiceRegistrar registers services.
