io

package
v0.0.0-...-9750751 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ListenerTypeTCP = ListenerType(iota)
	ListenerTypeTCPwSSL
)
View Source
const (
	WAITING    = StateType(0)
	CONNECTING = StateType(1)
	SERVING    = StateType(2)
	DRAINING   = StateType(3)
)

Variables

View Source
var (
	DefaultInboundConfig = InboundConfig{
		HandshakeTimeout:     util.Duration{Duration: 500 * time.Millisecond},
		IdleTimeout:          util.Duration{Duration: 120 * time.Second},
		ReadTimeout:          util.Duration{Duration: 500 * time.Millisecond},
		WriteTimeout:         util.Duration{Duration: 500 * time.Millisecond},
		RequestTimeout:       util.Duration{Duration: 600 * time.Millisecond},
		MaxBufferedWriteSize: 64 * 1024,
		IOBufSize:            64 * 1024,
		RespChanSize:         10000,
	}

	DefaultOutboundConfig = OutboundConfig{
		ConnectTimeout:        util.Duration{Duration: 1 * time.Second},
		ConnectRecycleT:       util.Duration{Duration: 30 * time.Second},
		GracefulShutdownTime:  util.Duration{Duration: 200 * time.Millisecond},
		EnableConnRecycle:     false,
		ReqChanBufSize:        8092,
		MaxPendingQueSize:     8092,
		PendingQueExtra:       50,
		MaxBufferedWriteSize:  64 * 1024,
		ReconnectIntervalBase: 100,
		ReconnectIntervalMax:  20000,
		NumConnsPerTarget:     1,
		IOBufSize:             64 * 1024,
	}
)

Functions

func Connect

func Connect(endpoint *ServiceEndpoint, connectTimeout time.Duration) (conn net.Conn, err error)

func ReleaseOutboundResponse

func ReleaseOutboundResponse(resp IResponseContext)

func ReplyError

func ReplyError(req IRequestContext, status uint32)

Types

type Conn

type Conn interface {
	GetStateString() string
	GetTLSVersion() string
	GetCipherName() string
	DidResume() string
	GetNetConn() net.Conn
	IsTLS() bool
}

func ConnectTo

func ConnectTo(endpoint *ServiceEndpoint, connectTimeout time.Duration) (conn Conn, err error)

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func (*Connection) DidResume

func (c *Connection) DidResume() string

func (*Connection) GetCipherName

func (c *Connection) GetCipherName() string

func (*Connection) GetNetConn

func (c *Connection) GetNetConn() net.Conn

func (*Connection) GetStateString

func (c *Connection) GetStateString() string

func (*Connection) GetTLSVersion

func (c *Connection) GetTLSVersion() string

func (*Connection) IsTLS

func (c *Connection) IsTLS() bool

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func (*Connector) Close

func (c *Connector) Close()

func (*Connector) OnKeepAlive

func (c *Connector) OnKeepAlive()

func (*Connector) Start

func (c *Connector) Start()

func (*Connector) Stop

func (c *Connector) Stop()

type IConnEventHandler

type IConnEventHandler interface {
	OnConnectSuccess(conn Conn, connector *OutboundConnector, timeTaken time.Duration)
	OnConnectError(timeTaken time.Duration, connStr string, err error)
}

type IHandshaker

type IHandshaker interface {
	GetName() string
	GetHandshakeTimeout() util.Duration
	GetPingRequest() IRequestContext
	ExpectResponse() bool
	OnPingResponse(ctx IResponseContext) bool
	GetPingIP() string
}

type IListener

type IListener interface {
	GetName() string
	GetType() ListenerType
	AcceptAndServe() error
	Close() error
	Shutdown()
	WaitForShutdownToComplete(time.Duration)
	GetConnString() string
	GetNumActiveConnections() uint32
	Refresh()
}

func NewListener

func NewListener(cfg ListenerConfig, iocfg InboundConfig, reqHandler IRequestHandler) (lsnr IListener, err error)

func NewListenerWithFd

func NewListenerWithFd(cfg ListenerConfig, iocfg InboundConfig, f *os.File, reqHandler IRequestHandler) (lsnr IListener, err error)

type IRequestContext

type IRequestContext interface {
	util.QueItem
	GetMessage() *proto.RawMessage
	GetCtx() context.Context
	Cancel()
	Read(r io.Reader) (n int, err error)
	WriteWithOpaque(opaque uint32, w io.Writer) (n int, err error)
	Reply(resp IResponseContext)
	OnComplete()
	GetReceiveTime() time.Time
	SetTimeout(parent context.Context, duration time.Duration)
}

