psrpc

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2022 License: Apache-2.0 Imports: 15 Imported by: 65

README

PubSub-RPC

Create custom protobuf-based golang RPCs built on pub/sub.

Supports:

  • Protobuf service definitions
  • Redis or Nats as a communication layer
  • Custom server selection for RPC handling based on user-defined affinity
  • RPC topics - any RPC can be divided into topics, (e.g. by region)
  • Single RPCs - one request is handled by one server, used for normal RPCs
  • Multi RPCs - one request is handled by every server, used for distributed updates or result aggregation
  • Queue Subscriptions - updates sent from the server will only be processed by a single client
  • Subscriptions - updates sent be the server will be processed by every client

Usage

Protobuf

PSRPC is generated from proto files, and we've added a few custom method options:

message Options {
  // For RPCs, each client request will receive a response from every server.
  // For subscriptions, every client will receive every update.
  bool multi = 1;

  // This method is a pub/sub.
  bool subscription = 2;

  // This method uses topics.
  bool topics = 3;

  // Your service will supply an affinity function for handler selection.
  bool affinity_func = 4;
}

Start with your service definition. Here's an example using different method options:

syntax = "proto3";

import "options.proto";

option go_package = "/api";

service MyService {
  // A normal RPC - one request, one response. The request will be handled by the first available server
  rpc NormalRPC(MyRequest) returns (MyResponse);
  
  // An RPC with a server affinity function for handler selection.
  rpc IntensiveRPC(MyRequest) returns (MyResponse) {
    option (psrpc.options).affinity_func = true;
  };
  
  // A multi-rpc - a client will send one request, and receive one response each from every server
  rpc GetStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).multi = true;
  };
  
  // An RPC with topics - a client can send one request, and receive one response from each server in one region
  rpc GetRegionStats(MyRequest) returns (MyResponse) {
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }
  
  // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc ProcessUpdate(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
  };
  
  // A normal subscription - every client will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).multi = true;
  };

  // A subscription with topics - every client subscribed to the topic will receive every update.
  // The request parameter (in this case, Ignored) will always be ignored when generating go files.
  rpc UpdateRegionState(Ignored) returns (MyUpdate) {
    option (psrpc.options).subscription = true;
    option (psrpc.options).topics = true;
    option (psrpc.options).multi = true;
  }
}

message Ignored {}
message MyRequest {}
message MyResponse {}
message MyUpdate {}
Generation

Install protoc-gen-psrpc by running go install github.com/livekit/psrpc/protoc-gen-psrpc.

If using the custom options above, you'll also need to download options.proto.

Use the --psrpc_out with protoc and include the options file.

protoc \ 
  --go_out=paths=source_relative:. \
  --psrpc_out=paths=source_relative:. \
  -I ./protoc-gen-psrpc/options \
  -I=. my_service.proto

This will create a my_service.psrpc.go file

Client

A MyServiceClient will be generated based on your rpc definitions:

type MyServiceClient interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)
    
    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (*MyResponse, error)
    
    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)
    
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, topic string, req *Request, opts ...psrpc.RequestOpt) (<-chan *psrpc.Response[*MyResponse], error)
    
    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    SubscribeProcessUpdate(ctx context.Context) (psrpc.Subscription[*MyUpdate], error)
    
    // A subscription with topics - every client subscribed to the topic will receive every update.
    SubscribeUpdateRegionState(ctx context.Context, topic string) (psrpc.Subscription[*MyUpdate], error)
}

// NewMyServiceClient creates a psrpc client that implements the MyServiceClient interface.
func NewMyServiceClient(clientID string, bus psrpc.MessageBus, opts ...psrpc.ClientOpt) (MyServiceClient, error) {
    ...
}

Multi-RPCs will return a chan *psrpc.Response, where you will receive an individual response or error from each server:

type Response[ResponseType proto.Message] struct {
    Result ResponseType
    Err    error
}

Subscription RPCs will return a psrpc.Subscription, where you can listen for updates on its channel:

type Subscription[MessageType proto.Message] interface {
    Channel() <-chan MessageType
    Close() error
}
ServerImpl

