gossip

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2021 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MessageTypeMilestoneRequest message.Type = 1
	MessageTypeMessage          message.Type = 2
	MessageTypeMessageRequest   message.Type = 3
	MessageTypeHeartbeat        message.Type = 4
)
View Source
const (
	// The amount of bytes used for the requested message ID.
	RequestedMessageIDMsgBytesLength = 32

	// The amount of bytes used for the requested milestone index.
	RequestedMilestoneIndexMsgBytesLength = 4

	// The amount of bytes used for a milestone index within a heartbeat packet.
	HeartbeatMilestoneIndexBytesLength = 4

	// The index to use to request the latest milestone via a milestone request message.
	LatestMilestoneRequestIndex = 0
)
View Source
const DefaultAdvancementThreshold = 0.0

DefaultAdvancementThreshold is the default threshold at which a checkpoint advancement is done. By default, an advancement is always done as soon as the confirmed milestone enters the range between the previous and current checkpoint.

View Source
const DefaultLatencyResolution = 100
View Source
const FeatureSetName = "Chrysalis-Pt2"

FeatureSetName is the name of the feature set.

View Source
const MinimumVersion = 1

MinimumVersion denotes the minimum version for Chrysalis-Pt2 support.

View Source
const (
	WorkerQueueSize = 50000
)

Variables

View Source
var (
	ErrInvalidTimestamp     = errors.New("invalid timestamp")
	ErrMessageNotSolid      = errors.New("msg is not solid")
	ErrMessageBelowMaxDepth = errors.New("msg is below max depth")
)
View Source
var (
	// MessageMessageDefinition defines a message message's format.
	MessageMessageDefinition = &message.Definition{
		ID:             MessageTypeMessage,
		MaxBytesLength: iotago.MessageBinSerializedMaxSize,
		VariableLength: true,
	}

	// The requested message ID gossipping packet.
	// Contains only an ID of a requested message payload.
	MessageRequestMessageDefinition = &message.Definition{
		ID:             MessageTypeMessageRequest,
		MaxBytesLength: RequestedMessageIDMsgBytesLength,
		VariableLength: false,
	}

	// The heartbeat packet containing the current solid, pruned and latest milestone index,
	// number of connected peers and number of synced peers.
	HeartbeatMessageDefinition = &message.Definition{
		ID:             MessageTypeHeartbeat,
		MaxBytesLength: HeartbeatMilestoneIndexBytesLength*3 + 2,
		VariableLength: false,
	}

	// The requested milestone index packet.
	MilestoneRequestMessageDefinition = &message.Definition{
		ID:             MessageTypeMilestoneRequest,
		MaxBytesLength: RequestedMilestoneIndexMsgBytesLength,
		VariableLength: false,
	}
)
View Source
var (
	// ErrInvalidSourceLength is returned when an invalid source byte slice for extraction of certain data is passed.
	ErrInvalidSourceLength = errors.New("invalid source byte slice")
)
View Source
var (
	ErrUnknownRequestType = errors.New("unknown request type")
)

Functions

func BroadcastCaller

func BroadcastCaller(handler interface{}, params ...interface{})

func CheckpointCaller

func CheckpointCaller(handler interface{}, params ...interface{})

func ExtractRequestedMilestoneIndex

func ExtractRequestedMilestoneIndex(source []byte) (milestone.Index, error)

ExtractRequestedMilestoneIndex extracts the requested milestone index from the given source.

func HeartbeatCaller

func HeartbeatCaller(handler interface{}, params ...interface{})

func MessageProcessedCaller

func MessageProcessedCaller(handler interface{}, params ...interface{})

func NewHeartbeatMsg

func NewHeartbeatMsg(solidMilestoneIndex milestone.Index, prunedMilestoneIndex milestone.Index, latestMilestoneIndex milestone.Index, connectedPeers uint8, syncedPeers uint8) ([]byte, error)

NewHeartbeatMsg creates a new heartbeat message.

func NewMessageMsg

func NewMessageMsg(msgData []byte) ([]byte, error)

NewMessageMsg creates a new message message.

func NewMessageRequestMsg

func NewMessageRequestMsg(requestedMessageID hornet.MessageID) ([]byte, error)

NewMessageRequestMsg creates a message request message.

func NewMilestoneRequestMsg

func NewMilestoneRequestMsg(requestedMilestoneIndex milestone.Index) ([]byte, error)

NewMilestoneRequestMsg creates a new milestone request message.

func ProtocolCaller

func ProtocolCaller(handler interface{}, params ...interface{})

ProtocolCaller gets called with a Protocol.

func StreamCaller

func StreamCaller(handler interface{}, params ...interface{})

StreamCaller gets called with a network.Stream.

func StreamCancelCaller

func StreamCancelCaller(handler interface{}, params ...interface{})

StreamCancelCaller gets called with a network.Stream and its cancel reason.

func SyncDoneCaller

func SyncDoneCaller(handler interface{}, params ...interface{})

func SyncStartCaller

func SyncStartCaller(handler interface{}, params ...interface{})

func TargetCaller

func TargetCaller(handler interface{}, params ...interface{})

Types

type AdvanceCheckpointCriteria

type AdvanceCheckpointCriteria func(currentConfirmed, previousCheckpoint, currentCheckpoint milestone.Index) bool

AdvanceCheckpointCriteria is a function which determines whether the checkpoint should be advanced.

func AdvanceAtPercentageReached

func AdvanceAtPercentageReached(threshold float64) AdvanceCheckpointCriteria

AdvanceAtPercentageReached is an AdvanceCheckpointCriteria which advances the checkpoint once the current confirmed milestone has covered >=X% (the given threshold) of the range from the previous checkpoint to the current one.

type Broadcast

type Broadcast struct {
	// The message data to broadcast.
	MsgData []byte
	// The IDs of the peers to exclude from broadcasting.
	ExcludePeers map[peer.ID]struct{}
}

Broadcast defines a message which should be broadcasted.

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

Broadcaster provides functions to broadcast data to gossip streams.

func NewBroadcaster

func NewBroadcaster(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	peeringManager *p2p.Manager,
	service *Service,
	broadcastQueueSize int) *Broadcaster

NewBroadcaster creates a new Broadcaster.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(broadcast *Broadcast)

Broadcast broadcasts the given Broadcast.

func (*Broadcaster) BroadcastHeartbeat

func (b *Broadcaster) BroadcastHeartbeat(filter func(proto *Protocol) bool)

BroadcastHeartbeat broadcasts a heartbeat message to every peer.

func (*Broadcaster) RunBroadcastQueueDrainer

func (b *Broadcaster) RunBroadcastQueueDrainer(ctx context.Context)

RunBroadcastQueueDrainer runs the broadcast queue drainer.

type CachedWorkUnit

type CachedWorkUnit struct {
	objectstorage.CachedObject
}

CachedWorkUnit represents a cached WorkUnit.

func (*CachedWorkUnit) WorkUnit

func (c *CachedWorkUnit) WorkUnit() *WorkUnit

WorkUnit retrieves the work unit, that is cached in this container.

type Events

type Events struct {
	// Fired when a new set of milestones should be requested.
	CheckpointUpdated *events.Event
	// Fired when the target milestone is updated.
	TargetUpdated *events.Event
	// Fired when warp synchronization starts.
	Start *events.Event
	// Fired when the warp synchronization is done.
	Done *events.Event
}

Events holds WarpSync related events.

type FilterFunc

type FilterFunc func(r *Request) bool

FilterFunc is a function which determines whether a request should be enqueued or not.

type Heartbeat

type Heartbeat struct {
	SolidMilestoneIndex  milestone.Index `json:"solidMilestoneIndex"`
	PrunedMilestoneIndex milestone.Index `json:"prunedMilestoneIndex"`
	LatestMilestoneIndex milestone.Index `json:"latestMilestoneIndex"`
	ConnectedNeighbors   int             `json:"connectedNeighbors"`
	SyncedNeighbors      int             `json:"syncedNeighbors"`
}

Heartbeat contains information about a node's current solid, pruned and latest milestone indexes and its connected and synced neighbor counts.

func ParseHeartbeat

func ParseHeartbeat(data []byte) *Heartbeat

ParseHeartbeat parses the given message into a heartbeat.

type Info

type Info struct {
	Heartbeat *Heartbeat      `json:"heartbeat"`
	Metrics   MetricsSnapshot `json:"metrics"`
}

Info represents information about an ongoing gossip protocol.

type MessageProcessor

type MessageProcessor struct {

	// events of the message processor.
	Events MessageProcessorEvents
	// contains filtered or unexported fields
}

MessageProcessor processes submitted messages in parallel and fires appropriate completion events.

func NewMessageProcessor

func NewMessageProcessor(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	requestQueue RequestQueue,
	peeringManager *p2p.Manager,
	serverMetrics *metrics.ServerMetrics,
	opts *Options) (*MessageProcessor, error)

NewMessageProcessor creates a new processor which parses messages.

func (*MessageProcessor) Broadcast

func (proc *MessageProcessor) Broadcast(cachedMsgMeta *storage.CachedMetadata)

func (*MessageProcessor) Emit

func (proc *MessageProcessor) Emit(msg *storage.Message) error

Emit triggers MessageProcessed and BroadcastMessage events for the given message. All messages passed to this function must have been checked with "DeSeriModePerformValidation" beforehand. We also check that the parents are solid and not below max depth (BMD) before we broadcast the message; otherwise this message would be seen as invalid gossip by other peers.

func (*MessageProcessor) Process

func (proc *MessageProcessor) Process(p *Protocol, msgType message.Type, data []byte)

Process submits the given message to the processor for processing.

func (*MessageProcessor) Run

func (proc *MessageProcessor) Run(ctx context.Context)

Run runs the processor and blocks until the shutdown signal is triggered.

func (*MessageProcessor) Shutdown

func (proc *MessageProcessor) Shutdown()

Shutdown signals the internal worker pool and object storage to shut down and sets the shutdown flag.

func (*MessageProcessor) WorkUnitsSize

func (proc *MessageProcessor) WorkUnitsSize() int

WorkUnitsSize returns the size of WorkUnits currently cached.

type MessageProcessorEvents

type MessageProcessorEvents struct {
	// Fired when a message was fully processed.
	MessageProcessed *events.Event
	// Fired when a message is meant to be broadcasted.
	BroadcastMessage *events.Event
}

MessageProcessorEvents are the events fired by the MessageProcessor.

type Metrics

type Metrics struct {
	// The number of received messages which are new.
	NewMessages atomic.Uint32
	// The number of received messages which are already known.
	KnownMessages atomic.Uint32
	// The number of received messages.
	ReceivedMessages atomic.Uint32
	// The number of received message requests.
	ReceivedMessageRequests atomic.Uint32
	// The number of received milestone requests.
	ReceivedMilestoneRequests atomic.Uint32
	// The number of received heartbeats.
	ReceivedHeartbeats atomic.Uint32
	// The number of sent packets.
	SentPackets atomic.Uint32
	// The number of sent messages.
	SentMessages atomic.Uint32
	// The number of sent message requests.
	SentMessageRequests atomic.Uint32
	// The number of sent milestone requests.
	SentMilestoneRequests atomic.Uint32
	// The number of sent heartbeats.
	SentHeartbeats atomic.Uint32
	// The number of dropped packets.
	DroppedPackets atomic.Uint32
}

Metrics defines a set of metrics regarding a gossip protocol instance.

func (*Metrics) Snapshot

func (m *Metrics) Snapshot() MetricsSnapshot

Snapshot returns MetricsSnapshot of the Metrics.

type MetricsSnapshot