func DefaultInboundRequestContexCreator

func DefaultInboundRequestContexCreator(magic []byte, c *Connector) (ctx IRequestContext, err error)

func ExtendedRequestContexCreator

func ExtendedRequestContexCreator(magic []byte, c *Connector) (ctx IRequestContext, err error)

type IRequestHandler

type IRequestHandler interface {
	Init()
	GetReqCtxCreator() InboundRequestContextCreator

	Process(reqCtx IRequestContext) error
	Finish()

	OnKeepAlive(connector *Connector, reqCtx IRequestContext) error
}

type IResponseContext

type IResponseContext interface {
	GetStatus() uint32
	GetMessage() *proto.RawMessage
	GetMsgSize() uint32
	OnComplete()
	Read(r io.Reader) (n int, err error)
	Write(w io.Writer) (n int, err error)
}

type InboundConfig

type InboundConfig struct {
	HandshakeTimeout     util.Duration //only for TLS connection
	IdleTimeout          util.Duration
	ReadTimeout          util.Duration
	WriteTimeout         util.Duration
	RequestTimeout       util.Duration
	MaxBufferedWriteSize int
	IOBufSize            int
	RespChanSize         int
}

func (*InboundConfig) SetDefaultIfNotDefined

func (conf *InboundConfig) SetDefaultIfNotDefined() (set bool)

type InboundConfigMap

type InboundConfigMap map[string]InboundConfig

func (*InboundConfigMap) SetDefaultIfNotDefined

func (m *InboundConfigMap) SetDefaultIfNotDefined()

type InboundConnManager

type InboundConnManager struct {
	// contains filtered or unexported fields
}

func (*InboundConnManager) GetNumActiveConnections

func (m *InboundConnManager) GetNumActiveConnections() uint32

thread safe?

func (*InboundConnManager) Shutdown

func (m *InboundConnManager) Shutdown()

func (*InboundConnManager) TrackConn

func (m *InboundConnManager) TrackConn(c *Connector, add bool)

func (*InboundConnManager) WaitForShutdownToComplete

func (m *InboundConnManager) WaitForShutdownToComplete(timeout time.Duration)

type InboundRequestContext

type InboundRequestContext struct {
	RequestContext
	// contains filtered or unexported fields
}

func NewInboundRequestContext

func NewInboundRequestContext(c *Connector) (r *InboundRequestContext)

func (*InboundRequestContext) GetListenerType

func (r *InboundRequestContext) GetListenerType() ListenerType

func (*InboundRequestContext) SetResponseChannel

func (r *InboundRequestContext) SetResponseChannel(ch chan<- IResponseContext)

type InboundRequestContextCreator

type InboundRequestContextCreator func(magic []byte, c *Connector) (ctx IRequestContext, err error)

type InboundResponseContext

type InboundResponseContext struct {
	ResponseContext
}

func NewInboundResponseContext

func NewInboundResponseContext(opMsg *proto.OperationalMessage) (r *InboundResponseContext, err error)

func NewInboundRespose

func NewInboundRespose(opCode proto.OpCode, m *proto.RawMessage) (r *InboundResponseContext)

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

func (*Listener) AcceptAndServe

func (l *Listener) AcceptAndServe() error

func (*Listener) Close

func (l *Listener) Close() error

func (*Listener) GetConnString

func (l *Listener) GetConnString() string

func (*Listener) GetName

func (l *Listener) GetName() string

func (*Listener) GetNumActiveConnections

func (l *Listener) GetNumActiveConnections() uint32

func (*Listener) GetType

func (l *Listener) GetType() ListenerType

func (*Listener) Refresh

func (l *Listener) Refresh()

func (*Listener) Shutdown

func (l *Listener) Shutdown()

func (*Listener) WaitForShutdownToComplete

func (l *Listener) WaitForShutdownToComplete(timeout time.Duration)

type ListenerConfig

type ListenerConfig struct {
	ServiceEndpoint
	Name string
}

func (*ListenerConfig) SetDefaultIfNotDefined

func (cfg *ListenerConfig) SetDefaultIfNotDefined()

type ListenerType

type ListenerType byte

type OutboundConfig

type OutboundConfig struct {
	ConnectTimeout        util.Duration
	ConnectRecycleT       util.Duration
	GracefulShutdownTime  util.Duration
	EnableConnRecycle     bool
	ReqChanBufSize        int
	MaxPendingQueSize     int
	PendingQueExtra       int
	MaxBufferedWriteSize  int
	ReconnectIntervalBase int
	ReconnectIntervalMax  int
	NumConnsPerTarget     int32
	IOBufSize             int
}

