Documentation ¶
Index ¶
- Constants
- Variables
- func BlockProcessedCaller(handler interface{}, params ...interface{})
- func BroadcastCaller(handler interface{}, params ...interface{})
- func CheckpointCaller(handler interface{}, params ...interface{})
- func ProtocolCaller(handler interface{}, params ...interface{})
- func StreamCancelCaller(handler interface{}, params ...interface{})
- func SyncDoneCaller(handler interface{}, params ...interface{})
- func SyncStartCaller(handler interface{}, params ...interface{})
- func TargetCaller(handler interface{}, params ...interface{})
- type AdvanceCheckpointCriteria
- type Broadcast
- type Broadcaster
- type CachedWorkUnit
- type Events
- type FilterFunc
- type Heartbeat
- type Info
- type MessageProcessor
- func (proc *MessageProcessor) Broadcast(cachedBlockMeta *storage.CachedMetadata)
- func (proc *MessageProcessor) Emit(block *storage.Block) error
- func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)
- func (proc *MessageProcessor) Run(ctx context.Context)
- func (proc *MessageProcessor) Shutdown()
- func (proc *MessageProcessor) WorkUnitsSize() int
- type MessageProcessorEvents
- type Metrics
- type MetricsSnapshot
- type Options
- type Protocol
- func (p *Protocol) CouldHaveDataForMilestone(index iotago.MilestoneIndex) bool
- func (p *Protocol) Enqueue(data []byte)
- func (p *Protocol) HasDataForMilestone(index iotago.MilestoneIndex) bool
- func (p *Protocol) Info() *Info
- func (p *Protocol) IsSynced(cmi iotago.MilestoneIndex) bool
- func (p *Protocol) Read(buf []byte) (int, error)
- func (p *Protocol) Send(message []byte) error
- func (p *Protocol) SendBlock(blockData []byte)
- func (p *Protocol) SendBlockRequest(requestedBlockID iotago.BlockID)
- func (p *Protocol) SendHeartbeat(solidMsIndex iotago.MilestoneIndex, pruningMsIndex iotago.MilestoneIndex, ...)
- func (p *Protocol) SendLatestMilestoneRequest()
- func (p *Protocol) SendMilestoneRequest(index iotago.MilestoneIndex)
- func (p *Protocol) Terminated() <-chan struct{}
- type ProtocolEvents
- type ProtocolForEachFunc
- type Request
- type RequestBackPressureFunc
- type RequestQueue
- type RequestType
- type Requester
- func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)
- func (r *Requester) Request(data interface{}, msIndex iotago.MilestoneIndex, preventDiscard ...bool) bool
- func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool
- func (r *Requester) RequestMultiple(blockIDs iotago.BlockIDs, msIndex iotago.MilestoneIndex, ...) int
- func (r *Requester) RequestParents(cachedBlock *storage.CachedBlock, msIndex iotago.MilestoneIndex, ...)
- func (r *Requester) RunPendingRequestEnqueuer(ctx context.Context)
- func (r *Requester) RunRequestQueueDrainer(ctx context.Context)
- type RequesterOption
- type RequesterOptions
- type Requests
- type Service
- type ServiceEvents
- type ServiceOption
- func WithLogger(logger *logger.Logger) ServiceOption
- func WithSendQueueSize(size int) ServiceOption
- func WithStreamConnectTimeout(dur time.Duration) ServiceOption
- func WithStreamReadTimeout(dur time.Duration) ServiceOption
- func WithStreamWriteTimeout(dur time.Duration) ServiceOption
- func WithUnknownPeersLimit(limit int) ServiceOption
- type ServiceOptions
- type StreamCancelReason
- type WarpSync
- type WarpSyncMilestoneRequester
- func (w *WarpSyncMilestoneRequester) Cleanup()
- func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, ...) syncmanager.MilestoneIndexDelta
- func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error
- type WorkUnit
- type WorkUnitState
Constants ¶
const ( WorkerQueueSize = 50000 WorkerCount = 64 )
const ( MessageTypeMilestoneRequest message.Type = 1 MessageTypeBlock message.Type = 2 MessageTypeBlockRequest message.Type = 3 MessageTypeHeartbeat message.Type = 4 )
const DefaultAdvancementThreshold = 0.0
DefaultAdvancementThreshold is the default threshold at which a checkpoint advancement is done. Per default an advancement is always done as soon the confirmed milestone enters the range between the previous and current checkpoint.
const DefaultLatencyResolution = 100
Variables ¶
var ( ErrBlockNotSolid = errors.New("block is not solid") ErrBlockBelowMaxDepth = errors.New("block is below max depth") )
var ( // ErrInvalidSourceLength is returned when an invalid source byte slice for extraction of certain data is passed. ErrInvalidSourceLength = errors.New("invalid source byte slice") )
var (
ErrProtocolDoesNotExist = errors.New("stream/protocol does not exist")
)
var (
ErrUnknownRequestType = errors.New("unknown request type")
)
Functions ¶
func BlockProcessedCaller ¶
func BlockProcessedCaller(handler interface{}, params ...interface{})
func BroadcastCaller ¶
func BroadcastCaller(handler interface{}, params ...interface{})
func CheckpointCaller ¶
func CheckpointCaller(handler interface{}, params ...interface{})
func ProtocolCaller ¶
func ProtocolCaller(handler interface{}, params ...interface{})
ProtocolCaller gets called with a Protocol.
func StreamCancelCaller ¶
func StreamCancelCaller(handler interface{}, params ...interface{})
StreamCancelCaller gets called with a network.Stream and its cancel reason.
func SyncDoneCaller ¶
func SyncDoneCaller(handler interface{}, params ...interface{})
func SyncStartCaller ¶
func SyncStartCaller(handler interface{}, params ...interface{})
func TargetCaller ¶
func TargetCaller(handler interface{}, params ...interface{})
Types ¶
type AdvanceCheckpointCriteria ¶
type AdvanceCheckpointCriteria func(currentConfirmed, previousCheckpoint, currentCheckpoint iotago.MilestoneIndex) bool
AdvanceCheckpointCriteria is a function which determines whether the checkpoint should be advanced.
func AdvanceAtPercentageReached ¶
func AdvanceAtPercentageReached(threshold float64) AdvanceCheckpointCriteria
AdvanceAtPercentageReached is an AdvanceCheckpointCriteria which advances the checkpoint when the current one was reached by >=X% by the current confirmed milestone in relation to the previous checkpoint.
type Broadcast ¶
type Broadcast struct { // The data to broadcast. Data []byte // The IDs of the peers to exclude from broadcasting. ExcludePeers map[peer.ID]struct{} }
Broadcast defines a data which should be broadcasted.
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
Broadcaster provides functions to broadcast data to gossip streams.
func NewBroadcaster ¶
func NewBroadcaster( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, peeringManager *p2p.Manager, service *Service, broadcastQueueSize int) *Broadcaster
NewBroadcaster creates a new Broadcaster.
func (*Broadcaster) Broadcast ¶
func (b *Broadcaster) Broadcast(broadcast *Broadcast)
Broadcast broadcasts the given Broadcast.
func (*Broadcaster) BroadcastHeartbeat ¶
func (b *Broadcaster) BroadcastHeartbeat(filter func(proto *Protocol) bool)
BroadcastHeartbeat broadcasts a heartbeat message to every peer.
func (*Broadcaster) RunBroadcastQueueDrainer ¶
func (b *Broadcaster) RunBroadcastQueueDrainer(ctx context.Context)
RunBroadcastQueueDrainer runs the broadcast queue drainer.
type CachedWorkUnit ¶
type CachedWorkUnit struct {
objectstorage.CachedObject
}
CachedWorkUnit represents a cached WorkUnit.
func (*CachedWorkUnit) WorkUnit ¶
func (c *CachedWorkUnit) WorkUnit() *WorkUnit
WorkUnit retrieves the work unit, that is cached in this container.
type Events ¶
type Events struct { // Fired when a new set of milestones should be requested. CheckpointUpdated *events.Event // Fired when the target milestone is updated. TargetUpdated *events.Event // Fired when warp synchronization starts. Start *events.Event // Fired when the warp synchronization is done. Done *events.Event }
Events holds WarpSync related events.
type FilterFunc ¶
FilterFunc is a function which determines whether a request should be enqueued or not.
type Heartbeat ¶
type Heartbeat struct { SolidMilestoneIndex iotago.MilestoneIndex `json:"solidMilestoneIndex"` PrunedMilestoneIndex iotago.MilestoneIndex `json:"prunedMilestoneIndex"` LatestMilestoneIndex iotago.MilestoneIndex `json:"latestMilestoneIndex"` ConnectedPeers int `json:"connectedPeers"` SyncedPeers int `json:"syncedPeers"` }
Heartbeat contains information about a nodes current solid and pruned milestone index and its connected and synced peers count.
func ParseHeartbeat ¶
ParseHeartbeat parses the given message into a heartbeat.
type Info ¶
type Info struct { Heartbeat *Heartbeat `json:"heartbeat"` Metrics MetricsSnapshot `json:"metrics"` }
Info represents information about an ongoing gossip protocol.
type MessageProcessor ¶
type MessageProcessor struct { // events of the block processor. Events *MessageProcessorEvents // contains filtered or unexported fields }
MessageProcessor processes submitted messages in parallel and fires appropriate completion events.
func NewMessageProcessor ¶
func NewMessageProcessor( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, requestQueue RequestQueue, peeringManager *p2p.Manager, serverMetrics *metrics.ServerMetrics, protocolManager *protocol.Manager, opts *Options) (*MessageProcessor, error)
NewMessageProcessor creates a new processor which processes messages.
func (*MessageProcessor) Broadcast ¶
func (proc *MessageProcessor) Broadcast(cachedBlockMeta *storage.CachedMetadata)
func (*MessageProcessor) Emit ¶
func (proc *MessageProcessor) Emit(block *storage.Block) error
Emit triggers BlockProcessed and BroadcastBlock events for the given block. All blocks passed to this function must be checked with "DeSeriModePerformValidation" before. We also check if the parents are solid and not BMD before we broadcast the block, otherwise this block would be seen as invalid gossip by other peers.
func (*MessageProcessor) Process ¶
func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)
Process submits the given message to the processor for processing.
func (*MessageProcessor) Run ¶
func (proc *MessageProcessor) Run(ctx context.Context)
Run runs the processor and blocks until the shutdown signal is triggered.
func (*MessageProcessor) Shutdown ¶
func (proc *MessageProcessor) Shutdown()
Shutdown signals the internal worker pool and object storage to shut down and sets the shutdown flag.
func (*MessageProcessor) WorkUnitsSize ¶
func (proc *MessageProcessor) WorkUnitsSize() int
WorkUnitsSize returns the size of WorkUnits currently cached.
type MessageProcessorEvents ¶
type MessageProcessorEvents struct { // Fired when a block was fully processed. BlockProcessed *events.Event // Fired when a block is meant to be broadcasted. BroadcastBlock *events.Event }
MessageProcessorEvents are the events fired by the MessageProcessor.
type Metrics ¶
type Metrics struct { // The number of received blocks which are new. NewBlocks atomic.Uint32 // The number of received blocks which are already known. KnownBlocks atomic.Uint32 // The number of received blocks. ReceivedBlocks atomic.Uint32 // The number of received block requests. ReceivedBlockRequests atomic.Uint32 // The number of received milestone requests. ReceivedMilestoneRequests atomic.Uint32 // The number of received heartbeats. ReceivedHeartbeats atomic.Uint32 // The number of sent packets. SentPackets atomic.Uint32 // The number of sent blocks. SentBlocks atomic.Uint32 // The number of sent block requests. SentBlockRequests atomic.Uint32 // The number of sent milestone requests. SentMilestoneRequests atomic.Uint32 // The number of sent heartbeats. SentHeartbeats atomic.Uint32 // The number of dropped packets. DroppedPackets atomic.Uint32 }
Metrics defines a set of metrics regarding a gossip protocol instance.
func (*Metrics) Snapshot ¶
func (m *Metrics) Snapshot() MetricsSnapshot
Snapshot returns MetricsSnapshot of the Metrics.
type MetricsSnapshot ¶
type MetricsSnapshot struct { NewBlocks uint32 `json:"newBlocks"` KnownBlocks uint32 `json:"knownBlocks"` ReceivedBlocks uint32 `json:"receivedBlocks"` ReceivedBlockRequests uint32 `json:"receivedBlockRequests"` ReceivedMilestoneRequests uint32 `json:"receivedMilestoneRequests"` ReceivedHeartbeats uint32 `json:"receivedHeartbeats"` SentBlocks uint32 `json:"sentBlocks"` SentBlockRequests uint32 `json:"sentBlockRequests"` SentMilestoneRequests uint32 `json:"sentMilestoneRequests"` SentHeartbeats uint32 `json:"sentHeartbeats"` DroppedPackets uint32 `json:"droppedPackets"` }
MetricsSnapshot represents a snapshot of the gossip protocol metrics.
type Protocol ¶
type Protocol struct { // Parser parses gossip messages and emits received events for them. Parser *protocol.Protocol // The ID of the peer to which this protocol is associated to. PeerID peer.ID // The underlying stream for this Protocol. Stream network.Stream // The events surrounding a Protocol. Events *ProtocolEvents // The peer's latest heartbeat message. LatestHeartbeat *Heartbeat // Time the last heartbeat was received. HeartbeatReceivedTime time.Time // Time the last heartbeat was sent. HeartbeatSentTime time.Time // The send queue into which to enqueue messages to send. SendQueue chan []byte // The metrics around this protocol instance. Metrics Metrics // The shared server metrics instance. ServerMetrics *metrics.ServerMetrics // contains filtered or unexported fields }
Protocol represents an instance of the gossip protocol.
func NewProtocol ¶
func NewProtocol(peerID peer.ID, stream network.Stream, sendQueueSize int, readTimeout, writeTimeout time.Duration, serverMetrics *metrics.ServerMetrics) *Protocol
NewProtocol creates a new gossip protocol instance associated to the given peer.
func (*Protocol) CouldHaveDataForMilestone ¶
func (p *Protocol) CouldHaveDataForMilestone(index iotago.MilestoneIndex) bool
CouldHaveDataForMilestone tells whether the underlying peer given the latest heartbeat message, could have parts of the cone data for the given milestone. Returns false if no heartbeat message was received yet.
func (*Protocol) Enqueue ¶
Enqueue enqueues the given gossip protocol message to be sent to the peer. If it can't because the send queue is over capacity, the message gets dropped.
func (*Protocol) HasDataForMilestone ¶
func (p *Protocol) HasDataForMilestone(index iotago.MilestoneIndex) bool
HasDataForMilestone tells whether the underlying peer given the latest heartbeat message, has the cone data for the given milestone. Returns false if no heartbeat message was received yet.
func (*Protocol) IsSynced ¶
func (p *Protocol) IsSynced(cmi iotago.MilestoneIndex) bool
IsSynced tells whether the underlying peer is synced.
func (*Protocol) SendBlockRequest ¶
SendBlockRequest sends a block request message to the given peer.
func (*Protocol) SendHeartbeat ¶
func (p *Protocol) SendHeartbeat(solidMsIndex iotago.MilestoneIndex, pruningMsIndex iotago.MilestoneIndex, latestMsIndex iotago.MilestoneIndex, connectedPeers uint8, syncedPeers uint8)
SendHeartbeat sends a Heartbeat to the given peer.
func (*Protocol) SendLatestMilestoneRequest ¶
func (p *Protocol) SendLatestMilestoneRequest()
SendLatestMilestoneRequest sends a storage.Milestone request which requests the latest known milestone from the given peer.
func (*Protocol) SendMilestoneRequest ¶
func (p *Protocol) SendMilestoneRequest(index iotago.MilestoneIndex)
SendMilestoneRequest sends a milestone request to the given peer.
func (*Protocol) Terminated ¶
func (p *Protocol) Terminated() <-chan struct{}
Terminated returns a channel that is closed if the protocol was terminated.
type ProtocolEvents ¶
type ProtocolEvents struct { // Fired when the heartbeat message state on the peer has been updated. HeartbeatUpdated *events.Event // Fired when a message of the given type is sent. // This exists solely because protocol.Protocol in hive.go doesn't // emit events anymore for sent messages, as it is solely a parser. Sent []*events.Event // Fired when an error occurs on the protocol. Errors *events.Event }
ProtocolEvents happening on a Protocol.
type ProtocolForEachFunc ¶
ProtocolForEachFunc is used in Service.ForEach. Returning false indicates to stop looping. This function must not call any methods on Service.
type Request ¶
type Request struct { // The type of the request. RequestType RequestType // The BlockID of the block to request. BlockID iotago.BlockID // The milestone index under which this request is linked. MilestoneIndex iotago.MilestoneIndex // Tells the request queue to not remove this request if the enqueue time is // over the given threshold. PreventDiscard bool // the time at which this request was first enqueued. // do not modify this time EnqueueTime time.Time }
Request is a request for a particular block.
func NewBlockIDRequest ¶
func NewBlockIDRequest(blockID iotago.BlockID, msIndex iotago.MilestoneIndex) *Request
NewBlockIDRequest creates a new block request for a specific blockID.
func NewMilestoneIndexRequest ¶
func NewMilestoneIndexRequest(msIndex iotago.MilestoneIndex) *Request
NewMilestoneIndexRequest creates a new block request for a specific milestone index.
type RequestBackPressureFunc ¶
type RequestBackPressureFunc func() bool
RequestBackPressureFunc is a function which tells the Requester to stop requesting more data.
type RequestQueue ¶
type RequestQueue interface { // Next returns the next request to send, pops it from the queue and marks it as pending. Next() *Request // Peek returns the next request to send without popping it from the queue. Peek() *Request // Enqueue enqueues the given request if it isn't already queued or pending. Enqueue(*Request) (enqueued bool) // IsQueued tells whether a given request for the given data is queued. IsQueued(data interface{}) bool // IsPending tells whether a given request was popped from the queue and is now pending. IsPending(data interface{}) bool // IsProcessing tells whether a given request was popped from the queue, received and is now processing. IsProcessing(data interface{}) bool // Received marks a request as received and thereby removes it from the pending set. // It is added to the processing set. // Returns the origin request which was pending or nil if the data was not requested. Received(data interface{}) *Request // Processed marks a request as fulfilled and thereby removes it from the processing set. // Returns the origin request which was processing or nil if the data was not requested. Processed(data interface{}) *Request // EnqueuePending enqueues all pending requests back into the queue. // It also discards requests in the pending set of which their enqueue time is over the given delta threshold. // If discardOlderThan is zero, no requests are discarded. EnqueuePending(discardOlderThan time.Duration) (queued int) // Size returns the size of currently queued, requested/pending and processing requests. Size() (queued int, pending int, processing int) // Empty tells whether the queue has no queued and pending requests. Empty() bool // Requests returns a snapshot of all queued, pending and processing requests in the queue. Requests() (queued []*Request, pending []*Request, processing []*Request) // AvgLatency returns the average latency of enqueueing and then receiving a request. AvgLatency() int64 // Filter adds the given filter function to the queue. Passing nil resets the current one. // Setting a filter automatically clears all queued and pending requests which do not fulfill // the filter criteria. Filter(f FilterFunc) }
RequestQueue implements a queue which contains requests for needed data.
func NewRequestQueue ¶
func NewRequestQueue(latencyResolution ...int32) RequestQueue
NewRequestQueue creates a new RequestQueue where request are prioritized over their milestone index (lower = higher priority).
type RequestType ¶
type RequestType int
RequestType is the type of request.
const ( RequestTypeBlockID RequestType = iota RequestTypeMilestoneIndex )
type Requester ¶
type Requester struct {
// contains filtered or unexported fields
}
Requester handles requesting packets.
func NewRequester ¶
func NewRequester( dbStorage *storage.Storage, service *Service, rQueue RequestQueue, opts ...RequesterOption) *Requester
NewRequester creates a new Requester.
func (*Requester) AddBackPressureFunc ¶
func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)
AddBackPressureFunc adds a RequestBackPressureFunc. This function can be called multiple times to add additional RequestBackPressureFunc. Calling this function after any Requester worker has been started results in a panic.
func (*Requester) Request ¶
func (r *Requester) Request(data interface{}, msIndex iotago.MilestoneIndex, preventDiscard ...bool) bool
Request enqueues a request to the request queue for the given block if it isn't a solid entry point and is not contained in the database already.
func (*Requester) RequestMilestoneParents ¶
func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool
RequestMilestoneParents enqueues requests for the parents of the given milestone to the request queue, if the parents are not solid entry points and not already in the database.
func (*Requester) RequestMultiple ¶
func (r *Requester) RequestMultiple(blockIDs iotago.BlockIDs, msIndex iotago.MilestoneIndex, preventDiscard ...bool) int
RequestMultiple works like Request but takes multiple block IDs.
func (*Requester) RequestParents ¶
func (r *Requester) RequestParents(cachedBlock *storage.CachedBlock, msIndex iotago.MilestoneIndex, preventDiscard ...bool)
RequestParents enqueues requests for the parents of the given block to the request queue, if the given block is not a solid entry point and neither its parents are and also not in the database.
func (*Requester) RunPendingRequestEnqueuer ¶
RunPendingRequestEnqueuer runs the loop to periodically re-request pending requests from the RequestQueue.
func (*Requester) RunRequestQueueDrainer ¶
RunRequestQueueDrainer runs the RequestQueue drainer.
type RequesterOption ¶
type RequesterOption func(options *RequesterOptions)
RequesterOption is a function which sets an option on a RequesterOptions instance.
func WithRequesterDiscardRequestsOlderThan ¶
func WithRequesterDiscardRequestsOlderThan(dur time.Duration) RequesterOption
WithRequesterDiscardRequestsOlderThan sets the threshold for the max age of requests.
func WithRequesterPendingRequestReEnqueueInterval ¶
func WithRequesterPendingRequestReEnqueueInterval(dur time.Duration) RequesterOption
WithRequesterPendingRequestReEnqueueInterval sets the re-enqueue interval for pending requests.
type RequesterOptions ¶
type RequesterOptions struct { // Defines the re-queue interval for pending requests. PendingRequestReEnqueueInterval time.Duration // Defines the max age for requests. DiscardRequestsOlderThan time.Duration }
RequesterOptions are options around a Requester.
type Requests ¶
type Requests []*Request
func (Requests) HasRequest ¶
HasRequest returns true if Requests contains a Request.
type Service ¶
type Service struct { // the logger used to log events. *logger.WrappedLogger // Events happening around a Service. Events *ServiceEvents // contains filtered or unexported fields }
Service handles ongoing gossip streams.
func NewService ¶
func NewService( protocol protocol.ID, host host.Host, peeringManager *p2p.Manager, serverMetrics *metrics.ServerMetrics, opts ...ServiceOption) *Service
NewService creates a new Service.
func (*Service) CloseStream ¶
CloseStream closes an ongoing stream with a peer.
func (*Service) ForEach ¶
func (s *Service) ForEach(f ProtocolForEachFunc)
ForEach calls the given ProtocolForEachFunc on each Protocol.
func (*Service) SynchronizedCount ¶
func (s *Service) SynchronizedCount(latestMilestoneIndex iotago.MilestoneIndex) int
SynchronizedCount returns the count of streams with peers which appear to be synchronized given their latest Heartbeat message.
type ServiceEvents ¶
type ServiceEvents struct { // Fired when a protocol has been started. ProtocolStarted *events.Event // Fired when a protocol has ended. ProtocolTerminated *events.Event // Fired when an inbound stream gets canceled. InboundStreamCanceled *events.Event // Fired when an internal error happens. Error *events.Event }
ServiceEvents are events happening around a Service.
type ServiceOption ¶
type ServiceOption func(opts *ServiceOptions)
ServiceOption is a function setting a ServiceOptions option.
func WithLogger ¶
func WithLogger(logger *logger.Logger) ServiceOption
WithLogger enables logging within the Service.
func WithSendQueueSize ¶
func WithSendQueueSize(size int) ServiceOption
WithSendQueueSize defines the size of send queues on ongoing gossip protocol streams.
func WithStreamConnectTimeout ¶
func WithStreamConnectTimeout(dur time.Duration) ServiceOption
WithStreamConnectTimeout defines the timeout for creating a gossip protocol stream.
func WithStreamReadTimeout ¶
func WithStreamReadTimeout(dur time.Duration) ServiceOption
WithStreamReadTimeout defines the read timeout for reading from a stream.
func WithStreamWriteTimeout ¶
func WithStreamWriteTimeout(dur time.Duration) ServiceOption
WithStreamWriteTimeout defines the write timeout for writing to a stream.
func WithUnknownPeersLimit ¶
func WithUnknownPeersLimit(limit int) ServiceOption
WithUnknownPeersLimit defines how many peers with an unknown relation are allowed to have an ongoing gossip protocol stream.
type ServiceOptions ¶
type ServiceOptions struct {
// contains filtered or unexported fields
}
ServiceOptions define options for a Service.
type StreamCancelReason ¶
type StreamCancelReason string
StreamCancelReason is a reason for a gossip stream cancellation.
const ( // StreamCancelReasonDuplicated defines a stream cancellation because // it would lead to a duplicated ongoing stream. StreamCancelReasonDuplicated StreamCancelReason = "duplicated stream" // StreamCancelReasonInsufficientPeerRelation defines a stream cancellation because // the relation to the other peer is insufficient. StreamCancelReasonInsufficientPeerRelation StreamCancelReason = "insufficient peer relation" // StreamCancelReasonNoUnknownPeerSlotAvailable defines a stream cancellation // because no more unknown peers slot were available. StreamCancelReasonNoUnknownPeerSlotAvailable StreamCancelReason = "no unknown peer slot available" )
type WarpSync ¶
type WarpSync struct { sync.Mutex // The used advancement range per checkpoint. AdvancementRange syncmanager.MilestoneIndexDelta // The Events of the warpsync. Events *Events // The current confirmed milestone of the node. CurrentConfirmedMilestone iotago.MilestoneIndex // The starting time of the synchronization. StartTime time.Time // The starting point of the synchronization. InitMilestone iotago.MilestoneIndex // The target milestone to which to synchronize to. TargetMilestone iotago.MilestoneIndex // The previous checkpoint of the synchronization. PreviousCheckpoint iotago.MilestoneIndex // The current checkpoint of the synchronization. CurrentCheckpoint iotago.MilestoneIndex // contains filtered or unexported fields }
WarpSync is metadata about doing a synchronization via STING messages.
func NewWarpSync ¶
func NewWarpSync(advRange int, advanceCheckpointCriteriaFunc ...AdvanceCheckpointCriteria) *WarpSync
NewWarpSync creates a new WarpSync instance with the given advancement range and criteria func. If no advancement func is provided, the WarpSync uses AdvanceAtPercentageReached with DefaultAdvancementThreshold.
func (*WarpSync) AddReferencedBlocksCount ¶
AddReferencedBlocksCount adds the amount of referenced blocks to collect stats.
func (*WarpSync) UpdateCurrentConfirmedMilestone ¶
func (ws *WarpSync) UpdateCurrentConfirmedMilestone(current iotago.MilestoneIndex)
UpdateCurrentConfirmedMilestone updates the current confirmed milestone index state.
func (*WarpSync) UpdateTargetMilestone ¶
func (ws *WarpSync) UpdateTargetMilestone(target iotago.MilestoneIndex)
UpdateTargetMilestone updates the synchronization target if it is higher than the current one and triggers a synchronization start if the target was set for the first time.
type WarpSyncMilestoneRequester ¶
type WarpSyncMilestoneRequester struct { syncutils.Mutex // contains filtered or unexported fields }
WarpSyncMilestoneRequester walks the cones of existing but non-solid milestones and memoizes already walked blocks and milestones.
func NewWarpSyncMilestoneRequester ¶
func NewWarpSyncMilestoneRequester( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, requester *Requester, preventDiscard bool) *WarpSyncMilestoneRequester
NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.
func (*WarpSyncMilestoneRequester) Cleanup ¶
func (w *WarpSyncMilestoneRequester) Cleanup()
Cleanup cleans up traversed blocks to free memory.
func (*WarpSyncMilestoneRequester) RequestMilestoneRange ¶
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, onExistingMilestoneInRange func(ctx context.Context, msIndex iotago.MilestoneIndex) error, from ...iotago.MilestoneIndex) syncmanager.MilestoneIndexDelta
RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index. Returns the number of milestones requested.
func (*WarpSyncMilestoneRequester) RequestMissingMilestoneParents ¶
func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error
RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. Already requested milestones or traversed blocks will be ignored, to circumvent requesting the same parents multiple times.
type WorkUnit ¶
type WorkUnit struct { objectstorage.StorableObjectFlags // contains filtered or unexported fields }
WorkUnit defines the work around processing a received block and its associated requests from peers. There is at most one WorkUnit active per same block bytes.
func (*WorkUnit) Is ¶
func (wu *WorkUnit) Is(state WorkUnitState) bool
Is tells whether the WorkUnit has the given state.
func (*WorkUnit) ObjectStorageKey ¶
func (*WorkUnit) ObjectStorageValue ¶
func (*WorkUnit) Update ¶
func (wu *WorkUnit) Update(_ objectstorage.StorableObject)
func (*WorkUnit) UpdateState ¶
func (wu *WorkUnit) UpdateState(state WorkUnitState)
UpdateState updates the WorkUnit's state.
type WorkUnitState ¶
type WorkUnitState byte
WorkUnitState defines the state which a WorkUnit is in.
const ( Hashing WorkUnitState = 1 << 0 Invalid WorkUnitState = 1 << 1 Hashed WorkUnitState = 1 << 2 )