Documentation ¶
Overview ¶
Package server is an interface for a micro server
Index ¶
- Variables
- func NewContext(ctx context.Context, s Server) context.Context
- func NewRegisterService(s Server) (*register.Service, error)
- func ValidateSubscriber(sub Subscriber) error
- type BatchSubscriberFunc
- type BatchSubscriberWrapper
- type Error
- func (e *Error) BadGateway(format string, a ...interface{}) error
- func (e *Error) BadRequest(format string, a ...interface{}) error
- func (e *Error) Conflict(format string, a ...interface{}) error
- func (e *Error) Forbidden(format string, a ...interface{}) error
- func (e *Error) GatewayTimeout(format string, a ...interface{}) error
- func (e *Error) InternalServerError(format string, a ...interface{}) error
- func (e *Error) MethodNotAllowed(format string, a ...interface{}) error
- func (e *Error) NotFound(format string, a ...interface{}) error
- func (e *Error) NotImplemented(format string, a ...interface{}) error
- func (e *Error) ServiceUnavailable(format string, a ...interface{}) error
- func (e *Error) Timeout(format string, a ...interface{}) error
- func (e *Error) Unauthorized(format string, a ...interface{}) error
- type Handler
- type HandlerFunc
- type HandlerOption
- type HandlerOptions
- type HandlerWrapper
- type Message
- type Option
- func Address(a string) Option
- func Advertise(a string) Option
- func Auth(a auth.Auth) Option
- func Broker(b broker.Broker) Option
- func Codec(contentType string, c codec.Codec) Option
- func Context(ctx context.Context) Option
- func ID(id string) Option
- func Listener(l net.Listener) Option
- func Logger(l logger.Logger) Option
- func MaxConn(n int) Option
- func Metadata(md metadata.Metadata) Option
- func Meter(m meter.Meter) Option
- func Name(n string) Option
- func Namespace(n string) Option
- func Register(r register.Register) Option
- func RegisterCheck(fn func(context.Context) error) Option
- func RegisterInterval(t time.Duration) Option
- func RegisterTTL(t time.Duration) Option
- func SetOption(k, v interface{}) Option
- func TLSConfig(t *tls.Config) Option
- func Tracer(t tracer.Tracer) Option
- func Transport(t transport.Transport) Option
- func Version(v string) Option
- func Wait(wg *sync.WaitGroup) Option
- func WrapBatchSubscriber(w BatchSubscriberWrapper) Option
- func WrapHandler(w HandlerWrapper) Option
- func WrapSubscriber(w SubscriberWrapper) Option
- type Options
- type Request
- type Response
- type Server
- type Stream
- type StreamWrapper
- type Subscriber
- type SubscriberFunc
- type SubscriberOption
- func DisableAutoAck() SubscriberOption
- func SetSubscriberOption(k, v interface{}) SubscriberOption
- func SubscriberAck(b bool) SubscriberOption
- func SubscriberBatch(b bool) SubscriberOption
- func SubscriberBatchSize(n int) SubscriberOption
- func SubscriberBatchWait(td time.Duration) SubscriberOption
- func SubscriberBodyOnly(b bool) SubscriberOption
- func SubscriberContext(ctx context.Context) SubscriberOption
- func SubscriberGroup(n string) SubscriberOption
- func SubscriberQueue(n string) SubscriberOption
- type SubscriberOptions
- type SubscriberWrapper
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultRegisterFunc uses backoff to register service DefaultRegisterFunc = func(svc *register.Service, config Options) error { var err error opts := []register.RegisterOption{ register.RegisterTTL(config.RegisterTTL), register.RegisterDomain(config.Namespace), } for i := 0; i <= config.RegisterAttempts; i++ { err = config.Register.Register(config.Context, svc, opts...) if err == nil { break } time.Sleep(backoff.Do(i + 1)) continue } return err } // DefaultDeregisterFunc uses backoff to deregister service DefaultDeregisterFunc = func(svc *register.Service, config Options) error { var err error opts := []register.DeregisterOption{ register.DeregisterDomain(config.Namespace), } for i := 0; i <= config.DeregisterAttempts; i++ { err = config.Register.Deregister(config.Context, svc, opts...) if err == nil { break } time.Sleep(backoff.Do(i + 1)) continue } return err } )
var ( // DefaultAddress will be used if no address passed, use secure localhost DefaultAddress = "127.0.0.1:0" // DefaultName will be used if no name passed DefaultName = "server" // DefaultVersion will be used if no version passed DefaultVersion = "latest" // DefaultID will be used if no id passed DefaultID = id.Must() // DefaultRegisterCheck holds func that run before register server DefaultRegisterCheck = func(context.Context) error { return nil } // DefaultRegisterInterval holds interval for register DefaultRegisterInterval = time.Second * 30 // DefaultRegisterTTL holds register record ttl, must be multiple of DefaultRegisterInterval DefaultRegisterTTL = time.Second * 90 // DefaultNamespace will be used if no namespace passed DefaultNamespace = "micro" // DefaultMaxMsgSize holds default max msg ssize DefaultMaxMsgSize = 1024 * 1024 * 4 // 4Mb // DefaultMaxMsgRecvSize holds default max recv size DefaultMaxMsgRecvSize = 1024 * 1024 * 4 // 4Mb // DefaultMaxMsgSendSize holds default max send size DefaultMaxMsgSendSize = 1024 * 1024 * 4 // 4Mb )
// DefaultCodecs maps a content type to the codec used to encode/decode it.
var DefaultCodecs = map[string]codec.Codec{
	"application/octet-stream": codec.NewCodec(),
}
DefaultCodecs will be used to encode/decode
Functions ¶
func NewContext ¶
NewContext stores Server to context
func NewRegisterService ¶
NewRegisterService returns *register.Service from Server
func ValidateSubscriber ¶
func ValidateSubscriber(sub Subscriber) error
ValidateSubscriber func signature
Types ¶
type BatchSubscriberFunc ¶
BatchSubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message. This func is used by batch subscribers.
type BatchSubscriberWrapper ¶
type BatchSubscriberWrapper func(BatchSubscriberFunc) BatchSubscriberFunc
BatchSubscriberWrapper wraps the BatchSubscriberFunc and returns the equivalent
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
func (*Error) BadGateway ¶
func (*Error) BadRequest ¶
func (*Error) GatewayTimeout ¶
func (*Error) InternalServerError ¶
func (*Error) MethodNotAllowed ¶
func (*Error) NotImplemented ¶
func (*Error) ServiceUnavailable ¶
func (*Error) Unauthorized ¶
type Handler ¶
type Handler interface {
	// Name returns the handler name.
	Name() string
	// Handler returns the underlying handler value
	// (presumably the object passed to NewHandler; confirm against the implementation).
	Handler() interface{}
	// Endpoints returns the register endpoints of the handler.
	Endpoints() []*register.Endpoint
	// Options returns the HandlerOptions of the handler.
	Options() HandlerOptions
}
Handler interface represents a request handler. It's generated by passing any type of public concrete object with endpoints into server.NewHandler. Most will pass in a struct.
Example:
type Greeter struct {} func (g *Greeter) Hello(context, request, response) error { return nil }
type HandlerFunc ¶
HandlerFunc represents a single method of a handler. It's used primarily for the wrappers. What's handed to the actual method is the concrete request and response types.
type HandlerOption ¶
type HandlerOption func(*HandlerOptions)
HandlerOption func
func EndpointMetadata ¶
func EndpointMetadata(name string, md metadata.Metadata) HandlerOption
EndpointMetadata is a Handler option that allows metadata to be added to individual endpoints.
type HandlerOptions ¶
type HandlerOptions struct {
	// Context holds external options
	Context context.Context
	// Metadata for handler
	// (presumably keyed by endpoint name, see EndpointMetadata; confirm against the implementation).
	Metadata map[string]metadata.Metadata
}
HandlerOptions struct
func NewHandlerOptions ¶
func NewHandlerOptions(opts ...HandlerOption) HandlerOptions
NewHandlerOptions creates new HandlerOptions
type HandlerWrapper ¶
type HandlerWrapper func(HandlerFunc) HandlerFunc
HandlerWrapper wraps the HandlerFunc and returns the equivalent
type Message ¶
type Message interface {
	// Topic of the message
	Topic() string
	// The decoded payload value
	Payload() interface{}
	// The content type of the payload
	ContentType() string
	// The raw headers of the message
	Header() metadata.Metadata
	// The raw body of the message
	Body() []byte
	// Codec used to decode the message
	Codec() codec.Codec
}
Message is an async message interface
type Option ¶
type Option func(*Options)
Option func
func Context ¶
Context specifies a context for the service. It can be used to signal shutdown of the service and to carry extra option values.
func RegisterCheck ¶
RegisterCheck sets the func that is run before the service is registered
func RegisterInterval ¶
RegisterInterval registers the service at the given interval
func RegisterTTL ¶
RegisterTTL registers service with a TTL
func SetOption ¶
func SetOption(k, v interface{}) Option
SetOption returns a function to setup a context with given value
func Wait ¶
Wait tells the server to wait for requests to finish before exiting. If `wg` is nil, the server only waits for completion of rpc handlers. Users who need finer-grained control can pass a concrete `wg` here; the server will wait on it on stop.
func WrapBatchSubscriber ¶
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option
WrapBatchSubscriber adds a batch subscriber Wrapper to a list of options passed into the server
func WrapHandler ¶
func WrapHandler(w HandlerWrapper) Option
WrapHandler adds a handler Wrapper to a list of options passed into the server
func WrapSubscriber ¶
func WrapSubscriber(w SubscriberWrapper) Option
WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
type Options ¶
type Options struct {
	// Context holds the external options and can be used for server shutdown
	Context context.Context
	// Broker holds the server broker
	Broker broker.Broker
	// Register holds the register
	Register register.Register
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Auth holds the auth
	Auth auth.Auth
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Transport holds the transport
	Transport transport.Transport
	// Listener may be passed if already created
	Listener net.Listener
	// Wait group
	Wait *sync.WaitGroup
	// TLSConfig specifies tls.Config for secure serving
	TLSConfig *tls.Config
	// Metadata holds the server metadata
	Metadata metadata.Metadata
	// RegisterCheck runs before the server is registered
	RegisterCheck func(context.Context) error
	// Codecs map to handle content-type
	Codecs map[string]codec.Codec
	// ID holds the id of the server
	ID string
	// Namespace for the server
	Namespace string
	// Name holds the server name
	Name string
	// Address holds the server address
	Address string
	// Advertise holds the advertise address
	Advertise string
	// Version holds the server version
	Version string
	// SubWrappers holds the server subscribe wrappers
	SubWrappers []SubscriberWrapper
	// BatchSubWrappers holds the server batch subscribe wrappers
	BatchSubWrappers []BatchSubscriberWrapper
	// HdlrWrappers holds the handler wrappers
	HdlrWrappers []HandlerWrapper
	// RegisterAttempts holds the number of register attempts before error
	RegisterAttempts int
	// RegisterInterval holds the interval for re-register
	RegisterInterval time.Duration
	// RegisterTTL specifies TTL for register record
	RegisterTTL time.Duration
	// MaxConn limits number of connections
	MaxConn int
	// DeregisterAttempts holds the number of deregister attempts before error
	DeregisterAttempts int
}
Options server struct
func NewOptions ¶
NewOptions returns new options struct with default or passed values
type Request ¶
type Request interface {
	// Service name requested
	Service() string
	// The action requested
	Method() string
	// Endpoint name requested
	Endpoint() string
	// Content type provided
	ContentType() string
	// Header of the request
	Header() metadata.Metadata
	// Body is the initial decoded value
	Body() interface{}
	// Read the undecoded request body
	Read() ([]byte, error)
	// The encoded message stream
	Codec() codec.Codec
	// Indicates whether it's a stream
	Stream() bool
}
Request is a synchronous request interface
type Response ¶
type Response interface {
	// Encoded writer
	Codec() codec.Codec
	// Write the header
	WriteHeader(md metadata.Metadata)
	// Write a response directly to the client
	Write([]byte) error
}
Response is the response writer for unencoded messages
type Server ¶
type Server interface {
	// Name returns server name
	Name() string
	// Initialise options
	Init(...Option) error
	// Retrieve the options
	Options() Options
	// Register a handler
	Handle(h Handler) error
	// Create a new handler
	NewHandler(h interface{}, opts ...HandlerOption) Handler
	// Create a new subscriber
	NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
	// Register a subscriber
	Subscribe(s Subscriber) error
	// Start the server
	Start() error
	// Stop the server
	Stop() error
	// Server implementation
	String() string
}
Server is a simple micro server abstraction
func FromContext ¶
FromContext returns Server from context
type Stream ¶
type Stream interface {
	// Context returns the context of the stream.
	Context() context.Context
	// Request returns the request associated with the stream.
	Request() Request
	// Send sends msg on the stream.
	Send(msg interface{}) error
	// Recv receives into msg from the stream.
	Recv(msg interface{}) error
	// Error returns the last error left on the stream.
	Error() error
	// Close closes the stream.
	Close() error
}
Stream represents a stream established with a client. A stream can be bidirectional which is indicated by the request. The last error will be left in Error(). EOF indicates end of the stream.
type StreamWrapper ¶
StreamWrapper wraps a Stream interface and returns the equivalent. Because streams exist for the lifetime of a method invocation, this is a convenient way to wrap a Stream while it's in use for tracing, monitoring, metrics, etc.
type Subscriber ¶
type Subscriber interface {
	// Topic returns the topic of the subscription.
	Topic() string
	// Subscriber returns the underlying subscriber value
	// (presumably the func or object passed to NewSubscriber; confirm against the implementation).
	Subscriber() interface{}
	// Endpoints returns the register endpoints of the subscriber.
	Endpoints() []*register.Endpoint
	// Options returns the SubscriberOptions of the subscriber.
	Options() SubscriberOptions
}
Subscriber interface represents a subscription to a given topic using a specific subscriber function or object with endpoints. It mirrors the handler in its behaviour.
type SubscriberFunc ¶
SubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message.
type SubscriberOption ¶
type SubscriberOption func(*SubscriberOptions)
SubscriberOption func
func DisableAutoAck ¶
func DisableAutoAck() SubscriberOption
DisableAutoAck will disable auto acking of messages after they have been handled.
func SetSubscriberOption ¶
func SetSubscriberOption(k, v interface{}) SubscriberOption
SetSubscriberOption returns a function to setup a context with given value
func SubscriberAck ¶
func SubscriberAck(b bool) SubscriberOption
SubscriberAck controls auto ack processing for the handler
func SubscriberBatch ¶
func SubscriberBatch(b bool) SubscriberOption
SubscriberBatch controls batch processing for the handler
func SubscriberBatchSize ¶
func SubscriberBatchSize(n int) SubscriberOption
SubscriberBatchSize controls the batch filling size for the handler. The maximum batch filling wait time is controlled by SubscriberBatchWait.
func SubscriberBatchWait ¶
func SubscriberBatchWait(td time.Duration) SubscriberOption
SubscriberBatchWait controls the batch filling wait time for the handler
func SubscriberBodyOnly ¶
func SubscriberBodyOnly(b bool) SubscriberOption
SubscriberBodyOnly tells the broker that the message contains raw data that is not wrapped in the micro broker.Message format
func SubscriberContext ¶
func SubscriberContext(ctx context.Context) SubscriberOption
SubscriberContext sets the context options, allowing a broker SubscriberOption to be passed
func SubscriberGroup ¶
func SubscriberGroup(n string) SubscriberOption
SubscriberGroup sets the shared group name used to distribute messages across subscribers
func SubscriberQueue ¶
func SubscriberQueue(n string) SubscriberOption
SubscriberQueue sets the shared queue name used to distribute messages across subscribers
type SubscriberOptions ¶
type SubscriberOptions struct {
	// Context holds the external options
	Context context.Context
	// Queue holds the subscription queue
	Queue string
	// AutoAck flag for auto ack messages after processing
	AutoAck bool
	// BodyOnly flag specifies that the message is without headers
	BodyOnly bool
	// Batch flag specifies that messages are processed in batches
	Batch bool
	// BatchSize specifies the max size of a batch
	BatchSize int
	// BatchWait specifies the max wait time for batch filling
	BatchWait time.Duration
}
SubscriberOptions struct
func NewSubscriberOptions ¶
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions
NewSubscriberOptions creates new SubscriberOptions
type SubscriberWrapper ¶
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
SubscriberWrapper wraps the SubscriberFunc and returns the equivalent