Documentation ¶
Index ¶
- Variables
- func DebugBufferListDetail(path string)
- func DebugQueueDetail(path string)
- func Listen(shmIPCAddress string) (net.Listener, error)
- func ListenWithBacklog(shmIPCAddress string, backlog int) (net.Listener, error)
- func MemfdCreate(name string, flags int) (fd int, err error)
- func SetLogLevel(l int)
- func VerifyConfig(config *Config) error
- type BufferReader
- type BufferWriter
- type Config
- type ListenCallback
- type Listener
- type ListenerConfig
- type MemMapType
- type Monitor
- type PerformanceMetrics
- type Session
- func (s *Session) AcceptStream() (*Stream, error)
- func (s *Session) Close() error
- func (s *Session) CloseChan() <-chan struct{}
- func (s *Session) GetActiveStreamCount() int
- func (s *Session) GetMetrics() (PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics)
- func (s *Session) ID() string
- func (s *Session) IsClient() bool
- func (s *Session) IsClosed() bool
- func (s *Session) IsHealthy() bool
- func (s *Session) LocalAddr() net.Addr
- func (s *Session) OpenStream() (*Stream, error)
- func (s *Session) RemoteAddr() net.Addr
- type SessionManager
- type SessionManagerConfig
- type ShareMemoryMetrics
- type SizePercentPair
- type StabilityMetrics
- type Stream
- func (s *Stream) BufferReader() BufferReader
- func (s *Stream) BufferWriter() BufferWriter
- func (s *Stream) Close() error
- func (s *Stream) Flush(endStream bool) error
- func (s *Stream) IsOpen() bool
- func (s *Stream) LocalAddr() net.Addr
- func (s *Stream) Read(p []byte) (int, error)
- func (s *Stream) ReleaseReadAndReuse()
- func (s *Stream) RemoteAddr() net.Addr
- func (s *Stream) Session() *Session
- func (s *Stream) SetCallbacks(callback StreamCallbacks) error
- func (s *Stream) SetDeadline(t time.Time) error
- func (s *Stream) SetReadDeadline(t time.Time) error
- func (s *Stream) SetWriteDeadline(t time.Time) error
- func (s *Stream) StreamID() uint32
- func (s *Stream) Write(p []byte) (int, error)
- type StreamCallbacks
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidVersion means that we received a message with an invalid version.
	ErrInvalidVersion = errors.New("invalid protocol version")

	// ErrInvalidMsgType means that we received a message with an invalid message type.
	ErrInvalidMsgType = errors.New("invalid msg type")

	// ErrSessionShutdown means that the session is shut down.
	ErrSessionShutdown = errors.New("session shutdown")

	// ErrStreamsExhausted means that the stream ids are used up, and some streams may have leaked.
	ErrStreamsExhausted = errors.New("streams exhausted")

	// ErrTimeout is used when we reach an IO deadline.
	ErrTimeout = errors.New("i/o deadline reached")

	// ErrStreamClosed is returned when using a closed stream.
	ErrStreamClosed = errors.New("stream closed")

	// ErrConnectionWriteTimeout means that a write timeout happened in the tcp/unix connection.
	ErrConnectionWriteTimeout = errors.New("connection write timeout")

	// ErrEndOfStream means that the stream has ended; the user shouldn't read from the stream.
	ErrEndOfStream = errors.New("end of stream")

	// ErrSessionUnhealthy occurs at Session.OpenStream(), and means that the session is overloaded.
	// The user should retry after 60 seconds (now). The following situations result in ErrSessionUnhealthy.
	// On the client side:
	//  1. when local share memory is not enough, the client sends request data via the unix domain socket.
	//  2. when peer share memory is not enough, the client receives response data from the unix domain socket.
	ErrSessionUnhealthy = errors.New("now the session is unhealthy, please retry later")

	// ErrNotEnoughData means that the real read size < expected read size.
	ErrNotEnoughData = errors.New("current buffer is not enough data to read")

	// ErrNoMoreBuffer means that the share memory is busy, and there is no more buffer to allocate.
	ErrNoMoreBuffer = errors.New("share memory not more buffer")

	ErrShareMemoryHadNotLeftSpace = errors.New("share memory had not left space")

	// ErrStreamCallbackHadExisted is returned if the Stream's callbacks already exist.
	ErrStreamCallbackHadExisted = errors.New("stream callbacks had existed")

	// ErrOSNonSupported means that shmipc can't work on the current OS (only Linux is supported now).
	ErrOSNonSupported = errors.New("shmipc just support linux OS now")

	// ErrHotRestartInProgress is returned by Listener.HotRestart when the Session is in the hot restart state.
	ErrHotRestartInProgress = errors.New("hot restart in progress, try again later")

	// ErrInHandshakeStage happens when an uninitialized session attempts a hot restart.
	ErrInHandshakeStage = errors.New("session in handshake stage, try again later")

	// ErrFileNameTooLong means that the length of Config.ShareMemoryPathPrefixFile reached the limit of the OS.
	ErrFileNameTooLong = errors.New("share memory path prefix too long")

	// ErrQueueFull means that the server is so busy that the io queue is full.
	ErrQueueFull = errors.New("the io queue is full")
)
Functions ¶
func DebugBufferListDetail ¶
func DebugBufferListDetail(path string)
DebugBufferListDetail prints the status of every BufferList in the shared memory located at `path`. If MemMapType is MemMapTypeMemFd, you can use the command `lsof -p $PID` to find the shared memory that was mapped via memfd, and the command `cat /proc/$PID/$MEMFD > $path` to dump the shared memory to the file system.
func DebugQueueDetail ¶
func DebugQueueDetail(path string)
DebugQueueDetail prints the status of the IO queue that is mapped at `path`.
func Listen ¶
Listen creates a listener with the default backlog size (4096). shmIPCAddress is the Unix domain socket address used as the underlying connection, and the returned value is a net.Listener. Remember to close the listener if it is created successfully, or goroutines may leak.
Should I use Listen?
If you want the best performance, use the low-level API (not this one) and marshal and unmarshal manually, which achieves better batching. If you only care about compatibility, use this high-level API. For example, if you can hardly change gRPC and protobuf code, you can use this listener to stay compatible while gaining a small performance improvement.
func ListenWithBacklog ¶
ListenWithBacklog creates a listener with the given backlog size. shmIPCAddress is the Unix domain socket address used as the underlying connection, and the returned value is a net.Listener. Remember to close the listener if it is created successfully, or goroutines may leak.
Should I use ListenWithBacklog?
If you want the best performance, use the low-level API (not this one) and marshal and unmarshal manually, which achieves better batching. If you only care about compatibility, use this high-level API. For example, if you can hardly change gRPC and protobuf code, you can use this listener to stay compatible while gaining a small performance improvement.
func MemfdCreate ¶
MemfdCreate is available on Linux 3.17 and later.
func SetLogLevel ¶
func SetLogLevel(l int)
SetLogLevel changes the internal logger's level; the default level is Warning. The log level can also be set through the process environment variable `SHMIPC_LOG_LEVEL`.
func VerifyConfig ¶
VerifyConfig is used to verify the sanity of configuration
Types ¶
type BufferReader ¶
type BufferReader interface {
	io.ByteReader

	// Len returns the current unread size of the buffer.
	// It traverses all underlying slices to compute the unread size, so please don't call it frequently.
	Len() int

	// ReadBytes reads `size` bytes from share memory, and may block if size is greater than Len().
	// Notice: once ReleasePreviousRead() is called, the results of previous ReadBytes() calls are invalid.
	ReadBytes(size int) ([]byte, error)

	// Peek returns `size` bytes from share memory. The difference between Peek() and ReadBytes() is that
	// Peek() doesn't influence the return value of Len(), but ReadBytes() decreases the unread size.
	// eg: the buffer is [0,1,2,3]
	// 1. after Peek(2), the buffer is still [0,1,2,3], and Len() is 4.
	// 2. after ReadBytes(3), the buffer is [3], and Len() is 1.
	// Notice: once ReleasePreviousRead() is called, the results of previous Peek() calls are invalid.
	Peek(size int) ([]byte, error)

	// Discard drops data of the given length. If there isn't that much data, it blocks until there is enough data to discard.
	Discard(size int) (int, error)

	/*
		Call ReleasePreviousRead when it is safe to drop all previous results of ReadBytes and Peek, otherwise shm memory will leak.
		eg:
		  buf, err := BufferReader.ReadBytes(size) // or Buffer.
		  //do
	*/
	ReleasePreviousRead()

	// If you would like to read a string from the buffer, ReadString(size) is better than string(ReadBytes(size)).
	ReadString(size int) (string, error)
}
BufferReader is used to read data from a stream.
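The difference between Peek and ReadBytes described in the interface comments can be illustrated with a toy reader. This is only a sketch under stated assumptions: `sliceReader` is a hypothetical in-memory type that mimics the Len/Peek/ReadBytes bookkeeping, never blocks, and does not touch shared memory, so it is not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errShort is a hypothetical error for this sketch; the real reader may block instead.
var errShort = errors.New("not enough data")

// sliceReader is a toy stand-in that mirrors only the unread-size bookkeeping.
type sliceReader struct {
	data []byte
	off  int // number of bytes already consumed
}

// Len returns the number of unread bytes.
func (r *sliceReader) Len() int { return len(r.data) - r.off }

// Peek returns the next size bytes without consuming them.
func (r *sliceReader) Peek(size int) ([]byte, error) {
	if size < 0 || size > r.Len() {
		return nil, errShort
	}
	return r.data[r.off : r.off+size], nil
}

// ReadBytes returns the next size bytes and advances the read offset.
func (r *sliceReader) ReadBytes(size int) ([]byte, error) {
	b, err := r.Peek(size)
	if err != nil {
		return nil, err
	}
	r.off += size
	return b, nil
}

func main() {
	r := &sliceReader{data: []byte{0, 1, 2, 3}}

	p, _ := r.Peek(2)
	fmt.Println(p, r.Len()) // [0 1] 4

	b, _ := r.ReadBytes(3)
	fmt.Println(b, r.Len()) // [0 1 2] 1
}
```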
type BufferWriter ¶
type BufferWriter interface {
	// Len returns the current written size of the buffer.
	// It traverses all underlying slices to compute the size, so please don't call it frequently.
	Len() int

	io.ByteWriter

	// Reserve reserves `size` bytes of share memory space, which the user can use to implement zero-copy writes.
	Reserve(size int) ([]byte, error)

	// WriteBytes copies data to share memory.
	// return value: `n` is the written size.
	// return value: `err` is nil on success, otherwise the write failed.
	WriteBytes(data []byte) (n int, err error)

	// WriteString copies a string to share memory.
	WriteString(string) error
}
BufferWriter is used to write data to a stream.
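The idea behind Reserve is that the caller fills a region of the destination buffer directly instead of building a temporary slice and copying it. A minimal sketch of that pattern follows; `fixedWriter` and `errNoSpace` are hypothetical names, and the backing store is an ordinary byte slice rather than shared memory.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errNoSpace is a hypothetical error used only by this sketch.
var errNoSpace = errors.New("no space left")

// fixedWriter is a toy writer over a fixed-size backing array.
type fixedWriter struct {
	buf []byte
	n   int // bytes reserved so far
}

// Len returns the number of bytes reserved so far.
func (w *fixedWriter) Len() int { return w.n }

// Reserve hands out the next size bytes of the backing array. The caller
// writes into the returned slice in place, so no extra copy is needed.
func (w *fixedWriter) Reserve(size int) ([]byte, error) {
	if size < 0 || w.n+size > len(w.buf) {
		return nil, errNoSpace
	}
	// The three-index slice caps the region so append cannot spill past it.
	region := w.buf[w.n : w.n+size : w.n+size]
	w.n += size
	return region, nil
}

func main() {
	w := &fixedWriter{buf: make([]byte, 16)}

	hdr, _ := w.Reserve(4) // length prefix, filled in place
	binary.BigEndian.PutUint32(hdr, 5)

	body, _ := w.Reserve(5) // payload, filled in place
	copy(body, "hello")

	fmt.Printf("%d %q\n", w.Len(), w.buf[4:w.Len()]) // 9 "hello"
}
```

With the real API, the written data would still need Stream.Flush before the peer can see it.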
type Config ¶
type Config struct {
	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// which we will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout time.Duration

	// In the initialization phase, the client and server exchange metadata and map share memory.
	// InitializeTimeout specifies how long this phase may take.
	InitializeTimeout time.Duration

	// The max number of pending io requests. Default is 8192.
	QueueCap uint32

	// Share memory path of the underlying queue.
	QueuePath string

	ShareMemoryBufferCap  uint32
	ShareMemoryPathPrefix string

	// LogOutput is used to control the log destination.
	LogOutput io.Writer

	// BufferSliceSizes adjusts the share memory buffer slice sizes,
	// which can improve performance if most requests or responses fit into a single buffer slice instead of multiple buffer slices.
	// Multiple buffer slices mean more allocate and free operations,
	// and a payload that crosses buffer slices is not contiguous in memory.
	// Default value is:
	//  1. 50% of share memory is held by buffer slices of nearly 8KB each.
	//  2. 30% of share memory is held by buffer slices of nearly 32KB each.
	//  3. 20% of share memory is held by buffer slices of nearly 128KB each.
	BufferSliceSizes []*SizePercentPair

	// MemMapTypeDevShmFile or MemMapTypeMemFd (client set)
	MemMapType MemMapType

	// Session periodically emits some metrics to the Monitor (default 30s).
	Monitor Monitor
	// contains filtered or unexported fields
}
Config is used to tune the shmipc session
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig is used to return a default configuration
type ListenCallback ¶
type ListenCallback interface {
	// OnNewStream is called when a new stream is accepted.
	OnNewStream(s *Stream)

	// OnShutdown is called when the listener is stopped.
	OnShutdown(reason string)
}
ListenCallback is the server's asynchronous API.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener listens on a socket and accepts connections as shmipc server connections.
func NewListener ¶
func NewListener(callback ListenCallback, config *ListenerConfig) (*Listener, error)
NewListener tries to listen on the ListenPath of the configuration and returns the Listener if no error occurs.
func (*Listener) Accept ¶
Accept doesn't work; it exists only to satisfy the net.Listener interface.
func (*Listener) Close ¶
Close closes the listener. Any blocked Accept operations will be unblocked and return errors.
func (*Listener) HotRestart ¶
HotRestart performs a hot restart of the shmipc server.
func (*Listener) IsHotRestartDone ¶
IsHotRestartDone returns whether the Listener is in the hot restart state.
func (*Listener) SetUnlinkOnClose ¶
SetUnlinkOnClose sets whether to unlink the unix socket path when the Listener is stopped.
type ListenerConfig ¶
type ListenerConfig struct {
	*Config
	Network string // Only unix or tcp is supported.

	// If Network is "tcp", the ListenPath is an ip address and port, such as 0.0.0.0:6666 (ipv4) or [::]:6666 (ipv6).
	// If Network is "unix", the ListenPath is a file path, such as /your/socket/path/xx_shmipc.sock
	ListenPath string
}
ListenerConfig is the configuration of Listener
func NewDefaultListenerConfig ¶
func NewDefaultListenerConfig(listenPath string, network string) *ListenerConfig
NewDefaultListenerConfig returns the default Listener configuration.
type MemMapType ¶
type MemMapType uint8
MemMapType is the mapping type of shared memory
const (
	// MemMapTypeDevShmFile maps share memory to /dev/shm (tmpfs)
	MemMapTypeDevShmFile MemMapType = 0
	// MemMapTypeMemFd maps share memory to memfd (Linux OS v3.17+)
	MemMapTypeMemFd MemMapType = 1
)
type Monitor ¶
type Monitor interface {
	// OnEmitSessionMetrics is called periodically by the Session.
	OnEmitSessionMetrics(PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics, *Session)

	// Flush flushes the metrics.
	Flush() error
}
Monitor receives metrics that are emitted periodically.
type PerformanceMetrics ¶
type PerformanceMetrics struct {
	ReceiveSyncEventCount uint64 // the SyncEvent count that the session has received
	SendSyncEventCount    uint64 // the SyncEvent count that the session has sent
	OutFlowBytes          uint64 // the outgoing flow, in bytes, that the session has sent
	InFlowBytes           uint64 // the incoming flow, in bytes, that the session has received
	SendQueueCount        uint64 // the pending count of the send queue
	ReceiveQueueCount     uint64 // the pending count of the receive queue
}
PerformanceMetrics is the metrics about performance
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is used to wrap a reliable ordered connection and to multiplex it into multiple streams.
func (*Session) AcceptStream ¶
AcceptStream is used to block until the next available stream is ready to be accepted.
func (*Session) Close ¶
Close is used to close the session and all streams. Attempts to send a GoAway before closing the connection.
func (*Session) CloseChan ¶
func (s *Session) CloseChan() <-chan struct{}
CloseChan returns a read-only channel which is closed as soon as the session is closed.
func (*Session) GetActiveStreamCount ¶
GetActiveStreamCount returns the number of currently open streams
func (*Session) GetMetrics ¶
func (s *Session) GetMetrics() (PerformanceMetrics, StabilityMetrics, ShareMemoryMetrics)
GetMetrics returns the session's metrics for monitoring.
func (*Session) LocalAddr ¶
LocalAddr is used to get the local address of the underlying connection.
func (*Session) OpenStream ¶
OpenStream is used to create a new stream
func (*Session) RemoteAddr ¶
RemoteAddr is used to get the address of remote end of the underlying connection
type SessionManager ¶
SessionManager starts multiple Sessions with the peer process. When the peer process crashes or the underlying connection is closed, SessionManager can retry the connection. SessionManager can also cooperate with the peer process to complete a hot restart.
func GlobalSessionManager ¶
func GlobalSessionManager() *SessionManager
GlobalSessionManager returns the global SessionManager. It returns nil if the global SessionManager hasn't been initialized.
func InitGlobalSessionManager ¶
func InitGlobalSessionManager(config *SessionManagerConfig) (*SessionManager, error)
InitGlobalSessionManager initializes a global SessionManager that can be used anywhere in the process.
func NewSessionManager ¶
func NewSessionManager(config *SessionManagerConfig) (*SessionManager, error)
NewSessionManager returns a SessionManager with the given configuration.
func (*SessionManager) Close ¶
func (sm *SessionManager) Close() error
Close shuts down the SessionManager's background goroutine and closes all streams in the stream pool.
func (*SessionManager) GetStream ¶
func (sm *SessionManager) GetStream() (*Stream, error)
GetStream returns a shmipc Stream from the stream pool. Every stream must be explicitly returned to the SessionManager with PutBack() for later reuse, otherwise resources will leak.
func (*SessionManager) PutBack ¶
func (sm *SessionManager) PutBack(stream *Stream)
PutBack returns an unused stream to the stream pool for later reuse.
type SessionManagerConfig ¶
type SessionManagerConfig struct {
	*Config
	UnixPath string // Deprecated, please use Network and Address.
	Network  string // tcp or unix
	Address  string // tcp address or unix domain socket path

	// SessionNum is similar to concurrency.
	// Each session has its own io queue and its own tcp/unix connection.
	// We recommend a value equal to the peer process' thread count, with every thread keeping a session.
	// If the peer process is written in golang, we recommend SessionNum = cpu cores / 4.
	SessionNum int

	// Max number of streams per session's stream pool.
	MaxStreamNum int

	// The idle time after which a stream is closed.
	StreamMaxIdleTime time.Duration
}
SessionManagerConfig is the configuration of SessionManager
func DefaultSessionManagerConfig ¶
func DefaultSessionManagerConfig() *SessionManagerConfig
DefaultSessionManagerConfig returns the default SessionManager configuration.
type ShareMemoryMetrics ¶
type ShareMemoryMetrics struct {}
ShareMemoryMetrics is the metrics about share memory's status
type SizePercentPair ¶
type SizePercentPair struct {
	// The capacity of a single buffer slice in the buffer list.
	Size uint32
	// The percentage of the total share memory used by this buffer list.
	Percent uint32
}
A SizePercentPair describes a buffer list's specification.
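To get a feel for how a set of size/percent pairs divides a buffer, the slice count per list can be estimated as (capacity × percent / 100) / size. The sketch below does this for the default 8KB/32KB/128KB split documented on Config. It rests on assumptions: `sizePercentPair` is a local copy of the struct, `approxSliceCounts` is a hypothetical helper, the requirement that percentages sum to 100 is assumed rather than taken from the documentation, and per-slice metadata is ignored, so real counts would be lower.

```go
package main

import (
	"errors"
	"fmt"
)

// sizePercentPair is a local copy of the documented struct.
type sizePercentPair struct {
	Size    uint32 // capacity of one buffer slice, in bytes
	Percent uint32 // share of the total memory given to this list
}

// approxSliceCounts estimates how many slices of each size fit in capBytes.
// It ignores all metadata overhead.
func approxSliceCounts(capBytes uint32, pairs []sizePercentPair) ([]uint32, error) {
	var total uint32
	for _, p := range pairs {
		if p.Size == 0 {
			return nil, errors.New("slice size must be positive")
		}
		total += p.Percent
	}
	if total != 100 { // assumption of this sketch
		return nil, errors.New("percentages must sum to 100")
	}
	counts := make([]uint32, len(pairs))
	for i, p := range pairs {
		share := uint64(capBytes) * uint64(p.Percent) / 100
		counts[i] = uint32(share / uint64(p.Size))
	}
	return counts, nil
}

func main() {
	pairs := []sizePercentPair{
		{Size: 8 << 10, Percent: 50},
		{Size: 32 << 10, Percent: 30},
		{Size: 128 << 10, Percent: 20},
	}
	counts, err := approxSliceCounts(32<<20, pairs) // 32 MiB of buffer space
	fmt.Println(counts, err)                        // [2048 307 51] <nil>
}
```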
type StabilityMetrics ¶
type StabilityMetrics struct {
	AllocShmErrorCount uint64 // the error count of allocating share memory
	FallbackWriteCount uint64 // the count of the fallback data write to unix/tcp connection
	FallbackReadCount  uint64 // the error count of receiving fallback data from unix/tcp connection every period

	// the error count of unix/tcp connection
	// which usually happened in that the peer's process exit(crashed or other reason)
	EventConnErrorCount uint64

	// the error count due to the IO-Queue(SendQueue or ReceiveQueue) is full
	// which usually happened in that the peer was busy
	QueueFullErrorCount uint64

	// current all active stream count
	ActiveStreamCount uint64

	// the successful count of hot restart
	HotRestartSuccessCount uint64

	// the failed count of hot restart
	HotRestartErrorCount uint64
}
StabilityMetrics is the metrics about stability
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is used to represent a logical stream within a session.
func (*Stream) BufferReader ¶
func (s *Stream) BufferReader() BufferReader
BufferReader returns the buffer to read from.
func (*Stream) BufferWriter ¶
func (s *Stream) BufferWriter() BufferWriter
BufferWriter returns the buffer to write to. After writing is done, you should call Stream.Flush(endStream).
func (*Stream) Close ¶
Close closes the stream, and may block if a StreamCallbacks handler is still running. If a stream is leaked, some share memory is leaked as well.
func (*Stream) Flush ¶
Flush sends the buffered stream data to the peer. If endStream is true, this stream will not send any more data to the peer after the flush, and the peer may close the stream after receiving the data.
func (*Stream) Read ¶
Read is a low-performance API that exists only to satisfy the net.Conn interface; it copies data from the read buffer into `p`. Please use the BufferReader() API to implement zero-copy reads.
func (*Stream) ReleaseReadAndReuse ¶
func (s *Stream) ReleaseReadAndReuse()
ReleaseReadAndReuse releases the data previously read through Stream.BufferReader(), and reuses the last share memory buffer slice of the read buffer for the next write through Stream.BufferWriter().
func (*Stream) RemoteAddr ¶
RemoteAddr returns the remote address
func (*Stream) SetCallbacks ¶
func (s *Stream) SetCallbacks(callback StreamCallbacks) error
SetCallbacks sets the StreamCallbacks. Notice: it may be called only once; further calls return ErrStreamCallbackHadExisted.
func (*Stream) SetDeadline ¶
SetDeadline sets the read timeout for blocked and future Read calls.
func (*Stream) SetReadDeadline ¶
SetReadDeadline is the same as net.Conn
func (*Stream) SetWriteDeadline ¶
SetWriteDeadline is the same as net.Conn
type StreamCallbacks ¶
type StreamCallbacks interface {
	// OnData is called in a new goroutine when the stream receives new data.
	OnData(reader BufferReader)

	// OnLocalClose is called when the stream is closed by the local process.
	OnLocalClose()

	// OnRemoteClose is called when the stream is closed by the remote process.
	OnRemoteClose()
}
StreamCallbacks provides an asynchronous programming mode that improves performance in some scenarios.
Source Files ¶
- block_io.go
- buffer.go
- buffer_manager.go
- buffer_slice.go
- config.go
- const.go
- debug.go
- epoll_linux.go
- errors.go
- event_dispatcher.go
- event_dispatcher_linux.go
- listener.go
- net_listener.go
- protocol_event.go
- protocol_initializer.go
- protocol_manager.go
- queue.go
- session.go
- session_manager.go
- stats.go
- stream.go
- sys_memfd_create_linux.go
- util.go