Documentation ¶
Overview ¶
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
Index ¶
- func AddAddresses(r *AddressRateLimiter, addresses []flow.Address)
- func ParseAddresses(addresses string) ([]flow.Address, error)
- func RemoveAddresses(r *AddressRateLimiter, addresses []flow.Address)
- type AddressRateLimiter
- func (r *AddressRateLimiter) AddAddress(address flow.Address)
- func (r *AddressRateLimiter) Allow(address flow.Address) bool
- func (r *AddressRateLimiter) GetAddresses() []flow.Address
- func (r *AddressRateLimiter) GetLimitConfig() (rate.Limit, int)
- func (r *AddressRateLimiter) IsRateLimited(address flow.Address) bool
- func (r *AddressRateLimiter) RemoveAddress(address flow.Address)
- func (r *AddressRateLimiter) SetLimitConfig(limit rate.Limit, burst int)
- type Config
- type Engine
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddAddresses ¶ added in v0.33.30
func AddAddresses(r *AddressRateLimiter, addresses []flow.Address)
Util functions
func ParseAddresses ¶ added in v0.33.30
parse addresses string into a list of flow addresses
func RemoveAddresses ¶ added in v0.33.30
func RemoveAddresses(r *AddressRateLimiter, addresses []flow.Address)
Types ¶
type AddressRateLimiter ¶ added in v0.33.30
type AddressRateLimiter struct {
// contains filtered or unexported fields
}
AddressRateLimiter limits the rate of ingested transactions with a given payer address.
func NewAddressRateLimiter ¶ added in v0.33.30
func NewAddressRateLimiter(limit rate.Limit, burst int) *AddressRateLimiter
AddressRateLimiter limits the rate of ingested transactions with a given payer address. It allows the given "limit" amount messages per second with a "burst" amount of messages to be sent at once
for example, To config 1 message per 100 milliseconds, convert to per second first, which is 10 message per second, so limit is 10 ( rate.Limit(10) ), and burst is 1. Note: rate.Limit(0.1), burst = 1 means 1 message per 10 seconds, instead of 1 message per 100 milliseconds.
To config 3 message per minute, the per-second-basis is 0.05 (3/60), so the limit should be rate.Limit(0.05), and burst is 3.
Note: The rate limit configured for each node may differ from the effective network-wide rate limit for a given payer. In particular, the number of clusters and the message propagation factor will influence how the individual rate limit translates to a network-wide rate limit. For example, suppose we have 5 collection clusters and configure each Collection Node with a rate limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would be *at least* 5 messages per second.
func (*AddressRateLimiter) AddAddress ¶ added in v0.33.30
func (r *AddressRateLimiter) AddAddress(address flow.Address)
AddAddress add an address to be rate limited
func (*AddressRateLimiter) Allow ¶ added in v0.33.30
func (r *AddressRateLimiter) Allow(address flow.Address) bool
Allow returns whether the given address should be allowed (not rate limited)
func (*AddressRateLimiter) GetAddresses ¶ added in v0.33.30
func (r *AddressRateLimiter) GetAddresses() []flow.Address
GetAddresses get the list of rate limited address
func (*AddressRateLimiter) GetLimitConfig ¶ added in v0.33.30
func (r *AddressRateLimiter) GetLimitConfig() (rate.Limit, int)
GetLimitConfig get the limit config
func (*AddressRateLimiter) IsRateLimited ¶ added in v0.33.30
func (r *AddressRateLimiter) IsRateLimited(address flow.Address) bool
IsRateLimited returns whether the given address should be rate limited
func (*AddressRateLimiter) RemoveAddress ¶ added in v0.33.30
func (r *AddressRateLimiter) RemoveAddress(address flow.Address)
RemoveAddress remove an address for being rate limited
func (*AddressRateLimiter) SetLimitConfig ¶ added in v0.33.30
func (r *AddressRateLimiter) SetLimitConfig(limit rate.Limit, burst int)
SetLimitConfig update the limit config Note all the existing limiters will be updated, and reset
type Config ¶
type Config struct { // how much buffer time there is between a transaction being ingested by a // collection node and being included in a collection and block ExpiryBuffer uint // the maximum transaction gas limit MaxGasLimit uint64 // whether or not we check that transaction scripts are parse-able CheckScriptsParse bool // how many extra nodes in the responsible cluster we propagate transactions to // (we always send to at least one) PropagationRedundancy uint // the maximum transaction byte size limit MaxTransactionByteSize uint64 // maximum collection byte size, it acts as hard limit max for the tx size. MaxCollectionByteSize uint64 // maximum number of un-processed transaction messages to hold in the queue. MaxMessageQueueSize uint }
Config defines configuration for the transaction ingest engine.
func DefaultConfig ¶
func DefaultConfig() Config
type Engine ¶
type Engine struct { *component.ComponentManager // contains filtered or unexported fields }
Engine is the transaction ingestion engine, which ensures that new transactions are delegated to the correct collection cluster, and prepared to be included in a collection.
func New ¶
func New( log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, mempoolMetrics module.MempoolMetrics, colMetrics module.CollectionMetrics, me module.Local, chain flow.Chain, pools *epochs.TransactionPools, config Config, limiter *AddressRateLimiter, ) (*Engine, error)
New creates a new collection ingest engine.
func (*Engine) Process ¶
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error
Process processes a transaction message from the network and enqueues the message. Validation and ingestion is performed in the processQueuedTransactions worker.
func (*Engine) ProcessTransaction ¶ added in v0.25.0
func (e *Engine) ProcessTransaction(tx *flow.TransactionBody) error
ProcessTransaction processes a transaction message submitted from another local component. The transaction is validated and ingested synchronously. This is used by the GRPC API, for transactions from Access nodes.