func (*OutboundConfig) SetDefaultIfNotDefined

func (conf *OutboundConfig) SetDefaultIfNotDefined() (set bool)

type OutboundConfigMap

type OutboundConfigMap map[string]OutboundConfig

func (*OutboundConfigMap) SetDefaultIfNotDefined

func (m *OutboundConfigMap) SetDefaultIfNotDefined()

type OutboundConnector

type OutboundConnector struct {
	// contains filtered or unexported fields
}

func NewOutboundConnector

func NewOutboundConnector(id int, c net.Conn, reqCh chan IRequestContext, monCh chan int,
	config *OutboundConfig) (p *OutboundConnector)

constructor/factory each OutboundConnector object will have two go routines one for Read; one for Write

func (*OutboundConnector) AllowRestart

func (p *OutboundConnector) AllowRestart() bool

func (*OutboundConnector) Close

func (p *OutboundConnector) Close()

func (*OutboundConnector) GetId

func (p *OutboundConnector) GetId() int

func (*OutboundConnector) GetPingIP

func (p *OutboundConnector) GetPingIP() string

func (*OutboundConnector) Handshake

func (p *OutboundConnector) Handshake() bool

run one time

func (*OutboundConnector) IsActive

func (p *OutboundConnector) IsActive() bool

func (*OutboundConnector) Recycle

func (p *OutboundConnector) Recycle()

func (*OutboundConnector) SetHandshaker

func (p *OutboundConnector) SetHandshaker(h IHandshaker)

func (*OutboundConnector) SetNewConn

func (p *OutboundConnector) SetNewConn(c net.Conn)

func (*OutboundConnector) SetState

func (p *OutboundConnector) SetState(s StateType)

func (*OutboundConnector) Shutdown

func (p *OutboundConnector) Shutdown()

wait for all go routine to finish

func (*OutboundConnector) Start

func (p *OutboundConnector) Start()

func (*OutboundConnector) WriteStats

func (p *OutboundConnector) WriteStats(w io.Writer, indent int)

type OutboundProcessor

type OutboundProcessor struct {
	// contains filtered or unexported fields
}

OutboundProcessor manages a pool of one or more underlying connections to a downstream server; It also bounces incoming requests when all connections are down.

func NewOutbProcessor

func NewOutbProcessor(endpoint ServiceEndpoint, config *OutboundConfig, enableBounce bool) (p *OutboundProcessor)

func NewOutboundProcessor

func NewOutboundProcessor(connInfo string, config *OutboundConfig, enableBounce bool) *OutboundProcessor

func (*OutboundProcessor) GetConnInfo

func (p *OutboundProcessor) GetConnInfo() string

func (*OutboundProcessor) GetIPPort

func (p *OutboundProcessor) GetIPPort() (ip string, port string)

func (*OutboundProcessor) GetIsConnected

func (p *OutboundProcessor) GetIsConnected() int

func (*OutboundProcessor) GetNumConnections

func (p *OutboundProcessor) GetNumConnections() int

func (*OutboundProcessor) GetRandFrequency

func (p *OutboundProcessor) GetRandFrequency(freq time.Duration) time.Duration

func (*OutboundProcessor) GetRequestCh

func (p *OutboundProcessor) GetRequestCh() chan IRequestContext

func (*OutboundProcessor) GetRequestSendingQueueSize

func (p *OutboundProcessor) GetRequestSendingQueueSize() int

func (*OutboundProcessor) Init

func (p *OutboundProcessor) Init(endpoint ServiceEndpoint, config *OutboundConfig, enableBounce bool)

func (*OutboundProcessor) OnConnectError

func (p *OutboundProcessor) OnConnectError(timeTaken time.Duration, connStr string, err error)

func (*OutboundProcessor) OnConnectSuccess

func (p *OutboundProcessor) OnConnectSuccess(conn Conn, connector *OutboundConnector, timeTaken time.Duration)

func (*OutboundProcessor) Run

func (p *OutboundProcessor) Run()

func (*OutboundProcessor) SendRequest

func (p *OutboundProcessor) SendRequest(req IRequestContext) (err *errors.Error)

func (*OutboundProcessor) SendRequestLowPriority

func (p *OutboundProcessor) SendRequestLowPriority(req IRequestContext) (err *errors.Error)

func (*OutboundProcessor) SetConnEventHandler

