Documentation ¶
Index ¶
- Constants
- Variables
- func BroadcastCaller(handler interface{}, params ...interface{})
- func CheckpointCaller(handler interface{}, params ...interface{})
- func ExtractRequestedMilestoneIndex(source []byte) (milestone.Index, error)
- func HeartbeatCaller(handler interface{}, params ...interface{})
- func MessageProcessedCaller(handler interface{}, params ...interface{})
- func NewHeartbeatMsg(solidMilestoneIndex milestone.Index, prunedMilestoneIndex milestone.Index, ...) ([]byte, error)
- func NewMessageMsg(msgData []byte) ([]byte, error)
- func NewMessageRequestMsg(requestedMessageID hornet.MessageID) ([]byte, error)
- func NewMilestoneRequestMsg(requestedMilestoneIndex milestone.Index) ([]byte, error)
- func ProtocolCaller(handler interface{}, params ...interface{})
- func StreamCaller(handler interface{}, params ...interface{})
- func StreamCancelCaller(handler interface{}, params ...interface{})
- func SyncDoneCaller(handler interface{}, params ...interface{})
- func SyncStartCaller(handler interface{}, params ...interface{})
- func TargetCaller(handler interface{}, params ...interface{})
- type AdvanceCheckpointCriteria
- type Broadcast
- type Broadcaster
- type CachedWorkUnit
- type Events
- type FilterFunc
- type Heartbeat
- type Info
- type MessageProcessor
- func (proc *MessageProcessor) Broadcast(cachedMsgMeta *storage.CachedMetadata)
- func (proc *MessageProcessor) Emit(msg *storage.Message) error
- func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)
- func (proc *MessageProcessor) Run(ctx context.Context)
- func (proc *MessageProcessor) Shutdown()
- func (proc *MessageProcessor) WorkUnitsSize() int
- type MessageProcessorEvents
- type Metrics
- type MetricsSnapshot
- type Options
- type Protocol
- func (p *Protocol) CouldHaveDataForMilestone(index milestone.Index) bool
- func (p *Protocol) Enqueue(data []byte)
- func (p *Protocol) HasDataForMilestone(index milestone.Index) bool
- func (p *Protocol) Info() *Info
- func (p *Protocol) IsSynced(cmi milestone.Index) bool
- func (p *Protocol) Read(buf []byte) (int, error)
- func (p *Protocol) Send(message []byte) error
- func (p *Protocol) SendHeartbeat(solidMsIndex milestone.Index, pruningMsIndex milestone.Index, ...)
- func (p *Protocol) SendLatestMilestoneRequest()
- func (p *Protocol) SendMessage(msgData []byte)
- func (p *Protocol) SendMessageRequest(requestedMessageID hornet.MessageID)
- func (p *Protocol) SendMilestoneRequest(index milestone.Index)
- func (p *Protocol) Terminated() <-chan struct{}
- type ProtocolEvents
- type ProtocolForEachFunc
- type Request
- type RequestBackPressureFunc
- type RequestQueue
- type RequestType
- type Requester
- func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)
- func (r *Requester) Request(data interface{}, msIndex milestone.Index, preventDiscard ...bool) bool
- func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool
- func (r *Requester) RequestMultiple(messageIDs hornet.MessageIDs, msIndex milestone.Index, preventDiscard ...bool) int
- func (r *Requester) RequestParents(cachedMsg *storage.CachedMessage, msIndex milestone.Index, ...)
- func (r *Requester) RunPendingRequestEnqueuer(ctx context.Context)
- func (r *Requester) RunRequestQueueDrainer(ctx context.Context)
- type RequesterOption
- type RequesterOptions
- type Requests
- type Service
- type ServiceEvents
- type ServiceOption
- func WithLogger(logger *logger.Logger) ServiceOption
- func WithSendQueueSize(size int) ServiceOption
- func WithStreamConnectTimeout(dur time.Duration) ServiceOption
- func WithStreamReadTimeout(dur time.Duration) ServiceOption
- func WithStreamWriteTimeout(dur time.Duration) ServiceOption
- func WithUnknownPeersLimit(limit int) ServiceOption
- type ServiceOptions
- type StreamCancelReason
- type WarpSync
- type WarpSyncMilestoneRequester
- type WorkUnit
- type WorkUnitState
Constants ¶
const ( MessageTypeMilestoneRequest message.Type = 1 MessageTypeMessage message.Type = 2 MessageTypeMessageRequest message.Type = 3 MessageTypeHeartbeat message.Type = 4 )
const ( // The amount of bytes used for the requested message ID. RequestedMessageIDMsgBytesLength = 32 // The amount of bytes used for the requested milestone index. RequestedMilestoneIndexMsgBytesLength = 4 // The amount of bytes used for a milestone index within a heartbeat packet. HeartbeatMilestoneIndexBytesLength = 4 // The index to use to request the latest milestone via a milestone request message. LatestMilestoneRequestIndex = 0 )
const DefaultAdvancementThreshold = 0.0
DefaultAdvancementThreshold is the default threshold at which a checkpoint advancement is done. Per default an advancement is always done as soon the confirmed milestone enters the range between the previous and current checkpoint.
const DefaultLatencyResolution = 100
const FeatureSetName = "Chrysalis-Pt2"
FeatureSetName is the name of the feature set.
const MinimumVersion = 1
MinimumVersion denotes the minimum version for Chrysalis-Pt2 support.
const (
WorkerQueueSize = 50000
)
Variables ¶
var ( ErrInvalidTimestamp = errors.New("invalid timestamp") ErrMessageNotSolid = errors.New("msg is not solid") ErrMessageBelowMaxDepth = errors.New("msg is below max depth") )
var ( // MessageMessageDefinition defines a message message's format. MessageMessageDefinition = &message.Definition{ ID: MessageTypeMessage, MaxBytesLength: iotago.MessageBinSerializedMaxSize, VariableLength: true, } // The requested message ID gossipping packet. // Contains only an ID of a requested message payload. MessageRequestMessageDefinition = &message.Definition{ ID: MessageTypeMessageRequest, MaxBytesLength: RequestedMessageIDMsgBytesLength, VariableLength: false, } // The heartbeat packet containing the current solid, pruned and latest milestone index, // number of connected peers and number of synced peers. HeartbeatMessageDefinition = &message.Definition{ ID: MessageTypeHeartbeat, MaxBytesLength: HeartbeatMilestoneIndexBytesLength*3 + 2, VariableLength: false, } // The requested milestone index packet. MilestoneRequestMessageDefinition = &message.Definition{ ID: MessageTypeMilestoneRequest, MaxBytesLength: RequestedMilestoneIndexMsgBytesLength, VariableLength: false, } )
var ( // ErrInvalidSourceLength is returned when an invalid source byte slice for extraction of certain data is passed. ErrInvalidSourceLength = errors.New("invalid source byte slice") )
var (
ErrProtocolDoesNotExist = errors.New("stream/protocol does not exist")
)
var (
ErrUnknownRequestType = errors.New("unknown request type")
)
Functions ¶
func BroadcastCaller ¶
func BroadcastCaller(handler interface{}, params ...interface{})
func CheckpointCaller ¶
func CheckpointCaller(handler interface{}, params ...interface{})
func ExtractRequestedMilestoneIndex ¶
ExtractRequestedMilestoneIndex extracts the requested milestone index from the given source.
func HeartbeatCaller ¶
func HeartbeatCaller(handler interface{}, params ...interface{})
func MessageProcessedCaller ¶
func MessageProcessedCaller(handler interface{}, params ...interface{})
func NewHeartbeatMsg ¶
func NewHeartbeatMsg(solidMilestoneIndex milestone.Index, prunedMilestoneIndex milestone.Index, latestMilestoneIndex milestone.Index, connectedPeers uint8, syncedPeers uint8) ([]byte, error)
NewHeartbeatMsg creates a new heartbeat message.
func NewMessageMsg ¶
NewMessageMsg creates a new message message.
func NewMessageRequestMsg ¶
NewMessageRequestMsg creates a message request message.
func NewMilestoneRequestMsg ¶
NewMilestoneRequestMsg creates a new milestone request message.
func ProtocolCaller ¶
func ProtocolCaller(handler interface{}, params ...interface{})
ProtocolCaller gets called with a Protocol.
func StreamCaller ¶
func StreamCaller(handler interface{}, params ...interface{})
StreamCaller gets called with a network.Stream.
func StreamCancelCaller ¶
func StreamCancelCaller(handler interface{}, params ...interface{})
StreamCancelCaller gets called with a network.Stream and its cancel reason.
func SyncDoneCaller ¶
func SyncDoneCaller(handler interface{}, params ...interface{})
func SyncStartCaller ¶
func SyncStartCaller(handler interface{}, params ...interface{})
func TargetCaller ¶
func TargetCaller(handler interface{}, params ...interface{})
Types ¶
type AdvanceCheckpointCriteria ¶
type AdvanceCheckpointCriteria func(currentConfirmed, previousCheckpoint, currentCheckpoint milestone.Index) bool
AdvanceCheckpointCriteria is a function which determines whether the checkpoint should be advanced.
func AdvanceAtPercentageReached ¶
func AdvanceAtPercentageReached(threshold float64) AdvanceCheckpointCriteria
AdvanceAtPercentageReached is an AdvanceCheckpointCriteria which advances the checkpoint when the current one was reached by >=X% by the current confirmed milestone in relation to the previous checkpoint.
type Broadcast ¶
type Broadcast struct { // The message data to broadcast. MsgData []byte // The IDs of the peers to exclude from broadcasting. ExcludePeers map[peer.ID]struct{} }
Broadcast defines a message which should be broadcasted.
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
Broadcaster provides functions to broadcast data to gossip streams.
func NewBroadcaster ¶
func NewBroadcaster( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, peeringManager *p2p.Manager, service *Service, broadcastQueueSize int) *Broadcaster
NewBroadcaster creates a new Broadcaster.
func (*Broadcaster) Broadcast ¶
func (b *Broadcaster) Broadcast(broadcast *Broadcast)
Broadcast broadcasts the given Broadcast.
func (*Broadcaster) BroadcastHeartbeat ¶
func (b *Broadcaster) BroadcastHeartbeat(filter func(proto *Protocol) bool)
BroadcastHeartbeat broadcasts a heartbeat message to every peer.
func (*Broadcaster) RunBroadcastQueueDrainer ¶
func (b *Broadcaster) RunBroadcastQueueDrainer(ctx context.Context)
RunBroadcastQueueDrainer runs the broadcast queue drainer.
type CachedWorkUnit ¶
type CachedWorkUnit struct {
objectstorage.CachedObject
}
CachedWorkUnit represents a cached WorkUnit.
func (*CachedWorkUnit) WorkUnit ¶
func (c *CachedWorkUnit) WorkUnit() *WorkUnit
WorkUnit retrieves the work unit, that is cached in this container.
type Events ¶
type Events struct { // Fired when a new set of milestones should be requested. CheckpointUpdated *events.Event // Fired when the target milestone is updated. TargetUpdated *events.Event // Fired when warp synchronization starts. Start *events.Event // Fired when the warp synchronization is done. Done *events.Event }
Events holds WarpSync related events.
type FilterFunc ¶
FilterFunc is a function which determines whether a request should be enqueued or not.
type Heartbeat ¶
type Heartbeat struct { SolidMilestoneIndex milestone.Index `json:"solidMilestoneIndex"` PrunedMilestoneIndex milestone.Index `json:"prunedMilestoneIndex"` LatestMilestoneIndex milestone.Index `json:"latestMilestoneIndex"` ConnectedNeighbors int `json:"connectedNeighbors"` SyncedNeighbors int `json:"syncedNeighbors"` }
Heartbeat contains information about a nodes current solid and pruned milestone index and its connected and synced neighbors count.
func ParseHeartbeat ¶
ParseHeartbeat parses the given message into a heartbeat.
type Info ¶
type Info struct { Heartbeat *Heartbeat `json:"heartbeat"` Metrics MetricsSnapshot `json:"metrics"` }
Info represents information about an ongoing gossip protocol.
type MessageProcessor ¶
type MessageProcessor struct { // events of the message processor. Events *MessageProcessorEvents // contains filtered or unexported fields }
MessageProcessor processes submitted messages in parallel and fires appropriate completion events.
func NewMessageProcessor ¶
func NewMessageProcessor( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, requestQueue RequestQueue, peeringManager *p2p.Manager, serverMetrics *metrics.ServerMetrics, opts *Options) (*MessageProcessor, error)
NewMessageProcessor creates a new processor which parses messages.
func (*MessageProcessor) Broadcast ¶
func (proc *MessageProcessor) Broadcast(cachedMsgMeta *storage.CachedMetadata)
func (*MessageProcessor) Emit ¶
func (proc *MessageProcessor) Emit(msg *storage.Message) error
Emit triggers MessageProcessed and BroadcastMessage events for the given message. All messages passed to this function must be checked with "DeSeriModePerformValidation" before. We also check if the parents are solid and not BMD before we broadcast the message, otherwise this message would be seen as invalid gossip by other peers.
func (*MessageProcessor) Process ¶
func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)
Process submits the given message to the processor for processing.
func (*MessageProcessor) Run ¶
func (proc *MessageProcessor) Run(ctx context.Context)
Run runs the processor and blocks until the shutdown signal is triggered.
func (*MessageProcessor) Shutdown ¶
func (proc *MessageProcessor) Shutdown()
Shutdown signals the internal worker pool and object storage to shut down and sets the shutdown flag.
func (*MessageProcessor) WorkUnitsSize ¶
func (proc *MessageProcessor) WorkUnitsSize() int
WorkUnitsSize returns the size of WorkUnits currently cached.
type MessageProcessorEvents ¶
type MessageProcessorEvents struct { // Fired when a message was fully processed. MessageProcessed *events.Event // Fired when a message is meant to be broadcasted. BroadcastMessage *events.Event }
MessageProcessorEvents are the events fired by the MessageProcessor.
type Metrics ¶
type Metrics struct { // The number of received messages which are new. NewMessages atomic.Uint32 // The number of received messages which are already known. KnownMessages atomic.Uint32 // The number of received messages. ReceivedMessages atomic.Uint32 // The number of received message requests. ReceivedMessageRequests atomic.Uint32 // The number of received milestone requests. ReceivedMilestoneRequests atomic.Uint32 // The number of received heartbeats. ReceivedHeartbeats atomic.Uint32 // The number of sent packets. SentPackets atomic.Uint32 // The number of sent messages. SentMessages atomic.Uint32 // The number of sent message requests. SentMessageRequests atomic.Uint32 // The number of sent milestone requests. SentMilestoneRequests atomic.Uint32 // The number of sent heartbeats. SentHeartbeats atomic.Uint32 // The number of dropped packets. DroppedPackets atomic.Uint32 }
Metrics defines a set of metrics regarding a gossip protocol instance.
func (*Metrics) Snapshot ¶
func (m *Metrics) Snapshot() MetricsSnapshot
Snapshot returns MetricsSnapshot of the Metrics.
type MetricsSnapshot ¶
type MetricsSnapshot struct { NewMessages uint32 `json:"newMessages"` KnownMessages uint32 `json:"knownMessages"` ReceivedMessages uint32 `json:"receivedMessages"` ReceivedMessageReq uint32 `json:"receivedMessageRequests"` ReceivedMilestoneReq uint32 `json:"receivedMilestoneRequests"` ReceivedHeartbeats uint32 `json:"receivedHeartbeats"` SentMessages uint32 `json:"sentMessages"` SentMessageReq uint32 `json:"sentMessageRequests"` SentMilestoneReq uint32 `json:"sentMilestoneRequests"` SentHeartbeats uint32 `json:"sentHeartbeats"` DroppedPackets uint32 `json:"droppedPackets"` }
MetricsSnapshot represents a snapshot of the gossip protocol metrics.
type Options ¶
type Options struct { MinPoWScore float64 NetworkID uint64 BelowMaxDepth milestone.Index WorkUnitCacheOpts *profile.CacheOpts }
The Options for the MessageProcessor.
type Protocol ¶
type Protocol struct { // Parser parses gossip messages and emits received events for them. Parser *protocol.Protocol // The ID of the peer to which this protocol is associated to. PeerID peer.ID // The underlying stream for this Protocol. Stream network.Stream // The events surrounding a Protocol. Events *ProtocolEvents // The peer's latest heartbeat message. LatestHeartbeat *Heartbeat // Time the last heartbeat was received. HeartbeatReceivedTime time.Time // Time the last heartbeat was sent. HeartbeatSentTime time.Time // The send queue into which to enqueue messages to send. SendQueue chan []byte // The metrics around this protocol instance. Metrics Metrics // The shared server metrics instance. ServerMetrics *metrics.ServerMetrics // contains filtered or unexported fields }
Protocol represents an instance of the gossip protocol.
func NewProtocol ¶
func NewProtocol(peerID peer.ID, stream network.Stream, sendQueueSize int, readTimeout, writeTimeout time.Duration, serverMetrics *metrics.ServerMetrics) *Protocol
NewProtocol creates a new gossip protocol instance associated to the given peer.
func (*Protocol) CouldHaveDataForMilestone ¶
CouldHaveDataForMilestone tells whether the underlying peer given the latest heartbeat message, could have parts of the cone data for the given milestone. Returns false if no heartbeat message was received yet.
func (*Protocol) Enqueue ¶
Enqueue enqueues the given gossip protocol message to be sent to the peer. If it can't because the send queue is over capacity, the message gets dropped.
func (*Protocol) HasDataForMilestone ¶
HasDataForMilestone tells whether the underlying peer given the latest heartbeat message, has the cone data for the given milestone. Returns false if no heartbeat message was received yet.
func (*Protocol) SendHeartbeat ¶
func (p *Protocol) SendHeartbeat(solidMsIndex milestone.Index, pruningMsIndex milestone.Index, latestMsIndex milestone.Index, connectedNeighbors uint8, syncedNeighbors uint8)
SendHeartbeat sends a Heartbeat to the given peer.
func (*Protocol) SendLatestMilestoneRequest ¶
func (p *Protocol) SendLatestMilestoneRequest()
SendLatestMilestoneRequest sends a storage.Milestone request which requests the latest known milestone from the given peer.
func (*Protocol) SendMessage ¶
SendMessage sends a storage.Message to the given peer.
func (*Protocol) SendMessageRequest ¶
SendMessageRequest sends a storage.Message request message to the given peer.
func (*Protocol) SendMilestoneRequest ¶
SendMilestoneRequest sends a storage.Milestone request to the given peer.
func (*Protocol) Terminated ¶ added in v1.2.0
func (p *Protocol) Terminated() <-chan struct{}
Terminated returns a channel that is closed if the protocol was terminated.
type ProtocolEvents ¶
type ProtocolEvents struct { // Fired when the heartbeat message state on the peer has been updated. HeartbeatUpdated *events.Event // Fired when a message of the given type is sent. // This exists solely because protocol.Protocol in hive.go doesn't // emit events anymore for sent messages, as it is solely a parser. Sent []*events.Event // Fired when an error occurs on the protocol. Errors *events.Event }
ProtocolEvents happening on a Protocol.
type ProtocolForEachFunc ¶
ProtocolForEachFunc is used in Service.ForEach. Returning false indicates to stop looping. This function must not call any methods on Service.
type Request ¶
type Request struct { // The type of the request. RequestType RequestType // The MessageID of the message to request. MessageID hornet.MessageID // The milestone index under which this request is linked. MilestoneIndex milestone.Index // Tells the request queue to not remove this request if the enqueue time is // over the given threshold. PreventDiscard bool // the time at which this request was first enqueued. // do not modify this time EnqueueTime time.Time }
Request is a request for a particular message.
func NewMessageIDRequest ¶ added in v1.1.0
NewMessageIDRequest creates a new message request for a specific messageID.
func NewMilestoneIndexRequest ¶ added in v1.1.0
NewMilestoneIndexRequest creates a new message request for a specific milestone index
type RequestBackPressureFunc ¶
type RequestBackPressureFunc func() bool
RequestBackPressureFunc is a function which tells the Requester to stop requesting more data.
type RequestQueue ¶
type RequestQueue interface { // Next returns the next request to send, pops it from the queue and marks it as pending. Next() *Request // Peek returns the next request to send without popping it from the queue. Peek() *Request // Enqueue enqueues the given request if it isn't already queued or pending. Enqueue(*Request) (enqueued bool) // IsQueued tells whether a given request for the given data is queued. IsQueued(data interface{}) bool // IsPending tells whether a given request was popped from the queue and is now pending. IsPending(data interface{}) bool // IsProcessing tells whether a given request was popped from the queue, received and is now processing. IsProcessing(data interface{}) bool // Received marks a request as received and thereby removes it from the pending set. // It is added to the processing set. // Returns the origin request which was pending or nil if the data was not requested. Received(data interface{}) *Request // Processed marks a request as fulfilled and thereby removes it from the processing set. // Returns the origin request which was processing or nil if the data was not requested. Processed(data interface{}) *Request // EnqueuePending enqueues all pending requests back into the queue. // It also discards requests in the pending set of which their enqueue time is over the given delta threshold. // If discardOlderThan is zero, no requests are discarded. EnqueuePending(discardOlderThan time.Duration) (queued int) // Size returns the size of currently queued, requested/pending and processing requests. Size() (queued int, pending int, processing int) // Empty tells whether the queue has no queued and pending requests. Empty() bool // Requests returns a snapshot of all queued, pending and processing requests in the queue. Requests() (queued []*Request, pending []*Request, processing []*Request) // AvgLatency returns the average latency of enqueueing and then receiving a request. AvgLatency() int64 // Filter adds the given filter function to the queue. Passing nil resets the current one. // Setting a filter automatically clears all queued and pending requests which do not fulfill // the filter criteria. Filter(f FilterFunc) }
RequestQueue implements a queue which contains requests for needed data.
func NewRequestQueue ¶
func NewRequestQueue(latencyResolution ...int32) RequestQueue
NewRequestQueue creates a new RequestQueue where request are prioritized over their milestone index (lower = higher priority).
type RequestType ¶ added in v1.1.0
type RequestType int
RequestType is the type of request.
const ( RequestTypeMessageID RequestType = iota RequestTypeMilestoneIndex )
type Requester ¶
type Requester struct {
// contains filtered or unexported fields
}
Requester handles requesting packets.
func NewRequester ¶
func NewRequester( dbStorage *storage.Storage, service *Service, rQueue RequestQueue, opts ...RequesterOption) *Requester
NewRequester creates a new Requester.
func (*Requester) AddBackPressureFunc ¶
func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)
AddBackPressureFunc adds a RequestBackPressureFunc. This function can be called multiple times to add additional RequestBackPressureFunc. Calling this function after any Requester worker has been started results in a panic.
func (*Requester) Request ¶
Request enqueues a request to the request queue for the given message if it isn't a solid entry point and is not contained in the database already.
func (*Requester) RequestMilestoneParents ¶
func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool
RequestMilestoneParents enqueues requests for the parents of the given milestone to the request queue, if the parents are not solid entry points and not already in the database.
func (*Requester) RequestMultiple ¶
func (r *Requester) RequestMultiple(messageIDs hornet.MessageIDs, msIndex milestone.Index, preventDiscard ...bool) int
RequestMultiple works like Request but takes multiple message IDs.
func (*Requester) RequestParents ¶
func (r *Requester) RequestParents(cachedMsg *storage.CachedMessage, msIndex milestone.Index, preventDiscard ...bool)
RequestParents enqueues requests for the parents of the given message to the request queue, if the given message is not a solid entry point and neither its parents are and also not in the database.
func (*Requester) RunPendingRequestEnqueuer ¶
RunPendingRequestEnqueuer runs the loop to periodically re-request pending requests from the RequestQueue.
func (*Requester) RunRequestQueueDrainer ¶
RunRequestQueueDrainer runs the RequestQueue drainer.
type RequesterOption ¶
type RequesterOption func(options *RequesterOptions)
RequesterOption is a function which sets an option on a RequesterOptions instance.
func WithRequesterDiscardRequestsOlderThan ¶
func WithRequesterDiscardRequestsOlderThan(dur time.Duration) RequesterOption
WithRequesterDiscardRequestsOlderThan sets the threshold for the max age of requests.
func WithRequesterPendingRequestReEnqueueInterval ¶
func WithRequesterPendingRequestReEnqueueInterval(dur time.Duration) RequesterOption
WithRequesterPendingRequestReEnqueueInterval sets the re-enqueue interval for pending requests.
type RequesterOptions ¶
type RequesterOptions struct { // Defines the re-queue interval for pending requests. PendingRequestReEnqueueInterval time.Duration // Defines the max age for requests. DiscardRequestsOlderThan time.Duration }
RequesterOptions are options around a Requester.
type Requests ¶ added in v1.1.0
type Requests []*Request
func (Requests) HasRequest ¶ added in v1.1.0
HasRequest returns true if Requests contains a Request.
type Service ¶
type Service struct { // the logger used to log events. *utils.WrappedLogger // Events happening around a Service. Events *ServiceEvents // contains filtered or unexported fields }
Service handles ongoing gossip streams.
func NewService ¶
func NewService( protocol protocol.ID, host host.Host, peeringManager *p2p.Manager, serverMetrics *metrics.ServerMetrics, opts ...ServiceOption) *Service
NewService creates a new Service.
func (*Service) CloseStream ¶ added in v1.2.0
CloseStream closes an ongoing stream with a peer.
func (*Service) ForEach ¶
func (s *Service) ForEach(f ProtocolForEachFunc)
ForEach calls the given ProtocolForEachFunc on each Protocol.
type ServiceEvents ¶
type ServiceEvents struct { // Fired when a protocol has been started. ProtocolStarted *events.Event // Fired when a protocol has ended. ProtocolTerminated *events.Event // Fired when an inbound stream gets canceled. InboundStreamCancelled *events.Event // Fired when an internal error happens. Error *events.Event }
ServiceEvents are events happening around a Service.
type ServiceOption ¶
type ServiceOption func(opts *ServiceOptions)
ServiceOption is a function setting a ServiceOptions option.
func WithLogger ¶
func WithLogger(logger *logger.Logger) ServiceOption
WithLogger enables logging within the Service.
func WithSendQueueSize ¶
func WithSendQueueSize(size int) ServiceOption
WithSendQueueSize defines the size of send queues on ongoing gossip protocol streams.
func WithStreamConnectTimeout ¶
func WithStreamConnectTimeout(dur time.Duration) ServiceOption
WithStreamConnectTimeout defines the timeout for creating a gossip protocol stream.
func WithStreamReadTimeout ¶
func WithStreamReadTimeout(dur time.Duration) ServiceOption
WithStreamReadTimeout defines the read timeout for reading from a stream.
func WithStreamWriteTimeout ¶
func WithStreamWriteTimeout(dur time.Duration) ServiceOption
WithStreamWriteTimeout defines the write timeout for writing to a stream.
func WithUnknownPeersLimit ¶
func WithUnknownPeersLimit(limit int) ServiceOption
WithUnknownPeersLimit defines how many peers with an unknown relation are allowed to have an ongoing gossip protocol stream.
type ServiceOptions ¶
type ServiceOptions struct {
// contains filtered or unexported fields
}
ServiceOptions define options for a Service.
type StreamCancelReason ¶
type StreamCancelReason string
StreamCancelReason is a reason for a gossip stream cancellation.
const ( // StreamCancelReasonDuplicated defines a stream cancellation because // it would lead to a duplicated ongoing stream. StreamCancelReasonDuplicated StreamCancelReason = "duplicated stream" // StreamCancelReasonInsufficientPeerRelation defines a stream cancellation because // the relation to the other peer is insufficient. StreamCancelReasonInsufficientPeerRelation StreamCancelReason = "insufficient peer relation" // StreamCancelReasonNoUnknownPeerSlotAvailable defines a stream cancellation // because no more unknown peers slot were available. StreamCancelReasonNoUnknownPeerSlotAvailable StreamCancelReason = "no unknown peer slot available" // StreamCancelReasonHostShutdown defines a stream cancellation // because the host is shutting down. StreamCancelReasonHostShutdown StreamCancelReason = "host shutdown" )
type WarpSync ¶
type WarpSync struct { sync.Mutex // The used advancement range per checkpoint. AdvancementRange int // The Events of the warpsync. Events *Events // The current confirmed milestone of the node. CurrentConfirmedMilestone milestone.Index // The starting time of the synchronization. StartTime time.Time // The starting point of the synchronization. InitMilestone milestone.Index // The target milestone to which to synchronize to. TargetMilestone milestone.Index // The previous checkpoint of the synchronization. PreviousCheckpoint milestone.Index // The current checkpoint of the synchronization. CurrentCheckpoint milestone.Index // contains filtered or unexported fields }
WarpSync is metadata about doing a synchronization via STING messages.
func NewWarpSync ¶
func NewWarpSync(advRange int, advanceCheckpointCriteriaFunc ...AdvanceCheckpointCriteria) *WarpSync
NewWarpSync creates a new WarpSync instance with the given advancement range and criteria func. If no advancement func is provided, the WarpSync uses AdvanceAtPercentageReached with DefaultAdvancementThreshold.
func (*WarpSync) AddReferencedMessagesCount ¶
AddReferencedMessagesCount adds the amount of referenced messages to collect stats.
func (*WarpSync) UpdateCurrentConfirmedMilestone ¶
UpdateCurrentConfirmedMilestone updates the current confirmed milestone index state.
func (*WarpSync) UpdateTargetMilestone ¶
UpdateTargetMilestone updates the synchronization target if it is higher than the current one and triggers a synchronization start if the target was set for the first time.
type WarpSyncMilestoneRequester ¶
type WarpSyncMilestoneRequester struct { syncutils.Mutex // contains filtered or unexported fields }
WarpSyncMilestoneRequester walks the cones of existing but non-solid milestones and memoizes already walked messages and milestones.
func NewWarpSyncMilestoneRequester ¶
func NewWarpSyncMilestoneRequester( dbStorage *storage.Storage, syncManager *syncmanager.SyncManager, requester *Requester, preventDiscard bool) *WarpSyncMilestoneRequester
NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.
func (*WarpSyncMilestoneRequester) Cleanup ¶
func (w *WarpSyncMilestoneRequester) Cleanup()
Cleanup cleans up traversed messages to free memory.
func (*WarpSyncMilestoneRequester) RequestMilestoneRange ¶ added in v1.1.0
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest int, onExistingMilestoneInRange func(ctx context.Context, milestone *storage.CachedMilestone) error, from ...milestone.Index) int
RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index. Returns the number of milestones requested.
func (*WarpSyncMilestoneRequester) RequestMissingMilestoneParents ¶
func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, cachedMilestone *storage.CachedMilestone) error
RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. Already requested milestones or traversed messages will be ignored, to circumvent requesting the same parents multiple times.
type WorkUnit ¶
type WorkUnit struct { objectstorage.StorableObjectFlags // contains filtered or unexported fields }
WorkUnit defines the work around processing a received message and its associated requests from peers. There is at most one WorkUnit active per same message bytes.
func (*WorkUnit) Is ¶
func (wu *WorkUnit) Is(state WorkUnitState) bool
Is tells whether the WorkUnit has the given state.
func (*WorkUnit) ObjectStorageKey ¶
func (*WorkUnit) ObjectStorageValue ¶
func (*WorkUnit) Update ¶
func (wu *WorkUnit) Update(_ objectstorage.StorableObject)
func (*WorkUnit) UpdateState ¶
func (wu *WorkUnit) UpdateState(state WorkUnitState)
UpdateState updates the WorkUnit's state.
type WorkUnitState ¶
type WorkUnitState byte
WorkUnitState defines the state which a WorkUnit is in.
const ( Hashing WorkUnitState = 1 << 0 Invalid WorkUnitState = 1 << 1 Hashed WorkUnitState = 1 << 2 )