Documentation ¶
Overview ¶
Package flowcontrol implements a client-side flow control mechanism.
Index ¶
- Constants
- type ClientManager
- type ClientNode
- func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64)
- func (node *ClientNode) BufferStatus() (uint64, uint64)
- func (node *ClientNode) Disconnect()
- func (node *ClientNode) Freeze()
- func (node *ClientNode) OneTimeCost(cost uint64)
- func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64
- func (node *ClientNode) UpdateParams(params ServerParams)
- type PieceWiseLinear
- type ServerNode
- func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64)
- func (node *ServerNode) DumpLogs()
- func (node *ServerNode) QueuedRequest(reqID, maxCost uint64)
- func (node *ServerNode) ReceivedReply(reqID, bv uint64)
- func (node *ServerNode) ResumeFreeze(bv uint64)
- func (node *ServerNode) UpdateParams(params ServerParams)
- type ServerParams
Constants ¶
const (
	// DecParamDelay is applied at server side when decreasing capacity in order to
	// avoid a buffer underrun error due to requests sent by the client before
	// receiving the capacity update announcement
	DecParamDelay = time.Second * 2
)
const FixedPointMultiplier = 1000000
FixedPointMultiplier is applied to the recharge integrator and the recharge curve.
Note: fixed point arithmetic is required for the integrator because it is a constantly increasing value that can wrap around int64 limits (the priority queue also supports this wrap-around behavior). A floating point value would gradually lose precision in this application. The recharge curve and all recharge values are encoded as fixed point because sumRecharge is frequently updated by adding or subtracting individual recharge values and perfect precision is required.
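The note above can be illustrated with a minimal sketch, assuming recharge values are already expressed in fixed point units (1000000 per serving thread). The helper names sumAfterChurn and wrapSafeLess are hypothetical and not part of the package, and the sign-of-difference comparison is one common way to order wrapping counters, not necessarily what the priority queue does internally.

```go
package main

import "fmt"

// fixedPointMultiplier mirrors the documented FixedPointMultiplier constant.
const fixedPointMultiplier = 1000000

// sumAfterChurn adds every recharge value to a running sum and then subtracts
// each one again, as happens when clients start and stop recharging.
// With integer fixed point values the result is exactly zero.
// (Hypothetical helper, for illustration only.)
func sumAfterChurn(values []uint64) uint64 {
	var sum uint64
	for _, v := range values {
		sum += v
	}
	for _, v := range values {
		sum -= v
	}
	return sum
}

// wrapSafeLess reports whether integrator reading a comes before reading b,
// even if the counter wrapped around the int64 range in between. It relies on
// the sign of the wrapping difference. (Assumption of this sketch.)
func wrapSafeLess(a, b int64) bool {
	return a-b < 0
}

func main() {
	// 1.5, 0.25 and 0.333333 serving threads in fixed point units.
	values := []uint64{
		3 * fixedPointMultiplier / 2,
		fixedPointMultiplier / 4,
		333333,
	}
	fmt.Println("sum after churn:", sumAfterChurn(values))

	a := int64(9223372036854775802) // close to the int64 maximum
	b := a + 10                     // wraps around to a negative value
	fmt.Println("a before b:", wrapSafeLess(a, b))
}
```

Because the sum is an integer, repeated additions and subtractions cancel exactly, which is the property the note asks for.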
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClientManager ¶
type ClientManager struct {
// contains filtered or unexported fields
}
ClientManager controls the capacity assigned to the clients of a server. Since ServerParams guarantee a safe lower estimate for processable requests even when all clients are active, ClientManager calculates a corrected buffer value and usually allows a higher remaining buffer value to be returned with each reply.
func NewClientManager ¶
func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager
NewClientManager returns a new client manager. The client manager improves flow control performance by allowing client buffers to recharge quicker than the minimum guaranteed recharge rate if possible. The sum of all minimum recharge rates (sumRecharge) is updated each time a client starts or finishes buffer recharging. Then an adjusted total recharge rate is calculated using a piecewise linear recharge curve:
totalRecharge = curve(sumRecharge) (totalRecharge >= sumRecharge is enforced)
Then the "bonus" buffer recharge is distributed between currently recharging clients proportionally to their minimum recharge rates.
Note: total recharge is proportional to the average number of serving threads running in parallel. A recharge value of 1000000 corresponds to one thread on average. The maximum number of allowed serving threads should always be considerably higher than the targeted average number.
Note 2: although it is possible to specify a curve that allows the full target recharge even at zero sumRecharge, it makes sense to add a linear ramp starting from zero so that a single low-priority client cannot use up the entire server capacity. This keeps the server quickly available to other clients at any moment.
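The recharge calculation described above can be sketched as follows. This is a simplified illustration, not the package's implementation: effectiveRates is a hypothetical helper, the curve is passed in as a plain function, and rates are returned as float64 for readability even though the package itself uses fixed point values.

```go
package main

import "fmt"

// effectiveRates computes a per-client recharge rate from the minimum rates
// of the currently recharging clients:
//
//	totalRecharge = max(curve(sumRecharge), sumRecharge)
//
// and then scales each minimum rate by totalRecharge/sumRecharge, so the
// "bonus" is shared proportionally to the minimum rates.
// (Hypothetical helper; names are assumptions of this sketch.)
func effectiveRates(curve func(sumRecharge uint64) float64, minRates []uint64) []float64 {
	var sumRecharge uint64
	for _, r := range minRates {
		sumRecharge += r
	}
	rates := make([]float64, len(minRates))
	if sumRecharge == 0 {
		return rates // nobody is recharging
	}
	totalRecharge := curve(sumRecharge)
	if totalRecharge < float64(sumRecharge) {
		// The minimum guaranteed rates are never undercut.
		totalRecharge = float64(sumRecharge)
	}
	for i, r := range minRates {
		rates[i] = float64(r) * totalRecharge / float64(sumRecharge)
	}
	return rates
}

func main() {
	// A toy curve that grants twice the summed minimum recharge.
	double := func(s uint64) float64 { return 2 * float64(s) }
	fmt.Println(effectiveRates(double, []uint64{250000, 750000}))
}
```

With the toy curve, two clients with minimum rates in a 1:3 ratio keep that ratio after the bonus is applied.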
func (*ClientManager) SetCapacityLimits ¶
func (cm *ClientManager) SetCapacityLimits(min, max, raiseThreshold uint64)
SetCapacityLimits sets a threshold value (raiseThreshold) used for raising capFactor. If either the difference between the total allowed and the connected capacity is less than this threshold, or their ratio is less than capacityRaiseThresholdRatio, then capFactor is allowed to rise slowly.
func (*ClientManager) SetRechargeCurve ¶
func (cm *ClientManager) SetRechargeCurve(curve PieceWiseLinear)
SetRechargeCurve updates the recharge curve
func (*ClientManager) SubscribeTotalCapacity ¶
func (cm *ClientManager) SubscribeTotalCapacity(ch chan uint64) uint64
SubscribeTotalCapacity sends all future updates of the total capacity value through the given channel and returns the current value.
type ClientNode ¶
type ClientNode struct {
// contains filtered or unexported fields
}
ClientNode is the flow control system's representation of a client (used in server mode only)
func NewClientNode ¶
func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode
NewClientNode returns a new ClientNode
func (*ClientNode) AcceptRequest ¶
func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64)
AcceptRequest returns whether a new request can be accepted and the missing buffer amount if it was rejected due to a buffer underrun. If accepted, maxCost is deducted from the flow control buffer.
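The accept-or-reject rule described above can be sketched in a few lines. This is a deliberately reduced model: the clientBuffer type and its accept method are hypothetical, and recharging over time, request indices and the priority return value are all left out.

```go
package main

import "fmt"

// clientBuffer is a hypothetical, simplified stand-in for the buffer state
// kept for one client.
type clientBuffer struct {
	value uint64 // currently available buffer
}

// accept deducts maxCost from the buffer if enough is available. Otherwise
// it leaves the buffer untouched and reports how much buffer is missing.
func (b *clientBuffer) accept(maxCost uint64) (accepted bool, bufShort uint64) {
	if maxCost > b.value {
		return false, maxCost - b.value
	}
	b.value -= maxCost
	return true, 0
}

func main() {
	b := &clientBuffer{value: 300}
	fmt.Println(b.accept(200)) // enough buffer, cost is deducted
	fmt.Println(b.accept(150)) // only 100 left, so the request is rejected
}
```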
func (*ClientNode) BufferStatus ¶
func (node *ClientNode) BufferStatus() (uint64, uint64)
BufferStatus returns the current buffer value and limit
func (*ClientNode) Disconnect ¶
func (node *ClientNode) Disconnect()
Disconnect should be called when a client is disconnected
func (*ClientNode) Freeze ¶
func (node *ClientNode) Freeze()
Freeze notifies the client manager about a client freeze event in which case the total capacity allowance is slightly reduced.
func (*ClientNode) OneTimeCost ¶
func (node *ClientNode) OneTimeCost(cost uint64)
OneTimeCost subtracts the given amount from the node's buffer.
Note: this call can take the buffer into the negative region internally. In this case exported calls return a zero buffer value and no requests are accepted.
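A minimal sketch of the behaviour in the note, assuming the buffer is tracked internally as a signed value. The type and method names below are hypothetical, and the recharge method is only there to show the buffer recovering.

```go
package main

import "fmt"

// clientBuffer is a hypothetical model that stores the buffer as a signed
// value so that a one-time cost can push it below zero.
type clientBuffer struct {
	value int64  // internal value, may be negative
	limit uint64 // buffer limit
}

// oneTimeCost subtracts cost unconditionally.
func (b *clientBuffer) oneTimeCost(cost uint64) {
	b.value -= int64(cost)
}

// recharge adds amount to the buffer, capped at the limit.
func (b *clientBuffer) recharge(amount uint64) {
	b.value += int64(amount)
	if b.value > int64(b.limit) {
		b.value = int64(b.limit)
	}
}

// status reports the buffer value as seen from outside: never negative.
func (b *clientBuffer) status() (uint64, uint64) {
	if b.value < 0 {
		return 0, b.limit
	}
	return uint64(b.value), b.limit
}

// canAccept rejects every request while the internal value is negative.
func (b *clientBuffer) canAccept(maxCost uint64) bool {
	return b.value >= 0 && uint64(b.value) >= maxCost
}

func main() {
	b := &clientBuffer{value: 100, limit: 1000}
	b.oneTimeCost(250)
	fmt.Println(b.status())
	fmt.Println(b.canAccept(0))
	b.recharge(200)
	fmt.Println(b.status())
}
```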
func (*ClientNode) RequestProcessed ¶
func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64
RequestProcessed should be called when the request has been processed
func (*ClientNode) UpdateParams ¶
func (node *ClientNode) UpdateParams(params ServerParams)
UpdateParams updates the flow control parameters of a client node
type PieceWiseLinear ¶
type PieceWiseLinear []struct{ X, Y uint64 }
PieceWiseLinear is used to describe recharge curves
func (PieceWiseLinear) Valid ¶
func (pwl PieceWiseLinear) Valid() bool
Valid returns true if the X coordinates of the curve points are monotonically non-decreasing (equal consecutive X values are allowed)
func (PieceWiseLinear) ValueAt ¶
func (pwl PieceWiseLinear) ValueAt(x uint64) float64
ValueAt returns the curve's value at a given point
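To make the curve semantics concrete, here is a self-contained sketch of a validity check and a linear interpolation over a list of points. The curve type and both functions are hypothetical re-implementations for illustration; in particular, holding the first or last Y value outside the covered X range is an assumption of this sketch.

```go
package main

import "fmt"

// curve is a hypothetical stand-in with the same shape as PieceWiseLinear.
type curve []struct{ X, Y uint64 }

// valid reports whether the X coordinates never decrease.
func valid(c curve) bool {
	var lastX uint64
	for _, p := range c {
		if p.X < lastX {
			return false
		}
		lastX = p.X
	}
	return true
}

// valueAt linearly interpolates between the two points surrounding x.
// Outside the covered range it holds the nearest end point's Y value.
func valueAt(c curve, x uint64) float64 {
	if len(c) == 0 {
		return 0
	}
	if x <= c[0].X {
		return float64(c[0].Y)
	}
	for i := 1; i < len(c); i++ {
		a, b := c[i-1], c[i]
		if x <= b.X {
			if b.X == a.X {
				return float64(b.Y) // vertical step
			}
			frac := float64(x-a.X) / float64(b.X-a.X)
			return float64(a.Y) + frac*(float64(b.Y)-float64(a.Y))
		}
	}
	return float64(c[len(c)-1].Y)
}

func main() {
	// A ramp from zero followed by a flatter segment.
	c := curve{{0, 0}, {1000000, 2000000}, {3000000, 3000000}}
	fmt.Println(valid(c))
	fmt.Println(valueAt(c, 500000), valueAt(c, 2000000), valueAt(c, 5000000))
}
```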
type ServerNode ¶
type ServerNode struct {
// contains filtered or unexported fields
}
ServerNode is the flow control system's representation of a server (used in client mode only)
func NewServerNode ¶
func NewServerNode(params ServerParams, clock mclock.Clock) *ServerNode
NewServerNode returns a new ServerNode
func (*ServerNode) CanSend ¶
func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64)
CanSend returns the minimum waiting time required before sending a request with the given maximum estimated cost. The second return value is the relative estimated buffer level after sending the request (divided by BufLimit).
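The waiting-time calculation can be sketched as a pure function. The function below is hypothetical and makes several assumptions that the text above does not state: the buffer estimate and limit are passed in explicitly, the recharge rate (minRecharge) is expressed in buffer units per millisecond, and no safety margin is added to the cost.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// canSend returns how long to wait until a request costing at most maxCost
// fits into the estimated buffer, plus the relative buffer level that would
// remain after sending it. (Hypothetical sketch, see assumptions above.)
func canSend(bufEstimate, bufLimit, minRecharge, maxCost uint64) (time.Duration, float64) {
	if bufLimit == 0 {
		return 0, 0
	}
	if maxCost > bufLimit {
		maxCost = bufLimit // a request can never need more than a full buffer
	}
	if bufEstimate >= maxCost {
		return 0, float64(bufEstimate-maxCost) / float64(bufLimit)
	}
	if minRecharge == 0 {
		return time.Duration(math.MaxInt64), 0 // buffer never refills
	}
	missing := maxCost - bufEstimate
	wait := time.Duration(missing) * time.Millisecond / time.Duration(minRecharge)
	return wait, 0
}

func main() {
	fmt.Println(canSend(3000, 4000, 10, 1000)) // fits right away
	fmt.Println(canSend(1000, 4000, 10, 3000)) // has to wait for recharge
}
```

A caller could use the relative buffer level to prefer the server that would be left with the most headroom, though how callers rank servers is outside the scope of this sketch.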
func (*ServerNode) DumpLogs ¶
func (node *ServerNode) DumpLogs()
DumpLogs dumps the event log if logging is used
func (*ServerNode) QueuedRequest ¶
func (node *ServerNode) QueuedRequest(reqID, maxCost uint64)
QueuedRequest should be called when the request has been assigned to the given server node, before putting it in the send queue. It is mandatory that requests are sent in the same order as the QueuedRequest calls are made.
func (*ServerNode) ReceivedReply ¶
func (node *ServerNode) ReceivedReply(reqID, bv uint64)
ReceivedReply adjusts estimated buffer value according to the value included in the latest request reply.
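One way to picture the interplay of QueuedRequest and ReceivedReply is the sketch below. It rests on assumptions not stated in the text above: the buffer value in a reply is taken to reflect the server's state right after processing that request, so the client subtracts the maximum cost of every request still in flight. The serverEstimate type and its methods are hypothetical, and recharging over time is ignored.

```go
package main

import "fmt"

// pendingReq records a request that was queued but not yet answered.
type pendingReq struct {
	reqID   uint64
	maxCost uint64
}

// serverEstimate is a hypothetical client-side model of one server's buffer.
type serverEstimate struct {
	bufEstimate uint64
	pending     []pendingReq // in send order
}

// queued deducts maxCost from the estimate and remembers the request.
func (s *serverEstimate) queued(reqID, maxCost uint64) {
	if maxCost > s.bufEstimate {
		s.bufEstimate = 0
	} else {
		s.bufEstimate -= maxCost
	}
	s.pending = append(s.pending, pendingReq{reqID, maxCost})
}

// reply re-bases the estimate on the reported buffer value bv, minus the
// cost of all requests queued after the answered one.
func (s *serverEstimate) reply(reqID, bv uint64) {
	idx := -1
	for i, p := range s.pending {
		if p.reqID == reqID {
			idx = i
			break
		}
	}
	if idx < 0 {
		return // unknown request, keep the current estimate
	}
	s.pending = s.pending[idx+1:]
	var inFlight uint64
	for _, p := range s.pending {
		inFlight += p.maxCost
	}
	if bv > inFlight {
		s.bufEstimate = bv - inFlight
	} else {
		s.bufEstimate = 0
	}
}

func main() {
	s := &serverEstimate{bufEstimate: 1000}
	s.queued(1, 300)
	s.queued(2, 200)
	s.reply(1, 900) // request 2 is still in flight
	fmt.Println(s.bufEstimate)
}
```

The sketch also shows why send order matters: the subtraction only makes sense if the requests still pending are exactly those sent after the answered one.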
func (*ServerNode) ResumeFreeze ¶
func (node *ServerNode) ResumeFreeze(bv uint64)
ResumeFreeze clears all pending requests and sets the buffer estimate to the reported value after resuming from a frozen state
func (*ServerNode) UpdateParams ¶
func (node *ServerNode) UpdateParams(params ServerParams)
UpdateParams updates the flow control parameters of the node