func (p *OutboundProcessor) SetConnEventHandler(hdlr IConnEventHandler)

func (*OutboundProcessor) Shutdown

func (p *OutboundProcessor) Shutdown()

func (*OutboundProcessor) Start

func (p *OutboundProcessor) Start()

func (*OutboundProcessor) WaitShutdown

func (p *OutboundProcessor) WaitShutdown()

func (*OutboundProcessor) WriteStats

func (p *OutboundProcessor) WriteStats(w io.Writer, indent int)

type OutboundRequestContext

type OutboundRequestContext struct {
	RequestContext
}

func NewOutboundRequestContext

func NewOutboundRequestContext(msg *proto.RawMessage, opaque uint32,
	ctx context.Context, ch chan<- IResponseContext, to time.Duration) (r *OutboundRequestContext)

func (*OutboundRequestContext) OnCleanup

func (r *OutboundRequestContext) OnCleanup()

func (*OutboundRequestContext) OnExpiration

func (r *OutboundRequestContext) OnExpiration()

func (*OutboundRequestContext) Reply

type OutboundResponseContext

type OutboundResponseContext struct {
	ResponseContext
	// contains filtered or unexported fields
}

func NewErrorOutboundResponse

func NewErrorOutboundResponse(status uint32) (r *OutboundResponseContext)

func NewOutboundResponse

func NewOutboundResponse() *OutboundResponseContext

func (*OutboundResponseContext) GetStatus

func (r *OutboundResponseContext) GetStatus() uint32

type RequestContext

type RequestContext struct {
	util.QueItemBase
	// contains filtered or unexported fields
}

Implement IRequestContext

func (*RequestContext) Cancel

func (r *RequestContext) Cancel()

func (*RequestContext) GetCtx

func (r *RequestContext) GetCtx() context.Context

func (*RequestContext) GetMessage

func (r *RequestContext) GetMessage() *proto.RawMessage

func (*RequestContext) GetReceiveTime

func (r *RequestContext) GetReceiveTime() time.Time

func (*RequestContext) OnCleanup

func (r *RequestContext) OnCleanup()

func (*RequestContext) OnComplete

func (r *RequestContext) OnComplete()

func (*RequestContext) OnExpiration

func (r *RequestContext) OnExpiration()

func (*RequestContext) Read

func (req *RequestContext) Read(r io.Reader) (n int, err error)

func (*RequestContext) Reply

func (r *RequestContext) Reply(resp IResponseContext)

func (*RequestContext) SetTimeout

func (r *RequestContext) SetTimeout(parent context.Context, timeout time.Duration)

To be implement

func (*RequestContext) WriteWithOpaque

func (r *RequestContext) WriteWithOpaque(opaque uint32, w io.Writer) (n int, err error)

type ResponseContext

type ResponseContext struct {
	// contains filtered or unexported fields
}

Implement IResponseContext

func (*ResponseContext) GetMessage

func (r *ResponseContext) GetMessage() *proto.RawMessage

func (*ResponseContext) GetMsgSize

func (r *ResponseContext) GetMsgSize() uint32

func (*ResponseContext) GetOpStatus

func (r *ResponseContext) GetOpStatus() proto.OpStatus

func (*ResponseContext) GetStatus

func (r *ResponseContext) GetStatus() (s uint32)

func (*ResponseContext) OnComplete

func (r *ResponseContext) OnComplete()

func (*ResponseContext) Read

func (req *ResponseContext) Read(r io.Reader) (n int, err error)

func (*ResponseContext) Write

func (r *ResponseContext) Write(w io.Writer) (n int, err error)

type ServiceEndpoint

type ServiceEndpoint struct {
	Addr       string
	Network    string
	SSLEnabled bool
}

func (*ServiceEndpoint) GetConnString

func (p *ServiceEndpoint) GetConnString() (str string)

/TODO . now, it is just a very simple one to construct connection string

func (*ServiceEndpoint) SetFromConnString

func (p *ServiceEndpoint) SetFromConnString(connStr string) error

/TODO . now, it is just a very simple function to parse connection string

func (*ServiceEndpoint) Validate

func (p *ServiceEndpoint) Validate() (err error)

type SslListener

type SslListener struct {
	Listener
}

func (*SslListener) AcceptAndServe

func (l *SslListener) AcceptAndServe() error

func (*SslListener) GetType

func (l *SslListener) GetType() ListenerType

func (*SslListener) Refresh

func (l *SslListener) Refresh()

type StateType

type StateType int32

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL