iproto

package
v1.5.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2023 License: MIT Imports: 25 Imported by: 7

Documentation

Index

Constants

View Source
const (
	MessagePing     = 0xff00
	MessageShutdown = 0xff01
)

Constant control message codes. All control message codes should be greater than MessagePing.

View Source
const (
	DefaultPingInterval    = time.Minute
	DefaultShutdownTimeout = 5 * time.Second

	DefaultReadBufferSize = 256
	DefaultSizeLimit      = 1e8

	DefaultWriteQueueSize  = 50
	DefaultWriteTimeout    = 5 * time.Second
	DefaultWriteBufferSize = 4096

	DefaultRequestTimeout = 5 * time.Second
	DefaultNoticeTimeout  = 1 * time.Second
)
View Source
const (
	DefaultPacketReadBufferSize = 1 << 16 // maximum udp datagram size.
	DefaultMaxTransmissionUnit  = 1500
)
View Source
const (
	DefaultPoolConnectTimeout = time.Second
	DefaultPoolSize           = 1
	DefaultPoolRedialInterval = time.Millisecond * 10
	DefaultPoolRedialTimeout  = time.Hour
)
View Source
const DefaultPacketLimit = 1 << 16

Variables

View Source
var (
	ErrEmptyPeers = fmt.Errorf("empty peers")
	ErrNoPeer     = fmt.Errorf("no peer")
)
View Source
var (
	ErrStopped   = fmt.Errorf("channel is stopped")
	ErrHijacked  = fmt.Errorf("channel connection hijacked")
	ErrCutoff    = fmt.Errorf("pool is offline")
	ErrPoolFull  = fmt.Errorf("pool is full")
	ErrThrottled = fmt.Errorf("dial attempt was throttled")
	ErrNoChannel = fmt.Errorf("get channel error: no channel before timeout")
	ErrPolicied  = fmt.Errorf("policied due to rate limit")
)
View Source
var DefaultServeMux = NewServeMux()
View Source
var ErrDroppedConn = errors.New("connection is gone")
View Source
var ErrTimeout = errors.New("timed out")

Functions

func Append

func Append(data []byte, value ...interface{}) (out []byte, err error)

Append is similar to Pack function, but it appends to the slice instead of unconditionally creating a new slice.

func AppendBER

func AppendBER(data []byte, value ...interface{}) (out []byte, err error)

AppendBER is similar to Pack function, but it appends to the slice instead of unconditionally creating a new slice. BER-encoding is used by default for every item.

func DefaultAlloc

func DefaultAlloc(n int) []byte

func Handle

func Handle(message uint32, handler Handler)

func ListenAndServe

func ListenAndServe(ctx context.Context, network, addr string, h Handler) error

ListenAndServe creates listening socket on addr and starts serving IProto connections with default configured Server.

func MarshalPacket

func MarshalPacket(pkt Packet) []byte

MarshalPacket returns binary representation of pkt.

func NotifyPoolChangeWith

func NotifyPoolChangeWith(h *PoolChangeHandle, ch chan<- PoolChange, pools ...*Pool)

NotifyPoolChangeWith is the same as NotifyPoolChange except that it uses already existing PoolChangeHandle.

This could be useful to stop notifications after multiple NotifyPoolChange*() calls by a single call to h.Stop().

func Pack

func Pack(value ...interface{}) (out []byte, err error)

Pack encodes values into a byte slice using default mode (little endian fixed length ints).

Strings and slices are prefixed by item count. Structs are encoded field-by-field in the order of definition.

For struct fields, `iproto:"ber"` tag can be used to use BER encoding for a particular field. BER-encoding used is similar to one used by perl 'pack()' function and is different from varint encoding used in go standard library. BER-encoding is not recursive: for slices only the length of the slice will be BER-encoded, not the items.

func PackBER

func PackBER(value ...interface{}) (out []byte, err error)

PackBER is similar to Pack function, except that BER-encoding is used by default for every item.

func PackBytes

func PackBytes(w []byte, v []byte, mode PackMode) []byte

PackBytes packs data length (according to mode) followed by the data.

func PackString

func PackString(w []byte, v string, mode PackMode) []byte

PackString packs string length (according to mode) followed by string data.

func PackUint16

func PackUint16(w []byte, v uint16, mode PackMode) []byte

PackUint16 packs a single 16-bit integer according to mode.

func PackUint32

func PackUint32(w []byte, v uint32, mode PackMode) []byte

PackUint32 packs a single 32-bit integer according to mode.

func PackUint64

func PackUint64(w []byte, v uint64, mode PackMode) []byte

PackUint64 packs a single 64-bit integer according to mode.

func PackUint8

func PackUint8(w []byte, v uint8, mode PackMode) []byte

PackUint8 packs a single 8-bit integer.

func PacketSize

func PacketSize(pkt Packet) int

PacketSize returns packet binary representation size.

func PutPacket

func PutPacket(p []byte, pkt Packet)

PutPacket puts packet binary representation to given slice. Note that it will panic if p doesn't fit PacketSize(pkt).

func Unpack

func Unpack(data []byte, value ...interface{}) ([]byte, error)

Unpack decodes the data as encoded by Pack function. Remaining bytes are returned on success.

func UnpackBER

func UnpackBER(data []byte, value ...interface{}) ([]byte, error)

UnpackBER is similar to Unpack function, except that BER-encoding is used by default for every item.

func UnpackBytes

func UnpackBytes(r *bytes.Reader, v *[]byte, mode PackMode) (err error)

UnpackBytes unpacks raw bytes prefixed by length, length is packed according to mode.

func UnpackString

func UnpackString(r *bytes.Reader, v *string, mode PackMode) (err error)

UnpackString unpacks a string prefixed by length, length is packed according to mode.

func UnpackUint16

func UnpackUint16(r *bytes.Reader, v *uint16, mode PackMode) (err error)

UnpackUint16 unpacks a single 16-bit integer according to mode.

func UnpackUint32

func UnpackUint32(r *bytes.Reader, v *uint32, mode PackMode) (err error)

UnpackUint32 unpacks a single 32-bit integer according to mode.

func UnpackUint64

func UnpackUint64(r *bytes.Reader, v *uint64, mode PackMode) (err error)

UnpackUint64 unpacks a single 64-bit integer according to mode.

func UnpackUint8

func UnpackUint8(r *bytes.Reader, v *uint8, mode PackMode) (err error)

UnpackUint8 unpacks a single 8-bit integer.

func WatchPoolConnCount

func WatchPoolConnCount(p *Pool, cb func(int))

WatchPoolConnCount is a helper function that creates a channel to receive pool changes, starts a goroutine to accumulate those changes, and calls cb with the total number of connections on every change.

The caller should pass a pool that is not yet initialized. That is, the caller should not call iproto.Dial and then WatchPoolConnCount. The right way to do this is to call NewPool, followed by WatchPoolConnCount, optionally followed by pool.Init. Otherwise the value passed to cb will count only the connections that are established/closed after the WatchPoolConnCount call, not the total number of connections.

Note that if the pool's config.ChannelConfig.Init returns an error, the callback will be called neither for the established connection nor for its closure.

func WritePacket

func WritePacket(w io.Writer, p Packet) (err error)

WritePacket writes p to w.

Types

type AcceptFn

type AcceptFn func(net.Conn, *ChannelConfig) (*Channel, error)

AcceptFn allows user to construct Channel manually. It receives the server's context as first argument. It is user responsibility to make sub context if needed.

type BodyAllocator

type BodyAllocator func(int) []byte

type BytePool

type BytePool interface {
	// Get obtains buffer from pool with given length.
	Get(int) []byte
	// Put reclaims given buffer for further reuse.
	Put([]byte)
}

BytePool describes an object that contains bytes buffer reuse logic.

func BytePoolFunc

func BytePoolFunc(get func(int) []byte, put func([]byte)) BytePool

BytePoolFunc returns BytePool that uses given get and put functions as its methods.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel provides low-level packet send/receive operations that are same for both server and client.

func DefaultAccept

func DefaultAccept(conn net.Conn, cfg *ChannelConfig) (*Channel, error)

func NewChannel

func NewChannel(conn net.Conn, config *ChannelConfig) *Channel

func RunChannel

func RunChannel(conn net.Conn, config *ChannelConfig) (*Channel, error)

RunChannel is a helper function that calls NewChannel and ch.Init sequentially.

func (*Channel) Call

func (c *Channel) Call(ctx context.Context, method uint32, data []byte) (resp []byte, err error)

Call sends request with given method and data. It guarantees that caller will receive response or corresponding error.

Note that it will return error after c.config.RequestTimeout even if ctx has higher timeout.

func (*Channel) Close

func (c *Channel) Close()

Close drops the channel and returns when all resources are done.

func (*Channel) Closed

func (c *Channel) Closed() bool

Closed returns true if channel is closed.

func (*Channel) Done

func (c *Channel) Done() <-chan struct{}

Done returns channel which closure means that underlying connection and other resources are closed. Returned channel is closed before any OnClose callback is called.

func (*Channel) Drop

func (c *Channel) Drop()

Drop closes underlying connection and sends signals to reader and writer goroutines to stop. It returns immediately, without awaiting for goroutines to exit.

func (*Channel) Error

func (c *Channel) Error() error

Error returns error that caused close of the channel.

func (*Channel) GetBytes

func (c *Channel) GetBytes(n int) []byte

GetBytes obtains bytes from the Channel's byte pool.

func (*Channel) Hijack

func (c *Channel) Hijack() (conn net.Conn, rbuf []byte, err error)

Hijack lets the caller take over the channel connection. After a call to Hijack channel gracefully stops its i/o routines and after that it will not do anything with the connection.

Actually it calls Shutdown() under the hood, with the exception that it does not send a shutdown packet to the peer and does not close the connection after the reader and writer routines are done.

It returns reader bytes buffer. It may contain data that was read from the connection after a call to Hijack and before Shutdown() returns. It may be not aligned to the Packet bounds.

It returns a non-nil error when the channel is already stopped or hijacked, or when an error occurred somewhere while shutting down the channel.

func (*Channel) Init

func (c *Channel) Init() (err error)

Init makes Channel "alive". It starts necessary goroutines for packet processing and does other things like setting ping and idle timers.

Note that Init can be called only once. All subsequent calls will do nothing and return a nil error.

Callers should call ch.Close() or ch.Shutdown() to close the Channel and its underlying connection or cancel context set by ch.SetContext.

func (*Channel) LocalAddr

func (c *Channel) LocalAddr() net.Addr

LocalAddr returns local network address of the underlying connection.

func (*Channel) Notify

func (c *Channel) Notify(ctx context.Context, method uint32, data []byte) (err error)

Notify sends request with given method and data in 'fire and forget' manner.

Note that it will return error after c.config.RequestTimeout even if ctx has higher timeout.

func (*Channel) OnClose

func (c *Channel) OnClose(cb func())

OnClose allows to run a callback after channel and all its underlying resources are closed.

Note that your cb could be delayed by other cb registered before and vice versa – if you are doing some long job, you could delay other callbacks registered before.

If you want to be informed as soon as possible, use c.Done(), which is closed before any callback is called.

func (*Channel) OnShutdown

func (c *Channel) OnShutdown(cb func())

OnShutdown registers given callback to be called right before sending MessageShutdown to the peer (as response to already received message or as initial packet in shutdown "handshake").

Note that a timer with ChannelConfig.ShutdownTimeout is initialized after the callbacks are executed. That is, if some callback gets stuck, the whole Shutdown routine will get stuck too. Note that if the channel is already in the shutdown phase, the callback will not be called.

func (*Channel) PutBytes

func (c *Channel) PutBytes(p []byte)

PutBytes reclaims bytes to the Channel's byte pool.

func (*Channel) ReadDone

func (c *Channel) ReadDone() <-chan struct{}

ReadDone returns channel which closure means that channel stopped to read.

func (*Channel) RemoteAddr

func (c *Channel) RemoteAddr() net.Addr

RemoteAddr returns remote network address of the underlying connection.

func (*Channel) Send

func (c *Channel) Send(ctx context.Context, p Packet) error

Send sends given packet.

Note that it is preferable way of sending request response.

It does not check state of channel as Call and Notify do. That is, it allows to try to send packet even if Channel is in shutdown state.

func (*Channel) SetContext

func (c *Channel) SetContext(ctx context.Context)

SetContext saves ctx inside the Channel; it is used for graceful channel interruption.

func (*Channel) Shutdown

func (c *Channel) Shutdown()

Shutdown closes the channel gracefully. It stops writing to the underlying connection and continues reading from the connection for a while if any pending requests were made. Then the Close method is called.

func (*Channel) Stats

func (c *Channel) Stats() ChannelStats

func (*Channel) WriteClosed

func (c *Channel) WriteClosed() bool

WriteClosed returns true if channel is closed for writing.

func (*Channel) WriteDone

func (c *Channel) WriteDone() <-chan struct{}

WriteDone returns channel which closure means that channel stopped to write.

type ChannelConfig

type ChannelConfig struct {
	Handler Handler

	ReadBufferSize int
	SizeLimit      uint32

	WriteQueueSize  int
	WriteTimeout    time.Duration
	WriteBufferSize int

	RequestTimeout time.Duration
	NoticeTimeout  time.Duration

	ShutdownTimeout time.Duration
	IdleTimeout     time.Duration
	PingInterval    time.Duration
	DisablePing     bool
	DisableShutdown bool

	Logger   Logger
	BytePool BytePool

	// Init is called right after Channel initialization inside Run() method.
	// That is, when Init is called, reader and writer goroutines are started
	// already. Thus two things should be noticed: first, you can send any
	// packets to peer inside Init; second, channel is already handling packets
	// when Init is called, so beware of races.
	//
	// If Init returns error the channel will be closed immediately.
	//
	// It is guaranteed that the callbacks set by Channel.On*() methods inside
	// Init can only be called after Init returns.
	//
	// It is Init implementation responsibility to handle timeouts during
	// initialization. That is, it could block some significant iproto parts
	// depending on caller – iproto.Server within accept loop, iproto.Pool
	// within dialing, or some custom user initiator.
	//
	// If Init is nil then no additional initialization prepared.
	Init func(context.Context, *Channel) error

	// GetCustomRequestTimeout provides the ability to change request timeout
	// For example to increase the timeout only for a first request
	GetCustomRequestTimeout func() time.Duration
}

func CopyChannelConfig

func CopyChannelConfig(c *ChannelConfig) *ChannelConfig

CopyChannelConfig returns deep copy of c. If c is nil, it returns new ChannelConfig.

type ChannelStats

type ChannelStats struct {
	PacketsSent     uint32 // total number of sent packets
	PacketsReceived uint32 // total number of received packets
	PacketsDropped  uint32 // total number of dropped packets

	BytesSent     uint32 // total number of sent bytes
	BytesReceived uint32 // total number of received bytes

	CallCount     uint32 // total number of call attempts
	CanceledCount uint32 // total number of call attempts (failed or not) that have been dropped for any reason

	PendingCount uint32 // number of calls sent, for which we still expect a response

	NoticesCount uint32 // total number of notify attempts
}

ChannelStats shows statistics of channel usage.

func (ChannelStats) Add

func (*ChannelStats) Copy

func (c *ChannelStats) Copy() (s ChannelStats)

type Closer

type Closer interface {
	Close()
	Shutdown()
	Done() <-chan struct{}
	OnClose(func())
}

Closer represents channel that could be closed.

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is a wrapper around unique slice of Pools. It helps to atomically read and modify that slice. The uniqueness criteria is pool remote address.

func (*Cluster) Close

func (c *Cluster) Close()

Close calls Close() on every registered pool.

func (*Cluster) Disable

func (c *Cluster) Disable(p *Pool) bool

func (*Cluster) Enable

func (c *Cluster) Enable(p *Pool) bool

func (*Cluster) Insert

func (c *Cluster) Insert(p *Pool) (inserted bool)

Insert inserts the given pool into the peer list. It returns a flag that becomes true if there was no pool with the same remote address in the list before.