A <ServiceName>ServerImpl interface will be also be generated from your rpcs. Your service will need to fulfill its interface:

type MyServiceServerImpl interface {
    // A normal RPC - one request, one response. The request will be handled by the first available server
    NormalRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)
    
    // An RPC with a server affinity function for handler selection.
    IntensiveRPC(ctx context.Context, req *MyRequest) (*MyResponse, error)
    IntensiveRPCAffinity(req *MyRequest) float32
    
    // A multi-rpc - a client will send one request, and receive one response each from every server
    GetStats(ctx context.Context, req *MyRequest) (*MyResponse, error)
    
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    GetRegionStats(ctx context.Context, req *MyRequest) (*MyResponse, error)
}
Server

Finally, a <ServiceName>Server will be generated. This is used to start your rpc server, as well as register and deregister topics:

type MyServiceServer interface {
    // An RPC with topics - a client can send one request, and receive one response from each server in one region
    RegisterGetRegionStatsTopic(topic string) error
    DeregisterGetRegionStatsTopic(topic string) error
    
    // A queue subscription - even if multiple clients are subscribed, only one will receive this update.
    PublishProcessUpdate(ctx context.Context, msg *MyUpdate) error
    
    // A subscription with topics - every client subscribed to the topic will receive every update.
    PublishUpdateRegionState(ctx context.Context, topic string, msg *MyUpdate) error
}

// NewMyServiceServer builds a RPCServer that can be used to handle
// requests that are routed to the right method in the provided svc implementation.
func NewMyServiceServer(serverID string, svc MyServiceServerImpl, bus psrpc.MessageBus, opts ...psrpc.ServerOpt) (MyServiceServer, error) {
    ...
}

Affinity

AffinityFunc

The server can implement an affinity function for the client to decide which instance should take a SingleRequest. A higher affinity score is better, and a score of 0 means the server is not available.

For example, the following could be used to return an affinity based on cpu load:

rpc IntensiveRPC(MyRequest) returns (MyResponse) {
  option (psrpc.options).affinity_func = true;
};
func (s *MyService) IntensiveRPC(ctx context.Context, req *api.MyRequest) (*api.MyResponse, error) {
    ... // do something CPU intensive
}

func (s *MyService) IntensiveRPCAffinity(_ *MyRequest) float32 {
    return stats.GetIdleCPU()
}
SelectionOpts

On the client side, you can also set server selection options with single RPCs.

type SelectionOpts struct {
    MinimumAffinity      float32       // (default 0) minimum affinity for a server to be considered a valid handler
    AcceptFirstAvailable bool          // (default true)
    AffinityTimeout      time.Duration // (default 0 (none)) server selection deadline
    ShortCircuitTimeout  time.Duration // (default 0 (none)) deadline imposed after receiving first response
}
selectionOpts := psrpc.SelectionOpts{
    MinimumAffinity:      0.5,
    AffinityTimeout:      time.Second,
    ShortCircuitTimeout:  time.Millisecond * 250,
}

res, err := myClient.IntensiveRPC(ctx, req, psrpc.WithSelectionOpts(selectionOpts))

Documentation

Index

Constants

View Source
const (
	DefaultTimeout = time.Second * 3
	ChannelSize    = 100
)

Variables

View Source
var (
	ErrRequestTimedOut = errors.New("request timed out")
	ErrNoResponse      = errors.New("no response from servers")
)
View Source
var ErrBusNotConnected = errors.New("bus not connected")

Functions

func Publish

func Publish(bus MessageBus, ctx context.Context, channel string, msg proto.Message) error

func RequestAll

func RequestAll[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	request proto.Message,
	opts ...RequestOpt,
) (<-chan *Response[ResponseType], error)

func RequestSingle

func RequestSingle[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	request proto.Message,
	opts ...RequestOpt,
) (ResponseType, error)

func RequestTopicAll

func RequestTopicAll[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	topic string,
	request proto.Message,
	opts ...RequestOpt,
) (<-chan *Response[ResponseType], error)

func RequestTopicSingle

func RequestTopicSingle[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	topic string,
	request proto.Message,
	opts ...RequestOpt,
) (ResponseType, error)

func SetLogger

func SetLogger(l logr.Logger)

Types

type AffinityFunc

type AffinityFunc[RequestType proto.Message] func(RequestType) float32

type ClientOpt

type ClientOpt func(clientOpts) clientOpts

func WithClientTimeout

func WithClientTimeout(timeout time.Duration) ClientOpt

type Handler

type Handler interface {
	// contains filtered or unexported methods
}

func NewHandler

func NewHandler[RequestType proto.Message, ResponseType proto.Message](
	rpc string,
	handlerFunc func(context.Context, RequestType) (ResponseType, error),
) Handler

func NewHandlerWithAffinity

func NewHandlerWithAffinity[RequestType proto.Message, ResponseType proto.Message](
	rpc string,
	handlerFunc func(context.Context, RequestType) (ResponseType, error),
	affinityFunc AffinityFunc[RequestType],
) Handler

func NewTopicHandler

func NewTopicHandler[RequestType proto.Message, ResponseType proto.Message](
	rpc string,
	topic string,
	handlerFunc func(context.Context, RequestType) (ResponseType, error),
) Handler

func NewTopicHandlerWithAffinity

func NewTopicHandlerWithAffinity[RequestType proto.Message, ResponseType proto.Message](
	rpc string,
	topic string,
	handlerFunc func(context.Context, RequestType) (ResponseType, error),
	affinityFunc AffinityFunc[RequestType],
) Handler

type MessageBus

type MessageBus interface {
	// contains filtered or unexported methods
}

func NewNatsMessageBus

func NewNatsMessageBus(nc *nats.Conn) MessageBus

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

type RPCClient

type RPCClient interface {
	// close all subscriptions and stop
	Close()
	// contains filtered or unexported methods
}

func NewRPCClient

func NewRPCClient(serviceName, clientID string, bus MessageBus, opts ...ClientOpt) (RPCClient, error)

type RPCServer

type RPCServer interface {
	// register a handler
	RegisterHandler(h Handler) error
	// publish updates to a streaming rpc
	Publish(ctx context.Context, rpc string, message proto.Message) error
	// publish updates to a topic within a streaming rpc
	PublishTopic(ctx context.Context, rpc, topic string, message proto.Message) error
	// stop listening for requests for a rpc
	DeregisterHandler(rpc string) error
	// stop listening on a topic for a rpc
	DeregisterTopic(rpc, topic string) error
	// close all subscriptions and stop
	Close()
}

func NewRPCServer

func NewRPCServer(serviceName, serverID string, bus MessageBus, opts ...ServerOpt) RPCServer

type RequestOpt

type RequestOpt func(reqOpts) reqOpts

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) RequestOpt

func WithSelectionOpts

func WithSelectionOpts(opts SelectionOpts) RequestOpt

type Response

type Response[ResponseType proto.Message] struct {
	Result ResponseType
	Err    error
}

type SelectionOpts

type SelectionOpts struct {
	MinimumAffinity      float32       // minimum affinity for a server to be considered a valid handler
	AcceptFirstAvailable bool          // go fast
	AffinityTimeout      time.Duration // server selection deadline
	ShortCircuitTimeout  time.Duration // deadline imposed after receiving first response
}

type ServerOpt

type ServerOpt func(serverOpts) serverOpts

func WithServerTimeout

func WithServerTimeout(timeout time.Duration) ServerOpt

type Subscription

type Subscription[MessageType proto.Message] interface {
	Channel() <-chan MessageType
	Close() error
}

func Subscribe

func Subscribe[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)

func SubscribeQueue

func SubscribeQueue[MessageType proto.Message](bus MessageBus, ctx context.Context, channel string) (Subscription[MessageType], error)

func SubscribeStream

func SubscribeStream[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
) (Subscription[ResponseType], error)

func SubscribeStreamQueue

func SubscribeStreamQueue[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
) (Subscription[ResponseType], error)

func SubscribeTopic

func SubscribeTopic[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	topic string,
) (Subscription[ResponseType], error)

func SubscribeTopicQueue

func SubscribeTopicQueue[ResponseType proto.Message](
	ctx context.Context,
	client RPCClient,
	rpc string,
	topic string,
) (Subscription[ResponseType], error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL