server

package
v3.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2023 License: Apache-2.0 Imports: 29 Imported by: 31

Documentation

Overview

Package server is an interface for a micro server

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultRegisterFunc registers svc through config.Register, retrying
	// with backoff when a call fails.
	//
	// Each call receives config.Context, svc, and the options
	// register.RegisterTTL(config.RegisterTTL) and
	// register.RegisterDomain(config.Namespace). The loop bound is inclusive,
	// so up to config.RegisterAttempts+1 calls are made. It returns nil as
	// soon as one call succeeds, otherwise the error of the last call.
	DefaultRegisterFunc = func(svc *register.Service, config Options) error {
		var err error

		opts := []register.RegisterOption{
			register.RegisterTTL(config.RegisterTTL),
			register.RegisterDomain(config.Namespace),
		}

		for i := 0; i <= config.RegisterAttempts; i++ {
			if err = config.Register.Register(config.Context, svc, opts...); err == nil {
				return nil
			}

			// Back off only when another attempt will follow; sleeping after
			// the final failure would merely delay returning the error.
			if i < config.RegisterAttempts {
				time.Sleep(backoff.Do(i + 1))
			}
		}
		return err
	}
	// DefaultDeregisterFunc deregisters svc through config.Register, retrying
	// with backoff when a call fails.
	//
	// Each call receives config.Context, svc, and the option
	// register.DeregisterDomain(config.Namespace). The loop bound is
	// inclusive, so up to config.DeregisterAttempts+1 calls are made. It
	// returns nil as soon as one call succeeds, otherwise the error of the
	// last call.
	DefaultDeregisterFunc = func(svc *register.Service, config Options) error {
		var err error

		opts := []register.DeregisterOption{
			register.DeregisterDomain(config.Namespace),
		}

		for i := 0; i <= config.DeregisterAttempts; i++ {
			if err = config.Register.Deregister(config.Context, svc, opts...); err == nil {
				return nil
			}

			// Back off only when another attempt will follow; sleeping after
			// the final failure would merely delay returning the error.
			if i < config.DeregisterAttempts {
				time.Sleep(backoff.Do(i + 1))
			}
		}
		return err
	}
)
View Source
var (
	// DefaultAddress is the bind address applied when the caller supplies
	// none: the loopback host with port 0.
	DefaultAddress = "127.0.0.1:0"
	// DefaultName is the server name applied when the caller supplies none.
	DefaultName = "server"
	// DefaultVersion is the server version applied when the caller supplies none.
	DefaultVersion = "latest"
	// DefaultRegisterCheck is the check run before registering the server;
	// this default always reports success.
	DefaultRegisterCheck = func(_ context.Context) error {
		return nil
	}
	// DefaultRegisterInterval is the interval between registrations.
	DefaultRegisterInterval = 30 * time.Second
	// DefaultRegisterTTL is the TTL of the register record; it must be a
	// multiple of DefaultRegisterInterval.
	DefaultRegisterTTL = 90 * time.Second
	// DefaultNamespace is the namespace applied when the caller supplies none.
	DefaultNamespace = "micro"
	// DefaultMaxMsgSize is the default maximum message size in bytes (4 MiB).
	DefaultMaxMsgSize = 4 << 20
	// DefaultMaxMsgRecvSize is the default maximum receive size in bytes (4 MiB).
	DefaultMaxMsgRecvSize = 4 << 20
	// DefaultMaxMsgSendSize is the default maximum send size in bytes (4 MiB).
	DefaultMaxMsgSendSize = 4 << 20
)
View Source
// DefaultCodecs holds the codecs used to encode/decode, keyed by a string
// (presumably the content type — confirm against callers). Out of the box it
// contains a single entry for "application/octet-stream" built by
// codec.NewCodec.
var DefaultCodecs = map[string]codec.Codec{
	"application/octet-stream": codec.NewCodec(),
}

DefaultCodecs will be used to encode/decode

View Source
// DefaultServer is the package-level Server, created by calling NewServer
// with no options.
var DefaultServer = NewServer()

DefaultServer default server

Functions

func NewContext

func NewContext(ctx context.Context, s Server) context.Context

NewContext stores Server to context

func NewRegisterService

func NewRegisterService(s Server) (*register.Service, error)

NewRegisterService returns *register.Service from Server

func ValidateSubscriber

func ValidateSubscriber(sub Subscriber) error

ValidateSubscriber func signature

Types

type BatchSubscriberFunc

type BatchSubscriberFunc func(ctxs []context.Context, msgs []Message) error

BatchSubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message. This func is used by batch subscribers.

