metricplugin

package
v0.0.0-...-5cd3a54 Latest
Published: Sep 16, 2024 License: MIT Imports: 39 Imported by: 0

Documentation

Overview

Package metricplugin implements a plugin for kubo (go-ipfs) to export additional metrics and Bitswap traffic from a running node.

Constants

const (
	APIBasePath                = "/metric_plugin/v1"
	APIPingPath                = APIBasePath + "/ping"
	APIBroadcastWantPath       = APIBasePath + "/broadcast_want"
	APIBroadcastCancelPath     = APIBasePath + "/broadcast_cancel"
	APIBroadcastWantCancelPath = APIBasePath + "/broadcast_want_cancel"
	APISamplePeerMetadataPath  = APIBasePath + "/sample_peer_metadata"
)

API path constants.

const (
	// APIOnlyConnectedPeersParameter is the parameter for the SamplePeerMetadata function.
	APIOnlyConnectedPeersParameter = "only_connected"
)

API parameter constants.
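As a rough sketch of how a client might combine these constants, the following builds a request URL for the sample_peer_metadata endpoint. The constants are copied locally so the example compiles on its own. That the parameter is passed as a URL query parameter with a boolean value is an assumption, as are the helper name `samplePeerMetadataURL` and the listen address.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// Local copies of the documented constants, so this sketch compiles
// without importing the plugin.
const (
	apiBasePath                    = "/metric_plugin/v1"
	apiSamplePeerMetadataPath      = apiBasePath + "/sample_peer_metadata"
	apiOnlyConnectedPeersParameter = "only_connected"
)

// samplePeerMetadataURL is a hypothetical helper that builds the request URL.
// Assumption: the parameter is sent as a query parameter ("true"/"false").
func samplePeerMetadataURL(listenAddr string, onlyConnected bool) string {
	u := url.URL{
		Scheme: "http",
		Host:   listenAddr,
		Path:   apiSamplePeerMetadataPath,
	}
	q := url.Values{}
	q.Set(apiOnlyConnectedPeersParameter, strconv.FormatBool(onlyConnected))
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	// The listen address is an example value, not a documented default.
	fmt.Println(samplePeerMetadataURL("localhost:8181", true))
}
```

Building the URL with `net/url` rather than string concatenation keeps the query string correctly escaped if further parameters are added later.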

const ExchangeName = "ipfs.passive_monitoring"

ExchangeName is the name of the exchange on RabbitMQ.

Variables

var (

	// ErrBitswapProbeDisabled is returned for Bitswap probing API calls if
	// the Bitswap probe is disabled via the plugin's configuration.
	ErrBitswapProbeDisabled = errors.New("Bitswap probe not enabled")
)

var ErrInternalServerError = newPresentableError(http.StatusInternalServerError, "internal server error")

ErrInternalServerError is returned when there was either a panic processing a request or a non-presentable error was generated during execution.

var ErrInvalidRequest = newPresentableError(http.StatusBadRequest, "invalid request")

ErrInvalidRequest is returned for invalid requests, such as malformed JSON.

var ErrServerClosing = newPresentableError(http.StatusServiceUnavailable, "server closing")

ErrServerClosing is returned when a request is made while the node has started to shut down.

Functions

This section is empty.

Types

type BitswapDiscoveryProbe

type BitswapDiscoveryProbe struct {
	// contains filtered or unexported fields
}

BitswapDiscoveryProbe exposes functionality to perform Bitswap-only discovery for a set of CIDs. It implements `network.Notifiee` to receive notifications about peers connecting, disconnecting, etc. It keeps track of peers the node is connected to and attempts to hold an active Bitswap sender to each of them.

Relying on the `network.Notifiee` events is somewhat unreliable: we occasionally receive duplicate disconnection events or miss connection events. In practice, however, the tracked state appears to converge to the true IPFS connectivity after a while.

func NewProbe

func NewProbe(nodeNetwork network.Network, bsnetImpl bsnet.BitSwapNetwork) *BitswapDiscoveryProbe

NewProbe creates a new Bitswap Probe and network Notifiee. This will spawn goroutines to log stats and connect to peers via Bitswap. Use the Shutdown method to shut down cleanly.

func (*BitswapDiscoveryProbe) Connected

func (wt *BitswapDiscoveryProbe) Connected(_ network.Network, conn network.Conn)

Connected is called when a connection is opened.

func (*BitswapDiscoveryProbe) Disconnected

func (wt *BitswapDiscoveryProbe) Disconnected(_ network.Network, conn network.Conn)

Disconnected is called when a connection is closed.

func (*BitswapDiscoveryProbe) Listen

func (*BitswapDiscoveryProbe) Listen(network.Network, ma.Multiaddr)

Listen is called when the network implementation starts listening on the given address. We do not use this at the moment.

func (*BitswapDiscoveryProbe) ListenClose

func (*BitswapDiscoveryProbe) ListenClose(network.Network, ma.Multiaddr)

ListenClose is called when the network implementation stops listening on the given address. We do not use this at the moment.

func (*BitswapDiscoveryProbe) Shutdown

func (wt *BitswapDiscoveryProbe) Shutdown()

Shutdown shuts down the probe cleanly. This is idempotent, i.e. calling it multiple times does not cause problems. It blocks until all goroutines have quit; how long that takes probably depends on the timeouts for connecting to peers and opening streams.
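One common way to get an idempotent, blocking shutdown in Go is to combine `sync.Once` with a `sync.WaitGroup`. The sketch below illustrates that general pattern only; the `worker` type and its fields are hypothetical and are not taken from the probe's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for a component that owns goroutines.
type worker struct {
	closing   chan struct{}  // closed to signal goroutines to stop
	wg        sync.WaitGroup // tracks running goroutines
	closeOnce sync.Once      // guards against closing the channel twice
}

func newWorker() *worker {
	w := &worker{closing: make(chan struct{})}
	w.wg.Add(1)
	go w.loop()
	return w
}

// loop does periodic work until the closing channel is closed.
func (w *worker) loop() {
	defer w.wg.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-w.closing:
			return
		case <-ticker.C:
			// Periodic work (e.g. logging stats) would go here.
		}
	}
}

// Shutdown signals the goroutine to stop and waits for it to exit.
// Calling it more than once is safe.
func (w *worker) Shutdown() {
	w.closeOnce.Do(func() { close(w.closing) })
	w.wg.Wait()
}

func main() {
	w := newWorker()
	w.Shutdown()
	w.Shutdown() // second call returns immediately
	fmt.Println("shut down cleanly")
}
```

With this pattern, the time Shutdown blocks is bounded by how quickly each goroutine notices the closed channel, which is why long-running operations such as dialing peers would need their own timeouts or contexts.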

type BitswapMessage

type BitswapMessage struct {
	// Wantlist entries sent with this message.
	WantlistEntries []bsmsg.Entry `json:"wantlist_entries"`

	// Whether the wantlist entries are a full new wantlist.
	FullWantList bool `json:"full_wantlist"`

	// Blocks sent with this message.
	Blocks []cid.Cid `json:"blocks"`

	// Block presence indicators sent with this message.
	BlockPresences []BlockPresence `json:"block_presences"`

	// Underlay addresses of the peer we were connected to when the message
	// was received.
	ConnectedAddresses []ma.Multiaddr `json:"connected_addresses"`
}

A BitswapMessage is the type pushed to remote clients for recorded incoming Bitswap messages.

type BitswapTracer

type BitswapTracer struct {
	// contains filtered or unexported fields
}

BitswapTracer implements the `bitswap.Tracer` interface to log Bitswap messages.

func NewTracer

func NewTracer(nodeNetwork network.Network, monitorName string, amqpServerAddress string) (*BitswapTracer, error)

NewTracer creates a new Bitswap Tracer. Use the Shutdown method to shut down cleanly.

func (*BitswapTracer) Connected

func (wt *BitswapTracer) Connected(_ network.Network, conn network.Conn)

Connected implements network.Notifiee.

func (*BitswapTracer) Disconnected

func (wt *BitswapTracer) Disconnected(_ network.Network, conn network.Conn)

Disconnected implements network.Notifiee.

func (*BitswapTracer) Listen

func (*BitswapTracer) Listen(network.Network, ma.Multiaddr)

Listen implements network.Notifiee. We do not use this at the moment.

func (*BitswapTracer) ListenClose

func (*BitswapTracer) ListenClose(network.Network, ma.Multiaddr)

ListenClose implements network.Notifiee. We do not use this at the moment.

func (*BitswapTracer) MessageReceived

func (wt *BitswapTracer) MessageReceived(peerID peer.ID, msg bsmsg.BitSwapMessage)

MessageReceived is called on incoming Bitswap messages.

func (*BitswapTracer) MessageSent

func (*BitswapTracer) MessageSent(peer.ID, bsmsg.BitSwapMessage)

MessageSent is called on outgoing Bitswap messages. We do not use this at the moment.

func (*BitswapTracer) Shutdown

func (wt *BitswapTracer) Shutdown()

Shutdown shuts down the tracer cleanly. This is idempotent, i.e. calling it multiple times does not cause problems.

type BlockPresence

type BlockPresence struct {
	// Cid is the referenced CID.
	Cid cid.Cid `json:"cid"`

	// Type indicates the block presence type.
	Type BlockPresenceType `json:"block_presence_type"`
}

A BlockPresence indicates the presence or absence of a block.

type BlockPresenceType

type BlockPresenceType int

BlockPresenceType is an enum for presence or absence notifications.

const (
	// Have indicates that the peer has the block.
	Have BlockPresenceType = 0
	// DontHave indicates that the peer does not have the block.
	DontHave BlockPresenceType = 1
)

Block presence constants.
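When logging or debugging decoded messages, it can help to render the integer values as names. The following is a minimal sketch using a local copy of the enum; the `String` method and the chosen labels are assumptions for illustration, since the documentation above does not list such a method.

```go
package main

import "fmt"

// blockPresenceType is a local copy of the documented enum values.
type blockPresenceType int

const (
	have     blockPresenceType = 0 // the peer has the block
	dontHave blockPresenceType = 1 // the peer does not have the block
)

// String returns a readable label. The labels are illustrative only.
func (t blockPresenceType) String() string {
	switch t {
	case have:
		return "Have"
	case dontHave:
		return "DontHave"
	default:
		return fmt.Sprintf("blockPresenceType(%d)", int(t))
	}
}

func main() {
	fmt.Println(have, dontHave, blockPresenceType(7))
}
```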

type BroadcastCancelStatus

type BroadcastCancelStatus struct {
	BroadcastStatus
}

BroadcastCancelStatus describes the status of a send operation to a single peer as part of a Bitswap CANCEL broadcast.

type BroadcastSendStatus

type BroadcastSendStatus struct {
	TimestampBeforeSend time.Time `json:"timestamp_before_send"`
	SendDurationMillis  int64     `json:"send_duration_millis"`
	Error               error     `json:"error,omitempty"`
}

BroadcastSendStatus contains basic information about a send operation to a single peer as part of a Bitswap broadcast.

type BroadcastStatus

type BroadcastStatus struct {
	BroadcastSendStatus
	Peer peer.ID `json:"peer"`
	// Underlay addresses of the peer we were connected to when the message
	// was sent, or empty if there was an error.
	ConnectedAddresses []ma.Multiaddr `json:"connected_addresses,omitempty"`
}

BroadcastStatus extends BroadcastSendStatus with the peer ID and the connected addresses for a send operation to a single peer as part of a Bitswap broadcast.

type BroadcastWantCancelStatus

type BroadcastWantCancelStatus struct {
	Peer               peer.ID        `json:"peer"`
	ConnectedAddresses []ma.Multiaddr `json:"connected_addresses,omitempty"`

	WantStatus   BroadcastWantCancelWantStatus `json:"want_status"`
	CancelStatus BroadcastSendStatus           `json:"cancel_status"`
}

BroadcastWantCancelStatus describes the status of a send operation to a single peer as part of a Bitswap WANT+CANCEL broadcast.

type BroadcastWantCancelWantStatus

type BroadcastWantCancelWantStatus struct {
	BroadcastSendStatus
	RequestTypeSent *pbmsg.Message_Wantlist_WantType `json:"request_type_sent,omitempty"`
}

BroadcastWantCancelWantStatus contains information about the send-WANT operation to a single peer as part of a Bitswap WANT+CANCEL broadcast.

type BroadcastWantStatus

type BroadcastWantStatus struct {
	BroadcastStatus
	RequestTypeSent *pbmsg.Message_Wantlist_WantType `json:"request_type_sent,omitempty"`
}

BroadcastWantStatus describes the status of a send operation to a single peer as part of a Bitswap WANT broadcast.

type Config

type Config struct {
	// The interval at which to populate prometheus with statistics about
	// currently connected peers, streams, connections, etc.
	PopulatePrometheusInterval int `json:"PopulatePrometheusInterval"`

	// The number of top N agent versions that are reported to prometheus.
	// AVs are somewhat arbitrarily chosen strings and clutter prometheus.
	AgentVersionCutOff int `json:"AgentVersionCutOff"`

	// Whether to enable the Bitswap discovery probe.
	// This does not work on large monitoring nodes for recent versions of the
	// network, because establishing and holding many Bitswap streams seems to
	// be very resource-hungry.
	EnableBitswapDiscoveryProbe bool `json:"EnableBitswapDiscoveryProbe"`

	// The address of the AMQP server to send real-time data to.
	// If this is empty, the real-time tracer will not be set up.
	AMQPServerAddress string `json:"AMQPServerAddress"`

	// The name to tag real-time data with.
	// If unset, the hostname is used.
	MonitorName *string `json:"MonitorName,omitempty"`

	// Configuration of the HTTP server.
	HTTPServerConfig HTTPServerConfig `json:"HTTPServerConfig"`
}

Config contains all values configured via the standard IPFS config section on plugins.
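To make the JSON field names concrete, the sketch below parses a sample configuration into local mirrors of the documented structs. All values in the sample are made up: the unit of PopulatePrometheusInterval is not stated above, the AMQP URL format is an assumption, and the exact location of this object inside the IPFS config file is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented HTTPServerConfig and Config structs.
type httpServerConfig struct {
	ListenAddresses []string `json:"ListenAddresses"`
}

type config struct {
	PopulatePrometheusInterval  int              `json:"PopulatePrometheusInterval"`
	AgentVersionCutOff          int              `json:"AgentVersionCutOff"`
	EnableBitswapDiscoveryProbe bool             `json:"EnableBitswapDiscoveryProbe"`
	AMQPServerAddress           string           `json:"AMQPServerAddress"`
	MonitorName                 *string          `json:"MonitorName,omitempty"`
	HTTPServerConfig            httpServerConfig `json:"HTTPServerConfig"`
}

// sampleConfig holds illustrative values only. MonitorName is omitted,
// in which case the plugin is documented to fall back to the hostname.
const sampleConfig = `{
  "PopulatePrometheusInterval": 10,
  "AgentVersionCutOff": 20,
  "EnableBitswapDiscoveryProbe": false,
  "AMQPServerAddress": "amqp://localhost:5672/%2f",
  "HTTPServerConfig": {
    "ListenAddresses": ["localhost:8181"]
  }
}`

// parseConfig decodes a raw JSON document into the mirrored struct.
func parseConfig(raw []byte) (config, error) {
	var c config
	err := json.Unmarshal(raw, &c)
	return c, err
}

func main() {
	c, err := parseConfig([]byte(sampleConfig))
	if err != nil {
		panic(err)
	}
	fmt.Println(c.HTTPServerConfig.ListenAddresses, c.MonitorName == nil)
}
```

Because MonitorName is a pointer, an absent key can be told apart from an explicitly empty string.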

type ConnectionEvent

type ConnectionEvent struct {
	// The multiaddress of the remote peer.
	Remote ma.Multiaddr `json:"remote"`

	// The type of this event.
	ConnectionEventType ConnectionEventType `json:"connection_event_type"`
}

A ConnectionEvent is the type pushed to remote clients for recorded connection events.

type ConnectionEventType

type ConnectionEventType int

ConnectionEventType specifies the type of connection event.

const (
	// Connected specifies that a connection was opened.
	Connected ConnectionEventType = 0
	// Disconnected specifies that a connection was closed.
	Disconnected ConnectionEventType = 1
)

type Event

type Event struct {
	// The timestamp at which the event was recorded.
	// This defines an ordering for events.
	Timestamp time.Time `json:"timestamp"`

	// Peer is a base58-encoded string representation of the peer ID.
	Peer string `json:"peer"`

	// BitswapMessage is not nil if this event is a bitswap message.
	BitswapMessage *BitswapMessage `json:"bitswap_message,omitempty"`

	// ConnectionEvent is not nil if this event is a connection event.
	ConnectionEvent *ConnectionEvent `json:"connection_event,omitempty"`
}

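Event is the recording of either an incoming Bitswap message or a connection event. Exactly which one is indicated by which of the BitswapMessage and ConnectionEvent fields is not nil.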
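A consumer can tell the two event kinds apart by checking which optional field is present. The sketch below does this on a JSON-encoded event, keeping the nested payloads as raw JSON so no libp2p types are needed. That events reach consumers as plain JSON objects with these tags is an assumption based on the struct tags above; the sample payload and the `classify` helper are made up.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// event mirrors the documented Event, with the nested payloads left raw.
type event struct {
	Timestamp       time.Time       `json:"timestamp"`
	Peer            string          `json:"peer"`
	BitswapMessage  json.RawMessage `json:"bitswap_message,omitempty"`
	ConnectionEvent json.RawMessage `json:"connection_event,omitempty"`
}

// present reports whether a raw field was set to something other than null.
func present(m json.RawMessage) bool {
	return len(m) > 0 && string(m) != "null"
}

// classify is a hypothetical helper that names the kind of an event.
func classify(raw []byte) (string, error) {
	var e event
	if err := json.Unmarshal(raw, &e); err != nil {
		return "", err
	}
	switch {
	case present(e.BitswapMessage):
		return "bitswap_message", nil
	case present(e.ConnectionEvent):
		return "connection_event", nil
	default:
		return "", errors.New("event carries neither payload")
	}
}

func main() {
	// Illustrative payload; peer ID and address are placeholders.
	sample := `{"timestamp":"2024-09-16T12:00:00Z","peer":"QmExamplePeer",
		"connection_event":{"remote":"/ip4/127.0.0.1/tcp/4001","connection_event_type":0}}`
	kind, err := classify([]byte(sample))
	fmt.Println(kind, err)
}
```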

type HTTPServerConfig

type HTTPServerConfig struct {
	// ListenAddresses specifies the addresses to listen on.
	// It should take a form usable by `net.ResolveTCPAddr`, for example
	// `localhost:8181` or `0.0.0.0:1444`.
	ListenAddresses []string `json:"ListenAddresses"`
}

HTTPServerConfig is the config for the metric export HTTP server.

type JSONResponse

type JSONResponse struct {
	Status int         `json:"status"`
	Result interface{} `json:"result,omitempty"`
	Err    *string     `json:"error,omitempty"`
}

A JSONResponse is the format for every response returned by the HTTP server.
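On the client side, a response in this format could be handled roughly as follows. The sketch keeps `result` as raw JSON until the caller knows which type to expect. The sample bodies are invented, and that `status` mirrors the HTTP status code is an assumption; `decodeResult` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonResponse mirrors the documented JSONResponse, but keeps Result raw
// so it can be decoded into an endpoint-specific type afterwards.
type jsonResponse struct {
	Status int             `json:"status"`
	Result json.RawMessage `json:"result,omitempty"`
	Err    *string         `json:"error,omitempty"`
}

// decodeResult parses body and unmarshals the result into out.
// It returns an error if the response carries an error message.
func decodeResult(body []byte, out interface{}) error {
	var resp jsonResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return fmt.Errorf("malformed response: %w", err)
	}
	if resp.Err != nil {
		return fmt.Errorf("server error (status %d): %s", resp.Status, *resp.Err)
	}
	if len(resp.Result) == 0 {
		return nil // no result present, e.g. for a no-op call
	}
	return json.Unmarshal(resp.Result, out)
}

func main() {
	var n int
	err := decodeResult([]byte(`{"status":200,"result":42}`), &n)
	fmt.Println(n, err)

	err = decodeResult([]byte(`{"status":400,"error":"invalid request"}`), &n)
	fmt.Println(err)
}
```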

type MetricExporterPlugin

type MetricExporterPlugin struct {
	// contains filtered or unexported fields
}

MetricExporterPlugin holds the state of the metrics exporter plugin.

func (*MetricExporterPlugin) BroadcastBitswapCancel

func (mep *MetricExporterPlugin) BroadcastBitswapCancel(cids []cid.Cid) ([]BroadcastCancelStatus, error)

BroadcastBitswapCancel implements RPCAPI.

func (*MetricExporterPlugin) BroadcastBitswapWant

func (mep *MetricExporterPlugin) BroadcastBitswapWant(cids []cid.Cid) ([]BroadcastWantStatus, error)

BroadcastBitswapWant implements RPCAPI.

func (*MetricExporterPlugin) BroadcastBitswapWantCancel

func (mep *MetricExporterPlugin) BroadcastBitswapWantCancel(cids []cid.Cid, secondsBetween uint) ([]BroadcastWantCancelStatus, error)

BroadcastBitswapWantCancel implements RPCAPI.

func (*MetricExporterPlugin) Close

func (mep *MetricExporterPlugin) Close() error

Close implements io.Closer. This is called to cleanly shut down the plugin when the daemon exits. This is idempotent.

func (*MetricExporterPlugin) Init

func (mep *MetricExporterPlugin) Init(env *plugin.Environment) error

Init initializes this plugin with the given environment. This is part of the `plugin.Plugin` interface.

func (*MetricExporterPlugin) Name

func (*MetricExporterPlugin) Name() string

Name returns the name of this plugin. This is part of the `plugin.Plugin` interface.

func (*MetricExporterPlugin) Options

func (mep *MetricExporterPlugin) Options(info core.FXNodeInfo) ([]fx.Option, error)

Options implements FxPlugin. This is run _after_ Init and _before_ Start, which means we have a validated config at this point.

func (*MetricExporterPlugin) Ping

func (*MetricExporterPlugin) Ping()

Ping implements RPCAPI.

func (*MetricExporterPlugin) SamplePeerMetadata

func (mep *MetricExporterPlugin) SamplePeerMetadata(onlyConnected bool) ([]PeerMetadata, int)

SamplePeerMetadata implements RPCAPI.

func (*MetricExporterPlugin) Start

func (mep *MetricExporterPlugin) Start(ipfsInstance *core.IpfsNode) error

Start starts this plugin. This is not run in a separate goroutine and should not block.

func (*MetricExporterPlugin) Version

func (*MetricExporterPlugin) Version() string

Version returns the version of this plugin. This is part of the `plugin.Plugin` interface.

type PeerMetadata

type PeerMetadata struct {

	// The ID of the peer.
	ID peer.ID `json:"peer_id"`
	// The connectedness, i.e., current connection status.
	Connectedness network.Connectedness `json:"connectedness"`
	// A list of known valid multiaddresses for this peer.
	// If the peer is not currently connected, this information might be
	// outdated.
	Multiaddrs []ma.Multiaddr `json:"multiaddresses"`
	// A list of known supported protocols for this peer.
	// If the peer is not currently connected, this information might be
	// outdated.
	Protocols []protocol.ID `json:"protocols"`

	// Agent version of the peer.
	// If we are no longer connected, this reports the last-seen agent version.
	// If the agent version is not (yet) known, this is "N/A".
	// If this is null, some other error occurred (which hopefully never
	// happens).
	AgentVersion *string `json:"agent_version"`
	// The EWMA of latencies to the peer.
	// If we are no longer connected, this reports the last-known average.
	// If this is null, we don't have latency information for the peer yet.
	LatencyEWMA *time.Duration `json:"latency_ewma_ns"`

	// A list of multiaddresses to which we currently hold a connection.
	ConnectedMultiaddrs []ma.Multiaddr `json:"connected_multiaddresses"`
}

PeerMetadata holds metadata about a peer. This applies to both currently connected and past peers. For peers that are not currently connected, the ConnectedMultiaddrs field will be nil. The AgentVersion and LatencyEWMA fields are optional: they are usually present for connected peers, and for peers that are no longer connected they hold the last known value.
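The optional fields need a nil check on the consuming side. The following sketch decodes a subset of the documented JSON fields and handles the null cases; it assumes the peer ID is encoded as a string and that `latency_ewma_ns` is an integer number of nanoseconds, as the tag name suggests. The `describe` helper and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// peerMetadata mirrors a subset of the documented PeerMetadata fields.
type peerMetadata struct {
	ID           string         `json:"peer_id"`
	AgentVersion *string        `json:"agent_version"`
	LatencyEWMA  *time.Duration `json:"latency_ewma_ns"`
}

// describe is a hypothetical helper that renders one entry as text,
// substituting placeholders for fields that are null.
func describe(raw []byte) (string, error) {
	var pm peerMetadata
	if err := json.Unmarshal(raw, &pm); err != nil {
		return "", err
	}
	agent := "unknown (null)"
	if pm.AgentVersion != nil {
		agent = *pm.AgentVersion
	}
	latency := "no latency data"
	if pm.LatencyEWMA != nil {
		latency = pm.LatencyEWMA.String()
	}
	return fmt.Sprintf("%s agent=%s latency=%s", pm.ID, agent, latency), nil
}

func main() {
	// Illustrative entries; the peer ID is a placeholder.
	for _, raw := range []string{
		`{"peer_id":"QmExamplePeer","agent_version":"N/A","latency_ewma_ns":25000000}`,
		`{"peer_id":"QmExamplePeer","agent_version":null,"latency_ewma_ns":null}`,
	} {
		s, err := describe([]byte(raw))
		fmt.Println(s, err)
	}
}
```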

type PluginAPI

type PluginAPI interface {
	RPCAPI
}

PluginAPI describes the functionality provided by this monitor to remote clients.

type RPCAPI

type RPCAPI interface {
	// Ping is a no-op.
	Ping()

	// BroadcastBitswapWant broadcasts WANT_(HAVE|BLOCK) requests for the given
	// CIDs to all connected peers that support Bitswap.
	// Which request type to send is chosen by the capabilities of the remote
	// peer.
	// This is sent as one message, which is either sent completely or fails.
	// An error is returned if Bitswap discovery is unavailable.
	BroadcastBitswapWant(cids []cid.Cid) ([]BroadcastWantStatus, error)

	// BroadcastBitswapCancel broadcasts CANCEL entries for the given CIDs to
	// all connected peers that support Bitswap.
	// This is sent as one message, which is either sent completely or fails.
	// An error is returned if Bitswap discovery is unavailable.
	BroadcastBitswapCancel(cids []cid.Cid) ([]BroadcastCancelStatus, error)

	// BroadcastBitswapWantCancel broadcasts WANT_(HAVE|BLOCK) requests for the
	// given CIDs, followed by CANCEL entries after a given time to all
	// connected peers that support Bitswap.
	// An error is returned if Bitswap discovery is unavailable.
	BroadcastBitswapWantCancel(cids []cid.Cid, secondsBetween uint) ([]BroadcastWantCancelStatus, error)

	// SamplePeerMetadata returns information about seen/connected Peers.
	// It returns information about all peers of the peerstore (including past
	// peers not currently connected) as well as the number of currently
	// established connections.
	// The onlyConnected parameter specifies whether data should be filtered
	// for currently connected peers. This can be useful, as the size of the
	// peerstore, and thus the metadata returned by this function, grows without
	// bound.
	SamplePeerMetadata(onlyConnected bool) ([]PeerMetadata, int)
}

The RPCAPI is the interface for RPC-like method calls. These are served via HTTP, reliably.

type RabbitMQPublisher

type RabbitMQPublisher struct {
	// contains filtered or unexported fields
}

RabbitMQPublisher implements publishing events to RabbitMQ.

func (*RabbitMQPublisher) Close

func (p *RabbitMQPublisher) Close()

Close closes the publisher. Attempting to send messages after Close was called will result in a panic.

func (*RabbitMQPublisher) PublishBitswapMessage

func (p *RabbitMQPublisher) PublishBitswapMessage(ts time.Time, peer peer.ID, msg *BitswapMessage)

PublishBitswapMessage creates and attempts to publish a Bitswap message from the given values. The message may be dropped if the connection to RabbitMQ is too slow. This will never block.

func (*RabbitMQPublisher) PublishConnectionEvent

func (p *RabbitMQPublisher) PublishConnectionEvent(ts time.Time, connected bool, conn network.Conn)

PublishConnectionEvent creates and attempts to publish a connection event from the given values. The event may be dropped if the connection to RabbitMQ is too slow. This will never block.
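A "never blocks, may drop" guarantee like the one described for the publish methods is commonly built in Go from a buffered channel and a `select` with a `default` branch. The sketch below shows that general technique only; the `publisher` type, its buffer size, and the drop counter are hypothetical and not taken from RabbitMQPublisher.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// publisher is a hypothetical stand-in that queues messages for a
// background sender and drops them when the queue is full.
type publisher struct {
	queue   chan string
	dropped int64 // number of dropped messages, updated atomically
}

func newPublisher(buffer int) *publisher {
	return &publisher{queue: make(chan string, buffer)}
}

// publish enqueues msg without blocking. It reports whether the message
// was accepted; a full queue causes the message to be dropped.
func (p *publisher) publish(msg string) bool {
	select {
	case p.queue <- msg:
		return true
	default:
		atomic.AddInt64(&p.dropped, 1)
		return false
	}
}

// Dropped returns how many messages have been dropped so far.
func (p *publisher) Dropped() int64 {
	return atomic.LoadInt64(&p.dropped)
}

func main() {
	p := newPublisher(1)
	fmt.Println(p.publish("first"))  // accepted: the buffer has room
	fmt.Println(p.publish("second")) // dropped: nothing is draining the queue
	fmt.Println(p.Dropped())
}
```

The trade-off is that slow consumers cause data loss instead of back-pressure, which suits a tracer that must not stall message handling on the node.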
