Documentation
¶
Index ¶
- Constants
- Variables
- func ChainConfig(tx kv.Getter) (*chain.Config, error)
- func LastSeenBlock(tx kv.Getter) (uint64, error)
- func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, ...)
- func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error
- func PutLastSeenBlock(tx kv.Putter, n uint64, buf []byte) error
- func SortByNonceLess(a, b *metaTx) bool
- func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto.MiningServer, ...) (*grpc.Server, error)
- type BestQueue
- type BySenderAndNonce
- type Fetch
- type GrpcDisabled
- func (*GrpcDisabled) Add(ctx context.Context, request *txpool_proto.AddRequest) (*txpool_proto.AddReply, error)
- func (*GrpcDisabled) All(ctx context.Context, request *txpool_proto.AllRequest) (*txpool_proto.AllReply, error)
- func (*GrpcDisabled) FindUnknown(ctx context.Context, hashes *txpool_proto.TxHashes) (*txpool_proto.TxHashes, error)
- func (*GrpcDisabled) Nonce(ctx context.Context, request *txpool_proto.NonceRequest) (*txpool_proto.NonceReply, error)
- func (*GrpcDisabled) OnAdd(request *txpool_proto.OnAddRequest, server txpool_proto.Txpool_OnAddServer) error
- func (*GrpcDisabled) Pending(ctx context.Context, empty *emptypb.Empty) (*txpool_proto.PendingReply, error)
- func (*GrpcDisabled) Status(ctx context.Context, request *txpool_proto.StatusRequest) (*txpool_proto.StatusReply, error)
- func (*GrpcDisabled) Transactions(ctx context.Context, request *txpool_proto.TransactionsRequest) (*txpool_proto.TransactionsReply, error)
- func (*GrpcDisabled) Version(ctx context.Context, empty *emptypb.Empty) (*types2.VersionReply, error)
- type GrpcServer
- func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txpool_proto.AddReply, error)
- func (s *GrpcServer) All(ctx context.Context, _ *txpool_proto.AllRequest) (*txpool_proto.AllReply, error)
- func (s *GrpcServer) FindUnknown(ctx context.Context, in *txpool_proto.TxHashes) (*txpool_proto.TxHashes, error)
- func (s *GrpcServer) Nonce(ctx context.Context, in *txpool_proto.NonceRequest) (*txpool_proto.NonceReply, error)
- func (s *GrpcServer) OnAdd(req *txpool_proto.OnAddRequest, stream txpool_proto.Txpool_OnAddServer) error
- func (s *GrpcServer) Pending(ctx context.Context, _ *emptypb.Empty) (*txpool_proto.PendingReply, error)
- func (s *GrpcServer) Status(_ context.Context, _ *txpool_proto.StatusRequest) (*txpool_proto.StatusReply, error)
- func (s *GrpcServer) Transactions(ctx context.Context, in *txpool_proto.TransactionsRequest) (*txpool_proto.TransactionsReply, error)
- func (s *GrpcServer) Version(context.Context, *emptypb.Empty) (*types2.VersionReply, error)
- type MockSentry
- func (ms *MockSentry) HandShake(context.Context, *emptypb.Empty) (*sentry.HandShakeReply, error)
- func (ms *MockSentry) Messages(req *sentry.MessagesRequest, stream sentry.Sentry_MessagesServer) error
- func (ms *MockSentry) PeerEvents(req *sentry.PeerEventsRequest, stream sentry.Sentry_PeerEventsServer) error
- func (ms *MockSentry) Send(req *sentry.InboundMessage) (errs []error)
- func (ms *MockSentry) SetStatus(context.Context, *sentry.StatusData) (*sentry.SetStatusReply, error)
- type NewSlotsStreams
- type PendingPool
- func (p *PendingPool) Add(i *metaTx, logger log.Logger)
- func (p *PendingPool) Best() *metaTx
- func (p *PendingPool) DebugPrint(prefix string)
- func (p *PendingPool) EnforceBestInvariants()
- func (p *PendingPool) EnforceWorstInvariants()
- func (p *PendingPool) Len() int
- func (p *PendingPool) PopWorst() *metaTx
- func (p *PendingPool) Remove(i *metaTx)
- func (p *PendingPool) Updated(mt *metaTx)
- func (p *PendingPool) Worst() *metaTx
- type Pool
- type Send
- func (f *Send) AnnouncePooledTxs(types []byte, sizes []uint32, hashes types2.Hashes, maxPeers uint64) (hashSentTo []int)
- func (f *Send) BroadcastPooledTxs(rlps [][]byte, maxPeers uint64) (txSentTo []int)
- func (f *Send) PropagatePooledTxsToPeersList(peers []types2.PeerID, types []byte, sizes []uint32, hashes []byte)
- func (f *Send) SetWaitGroup(wg *sync.WaitGroup)
- type SentryClient
- type StateChangesClient
- type SubPool
- func (p *SubPool) Add(i *metaTx, logger log.Logger)
- func (p *SubPool) Best() *metaTx
- func (p *SubPool) DebugPrint(prefix string)
- func (p *SubPool) EnforceInvariants()
- func (p *SubPool) Len() int
- func (p *SubPool) PopBest() *metaTx
- func (p *SubPool) PopWorst() *metaTx
- func (p *SubPool) Remove(i *metaTx)
- func (p *SubPool) Updated(i *metaTx)
- func (p *SubPool) Worst() *metaTx
- type SubPoolMarker
- type SubPoolType
- type TxPool
- func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error)
- func (p *TxPool) AddNewGoodPeer(peerID types.PeerID)
- func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots)
- func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)
- func (p *TxPool) AppendLocalAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)
- func (p *TxPool) AppendRemoteAnnouncements(types []byte, sizes []uint32, hashes []byte) ([]byte, []uint32, []byte)
- func (p *TxPool) CountContent() (int, int, int)
- func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)
- func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error)
- func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
- func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
- func (p *TxPool) IsLocal(idHash []byte) bool
- func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool)
- func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, ...) error
- func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, ...) (bool, error)
- func (p *TxPool) ResetYieldedStatus()
- func (p *TxPool) Started() bool
- func (p *TxPool) ValidateSerializedTxn(serializedTxn []byte) error
- func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, ...) (bool, int, error)
- type WorstQueue
Constants ¶
const ( NoNonceGaps = 0b010000 EnoughBalance = 0b001000 NotTooMuchGas = 0b000100 EnoughFeeCapBlock = 0b000010 IsLocal = 0b000001 BaseFeePoolBits = NoNonceGaps + EnoughBalance + NotTooMuchGas )
Variables ¶
var ErrPoolDisabled = fmt.Errorf("TxPool Disabled")
var PoolChainConfigKey = []byte("chain_config")
var PoolLastSeenBlockKey = []byte("last_seen_block")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")
var PoolPendingBlobFeeKey = []byte("pending_blob_fee")
var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0}
TxPoolAPIVersion
Functions ¶
func MainLoop ¶
func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan types.Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func())
MainLoop does the following:
- sends pending byHash to p2p:
  - new byHash
  - all pooled byHash to recently connected peers
  - all local pooled byHash to random peers periodically