type BatchSubscriberWrapper

type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc

BatchSubscriberWrapper wraps the BatchSubscriberFunc and returns the equivalent

type Error

type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(id string) *Error

func (*Error) BadGateway

func (e *Error) BadGateway(format string, a ...interface{}) error

func (*Error) BadRequest

func (e *Error) BadRequest(format string, a ...interface{}) error

func (*Error) Conflict

func (e *Error) Conflict(format string, a ...interface{}) error

func (*Error) Forbidden

func (e *Error) Forbidden(format string, a ...interface{}) error

func (*Error) GatewayTimeout

func (e *Error) GatewayTimeout(format string, a ...interface{}) error

func (*Error) InternalServerError

func (e *Error) InternalServerError(format string, a ...interface{}) error

func (*Error) MethodNotAllowed

func (e *Error) MethodNotAllowed(format string, a ...interface{}) error

func (*Error) NotFound

func (e *Error) NotFound(format string, a ...interface{}) error

func (*Error) NotImplemented

func (e *Error) NotImplemented(format string, a ...interface{}) error

func (*Error) ServiceUnavailable

func (e *Error) ServiceUnavailable(format string, a ...interface{}) error

func (*Error) Timeout

func (e *Error) Timeout(format string, a ...interface{}) error

func (*Error) Unauthorized

func (e *Error) Unauthorized(format string, a ...interface{}) error

type Handler

// Handler interface represents a request handler.
type Handler interface {
	// Name returns the handler name.
	Name() string
	// Handler returns the wrapped handler value — presumably the object
	// passed to NewHandler; TODO confirm against the implementation.
	Handler() interface{}
	// Endpoints returns the register endpoints of the handler.
	Endpoints() []*register.Endpoint
	// Options returns the HandlerOptions of the handler.
	Options() HandlerOptions
}

Handler interface represents a request handler. It's generated by passing any type of public concrete object with endpoints into server.NewHandler. Most will pass in a struct.

Example:

type Greeter struct {}

func (g *Greeter) Hello(context, request, response) error {
        return nil
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error

HandlerFunc represents a single method of a handler. It's used primarily for the wrappers. What's handed to the actual method is the concrete request and response types.

type HandlerOption

type HandlerOption func(*HandlerOptions)

HandlerOption func

func EndpointMetadata

func EndpointMetadata(name string, md metadata.Metadata) HandlerOption

EndpointMetadata is a Handler option that allows metadata to be added to individual endpoints.

type HandlerOptions

// HandlerOptions holds the options of a Handler.
type HandlerOptions struct {
	// Context holds external options
	Context context.Context
	// Metadata for the handler; presumably keyed by endpoint name — TODO
	// confirm against EndpointMetadata.
	Metadata map[string]metadata.Metadata
}

HandlerOptions struct

func NewHandlerOptions

func NewHandlerOptions(opts ...HandlerOption) HandlerOptions

NewHandlerOptions creates new HandlerOptions

type HandlerWrapper

type HandlerWrapper func(HandlerFunc) HandlerFunc

HandlerWrapper wraps the HandlerFunc and returns the equivalent

type Message

// Message is an async message interface.
type Message interface {
	// Topic returns the topic of the message
	Topic() string
	// Body returns the decoded payload value
	Body() interface{}
	// ContentType returns the content type of the payload
	ContentType() string
	// Header returns the raw headers of the message
	Header() metadata.Metadata
	// Codec returns the codec used to decode the message
	Codec() codec.Codec
}

Message is an async message interface

type Option

type Option func(*Options)

Option func

func Address

func Address(a string) Option

Address to bind to - host:port

func Advertise(a string) Option

Advertise the address to advertise for discovery - host:port

func Broker

func Broker(b broker.Broker) Option

Broker to use for pub/sub

func Codec

func Codec(contentType string, c codec.Codec) Option

Codec to use to encode/decode requests for a given content type

func Context

func Context(ctx context.Context) Option

Context specifies a context for the service. It can be used to signal shutdown of the service and to carry extra option values.

func ID

func ID(id string) Option

ID unique server id

func Listener

func Listener(l net.Listener) Option

Listener specifies the net.Listener to use instead of the default

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger option

func MaxConn

func MaxConn(n int) Option

MaxConn specifies the maximum number of simultaneous connections to the server

func Metadata

func Metadata(md metadata.Metadata) Option

Metadata associated with the server

func Meter

func Meter(m meter.Meter) Option

Meter sets the meter option

func Name

func Name(n string) Option

Name sets the server name option

func Namespace

func Namespace(n string) Option

Namespace to register handlers in

func Register

func Register(r register.Register) Option

Register used for discovery

func RegisterCheck

func RegisterCheck(fn func(context.Context) error) Option

RegisterCheck run func before register service

func RegisterInterval

func RegisterInterval(t time.Duration) Option

RegisterInterval registers the service at the given interval

func RegisterTTL

func RegisterTTL(t time.Duration) Option

RegisterTTL registers service with a TTL

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig specifies a *tls.Config

func Tracer

func Tracer(t tracer.Tracer) Option

Tracer mechanism for distributed tracing

func Transport

func Transport(t transport.Transport) Option

Transport mechanism for communication e.g http, rabbitmq, etc

func Version

func Version(v string) Option

Version of the service

func Wait

func Wait(wg *sync.WaitGroup) Option

Wait tells the server to wait for requests to finish before exiting. If `wg` is nil, the server only waits for completion of rpc handlers. Users who need finer-grained control can pass a concrete `wg` here; the server will wait on it on stop.

func WrapBatchSubscriber

func WrapBatchSubscriber(w BatchSubscriberWrapper) Option

WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server

func WrapHandler

func WrapHandler(w HandlerWrapper) Option

WrapHandler adds a handler Wrapper to a list of options passed into the server

func WrapSubscriber

func WrapSubscriber(w SubscriberWrapper) Option

WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server

type Options

// Options holds the server options; individual fields are set through
// Option funcs.
type Options struct {
	// Context holds the external options and can be used for server shutdown
	Context context.Context
	// Broker holds the server broker
	Broker broker.Broker
	// Register holds the register
	Register register.Register
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Transport holds the transport
	Transport transport.Transport

	// Listener may be passed if already created
	Listener net.Listener
	// Wait holds the wait group
	Wait *sync.WaitGroup
	// TLSConfig specifies tls.Config for secure serving
	TLSConfig *tls.Config
	// Metadata holds the server metadata
	Metadata metadata.Metadata
	// RegisterCheck is run before registering the server
	RegisterCheck func(context.Context) error
	// Codecs map to handle content-type
	Codecs map[string]codec.Codec
	// ID holds the id of the server
	ID string
	// Namespace for the server
	Namespace string
	// Name holds the server name
	Name string
	// Address holds the server address
	Address string
	// Advertise holds the advertise address
	Advertise string
	// Version holds the server version
	Version string
	// SubWrappers holds the server subscribe wrappers
	SubWrappers []SubscriberWrapper
	// BatchSubWrappers holds the server batch subscribe wrappers
	BatchSubWrappers []BatchSubscriberWrapper
	// HdlrWrappers holds the handler wrappers
	HdlrWrappers []HandlerWrapper
	// RegisterAttempts holds the number of register attempts before error
	RegisterAttempts int
	// RegisterInterval holds the interval for re-register
	RegisterInterval time.Duration
	// RegisterTTL specifies TTL for register record
	RegisterTTL time.Duration
	// MaxConn limits number of connections
	MaxConn int
	// DeregisterAttempts holds the number of deregister attempts before error
	DeregisterAttempts int
}

Options server struct

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions returns new options struct with default or passed values

type Request

// Request is a synchronous request interface.
type Request interface {
	// Service returns the service name requested
	Service() string
	// Method returns the action requested
	Method() string
	// Endpoint returns the endpoint name requested
	Endpoint() string
	// ContentType returns the content type provided
	ContentType() string
	// Header returns the header of the request
	Header() metadata.Metadata
	// Body is the initial decoded value
	Body() interface{}
	// Read returns the undecoded request body
	Read() ([]byte, error)
	// Codec returns the encoded message stream
	Codec() codec.Codec
	// Stream indicates whether it's a stream
	Stream() bool
}

Request is a synchronous request interface

type Response

// Response is the response writer for unencoded messages.
type Response interface {
	// Codec returns the encoded writer
	Codec() codec.Codec
	// WriteHeader writes the header
	WriteHeader(md metadata.Metadata)
	// Write writes a response directly to the client
	Write([]byte) error
}

Response is the response writer for unencoded messages

type Server

// Server is a simple micro server abstraction.
type Server interface {
	// Name returns server name
	Name() string
	// Init initialises the options
	Init(...Option) error
	// Options retrieves the options
	Options() Options
	// Handle registers a handler
	Handle(h Handler) error
	// NewHandler creates a new handler
	NewHandler(h interface{}, opts ...HandlerOption) Handler
	// NewSubscriber creates a new subscriber
	NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
	// Subscribe registers a subscriber
	Subscribe(s Subscriber) error
	// Start starts the server
	Start() error
	// Stop stops the server
	Stop() error
	// String identifies the server implementation
	String() string
}

Server is a simple micro server abstraction

func FromContext

func FromContext(ctx context.Context) (Server, bool)

FromContext returns Server from context

func NewServer

func NewServer(opts ...Option) Server

NewServer returns new noop server

type Stream

// Stream represents a stream established with a client.
type Stream interface {
	// Context returns the context for the stream
	Context() context.Context
	// Request returns request
	Request() Request
	// Send will encode and send a request
	Send(msg interface{}) error
	// Recv will decode and read a response
	Recv(msg interface{}) error
	// SendMsg will encode and send a request
	SendMsg(msg interface{}) error
	// RecvMsg will decode and read a response
	RecvMsg(msg interface{}) error
	// Error returns stream error
	Error() error
	// Close closes the stream
	Close() error
}

Stream represents a stream established with a client. A stream can be bidirectional which is indicated by the request. The last error will be left in Error(). EOF indicates end of the stream.

type StreamWrapper

type StreamWrapper func(Stream) Stream

StreamWrapper wraps a Stream interface and returns the equivalent. Because streams exist for the lifetime of a method invocation, this is a convenient way to wrap a Stream while it's in use for tracing, monitoring, metrics, etc.

type Subscriber

// Subscriber interface represents a subscription to a given topic.
type Subscriber interface {
	// Topic returns the topic of the subscription.
	Topic() string
	// Subscriber returns the wrapped subscriber value — presumably the
	// function or object passed to NewSubscriber; TODO confirm against the
	// implementation.
	Subscriber() interface{}
	// Endpoints returns the register endpoints of the subscriber.
	Endpoints() []*register.Endpoint
	// Options returns the SubscriberOptions of the subscriber.
	Options() SubscriberOptions
}

Subscriber interface represents a subscription to a given topic using a specific subscriber function or object with endpoints. It mirrors the handler in its behaviour.

type SubscriberFunc

type SubscriberFunc func(ctx context.Context, msg Message) error

SubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message.

type SubscriberOption

type SubscriberOption func(*SubscriberOptions)

SubscriberOption func

func DisableAutoAck

func DisableAutoAck() SubscriberOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func SetSubscriberOption

func SetSubscriberOption(k, v interface{}) SubscriberOption

SetSubscriberOption returns a function to setup a context with given value

func SubscriberAck

func SubscriberAck(b bool) SubscriberOption

SubscriberAck control auto ack processing for handler

func SubscriberBatch

func SubscriberBatch(b bool) SubscriberOption

SubscriberBatch control batch processing for handler

func SubscriberBatchSize

func SubscriberBatchSize(n int) SubscriberOption

SubscriberBatchSize controls the batch filling size for the handler. The maximum waiting time for batch filling is controlled by SubscriberBatchWait.

func SubscriberBatchWait

func SubscriberBatchWait(td time.Duration) SubscriberOption

SubscriberBatchWait control batch filling wait time for handler

func SubscriberBodyOnly

func SubscriberBodyOnly(b bool) SubscriberOption

SubscriberBodyOnly tells the broker that the message contains raw data without the micro broker.Message format

func SubscriberContext

func SubscriberContext(ctx context.Context) SubscriberOption

SubscriberContext set context options to allow broker SubscriberOption passed

func SubscriberGroup

func SubscriberGroup(n string) SubscriberOption

SubscriberGroup sets the shared group name used to distribute messages across subscribers

func SubscriberQueue

func SubscriberQueue(n string) SubscriberOption

SubscriberQueue sets the shared queue name used to distribute messages across subscribers

type SubscriberOptions

// SubscriberOptions holds the options of a Subscriber; individual fields are
// set through SubscriberOption funcs.
type SubscriberOptions struct {
	// Context holds the external options
	Context context.Context
	// Queue holds the subscription queue
	Queue string
	// AutoAck flag enables auto ack of messages after processing
	AutoAck bool
	// BodyOnly flag specifies that the message comes without headers
	BodyOnly bool
	// Batch flag specifies that messages are processed in batches
	Batch bool
	// BatchSize specifies the max size of a batch
	BatchSize int
	// BatchWait specifies the max wait time for batch filling
	BatchWait time.Duration
}

SubscriberOptions struct

func NewSubscriberOptions

func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions

NewSubscriberOptions create new SubscriberOptions

type SubscriberWrapper

type SubscriberWrapper func(SubscriberFunc) SubscriberFunc

SubscriberWrapper wraps the SubscriberFunc and returns the equivalent

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL