Documentation ¶
Overview ¶
Package metricplugin implements a plugin for kubo (go-ipfs) to export additional metrics and Bitswap traffic from a running node.
Index ¶
- Constants
- Variables
- type BitswapDiscoveryProbe
- func (wt *BitswapDiscoveryProbe) Connected(_ network.Network, conn network.Conn)
- func (wt *BitswapDiscoveryProbe) Disconnected(_ network.Network, conn network.Conn)
- func (*BitswapDiscoveryProbe) Listen(network.Network, ma.Multiaddr)
- func (*BitswapDiscoveryProbe) ListenClose(network.Network, ma.Multiaddr)
- func (wt *BitswapDiscoveryProbe) Shutdown()
- type BitswapMessage
- type BitswapTracer
- func (wt *BitswapTracer) Connected(_ network.Network, conn network.Conn)
- func (wt *BitswapTracer) Disconnected(_ network.Network, conn network.Conn)
- func (*BitswapTracer) Listen(network.Network, ma.Multiaddr)
- func (*BitswapTracer) ListenClose(network.Network, ma.Multiaddr)
- func (wt *BitswapTracer) MessageReceived(peerID peer.ID, msg bsmsg.BitSwapMessage)
- func (*BitswapTracer) MessageSent(peer.ID, bsmsg.BitSwapMessage)
- func (wt *BitswapTracer) Shutdown()
- type BlockPresence
- type BlockPresenceType
- type BroadcastCancelStatus
- type BroadcastSendStatus
- type BroadcastStatus
- type BroadcastWantCancelStatus
- type BroadcastWantCancelWantStatus
- type BroadcastWantStatus
- type Config
- type ConnectionEvent
- type ConnectionEventType
- type Event
- type HTTPServerConfig
- type JSONResponse
- type MetricExporterPlugin
- func (mep *MetricExporterPlugin) BroadcastBitswapCancel(cids []cid.Cid) ([]BroadcastCancelStatus, error)
- func (mep *MetricExporterPlugin) BroadcastBitswapWant(cids []cid.Cid) ([]BroadcastWantStatus, error)
- func (mep *MetricExporterPlugin) BroadcastBitswapWantCancel(cids []cid.Cid, secondsBetween uint) ([]BroadcastWantCancelStatus, error)
- func (mep *MetricExporterPlugin) Close() error
- func (mep *MetricExporterPlugin) Init(env *plugin.Environment) error
- func (*MetricExporterPlugin) Name() string
- func (mep *MetricExporterPlugin) Options(info core.FXNodeInfo) ([]fx.Option, error)
- func (*MetricExporterPlugin) Ping()
- func (mep *MetricExporterPlugin) SamplePeerMetadata(onlyConnected bool) ([]PeerMetadata, int)
- func (mep *MetricExporterPlugin) Start(ipfsInstance *core.IpfsNode) error
- func (*MetricExporterPlugin) Version() string
- type PeerMetadata
- type PluginAPI
- type RPCAPI
- type RabbitMQPublisher
Constants ¶
const ( APIBasePath = "/metric_plugin/v1" APIPingPath = APIBasePath + "/ping" APIBroadcastWantPath = APIBasePath + "/broadcast_want" APIBroadcastCancelPath = APIBasePath + "/broadcast_cancel" APIBroadcastWantCancelPath = APIBasePath + "/broadcast_want_cancel" APISamplePeerMetadataPath = APIBasePath + "/sample_peer_metadata" )
API path constants.
const (
// APIOnlyConnectedPeersParameter is the parameter for the SamplePeerMetadata function.
APIOnlyConnectedPeersParameter = "only_connected"
)
API parameter constants.
const ExchangeName = "ipfs.passive_monitoring"
ExchangeName is the name of the exchange on RabbitMQ.
Variables ¶
var ( // ErrBitswapProbeDisabled is returned for Bitswap probing API calls if // the Bitswap probe is disabled via the plugin's configuration. ErrBitswapProbeDisabled = errors.New("Bitswap probe not enabled") )
var ErrInternalServerError = newPresentableError(http.StatusInternalServerError, "internal server error")
ErrInternalServerError is returned when there was either a panic processing a request or a non-presentable error was generated during execution.
var ErrInvalidRequest = newPresentableError(http.StatusBadRequest, "invalid request")
ErrInvalidRequest is returned for invalid requests, such as malformed JSON.
var ErrServerClosing = newPresentableError(http.StatusServiceUnavailable, "server closing")
ErrServerClosing is returned when a request is made while the node has started to shut down.
Functions ¶
This section is empty.
Types ¶
type BitswapDiscoveryProbe ¶
type BitswapDiscoveryProbe struct {
// contains filtered or unexported fields
}
BitswapDiscoveryProbe exposes functionality to perform Bitswap-only discovery for a set of CIDs. It implements `network.Notifiee` to receive notifications about peers connecting, disconnecting, etc. It keeps track of peers the node is connected to and attempts to hold an active Bitswap sender to each of them.
Using the `network.Notifiee` events is somewhat wonky: We occasionally receive duplicate disconnection events or miss connection events. However, it looks like we converge with the true IPFS connectivity after a while.
func NewProbe ¶
func NewProbe(nodeNetwork network.Network, bsnetImpl bsnet.BitSwapNetwork) *BitswapDiscoveryProbe
NewProbe creates a new Bitswap Probe and network Notifiee. This will spawn goroutines to log stats and connect to peers via Bitswap. Use the Shutdown method to shut down cleanly.
func (*BitswapDiscoveryProbe) Connected ¶
func (wt *BitswapDiscoveryProbe) Connected(_ network.Network, conn network.Conn)
Connected is called when a connection is opened.
func (*BitswapDiscoveryProbe) Disconnected ¶
func (wt *BitswapDiscoveryProbe) Disconnected(_ network.Network, conn network.Conn)
Disconnected is called when a connection is closed.
func (*BitswapDiscoveryProbe) Listen ¶
func (*BitswapDiscoveryProbe) Listen(network.Network, ma.Multiaddr)
Listen is called when the network implementation starts listening on the given address. We do not use this at the moment.
func (*BitswapDiscoveryProbe) ListenClose ¶
func (*BitswapDiscoveryProbe) ListenClose(network.Network, ma.Multiaddr)
ListenClose is called when the network implementation stops listening on the given address. We do not use this at the moment.
func (*BitswapDiscoveryProbe) Shutdown ¶
func (wt *BitswapDiscoveryProbe) Shutdown()
Shutdown shuts down the probe cleanly. This is idempotent, i.e. calling it multiple times does not cause problems. This will block until all goroutines have quit (this depends on some timeouts for connecting to peers and opening streams, probably).
type BitswapMessage ¶
type BitswapMessage struct { // Wantlist entries sent with this message. WantlistEntries []bsmsg.Entry `json:"wantlist_entries"` // Whether the wantlist entries are a full new wantlist. FullWantList bool `json:"full_wantlist"` // Blocks sent with this message. Blocks []cid.Cid `json:"blocks"` // Block presence indicators sent with this message. BlockPresences []BlockPresence `json:"block_presences"` // Underlay addresses of the peer we were connected to when the message // was received. ConnectedAddresses []ma.Multiaddr `json:"connected_addresses"` }
A BitswapMessage is the type pushed to remote clients for recorded incoming Bitswap messages.
type BitswapTracer ¶
type BitswapTracer struct {
// contains filtered or unexported fields
}
BitswapTracer implements the `bitswap.Tracer` interface to log Bitswap messages.
func NewTracer ¶
func NewTracer(nodeNetwork network.Network, monitorName string, amqpServerAddress string) (*BitswapTracer, error)
NewTracer creates a new Bitswap Tracer. Use the Shutdown method to shut down cleanly.
func (*BitswapTracer) Connected ¶
func (wt *BitswapTracer) Connected(_ network.Network, conn network.Conn)
Connected implements network.Notifiee.
func (*BitswapTracer) Disconnected ¶
func (wt *BitswapTracer) Disconnected(_ network.Network, conn network.Conn)
Disconnected implements network.Notifiee.
func (*BitswapTracer) Listen ¶
func (*BitswapTracer) Listen(network.Network, ma.Multiaddr)
Listen implements network.Notifiee. We do not use this at the moment.
func (*BitswapTracer) ListenClose ¶
func (*BitswapTracer) ListenClose(network.Network, ma.Multiaddr)
ListenClose implements network.Notifiee. We do not use this at the moment.
func (*BitswapTracer) MessageReceived ¶
func (wt *BitswapTracer) MessageReceived(peerID peer.ID, msg bsmsg.BitSwapMessage)
MessageReceived is called on incoming Bitswap messages.
func (*BitswapTracer) MessageSent ¶
func (*BitswapTracer) MessageSent(peer.ID, bsmsg.BitSwapMessage)
MessageSent is called on outgoing Bitswap messages. We do not use this at the moment.
func (*BitswapTracer) Shutdown ¶
func (wt *BitswapTracer) Shutdown()
Shutdown shuts down the tracer cleanly. This is idempotent, i.e. calling it multiple times does not cause problems.
type BlockPresence ¶
type BlockPresence struct { // Cid is the referenced CID. Cid cid.Cid `json:"cid"` // Type indicates the block presence type. Type BlockPresenceType `json:"block_presence_type"` }
A BlockPresence indicates the presence or absence of a block.
type BlockPresenceType ¶
type BlockPresenceType int
BlockPresenceType is an enum for presence or absence notifications.
const ( // Have indicates that the peer has the block. Have BlockPresenceType = 0 // DontHave indicates that the peer does not have the block. DontHave BlockPresenceType = 1 )
Block presence constants.
type BroadcastCancelStatus ¶
type BroadcastCancelStatus struct {
BroadcastStatus
}
BroadcastCancelStatus describes the status of a send operation to a single peer as part of a Bitswap CANCEL broadcast.
type BroadcastSendStatus ¶
type BroadcastSendStatus struct { TimestampBeforeSend time.Time `json:"timestamp_before_send"` SendDurationMillis int64 `json:"send_duration_millis"` Error error `json:"error,omitempty"` }
BroadcastSendStatus contains basic information about a send operation to a single peer as part of a Bitswap broadcast.
type BroadcastStatus ¶
type BroadcastStatus struct { BroadcastSendStatus Peer peer.ID `json:"peer"` // Underlay addresses of the peer we were connected to when the message // was sent, or empty if there was an error. ConnectedAddresses []ma.Multiaddr `json:"connected_addresses,omitempty"` }
BroadcastStatus contains additional basic information about a send operation to a single peer as part of a Bitswap broadcast.
type BroadcastWantCancelStatus ¶
type BroadcastWantCancelStatus struct { Peer peer.ID `json:"peer"` ConnectedAddresses []ma.Multiaddr `json:"connected_addresses,omitempty"` WantStatus BroadcastWantCancelWantStatus `json:"want_status"` CancelStatus BroadcastSendStatus `json:"cancel_status"` }
BroadcastWantCancelStatus describes the status of a send operation to a single peer as part of a Bitswap WANT+CANCEL broadcast.
type BroadcastWantCancelWantStatus ¶
type BroadcastWantCancelWantStatus struct { BroadcastSendStatus RequestTypeSent *pbmsg.Message_Wantlist_WantType `json:"request_type_sent,omitempty"` }
BroadcastWantCancelWantStatus contains information about the send-WANT operation to a single peer as part of a Bitswap WANT+CANCEL broadcast.
type BroadcastWantStatus ¶
type BroadcastWantStatus struct { BroadcastStatus RequestTypeSent *pbmsg.Message_Wantlist_WantType `json:"request_type_sent,omitempty"` }
BroadcastWantStatus describes the status of a send operation to a single peer as part of a Bitswap WANT broadcast.
type Config ¶
type Config struct { // The interval at which to populate prometheus with statistics about // currently connected peers, streams, connections, etc. PopulatePrometheusInterval int `json:"PopulatePrometheusInterval"` // The number of top N agent versions that are reported to prometheus. // AVs are somewhat arbitrarily chosen strings and clutter prometheus. AgentVersionCutOff int `json:"AgentVersionCutOff"` // Whether to enable the Bitswap discovery probe. // This does not work on large monitoring nodes for recent versions of the // network, because establishing and holding many Bitswap streams seems to // be very resource-hungry. EnableBitswapDiscoveryProbe bool `json:"EnableBitswapDiscoveryProbe"` // The address of the AMQP server to send real-time data to. // If this is empty, the real-time tracer will not be set up. AMQPServerAddress string `json:"AMQPServerAddress"` // The name to tag real-time data with. // If unset, the hostname is used. MonitorName *string `json:"MonitorName,omitempty"` // Configuration of the HTTP server. HTTPServerConfig HTTPServerConfig `json:"HTTPServerConfig"` }
Config contains all values configured via the standard IPFS config section on plugins.
type ConnectionEvent ¶
type ConnectionEvent struct { // The multiaddress of the remote peer. Remote ma.Multiaddr `json:"remote"` // The type of this event. ConnectionEventType ConnectionEventType `json:"connection_event_type"` }
A ConnectionEvent is the type pushed to remote clients for recorded connection events.
type ConnectionEventType ¶
type ConnectionEventType int
ConnectionEventType specifies the type of connection event.
const ( // Connected specifies that a connection was opened. Connected ConnectionEventType = 0 // Disconnected specifies that a connection was closed. Disconnected ConnectionEventType = 1 )
type Event ¶
type Event struct { // The timestamp at which the event was recorded. // This defines an ordering for events. Timestamp time.Time `json:"timestamp"` // Peer is a base58-encoded string representation of the peer ID. Peer string `json:"peer"` // BitswapMessage is not nil if this event is a bitswap message. BitswapMessage *BitswapMessage `json:"bitswap_message,omitempty"` // ConnectionEvent is not nil if this event is a connection event. ConnectionEvent *ConnectionEvent `json:"connection_event,omitempty"` }
Event is the recording of an incoming message.
type HTTPServerConfig ¶
type HTTPServerConfig struct { // ListenAddresses specifies the addresses to listen on. // It should take a form usable by `net.ResolveTCPAddr`, for example // `localhost:8181` or `0.0.0.0:1444`. ListenAddresses []string `json:"ListenAddresses"` }
HTTPServerConfig is the config for the metric export HTTP server.
type JSONResponse ¶
type JSONResponse struct { Status int `json:"status"` Result interface{} `json:"result,omitempty"` Err *string `json:"error,omitempty"` }
A JSONResponse is the format for every response returned by the HTTP server.
type MetricExporterPlugin ¶
type MetricExporterPlugin struct {
// contains filtered or unexported fields
}
MetricExporterPlugin holds the state of the metrics exporter plugin.
func (*MetricExporterPlugin) BroadcastBitswapCancel ¶
func (mep *MetricExporterPlugin) BroadcastBitswapCancel(cids []cid.Cid) ([]BroadcastCancelStatus, error)
BroadcastBitswapCancel implements RPCAPI.
func (*MetricExporterPlugin) BroadcastBitswapWant ¶
func (mep *MetricExporterPlugin) BroadcastBitswapWant(cids []cid.Cid) ([]BroadcastWantStatus, error)
BroadcastBitswapWant implements RPCAPI.
func (*MetricExporterPlugin) BroadcastBitswapWantCancel ¶
func (mep *MetricExporterPlugin) BroadcastBitswapWantCancel(cids []cid.Cid, secondsBetween uint) ([]BroadcastWantCancelStatus, error)
BroadcastBitswapWantCancel implements RPCAPI.
func (*MetricExporterPlugin) Close ¶
func (mep *MetricExporterPlugin) Close() error
Close implements io.Closer. This is called to cleanly shut down the plugin when the daemon exits. This is idempotent.
func (*MetricExporterPlugin) Init ¶
func (mep *MetricExporterPlugin) Init(env *plugin.Environment) error
Init initializes this plugin with the given environment. This is part of the `plugin.Plugin` interface.
func (*MetricExporterPlugin) Name ¶
func (*MetricExporterPlugin) Name() string
Name returns the name of this plugin. This is part of the `plugin.Plugin` interface.
func (*MetricExporterPlugin) Options ¶
func (mep *MetricExporterPlugin) Options(info core.FXNodeInfo) ([]fx.Option, error)
Options implements FxPlugin. This is run _after_ Init and _before_ start, which means we have a validated config at this point.
func (*MetricExporterPlugin) SamplePeerMetadata ¶
func (mep *MetricExporterPlugin) SamplePeerMetadata(onlyConnected bool) ([]PeerMetadata, int)
SamplePeerMetadata implements RPCAPI.
func (*MetricExporterPlugin) Start ¶
func (mep *MetricExporterPlugin) Start(ipfsInstance *core.IpfsNode) error
Start starts this plugin. This is not run in a separate goroutine and should not block.
func (*MetricExporterPlugin) Version ¶
func (*MetricExporterPlugin) Version() string
Version returns the version of this plugin. This is part of the `plugin.Plugin` interface.
type PeerMetadata ¶
type PeerMetadata struct { // The ID of the peer. ID peer.ID `json:"peer_id"` // The connectedness, i.e., current connection status. Connectedness network.Connectedness `json:"connectedness"` // A list of known valid multiaddresses for this peer. // If the peer is not currently connected, this information might be // outdated. Multiaddrs []ma.Multiaddr `json:"multiaddresses"` // A list of known supported protocols for this peer. // If the peer is not currently connected, this information might be // outdated. Protocols []protocol.ID `json:"protocols"` // Agent version of the peer. // If we are no longer connected, this reports the last-seen agent version. // If the agent version is not (yet) known, this is "N/A". // If this is null, some other error occurred (which hopefully never // happens). AgentVersion *string `json:"agent_version"` // The EWMA of latencies to the peer. // If we are no longer connected, this reports the last-known average. // If this is null, we don't have latency information for the peer yet. LatencyEWMA *time.Duration `json:"latency_ewma_ns"` // A list of multiaddresses to which we currently hold a connection. ConnectedMultiaddrs []ma.Multiaddr `json:"connected_multiaddresses"` }
PeerMetadata holds metadata about a peer. This applies to both currently-connected as well as past peers. For peers that are not currently connected, the ConnectedMultiaddrs field will be nil. The AgentVersion and LatencyEWMA are optional. They are usually present for connected peers. For no-longer-connected peers, these hold the last known value.
type PluginAPI ¶
type PluginAPI interface { RPCAPI }
PluginAPI describes the functionality provided by this monitor to remote clients.
type RPCAPI ¶
type RPCAPI interface { // Ping is a no-op. Ping() // BroadcastBitswapWant broadcasts WANT_(HAVE|BLOCK) requests for the given // CIDs to all connected peers that support Bitswap. // Which request type to send is chosen by the capabilities of the remote // peer. // This is sent as one message, which is either sent completely or fails. // An error is returned if Bitswap discovery is unavailable. BroadcastBitswapWant(cids []cid.Cid) ([]BroadcastWantStatus, error) // BroadcastBitswapCancel broadcasts CANCEL entries for the given CIDs to // all connected peers that support Bitswap. // This is sent as one message, which is either sent completely or fails. // An error is returned if Bitswap discovery is unavailable. BroadcastBitswapCancel(cids []cid.Cid) ([]BroadcastCancelStatus, error) // BroadcastBitswapWantCancel broadcasts WANT_(HAVE|BLOCK) requests for the // given CIDs, followed by CANCEL entries after a given time to all // connected peers that support Bitswap. // An error is returned if Bitswap discovery is unavailable. BroadcastBitswapWantCancel(cids []cid.Cid, secondsBetween uint) ([]BroadcastWantCancelStatus, error) // SamplePeerMetadata returns information about seen/connected Peers. // It returns information about all peers of the peerstore (including past // peers not currently connected) as well as the number of currently // established connections. // The onlyConnected parameter specifies whether data should be filtered // for currently connected peers. This can be useful, as the size of the // peerstore, and thus the metadata returned by this function, grows without // bound. SamplePeerMetadata(onlyConnected bool) ([]PeerMetadata, int) }
The RPCAPI is the interface for RPC-like method calls. These are served via HTTP, reliably.
type RabbitMQPublisher ¶
type RabbitMQPublisher struct {
// contains filtered or unexported fields
}
RabbitMQPublisher implements publishing events to RabbitMQ.
func (*RabbitMQPublisher) Close ¶
func (p *RabbitMQPublisher) Close()
Close closes the publisher. Attempting to send messages after Close was called will result in a panic.
func (*RabbitMQPublisher) PublishBitswapMessage ¶
func (p *RabbitMQPublisher) PublishBitswapMessage(ts time.Time, peer peer.ID, msg *BitswapMessage)
PublishBitswapMessage creates and attempts to publish a Bitswap message from the given values. The message may be dropped if the connection to RabbitMQ is too slow. This will never block.
func (*RabbitMQPublisher) PublishConnectionEvent ¶
PublishConnectionEvent creates and attempts to publish a connection event from the given values. The event may be dropped if the connection to RabbitMQ is too slow. This will never block.