func (*Cluster) OnBeforeChange

func (c *Cluster) OnBeforeChange(cb func([]*Pool))

OnBeforeChange registers given callback to be called right before peers list changes. Note that it will be called under Cluster inner mutex held.

func (*Cluster) Peers

func (c *Cluster) Peers() []*Pool

Peers returns current peer list. Note that it is for read only.

func (*Cluster) Remove

func (c *Cluster) Remove(p *Pool) (removed bool)

Remove removes the given pool from the peer list. It returns a flag that becomes true if there was a pool with the same remote address in the list before.

func (*Cluster) Reset

func (c *Cluster) Reset(ps []*Pool) (inserted, removed, ignored []*Pool)

Reset makes the cluster peers list equal to the given slice of Pools. It tries to leave existing pools untouched if they are present in ps. It returns three slices of Pools: the first contains those which were mixed in to the existing list and probably must be initialized; the second contains those which were removed from the existing list and probably must be closed; the third contains those which were ignored from the given slice due to deduplication.

That is, it provides ability to merge two slices of Pools – currently existing and new one.

So the most common way to use Reset() is to call it with a slice of uninitialized pools, postponing their initialization until Reset() returns the slice of pools which actually should be initialized.

Note that it compares pools by their remote address, not by equality of values (pointer value).

func (*Cluster) ResetAndDisableInserted

func (c *Cluster) ResetAndDisableInserted(ps []*Pool) (inserted, removed, ignored []*Pool)

ResetAndDisableInserted is the same as Reset but it does not enable inserted peers. Instead, it marks inserted peers as disabled and lets the caller Enable() them later.

func (*Cluster) Shutdown

func (c *Cluster) Shutdown()

Shutdown calls Shutdown() on every registered pool.

type Conn

type Conn interface {
	Sender
	Closer

	// GetBytes obtains bytes from the Channel's byte pool.
	GetBytes(n int) []byte
	// PutBytes reclaims bytes to the Channel's byte pool.
	PutBytes(p []byte)

	RemoteAddr() net.Addr
	LocalAddr() net.Addr
}

Conn represents channel that has ability to reply to received packets.

type DefaultLogger

type DefaultLogger struct {
	// Prefix is used as a default prefix when given ctx does not implement a
	// ctxlog.Context.
	Prefix string
}

DefaultLogger implements Logger interface.

func (DefaultLogger) Debugf

func (d DefaultLogger) Debugf(ctx context.Context, f string, args ...interface{})

Debugf logs message in Sprintf form. If ctx implements ctxlog.Context then ctxlog package will be used to print the message. In other way it prints message with d.Prefix via log package.

func (DefaultLogger) Printf

func (d DefaultLogger) Printf(ctx context.Context, f string, args ...interface{})

Printf logs message in Sprintf form. If ctx implements ctxlog.Context then ctxlog package will be used to print the message. In other way it prints message with d.Prefix via log package.

type Handler

type Handler interface {
	// ServeIProto called on each incoming non-technical Packet.
	// It called with underlying channel context. It is handler responsibility to make sub context if needed.
	ServeIProto(ctx context.Context, c Conn, p Packet)
}

Handler represents IProto packets handler.

func ParallelHandler

func ParallelHandler(h Handler, n int) Handler

ParallelHandler wraps handler and starts a goroutine for each request on demand. It runs at most n goroutines at a time. After serving a request, the goroutine exits.

func PoolHandler

func PoolHandler(h Handler, p *pool.Pool) Handler

PoolHandler returns Handler that schedules to handle packets by h in given pool p.

func RecoverHandler

func RecoverHandler(h Handler) Handler

RecoverHandler tries to recover after handling a packet. If a panic occurred, it logs its message and the stack of the panicked goroutine. Note that this handler should be the last one in the chain of handler wrappers, e.g.: PoolHandler(RecoverHandler(h)) or ParallelHandler(RecoverHandler(h)).

type HandlerFunc

type HandlerFunc func(context.Context, Conn, Packet)

func (HandlerFunc) ServeIProto

func (f HandlerFunc) ServeIProto(ctx context.Context, c Conn, p Packet)
type Header struct {
	Msg  uint32
	Len  uint32
	Sync uint32
}

type Logger

type Logger interface {
	Printf(ctx context.Context, fmt string, v ...interface{})
	Debugf(ctx context.Context, fmt string, v ...interface{})
}

type MultiWatcher

type MultiWatcher struct {
	// MinAlivePeers specifies a number of alive peers, less than which
	// we consider the cluster to be detached.
	//
	// If MinAlivePeers is zero, then even an empty list of peers means that
	// cluster is attached.
	MinAlivePeers int

	// StrictGraceful expects only graceful connections closure. Non-graceful
	// closure of the connection leads to OnDetached() call.
	// After non-graceful peer reestablished connection, OnAttached will be
	// called.
	StrictGraceful bool

	// OnDetached is called when there are not enough alive peers or some peers
	// are broken or uninitialized.
	OnDetached func()

	// OnAttached is called when the cluster gets enough alive peers with no
	// broken or uninitialized peers.
	OnAttached func()

	// OnStateChange is called when some peer from cluster changes its state.
	// It receives peer itself, number of its alive connections and the
	// sign of the broken state (e.g. when closed not gracefully). The last
	// argument is the total number of alive peers in the cluster.
	OnStateChange func(peer *Pool, alive int, broken bool, alivePeers int)

	// Logger contains logger interface.
	Logger Logger
	// contains filtered or unexported fields
}