type MetricsSnapshot struct {
	NewMessages          uint32 `json:"newMessages"`
	KnownMessages        uint32 `json:"knownMessages"`
	ReceivedMessages     uint32 `json:"receivedMessages"`
	ReceivedMessageReq   uint32 `json:"receivedMessageRequests"`
	ReceivedMilestoneReq uint32 `json:"receivedMilestoneRequests"`
	ReceivedHeartbeats   uint32 `json:"receivedHeartbeats"`
	SentMessages         uint32 `json:"sentMessages"`
	SentMessageReq       uint32 `json:"sentMessageRequests"`
	SentMilestoneReq     uint32 `json:"sentMilestoneRequests"`
	SentHeartbeats       uint32 `json:"sentHeartbeats"`
	DroppedPackets       uint32 `json:"droppedPackets"`
}

MetricsSnapshot represents a snapshot of the gossip protocol metrics.

type Options

type Options struct {
	MinPoWScore       float64
	NetworkID         uint64
	BelowMaxDepth     milestone.Index
	WorkUnitCacheOpts *profile.CacheOpts
}

The Options for the MessageProcessor.

type Protocol

type Protocol struct {
	// Parser parses gossip messages and emits received events for them.
	Parser *protocol.Protocol
	// The ID of the peer to which this protocol is associated to.
	PeerID peer.ID
	// The underlying stream for this Protocol.
	Stream network.Stream
	// The events surrounding a Protocol.
	Events ProtocolEvents
	// The peer's latest heartbeat message.
	LatestHeartbeat *Heartbeat
	// Time the last heartbeat was received.
	HeartbeatReceivedTime time.Time
	// Time the last heartbeat was sent.
	HeartbeatSentTime time.Time
	// The send queue into which to enqueue messages to send.
	SendQueue chan []byte
	// The metrics around this protocol instance.
	Metrics Metrics

	// The shared server metrics instance.
	ServerMetrics *metrics.ServerMetrics
	// contains filtered or unexported fields
}

Protocol represents an instance of the gossip protocol.

func NewProtocol

func NewProtocol(peerID peer.ID, stream network.Stream, sendQueueSize int, readTimeout, writeTimeout time.Duration, serverMetrics *metrics.ServerMetrics) *Protocol

NewProtocol creates a new gossip protocol instance associated to the given peer.

func (*Protocol) CouldHaveDataForMilestone

func (p *Protocol) CouldHaveDataForMilestone(index milestone.Index) bool

CouldHaveDataForMilestone tells whether the underlying peer, given its latest heartbeat message, could have parts of the cone data for the given milestone. Returns false if no heartbeat message was received yet.

func (*Protocol) Enqueue

func (p *Protocol) Enqueue(data []byte)

Enqueue enqueues the given gossip protocol message to be sent to the peer. If it can't because the send queue is over capacity, the message gets dropped.

func (*Protocol) HasDataForMilestone

func (p *Protocol) HasDataForMilestone(index milestone.Index) bool

HasDataForMilestone tells whether the underlying peer, given its latest heartbeat message, has the cone data for the given milestone. Returns false if no heartbeat message was received yet.

func (*Protocol) Info

func (p *Protocol) Info() *Info

Info returns information about this gossip protocol instance as an Info (a heartbeat and a metrics snapshot).

func (*Protocol) IsSynced

func (p *Protocol) IsSynced(cmi milestone.Index) bool

IsSynced tells whether the underlying peer is synced.

func (*Protocol) Read

func (p *Protocol) Read(buf []byte) (int, error)

Read reads from the stream into the given buffer.

func (*Protocol) Send

func (p *Protocol) Send(message []byte) error

Send sends the given gossip message on the underlying Protocol.Stream.

func (*Protocol) SendHeartbeat

func (p *Protocol) SendHeartbeat(solidMsIndex milestone.Index, pruningMsIndex milestone.Index, latestMsIndex milestone.Index, connectedNeighbors uint8, syncedNeighbors uint8)

SendHeartbeat sends a Heartbeat to the given peer.

func (*Protocol) SendLatestMilestoneRequest

func (p *Protocol) SendLatestMilestoneRequest()

SendLatestMilestoneRequest sends a storage.Milestone request which requests the latest known milestone from the given peer.

func (*Protocol) SendMessage

