gossip

package
v2.0.0-beta.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: Apache-2.0 Imports: 36 Imported by: 1

Documentation

Index

Constants

View Source
const (
	WorkerQueueSize = 50000
	WorkerCount     = 64
)
View Source
const (
	MessageTypeMilestoneRequest message.Type = 1
	MessageTypeBlock            message.Type = 2
	MessageTypeBlockRequest     message.Type = 3
	MessageTypeHeartbeat        message.Type = 4
)
View Source
const DefaultAdvancementThreshold = 0.0

DefaultAdvancementThreshold is the default threshold at which a checkpoint advancement is done. Per default an advancement is always done as soon the confirmed milestone enters the range between the previous and current checkpoint.

View Source
const DefaultLatencyResolution = 100

Variables

View Source
var (
	ErrBlockNotSolid      = errors.New("block is not solid")
	ErrBlockBelowMaxDepth = errors.New("block is below max depth")
)
View Source
var (
	// ErrInvalidSourceLength is returned when an invalid source byte slice for extraction of certain data is passed.
	ErrInvalidSourceLength = errors.New("invalid source byte slice")
)
View Source
var (
	ErrProtocolDoesNotExist = errors.New("stream/protocol does not exist")
)
View Source
var (
	ErrUnknownRequestType = errors.New("unknown request type")
)

Functions

func BlockProcessedCaller

func BlockProcessedCaller(handler interface{}, params ...interface{})

func BroadcastCaller

func BroadcastCaller(handler interface{}, params ...interface{})

func CheckpointCaller

func CheckpointCaller(handler interface{}, params ...interface{})

func ProtocolCaller

func ProtocolCaller(handler interface{}, params ...interface{})

ProtocolCaller gets called with a Protocol.

func StreamCancelCaller

func StreamCancelCaller(handler interface{}, params ...interface{})

StreamCancelCaller gets called with a network.Stream and its cancel reason.

func SyncDoneCaller

func SyncDoneCaller(handler interface{}, params ...interface{})

func SyncStartCaller

func SyncStartCaller(handler interface{}, params ...interface{})

func TargetCaller

func TargetCaller(handler interface{}, params ...interface{})

Types

type AdvanceCheckpointCriteria

type AdvanceCheckpointCriteria func(currentConfirmed, previousCheckpoint, currentCheckpoint iotago.MilestoneIndex) bool

AdvanceCheckpointCriteria is a function which determines whether the checkpoint should be advanced.

func AdvanceAtPercentageReached

func AdvanceAtPercentageReached(threshold float64) AdvanceCheckpointCriteria

AdvanceAtPercentageReached is an AdvanceCheckpointCriteria which advances the checkpoint when the current one was reached by >=X% by the current confirmed milestone in relation to the previous checkpoint.

type Broadcast

type Broadcast struct {
	// The data to broadcast.
	Data []byte
	// The IDs of the peers to exclude from broadcasting.
	ExcludePeers map[peer.ID]struct{}
}

Broadcast defines a data which should be broadcasted.

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster provides functions to broadcast data to gossip streams.

func NewBroadcaster

func NewBroadcaster(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	peeringManager *p2p.Manager,
	service *Service,
	broadcastQueueSize int) *Broadcaster

NewBroadcaster creates a new Broadcaster.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(broadcast *Broadcast)

Broadcast broadcasts the given Broadcast.

func (*Broadcaster) BroadcastHeartbeat

func (b *Broadcaster) BroadcastHeartbeat(filter func(proto *Protocol) bool)

BroadcastHeartbeat broadcasts a heartbeat message to every peer.

func (*Broadcaster) RunBroadcastQueueDrainer

func (b *Broadcaster) RunBroadcastQueueDrainer(ctx context.Context)

RunBroadcastQueueDrainer runs the broadcast queue drainer.

type CachedWorkUnit

type CachedWorkUnit struct {
	objectstorage.CachedObject
}

CachedWorkUnit represents a cached WorkUnit.

func (*CachedWorkUnit) WorkUnit

func (c *CachedWorkUnit) WorkUnit() *WorkUnit

WorkUnit retrieves the work unit, that is cached in this container.