MultiWatcher allows to watch on the state of the multiple pools.

The initial state of watcher is always detached.

func (*MultiWatcher) Init

func (w *MultiWatcher) Init()

func (*MultiWatcher) MarkInitialized

func (w *MultiWatcher) MarkInitialized(p *Pool)

MarkInitialized marks given peer as initialized. It makes sense to call MarkInitialized() only when MarkNotInitialized() was called before.

func (*MultiWatcher) MarkNotInitialized

func (w *MultiWatcher) MarkNotInitialized(p *Pool)

MarkNotInitialized marks given peer as not initialized yet. That is, cluster will become detached if there are any not initialized members. Call to MarkNotInitialized() suggests further MarkInitialized() call.

func (*MultiWatcher) Remove

func (w *MultiWatcher) Remove(p *Pool)

Remove removes any data related to pool p. Note that it may lead watcher to constantly detached state, if some channel of p was closed gracelessly. That is, caller must call p.Shutdown() before calling w.Remove(p).

func (*MultiWatcher) Stop

func (w *MultiWatcher) Stop()

func (*MultiWatcher) Watch

func (w *MultiWatcher) Watch(p *Pool)

type PackMode

type PackMode int

PackMode defines encoding mode for integers and string/slice lengths. Mode is not recursive and applies only to the top-level element.

const (
	ModeDefault PackMode = 0
	ModeBER     PackMode = 1
)

type Packer

type Packer interface {
	IprotoPack(w []byte, mode PackMode) ([]byte, error)
}

type Packet

type Packet struct {
	Header Header
	Data   []byte
}

func ReadPacket

func ReadPacket(r io.Reader) (ret Packet, err error)

ReadPacket reads Packet from r.

func ReadPacketLimit

func ReadPacketLimit(r io.Reader, n uint32) (ret Packet, err error)

ReadPacketLimit reads Packet from r. Size of packet's payload is limited to be at most n.

func ResponseTo

func ResponseTo(p Packet, b []byte) Packet

ResponseTo constructs new Packet that is a response to p.

type PacketServer

type PacketServer struct {
	// contains filtered or unexported fields
}

PacketServer contains logic of handling iproto packets from datagram oriented network protocols such as "udp" or "unixgram".

func ListenPacket

func ListenPacket(network, addr string, config *PacketServerConfig) (*PacketServer, error)

func NewPacketServer

func NewPacketServer(conn net.PacketConn, config *PacketServerConfig) *PacketServer

func (*PacketServer) Close

func (p *PacketServer) Close()

func (*PacketServer) GetBytes

func (p *PacketServer) GetBytes(n int) []byte

func (*PacketServer) Init

func (p *PacketServer) Init() (err error)

func (*PacketServer) LocalAddr

func (p *PacketServer) LocalAddr() net.Addr

func (*PacketServer) PutBytes

func (p *PacketServer) PutBytes(bts []byte)

func (*PacketServer) Shutdown

func (p *PacketServer) Shutdown()

type PacketServerConfig

type PacketServerConfig struct {
	Handler Handler

	MaxTransmissionUnit int
	WriteQueueSize      int
	WriteTimeout        time.Duration

	ReadBufferSize int
	SizeLimit      uint32

	BytePool BytePool
	Logger   Logger

	ShutdownTimeout time.Duration

	OnPeerShutdown func(net.Addr)
}

func CopyPacketServerConfig

func CopyPacketServerConfig(c *PacketServerConfig) *PacketServerConfig

CopyPacketServerConfig returns deep copy of c. If c is nil, it returns new PacketServerConfig.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents struct that could manage multiple iproto connections to one host. It establishes/reestablishes connections automatically.

func Dial

func Dial(ctx context.Context, network, addr string, config *PoolConfig) (pool *Pool, err error)

Dial creates a new pool of iproto channel(s). It returns an error if initialization of the first channel failed.

Callers should call pool.Close() or pool.Shutdown() methods when work with created pool is complete.

Given ctx will not be saved inside Pool. If ctx implements ctxlog.Context and config does not contain a Logger, then DefaultLogger will be used with Prefix set to ctx.LogPrefix().

func NewPool

func NewPool(network, addr string, config *PoolConfig) *Pool

NewPool creates new pool of iproto connections at addr.

func (*Pool) Call

func (p *Pool) Call(ctx context.Context, method uint32, data []byte) ([]byte, error)

Call finds next alive channel in pool and returns its Call() result.

func (*Pool) CallNext

func (p *Pool) CallNext(ctx context.Context, method uint32, data []byte) (ch *Channel, resp []byte, err error)

CallNext finds next alive channel in pool and returns found channel and its Call() result.

func (*Pool) CancelDial

func (p *Pool) CancelDial()

CancelDial cancels current dial routine.

func (*Pool) Close

func (p *Pool) Close()

Close closes all underlying channels with channel.Close() method.

func (*Pool) CloseConnections

func (p *Pool) CloseConnections()

CloseConnections resets a list of online channels. It calls Close() on each of ex-online channel and returns when all calls are done.

func (*Pool) DialBackground

func (p *Pool) DialBackground() (<-chan error, error)

DialBackground initiates background dial for a new connection. If pool is configured to not RedialForever, then dial will end up after RedialTimeout config parameter.

It returns a chan whose closure means that the dial has finished. A non-nil error means that the dial was not started.

Callers must take care to avoid a senseless dial when the pool is full.