- promote/demote transactions
- reorgs
func SortByNonceLess ¶
func SortByNonceLess(a, b *metaTx) bool
func StartGrpc ¶
func StartGrpc(txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto.MiningServer, addr string, creds *credentials.TransportCredentials, logger log.Logger) (*grpc.Server, error)
Types ¶
type BySenderAndNonce ¶
type BySenderAndNonce struct {
// contains filtered or unexported fields
}
BySenderAndNonce - designed to perform the most expensive operation in TxPool: "recalculate all ephemeral fields of all transactions", using the following algorithm:
- for all senders - iterate over all transactions in nonce growing order
Performance decisions:
- All senders are stored inside 1 large BTree - because iterating over 1 BTree is faster than iterating over map[senderId]BTree
- sortByNonce is used as a non-pointer wrapper - because iterating over a BTree of pointers is 2x slower
type Fetch ¶
type Fetch struct {
// contains filtered or unexported fields
}
Fetch connects to sentry and implements the eth/66 protocol regarding the transaction messages. It tries to "prime" the sentry with a StatusData message containing the given genesis hash and list of forks, but with zero max block and total difficulty. Sentry should have logic not to overwrite statusData with messages from the tx pool.
func NewFetch ¶
func NewFetch(ctx context.Context, sentryClients []direct.SentryClient, pool Pool, stateChangesClient StateChangesClient, coreDB kv.RoDB, db kv.RwDB, chainID uint256.Int, logger log.Logger) *Fetch
NewFetch creates a new fetch object that will work with given sentry clients. Since the SentryClient here is an interface, it is suitable for mocking in tests (mock will need to implement all the functions of the SentryClient interface).
func (*Fetch) ConnectCore ¶
func (f *Fetch) ConnectCore()
func (*Fetch) ConnectSentries ¶
func (f *Fetch) ConnectSentries()
ConnectSentries initialises connection to the sentry
func (*Fetch) SetWaitGroup ¶
type GrpcDisabled ¶
type GrpcDisabled struct {
txpool_proto.UnimplementedTxpoolServer
}
func (*GrpcDisabled) Add ¶
func (*GrpcDisabled) Add(ctx context.Context, request *txpool_proto.AddRequest) (*txpool_proto.AddReply, error)
func (*GrpcDisabled) All ¶
func (*GrpcDisabled) All(ctx context.Context, request *txpool_proto.AllRequest) (*txpool_proto.AllReply, error)
func (*GrpcDisabled) FindUnknown ¶
func (*GrpcDisabled) FindUnknown(ctx context.Context, hashes *txpool_proto.TxHashes) (*txpool_proto.TxHashes, error)
func (*GrpcDisabled) Nonce ¶
func (*GrpcDisabled) Nonce(ctx context.Context, request *txpool_proto.NonceRequest) (*txpool_proto.NonceReply, error)
func (*GrpcDisabled) OnAdd ¶
func (*GrpcDisabled) OnAdd(request *txpool_proto.OnAddRequest, server txpool_proto.Txpool_OnAddServer) error
func (*GrpcDisabled) Pending ¶
func (*GrpcDisabled) Pending(ctx context.Context, empty *emptypb.Empty) (*txpool_proto.PendingReply, error)
func (*GrpcDisabled) Status ¶
func (*GrpcDisabled) Status(ctx context.Context, request *txpool_proto.StatusRequest) (*txpool_proto.StatusReply, error)
func (*GrpcDisabled) Transactions ¶
func (*GrpcDisabled) Transactions(ctx context.Context, request *txpool_proto.TransactionsRequest) (*txpool_proto.TransactionsReply, error)
func (*GrpcDisabled) Version ¶
func (*GrpcDisabled) Version(ctx context.Context, empty *emptypb.Empty) (*types2.VersionReply, error)
type GrpcServer ¶
type GrpcServer struct { txpool_proto.UnimplementedTxpoolServer NewSlotsStreams *NewSlotsStreams // contains filtered or unexported fields }
func NewGrpcServer ¶
func (*GrpcServer) Add ¶
func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txpool_proto.AddReply, error)
func (*GrpcServer) All ¶
func (s *GrpcServer) All(ctx context.Context, _ *txpool_proto.AllRequest) (*txpool_proto.AllReply, error)
func (*GrpcServer) FindUnknown ¶
func (s *GrpcServer) FindUnknown(ctx context.Context, in *txpool_proto.TxHashes) (*txpool_proto.TxHashes, error)
func (*GrpcServer) Nonce ¶
func (s *GrpcServer) Nonce(ctx context.Context, in *txpool_proto.NonceRequest) (*txpool_proto.NonceReply, error)
returns nonce for address
func (*GrpcServer) OnAdd ¶
func (s *GrpcServer) OnAdd(req *txpool_proto.OnAddRequest, stream txpool_proto.Txpool_OnAddServer) error
func (*GrpcServer) Pending ¶
func (s *GrpcServer) Pending(ctx context.Context, _ *emptypb.Empty) (*txpool_proto.PendingReply, error)
func (*GrpcServer) Status ¶
func (s *GrpcServer) Status(_ context.Context, _ *txpool_proto.StatusRequest) (*txpool_proto.StatusReply, error)
func (*GrpcServer) Transactions ¶
func (s *GrpcServer) Transactions(ctx context.Context, in *txpool_proto.TransactionsRequest) (*txpool_proto.TransactionsReply, error)
func (*GrpcServer) Version ¶
func (s *GrpcServer) Version(context.Context, *emptypb.Empty) (*types2.VersionReply, error)
type MockSentry ¶
type MockSentry struct { *sentry.SentryServerMock StreamWg sync.WaitGroup // contains filtered or unexported fields }
func NewMockSentry ¶
func NewMockSentry(ctx context.Context) *MockSentry
func (*MockSentry) HandShake ¶
func (ms *MockSentry) HandShake(context.Context, *emptypb.Empty) (*sentry.HandShakeReply, error)
func (*MockSentry) Messages ¶
func (ms *MockSentry) Messages(req *sentry.MessagesRequest, stream sentry.Sentry_MessagesServer) error
func (*MockSentry) PeerEvents ¶
func (ms *MockSentry) PeerEvents(req *sentry.PeerEventsRequest, stream sentry.Sentry_PeerEventsServer) error
func (*MockSentry) Send ¶
func (ms *MockSentry) Send(req *sentry.InboundMessage) (errs []error)
func (*MockSentry) SetStatus ¶
func (ms *MockSentry) SetStatus(context.Context, *sentry.StatusData) (*sentry.SetStatusReply, error)
type NewSlotsStreams ¶
type NewSlotsStreams struct {
// contains filtered or unexported fields
}
NewSlotsStreams - it's safe to use this class as non-pointer
func (*NewSlotsStreams) Add ¶
func (s *NewSlotsStreams) Add(stream txpool_proto.Txpool_OnAddServer) (remove func())
func (*NewSlotsStreams) Broadcast ¶
func (s *NewSlotsStreams) Broadcast(reply *txpool_proto.OnAddReply, logger log.Logger)
type PendingPool ¶
type PendingPool struct {
// contains filtered or unexported fields
}
PendingPool - is different from other pools - its best is a Slice instead of a Heap. It's more expensive to maintain the "slice sort" invariant, but it allows a cheap copy of the pending.best slice for mining (because we consider txs and metaTx to be immutable).
func NewPendingSubPool ¶
func NewPendingSubPool(t SubPoolType, limit int) *PendingPool
func (*PendingPool) Add ¶
func (p *PendingPool) Add(i *metaTx, logger log.Logger)
func (*PendingPool) Best ¶
func (p *PendingPool) Best() *metaTx
func (*PendingPool) DebugPrint ¶
func (p *PendingPool) DebugPrint(prefix string)
func (*PendingPool) EnforceBestInvariants ¶
func (p *PendingPool) EnforceBestInvariants()
func (*PendingPool) EnforceWorstInvariants ¶
func (p *PendingPool) EnforceWorstInvariants()
func (*PendingPool) Len ¶
func (p *PendingPool) Len() int
func (*PendingPool) PopWorst ¶
func (p *PendingPool) PopWorst() *metaTx
func (*PendingPool) Remove ¶
func (p *PendingPool) Remove(i *metaTx)
func (*PendingPool) Updated ¶
func (p *PendingPool) Updated(mt *metaTx)
func (*PendingPool) Worst ¶
func (p *PendingPool) Worst() *metaTx
type Pool ¶
type Pool interface { ValidateSerializedTxn(serializedTxn []byte) error // Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer AddRemoteTxs(ctx context.Context, newTxs types.TxSlots) AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(tx kv.Tx, hash []byte) (bool, error) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) AddNewGoodPeer(peerID types.PeerID) }
Pool is the interface for the transaction pool. This interface exists for the convenience of testing, and not yet because there are multiple implementations.
type Send ¶
type Send struct {
// contains filtered or unexported fields
}
Send - sends concrete P2P messages to Sentry. Same as Fetch but for outbound traffic; it does not initiate any messages by itself.
func (*Send) AnnouncePooledTxs ¶
func (*Send) BroadcastPooledTxs ¶
Broadcast given RLPs to random peers
func (*Send) PropagatePooledTxsToPeersList ¶
func (*Send) SetWaitGroup ¶
type SentryClient ¶
type SentryClient interface { sentry.SentryClient Protocol() uint }
type StateChangesClient ¶
type StateChangesClient interface {
StateChanges(ctx context.Context, in *remote.StateChangeRequest, opts ...grpc.CallOption) (remote.KV_StateChangesClient, error)
}
type SubPool ¶
type SubPool struct {
// contains filtered or unexported fields
}
func NewSubPool ¶
func NewSubPool(t SubPoolType, limit int) *SubPool
func (*SubPool) DebugPrint ¶
func (*SubPool) EnforceInvariants ¶
func (p *SubPool) EnforceInvariants()
type SubPoolMarker ¶
type SubPoolMarker uint8
SubPoolMarker is an ordered bitset of five bits that's used to sort transactions into sub-pools. Bits meaning: 1. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for the sender is M, and there are transactions for all nonces between M and N from the same sender. Set to 0 if the transaction's nonce is divided from the state nonce by one or more nonce gaps. 2. Sufficient balance for gas. Set to 1 if the balance of sender's account in the state is B, nonce of the sender in the state is M, nonce of the transaction is N, and the sum of feeCap x gasLimit + transferred_value of all transactions from this sender with nonces N+1 ... M is no more than B. Set to 0 otherwise. In other words, this bit is set if there is currently a guarantee that the transaction and all its required prior transactions will be able to pay for gas. 3. Not too much gas: Set to 1 if the transaction doesn't use too much gas. 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than baseFee of the currently pending block. Set to 0 otherwise. 5. Local transaction. Set to 1 if transaction is local.
type SubPoolType ¶
type SubPoolType uint8
const BaseFeeSubPool SubPoolType = 2
const PendingSubPool SubPoolType = 1
const QueuedSubPool SubPoolType = 3
func (SubPoolType) String ¶
func (sp SubPoolType) String() string
type TxPool ¶
type TxPool struct {
// contains filtered or unexported fields
}
TxPool - holds all pool-related data structures and lock-based tiny methods; most of the logic is implemented by pure, test-friendly functions.
txpool doesn't start any goroutines - "leave concurrency to user" design. txpool has no DB-TX fields - "leave db transactions management to user" design. txpool has a _chainDB field - but it must maximize the local state cache hit-rate - and perform a minimum of _chainDB transactions.
It keeps TxSlot objects immutable.
func (*TxPool) AddLocalTxs ¶
func (*TxPool) AddNewGoodPeer ¶
func (*TxPool) AddRemoteTxs ¶
func (*TxPool) AppendAllAnnouncements ¶
func (*TxPool) AppendLocalAnnouncements ¶
func (*TxPool) AppendRemoteAnnouncements ¶
func (*TxPool) FilterKnownIdHashes ¶
func (*TxPool) GetKnownBlobTxn ¶
func (*TxPool) NonceFromAddress ¶
func (*TxPool) OnNewBlock ¶
func (*TxPool) ResetYieldedStatus ¶
func (p *TxPool) ResetYieldedStatus()
func (*TxPool) ValidateSerializedTxn ¶
Check that the serialized txn does not exceed a certain max size
type WorstQueue ¶
type WorstQueue struct {
// contains filtered or unexported fields
}
func (WorstQueue) Len ¶
func (p WorstQueue) Len() int
func (WorstQueue) Less ¶
func (p WorstQueue) Less(i, j int) bool
func (*WorstQueue) Pop ¶
func (p *WorstQueue) Pop() interface{}
func (*WorstQueue) Push ¶
func (p *WorstQueue) Push(x interface{})
func (WorstQueue) Swap ¶
func (p WorstQueue) Swap(i, j int)