type Events

type Events struct {
	// Fired when a new set of milestones should be requested.
	CheckpointUpdated *events.Event
	// Fired when the target milestone is updated.
	TargetUpdated *events.Event
	// Fired when warp synchronization starts.
	Start *events.Event
	// Fired when the warp synchronization is done.
	Done *events.Event
}

Events holds WarpSync related events.

type FilterFunc

type FilterFunc func(request *Request) bool

FilterFunc is a function which determines whether a request should be enqueued or not.

type Heartbeat

type Heartbeat struct {
	SolidMilestoneIndex  iotago.MilestoneIndex `json:"solidMilestoneIndex"`
	PrunedMilestoneIndex iotago.MilestoneIndex `json:"prunedMilestoneIndex"`
	LatestMilestoneIndex iotago.MilestoneIndex `json:"latestMilestoneIndex"`
	ConnectedPeers       int                   `json:"connectedPeers"`
	SyncedPeers          int                   `json:"syncedPeers"`
}

Heartbeat contains information about a nodes current solid and pruned milestone index and its connected and synced peers count.

func ParseHeartbeat

func ParseHeartbeat(data []byte) *Heartbeat

ParseHeartbeat parses the given message into a heartbeat.

type Info

type Info struct {
	Heartbeat *Heartbeat      `json:"heartbeat"`
	Metrics   MetricsSnapshot `json:"metrics"`
}

Info represents information about an ongoing gossip protocol.

type MessageProcessor

type MessageProcessor struct {

	// events of the block processor.
	Events *MessageProcessorEvents
	// contains filtered or unexported fields
}

MessageProcessor processes submitted messages in parallel and fires appropriate completion events.

func NewMessageProcessor

func NewMessageProcessor(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	requestQueue RequestQueue,
	peeringManager *p2p.Manager,
	serverMetrics *metrics.ServerMetrics,
	protocolManager *protocol.Manager,
	opts *Options) (*MessageProcessor, error)

NewMessageProcessor creates a new processor which processes messages.

func (*MessageProcessor) Broadcast

func (proc *MessageProcessor) Broadcast(cachedBlockMeta *storage.CachedMetadata)

func (*MessageProcessor) Emit

func (proc *MessageProcessor) Emit(block *storage.Block) error

Emit triggers BlockProcessed and BroadcastBlock events for the given block. All blocks passed to this function must be checked with "DeSeriModePerformValidation" before. We also check if the parents are solid and not BMD before we broadcast the block, otherwise this block would be seen as invalid gossip by other peers.

func (*MessageProcessor) Process

func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)

Process submits the given message to the processor for processing.

func (*MessageProcessor) Run

func (proc *MessageProcessor) Run(ctx context.Context)

Run runs the processor and blocks until the shutdown signal is triggered.

func (*MessageProcessor) Shutdown

func (proc *MessageProcessor) Shutdown()

Shutdown signals the internal worker pool and object storage to shut down and sets the shutdown flag.

func (*MessageProcessor) WorkUnitsSize

func (proc *MessageProcessor) WorkUnitsSize() int

WorkUnitsSize returns the size of WorkUnits currently cached.

type MessageProcessorEvents

type MessageProcessorEvents struct {
	// Fired when a block was fully processed.
	BlockProcessed *events.Event
	// Fired when a block is meant to be broadcasted.
	BroadcastBlock *events.Event
}

MessageProcessorEvents are the events fired by the MessageProcessor.

type Metrics

type Metrics struct {
	// The number of received blocks which are new.
	NewBlocks atomic.Uint32
	// The number of received blocks which are already known.
	KnownBlocks atomic.Uint32
	// The number of received blocks.
	ReceivedBlocks atomic.Uint32
	// The number of received block requests.
	ReceivedBlockRequests atomic.Uint32
	// The number of received milestone requests.
	ReceivedMilestoneRequests atomic.Uint32
	// The number of received heartbeats.
	ReceivedHeartbeats atomic.Uint32
	// The number of sent packets.
	SentPackets atomic.Uint32
	// The number of sent blocks.
	SentBlocks atomic.Uint32
	// The number of sent block requests.
	SentBlockRequests atomic.Uint32
	// The number of sent milestone requests.
	SentMilestoneRequests atomic.Uint32
	// The number of sent heartbeats.
	SentHeartbeats atomic.Uint32
	// The number of dropped packets.
	DroppedPackets atomic.Uint32
}