func (*Pool) DialForeground

func (p *Pool) DialForeground(ctx context.Context) (<-chan error, error)

DialForeground initiates background dial for a new connection. If pool is configured to not RedialForever, then dial will end up after context's deadline exceeded.

It returns a chan whose closure means that the dial has finished. A non-nil error means that the dial was not started.

Callers must take care to avoid a senseless dial when the pool is full.

func (*Pool) Done

func (p *Pool) Done() <-chan struct{}

Done returns a channel whose closure signals that pool resources are freed and no more actions could be performed. Note that Done is useful only after pool.{Close,Shutdown} calls.

func (*Pool) DropConnections

func (p *Pool) DropConnections()

DropConnections resets a list of online channels. It calls Drop() on each of ex-online channel and returns when all calls are done.

func (*Pool) GetBytes

func (p *Pool) GetBytes(n int) []byte

GetBytes obtains bytes from the Channel config's byte pool.

func (*Pool) Hijack

func (p *Pool) Hijack(cb func(conn net.Conn, rbuf []byte))

Hijack resets a list of online channels. It calls callback cb for every ex-online channel with result of its Hijack() call. If Hijack returns error it does not call a callback.

Note that it is a caller responsibility to manage ownership of the returned channels that could be shared across goroutines by NextChannel(), Call(), Notify() and Send() calls.

func (*Pool) Init

func (p *Pool) Init(ctx context.Context) (err error)

Init initializes Pool and tries to create single iproto connection.

func (*Pool) InitAll

func (p *Pool) InitAll(ctx context.Context) (err error)

InitAll is the same as Init, except that it tries to get N connections, N = PoolConfig.Size || 1.

func (*Pool) NextChannel

func (p *Pool) NextChannel(ctx context.Context) (c *Channel, err error)

NextChannel returns the next alive channel. It may try to establish a new connection and create a new Channel. If establishing the new connection fails, it waits some time and tries again.

Note that returned Channel could be closed while receiving it from the Pool. In this case additional NextChannel() may be made to get next alive channel.

func (*Pool) Notify

func (p *Pool) Notify(ctx context.Context, method uint32, data []byte) (err error)

Notify finds an alive channel in the pool and calls Notify() on it.

func (*Pool) OnClose

func (p *Pool) OnClose(cb func())

OnClose allows to run a callback after pool resources are freed and no more actions could be performed. Note that your cb could be delayed by other cb registered before. And vice versa – if you are doing some hard long job, you could delay other callbacks registered before. If you want to be informed as soon as possible, use pool.Done(). Note that cb will be called only after pool.{Close,Shutdown} calls.

func (*Pool) Online

func (p *Pool) Online() []*Channel

Online returns current list of online Channels.

func (*Pool) PutBytes

func (p *Pool) PutBytes(bts []byte)

PutBytes reclaims bytes to the Channel config's byte pool.

func (*Pool) RemoteAddr

func (p *Pool) RemoteAddr() net.Addr

func (*Pool) ResetDialThrottle

func (p *Pool) ResetDialThrottle()

ResetDialThrottle resets the dial throttle timeout such that next dial attempt for a new connection will not be canceled due to throttled logic. That is, it works for pool which was configured with non-zero DialThrottle option.

func (*Pool) Send

func (p *Pool) Send(ctx context.Context, pkt Packet) (err error)

Send finds an alive channel in the pool and calls Send() on it.

func (*Pool) SetDialThrottle

func (p *Pool) SetDialThrottle(m time.Time)

SetDialThrottle sets up time point before which underlying dialer will fail its dial attempts.

func (*Pool) Shutdown

func (p *Pool) Shutdown()

Shutdown closes all underlying channels by calling channel.Shutdown().

func (*Pool) ShutdownConnections

func (p *Pool) ShutdownConnections()

ShutdownConnections resets the list of online channels. It calls Shutdown() on each formerly online channel and returns when all calls are done.

func (*Pool) Stats

func (p *Pool) Stats() (ret PoolStats)

Stats returns pool usage statistics.

func (*Pool) StoreConn

func (p *Pool) StoreConn(conn net.Conn) (err error)

StoreConn stores the given connection inside the pool if there is space for it. Given ctx is used for Channel.Init() call. If err is non-nil, the given conn will be closed.

type PoolChange

// PoolChange describes a single change of a pool's state.
type PoolChange struct {
	// Pool is the source of the change.
	Pool *Pool

	// Channel points to the channel that was opened/closed.
	Channel *Channel

	// Conn contains the difference in the pool's connection count.
	// NOTE(review): presumably positive when Channel was opened and negative
	// when it was closed — confirm against the notifying code.
	Conn int8
}

PoolChange represents pool state change.

type PoolChangeHandle

// PoolChangeHandle is returned by NotifyPoolChange and is used to control the
// flow of PoolChange notifications; see its Stop and Stopped methods.
type PoolChangeHandle struct {
	// contains filtered or unexported fields
}

PoolChangeHandle helps to control PoolChange notifications flow.

func NewPoolChangeHandle

func NewPoolChangeHandle() *PoolChangeHandle

func NotifyPoolChange

func NotifyPoolChange(ch chan<- PoolChange, pools ...*Pool) *PoolChangeHandle

NotifyPoolChange makes every given pool relay its changes into ch.

It will block sending to ch: the caller must ensure that ch has sufficient buffer space to not block the pool.

The caller should pass pools that are not yet initialized. That is, the caller should not call iproto.Dial and then NotifyPoolChange. The right way to do this is to call NewPool, followed by NotifyPoolChange, optionally followed by pool.Init.

Note that if some pool config.ChannelConfig.Init returns error, given ch will not be notified about the established connection nor the closure of it.

It returns PoolChangeHandle that could be used to control notifications sent to ch.

func (*PoolChangeHandle) Stop

func (p *PoolChangeHandle) Stop()

func (*PoolChangeHandle) Stopped

func (p *PoolChangeHandle) Stopped() <-chan struct{}

type PoolConfig

// PoolConfig contains options for Pool configuration.
type PoolConfig struct {
	// Size limits the number of connections used by Pool.
	// When Size is zero, a single connection is used.
	Size int

	// ChannelConfig will be passed to the Channel constructor when a new
	// connection is established.
	ChannelConfig *ChannelConfig

	// NetDial allows overriding the standard net.Dial logic.
	NetDial func(ctx context.Context, network, addr string) (net.Conn, error)

	// ConnectTimeout is the maximum time spent waiting for an alive iproto
	// connection for sending a packet.
	ConnectTimeout time.Duration

	// DialTimeout is the maximum time spent on establishing a new iproto connection.
	DialTimeout time.Duration

	// RedialInterval is used inside the dial routine for delaying the next attempt
	// to establish a new iproto connection after the previous attempt has failed.
	RedialInterval time.Duration

	// MaxRedialInterval is the maximum delay before the next attempt to establish a
	// new iproto connection is made. Note that RedialInterval is used as the initial
	// delay, and could be increased by every dial attempt up to MaxRedialInterval.
	MaxRedialInterval time.Duration

	// RedialTimeout is the maximum time spent in the dial routine.
	// If RedialForever is true this parameter is not used.
	RedialTimeout time.Duration

	// RedialForever configures the underlying dialer to dial without any
	// timeout.
	RedialForever bool

	// FailOnCutoff prevents Pool from waiting for a dial of a new
	// Channel when there are no online Channels to process the request.
	// If this field is true, then any Call(), Notify() or NextChannel() call
	// will fail with the ErrCutoff error if no online Channels are currently
	// available.
	FailOnCutoff bool

	// DialThrottle allows limiting the rate of the dialer routine execution.
	DialThrottle time.Duration

	// Logger is used to handle some log messages.
	// If nil, the default logger is used.
	Logger Logger

	// If true, the pool will not try to establish a new connection right after
	// one is closed.
	DisablePreDial bool

	// RateLimit is the limit parameter for the golang.org/x/time/rate token bucket
	// rate limiter. A zero value causes the rate limiter to reject all events if
	// `RateBurst' is nonzero.
	RateLimit rate.Limit

	// RateBurst is the burst parameter of the time/rate token bucket rate limiter.
	// A zero value disables the rate limiter.
	RateBurst int

	// RateWait is used to determine the desired behaviour of request rate limiting
	// on the pool. A zero value sets the requests to rather be "policied" than "shaped",
	// and the corresponding function calls will return `RateError' caused by `ErrPolicied'
	// when the limiter is unable to satisfy the request. With a value of `true' the limiter
	// will rather provide "shaping" of the request flow, and the corresponding function
	// calls will either block until the request can be satisfied, or return at once if the
	// predicted wait time exceeds the context deadline. Cancellation of the context also
	// forces a premature return with an error.
	// See golang.org/x/time/rate documentation for .Allow() and .Wait() behaviour.
	RateWait bool
}

PoolConfig contains options for Pool configuration.

func CopyPoolConfig

func CopyPoolConfig(c *PoolConfig) (pc *PoolConfig)

CopyPoolConfig returns a deep copy of c. If c is nil, it returns a new PoolConfig. The returned config always contains a non-nil ChannelConfig.

type PoolStats

// PoolStats holds pool usage statistics, as returned by Pool.Stats.
type PoolStats struct {
	// ChannelStats is embedded, so its fields are promoted into PoolStats.
	// NOTE(review): presumably aggregated over the pool's channels — confirm.
	ChannelStats

	DialCount  uint32 // total number of net.Dial calls
	DialErrors uint32 // total number of failed net.Dial calls
	WaitErrors uint32 // total number of failed waits for a free channel

	Online int // number of online channels
}

type RateError

// RateError makes it possible to determine that a request exceeded the
// pool's current rate limit.
type RateError struct {
	// Err is the underlying cause. The PoolConfig.RateWait documentation names
	// ErrPolicied as one such cause.
	// NOTE(review): presumably this is the value returned by Unwrap — confirm.
	Err error
}

RateError provides the ability to determine whether the request exceeded the current pool's rate limit.

func (*RateError) Error

func (e *RateError) Error() string

func (*RateError) Unwrap

func (e *RateError) Unwrap() error

type Sender

// Sender is the common interface for sending iproto packets in different
// forms. Within this package, *Unicast declares methods with matching
// signatures.
type Sender interface {
	// Call takes a message code and data and returns response bytes or an error.
	// NOTE(review): presumably a request that waits for its reply — confirm.
	Call(ctx context.Context, message uint32, data []byte) ([]byte, error)
	// Notify takes a message code and data and returns only an error.
	// NOTE(review): presumably a request that expects no reply — confirm.
	Notify(ctx context.Context, message uint32, data []byte) error
	// Send takes an already built Packet and returns only an error.
	Send(ctx context.Context, packet Packet) error
}

Sender represents a sender of iproto packets in different forms.

type ServeMux

// ServeMux is created with NewServeMux. Its Handle method takes a message code
// and a Handler; its Handler method takes a message code and returns a Handler.
// NOTE(review): presumably a message-code-to-Handler multiplexer that
// dispatches packets in ServeIProto — confirm against the implementation.
type ServeMux struct {
	// contains filtered or unexported fields
}

func NewServeMux

func NewServeMux() *ServeMux

func (*ServeMux) Handle

func (s *ServeMux) Handle(message uint32, handler Handler)

func (*ServeMux) Handler

func (s *ServeMux) Handler(message uint32) Handler

func (*ServeMux) ServeIProto

func (s *ServeMux) ServeIProto(ctx context.Context, c Conn, p Packet)

type Server

// Server holds the options used when serving IProto connections.
type Server struct {
	// Accept allows overriding the default Channel creation logic.
	//
	// Normally, without setting the Accept field, every accepted connection
	// shares the same ChannelConfig. This could cause trouble when the server
	// accepts two connections A and B, and handles packets from them with one
	// config.Handler, say, with N pooled goroutines. Then, if A produces a
	// huge stream of packets, B will not get a fair amount of work time. The
	// better approach is to create separate handlers, each with its own pool.
	//
	// Note that if Accept returns an error, the server's Serve() method will
	// return with that error.
	//
	// Note that the returned Channel's Init() method will be called if err is
	// non-nil. The error returned from that Init() call will not be checked.
	// NOTE(review): "non-nil" looks inconsistent with the previous note
	// (Serve() returns when Accept fails); presumably "nil" was meant —
	// confirm against Serve().
	Accept AcceptFn

	// ChannelConfig is used to initialize a new Channel on every incoming
	// connection.
	//
	// Note that a copy of the config is shared across all channels.
	// To customize this behavior see the Accept field of the Server.
	ChannelConfig *ChannelConfig

	// Log is used for write errors in serve process
	Log Logger

	// OnClose callbacks are called on channel close.
	OnClose []func()

	// OnShutdown callbacks are called on channel shutdown.
	OnShutdown []func()
}

Server contains options for serving IProto connections.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context, network, addr string) error

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, ln net.Listener) (err error)

Serve begins to accept connections from ln. It does not handle temporary net.Error cases.

Note that Serve() copies s.ChannelConfig once before starting accept loop.

type StreamReader

// StreamReader represents an iproto stream reader; see ReadPacket.
type StreamReader struct {
	// Source is the io.Reader supplied by the caller.
	// NOTE(review): presumably the stream ReadPacket reads from — confirm.
	Source    io.Reader
	// NOTE(review): presumably the maximum accepted packet body size; the
	// meaning of a zero value is not visible here — confirm.
	SizeLimit uint32
	// NOTE(review): presumably allocates the buffer for each packet body —
	// confirm against BodyAllocator.
	Alloc     BodyAllocator
	// contains filtered or unexported fields
}

StreamReader represents iproto stream reader.

func (*StreamReader) LastRead

func (b *StreamReader) LastRead() int

func (*StreamReader) ReadPacket

func (b *StreamReader) ReadPacket() (ret Packet, err error)

ReadPacket reads the next packet.

type StreamWriter

// StreamWriter represents an iproto stream writer; see WritePacket.
type StreamWriter struct {
	// Dest is the io.Writer supplied by the caller.
	// NOTE(review): presumably the "underlying writer" that WritePacket
	// writes to — confirm.
	Dest io.Writer
	// contains filtered or unexported fields
}

StreamWriter represents iproto stream writer.

func (*StreamWriter) WritePacket

func (b *StreamWriter) WritePacket(p Packet) (err error)

WritePacket writes p to the underlying writer.

type Unicast

// Unicast wraps Cluster for sending messages to a single peer of the cluster.
type Unicast struct {
	// Cluster is embedded, so its exported fields and methods are promoted
	// to Unicast.
	*Cluster

	// Select contains the logic for picking a Pool from the cluster's peers
	// for a subsequent Call/Notify call.
	// If Select is nil, a round-robin algorithm is used.
	Select func(context.Context, *Cluster) (*Pool, error)
	// contains filtered or unexported fields
}

Unicast wraps Cluster for sending messages to any single one of the cluster's peers.

func (*Unicast) Call

func (u *Unicast) Call(ctx context.Context, method uint32, data []byte) ([]byte, error)

Call selects a peer from the cluster's peers and makes Call() on it.

func (*Unicast) CallNext

func (u *Unicast) CallNext(ctx context.Context, method uint32, data []byte) (peer *Pool, resp []byte, err error)

CallNext selects a peer from the cluster's peers and makes Call() on it.

func (*Unicast) Notify

func (u *Unicast) Notify(ctx context.Context, method uint32, data []byte) error

Notify selects a peer from the cluster's peers and makes Notify() on it.

func (*Unicast) Send

func (u *Unicast) Send(ctx context.Context, pkt Packet) error

Send selects a peer from the cluster's peers and makes Send() on it.

type Unpacker

// Unpacker is implemented by types that provide their own IprotoUnpack method.
// NOTE(review): presumably it lets a type decode itself from iproto-encoded
// data during unpacking — confirm against the callers of IprotoUnpack.
type Unpacker interface {
	// IprotoUnpack receives the reader r and the pack mode, and reports
	// failure through the returned error.
	IprotoUnpack(r *bytes.Reader, mode PackMode) error
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL