Documentation
¶
Index ¶
- func GrpcClient(ctx context.Context, sentryAddr string) (*direct.SentryClientRemote, error)
- func MakeInboundMessage() *proto_sentry.InboundMessage
- func SentryReconnectAndPumpStreamLoop[TMessage interface{}](ctx context.Context, sentryClient direct.SentryClient, ...)
- type MessageFactory
- type MessageHandler
- type MultiClient
- func (cs *MultiClient) BroadcastNewBlock(ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int)
- func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_sentry.InboundMessage, ...) (err error)
- func (cs *MultiClient) HandlePeerEvent(ctx context.Context, event *proto_sentry.PeerEvent, ...) error
- func (cs *MultiClient) PeerEventsLoop(ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup)
- func (cs *MultiClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem)
- func (cs *MultiClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce)
- func (cs *MultiClient) RecvMessageLoop(ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup)
- func (cs *MultiClient) RecvUploadHeadersMessageLoop(ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup)
- func (cs *MultiClient) RecvUploadMessageLoop(ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup)
- func (cs *MultiClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool)
- func (cs *MultiClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool)
- func (cs *MultiClient) Sentries() []direct.SentryClient
- func (cs *MultiClient) SetStatus(ctx context.Context)
- func (cs *MultiClient) StartStreamLoops(ctx context.Context)
- type SentryMessageStream
- type SentryMessageStreamFactory
- type StatusDataFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GrpcClient ¶
func MakeInboundMessage ¶
func MakeInboundMessage() *proto_sentry.InboundMessage
func SentryReconnectAndPumpStreamLoop ¶
func SentryReconnectAndPumpStreamLoop[TMessage interface{}]( ctx context.Context, sentryClient direct.SentryClient, statusDataFactory StatusDataFactory, streamName string, streamFactory SentryMessageStreamFactory, messageFactory MessageFactory[TMessage], handleInboundMessage MessageHandler[TMessage], wg *sync.WaitGroup, logger log.Logger, )
Types ¶
type MessageFactory ¶
type MessageFactory[T any] func() T
type MessageHandler ¶
type MultiClient ¶
type MultiClient struct { Hd *headerdownload.HeaderDownload Bd *bodydownload.BodyDownload IsMock bool ChainConfig *chain.Config Engine consensus.Engine // contains filtered or unexported fields }
MultiClient handles requests, responses, and subscriptions for multiple sentries; each sentry may support the same or a different p2p protocol.
func NewMultiClient ¶
func NewMultiClient( db kv.RwDB, chainConfig *chain.Config, engine consensus.Engine, sentries []direct.SentryClient, syncCfg ethconfig.Sync, blockReader services.FullBlockReader, blockBufferSize int, statusDataProvider *sentry.StatusDataProvider, logPeerInfo bool, maxBlockBroadcastPeers func(*types.Header) uint, disableBlockDownload bool, logger log.Logger, ) (*MultiClient, error)
func (*MultiClient) BroadcastNewBlock ¶
func (*MultiClient) HandleInboundMessage ¶
func (cs *MultiClient) HandleInboundMessage(ctx context.Context, message *proto_sentry.InboundMessage, sentry direct.SentryClient) (err error)
func (*MultiClient) HandlePeerEvent ¶
func (cs *MultiClient) HandlePeerEvent(ctx context.Context, event *proto_sentry.PeerEvent, sentryClient direct.SentryClient) error
func (*MultiClient) PeerEventsLoop ¶
func (cs *MultiClient) PeerEventsLoop( ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup, )
func (*MultiClient) Penalize ¶
func (cs *MultiClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem)
Penalize sends the list of penalties to all sentries.
func (*MultiClient) PropagateNewBlockHashes ¶
func (cs *MultiClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce)
func (*MultiClient) RecvMessageLoop ¶
func (cs *MultiClient) RecvMessageLoop( ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup, )
func (*MultiClient) RecvUploadHeadersMessageLoop ¶
func (cs *MultiClient) RecvUploadHeadersMessageLoop( ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup, )
func (*MultiClient) RecvUploadMessageLoop ¶
func (cs *MultiClient) RecvUploadMessageLoop( ctx context.Context, sentry direct.SentryClient, wg *sync.WaitGroup, )
func (*MultiClient) SendBodyRequest ¶
func (cs *MultiClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool)
func (*MultiClient) SendHeaderRequest ¶
func (cs *MultiClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool)
func (*MultiClient) Sentries ¶
func (cs *MultiClient) Sentries() []direct.SentryClient
func (*MultiClient) SetStatus ¶
func (cs *MultiClient) SetStatus(ctx context.Context)
func (*MultiClient) StartStreamLoops ¶
func (cs *MultiClient) StartStreamLoops(ctx context.Context)
StartStreamLoops starts message processing loops for all sentries. The processing happens in several streams: (1) RecvMessage - processing incoming headers/bodies; (2) RecvUploadMessage - sending bodies/receipts; this may be heavy, so it's OK not to process these messages fast enough, and it's also OK to drop some of them if we can't process them; (3) RecvUploadHeadersMessage - sending headers; a dedicated stream, because header propagation speed is important for network health; (4) PeerEventsLoop - logging peer connect/disconnect events.
type SentryMessageStream ¶
type SentryMessageStream grpc.ClientStream
type SentryMessageStreamFactory ¶
type SentryMessageStreamFactory func(context.Context, direct.SentryClient) (SentryMessageStream, error)
type StatusDataFactory ¶
type StatusDataFactory func(context.Context) (*proto_sentry.StatusData, error)