Documentation ¶
Index ¶
- Constants
- Variables
- func Append(data []byte, value ...interface{}) (out []byte, err error)
- func AppendBER(data []byte, value ...interface{}) (out []byte, err error)
- func DefaultAlloc(n int) []byte
- func Handle(message uint32, handler Handler)
- func ListenAndServe(ctx context.Context, network, addr string, h Handler) error
- func MarshalPacket(pkt Packet) []byte
- func NotifyPoolChangeWith(h *PoolChangeHandle, ch chan<- PoolChange, pools ...*Pool)
- func Pack(value ...interface{}) (out []byte, err error)
- func PackBER(value ...interface{}) (out []byte, err error)
- func PackBytes(w []byte, v []byte, mode PackMode) []byte
- func PackString(w []byte, v string, mode PackMode) []byte
- func PackUint16(w []byte, v uint16, mode PackMode) []byte
- func PackUint32(w []byte, v uint32, mode PackMode) []byte
- func PackUint64(w []byte, v uint64, mode PackMode) []byte
- func PackUint8(w []byte, v uint8, mode PackMode) []byte
- func PacketSize(pkt Packet) int
- func PutPacket(p []byte, pkt Packet)
- func Unpack(data []byte, value ...interface{}) ([]byte, error)
- func UnpackBER(data []byte, value ...interface{}) ([]byte, error)
- func UnpackBytes(r *bytes.Reader, v *[]byte, mode PackMode) (err error)
- func UnpackString(r *bytes.Reader, v *string, mode PackMode) (err error)
- func UnpackUint16(r *bytes.Reader, v *uint16, mode PackMode) (err error)
- func UnpackUint32(r *bytes.Reader, v *uint32, mode PackMode) (err error)
- func UnpackUint64(r *bytes.Reader, v *uint64, mode PackMode) (err error)
- func UnpackUint8(r *bytes.Reader, v *uint8, mode PackMode) (err error)
- func WatchPoolConnCount(p *Pool, cb func(int))
- func WritePacket(w io.Writer, p Packet) (err error)
- type AcceptFn
- type BodyAllocator
- type BytePool
- type Channel
- func (c *Channel) Call(ctx context.Context, method uint32, data []byte) (resp []byte, err error)
- func (c *Channel) Close()
- func (c *Channel) Closed() bool
- func (c *Channel) Done() <-chan struct{}
- func (c *Channel) Drop()
- func (c *Channel) Error() error
- func (c *Channel) GetBytes(n int) []byte
- func (c *Channel) Hijack() (conn net.Conn, rbuf []byte, err error)
- func (c *Channel) Init() (err error)
- func (c *Channel) LocalAddr() net.Addr
- func (c *Channel) Notify(ctx context.Context, method uint32, data []byte) (err error)
- func (c *Channel) OnClose(cb func())
- func (c *Channel) OnShutdown(cb func())
- func (c *Channel) PutBytes(p []byte)
- func (c *Channel) ReadDone() <-chan struct{}
- func (c *Channel) RemoteAddr() net.Addr
- func (c *Channel) Send(ctx context.Context, p Packet) error
- func (c *Channel) SetContext(ctx context.Context)
- func (c *Channel) Shutdown()
- func (c *Channel) Stats() ChannelStats
- func (c *Channel) WriteClosed() bool
- func (c *Channel) WriteDone() <-chan struct{}
- type ChannelConfig
- type ChannelStats
- type Closer
- type Cluster
- func (c *Cluster) Close()
- func (c *Cluster) Disable(p *Pool) bool
- func (c *Cluster) Enable(p *Pool) bool
- func (c *Cluster) Insert(p *Pool) (inserted bool)
- func (c *Cluster) OnBeforeChange(cb func([]*Pool))
- func (c *Cluster) Peers() []*Pool
- func (c *Cluster) Remove(p *Pool) (removed bool)
- func (c *Cluster) Reset(ps []*Pool) (inserted, removed, ignored []*Pool)
- func (c *Cluster) ResetAndDisableInserted(ps []*Pool) (inserted, removed, ignored []*Pool)
- func (c *Cluster) Shutdown()
- type Conn
- type DefaultLogger
- type Handler
- type HandlerFunc
- type Header
- type Logger
- type MultiWatcher
- type PackMode
- type Packer
- type Packet
- type PacketServer
- type PacketServerConfig
- type Pool
- func (p *Pool) Call(ctx context.Context, method uint32, data []byte) ([]byte, error)
- func (p *Pool) CallNext(ctx context.Context, method uint32, data []byte) (ch *Channel, resp []byte, err error)
- func (p *Pool) CancelDial()
- func (p *Pool) Close()
- func (p *Pool) CloseConnections()
- func (p *Pool) DialBackground() (<-chan error, error)
- func (p *Pool) DialForeground(ctx context.Context) (<-chan error, error)
- func (p *Pool) Done() <-chan struct{}
- func (p *Pool) DropConnections()
- func (p *Pool) GetBytes(n int) []byte
- func (p *Pool) Hijack(cb func(conn net.Conn, rbuf []byte))
- func (p *Pool) Init(ctx context.Context) (err error)
- func (p *Pool) InitAll(ctx context.Context) (err error)
- func (p *Pool) NextChannel(ctx context.Context) (c *Channel, err error)
- func (p *Pool) Notify(ctx context.Context, method uint32, data []byte) (err error)
- func (p *Pool) OnClose(cb func())
- func (p *Pool) Online() []*Channel
- func (p *Pool) PutBytes(bts []byte)
- func (p *Pool) RemoteAddr() net.Addr
- func (p *Pool) ResetDialThrottle()
- func (p *Pool) Send(ctx context.Context, pkt Packet) (err error)
- func (p *Pool) SetDialThrottle(m time.Time)
- func (p *Pool) Shutdown()
- func (p *Pool) ShutdownConnections()
- func (p *Pool) Stats() (ret PoolStats)
- func (p *Pool) StoreConn(conn net.Conn) (err error)
- type PoolChange
- type PoolChangeHandle
- type PoolConfig
- type PoolStats
- type RateError
- type Sender
- type ServeMux
- type Server
- type StreamReader
- type StreamWriter
- type Unicast
- func (u *Unicast) Call(ctx context.Context, method uint32, data []byte) ([]byte, error)
- func (u *Unicast) CallNext(ctx context.Context, method uint32, data []byte) (peer *Pool, resp []byte, err error)
- func (u *Unicast) Notify(ctx context.Context, method uint32, data []byte) error
- func (u *Unicast) Send(ctx context.Context, pkt Packet) error
- type Unpacker
Constants ¶
const ( MessagePing = 0xff00 MessageShutdown = 0xff01 )
Constant control message codes. All control message codes should be greater than MessagePing.
const ( DefaultPingInterval = time.Minute DefaultShutdownTimeout = 5 * time.Second DefaultReadBufferSize = 256 DefaultSizeLimit = 1e8 DefaultWriteQueueSize = 50 DefaultWriteTimeout = 5 * time.Second DefaultWriteBufferSize = 4096 DefaultRequestTimeout = 5 * time.Second DefaultNoticeTimeout = 1 * time.Second )
const ( DefaultPacketReadBufferSize = 1 << 16 // maximum udp datagram size. DefaultMaxTransmissionUnit = 1500 )
const ( DefaultPoolConnectTimeout = time.Second DefaultPoolSize = 1 DefaultPoolRedialInterval = time.Millisecond * 10 DefaultPoolRedialTimeout = time.Hour )
const DefaultPacketLimit = 1 << 16
Variables ¶
var ( ErrEmptyPeers = fmt.Errorf("empty peers") ErrNoPeer = fmt.Errorf("no peer") )
var ( ErrStopped = fmt.Errorf("channel is stopped") ErrHijacked = fmt.Errorf("channel connection hijacked") ErrCutoff = fmt.Errorf("pool is offline") ErrPoolFull = fmt.Errorf("pool is full") ErrThrottled = fmt.Errorf("dial attempt was throttled") ErrNoChannel = fmt.Errorf("get channel error: no channel before timeout") ErrPolicied = fmt.Errorf("policied due to rate limit") )
var DefaultServeMux = NewServeMux()
var ErrDroppedConn = errors.New("connection is gone")
var ErrTimeout = errors.New("timed out")
Functions ¶
func Append ¶
Append is similar to Pack function, but it appends to the slice instead of unconditionally creating a new slice.
func AppendBER ¶
AppendBER is similar to Pack function, but it appends to the slice instead of unconditionally creating a new slice. BER-encoding is used by default for every item.
func DefaultAlloc ¶
func ListenAndServe ¶
ListenAndServe creates listening socket on addr and starts serving IProto connections with default configured Server.
func MarshalPacket ¶
MarshalPacket returns binary representation of pkt.
func NotifyPoolChangeWith ¶
func NotifyPoolChangeWith(h *PoolChangeHandle, ch chan<- PoolChange, pools ...*Pool)
NotifyPoolChangeWith is the same as NotifyPoolChange except that it uses already existing PoolChangeHandle.
This could be useful to stop notifications after multiple NotifyPoolChange*() calls by a single call to h.Stop().
func Pack ¶
Pack encodes values into a byte slice using default mode (little endian fixed length ints).
Strings and slices are prefixed by item count. Structs are encoded field-by-field in the order of definition.
For struct fields, `iproto:"ber"` tag can be used to use BER encoding for a particular field. BER-encoding used is similar to one used by perl 'pack()' function and is different from varint encoding used in go standard library. BER-encoding is not recursive: for slices only the length of the slice will be BER-encoded, not the items.
func PackBER ¶
PackBER is similar to Pack function, except that BER-encoding is used by default for every item.
func PackString ¶
PackString packs string length (according to mode) followed by string data.
func PackUint16 ¶
PackUint16 packs a single 16-bit integer according to mode.
func PackUint32 ¶
PackUint32 packs a single 32-bit integer according to mode.
func PackUint64 ¶
PackUint64 packs a single 64-bit integer according to mode.
func PacketSize ¶
PacketSize returns packet binary representation size.
func PutPacket ¶
PutPacket puts packet binary representation to given slice. Note that it will panic if p doesn't fit PacketSize(pkt).
func Unpack ¶
Unpack decodes the data as encoded by Pack function. Remaining bytes are returned on success.
func UnpackBER ¶
UnpackBER is similar to Unpack function, except that BER-encoding is used by default for every item.
func UnpackBytes ¶
UnpackBytes unpacks raw bytes prefixed by length, length is packed according to mode.
func UnpackString ¶
UnpackString unpacks a string prefixed by length, length is packed according to mode.
func UnpackUint16 ¶
UnpackUint16 unpacks a single 16-bit integer according to mode.
func UnpackUint32 ¶
UnpackUint32 unpacks a single 32-bit integer according to mode.
func UnpackUint64 ¶
UnpackUint64 unpacks a single 64-bit integer according to mode.
func UnpackUint8 ¶
UnpackUint8 unpacks a single 8-bit integer.
func WatchPoolConnCount ¶
WatchPoolConnCount is a helper function that creates a channel to receive pool changes, starts a goroutine to accumulate those changes, and calls cb with the total number of connections at every change.
The caller should pass a pool that is not yet initialized. That is, the caller should not call iproto.Dial and then WatchPoolConnCount. The right way to do this is to call NewPool, followed by WatchPoolConnCount, optionally followed by pool.Init. Otherwise the value passed to cb will count only the connections that were established/closed after the WatchPoolConnCount call, not all connections of the pool.
Note that if pool config.ChannelConfig.Init returns error, callback will not be called about the established connection nor the closure of it.
Types ¶
type AcceptFn ¶
type AcceptFn func(net.Conn, *ChannelConfig) (*Channel, error)
AcceptFn allows user to construct Channel manually. It receives the server's context as first argument. It is user responsibility to make sub context if needed.
type BodyAllocator ¶
type BytePool ¶
type BytePool interface { // Get obtains buffer from pool with given length. Get(int) []byte // Put reclaims given buffer for further reuse. Put([]byte) }
BytePool describes an object that contains bytes buffer reuse logic.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel provides low-level packet send/receive operations that are same for both server and client.
func DefaultAccept ¶
func DefaultAccept(conn net.Conn, cfg *ChannelConfig) (*Channel, error)
func NewChannel ¶
func NewChannel(conn net.Conn, config *ChannelConfig) *Channel
func RunChannel ¶
func RunChannel(conn net.Conn, config *ChannelConfig) (*Channel, error)
RunChannel is a helper function that calls NewChannel and ch.Init sequentially.
func (*Channel) Call ¶
Call sends request with given method and data. It guarantees that caller will receive response or corresponding error.
Note that it will return error after c.config.RequestTimeout even if ctx has higher timeout.
func (*Channel) Close ¶
func (c *Channel) Close()
Close drops the channel and returns when all resources are done.
func (*Channel) Done ¶
func (c *Channel) Done() <-chan struct{}
Done returns channel which closure means that underlying connection and other resources are closed. Returned channel is closed before any OnClose callback is called.
func (*Channel) Drop ¶
func (c *Channel) Drop()
Drop closes underlying connection and sends signals to reader and writer goroutines to stop. It returns immediately, without awaiting for goroutines to exit.
func (*Channel) Hijack ¶
Hijack lets the caller take over the channel connection. After a call to Hijack channel gracefully stops its i/o routines and after that it will not do anything with the connection.
Actually it calls Shutdown() under the hood, except that it does not send a shutdown packet to the peer and does not close the connection after the reader and writer routines are done.
It returns reader bytes buffer. It may contain data that was read from the connection after a call to Hijack and before Shutdown() returns. It may be not aligned to the Packet bounds.
It returns a non-nil error when the channel is already stopped or hijacked, or when an error occurred somewhere while shutting down the channel.
func (*Channel) Init ¶
Init makes Channel "alive". It starts necessary goroutines for packet processing and does other things like setting ping and idle timers.
Note that Init can be called only once. All subsequent calls will do nothing and return a nil error.
Callers should call ch.Close() or ch.Shutdown() to close the Channel and its underlying connection or cancel context set by ch.SetContext.
func (*Channel) Notify ¶
Notify sends request with given method and data in 'fire and forget' manner.
Note that it will return error after c.config.RequestTimeout even if ctx has higher timeout.
func (*Channel) OnClose ¶
func (c *Channel) OnClose(cb func())
OnClose allows to run a callback after channel and all its underlying resources are closed.
Note that your cb could be delayed by other callbacks registered before it, and vice versa – if you are doing some long job, you could delay other callbacks registered before.
If you want to be informed as soon as possible, use c.Done(), which is closed before any callback is called.
func (*Channel) OnShutdown ¶
func (c *Channel) OnShutdown(cb func())
OnShutdown registers given callback to be called right before sending MessageShutdown to the peer (as response to already received message or as initial packet in shutdown "handshake").
Note that a timer with ChannelConfig.ShutdownTimeout is initialized after callbacks execution. That is, if some callback gets stuck, then the whole Shutdown routine will get stuck too. Note that if the channel is already in the shutdown phase, then the callback will not be called.
func (*Channel) ReadDone ¶
func (c *Channel) ReadDone() <-chan struct{}
ReadDone returns channel which closure means that channel stopped to read.
func (*Channel) RemoteAddr ¶
RemoteAddr returns remote network address of the underlying connection.
func (*Channel) Send ¶
Send sends given packet.
Note that it is preferable way of sending request response.
It does not check state of channel as Call and Notify do. That is, it allows to try to send packet even if Channel is in shutdown state.
func (*Channel) SetContext ¶
SetContext saves the context inside the Channel; it is used for graceful channel interruption.
func (*Channel) Shutdown ¶
func (c *Channel) Shutdown()
Shutdown closes the channel gracefully. It stops writing to the underlying connection and continues reading from the connection for a while if there are any pending requests. Then the Close method is called.
func (*Channel) Stats ¶
func (c *Channel) Stats() ChannelStats
func (*Channel) WriteClosed ¶
WriteClosed returns true if channel is closed for writing.
type ChannelConfig ¶
type ChannelConfig struct { Handler Handler ReadBufferSize int SizeLimit uint32 WriteQueueSize int WriteTimeout time.Duration WriteBufferSize int RequestTimeout time.Duration NoticeTimeout time.Duration ShutdownTimeout time.Duration IdleTimeout time.Duration PingInterval time.Duration DisablePing bool DisableShutdown bool Logger Logger BytePool BytePool // Init is called right after Channel initialization inside Run() method. // That is, when Init is called, reader and writer goroutines are started // already. Thus two things should be noticed: first, you can send any // packets to peer inside Init; second, channel is already handling packets // when Init is called, so beware of races. // // If Init returns error the channel will be closed immediately. // // It is guaranteed that the callbacks set by Channel.On*() methods inside // Init can only be called after Init returns. // // It is Init implementation responsibility to handle timeouts during // initialization. That is, it could block some significant iproto parts // depending on caller – iproto.Server within accept loop, iproto.Pool // withing dialing, or some custom user initiator. // // If Init is nil then no additional initialization prepared. Init func(context.Context, *Channel) error // GetCustomRequestTimeout provides the ability to change request timeout // For example to increase the timeout only for a first request GetCustomRequestTimeout func() time.Duration }
func CopyChannelConfig ¶
func CopyChannelConfig(c *ChannelConfig) *ChannelConfig
CopyChannelConfig returns deep copy of c. If c is nil, it returns new ChannelConfig.
type ChannelStats ¶
type ChannelStats struct { PacketsSent uint32 // total number of sent packets PacketsReceived uint32 // total number of received packets PacketsDropped uint32 // total number of dropped packets BytesSent uint32 // total number of sent bytes BytesReceived uint32 // total number of received bytes CallCount uint32 // total number of call attempts CanceledCount uint32 // total number of call attempts (failed or not) that has been dropped by any reason PendingCount uint32 // number of calls sent, for which we still expect a response NoticesCount uint32 // total number of notify attempts }
ChannelStats shows statistics of channel usage.
func (ChannelStats) Add ¶
func (c ChannelStats) Add(s ChannelStats) ChannelStats
func (*ChannelStats) Copy ¶
func (c *ChannelStats) Copy() (s ChannelStats)
type Closer ¶
type Closer interface { Close() Shutdown() Done() <-chan struct{} OnClose(func()) }
Closer represents channel that could be closed.
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster is a wrapper around unique slice of Pools. It helps to atomically read and modify that slice. The uniqueness criteria is pool remote address.
func (*Cluster) Insert ¶
Insert inserts the given pool into the peer list. It returns a flag that is true if there was no pool with the same remote address in the list before.
func (*Cluster) OnBeforeChange ¶
OnBeforeChange registers given callback to be called right before peers list changes. Note that it will be called under Cluster inner mutex held.
func (*Cluster) Remove ¶
Remove removes the given pool from the peer list. It returns a flag that is true if there was a pool with the same remote address in the list before.
func (*Cluster) Reset ¶
Reset makes the cluster peers list equal to the given slice of Pools. It tries to leave existing pools untouched if they are present in ps. It returns three slices of Pools: the first one contains those which were mixed in to the existing list and probably must be initialized; the second one contains those which were removed from the existing list and probably must be closed; the third one contains those which were ignored from the given slice due to deduplication.
That is, it provides ability to merge two slices of Pools – currently existing and new one.
So the most common way to use Reset() is to call it with a slice of uninitialized pools, postponing their initialization to the moment when Reset() returns the slice of pools which actually should be initialized.
Note that it compares pools by their remote address, not by equality of values (pointer value).
func (*Cluster) ResetAndDisableInserted ¶
ResetAndDisableInserted is the same as Reset but it does not enable inserted peers. Instead, it marks inserted peers as disabled and lets the caller Enable() them later.
type Conn ¶
type Conn interface { Sender Closer // GetBytes obtains bytes from the Channel's byte pool. GetBytes(n int) []byte // PutBytes reclaims bytes to the Channel's byte pool. PutBytes(p []byte) RemoteAddr() net.Addr LocalAddr() net.Addr }
Conn represents channel that has ability to reply to received packets.
type DefaultLogger ¶
type DefaultLogger struct { // Prefix is used as a default prefix when given ctx does not implement a // ctxlog.Context. Prefix string }
DefaultLogger implements Logger interface.
type Handler ¶
type Handler interface { // ServeIProto called on each incoming non-technical Packet. // It called with underlying channel context. It is handler responsibility to make sub context if needed. ServeIProto(ctx context.Context, c Conn, p Packet) }
Handler represents IProto packets handler.
func ParallelHandler ¶
ParallelHandler wraps handler and starts a goroutine for each request on demand. It runs at most n goroutines at one time. After serving the request the goroutine exits.
func PoolHandler ¶
PoolHandler returns Handler that schedules to handle packets by h in given pool p.
func RecoverHandler ¶
RecoverHandler tries to recover after handling a packet. If a panic occurred, it logs its message and the stack of the panicked goroutine. Note that this handler should be the last one in the chain of handler wrappers, e.g.: PoolHandler(RecoverHandler(h)) or ParallelHandler(RecoverHandler(h)).
type HandlerFunc ¶
func (HandlerFunc) ServeIProto ¶
func (f HandlerFunc) ServeIProto(ctx context.Context, c Conn, p Packet)
type MultiWatcher ¶
type MultiWatcher struct { // MinAlivePeers specifies a number of alive peers, less than which // we consider the cluster to be detached. // // If MinAlivePeers is zero, then even an empty list of peers means that // cluster is attached. MinAlivePeers int // StrictGraceful expects only graceful connections closure. Non-graceful // closure of the connection leads to OnDetached() call. // After non-graceful peer reestablished connection, OnAttached will be // called. StrictGraceful bool // OnDetached is called when there are no enough alive peers or some peers // are broken or uninitialized. OnDetached func() // OnAttached is called when cluster get enough alive peers with no any // broken or uninitialized peer. OnAttached func() // OnStateChange is called when some peer from cluster changes its state. // It receives peer itself, number of its alive connections and the // sign of the broken state (e.g. when closed not gracefully). The last // argument is the total number of alive peers in the cluster. OnStateChange func(peer *Pool, alive int, broken bool, alivePeers int) // Logger contains logger interface. Logger Logger // contains filtered or unexported fields }
MultiWatcher allows to watch on the state of the multiple pools.
The initial state of watcher is always detached.
func (*MultiWatcher) Init ¶
func (w *MultiWatcher) Init()
func (*MultiWatcher) MarkInitialized ¶
func (w *MultiWatcher) MarkInitialized(p *Pool)
MarkInitialized marks the given peer as initialized. It makes sense to call MarkInitialized() only when MarkNotInitialized() was called before.
func (*MultiWatcher) MarkNotInitialized ¶
func (w *MultiWatcher) MarkNotInitialized(p *Pool)
MarkNotInitialized marks given peer as not initialized yet. That is, cluster will become detached if there are any not initialized members. Call to MarkNotInitialized() suggests further MarkInitialized() call.
func (*MultiWatcher) Remove ¶
func (w *MultiWatcher) Remove(p *Pool)
Remove removes any data related to pool p. Note that it may lead watcher to constantly detached state, if some channel of p was closed gracelessly. That is, caller must call p.Shutdown() before calling w.Remove(p).
func (*MultiWatcher) Stop ¶
func (w *MultiWatcher) Stop()
func (*MultiWatcher) Watch ¶
func (w *MultiWatcher) Watch(p *Pool)
type PackMode ¶
type PackMode int
PackMode defines encoding mode for integers and string/slice lengths. Mode is not recursive and applies only to the top-level element.
type Packet ¶
func ReadPacket ¶
ReadPacket reads Packet from r.
func ReadPacketLimit ¶
ReadPacketLimit reads Packet from r. Size of packet's payload is limited to be at most n.
func ResponseTo ¶
ResponseTo constructs new Packet that is a response to p.
type PacketServer ¶
type PacketServer struct {
// contains filtered or unexported fields
}
PacketServer contains logic of handling iproto packets from datagram oriented network protocols such as "udp" or "unixgram".
func ListenPacket ¶
func ListenPacket(network, addr string, config *PacketServerConfig) (*PacketServer, error)
func NewPacketServer ¶
func NewPacketServer(conn net.PacketConn, config *PacketServerConfig) *PacketServer
func (*PacketServer) Close ¶
func (p *PacketServer) Close()
func (*PacketServer) GetBytes ¶
func (p *PacketServer) GetBytes(n int) []byte
func (*PacketServer) Init ¶
func (p *PacketServer) Init() (err error)
func (*PacketServer) LocalAddr ¶
func (p *PacketServer) LocalAddr() net.Addr
func (*PacketServer) PutBytes ¶
func (p *PacketServer) PutBytes(bts []byte)
func (*PacketServer) Shutdown ¶
func (p *PacketServer) Shutdown()
type PacketServerConfig ¶
type PacketServerConfig struct { Handler Handler MaxTransmissionUnit int WriteQueueSize int WriteTimeout time.Duration ReadBufferSize int SizeLimit uint32 BytePool BytePool Logger Logger ShutdownTimeout time.Duration OnPeerShutdown func(net.Addr) }
func CopyPacketServerConfig ¶
func CopyPacketServerConfig(c *PacketServerConfig) *PacketServerConfig
CopyPacketServerConfig returns deep copy of c. If c is nil, it returns new PacketServerConfig.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents struct that could manage multiple iproto connections to one host. It establishes/reestablishes connections automatically.
func Dial ¶
Dial creates a new pool of iproto channel(s). It returns an error if initialization of the first channel failed.
Callers should call pool.Close() or pool.Shutdown() methods when work with created pool is complete.
Given ctx will not be saved inside Pool. If ctx implements ctxlog.Context and config does not contain Logger, then DefaultLogger will be used with Prefix set to ctx.LogPrefix().
func NewPool ¶
func NewPool(network, addr string, config *PoolConfig) *Pool
NewPool creates new pool of iproto connections at addr.
func (*Pool) CallNext ¶
func (p *Pool) CallNext(ctx context.Context, method uint32, data []byte) (ch *Channel, resp []byte, err error)
CallNext finds next alive channel in pool and returns found channel and its Call() result.
func (*Pool) Close ¶
func (p *Pool) Close()
Close closes all underlying channels with channel.Close() method.
func (*Pool) CloseConnections ¶
func (p *Pool) CloseConnections()
CloseConnections resets a list of online channels. It calls Close() on each of ex-online channel and returns when all calls are done.
func (*Pool) DialBackground ¶
DialBackground initiates background dial for a new connection. If pool is configured to not RedialForever, then dial will end up after RedialTimeout config parameter.
It returns a chan whose closure means that the dial has finished. A non-nil error means that the dial was not started.
Callers must take care to avoid a senseless dial when the pool is full.
func (*Pool) DialForeground ¶
DialForeground initiates background dial for a new connection. If pool is configured to not RedialForever, then dial will end up after context's deadline exceeded.
It returns a chan whose closure means that the dial has finished. A non-nil error means that the dial was not started.
Callers must take care to avoid a senseless dial when the pool is full.
func (*Pool) Done ¶
func (p *Pool) Done() <-chan struct{}
Done returns a channel whose closure signals that pool resources are freed and no more actions can be performed. Note that Done is useful only after pool.{Close,Shutdown} calls.
func (*Pool) DropConnections ¶
func (p *Pool) DropConnections()
DropConnections resets a list of online channels. It calls Drop() on each of ex-online channel and returns when all calls are done.
func (*Pool) Hijack ¶
Hijack resets a list of online channels. It calls callback cb for every ex-online channel with result of its Hijack() call. If Hijack returns error it does not call a callback.
Note that it is a caller responsibility to manage ownership of the returned channels that could be shared across goroutines by NextChannel(), Call(), Notify() and Send() calls.
func (*Pool) InitAll ¶
InitAll is the same as Init, except that it tries to get N connections, N = PoolConfig.Size || 1.
func (*Pool) NextChannel ¶
NextChannel returns the next alive channel. It could try to establish a new connection and create a new Channel. If establishing a new connection fails, it waits some time and tries again.
Note that returned Channel could be closed while receiving it from the Pool. In this case additional NextChannel() may be made to get next alive channel.
func (*Pool) OnClose ¶
func (p *Pool) OnClose(cb func())
OnClose allows to run a callback after pool resources are freed and no more actions could be performed. Note that your cb could be delayed by other callbacks registered before it. And vice versa – if you are doing some hard long job, you could delay other callbacks registered before. If you want to be informed as soon as possible, use pool.Done(). Note that cb will be called only after pool.{Close,Shutdown} calls.
func (*Pool) RemoteAddr ¶
func (*Pool) ResetDialThrottle ¶
func (p *Pool) ResetDialThrottle()
ResetDialThrottle resets the dial throttle timeout such that next dial attempt for a new connection will not be canceled due to throttled logic. That is, it works for pool which was configured with non-zero DialThrottle option.
func (*Pool) SetDialThrottle ¶
SetDialThrottle sets up time point before which underlying dialer will fail its dial attempts.
func (*Pool) Shutdown ¶
func (p *Pool) Shutdown()
Shutdown closes all underlying channels by calling channel.Shutdown().
func (*Pool) ShutdownConnections ¶
func (p *Pool) ShutdownConnections()
ShutdownConnections resets a list of online channels. It calls Shutdown() on each of ex-online channel and returns when all calls are done.
type PoolChange ¶
type PoolChange struct { // Pool is a source of changes. Pool *Pool // Channel points to channel, that was opened/closed. Channel *Channel // Conn contains difference of connection count in pool. Conn int8 }
PoolChange represents pool state change.
type PoolChangeHandle ¶
type PoolChangeHandle struct {
// contains filtered or unexported fields
}
PoolChangeHandle helps to control PoolChange notifications flow.
func NewPoolChangeHandle ¶
func NewPoolChangeHandle() *PoolChangeHandle
func NotifyPoolChange ¶
func NotifyPoolChange(ch chan<- PoolChange, pools ...*Pool) *PoolChangeHandle
NotifyPoolChange makes every given pool to relay its changes into ch.
It will block sending to ch: the caller must ensure that ch has sufficient buffer space to not block the pool.
The caller should pass pools that are not yet initialized. That is, the caller should not call iproto.Dial and then NotifyPoolChange. The right way to do this is to call NewPool, followed by NotifyPoolChange, optionally followed by pool.Init.
Note that if some pool config.ChannelConfig.Init returns error, given ch will not be notified about the established connection nor the closure of it.
It returns PoolChangeHandle that could be used to control notifications sent to ch.
func (*PoolChangeHandle) Stop ¶
func (p *PoolChangeHandle) Stop()
func (*PoolChangeHandle) Stopped ¶
func (p *PoolChangeHandle) Stopped() <-chan struct{}
type PoolConfig ¶
type PoolConfig struct { // Size limits number of connections used by Pool. // When Size is zero, single connection is used. Size int // ChannelConfig will be passed to Channel constructor when new connection // established. ChannelConfig *ChannelConfig // NetDial allows to override standard net.Dial logic. NetDial func(ctx context.Context, network, addr string) (net.Conn, error) // ConnectTimeout is the maximum time spent on awaiting for alive iproto connection // for sending packet. ConnectTimeout time.Duration // DialTimeout is the maximum time spent on establishing new iproto connection. DialTimeout time.Duration // RedialInterval is used inside dial routine for delaying next attempt to establish // new iproto connection after previous attempt was failed. RedialInterval time.Duration // MaxRedialInterval is the maximum delay before next attempt to establish new iproto // connection is prepared. Note that RedialInterval is used as initial delay time, // and could be increased by every dial attempt up to MaxRedialInterval. MaxRedialInterval time.Duration // RedialTimeout is the maximum time spent in dial routine. // If RedialForever is true this parameter is not used. RedialTimeout time.Duration // RedialForever configures the underlying dialer to dial without any // timeout. RedialForever bool // FailOnCutoff prevents Pool from waiting for a dial for a new // Channel when there are no online Channels to process the request. // If this field is true, then any Call() Notify() or NextChannel() calls // will be failed with ErrCutoff error if no online Channels currently are // available. FailOnCutoff bool // DialThrottle allows to limit rate of the dialer routine execution. DialThrottle time.Duration // Logger is used to handle some log messages. // If nil, default logger is used. Logger Logger // If true, pool will not try to establish a new connection right after one // is closed. 
DisablePreDial bool // RateLimit is the limit parameter for golang.org/x/time/rate token bucket rate limiter. // A zero value causes the rate limiter to reject all events if `RateBurst' is nonzero. RateLimit rate.Limit // RateBurst is the burst parameter of time/rate token bucket rate limiter. // A zero value disables the rate limiter. RateBurst int // RateWait is used to determine the desired behaviour of request rate limiting // on the pool. A zero value sets the requests to rather be "policied" than "shaped", // and the corresponding function calls will return `RateError' caused by `ErrPolicied' // when limiter is unable to satisfy the request. With a value of `true' the limiter will // rather provide "shaping" of the request flow, and the corresponding function calls // will either block until the request can be satisfied, or return at once if the // predicted wait time exceeds the context deadline. Cancellation of the context also // forces a premature return with an error. // See golang.org/x/time/rate documentation for .Allow() and .Wait() behaviour. RateWait bool }
PoolConfig contains options for Pool configuration.
func CopyPoolConfig ¶
func CopyPoolConfig(c *PoolConfig) (pc *PoolConfig)
CopyPoolConfig returns a deep copy of c. If c is nil, it returns a new PoolConfig. The returned config always contains a non-nil ChannelConfig.
type PoolStats ¶
type PoolStats struct { ChannelStats DialCount uint32 // total number of net.Dial calls DialErrors uint32 // total number of failed net.Dial calls WaitErrors uint32 // total number of failed expectation for free channel Online int // number of online channels }
type RateError ¶
type RateError struct {
Err error
}
RateError provides the ability to determine if the request exceeds current pool's rate limit
type Sender ¶
type Sender interface { Call(ctx context.Context, message uint32, data []byte) ([]byte, error) Notify(ctx context.Context, message uint32, data []byte) error Send(ctx context.Context, packet Packet) error }
Sender represents an iproto packet sender in different forms.
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
func NewServeMux ¶
func NewServeMux() *ServeMux
type Server ¶
type Server struct { // Accept allows overriding the default Channel creation logic. // // Normally, without setting Accept field, every accepted connection shares // the same ChannelConfig. This could bring some trouble, when server // accepts two connections A and B, and handles packets from them with one // config.Handler, say, with N pooled goroutines. Then, if A will produce // huge stream of packets, B will not get fair amount of work time. The // better approach is to create separate handlers, each with its own pool. // // Note that if Accept returns error, server Serve() method will return // with that error. // // Note that returned Channel's Init() method will be called if err is // non-nil. Returned error from that Init() call will not be checked. Accept AcceptFn // ChannelConfig is used to initialize new Channel on every incoming // connection. // // Note that copy of config is shared across all channels. // To customize this behavior see Accept field of the Server. ChannelConfig *ChannelConfig // Log is used to write errors in the serve process Log Logger // OnClose functions are called on channel close OnClose []func() // OnShutdown functions are called on channel shutdown OnShutdown []func() }
Server contains options for serving IProto connections.
func (*Server) ListenAndServe ¶
type StreamReader ¶
type StreamReader struct { Source io.Reader SizeLimit uint32 Alloc BodyAllocator // contains filtered or unexported fields }
StreamReader represents iproto stream reader.
func (*StreamReader) LastRead ¶
func (b *StreamReader) LastRead() int
func (*StreamReader) ReadPacket ¶
func (b *StreamReader) ReadPacket() (ret Packet, err error)
ReadPacket reads the next packet.
type StreamWriter ¶
StreamWriter represents iproto stream writer.
func (*StreamWriter) WritePacket ¶
func (b *StreamWriter) WritePacket(p Packet) (err error)
WritePacket writes p to the underlying writer.
type Unicast ¶
type Unicast struct { *Cluster // Select contains logic of picking Pool from cluster's peers for further // Call/Notify call. // If Select is nil, then round-robin algorithm is used. Select func(context.Context, *Cluster) (*Pool, error) // contains filtered or unexported fields }
Unicast wraps Cluster for sending messages to any single one of the cluster's peers.
func (*Unicast) CallNext ¶
func (u *Unicast) CallNext(ctx context.Context, method uint32, data []byte) (peer *Pool, resp []byte, err error)
CallNext selects a peer from the cluster's peers and makes Call() on it.