Metrics defines a set of metrics regarding a gossip protocol instance.

func (*Metrics) Snapshot

func (m *Metrics) Snapshot() MetricsSnapshot

Snapshot returns MetricsSnapshot of the Metrics.

type MetricsSnapshot

type MetricsSnapshot struct {
	NewBlocks                 uint32 `json:"newBlocks"`
	KnownBlocks               uint32 `json:"knownBlocks"`
	ReceivedBlocks            uint32 `json:"receivedBlocks"`
	ReceivedBlockRequests     uint32 `json:"receivedBlockRequests"`
	ReceivedMilestoneRequests uint32 `json:"receivedMilestoneRequests"`
	ReceivedHeartbeats        uint32 `json:"receivedHeartbeats"`
	SentBlocks                uint32 `json:"sentBlocks"`
	SentBlockRequests         uint32 `json:"sentBlockRequests"`
	SentMilestoneRequests     uint32 `json:"sentMilestoneRequests"`
	SentHeartbeats            uint32 `json:"sentHeartbeats"`
	DroppedPackets            uint32 `json:"droppedPackets"`
}

MetricsSnapshot represents a snapshot of the gossip protocol metrics.

type Options

type Options struct {
	WorkUnitCacheOpts *profile.CacheOpts
}

The Options for the MessageProcessor.

type Protocol

type Protocol struct {
	// Parser parses gossip messages and emits received events for them.
	Parser *protocol.Protocol
	// The ID of the peer to which this protocol is associated to.
	PeerID peer.ID
	// The underlying stream for this Protocol.
	Stream network.Stream

	// The events surrounding a Protocol.
	Events *ProtocolEvents
	// The peer's latest heartbeat message.
	LatestHeartbeat *Heartbeat
	// Time the last heartbeat was received.
	HeartbeatReceivedTime time.Time
	// Time the last heartbeat was sent.
	HeartbeatSentTime time.Time
	// The send queue into which to enqueue messages to send.
	SendQueue chan []byte
	// The metrics around this protocol instance.
	Metrics Metrics

	// The shared server metrics instance.
	ServerMetrics *metrics.ServerMetrics
	// contains filtered or unexported fields
}

Protocol represents an instance of the gossip protocol.

func NewProtocol

func NewProtocol(peerID peer.ID, stream network.Stream, sendQueueSize int, readTimeout, writeTimeout time.Duration, serverMetrics *metrics.ServerMetrics) *Protocol

NewProtocol creates a new gossip protocol instance associated to the given peer.

func (*Protocol) CouldHaveDataForMilestone

func (p *Protocol) CouldHaveDataForMilestone(index iotago.MilestoneIndex) bool

CouldHaveDataForMilestone tells whether the underlying peer given the latest heartbeat message, could have parts of the cone data for the given milestone. Returns false if no heartbeat message was received yet.

func (*Protocol) Enqueue

func (p *Protocol) Enqueue(data []byte)

Enqueue enqueues the given gossip protocol message to be sent to the peer. If it can't because the send queue is over capacity, the message gets dropped.

func (*Protocol) HasDataForMilestone

func (p *Protocol) HasDataForMilestone(index iotago.MilestoneIndex) bool

HasDataForMilestone tells whether the underlying peer given the latest heartbeat message, has the cone data for the given milestone. Returns false if no heartbeat message was received yet.

func (*Protocol) Info

func (p *Protocol) Info() *Info

Info returns the info about the protocol.

func (*Protocol) IsSynced

func (p *Protocol) IsSynced(cmi iotago.MilestoneIndex) bool

IsSynced tells whether the underlying peer is synced.

func (*Protocol) Read

func (p *Protocol) Read(buf []byte) (int, error)

Read reads from the stream into the given buffer.

func (*Protocol) Send

func (p *Protocol) Send(message []byte) error

Send sends the given gossip message on the underlying Protocol.Stream.

func (*Protocol) SendBlock

func (p *Protocol) SendBlock(blockData []byte)

SendBlock sends a storage.Block to the given peer.

func (*Protocol) SendBlockRequest

func (p *Protocol) SendBlockRequest(requestedBlockID iotago.BlockID)

SendBlockRequest sends a block request message to the given peer.

func (*Protocol) SendHeartbeat

func (p *Protocol) SendHeartbeat(solidMsIndex iotago.MilestoneIndex, pruningMsIndex iotago.MilestoneIndex, latestMsIndex iotago.MilestoneIndex, connectedPeers uint8, syncedPeers uint8)

SendHeartbeat sends a Heartbeat to the given peer.

func (*Protocol) SendLatestMilestoneRequest

func (p *Protocol) SendLatestMilestoneRequest()

SendLatestMilestoneRequest sends a storage.Milestone request which requests the latest known milestone from the given peer.

func (*Protocol) SendMilestoneRequest

func (p *Protocol) SendMilestoneRequest(index iotago.MilestoneIndex)

SendMilestoneRequest sends a milestone request to the given peer.

func (*Protocol) Terminated

func (p *Protocol) Terminated() <-chan struct{}

Terminated returns a channel that is closed if the protocol was terminated.

type ProtocolEvents

type ProtocolEvents struct {
	// Fired when the heartbeat message state on the peer has been updated.
	HeartbeatUpdated *events.Event
	// Fired when a message of the given type is sent.
	// This exists solely because protocol.Protocol in hive.go doesn't
	// emit events anymore for sent messages, as it is solely a parser.
	Sent []*events.Event
	// Fired when an error occurs on the protocol.
	Errors *events.Event
}

ProtocolEvents happening on a Protocol.

type ProtocolForEachFunc

type ProtocolForEachFunc func(proto *Protocol) bool

ProtocolForEachFunc is used in Service.ForEach. Returning false indicates to stop looping. This function must not call any methods on Service.

type Request

type Request struct {
	// The type of the request.
	RequestType RequestType
	// The BlockID of the block to request.
	BlockID iotago.BlockID
	// The milestone index under which this request is linked.
	MilestoneIndex iotago.MilestoneIndex
	// Tells the request queue to not remove this request if the enqueue time is
	// over the given threshold.
	PreventDiscard bool
	// the time at which this request was first enqueued.
	// do not modify this time
	EnqueueTime time.Time
}

Request is a request for a particular block.

func NewBlockIDRequest

func NewBlockIDRequest(blockID iotago.BlockID, msIndex iotago.MilestoneIndex) *Request

NewBlockIDRequest creates a new block request for a specific blockID.

func NewMilestoneIndexRequest

func NewMilestoneIndexRequest(msIndex iotago.MilestoneIndex) *Request

NewMilestoneIndexRequest creates a new block request for a specific milestone index.

func (*Request) MapKey

func (r *Request) MapKey() string

type RequestBackPressureFunc

type RequestBackPressureFunc func() bool

RequestBackPressureFunc is a function which tells the Requester to stop requesting more data.

type RequestQueue

type RequestQueue interface {
	// Next returns the next request to send, pops it from the queue and marks it as pending.
	Next() *Request
	// Peek returns the next request to send without popping it from the queue.
	Peek() *Request
	// Enqueue enqueues the given request if it isn't already queued or pending.
	Enqueue(*Request) (enqueued bool)
	// IsQueued tells whether a given request for the given data is queued.
	IsQueued(data interface{}) bool
	// IsPending tells whether a given request was popped from the queue and is now pending.
	IsPending(data interface{}) bool
	// IsProcessing tells whether a given request was popped from the queue, received and is now processing.
	IsProcessing(data interface{}) bool
	// Received marks a request as received and thereby removes it from the pending set.
	// It is added to the processing set.
	// Returns the origin request which was pending or nil if the data was not requested.
	Received(data interface{}) *Request
	// Processed marks a request as fulfilled and thereby removes it from the processing set.
	// Returns the origin request which was processing or nil if the data was not requested.
	Processed(data interface{}) *Request
	// EnqueuePending enqueues all pending requests back into the queue.
	// It also discards requests in the pending set of which their enqueue time is over the given delta threshold.
	// If discardOlderThan is zero, no requests are discarded.
	EnqueuePending(discardOlderThan time.Duration) (queued int)
	// Size returns the size of currently queued, requested/pending and processing requests.
	Size() (queued int, pending int, processing int)
	// Empty tells whether the queue has no queued and pending requests.
	Empty() bool
	// Requests returns a snapshot of all queued, pending and processing requests in the queue.
	Requests() (queued []*Request, pending []*Request, processing []*Request)
	// AvgLatency returns the average latency of enqueueing and then receiving a request.
	AvgLatency() int64
	// Filter adds the given filter function to the queue. Passing nil resets the current one.
	// Setting a filter automatically clears all queued and pending requests which do not fulfill
	// the filter criteria.
	Filter(f FilterFunc)
}

RequestQueue implements a queue which contains requests for needed data.

func NewRequestQueue

func NewRequestQueue(latencyResolution ...int32) RequestQueue

NewRequestQueue creates a new RequestQueue where request are prioritized over their milestone index (lower = higher priority).

type RequestType

type RequestType int

RequestType is the type of request.

const (
	RequestTypeBlockID RequestType = iota
	RequestTypeMilestoneIndex
)

type Requester

type Requester struct {
	// contains filtered or unexported fields
}

Requester handles requesting packets.

func NewRequester

func NewRequester(
	dbStorage *storage.Storage,
	service *Service,
	rQueue RequestQueue,
	opts ...RequesterOption) *Requester

NewRequester creates a new Requester.

func (*Requester) AddBackPressureFunc

func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)

AddBackPressureFunc adds a RequestBackPressureFunc. This function can be called multiple times to add additional RequestBackPressureFunc. Calling this function after any Requester worker has been started results in a panic.

func (*Requester) Request

func (r *Requester) Request(data interface{}, msIndex iotago.MilestoneIndex, preventDiscard ...bool) bool

Request enqueues a request to the request queue for the given block if it isn't a solid entry point and is not contained in the database already.

func (*Requester) RequestMilestoneParents

func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool

RequestMilestoneParents enqueues requests for the parents of the given milestone to the request queue, if the parents are not solid entry points and not already in the database.

func (*Requester) RequestMultiple

func (r *Requester) RequestMultiple(blockIDs iotago.BlockIDs, msIndex iotago.MilestoneIndex, preventDiscard ...bool) int

RequestMultiple works like Request but takes multiple block IDs.

func (*Requester) RequestParents

func (r *Requester) RequestParents(cachedBlock *storage.CachedBlock, msIndex iotago.MilestoneIndex, preventDiscard ...bool)

RequestParents enqueues requests for the parents of the given block to the request queue, if the given block is not a solid entry point and neither its parents are and also not in the database.

func (*Requester) RunPendingRequestEnqueuer

func (r *Requester) RunPendingRequestEnqueuer(ctx context.Context)

RunPendingRequestEnqueuer runs the loop to periodically re-request pending requests from the RequestQueue.

func (*Requester) RunRequestQueueDrainer

func (r *Requester) RunRequestQueueDrainer(ctx context.Context)

RunRequestQueueDrainer runs the RequestQueue drainer.

type RequesterOption

type RequesterOption func(options *RequesterOptions)

RequesterOption is a function which sets an option on a RequesterOptions instance.

func WithRequesterDiscardRequestsOlderThan

func WithRequesterDiscardRequestsOlderThan(dur time.Duration) RequesterOption

WithRequesterDiscardRequestsOlderThan sets the threshold for the max age of requests.

func WithRequesterPendingRequestReEnqueueInterval

func WithRequesterPendingRequestReEnqueueInterval(dur time.Duration) RequesterOption

WithRequesterPendingRequestReEnqueueInterval sets the re-enqueue interval for pending requests.

type RequesterOptions

type RequesterOptions struct {
	// Defines the re-queue interval for pending requests.
	PendingRequestReEnqueueInterval time.Duration
	// Defines the max age for requests.
	DiscardRequestsOlderThan time.Duration
}

RequesterOptions are options around a Requester.

type Requests

type Requests []*Request

func (Requests) HasRequest

func (r Requests) HasRequest() bool

HasRequest returns true if Requests contains a Request.

type Service

type Service struct {
	// the logger used to log events.
	*logger.WrappedLogger

	// Events happening around a Service.
	Events *ServiceEvents
	// contains filtered or unexported fields
}

Service handles ongoing gossip streams.

func NewService

func NewService(
	protocol protocol.ID, host host.Host,
	peeringManager *p2p.Manager,
	serverMetrics *metrics.ServerMetrics,
	opts ...ServiceOption) *Service

NewService creates a new Service.

func (*Service) CloseStream

func (s *Service) CloseStream(peerID peer.ID) error

CloseStream closes an ongoing stream with a peer.

func (*Service) ForEach

func (s *Service) ForEach(f ProtocolForEachFunc)

ForEach calls the given ProtocolForEachFunc on each Protocol.

func (*Service) Protocol

func (s *Service) Protocol(peerID peer.ID) *Protocol

Protocol returns the gossip.Protocol instance for the given peer or nil.

func (*Service) Start

func (s *Service) Start(ctx context.Context)

Start starts the Service's event loop.

func (*Service) SynchronizedCount

func (s *Service) SynchronizedCount(latestMilestoneIndex iotago.MilestoneIndex) int

SynchronizedCount returns the count of streams with peers which appear to be synchronized given their latest Heartbeat message.

type ServiceEvents

type ServiceEvents struct {
	// Fired when a protocol has been started.
	ProtocolStarted *events.Event
	// Fired when a protocol has ended.
	ProtocolTerminated *events.Event
	// Fired when an inbound stream gets canceled.
	InboundStreamCanceled *events.Event
	// Fired when an internal error happens.
	Error *events.Event
}

ServiceEvents are events happening around a Service.

type ServiceOption

type ServiceOption func(opts *ServiceOptions)

ServiceOption is a function setting a ServiceOptions option.

func WithLogger

func WithLogger(logger *logger.Logger) ServiceOption

WithLogger enables logging within the Service.

func WithSendQueueSize

func WithSendQueueSize(size int) ServiceOption

WithSendQueueSize defines the size of send queues on ongoing gossip protocol streams.

func WithStreamConnectTimeout

func WithStreamConnectTimeout(dur time.Duration) ServiceOption

WithStreamConnectTimeout defines the timeout for creating a gossip protocol stream.

func WithStreamReadTimeout

func WithStreamReadTimeout(dur time.Duration) ServiceOption

WithStreamReadTimeout defines the read timeout for reading from a stream.

func WithStreamWriteTimeout

func WithStreamWriteTimeout(dur time.Duration) ServiceOption

WithStreamWriteTimeout defines the write timeout for writing to a stream.

func WithUnknownPeersLimit

func WithUnknownPeersLimit(limit int) ServiceOption

WithUnknownPeersLimit defines how many peers with an unknown relation are allowed to have an ongoing gossip protocol stream.

type ServiceOptions

type ServiceOptions struct {
	// contains filtered or unexported fields
}

ServiceOptions define options for a Service.

type StreamCancelReason

type StreamCancelReason string

StreamCancelReason is a reason for a gossip stream cancellation.

const (
	// StreamCancelReasonDuplicated defines a stream cancellation because
	// it would lead to a duplicated ongoing stream.
	StreamCancelReasonDuplicated StreamCancelReason = "duplicated stream"
	// StreamCancelReasonInsufficientPeerRelation defines a stream cancellation because
	// the relation to the other peer is insufficient.
	StreamCancelReasonInsufficientPeerRelation StreamCancelReason = "insufficient peer relation"
	// StreamCancelReasonNoUnknownPeerSlotAvailable defines a stream cancellation
	// because no more unknown peers slot were available.
	StreamCancelReasonNoUnknownPeerSlotAvailable StreamCancelReason = "no unknown peer slot available"
)

type WarpSync

