Documentation ¶
Index ¶
- type CrawlHandler
- type CrawlHandlerConfig
- type CrawlResult
- type CrawlWriter
- type CrawlWriterConfig
- type DialHandler
- type DialHandlerConfig
- type DialResult
- type DialWriter
- type Driver
- type Engine
- type EngineConfig
- type Handler
- type PeerInfo
- type Pool
- type PriorityQueue
- func (tq *PriorityQueue[T]) All() map[string]T
- func (tq *PriorityQueue[T]) Drop(key string) bool
- func (tq *PriorityQueue[T]) Find(key string) (T, bool)
- func (tq *PriorityQueue[T]) Len() int
- func (tq *PriorityQueue[T]) Peek() (T, bool)
- func (tq *PriorityQueue[T]) Pop() (T, bool)
- func (tq *PriorityQueue[T]) Push(key string, value T, priority int) bool
- func (tq *PriorityQueue[T]) Update(key string, value T, priority int) bool
- func (tq *PriorityQueue[T]) UpdatePriority(key string, priority int) bool
- type Result
- type RoutingTable
- type WorkResult
- type Worker
- type WriteResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CrawlHandler ¶
type CrawlHandler[I PeerInfo[I]] struct {
    // A map that maps peer IDs to their database IDs. This speeds up the
    // insertion of neighbor information as the database does not need to look
    // up every peer ID but only the ones not yet present in the database.
    // Speed up for ~11k peers: 5.5 min -> 30s
    PeerMappings map[peer.ID]int

    // A map that keeps track of all k-bucket entries of a particular peer.
    RoutingTables map[peer.ID]*RoutingTable[I]

    // A map of agent versions and their occurrences that happened during the crawl.
    AgentVersion map[string]int

    // A map of protocols and their occurrences that happened during the crawl.
    Protocols map[string]int

    // A map of errors that happened when trying to dial a peer.
    ConnErrs map[string]int

    // A map of errors that happened during the crawl.
    CrawlErrs map[string]int

    // The number of peers we would still need to crawl after the Run method has returned.
    QueuedPeers int

    // The number of peers that were crawled.
    CrawledPeers int
    // contains filtered or unexported fields
}
CrawlHandler is the default implementation for a Handler that can be used as the basis for crawl operations.
func NewCrawlHandler ¶
func NewCrawlHandler[I PeerInfo[I]](cfg *CrawlHandlerConfig) *CrawlHandler[I]
func (*CrawlHandler[I]) HandlePeerResult ¶
func (h *CrawlHandler[I]) HandlePeerResult(ctx context.Context, result Result[CrawlResult[I]]) []I
func (*CrawlHandler[I]) HandleWriteResult ¶
func (h *CrawlHandler[I]) HandleWriteResult(ctx context.Context, result Result[WriteResult])
func (*CrawlHandler[I]) TotalErrors ¶
func (h *CrawlHandler[I]) TotalErrors() int
TotalErrors counts the total number of errors, which is equivalent to the number of undialable peers during this crawl.
type CrawlHandlerConfig ¶
type CrawlHandlerConfig struct {
    // a flag that indicates whether we want to track and keep routing table
    // configurations of all peers in memory and write them to disk after the
    // crawl has finished.
    TrackNeighbors bool
}
type CrawlResult ¶
type CrawlResult[I PeerInfo[I]] struct {
    // The crawler that generated this result
    CrawlerID string

    // Information about crawled peer
    Info I

    // The neighbors of the crawled peer
    RoutingTable *RoutingTable[I]

    // The agent version of the crawled peer
    Agent string

    // The protocols the peer supports
    Protocols []string

    // Indicates whether the above routing table information was queried
    // through the API. The API routing table does not include
    // MultiAddresses, so we won't use them for further crawls.
    RoutingTableFromAPI bool

    // Any error that has occurred when connecting to the peer
    ConnectError error

    // The above error transferred to a known error
    ConnectErrorStr string

    // Any error that has occurred during fetching neighbor information
    CrawlError error

    // The above error transferred to a known error
    CrawlErrorStr string

    // When was the crawl started
    CrawlStartTime time.Time

    // When did this crawl end
    CrawlEndTime time.Time

    // When was the connection attempt made
    ConnectStartTime time.Time

    // As it can take some time to handle the result we track the timestamp explicitly
    ConnectEndTime time.Time

    // Additional properties of that specific peer we have crawled
    Properties json.RawMessage

    // Debug flag that indicates whether to log the full error string
    LogErrors bool
}
CrawlResult captures data that is gathered from crawling a single peer.
func (CrawlResult[I]) ConnectDuration ¶
func (r CrawlResult[I]) ConnectDuration() time.Duration
ConnectDuration returns the time it took to connect to the peer. This includes dialing and the identity protocol.
func (CrawlResult[I]) CrawlDuration ¶
func (r CrawlResult[I]) CrawlDuration() time.Duration
CrawlDuration returns the time it took to crawl the peer (connecting + fetching neighbors).
func (CrawlResult[I]) IsSuccess ¶
func (r CrawlResult[I]) IsSuccess() bool
func (CrawlResult[I]) LogEntry ¶
func (r CrawlResult[I]) LogEntry() *log.Entry
func (CrawlResult[I]) PeerInfo ¶
func (r CrawlResult[I]) PeerInfo() I
type CrawlWriter ¶
type CrawlWriter[I PeerInfo[I]] struct {
    // contains filtered or unexported fields
}
CrawlWriter handles the insert/upsert/update operations for a particular crawl result.
func NewCrawlWriter ¶
func NewCrawlWriter[I PeerInfo[I]](id string, dbc db.Client, dbCrawlID int, cfg *CrawlWriterConfig) *CrawlWriter[I]
func (*CrawlWriter[I]) Work ¶
func (w *CrawlWriter[I]) Work(ctx context.Context, task CrawlResult[I]) (WriteResult, error)
Work takes a crawl result (persist job) and inserts a denormalized database entry of the results.
type CrawlWriterConfig ¶
type DialHandler ¶
type DialHandler[I PeerInfo[I]] struct {
    // contains filtered or unexported fields
}
func NewDialHandler ¶
func NewDialHandler[I PeerInfo[I]](cfg *DialHandlerConfig) *DialHandler[I]
func (*DialHandler[I]) HandlePeerResult ¶
func (h *DialHandler[I]) HandlePeerResult(ctx context.Context, result Result[DialResult[I]]) []I
func (*DialHandler[I]) HandleWriteResult ¶
func (h *DialHandler[I]) HandleWriteResult(ctx context.Context, result Result[WriteResult])
type DialHandlerConfig ¶
type DialHandlerConfig struct{}
type DialResult ¶
type DialResult[I PeerInfo[I]] struct {
    // The dialer that generated this result
    DialerID string

    // The dialed peer
    Info I

    // If error is set, the peer was not dialable
    Error error

    // The above error transferred to a known error
    DialError string

    // When was the dial started
    DialStartTime time.Time

    // When did this dial end
    DialEndTime time.Time
}
DialResult captures data that is gathered from pinging a single peer.
func (DialResult[I]) DialDuration ¶
func (r DialResult[I]) DialDuration() time.Duration
DialDuration returns the time it took to dial the peer.
func (DialResult[I]) IsSuccess ¶
func (r DialResult[I]) IsSuccess() bool
func (DialResult[I]) LogEntry ¶
func (r DialResult[I]) LogEntry() *log.Entry
func (DialResult[I]) PeerInfo ¶
func (r DialResult[I]) PeerInfo() I
type DialWriter ¶
type DialWriter[I PeerInfo[I]] struct {
    // contains filtered or unexported fields
}
DialWriter handles the insert/upsert/update operations for a particular dial result.
func NewDialWriter ¶
func NewDialWriter[I PeerInfo[I]](id string, dbc *db.DBClient) *DialWriter[I]
func (*DialWriter[I]) Work ¶
func (w *DialWriter[I]) Work(ctx context.Context, task DialResult[I]) (WriteResult, error)
Work takes a dial result (persist job) and inserts a denormalized database entry of the results.
type Driver ¶
type Driver[I PeerInfo[I], R WorkResult[I]] interface {
    // NewWorker returns a new [Worker] that takes a [PeerInfo], performs its
    // duties by contacting that peer, and returns the resulting WorkResult.
    // In the current implementation, this could be either a "peer crawl" (when
    // you run "nebula crawl") or a "peer dial" (when you run "nebula monitor").
    NewWorker() (Worker[I, R], error)

    // NewWriter returns a new [Worker] that takes a [WorkResult], performs its
    // duties by storing that result somewhere, and returns information about
    // how that all went.
    NewWriter() (Worker[R, WriteResult], error)

    // Tasks returns a channel on which the driver should emit peer processing
    // tasks. This method will only be called once by the engine. The engine
    // will keep running until the returned channel is closed. Closing the
    // channel signals the engine that we don't anticipate scheduling any more
    // tasks. However, this doesn't mean that the engine will stop right away.
    // It will first process all remaining tasks it has in its queue. If you
    // want to prematurely stop the engine, cancel the context you passed into
    // [Engine.Run].
    Tasks() <-chan I

    // Close is called when the engine is about to shut down. This gives the
    // stack a chance to clean up internal resources. Implementations must be
    // idempotent as Close may be called multiple times.
    Close()
}
A Driver is a data structure that provides the necessary implementations and tasks for the engine to operate.
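To make the contract concrete, here is a minimal sketch of a Driver, assuming myInfo and myResult are existing PeerInfo and WorkResult implementations (both names are hypothetical), with worker and writer construction elided:

type myDriver struct {
    tasks chan myInfo
}

func (d *myDriver) NewWorker() (Worker[myInfo, myResult], error) {
    return nil, nil // construct a worker that contacts a peer and produces a myResult
}

func (d *myDriver) NewWriter() (Worker[myResult, WriteResult], error) {
    return nil, nil // construct a writer that persists a myResult somewhere
}

func (d *myDriver) Tasks() <-chan myInfo {
    // the engine reads tasks from this channel; closing it signals that
    // no more tasks will be scheduled
    return d.tasks
}

func (d *myDriver) Close() {
    // release internal resources; must be idempotent
}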
type Engine ¶
type Engine[I PeerInfo[I], R WorkResult[I]] struct {
    // contains filtered or unexported fields
}
Engine is the integral data structure for orchestrating the communication with peers and writing the processing results to disk. It maintains a pool of workers and writers that are concurrently processing peers and writing the results to disk. The engine is responsible for scheduling which peer to process next, making sure to not process the same peer twice. At the same time, it buffers the processing results and schedules results to be stored and distributes these tasks to the writers when they have capacity. The engine can be configured with the EngineConfig struct. It is generic on the peer information type (PeerInfo) and the result that the peer workers return (WorkResult).
func NewEngine ¶
func NewEngine[I PeerInfo[I], R WorkResult[I]](driver Driver[I, R], handler Handler[I, R], cfg *EngineConfig) (*Engine[I, R], error)
NewEngine initializes a new engine. See the Engine documentation for more information.
func (*Engine[I, R]) Run ¶
Run is a blocking call that starts the worker and writer pools to accept and perform tasks. It enters an indefinite loop expecting to receive tasks from the driver. In the case of a crawl operation, these would be the bootstrap peers to start the crawl from. It sends these peers to the workers, which process each peer and send back the result, which in turn is passed to one of the writers to be stored. The engine, in the meantime, keeps track of and exposes Prometheus metrics. The engine keeps running as long as the driver's tasks channel isn't closed. Once the channel is closed, the engine processes all remaining peers in the queue. Each result is passed to a handler that may return additional peers to process.
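As a usage sketch (driver and handler stand for hypothetical Driver and Handler implementations over the hypothetical types myInfo and myResult; Run's return values are not shown in this documentation and are therefore discarded here):

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancelling the context prematurely stops the engine

eng, err := NewEngine[myInfo, myResult](driver, handler, DefaultEngineConfig())
if err != nil {
    return err
}

// blocks until the driver's tasks channel is closed and the remaining
// queue has been processed, or until ctx is cancelled
eng.Run(ctx)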
type EngineConfig ¶
type EngineConfig struct {
    // the number of internal workers. This translates to how many peers we
    // process in parallel.
    WorkerCount int

    // the number of internal writers that store the results to disk.
    WriterCount int

    // maximum number of peers to process before stopping the engine. 0 means
    // to process peers until there are no more in the work queue. If
    // [DuplicateProcessing] is true, process indefinitely.
    Limit int

    // if set to true, the engine won't keep track of which peers were already
    // processed to prevent processing a peer twice. The engine is solely driven
    // by what the driver will emit on its tasks channel.
    DuplicateProcessing bool

    // which type of addresses should be dialed. Relevant for parking
    // peers during a crawl
    AddrDialType config.AddrType

    // MeterProvider is the meter provider to use when initialising metric instruments.
    MeterProvider metric.MeterProvider

    // TracerProvider is the tracer provider to use when initialising tracing
    TracerProvider trace.TracerProvider
}
The EngineConfig object configures the core Nebula Engine.
func DefaultEngineConfig ¶
func DefaultEngineConfig() *EngineConfig
DefaultEngineConfig returns a default engine configuration that can and should be adjusted for different networks.
func (*EngineConfig) Validate ¶
func (cfg *EngineConfig) Validate() error
Validate verifies the engine configuration's invariants.
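For illustration, a sketch of adjusting the defaults before constructing an engine (the concrete values are arbitrary):

cfg := DefaultEngineConfig()
cfg.WorkerCount = 500 // process up to 500 peers in parallel
cfg.WriterCount = 10  // persist results with 10 concurrent writers
cfg.Limit = 10000     // stop after 10,000 peers; 0 processes until the queue is empty
if err := cfg.Validate(); err != nil {
    return err
}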
type Handler ¶
type Handler[I PeerInfo[I], R WorkResult[I]] interface {
    // HandlePeerResult is called when the worker that has processed a peer
    // has emitted a new processing result. This can be a [CrawlResult] or
    // [DialResult] at the moment.
    HandlePeerResult(context.Context, Result[R]) []I

    // HandleWriteResult is called when the writer has written a [CrawlResult]
    // or [DialResult] to disk.
    HandleWriteResult(context.Context, Result[WriteResult])
}
Handler defines the interface that the engine will call every time it has received a result from any of its workers.
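A minimal sketch of a Handler that schedules no follow-up work (myInfo and myResult are hypothetical stand-ins for the engine's type parameters):

type noopHandler struct{}

func (noopHandler) HandlePeerResult(ctx context.Context, res Result[myResult]) []myInfo {
    return nil // returning peers here would schedule them as new tasks
}

func (noopHandler) HandleWriteResult(ctx context.Context, res Result[WriteResult]) {
    // e.g., aggregate statistics about completed writes
}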
type PeerInfo ¶
type PeerInfo[T any] interface {
    // ID should return the peer's/node's identifier mapped into a libp2p peer.ID.
    ID() peer.ID

    // Addrs should return all addresses that this peer is reachable at in multiaddress format.
    Addrs() []multiaddr.Multiaddr

    // Merge takes another peer info and merges its information into the callee
    // peer info struct. The implementation of Merge may panic if the peer IDs
    // don't match.
    Merge(other T) T

    // DeduplicationKey returns a unique string used for deduplication of crawl
    // tasks. For example, in discv4 and discv5 we might want to crawl the same
    // peer (as identified by its public key) multiple times when we find new
    // ENRs for it. If the deduplication key were just the public key, we would
    // only crawl it once. If we later found newer ENRs for the same peer with
    // different network addresses, we would skip that peer. On the other hand,
    // if the deduplication key were the entire ENR, we would crawl the same peer
    // with different (potentially newer) connectivity information again.
    DeduplicationKey() string
}
PeerInfo is the interface that any peer information struct must conform to.
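A minimal sketch of a conforming implementation; the nodeInfo type is hypothetical and deduplicates by peer ID only, so each peer would be processed at most once:

import (
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
)

type nodeInfo struct {
    id     peer.ID
    maddrs []multiaddr.Multiaddr
}

func (n nodeInfo) ID() peer.ID                  { return n.id }
func (n nodeInfo) Addrs() []multiaddr.Multiaddr { return n.maddrs }

func (n nodeInfo) Merge(other nodeInfo) nodeInfo {
    if n.id != other.id {
        panic("merging peer infos with different IDs")
    }
    n.maddrs = append(n.maddrs, other.maddrs...)
    return n
}

func (n nodeInfo) DeduplicationKey() string {
    return n.id.String()
}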
type Pool ¶
A Pool manages a set of Workers.
func (*Pool[T, R]) Start ¶
Start takes a channel on which tasks will be scheduled. It is guaranteed that the Pool reads from this channel as fast as possible. To stop the worker pool, close the channel. To wait until all Workers have finished, wait until the results channel returned from this method is closed as well.
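The exact signature of Start is not shown above, so the following is only an assumption-laden sketch of the described pattern (the Start shape, myTask, pending, and pool are all guesses or placeholders):

tasks := make(chan myTask)
results := pool.Start(ctx, tasks) // assumed shape: Start(ctx, <-chan T) <-chan Result[R]

go func() {
    defer close(tasks) // closing the channel stops the worker pool
    for _, t := range pending {
        tasks <- t
    }
}()

for res := range results {
    _ = res // the results channel closes once all Workers have finished
}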
type PriorityQueue ¶
type PriorityQueue[T any] struct {
    // contains filtered or unexported fields
}
A PriorityQueue takes unique items and pops them according to their priority.
func NewPriorityQueue ¶
func NewPriorityQueue[T any]() *PriorityQueue[T]
func (*PriorityQueue[T]) All ¶
func (tq *PriorityQueue[T]) All() map[string]T
func (*PriorityQueue[T]) Drop ¶
func (tq *PriorityQueue[T]) Drop(key string) bool
func (*PriorityQueue[T]) Find ¶
func (tq *PriorityQueue[T]) Find(key string) (T, bool)
func (*PriorityQueue[T]) Len ¶
func (tq *PriorityQueue[T]) Len() int
func (*PriorityQueue[T]) Peek ¶
func (tq *PriorityQueue[T]) Peek() (T, bool)
func (*PriorityQueue[T]) Pop ¶
func (tq *PriorityQueue[T]) Pop() (T, bool)
func (*PriorityQueue[T]) Push ¶
func (tq *PriorityQueue[T]) Push(key string, value T, priority int) bool
func (*PriorityQueue[T]) Update ¶
func (tq *PriorityQueue[T]) Update(key string, value T, priority int) bool
Update modifies the priority and value of an item in the queue.
func (*PriorityQueue[T]) UpdatePriority ¶
func (tq *PriorityQueue[T]) UpdatePriority(key string, priority int) bool
UpdatePriority modifies the priority of an item in the queue.
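A usage sketch built from the signatures above (whether higher or lower priority values pop first is not documented here, so the ordering comment is an assumption):

pq := NewPriorityQueue[string]()
pq.Push("peer-a", "task for peer a", 1)
pq.Push("peer-b", "task for peer b", 10)
pq.Push("peer-a", "duplicate", 5) // expected to return false: keys are unique

pq.UpdatePriority("peer-a", 20) // re-rank an existing item in place

if task, ok := pq.Pop(); ok {
    _ = task // assuming higher priorities pop first, this is peer-a's task
}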
type Result ¶
Result is a generic result object. It captures a generic value or any error that might have occurred when producing this result.
type RoutingTable ¶
type RoutingTable[I PeerInfo[I]] struct {
    // PeerID is the peer whose neighbors (routing table entries) are in the array below.
    PeerID peer.ID

    // The peers that are in the routing table of the above peer
    Neighbors []I

    // First error that has occurred during crawling that peer
    Error error

    // ErrorBits is a little-endian bit field that tracks at which CPLs
    // errors occurred during the neighbor fetches:
    //   0000 0000 0000 0000 - No error
    //   0000 0000 0000 0001 - An error has occurred at CPL 0
    //   1000 0000 0000 0001 - Errors have occurred at CPLs 0 and 15
    ErrorBits uint16
}
RoutingTable captures the routing table information and crawl error of a particular peer.
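Given the documented bit layout, the CPLs at which neighbor fetches failed can be recovered with a simple scan (rt stands for a *RoutingTable[I]):

for cpl := 0; cpl < 16; cpl++ {
    if rt.ErrorBits&(1<<cpl) != 0 {
        fmt.Printf("neighbor fetch failed at CPL %d\n", cpl)
    }
}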
type WorkResult ¶
type WorkResult[I PeerInfo[I]] interface {
    // PeerInfo returns information of the peer that was processed
    PeerInfo() I

    // LogEntry returns logging information that can be used by the engine
    LogEntry() *log.Entry

    // IsSuccess indicates whether this WorkResult is considered a success
    IsSuccess() bool
}
WorkResult must be implemented by the result type that a peer-processing Worker returns.
type Worker ¶
type Worker[T any, R any] interface {
    // Work instructs the Worker to process the task and produce a result.
    Work(ctx context.Context, task T) (R, error)
}
A Worker processes tasks of type T and returns results of type R or an error. Workers are used to process a single peer or store a crawl result to the database. It is the unit of concurrency in this system.
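A trivial sketch of the interface (the echoWorker type and its string task/result types are hypothetical):

type echoWorker struct{}

// Work "processes" the task by echoing it back; a real worker would
// contact a peer or persist a result here.
func (w *echoWorker) Work(ctx context.Context, task string) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    default:
        return task, nil
    }
}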
type WriteResult ¶
type WriteResult struct {
    *db.InsertVisitResult
    WriterID string
    PeerID   peer.ID
    Duration time.Duration
    Error    error
}
WriteResult must be returned by write workers.