func (p *Protocol) SendMessage(msgData []byte)

SendMessage sends a storage.Message to the given peer.

func (*Protocol) SendMessageRequest

func (p *Protocol) SendMessageRequest(requestedMessageID hornet.MessageID)

SendMessageRequest sends a storage.Message request message to the given peer.

func (*Protocol) SendMilestoneRequest

func (p *Protocol) SendMilestoneRequest(index milestone.Index)

SendMilestoneRequest sends a storage.Milestone request to the given peer.

type ProtocolEvents

type ProtocolEvents struct {
	// Fired when the heartbeat message state on the peer has been updated.
	HeartbeatUpdated *events.Event
	// Fired when a message of the given type is sent.
	// This exists solely because protocol.Protocol in hive.go doesn't
	// emit events anymore for sent messages, as it is solely a parser.
	Sent []*events.Event
	// Fired when the protocol stream has been closed.
	Closed *events.Event
	// Fired when an error occurs on the protocol.
	Errors *events.Event
}

ProtocolEvents happening on a Protocol.

type ProtocolForEachFunc

type ProtocolForEachFunc func(proto *Protocol) bool

ProtocolForEachFunc is used in Service.ForEach. Returning false indicates to stop looping. This function must not call any methods on Service.

type Request

type Request struct {
	// The type of the request.
	RequestType RequestType
	// The MessageID of the message to request.
	MessageID hornet.MessageID
	// The milestone index under which this request is linked.
	MilestoneIndex milestone.Index

	// Tells the request queue to not remove this request if the enqueue time is
	// over the given threshold.
	PreventDiscard bool
	// the time at which this request was first enqueued.
	// do not modify this time
	EnqueueTime time.Time
	// contains filtered or unexported fields
}

Request is a request for a particular message.

func NewMessageIDRequest added in v1.1.0

func NewMessageIDRequest(messageID hornet.MessageID, msIndex milestone.Index) *Request

NewMessageIDRequest creates a new message request for a specific messageID.

func NewMilestoneIndexRequest added in v1.1.0

func NewMilestoneIndexRequest(msIndex milestone.Index) *Request

NewMilestoneIndexRequest creates a new message request for a specific milestone index.

func (*Request) MapKey added in v1.1.0

func (r *Request) MapKey() string

type RequestBackPressureFunc

type RequestBackPressureFunc func() bool

RequestBackPressureFunc is a function which tells the Requester to stop requesting more data.

type RequestQueue

type RequestQueue interface {
	// Next returns the next request to send, pops it from the queue and marks it as pending.
	Next() *Request
	// Peek returns the next request to send without popping it from the queue.
	Peek() *Request
	// Enqueue enqueues the given request if it isn't already queued or pending.
	Enqueue(*Request) (enqueued bool)
	// IsQueued tells whether a given request for the given data is queued.
	IsQueued(data interface{}) bool
	// IsPending tells whether a given request was popped from the queue and is now pending.
	IsPending(data interface{}) bool
	// IsProcessing tells whether a given request was popped from the queue, received and is now processing.
	IsProcessing(data interface{}) bool
	// Received marks a request as received and thereby removes it from the pending set.
	// It is added to the processing set.
	// Returns the origin request which was pending or nil if the data was not requested.
	Received(data interface{}) *Request
	// Processed marks a request as fulfilled and thereby removes it from the processing set.
	// Returns the origin request which was processing or nil if the data was not requested.
	Processed(data interface{}) *Request
	// EnqueuePending enqueues all pending requests back into the queue.
	// It also discards requests in the pending set of which their enqueue time is over the given delta threshold.
	// If discardOlderThan is zero, no requests are discarded.
	EnqueuePending(discardOlderThan time.Duration) (queued int)
	// Size returns the size of currently queued, requested/pending and processing requests.
	Size() (queued int, pending int, processing int)
	// Empty tells whether the queue has no queued and pending requests.
	Empty() bool
	// Requests returns a snapshot of all queued, pending and processing requests in the queue.
	Requests() (queued []*Request, pending []*Request, processing []*Request)
	// AvgLatency returns the average latency of enqueueing and then receiving a request.
	AvgLatency() int64
	// Filter adds the given filter function to the queue. Passing nil resets the current one.
	// Setting a filter automatically clears all queued and pending requests which do not fulfill
	// the filter criteria.
	Filter(f FilterFunc)
}

