Documentation ¶
Overview ¶
Package adaptiveservice is a message oriented micro service framework.
Servers define micro services identified as name of "publisher_service" and publish them to all available scopes: in same process, and further in same OS, and then further in same local network, and then public network where a public root registry address needs to be configured. In process and OS scope, one service name can only be announced once, duplicated service name is treated as error. In network scope, there can be multiple services with the same name, in this case, each service provider publishes the service "publisher_service" along with an unique provider ID.
Clients then discover wanted micro services in a way that shortest scope comes first. The discover() API returns a connection channel, reading the channel the client will get one or more connections, with each represents a connection to one of the service providers providing the wanted micro service. The connection then can be used to send/receive messages to/from the service provider.
Connections can be multiplexed on client side: NewStream() API creates a new context in which the messages are transferred independently from other contexts over the same underlying connection. The intention of the multiplexer is to have scalability on client side: users use this mechanism to send parallel request messages towards the same service provider to increase execution concurrency.
For server side, the incoming messages are handled in auto-scaled worker pool, so the multiplexer used on client side is not needed on server side. Servers listen to different transports for all available scopes:
process scope, go channels are used OS scope, unix domain socket is used network scope, tcp socket is used
Messages that satisfy Handle() interface are known messages. Typically server defines Handle() method for every message type it can handle, then when the known message arrived on one of the transports it is listening, the message is delivered to one of the workers in which the message's Handle() is called. Clients do not define Handle() method, they just send and receive message in a natural synchronized fashion.
Services that are behind NAT can be auto proxied by the builtin reverseProxy service provided by the daemon server in the local network or by the root registry.
Index ¶
- Constants
- Variables
- func ErrServiceNotFound(publisher, service string) error
- func GetKnownMessageTypes() []string
- func GetRegisteredTypeByName(name string) reflect.Type
- func NewStreamIO(stream Stream) io.ReadWriteCloser
- func ReadAllTracedMsg() (string, error)
- func ReadTracedMsg(token string) (string, error)
- func RegisterType(i interface{})
- func RegisterTypeNoPanic(i interface{}) (err error)
- func TraceMsgByName(name string, count uint32) (token string, err error)
- func TraceMsgByNameWithFilters(name string, count uint32, filters []string) (token string, err error)
- func TraceMsgByType(msg any, count uint32) (token string, err error)
- func TraceMsgByTypeWithFilters(msg any, count uint32, filters []string) (token string, err error)
- func UnTraceMsgAll()
- type Client
- type Connection
- type Context
- type ContextStream
- type GetObservedIP
- type HighPriorityMessage
- type KnownMessage
- type ListService
- type Logger
- type LoggerAll
- type LoggerNull
- type LowPriorityMessage
- type MsgQInfo
- type Netconn
- type Option
- type ProviderSelectionMethod
- type QueryMsgQInfo
- type ReqProviderInfo
- type Scope
- type Server
- func (s *Server) Close()
- func (s *Server) CloseWait()
- func (s *Server) DisableMsgTypeCheck() *Server
- func (s *Server) EnableAutoReverseProxy() *Server
- func (s *Server) EnableIPObserver() *Server
- func (s *Server) EnableMessageTracer() *Server
- func (s *Server) EnableRootRegistry() *Server
- func (s *Server) EnableServiceLister() *Server
- func (s *Server) Publish(serviceName string, knownMessages []KnownMessage, options ...ServiceOption) error
- func (s *Server) PublishIn(scope Scope, serviceName string, knownMessages []KnownMessage, ...) error
- func (s *Server) Serve() error
- func (s *Server) SetBroadcastPort(port string) *Server
- func (s *Server) SetPublisher(publisherName string) *Server
- func (s *Server) SetScaleFactors(residentWorkers, qSizePerCore, qWeight int) *Server
- type ServiceInfo
- type ServiceOption
- type Stream
Constants ¶
const ( // BuiltinPublisher name BuiltinPublisher = "builtin" // OK can be returned by known messages as reply to indicate // everything is OK. Client should use type int to receive it. OK = 0 )
const ( SrvMessageTracing = "messageTracing" MaxTracingSessions = 4096 )
SrvMessageTracing : service messageTracing
const ( // DefaultQSizePerCore is the default value for qSizePerCore DefaultQSizePerCore int = 128 // DefaultQWeight is the default value for qWeight DefaultQWeight int = 8 )
const SrvIPObserver = "IPObserver"
SrvIPObserver : service IPObserver
const SrvLANRegistry = "LANRegistry"
SrvLANRegistry : service LANRegistry
const SrvProviderInfo = "providerInfo"
SrvProviderInfo : service providerInfo
const SrvRegistryInfo = "registryInfo"
SrvRegistryInfo : service registryInfo
const SrvReverseProxy = "reverseProxy"
SrvReverseProxy : service reverseProxy
const SrvServiceLister = "serviceLister"
SrvServiceLister : service serviceLister
Variables ¶
var ( // ErrServiceNotReachable is an error where the service exists // but somehow can not be reached, e.g. the service is behind NAT. ErrServiceNotReachable = errors.New("service not reachable") // ErrConnReset is an error where the connection was forced closed // by peer. ErrConnReset = errors.New("connection reset by peer") // ErrServerClosed is an error where the server was closed by signal. ErrServerClosed = errors.New("server closed by signal") // ErrRecvTimeout is an error where no data was received within // specified duration. ErrRecvTimeout = errors.New("receive timeout") )
Functions ¶
func ErrServiceNotFound ¶
ErrServiceNotFound returns an error that no wanted service was found
func GetKnownMessageTypes ¶ added in v0.10.3
func GetKnownMessageTypes() []string
GetKnownMessageTypes returns all KnownMessage types
func GetRegisteredTypeByName ¶ added in v0.10.3
GetRegisteredTypeByName returns a reflect.Type if it has been registered
func NewStreamIO ¶ added in v0.9.5
func NewStreamIO(stream Stream) io.ReadWriteCloser
NewStreamIO wraps the stream to be an io.ReadWriteCloser in which Read() is a Stream.Recv() that only receives []byte, Write is a Stream.Send() that only sends []byte. Use Read() Write() in pair on the client/server peer, don't mix use them with Send() or Recv().
func ReadAllTracedMsg ¶ added in v0.10.4
ReadAllTracedMsg is equivalent to ReadTracedMsg("00000000-0000-0000-0000-000000000000.0")
func ReadTracedMsg ¶ added in v0.10.1
ReadTracedMsg reads all the collected traced messages by the token returned by TraceMsgByType().
token: should be in the format like 30180061-1044-4b9e-a8ee-174806afe058.0 Special tokens starting with 00000000-0000-0000-0000-000000000000 are used to retrieve all records on behalf of all previous tokens.
func RegisterType ¶
func RegisterType(i interface{})
RegisterType registers the type infomation to encoding sub system.
func RegisterTypeNoPanic ¶ added in v0.9.24
func RegisterTypeNoPanic(i interface{}) (err error)
RegisterTypeNoPanic is like RegisterType but recovers from panic.
func TraceMsgByName ¶ added in v0.10.3
TraceMsgByName is like TraceMsgByType but takes the message type name
func TraceMsgByNameWithFilters ¶ added in v0.10.5
func TraceMsgByNameWithFilters(name string, count uint32, filters []string) (token string, err error)
TraceMsgByNameWithFilters is like TraceMsgByTypeWithFilters but takes the message type name
func TraceMsgByType ¶ added in v0.10.1
TraceMsgByType traces the message type specified by 'msg' 'count' times repeatedly. Each matching of the 'msg' type starts a message tracing session and has a unique session token.
Tracing is type based and always starts from the client side. If a message to be sent by client matches the specified type, it is marked as traced message, a special traced flag will be carried along the entire path across all the service nodes that are involved to handle this message.
When such messages with traced flag are being handled in `Handle(ContextStream) any` on server side, all subsequent messages under the same stream context are related messages. All related messages will also carry the same traced flag and propagate the flag further to next hop and next next hop... All related messages with traced flag will be recorded by built-in service "messageTracing".
msg: any value with the same type of the message to be traced
A call to TraceMsgByType() with 'count' equals 1 only starts a one time tracing session. The subsequent messages with the same type will not be traced unless another call to TraceMsgByType() is made with the same type. TraceMsgByType can work with different input message types at the same time.
msg: any value with the same type of the message to be traced count: the maximum number is the value of MaxTracingSessions >0: trace count times 0: stop tracing the specified message type
The returned token has below forms:
if count > 1, for example count = 100 30180061-1044-4b9e-a8ee-174806afe058.0..99 if count = 1: 30180061-1044-4b9e-a8ee-174806afe058.0 if count = 0: NA
func TraceMsgByTypeWithFilters ¶ added in v0.10.5
TraceMsgByTypeWithFilters is like TraceMsgByType but takes extra filters. Filter should be in the form of "field=pattern", in which pattern supports simple wildcard. Tracing sessions start only if all filters match their patterns.
func UnTraceMsgAll ¶ added in v0.10.4
func UnTraceMsgAll()
UnTraceMsgAll untags all message types that have been tagged by TraceMsgBy* functions
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client uses services.
func (*Client) Discover ¶
func (c *Client) Discover(publisher, service string, providerIDs ...string) <-chan Connection
Discover discovers the wanted service and returns the connection channel, from which user can get one or more connections. Each connection represents a connection to one of the service providers providing the wanted micro service.
Only one service(identified by publisher name and service name) can exist in ScopeProcess and ScopeOS, but in ScopeLAN and ScopeWAN there can be many systems providing the same service, each systeam(called provider) has an unique provider ID.
Use providerIDs to select target providers in ScopeLAN or ScopeWAN, if no provider id presents, discover searches scopes by distance that is in the order of ScopeProcess, ScopeOS, ScopeLAN, ScopeWAN, and returns only one connection towards the found service which may have been randomly selected if more than one services were found.
If any of publisher or service or provider ids contains "*", discover will return all currently available connections of the wanted service(s). Make sure to close ALL the connections it returns after use.
func (*Client) SetDeepCopy ¶
SetDeepCopy sets the client to deep copy the message before sending it out. This option is only useful for process scope where by default the server can modify the message that the client owns because server and client in the same process space, and this case only happens when the message handler in the server has reference receiver.
Unexported fields in the message are not copied but set to zero value. Default is zero copy.
func (*Client) SetDiscoverTimeout ¶
SetDiscoverTimeout sets the wait timeout in seconds of the discover procedure. The discover procedure waits for the wanted service to be available until timeout. The connection channel returned by Discover() will be closed if timeout happens.
> 0 : wait timeout seconds = 0 : do not wait < 0 : wait forever
Default -1.
func (*Client) SetProviderSelectionMethod ¶ added in v0.12.1
func (c *Client) SetProviderSelectionMethod(method ProviderSelectionMethod) *Client
SetProviderSelectionMethod sets the ProviderSelectionMethod. It can be called multiple times with different ProviderSelectionMethod and the provider selection procedure will follow the calling sequence, e.g.
c.SetProviderSelectionMethod(Capacity).SetProviderSelectionMethod(Latency)
In this case, it will first select the top 50% of the provider(s) with Capacity method. If there are still multiple providers, the Latency method is then applied in turn to also select top 50%. The Random ProviderSelectionMethod is applied at last if needed to ensure that wanted number of provider(s) will be ultimately selected.
type Connection ¶
type Connection interface { // default stream. Stream // NewStream creates a new stream. NewStream() Stream // Close closes the connection. Close() }
Connection is the connection between client and server.
type Context ¶
type Context interface { // PutVar puts value v to the underlying map overriding the old value of the same type. PutVar(v interface{}) // GetVar gets value that v points to from the underlying map, it panics if v // is not a non-nil pointer. // The value that v points to will be set to the value in the context if value // of the same type has been putted to the map, otherwise zero value will be set. GetVar(v interface{}) // SetContext sets the context with value v which supposedly is a pointer to // an instance of the struct associated to the connection. // It panics if v is not a non-nil pointer. // It is supposed to be called only once upon a new connection is connected. SetContext(v interface{}) // GetContext gets the context that has been set by SetContext. GetContext() interface{} }
Context represents a context.
type ContextStream ¶
ContextStream is a stream with an associated context. Messages from the same stream have the same context, their handlers may be executed concurrently.
type GetObservedIP ¶ added in v0.9.5
type GetObservedIP struct{}
GetObservedIP returns the observed IP of the client. The reply is string type.
func (GetObservedIP) Handle ¶ added in v0.9.5
func (msg GetObservedIP) Handle(stream ContextStream) (reply interface{})
Handle handles GetObservedIP message.
type HighPriorityMessage ¶
type HighPriorityMessage interface { KnownMessage IsHighPriority() }
HighPriorityMessage is high priority KnownMessage.
type KnownMessage ¶
type KnownMessage interface { // Handle handles the message. // If reply is nil, no message will be sent back. // If reply is not nil, the value will be sent back to the stream peer. // If reply is error type, the peer's Recv() call will return it as error. // Otherwise the reply will be received by the peer's Recv() as normal message. // // The message may be marshaled or compressed. // Remember in golang assignment to interface is also value copy, // so return reply as &someStruct whenever possible in your handler implementation. // // Users can directly use ContextStream to send/receive messages to/from the stream // peer(the client) via a private dedicated channel. // // Use of reply is more like RPC fashion, where clients "call" the Handle() method // on the server side as if Handle() were called on clients. // Always return reply to client is a good idea since client is waiting for the reply. // In the case where only final status is needed, return builtin OK on success or // return error type if any error happened, e.g. return errors.New("some error"). // // Use of ContextStream is more like client and server entered in an interactive // session, in which several messages are exchanged between client and server. // // Cares should be taken if you mix the use of reply and ContextStream. Handle(stream ContextStream) (reply interface{}) }
KnownMessage represents a message with a handler that knows how to process the message. KnownMessage is normal priority message.
type ListService ¶
ListService lists all services in specified scopes matching publisher/service name which can be wildcard:
"*" matches all "*bar*" matches bar, foobar, or foobarabc "foo*abc*" matches foobarabc, foobarabc123, or fooabc
The reply is [4][]*ServiceInfo
func (*ListService) Handle ¶
func (msg *ListService) Handle(stream ContextStream) (reply interface{})
Handle handles ListService message.
type Logger ¶
type Logger interface { Debugf(format string, args ...interface{}) Infof(format string, args ...interface{}) Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) }
Logger is the logger interface.
type LoggerAll ¶
type LoggerAll struct{}
LoggerAll prints all regardless of loglevel
type LoggerNull ¶
type LoggerNull struct{}
LoggerNull prints no log
func (LoggerNull) Debugf ¶
func (LoggerNull) Debugf(format string, args ...interface{})
Debugf is Debugf
func (LoggerNull) Errorf ¶
func (LoggerNull) Errorf(format string, args ...interface{})
Errorf is Errorf
func (LoggerNull) Infof ¶
func (LoggerNull) Infof(format string, args ...interface{})
Infof is Infof
func (LoggerNull) Warnf ¶
func (LoggerNull) Warnf(format string, args ...interface{})
Warnf is Warnf
type LowPriorityMessage ¶
type LowPriorityMessage interface { KnownMessage IsLowPriority() }
LowPriorityMessage is Low priority KnownMessage.
type MsgQInfo ¶ added in v0.12.1
type MsgQInfo struct { NumCPU int ResidentWorkers int QueueWeight int QueueLen int QueueSize int BusyWorkerNum int IdleWorkerNum int }
MsgQInfo is the running status of the message queue
type Netconn ¶
type Netconn interface { // Close closes the connection. // Any blocked Read or Write operations will be unblocked and return errors. Close() error // LocalAddr returns the local network address. LocalAddr() net.Addr // RemoteAddr returns the remote network address. RemoteAddr() net.Addr }
Netconn is the underlying net connection.
type Option ¶
type Option func(*conf)
Option is option to be set.
func WithProviderID ¶
WithProviderID sets the provider ID which is used to identify service in the network where there are multiple publisher_service instances found in the registry. Provider ID is usually shared by servers in the same network node.
func WithRegistryAddr ¶
WithRegistryAddr sets the registry address in format ip:port.
type ProviderSelectionMethod ¶ added in v0.12.1
type ProviderSelectionMethod uint16
ProviderSelectionMethod is the method to select the most suitable service provider when client discovered provider candidates more than one.
Random the last method used when there are still more than one providers after other ProviderSelectionMethods were applied.
const ( // Capacity is the strategy to select the service provider which should // be more capable to serve the client. The selected provider // has fewer items to precess or is under lighter working load // or has more workers. Capacity ProviderSelectionMethod = iota + 1 // Latency is the strategy to select the service provider that has faster // response time, this usually indicates better networking connection // and the service provider is in a healthy responsive condition. Latency // Invalid is invalid strategy Invalid )
type QueryMsgQInfo ¶ added in v0.12.1
type QueryMsgQInfo struct{}
QueryMsgQInfo gets the MsgQInfo
func (QueryMsgQInfo) Handle ¶ added in v0.12.1
func (msg QueryMsgQInfo) Handle(stream ContextStream) (reply any)
Handle handles QueryMsgQInfo
type ReqProviderInfo ¶
type ReqProviderInfo struct{}
ReqProviderInfo gets self provider ID, reply with string.
func (*ReqProviderInfo) Handle ¶
func (msg *ReqProviderInfo) Handle(stream ContextStream) (reply interface{})
Handle handles ReqProviderInfo.
type Scope ¶
type Scope uint16
Scope is publishing and discovering scope
const ( // ScopeProcess is a scope where publishing and discovering services // only happen in same process. ScopeProcess Scope = 1 << iota // ScopeOS is a scope where publishing and discovering services // only happen in same OS. ScopeOS // ScopeLAN is a scope where publishing and discovering services // only happen in same local network. ScopeLAN // ScopeWAN is a scope where publishing and discovering services // only happen in same reachable public network. ScopeWAN // ScopeNetwork is a shortcut for ScopeLAN and ScopeWAN ScopeNetwork = ScopeLAN | ScopeWAN // ScopeAll includes all scopes, this is the default value if // no other Scope specified. ScopeAll = ScopeProcess | ScopeOS | ScopeLAN | ScopeWAN )
type Server ¶
Server provides services.
func (*Server) CloseWait ¶ added in v0.9.14
func (s *Server) CloseWait()
CloseWait triggers the close procedure and waits until the server is fully closed.
func (*Server) DisableMsgTypeCheck ¶
DisableMsgTypeCheck disables message type checking for incoming messages.
func (*Server) EnableAutoReverseProxy ¶
EnableAutoReverseProxy tries to enable reverse proxy.
func (*Server) EnableIPObserver ¶ added in v0.9.5
EnableIPObserver enables IP observer service which helps the requesting client to find out its observed IP address.
func (*Server) EnableMessageTracer ¶ added in v0.10.1
EnableMessageTracer enables message tracing service which helps to collect local traced messages.
func (*Server) EnableRootRegistry ¶
EnableRootRegistry makes the server become root registry.
func (*Server) EnableServiceLister ¶
EnableServiceLister enables lister service which can list available services.
func (*Server) Publish ¶
func (s *Server) Publish(serviceName string, knownMessages []KnownMessage, options ...ServiceOption) error
Publish publishes service to all available scope of Server s. knownMessages are messages that the service can handle, e.g. []KnownMessage{(*PublicStructA)(nil), (*PublicStructB)(nil), ...}, where (*PublicStructA) and (*PublicStructB) are the known messages that have `Handle(stream ContextStream) reply interface{}` method.
Publish panics if serviceName contains "_" or "/" or whitespace.
func (*Server) PublishIn ¶ added in v0.9.9
func (s *Server) PublishIn(scope Scope, serviceName string, knownMessages []KnownMessage, options ...ServiceOption) error
PublishIn is like Publish, but with specified scope which should be a subset of the scope of Server s.
func (*Server) SetBroadcastPort ¶
SetBroadcastPort sets the broadcast port used by lan registry.
func (*Server) SetPublisher ¶
SetPublisher declares the publisher of the server, which is usually an organization name. The default value of publisherName is "default.org".
SetPublisher panics if publisherName contains "_" or "/" or whitespace.
func (*Server) SetScaleFactors ¶ added in v0.9.20
SetScaleFactors sets the scale factors to be applied on the internal message queue.
residentWorkers: > 0: the number of resident workers. <= 0: uses default value. Default is 1. qSizePerCore: > 0: the internal message queue size per core. <= 0: uses default value. Default is DefaultQSizePerCore. qWeight: > 0: the weight of the message queue. 0: uses default value. Default is DefaultQWeight. < 0: the number of workers is fixed at residentWorkers.
A Server has one internal message queue, messages received from transport layer are put into the queue, a number of workers get message from the queue and handle it. The size of the message queue is qSizePerCore*core number.
When qWeight >= 0, a worker pool is created, the number of wokers scales automatically in the pool. The target number of available workers is dynamically adjusted by below formula periodically:
target available workers = len(message queue)/qWeight + residentWorkers
The worker pool continuously adds/removes workers to follow the target.
Be careful to set qWeight < 0, which effectively disables auto scala worker pool, which in turn only uses fixed number of workers(residentWorkers). Forever blocking may occur in such case, especially when residentWorkers = 1.
type ServiceInfo ¶
type ServiceInfo struct { Publisher string Service string ProviderID string Addr string // "192.168.0.11:12345", "192.168.0.11:12345P" if proxied }
ServiceInfo is service information.
type ServiceOption ¶
type ServiceOption func(*service)
ServiceOption is option for service.
func OnConnectFunc ¶
func OnConnectFunc(fn func(Netconn) (takeOver bool)) ServiceOption
OnConnectFunc sets a function which is called when new incoming connection is established. Further message dispaching on this connection will stop if fn returns true, leaving the connection NOT closed, fn should then take over this Netconn and close it when finished.
func OnDisconnectFunc ¶
func OnDisconnectFunc(fn func(Netconn)) ServiceOption
OnDisconnectFunc sets a function which is called when the connection was disconnected.
func OnNewStreamFunc ¶
func OnNewStreamFunc(fn func(Context)) ServiceOption
OnNewStreamFunc sets a function which is called to initialize the context when new incoming stream is accepted.
func OnStreamCloseFunc ¶ added in v0.10.1
func OnStreamCloseFunc(fn func(Context)) ServiceOption
OnStreamCloseFunc sets a function which is called when the stream is being closed.
type Stream ¶
type Stream interface { // Send sends a message to the stream peer. If msg is an error value, it will // be received and returned by peer's Recv() as error. Send(msg interface{}) error // Recv receives a message from the stream peer and stores it into the value // that msgPtr points to. // // msgPtr can be nil, where user only cares about error, otherwise // it panics if msgPtr is not a non-nil pointer. Recv(msgPtr interface{}) error // SendRecv combines send and receive on the same stream. SendRecv(msgSnd interface{}, msgRcvPtr interface{}) error // GetNetconn gets the transport connection. GetNetconn() Netconn // SetRecvTimeout sets the timeout for each Recv(), which waits at least duration // d and returns ErrRecvTimeout if no data was received within that duration. // A negative or zero duration causes Recv() waits forever. // Default is 0. SetRecvTimeout(d time.Duration) // Close closes the stream Close() }
Stream is an independent channel multiplexed from the underlying connection.
Source Files ¶
- adaptiveservice.go
- chantransportclient.go
- chantransportserver.go
- client.go
- client_options.go
- context.go
- ipobserver_svc.go
- lanregistry_svc.go
- message.go
- messagetracing.go
- messagetracing_svc.go
- options.go
- providerinfo_svc.go
- registry.go
- registryinfo_svc.go
- reverseproxy_svc.go
- scalamsgq.go
- server.go
- server_options.go
- service_options.go
- servicelister_svc.go
- stream.go
- streamtransportclient.go
- streamtransportserver.go
- workerpool.go