server

package
v1.0.3 Latest
Published: May 16, 2024 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 2 more Imports: 26 Imported by: 52

README

tRPC-Go Server Package

Introduction

A service process may listen on multiple ports and provide a different business service on each. The server package therefore introduces three concepts: Server, Service, and Proto service. Typically, one process contains one Server, and each Server can contain one or more Services. A Service is the unit used for name registration; clients use its name for routing and for sending network requests. Upon receiving a request, the server executes the business logic of the specified Proto service.

  • Server: Represents a tRPC server instance, i.e., one process.
  • Service: Represents a logical service, i.e., a real external service that listens on a port. It corresponds one-to-one with services defined in the configuration file, with one Server possibly containing multiple Service, one for each port.
  • Proto service: Represents a protocol service defined in a protobuf protocol file. Typically, a Service corresponds one-to-one with a Proto service, but users can also combine them arbitrarily using the Register method.
// Server is a tRPC server. One process, one server.
// A server may offer one or more services.
type Server struct {
    MaxCloseWaitTime time.Duration
}

// Service is the interface that provides services.
type Service interface {
    // Register registers a proto service.
    Register(serviceDesc interface{}, serviceImpl interface{}) error
    // Serve starts serving.
    Serve() error
    // Close stops serving.
    Close(chan struct{}) error
}

Service Mapping Relationships

Suppose a protocol file provides a hello service as follows:

service hello {
    rpc SayHello(HelloRequest) returns (HelloReply) {};
}

And a configuration file specifies two services, one offering the trpc protocol and the other the http protocol:

server: # Server configuration
  app: test # Application name
  server: helloworld # Process service name
  close_wait_time: 5000 # Minimum waiting time for service unregistration when closing, in milliseconds
  max_close_wait_time: 60000 # Maximum waiting time when closing to allow pending requests to complete, in milliseconds
  service: # Business services providing two services, listening on different ports and offering different protocols
    - name: trpc.test.helloworld.HelloTrpc # Name for the first service
      ip: 127.0.0.1 # IP address the service listens on
      port: 8000 # Port the service listens on (8000)
      protocol: trpc # Provides tRPC protocol service
    - name: trpc.test.helloworld.HelloHttp # Name for the second service
      ip: 127.0.0.1 # IP address the service listens on
      port: 8080 # Port the service listens on (8080)
      protocol: http # Provides HTTP protocol service

To register protocol services for different logical services:

type helloImpl struct{}

func (s *helloImpl) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
    rsp := &pb.HelloReply{}
    // implement business logic here ...
    return rsp, nil
}

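func main() {
    s := trpc.NewServer()
    impl := &helloImpl{} // pass an instance, not the type name

    // Recommended: Register a proto service for each service separately.
    // RegisterHelloServer stands for the registration function generated from
    // the hello proto service; the exact name depends on the generated stub.
    pb.RegisterHelloServer(s.Service("trpc.test.helloworld.HelloTrpc"), impl)
    pb.RegisterHelloServer(s.Service("trpc.test.helloworld.HelloHttp"), impl)

    // Alternatively, register the same proto service for all services in the server
    // (use this instead of the two calls above):
    // pb.RegisterHelloServer(s, impl)

    // Start all services that belong to the server.
    if err := s.Serve(); err != nil {
        panic(err)
    }
}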
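
The difference between registering on one named Service and registering on the whole Server can be illustrated without the framework. The following is a minimal sketch using only the standard library; fakeServer, fakeService, and newFakeServer are hypothetical stand-ins and do not reproduce the real tRPC-Go types.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeService stands in for one logical Service and records the names of
// the proto services registered on it. Hypothetical, not part of tRPC-Go.
type fakeService struct {
	registered []string
}

// Register records a proto service name on this service only.
func (f *fakeService) Register(desc string) error {
	f.registered = append(f.registered, desc)
	return nil
}

// fakeServer holds services keyed by their configured names.
type fakeServer struct {
	services map[string]*fakeService
}

func newFakeServer(names ...string) *fakeServer {
	s := &fakeServer{services: make(map[string]*fakeService)}
	for _, n := range names {
		s.services[n] = &fakeService{}
	}
	return s
}

// Service returns one logical service by name (nil if it is unknown).
func (s *fakeServer) Service(name string) *fakeService { return s.services[name] }

// Register fans the proto service out to every service of the server.
func (s *fakeServer) Register(desc string) error {
	if len(s.services) == 0 {
		return errors.New("server has no services")
	}
	for _, svc := range s.services {
		if err := svc.Register(desc); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := newFakeServer("trpc.test.helloworld.HelloTrpc", "trpc.test.helloworld.HelloHttp")
	// Targeted registration touches only the named service.
	if err := s.Service("trpc.test.helloworld.HelloTrpc").Register("hello"); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	for name, svc := range s.services {
		fmt.Println(name, len(svc.registered))
	}
}
```

Registering per Service keeps the mapping between port and proto service explicit, which is why it is the recommended form when a server has more than one service.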

Server Execution Flow

  1. The transport layer accepts a new connection and starts a goroutine to handle the connection's data.
  2. Upon receiving a complete data packet, unpack the entire request.
  3. Locate the specific handling function based on the specific proto service name.
  4. Decode the request body.
  5. Set an overall message timeout.
  6. Decompress and deserialize the request body.
  7. Call pre-interceptors.
  8. Enter the business handling function.
  9. Exit the business handling function.
  10. Call post-interceptors.
  11. Serialize and compress the response body.
  12. Package the entire response.
  13. Send the response back to the upstream client.
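
Steps 7 to 10 describe interceptors wrapped around the business handler. A minimal sketch of that wrapping, assuming a simplified string-based handler signature, is shown here. The names handleFunc, filterFunc, chain, tracing, and demo are hypothetical and chosen for illustration; the real filter types live in the filter package.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handleFunc is a simplified business handler (hypothetical signature).
type handleFunc func(ctx context.Context, req string) (string, error)

// filterFunc is a simplified interceptor: code before next() is the "pre"
// part, code after next() is the "post" part.
type filterFunc func(ctx context.Context, req string, next handleFunc) (string, error)

// chain wraps handler so that filters[0] runs outermost.
func chain(filters []filterFunc, handler handleFunc) handleFunc {
	for i := len(filters) - 1; i >= 0; i-- {
		f, next := filters[i], handler
		handler = func(ctx context.Context, req string) (string, error) {
			return f(ctx, req, next)
		}
	}
	return handler
}

// tracing returns a filter that records when it is entered and left.
func tracing(name string, trace *[]string) filterFunc {
	return func(ctx context.Context, req string, next handleFunc) (string, error) {
		*trace = append(*trace, "pre:"+name)
		rsp, err := next(ctx, req)
		*trace = append(*trace, "post:"+name)
		return rsp, err
	}
}

// demo runs two filters around a handler and returns the response together
// with the recorded call order.
func demo() (string, string) {
	var trace []string
	business := func(ctx context.Context, req string) (string, error) {
		trace = append(trace, "handler")
		return "hello " + req, nil
	}
	h := chain([]filterFunc{tracing("a", &trace), tracing("b", &trace)}, business)
	rsp, _ := h(context.Background(), "world")
	return rsp, strings.Join(trace, ",")
}

func main() {
	rsp, trace := demo()
	fmt.Println(rsp)
	fmt.Println(trace)
}
```

Because each filter calls next itself, the post part of the first filter runs last, mirroring the order of steps 7 through 10.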

Documentation

Overview

Package server provides a framework for managing multiple services within a single process. A server process may listen on multiple ports, providing different services on different ports.

Constants

const MaxCloseWaitTime = 10 * time.Second

MaxCloseWaitTime is the max waiting time for closing services.

Variables

var DefaultServerCloseSIG = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGSEGV}

DefaultServerCloseSIG are signals that trigger server shutdown.

var DefaultServerGracefulSIG = syscall.SIGUSR2

DefaultServerGracefulSIG is the signal that triggers a graceful restart of the server.

var New = func(opts ...Option) Service {
	o := defaultOptions()
	s := &service{
		opts:           o,
		handlers:       make(map[string]Handler),
		streamHandlers: make(map[string]StreamHandler),
		streamInfo:     make(map[string]*StreamServerInfo),
	}
	for _, o := range opts {
		o(s.opts)
	}
	o.Transport = attemptSwitchingTransport(o)
	if !s.opts.handlerSet {

		s.opts.ServeOptions = append(s.opts.ServeOptions, transport.WithHandler(s))
	}
	s.ctx, s.cancel = context.WithCancel(context.Background())
	return s
}

New creates a service. It will use transport.DefaultServerTransport unless Option WithTransport() is called to replace its transport.ServerTransport plugin.

Functions

func RegisterStreamFilter

func RegisterStreamFilter(name string, filter StreamFilter)

RegisterStreamFilter registers server stream filter with name.

Types

type Attachment

type Attachment struct {
	// contains filtered or unexported fields
}

Attachment stores the attachment of tRPC requests/responses.

func GetAttachment

func GetAttachment(msg codec.Msg) *Attachment

GetAttachment returns Attachment from msg. If there is no Attachment in the msg, an empty attachment bound to the msg will be returned.

func (*Attachment) Request

func (a *Attachment) Request() io.Reader

Request returns Request Attachment.

func (*Attachment) SetResponse

func (a *Attachment) SetResponse(attachment io.Reader)

SetResponse sets Response attachment.

type FilterFunc

type FilterFunc func(reqBody interface{}) (filter.ServerChain, error)

FilterFunc reads reqBody, parses it, and returns a filter.ServerChain for the server stub.

type Handler

type Handler func(ctx context.Context, f FilterFunc) (rspBody interface{}, err error)

Handler is the default handler.

type Method

type Method struct {
	Name     string
	Func     func(svr interface{}, ctx context.Context, f FilterFunc) (rspBody interface{}, err error)
	Bindings []*restful.Binding
}

Method provides the information of an RPC Method.

type Option

type Option func(*Options)

Option sets server options.
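
The With... functions that follow all use the functional options pattern: each returns a closure that mutates an Options value. A minimal sketch of the pattern, using a reduced, hypothetical options struct rather than the package's real Options, might look like this. The "tcp" default mirrors the WithNetwork description; everything else is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// options is a reduced, hypothetical stand-in for the package's Options.
type options struct {
	Address string
	Network string
	Timeout time.Duration
}

// option mutates an options value, like Option in this package.
type option func(*options)

func withAddress(s string) option {
	return func(o *options) { o.Address = s }
}

func withNetwork(s string) option {
	return func(o *options) { o.Network = s }
}

func withTimeout(t time.Duration) option {
	return func(o *options) { o.Timeout = t }
}

// newOptions starts from defaults and applies the given options in order,
// so a later option overrides an earlier one.
func newOptions(opts ...option) *options {
	o := &options{Network: "tcp"}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

func main() {
	o := newOptions(withAddress("127.0.0.1:8000"), withTimeout(time.Second))
	fmt.Println(o.Address, o.Network, o.Timeout)
}
```

The pattern lets new settings be added without changing the constructor signature, which is why the package exposes one With... function per setting.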

func WithAddress

func WithAddress(s string) Option

WithAddress returns an Option that sets address (ip:port or :port).

func WithCloseWaitTime

func WithCloseWaitTime(t time.Duration) Option

WithCloseWaitTime returns an Option that sets the minimum waiting time when closing the service. It is used for the service's graceful restart. Default: 0ms, max: 10s.

func WithContainer

func WithContainer(container string) Option

WithContainer returns an Option that sets container name.

func WithCurrentCompressType

func WithCurrentCompressType(t int) Option

WithCurrentCompressType returns an Option that sets current compress type.

func WithCurrentSerializationType

func WithCurrentSerializationType(t int) Option

WithCurrentSerializationType returns an Option that sets current serialization type. It's often used for transparent proxy without serialization. If current serialization type is not set, serialization type will be determined by serialization field of request protocol.

func WithDisableKeepAlives

func WithDisableKeepAlives(disable bool) Option

WithDisableKeepAlives returns an Option that disables keep-alives.

func WithDisableRequestTimeout

func WithDisableRequestTimeout(disable bool) Option

WithDisableRequestTimeout returns an Option that disables timeout for handling requests.

func WithEnvName

func WithEnvName(envName string) Option

WithEnvName returns an Option that sets environment name.

func WithFilter

func WithFilter(f filter.ServerFilter) Option

WithFilter returns an Option that adds a filter.ServerFilter (pre or post).

func WithFilters

func WithFilters(fs []filter.ServerFilter) Option

WithFilters returns an Option that adds a filter chain.

func WithHandler

func WithHandler(h transport.Handler) Option

WithHandler returns an Option that sets transport.Handler (service itself by default).

func WithIdleTimeout

func WithIdleTimeout(t time.Duration) Option

WithIdleTimeout returns an Option that sets idle connection timeout. Notice: it doesn't work for server streaming.

func WithListener

func WithListener(lis net.Listener) Option

WithListener returns an Option that sets net.Listener for accept, read/write op customization.

func WithMaxCloseWaitTime

func WithMaxCloseWaitTime(t time.Duration) Option

WithMaxCloseWaitTime returns an Option that sets the maximum waiting time when closing the service. It is used to wait for pending requests to finish. Default: 0ms.
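
The interplay of the minimum and maximum close wait times can be sketched as follows. This is only an illustration of the semantics described for close_wait_time and max_close_wait_time, not the framework's shutdown code; waitForClose and its done channel are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// waitForClose always waits at least minWait (time for deregistration to
// propagate), then waits for done until maxWait has elapsed in total.
// It reports whether pending work finished before the deadline.
func waitForClose(minWait, maxWait time.Duration, done <-chan struct{}) bool {
	if maxWait < minWait {
		maxWait = minWait
	}
	start := time.Now()
	time.Sleep(minWait)

	// Check first without blocking so an already finished server is
	// reported deterministically.
	select {
	case <-done:
		return true
	default:
	}

	timer := time.NewTimer(maxWait - time.Since(start))
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(20 * time.Millisecond) // pretend requests are draining
		close(done)
	}()
	fmt.Println("drained:", waitForClose(5*time.Millisecond, time.Second, done))
}
```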

func WithMaxRoutines

func WithMaxRoutines(routines int) Option

WithMaxRoutines returns an Option that sets the maximum number of goroutines. It only works in server async mode. MaxRoutines should be set to twice the expected number of goroutines (which can be estimated from the expected QPS) and should be larger than MAXPROCS. If MaxRoutines is not set or is set to 0, it defaults to (1<<31 - 1). Requests exceeding MaxRoutines are queued, so a prolonged overage may lead to OOM. MaxRoutines is not a solution for alleviating server overload.
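
The effect of a goroutine cap can be sketched with a buffered channel used as a semaphore. This is an illustration under stated assumptions, not the framework's pool: runBounded is a hypothetical helper, and where the text above says excess requests are queued, this sketch simply blocks the submitting loop. A non-positive cap is treated here as "no limit", which only approximates the documented default.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded runs tasks short jobs with at most maxRoutines of them in
// flight and returns the highest concurrency that was observed.
func runBounded(maxRoutines, tasks int) int32 {
	if tasks <= 0 {
		return 0
	}
	if maxRoutines <= 0 {
		maxRoutines = tasks // treat "unset" as effectively unlimited
	}
	sem := make(chan struct{}, maxRoutines)
	var (
		wg        sync.WaitGroup
		cur, peak int32
	)
	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // blocks while maxRoutines jobs are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the job
			n := atomic.AddInt32(&cur, 1)
			for { // record a new peak if this is one
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated request handling
			atomic.AddInt32(&cur, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runBounded(4, 50))
}
```

The cap bounds concurrency but not the backlog: work that cannot start still has to wait somewhere, which is why the option is not a remedy for overload.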

func WithMaxWindowSize

func WithMaxWindowSize(w uint32) Option

WithMaxWindowSize returns an Option that sets max window size for server stream.

func WithNamedFilter

func WithNamedFilter(name string, f filter.ServerFilter) Option

WithNamedFilter returns an Option that adds a named filter.

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace returns an Option that sets namespace for server.

func WithNetwork

func WithNetwork(s string) Option

WithNetwork returns an Option that sets network, tcp by default.

func WithProtocol

func WithProtocol(s string) Option

WithProtocol returns an Option that sets protocol of service. This Option also sets framerbuilder and codec plugin.

func WithRESTOptions

func WithRESTOptions(opts ...restful.Option) Option

WithRESTOptions returns an Option that sets RESTful router options.

func WithRegistry

func WithRegistry(r registry.Registry) Option

WithRegistry returns an Option that sets registry.Registry. One service, one registry.Registry.

func WithServerAsync

func WithServerAsync(serverAsync bool) Option

WithServerAsync returns an Option that sets whether to enable server asynchronous mode. When enabled, the server can keep receiving packets on a connection and process the request and response packets of that connection concurrently.

func WithServiceName

func WithServiceName(s string) Option

WithServiceName returns an Option that sets service name.

func WithSetName

func WithSetName(setName string) Option

WithSetName returns an Option that sets "Set" name.

func WithStreamFilter

func WithStreamFilter(sf StreamFilter) Option

WithStreamFilter returns an Option that adds a stream filter (pre or post).

func WithStreamFilters

func WithStreamFilters(sfs ...StreamFilter) Option

WithStreamFilters returns an Option that adds a stream filter chain.

func WithStreamTransport

func WithStreamTransport(st transport.ServerStreamTransport) Option

WithStreamTransport returns an Option that sets transport.ServerStreamTransport for server.

func WithTLS

func WithTLS(certFile, keyFile, caFile string) Option

WithTLS returns an Option that sets the paths of the TLS certificate files. certFile is the server certificate and keyFile is the server private key. caFile is the CA certificate used for authenticating clients (mTLS). If caFile is empty, clients are not validated. caFile="root" means the local CA file is used to validate clients. All certificates must be X.509 certificates.
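
How the three parameters could map onto a standard library tls.Config is sketched here. This is not the transport's actual code: serverTLSConfig and requiresClientCert are hypothetical helpers, and reading "root" as "use the system certificate pool" is an assumption about what "local CA file" means.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// requiresClientCert reports whether clients must present a certificate:
// only when a CA source is configured.
func requiresClientCert(caFile string) bool { return caFile != "" }

// serverTLSConfig builds a tls.Config from file paths (hypothetical helper).
func serverTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load server key pair: %w", err)
	}
	cfg := &tls.Config{Certificates: []tls.Certificate{cert}}
	if !requiresClientCert(caFile) {
		cfg.ClientAuth = tls.NoClientCert // one-way TLS, clients not validated
		return cfg, nil
	}
	cfg.ClientAuth = tls.RequireAndVerifyClientCert // mutual TLS

	if caFile == "root" {
		// Assumption: "root" is taken to mean the system certificate pool.
		pool, err := x509.SystemCertPool()
		if err != nil {
			return nil, fmt.Errorf("load system cert pool: %w", err)
		}
		cfg.ClientCAs = pool
		return cfg, nil
	}
	pem, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pem) {
		return nil, errors.New("CA file contains no usable PEM certificate")
	}
	cfg.ClientCAs = pool
	return cfg, nil
}

func main() {
	fmt.Println("mTLS with empty caFile:", requiresClientCert(""))
	fmt.Println("mTLS with ca.pem:", requiresClientCert("ca.pem"))
	if _, err := serverTLSConfig("server.crt", "server.key", ""); err != nil {
		fmt.Println("config not built:", err) // expected unless the files exist
	}
}
```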

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout returns an Option that sets timeout for handling a request.
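
A per-request timeout of this kind is commonly expressed with context.WithTimeout. The sketch below shows that general technique with the standard library; handle, work, and timedOut are hypothetical names, and treating a zero timeout as "no limit" is an assumption of the sketch rather than documented behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// handle runs h under a deadline of timeout. If parent already carries an
// earlier deadline, context.WithTimeout keeps that earlier one.
func handle(parent context.Context, timeout time.Duration, h func(context.Context) error) error {
	ctx := parent
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(parent, timeout)
		defer cancel()
	}
	return h(ctx)
}

// work simulates business logic that takes d and honours cancellation.
func work(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether a job of length workTime exceeds timeout.
func timedOut(timeout, workTime time.Duration) bool {
	err := handle(context.Background(), timeout, work(workTime))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow handler timed out:", timedOut(10*time.Millisecond, time.Second))
	fmt.Println("fast handler timed out:", timedOut(time.Second, time.Millisecond))
}
```

The handler only stops early if it watches ctx.Done(), so business code should pass the context on to any blocking call it makes.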

func WithTransport

func WithTransport(t transport.ServerTransport) Option

WithTransport returns an Option that sets transport.ServerTransport.

func WithWritev

func WithWritev(writev bool) Option

WithWritev returns an Option that sets whether to enable writev or not.

type Options

type Options struct {
	Namespace   string // namespace like "Production", "Development" etc.
	EnvName     string // environment name
	SetName     string // "Set" name
	ServiceName string // service name

	Address                  string        // listen address, ip:port
	Timeout                  time.Duration // timeout for handling a request
	DisableRequestTimeout    bool          // whether to disable request timeout that inherits from upstream
	DisableKeepAlives        bool          // disables keep-alives
	CurrentSerializationType int
	CurrentCompressType      int

	ServeOptions []transport.ListenServeOption
	Transport    transport.ServerTransport

	Registry registry.Registry
	Codec    codec.Codec

	Filters          filter.ServerChain              // filter chain
	FilterNames      []string                        // the name of filters
	StreamHandle     StreamHandle                    // server stream processing
	StreamTransport  transport.ServerStreamTransport // server stream transport plugin
	MaxWindowSize    uint32                          // max window size for server stream
	CloseWaitTime    time.Duration                   // min waiting time when closing server for wait deregister finish
	MaxCloseWaitTime time.Duration                   // max waiting time when closing server for wait requests finish

	RESTOptions   []restful.Option // RESTful router options
	StreamFilters StreamFilterChain
	// contains filtered or unexported fields
}

Options are server side options.

type Server

type Server struct {
	MaxCloseWaitTime time.Duration // max waiting time when closing server
	// contains filtered or unexported fields
}

Server is a tRPC server. One process, one server. A server may offer one or more services.

func (*Server) AddService

func (s *Server) AddService(serviceName string, service Service)

AddService adds a service for the server. The param serviceName refers to the name used for Naming Services and configured by config file (typically trpc_go.yaml). When trpc.NewServer() is called, it will traverse service configuration from config file, and call AddService to add a service implementation to the server's map[string]Service (serviceName as key).

func (*Server) Close

func (s *Server) Close(ch chan struct{}) error

Close implements Service interface, notifying all services of server shutdown. Would wait no more than 10s.

func (*Server) Register

func (s *Server) Register(serviceDesc interface{}, serviceImpl interface{}) error

Register implements Service interface, registering a proto service. Normally, a server contains only one service, so the registration is straightforward. When it comes to server with multiple services, remember to use Service("servicename") to specify which service this proto service is registered for. Otherwise, this proto service will be registered for all services of the server.

func (*Server) RegisterOnShutdown

func (s *Server) RegisterOnShutdown(fn func())

RegisterOnShutdown registers a hook function that would be executed when server is shutting down.
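
The idea of shutdown hooks can be sketched independently of the framework. hookServer below is a hypothetical type; running the hooks sequentially in registration order, and only once, are assumptions of this sketch and are not stated by the documentation above.

```go
package main

import (
	"fmt"
	"sync"
)

// hookServer collects functions to run when it is closed (hypothetical).
type hookServer struct {
	mu    sync.Mutex
	hooks []func()
}

// RegisterOnShutdown stores fn for later execution; nil is ignored.
func (s *hookServer) RegisterOnShutdown(fn func()) {
	if fn == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.hooks = append(s.hooks, fn)
}

// Close runs every registered hook once, in registration order.
func (s *hookServer) Close() {
	s.mu.Lock()
	hooks := s.hooks
	s.hooks = nil // a second Close runs nothing
	s.mu.Unlock()
	for _, fn := range hooks {
		fn()
	}
}

func main() {
	s := &hookServer{}
	s.RegisterOnShutdown(func() { fmt.Println("flush logs") })
	s.RegisterOnShutdown(func() { fmt.Println("close database") })
	s.Close()
}
```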

func (*Server) Serve

func (s *Server) Serve() error

Serve implements Service, starting all services that belong to the server.

func (*Server) Service

func (s *Server) Service(serviceName string) Service

Service returns a service by service name.

func (*Server) StartNewProcess

func (s *Server) StartNewProcess(args ...string) (uintptr, error)

StartNewProcess starts a new process.

type Service

type Service interface {
	// Register registers a proto service.
	Register(serviceDesc interface{}, serviceImpl interface{}) error
	// Serve starts serving.
	Serve() error
	// Close stops serving.
	Close(chan struct{}) error
}

Service is the interface that provides services.

type ServiceDesc

type ServiceDesc struct {
	ServiceName  string
	HandlerType  interface{}
	Methods      []Method
	Streams      []StreamDesc
	StreamHandle StreamHandle
}

ServiceDesc describes a proto service.

type Stream

type Stream interface {
	// Context is context of server stream.
	Context() context.Context
	// SendMsg sends streaming data.
	SendMsg(m interface{}) error
	// RecvMsg receives streaming data.
	RecvMsg(m interface{}) error
}

Stream is the interface that defines server stream api.
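
A handler written against this interface typically loops over RecvMsg and answers with SendMsg. The sketch below re-declares the interface locally and drives an echo handler with an in-memory stream so that it runs without the framework. memStream and echo are hypothetical, and signalling the end of the stream with io.EOF is an assumption of the sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Stream is a local copy of the interface shown above.
type Stream interface {
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}

// memStream is an in-memory Stream carrying strings (hypothetical).
type memStream struct {
	in  []string // messages still to be received
	out []string // messages that were sent
}

func (s *memStream) Context() context.Context { return context.Background() }

// RecvMsg pops the next inbound message into m, which must be a *string.
func (s *memStream) RecvMsg(m interface{}) error {
	p, ok := m.(*string)
	if !ok {
		return errors.New("RecvMsg expects *string")
	}
	if len(s.in) == 0 {
		return io.EOF // assumed end-of-stream marker
	}
	*p, s.in = s.in[0], s.in[1:]
	return nil
}

// SendMsg appends m, which must be a string, to the outbound messages.
func (s *memStream) SendMsg(m interface{}) error {
	v, ok := m.(string)
	if !ok {
		return errors.New("SendMsg expects string")
	}
	s.out = append(s.out, v)
	return nil
}

// echo answers every received message until the stream ends.
func echo(stream Stream) error {
	for {
		var msg string
		err := stream.RecvMsg(&msg)
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := stream.SendMsg("echo: " + msg); err != nil {
			return err
		}
	}
}

func main() {
	s := &memStream{in: []string{"a", "b"}}
	if err := echo(s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.out)
}
```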

type StreamDesc

type StreamDesc struct {
	// StreamName is the name of stream.
	StreamName string
	// Handler is a stream handler.
	Handler StreamHandlerWapper
	// ServerStreams indicates whether it's server streaming.
	ServerStreams bool
	// ClientStreams indicates whether it's client streaming.
	ClientStreams bool
}

StreamDesc describes a server stream.

type StreamFilter

type StreamFilter func(ss Stream, info *StreamServerInfo, handler StreamHandler) error

StreamFilter is server stream filter.

func GetStreamFilter

func GetStreamFilter(name string) StreamFilter

GetStreamFilter gets server stream filter by name.

type StreamFilterChain

type StreamFilterChain []StreamFilter

StreamFilterChain is a chain of server stream filters.

func (StreamFilterChain) Filter

func (c StreamFilterChain) Filter(ss Stream, info *StreamServerInfo, handler StreamHandler) error

Filter implements StreamFilter for multiple stream filters.
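
One way a chain can satisfy the StreamFilter signature is to nest each filter around the next, ending at the handler. The sketch below re-declares the types locally and is not the package's implementation; the helper names named and demo are hypothetical, and the assumption is that the first filter in the slice runs first.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local copies of the types shown above, so the sketch is self-contained.
type Stream interface {
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}

type StreamServerInfo struct {
	FullMethod     string
	IsClientStream bool
	IsServerStream bool
}

type StreamHandler func(stream Stream) error

type StreamFilter func(ss Stream, info *StreamServerInfo, handler StreamHandler) error

type StreamFilterChain []StreamFilter

// Filter nests the filters so that c[0] is outermost and handler innermost.
func (c StreamFilterChain) Filter(ss Stream, info *StreamServerInfo, handler StreamHandler) error {
	for i := len(c) - 1; i >= 0; i-- {
		f, next := c[i], handler
		handler = func(s Stream) error { return f(s, info, next) }
	}
	return handler(ss)
}

// named returns a filter that records its name and then continues.
func named(name string, order *[]string) StreamFilter {
	return func(ss Stream, info *StreamServerInfo, next StreamHandler) error {
		*order = append(*order, name)
		return next(ss)
	}
}

// demo runs a two-filter chain and returns the observed call order.
func demo() string {
	var order []string
	chain := StreamFilterChain{named("auth", &order), named("log", &order)}
	info := &StreamServerInfo{FullMethod: "/package.service/method", IsServerStream: true}
	// No filter or handler here touches the stream, so nil is sufficient.
	err := chain.Filter(nil, info, func(Stream) error {
		order = append(order, "handler")
		return nil
	})
	if err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(order, ">")
}

func main() {
	fmt.Println(demo())
}
```

An empty chain simply calls the handler, so the same entry point works whether or not any stream filters are configured.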

type StreamHandle

type StreamHandle interface {
	// StreamHandleFunc does server stream processing.
	StreamHandleFunc(ctx context.Context, sh StreamHandler, si *StreamServerInfo, req []byte) ([]byte, error)
	// Init does the initialization, mainly passing and saving Options.
	Init(opts *Options) error
}

StreamHandle is the interface that defines server stream processing.

type StreamHandler

type StreamHandler func(stream Stream) error

StreamHandler is server stream handler.

type StreamHandlerWapper

type StreamHandlerWapper func(srv interface{}, stream Stream) error

StreamHandlerWapper is server stream handler wrapper. The input param srv should be an implementation of server stream proto service. The input param stream is used by srv.

type StreamServerInfo

type StreamServerInfo struct {
	// FullMethod is the full RPC method string, i.e., /package.service/method.
	FullMethod string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}

StreamServerInfo is stream information on server side.

Directories

Path Synopsis
Package mockserver is a generated GoMock package.