type WarpSync struct {
	sync.Mutex

	// The used advancement range per checkpoint.
	AdvancementRange syncmanager.MilestoneIndexDelta
	// The Events of the warpsync.
	Events *Events

	// The current confirmed milestone of the node.
	CurrentConfirmedMilestone iotago.MilestoneIndex
	// The starting time of the synchronization.
	StartTime time.Time
	// The starting point of the synchronization.
	InitMilestone iotago.MilestoneIndex
	// The target milestone to which to synchronize to.
	TargetMilestone iotago.MilestoneIndex
	// The previous checkpoint of the synchronization.
	PreviousCheckpoint iotago.MilestoneIndex
	// The current checkpoint of the synchronization.
	CurrentCheckpoint iotago.MilestoneIndex
	// contains filtered or unexported fields
}

WarpSync is metadata about doing a synchronization via STING messages.

func NewWarpSync

func NewWarpSync(advRange int, advanceCheckpointCriteriaFunc ...AdvanceCheckpointCriteria) *WarpSync

NewWarpSync creates a new WarpSync instance with the given advancement range and criteria func. If no advancement func is provided, the WarpSync uses AdvanceAtPercentageReached with DefaultAdvancementThreshold.

func (*WarpSync) AddReferencedBlocksCount

func (ws *WarpSync) AddReferencedBlocksCount(referencedBlocksCount int)

AddReferencedBlocksCount adds the amount of referenced blocks to collect stats.

func (*WarpSync) UpdateCurrentConfirmedMilestone

func (ws *WarpSync) UpdateCurrentConfirmedMilestone(current iotago.MilestoneIndex)

UpdateCurrentConfirmedMilestone updates the current confirmed milestone index state.

func (*WarpSync) UpdateTargetMilestone

func (ws *WarpSync) UpdateTargetMilestone(target iotago.MilestoneIndex)

UpdateTargetMilestone updates the synchronization target if it is higher than the current one and triggers a synchronization start if the target was set for the first time.

type WarpSyncMilestoneRequester

type WarpSyncMilestoneRequester struct {
	syncutils.Mutex
	// contains filtered or unexported fields
}

WarpSyncMilestoneRequester walks the cones of existing but non-solid milestones and memoizes already walked blocks and milestones.

func NewWarpSyncMilestoneRequester

func NewWarpSyncMilestoneRequester(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	requester *Requester,
	preventDiscard bool) *WarpSyncMilestoneRequester

NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.

func (*WarpSyncMilestoneRequester) Cleanup

func (w *WarpSyncMilestoneRequester) Cleanup()

Cleanup cleans up traversed blocks to free memory.

func (*WarpSyncMilestoneRequester) RequestMilestoneRange

func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, onExistingMilestoneInRange func(ctx context.Context, msIndex iotago.MilestoneIndex) error, from ...iotago.MilestoneIndex) syncmanager.MilestoneIndexDelta

RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index. Returns the number of milestones requested.

func (*WarpSyncMilestoneRequester) RequestMissingMilestoneParents

func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error

RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. Already requested milestones or traversed blocks will be ignored, to circumvent requesting the same parents multiple times.

type WorkUnit

type WorkUnit struct {
	objectstorage.StorableObjectFlags
	// contains filtered or unexported fields
}

WorkUnit defines the work around processing a received block and its associated requests from peers. There is at most one WorkUnit active per same block bytes.

func (*WorkUnit) Is

func (wu *WorkUnit) Is(state WorkUnitState) bool

Is tells whether the WorkUnit has the given state.

func (*WorkUnit) ObjectStorageKey

func (wu *WorkUnit) ObjectStorageKey() []byte

func (*WorkUnit) ObjectStorageValue

func (wu *WorkUnit) ObjectStorageValue() []byte

func (*WorkUnit) Update

func (wu *WorkUnit) Update(_ objectstorage.StorableObject)

func (*WorkUnit) UpdateState

func (wu *WorkUnit) UpdateState(state WorkUnitState)

UpdateState updates the WorkUnit's state.

type WorkUnitState

type WorkUnitState byte

WorkUnitState defines the state which a WorkUnit is in.

const (
	Hashing WorkUnitState = 1 << 0
	Invalid WorkUnitState = 1 << 1
	Hashed  WorkUnitState = 1 << 2
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL