Documentation
¶
Index ¶
- Constants
- Variables
- func Connect(endpoint *ServiceEndpoint, connectTimeout time.Duration) (conn net.Conn, err error)
- func ReleaseOutboundResponse(resp IResponseContext)
- func ReplyError(req IRequestContext, status uint32)
- type Conn
- type Connection
- type Connector
- type IConnEventHandler
- type IHandshaker
- type IListener
- type IRequestContext
- type IRequestHandler
- type IResponseContext
- type InboundConfig
- type InboundConfigMap
- type InboundConnManager
- type InboundRequestContext
- type InboundRequestContextCreator
- type InboundResponseContext
- type Listener
- func (l *Listener) AcceptAndServe() error
- func (l *Listener) Close() error
- func (l *Listener) GetConnString() string
- func (l *Listener) GetName() string
- func (l *Listener) GetNumActiveConnections() uint32
- func (l *Listener) GetType() ListenerType
- func (l *Listener) Refresh()
- func (l *Listener) Shutdown()
- func (l *Listener) WaitForShutdownToComplete(timeout time.Duration)
- type ListenerConfig
- type ListenerType
- type OutboundConfig
- type OutboundConfigMap
- type OutboundConnector
- func (p *OutboundConnector) AllowRestart() bool
- func (p *OutboundConnector) Close()
- func (p *OutboundConnector) GetId() int
- func (p *OutboundConnector) GetPingIP() string
- func (p *OutboundConnector) Handshake() bool
- func (p *OutboundConnector) IsActive() bool
- func (p *OutboundConnector) Recycle()
- func (p *OutboundConnector) SetHandshaker(h IHandshaker)
- func (p *OutboundConnector) SetNewConn(c net.Conn)
- func (p *OutboundConnector) SetState(s StateType)
- func (p *OutboundConnector) Shutdown()
- func (p *OutboundConnector) Start()
- func (p *OutboundConnector) WriteStats(w io.Writer, indent int)
- type OutboundProcessor
- func (p *OutboundProcessor) GetConnInfo() string
- func (p *OutboundProcessor) GetIPPort() (ip string, port string)
- func (p *OutboundProcessor) GetIsConnected() int
- func (p *OutboundProcessor) GetNumConnections() int
- func (p *OutboundProcessor) GetRandFrequency(freq time.Duration) time.Duration
- func (p *OutboundProcessor) GetRequestCh() chan IRequestContext
- func (p *OutboundProcessor) GetRequestSendingQueueSize() int
- func (p *OutboundProcessor) Init(endpoint ServiceEndpoint, config *OutboundConfig, enableBounce bool)
- func (p *OutboundProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)
- func (p *OutboundProcessor) OnConnectSuccess(conn Conn, connector *OutboundConnector, timeTaken time.Duration)
- func (p *OutboundProcessor) Run()
- func (p *OutboundProcessor) SendRequest(req IRequestContext) (err *errors.Error)
- func (p *OutboundProcessor) SendRequestLowPriority(req IRequestContext) (err *errors.Error)
- func (p *OutboundProcessor) SetConnEventHandler(hdlr IConnEventHandler)
- func (p *OutboundProcessor) Shutdown()
- func (p *OutboundProcessor) Start()
- func (p *OutboundProcessor) WaitShutdown()
- func (p *OutboundProcessor) WriteStats(w io.Writer, indent int)
- type OutboundRequestContext
- type OutboundResponseContext
- type RequestContext
- func (r *RequestContext) Cancel()
- func (r *RequestContext) GetCtx() context.Context
- func (r *RequestContext) GetMessage() *proto.RawMessage
- func (r *RequestContext) GetReceiveTime() time.Time
- func (r *RequestContext) OnCleanup()
- func (r *RequestContext) OnComplete()
- func (r *RequestContext) OnExpiration()
- func (req *RequestContext) Read(r io.Reader) (n int, err error)
- func (r *RequestContext) Reply(resp IResponseContext)
- func (r *RequestContext) SetTimeout(parent context.Context, timeout time.Duration)
- func (r *RequestContext) WriteWithOpaque(opaque uint32, w io.Writer) (n int, err error)
- type ResponseContext
- func (r *ResponseContext) GetMessage() *proto.RawMessage
- func (r *ResponseContext) GetMsgSize() uint32
- func (r *ResponseContext) GetOpStatus() proto.OpStatus
- func (r *ResponseContext) GetStatus() (s uint32)
- func (r *ResponseContext) OnComplete()
- func (req *ResponseContext) Read(r io.Reader) (n int, err error)
- func (r *ResponseContext) Write(w io.Writer) (n int, err error)
- type ServiceEndpoint
- type SslListener
- type StateType
Constants ¶
View Source
const ( ListenerTypeTCP = ListenerType(iota) ListenerTypeTCPwSSL )
View Source
const ( WAITING = StateType(0) CONNECTING = StateType(1) SERVING = StateType(2) DRAINING = StateType(3) )
Variables ¶
View Source
var ( DefaultInboundConfig = InboundConfig{ HandshakeTimeout: util.Duration{Duration: 500 * time.Millisecond}, IdleTimeout: util.Duration{Duration: 120 * time.Second}, ReadTimeout: util.Duration{Duration: 500 * time.Millisecond}, WriteTimeout: util.Duration{Duration: 500 * time.Millisecond}, RequestTimeout: util.Duration{Duration: 600 * time.Millisecond}, MaxBufferedWriteSize: 64 * 1024, IOBufSize: 64 * 1024, RespChanSize: 10000, } DefaultOutboundConfig = OutboundConfig{ ConnectTimeout: util.Duration{Duration: 1 * time.Second}, ConnectRecycleT: util.Duration{Duration: 30 * time.Second}, GracefulShutdownTime: util.Duration{Duration: 200 * time.Millisecond}, EnableConnRecycle: false, ReqChanBufSize: 8092, MaxPendingQueSize: 8092, PendingQueExtra: 50, MaxBufferedWriteSize: 64 * 1024, ReconnectIntervalBase: 100, ReconnectIntervalMax: 20000, NumConnsPerTarget: 1, IOBufSize: 64 * 1024, } )
Functions ¶
func ReleaseOutboundResponse ¶
func ReleaseOutboundResponse(resp IResponseContext)
func ReplyError ¶
func ReplyError(req IRequestContext, status uint32)
Types ¶
type Conn ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func (*Connection) DidResume ¶
func (c *Connection) DidResume() string
func (*Connection) GetCipherName ¶
func (c *Connection) GetCipherName() string
func (*Connection) GetNetConn ¶
func (c *Connection) GetNetConn() net.Conn
func (*Connection) GetStateString ¶
func (c *Connection) GetStateString() string
func (*Connection) GetTLSVersion ¶
func (c *Connection) GetTLSVersion() string
func (*Connection) IsTLS ¶
func (c *Connection) IsTLS() bool
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
func (*Connector) OnKeepAlive ¶
func (c *Connector) OnKeepAlive()
type IConnEventHandler ¶
type IHandshaker ¶
type IHandshaker interface { GetName() string GetHandshakeTimeout() util.Duration GetPingRequest() IRequestContext ExpectResponse() bool OnPingResponse(ctx IResponseContext) bool GetPingIP() string }
type IListener ¶
type IListener interface { GetName() string GetType() ListenerType AcceptAndServe() error Close() error Shutdown() WaitForShutdownToComplete(time.Duration) GetConnString() string GetNumActiveConnections() uint32 Refresh() }
func NewListener ¶
func NewListener(cfg ListenerConfig, iocfg InboundConfig, reqHandler IRequestHandler) (lsnr IListener, err error)
func NewListenerWithFd ¶
func NewListenerWithFd(cfg ListenerConfig, iocfg InboundConfig, f *os.File, reqHandler IRequestHandler) (lsnr IListener, err error)
type IRequestContext ¶
type IRequestContext interface { util.QueItem GetMessage() *proto.RawMessage GetCtx() context.Context Cancel() Read(r io.Reader) (n int, err error) WriteWithOpaque(opaque uint32, w io.Writer) (n int, err error) Reply(resp IResponseContext) OnComplete() GetReceiveTime() time.Time SetTimeout(parent context.Context, duration time.Duration) }
func DefaultInboundRequestContexCreator ¶
func DefaultInboundRequestContexCreator(magic []byte, c *Connector) (ctx IRequestContext, err error)
func ExtendedRequestContexCreator ¶
func ExtendedRequestContexCreator(magic []byte, c *Connector) (ctx IRequestContext, err error)
type IRequestHandler ¶
type IRequestHandler interface { Init() GetReqCtxCreator() InboundRequestContextCreator Process(reqCtx IRequestContext) error Finish() OnKeepAlive(connector *Connector, reqCtx IRequestContext) error }
type IResponseContext ¶
type InboundConfig ¶
type InboundConfig struct { HandshakeTimeout util.Duration //only for TLS connection IdleTimeout util.Duration ReadTimeout util.Duration WriteTimeout util.Duration RequestTimeout util.Duration MaxBufferedWriteSize int IOBufSize int RespChanSize int }
func (*InboundConfig) SetDefaultIfNotDefined ¶
func (conf *InboundConfig) SetDefaultIfNotDefined() (set bool)
type InboundConfigMap ¶
type InboundConfigMap map[string]InboundConfig
func (*InboundConfigMap) SetDefaultIfNotDefined ¶
func (m *InboundConfigMap) SetDefaultIfNotDefined()
type InboundConnManager ¶
type InboundConnManager struct {
// contains filtered or unexported fields
}
func (*InboundConnManager) GetNumActiveConnections ¶
func (m *InboundConnManager) GetNumActiveConnections() uint32
TODO: confirm whether this method is thread safe.
func (*InboundConnManager) Shutdown ¶
func (m *InboundConnManager) Shutdown()
func (*InboundConnManager) TrackConn ¶
func (m *InboundConnManager) TrackConn(c *Connector, add bool)
func (*InboundConnManager) WaitForShutdownToComplete ¶
func (m *InboundConnManager) WaitForShutdownToComplete(timeout time.Duration)
type InboundRequestContext ¶
type InboundRequestContext struct { RequestContext // contains filtered or unexported fields }
func NewInboundRequestContext ¶
func NewInboundRequestContext(c *Connector) (r *InboundRequestContext)
func (*InboundRequestContext) GetListenerType ¶
func (r *InboundRequestContext) GetListenerType() ListenerType
func (*InboundRequestContext) SetResponseChannel ¶
func (r *InboundRequestContext) SetResponseChannel(ch chan<- IResponseContext)
type InboundRequestContextCreator ¶
type InboundRequestContextCreator func(magic []byte, c *Connector) (ctx IRequestContext, err error)
type InboundResponseContext ¶
type InboundResponseContext struct {
ResponseContext
}
func NewInboundResponseContext ¶
func NewInboundResponseContext(opMsg *proto.OperationalMessage) (r *InboundResponseContext, err error)
func NewInboundRespose ¶
func NewInboundRespose(opCode proto.OpCode, m *proto.RawMessage) (r *InboundResponseContext)
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
func (*Listener) AcceptAndServe ¶
func (*Listener) GetConnString ¶
func (*Listener) GetNumActiveConnections ¶
func (*Listener) GetType ¶
func (l *Listener) GetType() ListenerType
func (*Listener) WaitForShutdownToComplete ¶
type ListenerConfig ¶
type ListenerConfig struct { ServiceEndpoint Name string }
func (*ListenerConfig) SetDefaultIfNotDefined ¶
func (cfg *ListenerConfig) SetDefaultIfNotDefined()
type ListenerType ¶
type ListenerType byte
type OutboundConfig ¶
type OutboundConfig struct { ConnectTimeout util.Duration ConnectRecycleT util.Duration GracefulShutdownTime util.Duration EnableConnRecycle bool ReqChanBufSize int MaxPendingQueSize int PendingQueExtra int MaxBufferedWriteSize int ReconnectIntervalBase int ReconnectIntervalMax int NumConnsPerTarget int32 IOBufSize int }
func (*OutboundConfig) SetDefaultIfNotDefined ¶
func (conf *OutboundConfig) SetDefaultIfNotDefined() (set bool)
type OutboundConfigMap ¶
type OutboundConfigMap map[string]OutboundConfig
func (*OutboundConfigMap) SetDefaultIfNotDefined ¶
func (m *OutboundConfigMap) SetDefaultIfNotDefined()
type OutboundConnector ¶
type OutboundConnector struct {
// contains filtered or unexported fields
}
func NewOutboundConnector ¶
func NewOutboundConnector(id int, c net.Conn, reqCh chan IRequestContext, monCh chan int, config *OutboundConfig) (p *OutboundConnector)
Constructor/factory. Each OutboundConnector object will have two goroutines: one for Read and one for Write.
func (*OutboundConnector) AllowRestart ¶
func (p *OutboundConnector) AllowRestart() bool
func (*OutboundConnector) Close ¶
func (p *OutboundConnector) Close()
func (*OutboundConnector) GetId ¶
func (p *OutboundConnector) GetId() int
func (*OutboundConnector) GetPingIP ¶
func (p *OutboundConnector) GetPingIP() string
func (*OutboundConnector) IsActive ¶
func (p *OutboundConnector) IsActive() bool
func (*OutboundConnector) Recycle ¶
func (p *OutboundConnector) Recycle()
func (*OutboundConnector) SetHandshaker ¶
func (p *OutboundConnector) SetHandshaker(h IHandshaker)
func (*OutboundConnector) SetNewConn ¶
func (p *OutboundConnector) SetNewConn(c net.Conn)
func (*OutboundConnector) SetState ¶
func (p *OutboundConnector) SetState(s StateType)
func (*OutboundConnector) Shutdown ¶
func (p *OutboundConnector) Shutdown()
Wait for all goroutines to finish.
func (*OutboundConnector) Start ¶
func (p *OutboundConnector) Start()
func (*OutboundConnector) WriteStats ¶
func (p *OutboundConnector) WriteStats(w io.Writer, indent int)
type OutboundProcessor ¶
type OutboundProcessor struct {
// contains filtered or unexported fields
}
OutboundProcessor manages a pool of one or more underlying connections to a downstream server. It also bounces incoming requests when all connections are down.
func NewOutbProcessor ¶
func NewOutbProcessor(endpoint ServiceEndpoint, config *OutboundConfig, enableBounce bool) (p *OutboundProcessor)
func NewOutboundProcessor ¶
func NewOutboundProcessor(connInfo string, config *OutboundConfig, enableBounce bool) *OutboundProcessor
func (*OutboundProcessor) GetConnInfo ¶
func (p *OutboundProcessor) GetConnInfo() string
func (*OutboundProcessor) GetIPPort ¶
func (p *OutboundProcessor) GetIPPort() (ip string, port string)
func (*OutboundProcessor) GetIsConnected ¶
func (p *OutboundProcessor) GetIsConnected() int
func (*OutboundProcessor) GetNumConnections ¶
func (p *OutboundProcessor) GetNumConnections() int
func (*OutboundProcessor) GetRandFrequency ¶
func (p *OutboundProcessor) GetRandFrequency(freq time.Duration) time.Duration
func (*OutboundProcessor) GetRequestCh ¶
func (p *OutboundProcessor) GetRequestCh() chan IRequestContext
func (*OutboundProcessor) GetRequestSendingQueueSize ¶
func (p *OutboundProcessor) GetRequestSendingQueueSize() int
func (*OutboundProcessor) Init ¶
func (p *OutboundProcessor) Init(endpoint ServiceEndpoint, config *OutboundConfig, enableBounce bool)
func (*OutboundProcessor) OnConnectError ¶
func (p *OutboundProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)
func (*OutboundProcessor) OnConnectSuccess ¶
func (p *OutboundProcessor) OnConnectSuccess(conn Conn, connector *OutboundConnector, timeTaken time.Duration)
func (*OutboundProcessor) Run ¶
func (p *OutboundProcessor) Run()
func (*OutboundProcessor) SendRequest ¶
func (p *OutboundProcessor) SendRequest(req IRequestContext) (err *errors.Error)
func (*OutboundProcessor) SendRequestLowPriority ¶
func (p *OutboundProcessor) SendRequestLowPriority(req IRequestContext) (err *errors.Error)
func (*OutboundProcessor) SetConnEventHandler ¶
func (p *OutboundProcessor) SetConnEventHandler(hdlr IConnEventHandler)
func (*OutboundProcessor) Shutdown ¶
func (p *OutboundProcessor) Shutdown()
func (*OutboundProcessor) Start ¶
func (p *OutboundProcessor) Start()
func (*OutboundProcessor) WaitShutdown ¶
func (p *OutboundProcessor) WaitShutdown()
func (*OutboundProcessor) WriteStats ¶
func (p *OutboundProcessor) WriteStats(w io.Writer, indent int)
type OutboundRequestContext ¶
type OutboundRequestContext struct {
RequestContext
}
func NewOutboundRequestContext ¶
func NewOutboundRequestContext(msg *proto.RawMessage, opaque uint32, ctx context.Context, ch chan<- IResponseContext, to time.Duration) (r *OutboundRequestContext)
func (*OutboundRequestContext) OnCleanup ¶
func (r *OutboundRequestContext) OnCleanup()
func (*OutboundRequestContext) OnExpiration ¶
func (r *OutboundRequestContext) OnExpiration()
func (*OutboundRequestContext) Reply ¶
func (r *OutboundRequestContext) Reply(resp IResponseContext)
type OutboundResponseContext ¶
type OutboundResponseContext struct { ResponseContext // contains filtered or unexported fields }
func NewErrorOutboundResponse ¶
func NewErrorOutboundResponse(status uint32) (r *OutboundResponseContext)
func NewOutboundResponse ¶
func NewOutboundResponse() *OutboundResponseContext
func (*OutboundResponseContext) GetStatus ¶
func (r *OutboundResponseContext) GetStatus() uint32
type RequestContext ¶
type RequestContext struct { util.QueItemBase // contains filtered or unexported fields }
Implements IRequestContext.
func (*RequestContext) Cancel ¶
func (r *RequestContext) Cancel()
func (*RequestContext) GetCtx ¶
func (r *RequestContext) GetCtx() context.Context
func (*RequestContext) GetMessage ¶
func (r *RequestContext) GetMessage() *proto.RawMessage
func (*RequestContext) GetReceiveTime ¶
func (r *RequestContext) GetReceiveTime() time.Time
func (*RequestContext) OnCleanup ¶
func (r *RequestContext) OnCleanup()
func (*RequestContext) OnComplete ¶
func (r *RequestContext) OnComplete()
func (*RequestContext) OnExpiration ¶
func (r *RequestContext) OnExpiration()
func (*RequestContext) Reply ¶
func (r *RequestContext) Reply(resp IResponseContext)
func (*RequestContext) SetTimeout ¶
func (r *RequestContext) SetTimeout(parent context.Context, timeout time.Duration)
To be implemented.
func (*RequestContext) WriteWithOpaque ¶
type ResponseContext ¶
type ResponseContext struct {
// contains filtered or unexported fields
}
Implements IResponseContext.
func (*ResponseContext) GetMessage ¶
func (r *ResponseContext) GetMessage() *proto.RawMessage
func (*ResponseContext) GetMsgSize ¶
func (r *ResponseContext) GetMsgSize() uint32
func (*ResponseContext) GetOpStatus ¶
func (r *ResponseContext) GetOpStatus() proto.OpStatus
func (*ResponseContext) GetStatus ¶
func (r *ResponseContext) GetStatus() (s uint32)
func (*ResponseContext) OnComplete ¶
func (r *ResponseContext) OnComplete()
type ServiceEndpoint ¶
func (*ServiceEndpoint) GetConnString ¶
func (p *ServiceEndpoint) GetConnString() (str string)
TODO: for now, this is just a very simple way to construct the connection string.
func (*ServiceEndpoint) SetFromConnString ¶
func (p *ServiceEndpoint) SetFromConnString(connStr string) error
TODO: for now, this is just a very simple function to parse the connection string.
func (*ServiceEndpoint) Validate ¶
func (p *ServiceEndpoint) Validate() (err error)
type SslListener ¶
type SslListener struct {
Listener
}
func (*SslListener) AcceptAndServe ¶
func (l *SslListener) AcceptAndServe() error
func (*SslListener) GetType ¶
func (l *SslListener) GetType() ListenerType
func (*SslListener) Refresh ¶
func (l *SslListener) Refresh()
Source Files
¶
Click to show internal directories.
Click to hide internal directories.