RequestQueue implements a queue which contains requests for needed data.

func NewRequestQueue

func NewRequestQueue(latencyResolution ...int32) RequestQueue

NewRequestQueue creates a new RequestQueue where requests are prioritized by their milestone index (lower = higher priority).

type RequestType added in v1.1.0

type RequestType int

RequestType is the type of request.

const (
	RequestTypeMessageID RequestType = iota
	RequestTypeMilestoneIndex
)

type Requester

type Requester struct {
	// contains filtered or unexported fields
}

Requester handles requesting packets.

func NewRequester

func NewRequester(
	dbStorage *storage.Storage,
	service *Service,
	rQueue RequestQueue,
	opts ...RequesterOption) *Requester

NewRequester creates a new Requester.

func (*Requester) AddBackPressureFunc

func (r *Requester) AddBackPressureFunc(pressureFunc RequestBackPressureFunc)

AddBackPressureFunc adds a RequestBackPressureFunc. This function can be called multiple times to add additional RequestBackPressureFuncs. Calling this function after any Requester worker has been started results in a panic.

func (*Requester) Request

func (r *Requester) Request(data interface{}, msIndex milestone.Index, preventDiscard ...bool) bool

Request enqueues a request to the request queue for the given message if it isn't a solid entry point and is not contained in the database already.

func (*Requester) RequestMilestoneParents

func (r *Requester) RequestMilestoneParents(cachedMilestone *storage.CachedMilestone) bool

RequestMilestoneParents enqueues requests for the parents of the given milestone to the request queue, if the parents are not solid entry points and not already in the database.

func (*Requester) RequestMultiple

func (r *Requester) RequestMultiple(messageIDs hornet.MessageIDs, msIndex milestone.Index, preventDiscard ...bool) int

RequestMultiple works like Request but takes multiple message IDs.

func (*Requester) RequestParents

func (r *Requester) RequestParents(cachedMsg *storage.CachedMessage, msIndex milestone.Index, preventDiscard ...bool)

RequestParents enqueues requests for the parents of the given message to the request queue, if the given message is not a solid entry point and its parents are neither solid entry points nor already in the database.

func (*Requester) RunPendingRequestEnqueuer

func (r *Requester) RunPendingRequestEnqueuer(ctx context.Context)

RunPendingRequestEnqueuer runs the loop to periodically re-request pending requests from the RequestQueue.

func (*Requester) RunRequestQueueDrainer

func (r *Requester) RunRequestQueueDrainer(ctx context.Context)

RunRequestQueueDrainer runs the RequestQueue drainer.

type RequesterOption

type RequesterOption func(options *RequesterOptions)

RequesterOption is a function which sets an option on a RequesterOptions instance.

func WithRequesterDiscardRequestsOlderThan

func WithRequesterDiscardRequestsOlderThan(dur time.Duration) RequesterOption

WithRequesterDiscardRequestsOlderThan sets the threshold for the max age of requests.

func WithRequesterPendingRequestReEnqueueInterval

func WithRequesterPendingRequestReEnqueueInterval(dur time.Duration) RequesterOption

WithRequesterPendingRequestReEnqueueInterval sets the re-enqueue interval for pending requests.

type RequesterOptions

type RequesterOptions struct {
	// Defines the re-queue interval for pending requests.
	PendingRequestReEnqueueInterval time.Duration
	// Defines the max age for requests.
	DiscardRequestsOlderThan time.Duration
}

RequesterOptions are options around a Requester.

type Requests added in v1.1.0

type Requests []*Request

func (Requests) HasRequest added in v1.1.0

func (r Requests) HasRequest() bool

HasRequest returns true if Requests contains a Request.

type Service

type Service struct {
	// the logger used to log events.
	*utils.WrappedLogger

	// Events happening around a Service.
	Events ServiceEvents
	// contains filtered or unexported fields
}

Service handles ongoing gossip streams.

func NewService

func NewService(
	protocol protocol.ID, host host.Host,
	peeringManager *p2p.Manager,
	serverMetrics *metrics.ServerMetrics,
	opts ...ServiceOption) *Service

NewService creates a new Service.

func (*Service) ForEach

func (s *Service) ForEach(f ProtocolForEachFunc)

ForEach calls the given ProtocolForEachFunc on each Protocol.

func (*Service) Protocol

func (s *Service) Protocol(peerID peer.ID) *Protocol

Protocol returns the gossip.Protocol instance for the given peer or nil.

func (*Service) Start

func (s *Service) Start(ctx context.Context)

Start starts the Service's event loop.

func (*Service) SynchronizedCount

func (s *Service) SynchronizedCount(latestMilestoneIndex milestone.Index) int

SynchronizedCount returns the count of streams with peers which appear to be synchronized given their latest Heartbeat message.

type ServiceEvents

type ServiceEvents struct {
	// Fired when a protocol has been started.
	ProtocolStarted *events.Event
	// Fired when a protocol has ended.
	ProtocolTerminated *events.Event
	// Fired when an inbound stream gets canceled.
	InboundStreamCancelled *events.Event
	// Fired when an internal error happens.
	Error *events.Event
}

ServiceEvents are events happening around a Service.

type ServiceOption

type ServiceOption func(opts *ServiceOptions)

ServiceOption is a function setting a ServiceOptions option.

func WithLogger

func WithLogger(logger *logger.Logger) ServiceOption

WithLogger enables logging within the Service.

func WithSendQueueSize

func WithSendQueueSize(size int) ServiceOption

WithSendQueueSize defines the size of send queues on ongoing gossip protocol streams.

func WithStreamConnectTimeout

func WithStreamConnectTimeout(dur time.Duration) ServiceOption

WithStreamConnectTimeout defines the timeout for creating a gossip protocol stream.

func WithStreamReadTimeout

func WithStreamReadTimeout(dur time.Duration) ServiceOption

WithStreamReadTimeout defines the read timeout for reading from a stream.

func WithStreamWriteTimeout

func WithStreamWriteTimeout(dur time.Duration) ServiceOption

WithStreamWriteTimeout defines the write timeout for writing to a stream.

func WithUnknownPeersLimit

func WithUnknownPeersLimit(limit int) ServiceOption

WithUnknownPeersLimit defines how many peers with an unknown relation are allowed to have an ongoing gossip protocol stream.

type ServiceOptions

type ServiceOptions struct {
	// contains filtered or unexported fields
}

ServiceOptions define options for a Service.

type StreamCancelReason

type StreamCancelReason string

StreamCancelReason is a reason for a gossip stream cancellation.

const (
	// StreamCancelReasonDuplicated defines a stream cancellation because
	// it would lead to a duplicated ongoing stream.
	StreamCancelReasonDuplicated StreamCancelReason = "duplicated stream"
	// StreamCancelReasonInsufficientPeerRelation defines a stream cancellation because
	// the relation to the other peer is insufficient.
	StreamCancelReasonInsufficientPeerRelation StreamCancelReason = "insufficient peer relation"
	// StreamCancelReasonNoUnknownPeerSlotAvailable defines a stream cancellation
	// because no more unknown peers slot were available.
	StreamCancelReasonNoUnknownPeerSlotAvailable StreamCancelReason = "no unknown peer slot available"
	// StreamCancelReasonHostShutdown defines a stream cancellation
	// because the host is shutting down.
	StreamCancelReasonHostShutdown StreamCancelReason = "host shutdown"
)

type WarpSync

type WarpSync struct {
	sync.Mutex

	// The used advancement range per checkpoint.
	AdvancementRange int
	// The Events of the warpsync.
	Events Events

	// The current confirmed milestone of the node.
	CurrentConfirmedMilestone milestone.Index
	// The starting time of the synchronization.
	StartTime time.Time
	// The starting point of the synchronization.
	InitMilestone milestone.Index
	// The target milestone to which to synchronize to.
	TargetMilestone milestone.Index
	// The previous checkpoint of the synchronization.
	PreviousCheckpoint milestone.Index
	// The current checkpoint of the synchronization.
	CurrentCheckpoint milestone.Index
	// contains filtered or unexported fields
}

WarpSync is metadata about doing a synchronization via STING messages.

func NewWarpSync

func NewWarpSync(advRange int, advanceCheckpointCriteriaFunc ...AdvanceCheckpointCriteria) *WarpSync

NewWarpSync creates a new WarpSync instance with the given advancement range and criteria func. If no advancement func is provided, the WarpSync uses AdvanceAtPercentageReached with DefaultAdvancementThreshold.

func (*WarpSync) AddReferencedMessagesCount

func (ws *WarpSync) AddReferencedMessagesCount(referencedMessagesCount int)

AddReferencedMessagesCount adds the amount of referenced messages to collect stats.

func (*WarpSync) UpdateCurrentConfirmedMilestone

func (ws *WarpSync) UpdateCurrentConfirmedMilestone(current milestone.Index)

UpdateCurrentConfirmedMilestone updates the current confirmed milestone index state.

func (*WarpSync) UpdateTargetMilestone

func (ws *WarpSync) UpdateTargetMilestone(target milestone.Index)

UpdateTargetMilestone updates the synchronization target if it is higher than the current one and triggers a synchronization start if the target was set for the first time.

type WarpSyncMilestoneRequester

type WarpSyncMilestoneRequester struct {
	syncutils.Mutex
	// contains filtered or unexported fields
}

WarpSyncMilestoneRequester walks the cones of existing but non-solid milestones and memoizes already walked messages and milestones.

func NewWarpSyncMilestoneRequester

func NewWarpSyncMilestoneRequester(
	dbStorage *storage.Storage,
	syncManager *syncmanager.SyncManager,
	requester *Requester,
	preventDiscard bool) *WarpSyncMilestoneRequester

NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.

func (*WarpSyncMilestoneRequester) Cleanup

func (w *WarpSyncMilestoneRequester) Cleanup()

Cleanup cleans up traversed messages to free memory.

func (*WarpSyncMilestoneRequester) RequestMilestoneRange added in v1.1.0

func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest int, onExistingMilestoneInRange func(ctx context.Context, milestone *storage.CachedMilestone) error, from ...milestone.Index) int

RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index. Returns the number of milestones requested.

func (*WarpSyncMilestoneRequester) RequestMissingMilestoneParents

func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, cachedMs *storage.CachedMilestone) error

RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent. Already requested milestones or traversed messages will be ignored, to circumvent requesting the same parents multiple times.

type WorkUnit

type WorkUnit struct {
	objectstorage.StorableObjectFlags
	// contains filtered or unexported fields
}

WorkUnit defines the work around processing a received message and its associated requests from peers. There is at most one active WorkUnit for any given sequence of message bytes.

func (*WorkUnit) Is

func (wu *WorkUnit) Is(state WorkUnitState) bool

Is tells whether the WorkUnit has the given state.

func (*WorkUnit) ObjectStorageKey

func (wu *WorkUnit) ObjectStorageKey() []byte

func (*WorkUnit) ObjectStorageValue

func (wu *WorkUnit) ObjectStorageValue() []byte

func (*WorkUnit) Update

func (wu *WorkUnit) Update(_ objectstorage.StorableObject)

func (*WorkUnit) UpdateState

func (wu *WorkUnit) UpdateState(state WorkUnitState)

UpdateState updates the WorkUnit's state.

type WorkUnitState

type WorkUnitState byte

WorkUnitState defines the state which a WorkUnit is in.

const (
	Hashing WorkUnitState = 1 << 0
	Invalid WorkUnitState = 1 << 1
	Hashed  WorkUnitState = 1 << 2
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL