ingest

package
v0.34.0-crescendo-prev... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: AGPL-3.0 Imports: 20 Imported by: 3

Documentation

Overview

Package ingest implements an engine for receiving transactions that need to be packaged into a collection.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddAddresses added in v0.33.30

func AddAddresses(r *AddressRateLimiter, addresses []flow.Address)

Util functions

func ParseAddresses added in v0.33.30

func ParseAddresses(addresses string) ([]flow.Address, error)

parse addresses string into a list of flow addresses

func RemoveAddresses added in v0.33.30

func RemoveAddresses(r *AddressRateLimiter, addresses []flow.Address)

Types

type AddressRateLimiter added in v0.33.30

type AddressRateLimiter struct {
	// contains filtered or unexported fields
}

AddressRateLimiter limits the rate of ingested transactions with a given payer address.

func NewAddressRateLimiter added in v0.33.30

func NewAddressRateLimiter(limit rate.Limit, burst int) *AddressRateLimiter

AddressRateLimiter limits the rate of ingested transactions with a given payer address. It allows the given "limit" amount messages per second with a "burst" amount of messages to be sent at once

for example, To config 1 message per 100 milliseconds, convert to per second first, which is 10 message per second, so limit is 10 ( rate.Limit(10) ), and burst is 1. Note: rate.Limit(0.1), burst = 1 means 1 message per 10 seconds, instead of 1 message per 100 milliseconds.

To config 3 message per minute, the per-second-basis is 0.05 (3/60), so the limit should be rate.Limit(0.05), and burst is 3.

Note: The rate limit configured for each node may differ from the effective network-wide rate limit for a given payer. In particular, the number of clusters and the message propagation factor will influence how the individual rate limit translates to a network-wide rate limit. For example, suppose we have 5 collection clusters and configure each Collection Node with a rate limit of 1 message per second. Then, the effective network-wide rate limit for a payer address would be *at least* 5 messages per second.

func (*AddressRateLimiter) AddAddress added in v0.33.30

func (r *AddressRateLimiter) AddAddress(address flow.Address)

AddAddress add an address to be rate limited

func (*AddressRateLimiter) Allow added in v0.33.30

func (r *AddressRateLimiter) Allow(address flow.Address) bool

Allow returns whether the given address should be allowed (not rate limited)

func (*AddressRateLimiter) GetAddresses added in v0.33.30

func (r *AddressRateLimiter) GetAddresses() []flow.Address

GetAddresses get the list of rate limited address

func (*AddressRateLimiter) GetLimitConfig added in v0.33.30

func (r *AddressRateLimiter) GetLimitConfig() (rate.Limit, int)

GetLimitConfig get the limit config

func (*AddressRateLimiter) IsRateLimited added in v0.33.30

func (r *AddressRateLimiter) IsRateLimited(address flow.Address) bool

IsRateLimited returns whether the given address should be rate limited

func (*AddressRateLimiter) RemoveAddress added in v0.33.30

func (r *AddressRateLimiter) RemoveAddress(address flow.Address)

RemoveAddress remove an address for being rate limited

func (*AddressRateLimiter) SetLimitConfig added in v0.33.30

func (r *AddressRateLimiter) SetLimitConfig(limit rate.Limit, burst int)

SetLimitConfig update the limit config Note all the existing limiters will be updated, and reset

type Config

type Config struct {
	// how much buffer time there is between a transaction being ingested by a
	// collection node and being included in a collection and block
	ExpiryBuffer uint
	// the maximum transaction gas limit
	MaxGasLimit uint64
	// whether or not we check that transaction scripts are parse-able
	CheckScriptsParse bool
	// how many extra nodes in the responsible cluster we propagate transactions to
	// (we always send to at least one)
	PropagationRedundancy uint
	// the maximum transaction byte size limit
	MaxTransactionByteSize uint64
	// maximum collection byte size, it acts as hard limit max for the tx size.
	MaxCollectionByteSize uint64
	// maximum number of un-processed transaction messages to hold in the queue.
	MaxMessageQueueSize uint
}

Config defines configuration for the transaction ingest engine.

func DefaultConfig

func DefaultConfig() Config

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine is the transaction ingestion engine, which ensures that new transactions are delegated to the correct collection cluster, and prepared to be included in a collection.

func New

func New(
	log zerolog.Logger,
	net network.EngineRegistry,
	state protocol.State,
	engMetrics module.EngineMetrics,
	mempoolMetrics module.MempoolMetrics,
	colMetrics module.CollectionMetrics,
	me module.Local,
	chain flow.Chain,
	pools *epochs.TransactionPools,
	config Config,
	limiter *AddressRateLimiter,
) (*Engine, error)

New creates a new collection ingest engine.

func (*Engine) Process

func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error

Process processes a transaction message from the network and enqueues the message. Validation and ingestion is performed in the processQueuedTransactions worker.

func (*Engine) ProcessTransaction added in v0.25.0

func (e *Engine) ProcessTransaction(tx *flow.TransactionBody) error

ProcessTransaction processes a transaction message submitted from another local component. The transaction is validated and ingested synchronously. This is used by the GRPC API, for transactions from Access nodes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL