Documentation ¶
Overview ¶
pubsub 包提供了消息传播的发布/订阅模式,也称为覆盖多播。 实现提供基于主题的 pubsub,支持可插拔的路由算法。
该库的主要接口是 PubSub 对象。 可以使用以下构造函数创建此对象:
- NewFloodSub 创建一个使用 floodsub 路由算法的实例。
- NewGossipSub 创建一个使用 gossipsub 路由算法的实例。
- NewRandomSub 创建一个使用 randomsub 路由算法的实例。
此外,还有一个通用构造函数,用于创建具有自定义 PubSubRouter 接口的 pubsub 实例。 目前,此过程保留供包内部使用。
一旦构造了 PubSub 实例,需要与对等节点建立一些连接; 实现依赖于环境对等发现,将引导和活动对等发现留给客户端。
要向某个主题发布消息,请使用 Publish;您不需要订阅主题即可发布。
要订阅主题,请使用 Subscribe;这将为您提供一个订阅接口,通过该接口可以推送新消息。
Package pubsub 提供了发布订阅系统的实现 ¶
package pubsub 定义了分布式存储网络的核心功能
package pubsub 提供了发布订阅功能的实现
Index ¶
- Constants
- Variables
- func Debug(args ...interface{})
- func Debugf(format string, args ...interface{})
- func DefaultMsgIdFn(pmsg *pb.Message) string
- func DefaultPeerFilter(pid peer.ID, topic string) bool
- func Error(args ...interface{})
- func Errorf(format string, args ...interface{})
- func Fatal(args ...interface{})
- func Fatalf(format string, args ...interface{})
- func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts
- func GetLevel() logrus.Level
- func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool
- func Info(args ...interface{})
- func Infof(format string, args ...interface{})
- func LogMessage(level logrus.Level, args ...interface{})
- func Logf(level logrus.Level, format string, args ...interface{})
- func Panic(args ...interface{})
- func Panicf(format string, args ...interface{})
- func ParseLevel(level string) (logrus.Level, error)
- func Print(args ...interface{})
- func Printf(format string, args ...interface{})
- func Println(args ...interface{})
- func ScoreParameterDecay(decay time.Duration) float64
- func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64
- func SetLevel(level logrus.Level)
- func SetOutput(output *os.File)
- func SetReportCaller(reportCaller bool)
- func Trace(args ...interface{})
- func Tracef(format string, args ...interface{})
- func Warn(args ...interface{})
- func Warnf(format string, args ...interface{})
- func WithError(err error) *logrus.Entry
- func WithField(key string, value interface{}) *logrus.Entry
- func WithFields(fields logrus.Fields) *logrus.Entry
- type AcceptStatus
- type BackoffConnectorFactory
- type BasicSeqnoValidator
- type Blacklist
- type CacheEntry
- type DefaultLogger
- type DiscoverOpt
- type EventTracer
- type EventType
- type ExtendedPeerScoreInspectFn
- type FloodSubRouter
- func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus
- func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (fs *FloodSubRouter) Attach(p *PubSub)
- func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
- func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
- func (fs *FloodSubRouter) Join(topic string)
- func (fs *FloodSubRouter) Leave(topic string)
- func (fs *FloodSubRouter) Protocols() []protocol.ID
- func (fs *FloodSubRouter) Publish(msg *Message)
- func (fs *FloodSubRouter) RemovePeer(p peer.ID)
- type GossipSubFeature
- type GossipSubFeatureTest
- type GossipSubParams
- type GossipSubRouter
- func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus
- func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (gs *GossipSubRouter) Attach(p *PubSub)
- func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
- func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
- func (gs *GossipSubRouter) Join(topic string)
- func (gs *GossipSubRouter) Leave(topic string)
- func (gs *GossipSubRouter) Protocols() []protocol.ID
- func (gs *GossipSubRouter) Publish(msg *Message)
- func (gs *GossipSubRouter) RemovePeer(p peer.ID)
- func (gs *GossipSubRouter) WithDefaultTagTracer() Option
- type JSONTracer
- type MapBlacklist
- type Message
- type MessageCache
- func (mc *MessageCache) Get(mid string) (*Message, bool)
- func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool)
- func (mc *MessageCache) GetGossipIDs(topic string) []string
- func (mc *MessageCache) Put(msg *Message)
- func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string)
- func (mc *MessageCache) Shift()
- type MessageMetadataOpt
- type MessageSignaturePolicy
- type MsgIdFunction
- type Node
- type NodeOption
- func WithNodeDiscovery(d discovery.Discovery) NodeOption
- func WithSetD(d int) NodeOption
- func WithSetDirectPeers(peers []peer.AddrInfo) NodeOption
- func WithSetDlo(dlo int) NodeOption
- func WithSetFollowupTime(t time.Duration) NodeOption
- func WithSetGossipFactor(f float64) NodeOption
- func WithSetHeartbeatInterval(interval time.Duration) NodeOption
- func WithSetLoadConfig(load bool) NodeOption
- func WithSetMaxMessageSize(size int) NodeOption
- func WithSetMaxPendingConns(n int) NodeOption
- func WithSetMaxTransmissionSize(size int) NodeOption
- func WithSetPubSubMode(mode PubSubType) NodeOption
- func WithSetSignMessages(sign bool) NodeOption
- func WithSetValidateMessages(validate bool) NodeOption
- type NodePubSub
- func (pubsub *NodePubSub) BroadcastWithTopic(topic string, data []byte) error
- func (pubsub *NodePubSub) CancelPubsubWithTopic(name string) error
- func (pubsub *NodePubSub) CancelSubscribeWithTopic(topic string) error
- func (pubsub *NodePubSub) GetTopic(name string) (*Topic, error)
- func (pubsub *NodePubSub) IsSubscribed(topic string) bool
- func (pubsub *NodePubSub) ListPeers(topic string) []peer.ID
- func (pubsub *NodePubSub) NotifyNewPeer(peer peer.ID) error
- func (pubsub *NodePubSub) Publish(topic string, data []byte) error
- func (pubsub *NodePubSub) Pubsub() *PubSub
- func (pubsub *NodePubSub) Subscribe(topic string, subscribe bool) (*Subscription, error)
- func (pubsub *NodePubSub) SubscribeWithTopic(topic string, handler PubSubMsgHandler, subscribe bool) error
- type NodeStatus
- type NodeStatusTracker
- func (nst *NodeStatusTracker) Start()
- func (nst *NodeStatusTracker) Stop()
- func (nst *NodeStatusTracker) SubscribeStatusChanges() chan StatusChange
- func (nst *NodeStatusTracker) UnsubscribeStatusChanges(ch chan StatusChange)
- func (nst *NodeStatusTracker) UpdateNodeStatus(pid peer.ID, status NodeStatus)
- type Option
- func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option
- func WithBlacklist(b Blacklist) Option
- func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option
- func WithDirectConnectTicks(t uint64) Option
- func WithDirectPeers(pis []peer.AddrInfo) Option
- func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
- func WithEventTracer(tracer EventTracer) Option
- func WithFloodPublish(floodPublish bool) Option
- func WithGossipSubParams(cfg GossipSubParams) Option
- func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option
- func WithMaxMessageSize(maxMessageSize int) Option
- func WithMessageAuthor(author peer.ID) Option
- func WithMessageIdFn(fn MsgIdFunction) Option
- func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option
- func WithMessageSigning(enabled bool) Option
- func WithNoAuthor() Option
- func WithPeerExchange(doPX bool) Option
- func WithPeerFilter(filter PeerFilter) Option
- func WithPeerGater(params *PeerGaterParams) Option
- func WithPeerOutboundQueueSize(size int) Option
- func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option
- func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option
- func WithProtocolMatchFn(m ProtocolMatchFn) Option
- func WithRawTracer(tracer RawTracer) Option
- func WithRetry(retries int) Option
- func WithSeenMessagesStrategy(strategy timecache.Strategy) Option
- func WithSeenMessagesTTL(ttl time.Duration) Option
- func WithStrictSignatureVerification(required bool) Option
- func WithSubscriptionFilter(subFilter SubscriptionFilter) Option
- func WithTimeout(d time.Duration) Option
- func WithValidateQueueSize(n int) Option
- func WithValidateThrottle(n int) Option
- func WithValidateWorkers(n int) Option
- type Options
- func (opt *Options) ApplyOptions(opts ...NodeOption) error
- func (o *Options) GetD() int
- func (o *Options) GetDirectPeers() []peer.AddrInfo
- func (o *Options) GetDlo() int
- func (o *Options) GetFollowupTime() time.Duration
- func (o *Options) GetGossipFactor() float64
- func (o *Options) GetHeartbeatInterval() time.Duration
- func (o *Options) GetLoadConfig() bool
- func (o *Options) GetMaxMessageSize() int
- func (o *Options) GetMaxPendingConns() int
- func (o *Options) GetMaxTransmissionSize() int
- func (opt *Options) GetNodeDiscovery() discovery.Discovery
- func (o *Options) GetPubSubMode() PubSubType
- func (o *Options) GetSignMessages() bool
- func (o *Options) GetValidateMessages() bool
- type PBTracer
- type PeerEvent
- type PeerFilter
- type PeerGaterParams
- type PeerMetadataStore
- type PeerScoreInspectFn
- type PeerScoreParams
- type PeerScoreSnapshot
- type PeerScoreThresholds
- type ProtocolMatchFn
- type ProvideKey
- type PubOpt
- type PubSub
- func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
- func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)
- func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
- func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
- func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error)
- func (p *PubSub) BlacklistPeer(pid peer.ID)
- func (p *PubSub) GetTopics() []string
- func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)
- func (p *PubSub) ListPeers(topic string) []peer.ID
- func (ps *PubSub) NotifyNewPeer(peer peer.ID) error
- func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error
- func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
- func (p *PubSub) UnregisterTopicValidator(topic string) error
- type PubSubMsgHandler
- type PubSubRouter
- type PubSubType
- type PublishOptions
- type RPC
- type RandomSubRouter
- func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus
- func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
- func (rs *RandomSubRouter) Attach(p *PubSub)
- func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
- func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
- func (rs *RandomSubRouter) Join(topic string)
- func (rs *RandomSubRouter) Leave(topic string)
- func (rs *RandomSubRouter) Protocols() []protocol.ID
- func (rs *RandomSubRouter) Publish(msg *Message)
- func (rs *RandomSubRouter) RemovePeer(p peer.ID)
- type RawTracer
- type RelayCancelFunc
- type RemoteTracer
- type RouterReady
- type StatusChange
- type SubOpt
- type Subscription
- type SubscriptionFilter
- type TimeCachedBlacklist
- type Topic
- func (t *Topic) Close() error
- func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
- func (t *Topic) ListPeers() []peer.ID
- func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
- func (t *Topic) PublishWithReply(ctx context.Context, data []byte, targetNodes ...peer.ID) ([]byte, error)
- func (t *Topic) Relay() (RelayCancelFunc, error)
- func (t *Topic) SetScoreParams(p *TopicScoreParams) error
- func (t *Topic) String() string
- func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)
- type TopicEventHandler
- type TopicEventHandlerOpt
- type TopicOpt
- type TopicOptions
- type TopicScoreParams
- type TopicScoreSnapshot
- type ValidationError
- type ValidationResult
- type Validator
- type ValidatorEx
- type ValidatorOpt
Constants ¶
const ( MinBackoffDelay = 100 * time.Millisecond // 最小退避延迟时间 MaxBackoffDelay = 10 * time.Second // 最大退避延迟时间 TimeToLive = 10 * time.Minute // 退避信息的存活时间 BackoffCleanupInterval = 1 * time.Minute // 清理退避信息的间隔时间 BackoffMultiplier = 2 // 退避时间的倍增因子 MaxBackoffJitterCoff = 100 // 最大退避抖动系数 MaxBackoffAttempts = 4 // 最大退避尝试次数 )
定义了一些常量,用于控制退避算法的参数
const ( FloodSubID = protocol.ID("/floodsub/1.0.0") // FloodSub协议ID FloodSubTopicSearchSize = 5 // FloodSub主题搜索大小 )
常量定义
const ( // GossipSubID_v10 是 GossipSub 协议的版本 1.0.0 的协议 ID。 // 它与 GossipSubID_v11 一起发布以实现向后兼容。 GossipSubID_v10 = protocol.ID("/meshsub/1.0.0") // GossipSubID_v11 是 GossipSub 协议的版本 1.1.0 的协议 ID。 // 参见规范了解 v1.1.0 与 v1.0.0 的详细比较: // https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md GossipSubID_v11 = protocol.ID("/meshsub/1.1.0") )
const ( // 协议支持基本的 GossipSub 网格 -- 与 gossipsub-v1.0 兼容 GossipSubFeatureMesh = iota // 协议支持在修剪时的对等节点交换 -- 与 gossipsub-v1.1 兼容 GossipSubFeaturePX )
const ( DefaultDecayInterval = time.Second // 默认的衰减间隔 DefaultDecayToZero = 0.01 // 默认的衰减到零值 )
const ( // StrictSign 生成签名并期望验证传入的签名 StrictSign = msgSigning | msgVerification // StrictNoSign 不生成签名,并丢弃和惩罚携带签名的传入消息 StrictNoSign = msgVerification // LaxSign 生成签名,并仅在存在签名时验证传入的签名 // 已弃用:建议严格启用或严格禁用签名。 LaxSign = msgSigning // LaxNoSign 不生成签名,并仅在存在签名时验证传入的签名 // 已弃用:建议严格启用或严格禁用签名。 LaxNoSign = 0 )
常量表示严格的签名和验证策略
const ( RejectBlacklstedPeer = "blacklisted peer" // 被列入黑名单的对等节点 RejectBlacklistedSource = "blacklisted source" // 被列入黑名单的来源 RejectMissingSignature = "missing signature" // 缺少签名 RejectUnexpectedSignature = "unexpected signature" // 意外的签名 RejectUnexpectedAuthInfo = "unexpected auth info" // 意外的身份验证信息 RejectInvalidSignature = "invalid signature" // 无效的签名 RejectValidationQueueFull = "validation queue full" // 验证队列已满 RejectValidationThrottled = "validation throttled" // 验证被限制 RejectValidationFailed = "validation failed" // 验证失败 RejectValidationIgnored = "validation ignored" // 验证被忽略 RejectSelfOrigin = "self originated message" // 自己发起的消息 )
拒绝消息的原因常量
const ( // ValidationAccept 表示消息验证通过,应该被接受并交付给应用程序并转发到网络 ValidationAccept = ValidationResult(0) // ValidationReject 表示消息验证失败,不应该交付给应用程序或转发到网络,并且转发该消息的对等节点应该被惩罚 ValidationReject = ValidationResult(1) // ValidationIgnore 表示消息应该被忽略,不交付给应用程序或转发到网络,但与 ValidationReject 不同,转发该消息的对等节点不会被惩罚 ValidationIgnore = ValidationResult(2) )
const DefaultMaxMessageSize = 1 << 20
DefaultMaxMessageSize 定义默认的最大消息大小为1MB
const (
DefaultPubsubProtocol = "/dep2p/pubsub/1.0.0"
)
DefaultPubsubProtocol 定义了默认的pubsub协议版本
const ( // RandomSubID 是 RandomSub 路由器使用的协议 ID RandomSubID = protocol.ID("/randomsub/1.0.0") )
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
RemoteTracerProtoID 是远程追踪协议的 ID
const SignPrefix = "libp2p-pubsub:"
SignPrefix 是签名前缀常量
Variables ¶
var ( // DiscoveryPollInitialDelay 是发现系统在首次启动后等待的时间 DiscoveryPollInitialDelay = 0 * time.Millisecond // DiscoveryPollInterval 是发现系统在检查是否需要更多对等点之间等待的大致时间 DiscoveryPollInterval = 1 * time.Second )
var ( // D 设置 GossipSub 主题网格的最佳度。 GossipSubD = 6 // Dlo 设置我们在 GossipSub 主题网格中保持的对等节点的下限。 GossipSubDlo = 5 // Dhi 设置我们在 GossipSub 主题网格中保持的对等节点的上限。 GossipSubDhi = 12 // Dscore 影响在由于过度订阅而修剪网格时如何选择对等节点。 GossipSubDscore = 4 // Dout 设置在主题网格中维护的出站连接的配额。 GossipSubDout = 2 // HistoryLength 控制用于 gossip 的消息缓存的大小。 GossipSubHistoryLength = 5 // HistoryGossip 控制我们将在 IHAVE gossip 消息中广告的缓存消息 ID 的数量。 GossipSubHistoryGossip = 3 // Dlazy 影响我们在每个心跳期间将 gossip 发送到的对等节点数量。 GossipSubDlazy = 6 // GossipFactor 影响我们在每个心跳期间将 gossip 发送到的对等节点数量。 GossipSubGossipFactor = 0.25 // GossipRetransmission 控制在开始忽略对等节点之前允许对等节点通过 IWANT gossip 请求相同消息 ID 的次数。 GossipSubGossipRetransmission = 3 // HeartbeatInitialDelay 是路由器初始化后心跳计时器开始之前的短暂延迟。 GossipSubHeartbeatInitialDelay = 100 * time.Millisecond // HeartbeatInterval 控制心跳之间的时间。 GossipSubHeartbeatInterval = 1 * time.Second // FanoutTTL 控制我们跟踪 fanout 状态的时间。 GossipSubFanoutTTL = 60 * time.Second // PrunePeers 控制修剪对等节点交换中的对等节点数量。 GossipSubPrunePeers = 16 // PruneBackoff 控制修剪对等节点的回退时间。 GossipSubPruneBackoff = time.Minute // UnsubscribeBackoff 控制取消订阅主题时使用的回退时间。 GossipSubUnsubscribeBackoff = 10 * time.Second // Connectors 控制通过 PX 获取的对等节点的活动连接尝试数量。 GossipSubConnectors = 8 // MaxPendingConnections 设置通过 px 尝试的对等节点的最大挂起连接数。 GossipSubMaxPendingConnections = 128 // ConnectionTimeout 控制连接尝试的超时时间。 GossipSubConnectionTimeout = 30 * time.Second // DirectConnectTicks 是尝试重新连接当前未连接的直接对等节点的心跳滴答次数。 GossipSubDirectConnectTicks uint64 = 300 // DirectConnectInitialDelay 是在打开与直接对等节点的连接之前的初始延迟。 GossipSubDirectConnectInitialDelay = time.Second // OpportunisticGraftTicks 是尝试通过机会性移植改善网格的心跳滴答次数。 GossipSubOpportunisticGraftTicks uint64 = 60 // OpportunisticGraftPeers 是机会性移植的对等节点数量。 GossipSubOpportunisticGraftPeers = 2 // GraftFloodThreshold 是在最后一次修剪后经过的时间内 GRAFT 提供的额外分数惩罚。 GossipSubGraftFloodThreshold = 10 * time.Second // MaxIHaveLength 是包含在 IHAVE 消息中的最大消息数量。 GossipSubMaxIHaveLength = 5000 // MaxIHaveMessages 是在心跳期间从对等节点接受的最大 IHAVE 消息数量。 GossipSubMaxIHaveMessages = 10 // IWantFollowupTime 是在 IHAVE 广告之后通过 IWANT 请求消息的等待时间。 GossipSubIWantFollowupTime = 3 * time.Second )
定义 gossipsub 的默认参数。
var ( DefaultPeerGaterRetainStats = 6 * time.Hour // 保留统计数据的默认时间 DefaultPeerGaterQuiet = time.Minute // 安静期的默认时间 DefaultPeerGaterDuplicateWeight = 0.125 // 重复消息的权重 DefaultPeerGaterIgnoreWeight = 1.0 // 忽略消息的权重 DefaultPeerGaterRejectWeight = 16.0 // 拒绝消息的权重 DefaultPeerGaterThreshold = 0.33 // 门控阈值 DefaultPeerGaterGlobalDecay = ScoreParameterDecay(2 * time.Minute) // 全局衰减参数 DefaultPeerGaterSourceDecay = ScoreParameterDecay(time.Hour) // 来源衰减参数 )
默认参数定义
var ( // TimeCacheDuration 指定消息ID会被记住多长时间 TimeCacheDuration = 120 * time.Second // TimeCacheStrategy 指定 seen messages cache 的查找/清理策略 TimeCacheStrategy = timecache.Strategy_FirstSeen // ErrSubscriptionCancelled 当订阅被取消后调用 Next() 时可能返回的错误 ErrSubscriptionCancelled = errors.New("subscription cancelled") )
var ( // GossipSubConnTagBumpMessageDelivery 表示每次一个对等节点首次在一个主题中传递消息时,添加到连接管理器标签中的量。 // 每次对等节点首次在一个主题中传递消息时,我们将此标签增加该量,直到最大值 GossipSubConnTagMessageDeliveryCap。 // 注意,传递标签会随着时间衰减,在每个 GossipSubConnTagDecayInterval 期间减少 GossipSubConnTagDecayAmount。 GossipSubConnTagBumpMessageDelivery = 1 // GossipSubConnTagDecayInterval 是连接管理器标签衰减的时间间隔。 GossipSubConnTagDecayInterval = 10 * time.Minute // GossipSubConnTagDecayAmount 是在每个衰减间隔期间从衰减标签值中减去的量。 GossipSubConnTagDecayAmount = 1 // GossipSubConnTagMessageDeliveryCap 是用于跟踪消息传递的连接管理器标签的最大值。 GossipSubConnTagMessageDeliveryCap = 15 )
var ErrEmptyPeerID = errors.New("空的对等节点 ID")
ErrEmptyPeerID 表示如果提供了一个空的对等节点 ID,将返回此错误。
var ErrNilSignKey = errors.New("空的私钥")
ErrNilSignKey 表示如果提供了一个空的私钥,将返回此错误。
var ErrTooManySubscriptions = errors.New("too many subscriptions")
ErrTooManySubscriptions 可能由 SubscriptionFilter 返回,以表示订阅过多无法处理的错误
var ErrTopicClosed = errors.New("主题已关闭,请打开一个新的主题")
ErrTopicClosed 表示如果在主题关闭后使用 Topic,将返回此错误。
var GossipSubDefaultProtocols = []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
GossipSubDefaultProtocols 是默认的 gossipsub 路由器协议列表
var MinTraceBatchSize = 16
MinTraceBatchSize 设置最小批处理大小,默认为 16。
var (
// RandomSubD 是 RandomSub 路由器使用的最小对等节点数
RandomSubD = 6
)
var TraceBufferSize = 1 << 16
TraceBufferSize 设置追踪缓冲区大小,默认为 64K。
Functions ¶
func Debugf ¶
func Debugf(format string, args ...interface{})
Debugf 记录格式化的调试级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func DefaultPeerFilter ¶
DefaultPeerFilter 接受所有主题的所有 peers 参数:
- pid: peer ID
- topic: 主题
返回值:
- bool: 是否接受该 peer
func Errorf ¶
func Errorf(format string, args ...interface{})
Errorf 记录格式化的错误级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func Fatalf ¶
func Fatalf(format string, args ...interface{})
Fatalf 记录格式化的致命级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func FilterSubscriptions ¶
func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts
FilterSubscriptions 过滤并去重订阅列表。 filter 应返回 true 如果一个主题是感兴趣的。 参数: - subs: 包含订阅通知的 RPC_SubOpts 列表 - filter: 用于确定是否感兴趣的过滤函数 返回值: - []*pb.RPC_SubOpts: 过滤后的订阅列表
func GossipSubDefaultFeatures ¶
func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool
GossipSubDefaultFeatures 是默认 gossipsub 协议的功能测试函数 参数:
- feat: 要测试的功能
- proto: 协议ID
返回值:
- bool: 如果协议支持该功能,则返回 true
func Infof ¶
func Infof(format string, args ...interface{})
Infof 记录格式化的信息级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func Panicf ¶
func Panicf(format string, args ...interface{})
Panicf 记录格式化的 panic 级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func Printf ¶
func Printf(format string, args ...interface{})
Printf 记录格式化的信息级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
func ScoreParameterDecay ¶
ScoreParameterDecay 计算参数的衰减因子,假设 DecayInterval 为 1s 并且值在低于 0.01 时衰减到零 参数:
- decay: 衰减时间
返回值:
- float64: 衰减因子
func ScoreParameterDecayWithBase ¶
func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64
ScoreParameterDecayWithBase 使用基准 DecayInterval 计算参数的衰减因子 参数:
- decay: 衰减时间
- base: 基准衰减间隔
- decayToZero: 衰减到零值
返回值:
- float64: 衰减因子
func Warnf ¶
func Warnf(format string, args ...interface{})
Warnf 记录格式化的警告级别日志 参数:
- format: 格式化字符串
- args: 格式化参数
Types ¶
type AcceptStatus ¶
type AcceptStatus int
AcceptStatus 是表示是否接受传入 RPC 的枚举
const ( // AcceptNone 表示丢弃传入的 RPC AcceptNone AcceptStatus = iota // AcceptControl 表示仅接受传入 RPC 的控制消息 AcceptControl // AcceptAll 表示接受传入 RPC 的全部内容 AcceptAll )
type BackoffConnectorFactory ¶
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
BackoffConnectorFactory 创建一个附加到给定主机的 BackoffConnector
type BasicSeqnoValidator ¶
type BasicSeqnoValidator struct {
// contains filtered or unexported fields
}
BasicSeqnoValidator 是一个基本验证器,可用作默认验证器,忽略超出已见缓存窗口的重放消息。 验证器使用消息序号作为对等节点特定的 nonce 来决定是否应传播消息,比较对等节点元数据存储中的最大 nonce。 这有助于确保无论已见缓存跨度和网络直径如何,网络中都不会存在无限传播的消息。 它要求 pubsub 实例化时具有严格的消息签名策略,并且序号未被禁用,即不支持匿名模式。
警告:请参阅 https://github.com/libp2p/rust-libp2p/issues/3453 简而言之:rust 当前通过发出随机序号违反了规范,这带来了互操作性风险。 我们预计此问题将在不久的将来得到解决,但如果您处于与(较旧的)rust 节点混合的环境中,请牢记这一点。
type CacheEntry ¶
type CacheEntry struct {
// contains filtered or unexported fields
}
CacheEntry 表示消息缓存条目。
type DefaultLogger ¶
DefaultLogger 是默认的日志记录器实现
type DiscoverOpt ¶
type DiscoverOpt func(*discoverOptions) error
DiscoverOpt 是一个函数类型,用于配置发现选项
func WithDiscoverConnector ¶
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt
WithDiscoverConnector 添加一个自定义连接器,该连接器处理发现子系统如何连接到对等节点 参数:
- connFactory: 退避连接器工厂,用于创建一个自定义的连接器
返回值:
- DiscoverOpt: 配置发现选项的函数,用于设置发现配置
func WithDiscoveryOpts ¶
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt
WithDiscoveryOpts 传递 libp2p 发现选项到 PubSub 发现子系统 参数:
- opts: 发现选项,这些选项将被应用到发现子系统中
返回值:
- DiscoverOpt: 配置发现选项的函数,用于设置发现配置
type EventTracer ¶
type EventTracer interface { // Trace 方法用于记录一个追踪事件。 Trace(evt *pb.TraceEvent) }
EventTracer 是一个通用的事件追踪器接口。 这是一个高级追踪接口,它传递由 pb/trace.proto 中定义的追踪事件。
type ExtendedPeerScoreInspectFn ¶
type ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)
ExtendedPeerScoreInspectFn 定义扩展的对等节点分数检查函数类型
type FloodSubRouter ¶
type FloodSubRouter struct {
// contains filtered or unexported fields
}
FloodSubRouter 结构体定义了FloodSub路由器
func (*FloodSubRouter) AcceptFrom ¶
func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus
AcceptFrom 决定是否接受来自特定对等节点的消息 参数:
- peer: 对等节点ID
返回值:
- AcceptStatus: 接受状态
func (*FloodSubRouter) AddPeer ¶
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)
AddPeer 添加对等节点到FloodSubRouter 参数:
- p: 对等节点ID
- proto: 协议ID
func (*FloodSubRouter) Attach ¶
func (fs *FloodSubRouter) Attach(p *PubSub)
Attach 将FloodSubRouter附加到PubSub实例 参数:
- p: PubSub实例
func (*FloodSubRouter) EnoughPeers ¶
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool
EnoughPeers 检查是否有足够的对等节点来支持特定主题。 参数:
- topic: 主题名称
- suggested: 建议的对等节点数量
返回值:
- bool: 是否有足够的对等节点支持该主题
func (*FloodSubRouter) HandleRPC ¶
func (fs *FloodSubRouter) HandleRPC(rpc *RPC)
HandleRPC 处理接收到的RPC消息 参数:
- rpc: RPC消息
func (*FloodSubRouter) Leave ¶
func (fs *FloodSubRouter) Leave(topic string)
Leave 离开主题 参数:
- topic: 主题
func (*FloodSubRouter) Protocols ¶
func (fs *FloodSubRouter) Protocols() []protocol.ID
Protocols 返回FloodSubRouter支持的协议列表 返回值:
- []protocol.ID: 支持的协议列表
func (*FloodSubRouter) Publish ¶
func (fs *FloodSubRouter) Publish(msg *Message)
Publish 发布消息到主题 参数:
- msg: 消息
func (*FloodSubRouter) RemovePeer ¶
func (fs *FloodSubRouter) RemovePeer(p peer.ID)
RemovePeer 从FloodSubRouter中移除对等节点 参数:
- p: 对等节点ID
type GossipSubFeatureTest ¶
type GossipSubFeatureTest = func(GossipSubFeature, protocol.ID) bool
GossipSubFeatureTest 是一个功能测试函数;它接受一个功能和协议ID,并且如果协议支持该功能则返回 true
type GossipSubParams ¶
type GossipSubParams struct { // D 设置 GossipSub 主题网格的理想度数。例如,如果 D == 6, // 每个节点希望在他们订阅的每个主题中保持大约六个节点在他们的网格中。 // D 应该设置在 Dlo 和 Dhi 之间的某个值。 D int // Dlo 设置 GossipSub 主题网格中保持的最少节点数。 // 如果我们拥有的节点少于 Dlo,我们将在下一个心跳中尝试添加更多节点到网格中。 Dlo int // Dhi 设置 GossipSub 主题网格中保持的最多节点数。 // 如果我们拥有的节点多于 Dhi,我们将在下一个心跳中选择一些节点从网格中移除。 Dhi int // Dscore 影响由于过度订阅而修剪网格时选择保留哪些节点。 // 保留的节点中至少 Dscore 个将是高分节点,而其余的节点将随机选择。 Dscore int // Dout 设置在主题网格中保持的出站连接的配额。 // 当网格因过度订阅而被修剪时,我们确保至少与 Dout 个幸存节点保持出站连接。 // 这可以防止 sybil 攻击者通过大量的入站连接压倒我们的网格。 // // Dout 必须设置在 Dlo 之下,并且不能超过 D / 2。 Dout int // HistoryLength 控制用于 gossip 的消息缓存的大小。 // 消息缓存将在 HistoryLength 个心跳内记住消息。 HistoryLength int // HistoryGossip 控制我们将在 IHAVE gossip 消息中通告的缓存消息 ID 的数量。 // 当被要求提供我们看到的消息 ID 时,我们将只返回来自最近 HistoryGossip 次心跳的那些消息。 // HistoryGossip 和 HistoryLength 之间的差距使我们避免广告即将过期的消息。 // // HistoryGossip 必须小于或等于 HistoryLength 以避免运行时 panic。 HistoryGossip int // Dlazy 影响我们在每次心跳时将 gossip 发送给多少节点。 // 我们将至少向 Dlazy 个网格外部的节点发送 gossip。实际的数量可能更多, // 取决于 GossipFactor 和我们连接的节点数。 Dlazy int // GossipFactor 影响我们在每次心跳时将 gossip 发送给多少节点。 // 我们将 gossip 发送给 GossipFactor * (非网格节点的总数),或者 Dlazy, // 以较大者为准。 GossipFactor float64 // GossipRetransmission 控制在开始忽略节点之前,我们允许节点通过 IWANT gossip 请求相同消息 ID 的次数。 // 这样设计是为了防止节点通过请求消息浪费我们的资源。 GossipRetransmission int // HeartbeatInitialDelay 是在路由器初始化后,心跳定时器开始之前的短暂延迟。 HeartbeatInitialDelay time.Duration // HeartbeatInterval 控制心跳之间的时间间隔。 HeartbeatInterval time.Duration // SlowHeartbeatWarning 是心跳处理时间超过该阈值时触发警告的持续时间;这表明节点可能过载。 SlowHeartbeatWarning float64 // FanoutTTL 控制我们保持 fanout 状态的时间。如果自上次我们发布到一个我们没有订阅的主题以来, // 已经过了 FanoutTTL,我们将删除该主题的 fanout 映射。 FanoutTTL time.Duration // PrunePeers 控制在 prune Peer eXchange 中包含的节点数量。 // 当我们修剪一个符合 PX 条件的节点(得分良好等)时,我们会尝试向他们发送我们知道的最多 PrunePeers 个节点的签名节点记录。 PrunePeers int // PruneBackoff 控制被修剪节点的退避时间。这是节点在被修剪后重新尝试加入我们网格之前必须等待的时间。 // 当修剪节点时,我们会向他们发送我们设置的 PruneBackoff 值,以便他们知道最短等待时间。 // 运行旧版本的节点可能不会发送退避时间,所以如果我们收到没有退避时间的 prune 消息, // 我们将在重新加入前等待至少 PruneBackoff 时间。 PruneBackoff time.Duration // UnsubscribeBackoff 控制取消订阅主题时使用的退避时间。 // 节点在此期间不应重新订阅该主题。 UnsubscribeBackoff time.Duration // Connectors 控制通过 PX 获取的节点的活动连接尝试数量。 Connectors int // MaxPendingConnections 设置通过 PX 尝试连接的节点的最大挂起连接数。 MaxPendingConnections int // ConnectionTimeout 控制连接尝试的超时时间。 ConnectionTimeout time.Duration // DirectConnectTicks 是尝试重新连接当前未连接的直接节点的心跳刻度数。 DirectConnectTicks uint64 // DirectConnectInitialDelay 是开始与直接节点建立连接之前的初始延迟。 DirectConnectInitialDelay time.Duration // OpportunisticGraftTicks 是通过机会性 grafting 改善网格的心跳刻度数。 // 每当经过 OpportunisticGraftTicks 次心跳时,我们将尝试选择一些高分网格节点来替换低分的节点, // 如果我们网格节点的中位数得分低于阈值(见 https://godoc.org/bpfs#PeerScoreThresholds)。 OpportunisticGraftTicks uint64 // OpportunisticGraftPeers 是要机会性 graft 的节点数。 OpportunisticGraftPeers int // 如果在上次 PRUNE 之后 GraftFloodThreshold 时间内收到 GRAFT, // 则对节点应用额外的得分惩罚,通过 P7 来实现。 GraftFloodThreshold time.Duration // MaxIHaveLength 是 IHAVE 消息中包含的最大消息数量。 // 还控制在一个心跳内,我们从节点接受和请求的 IHAVE ids 的最大数量,以防止 IHAVE 泛滥。 // 如果您的系统在 HistoryGossip 心跳中推送了超过 5000 条消息,您应调整该值; // 默认情况下,这意味着 1666 条消息/秒。 MaxIHaveLength int // MaxIHaveMessages 是在一个心跳内从节点接受的 IHAVE 消息的最大数量。 MaxIHaveMessages int // IWantFollowupTime 是在 IHAVE 广告后等待通过 IWANT 请求的消息的时间。 // 如果在此窗口内未收到消息,则声明违反承诺,路由器可能会应用行为惩罚。 IWantFollowupTime time.Duration }
GossipSubParams 定义了所有 gossipsub 特定的参数。
func DefaultGossipSubParams ¶
func DefaultGossipSubParams() GossipSubParams
DefaultGossipSubParams 返回默认的 gossipsub 参数。 返回值:
- GossipSubParams: gossipsub 参数
type GossipSubRouter ¶
type GossipSubRouter struct {
// contains filtered or unexported fields
}
GossipSubRouter 是一个实现 gossipsub 协议的路由器。 对于我们加入的每个主题,我们维护一个消息流过的覆盖层;这是 mesh map。 对于我们发布但没有加入的每个主题,我们维护一个对等节点列表,用于在覆盖层中注入我们的消息;这是 fanout map。 如果我们没有发布任何消息到 fanout 主题的 fanout 对等节点列表在 GossipSubFanoutTTL 之后将过期。
func DefaultGossipSubRouter ¶
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter
DefaultGossipSubRouter 返回一个带有默认参数的新的 GossipSubRouter。 参数:
- h: 主机
返回值:
- *GossipSubRouter: GossipSubRouter 对象
func (*GossipSubRouter) AcceptFrom ¶
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus
AcceptFrom 检查是否接受来自对等节点的消息。 参数:
- p: peer.ID 类型,表示对等节点的 ID。
返回值:
- AcceptStatus: 返回接受状态,表示是否接受消息。
func (*GossipSubRouter) AddPeer ¶
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)
AddPeer 添加对等节点。 参数:
- p: peer.ID 类型,表示对等节点的 ID。
- proto: protocol.ID 类型,表示协议 ID。
func (*GossipSubRouter) Attach ¶
func (gs *GossipSubRouter) Attach(p *PubSub)
Attach 将 GossipSubRouter 附加到 PubSub 实例。 参数:
- p: PubSub 实例
func (*GossipSubRouter) EnoughPeers ¶
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool
EnoughPeers 检查主题是否有足够的对等节点。 参数:
- topic: string 类型,表示主题名称。
- suggested: int 类型,表示建议的对等节点数量。
返回值:
- bool: 返回布尔值,表示是否有足够的对等节点。
func (*GossipSubRouter) HandleRPC ¶
func (gs *GossipSubRouter) HandleRPC(rpc *RPC)
HandleRPC 处理 RPC 消息。 参数:
- rpc: *RPC 类型,表示要处理的 RPC 消息。
func (*GossipSubRouter) Join ¶
func (gs *GossipSubRouter) Join(topic string)
Join 加入主题。 参数:
- topic: string 类型,表示主题名称。
func (*GossipSubRouter) Leave ¶
func (gs *GossipSubRouter) Leave(topic string)
Leave 离开主题。 参数:
- topic: string 类型,表示主题名称。
func (*GossipSubRouter) Protocols ¶
func (gs *GossipSubRouter) Protocols() []protocol.ID
Protocols 返回协议列表。 返回值:
- []protocol.ID: 协议列表
func (*GossipSubRouter) Publish ¶
func (gs *GossipSubRouter) Publish(msg *Message)
Publish 发布消息。 参数:
- msg: *Message 类型,表示要发布的消息。
func (*GossipSubRouter) RemovePeer ¶
func (gs *GossipSubRouter) RemovePeer(p peer.ID)
RemovePeer 移除对等节点。 参数:
- p: peer.ID 类型,表示对等节点的 ID。
func (*GossipSubRouter) WithDefaultTagTracer ¶
func (gs *GossipSubRouter) WithDefaultTagTracer() Option
WithDefaultTagTracer 返回 GossipSubRouter 的标签跟踪器作为 PubSub 选项。 这对于 GossipSubRouter 在外部实例化并作为依赖项注入 GossipSub 构造函数的情况很有用。 这允许标签跟踪器也作为 PubSub 选项依赖项注入 GossipSub 构造函数。
type JSONTracer ¶
type JSONTracer struct {
// contains filtered or unexported fields
}
JSONTracer 是一个将事件写入文件的追踪器,事件以 ndjson 格式编码。
func NewJSONTracer ¶
func NewJSONTracer(file string) (*JSONTracer, error)
NewJSONTracer 创建一个新的 JSONTracer,将追踪信息写入文件。 参数:
- file: 文件路径
返回值:
- *JSONTracer: JSONTracer 对象
- error: 错误信息
func OpenJSONTracer ¶
OpenJSONTracer 创建一个新的 JSONTracer,可以显式控制文件打开的标志和权限。 参数:
- file: 文件路径
- flags: 文件打开标志
- perm: 文件权限
返回值:
- *JSONTracer: JSONTracer 对象
- error: 错误信息
func (*JSONTracer) Trace ¶
func (t *JSONTracer) Trace(evt *pb.TraceEvent)
Trace 向追踪器添加一个事件 参数:
- evt: 要添加的事件
type MapBlacklist ¶
MapBlacklist 是一种使用map实现的黑名单
type Message ¶
type Message struct { *pb.Message // 嵌入的 Protocol Buffers 生成的消息结构体 ID string // 消息的唯一标识符 ReceivedFrom peer.ID // 发送该消息的节点ID ValidatorData interface{} // 验证器相关数据,可能包含验证消息的元数据 Local bool // 指示消息是否是本地生成的 }
Message 表示一个消息
type MessageCache ¶
type MessageCache struct {
// contains filtered or unexported fields
}
MessageCache 是一个滑动窗口缓存,记住消息长达一定的历史长度。
func NewMessageCache ¶
func NewMessageCache(gossip, history int) *MessageCache
NewMessageCache 创建一个滑动窗口缓存,记住消息长达 `history` 个插槽。 当查询要通告的消息时,缓存仅返回最后 `gossip` 个插槽中的消息。 `gossip` 参数必须小于或等于 `history`,否则该函数会引发 panic。 在通过 IHAVE gossip 通告消息和通过 IWANT 命令获取消息之间的反应时间之间存在松弛。
func (*MessageCache) Get ¶
func (mc *MessageCache) Get(mid string) (*Message, bool)
Get 从缓存中获取消息。 参数:
- mid: 消息 ID
返回值:
- *Message: 消息指针
- bool: 是否存在该消息
func (*MessageCache) GetForPeer ¶
GetForPeer 从缓存中获取对等节点的消息。 参数:
- mid: 消息 ID
- p: 对等节点 ID
返回值:
- *Message: 消息指针
- int: 对等节点事务计数
- bool: 是否存在该消息
func (*MessageCache) GetGossipIDs ¶
func (mc *MessageCache) GetGossipIDs(topic string) []string
GetGossipIDs 获取给定主题的 gossip 消息 ID 列表。 参数:
- topic: 主题名称
返回值:
- []string: gossip 消息 ID 列表
func (*MessageCache) SetMsgIdFn ¶
func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string)
SetMsgIdFn 设置消息 ID 生成函数。 参数:
- msgID: 消息 ID 生成函数
type MessageMetadataOpt ¶
type MessageMetadataOpt struct {
// contains filtered or unexported fields
}
MessageMetadataOpt 表示消息元信息的选项。
type MessageSignaturePolicy ¶
type MessageSignaturePolicy uint8
MessageSignaturePolicy 描述是否生成、期望和/或验证签名的策略。
type MsgIdFunction ¶
MsgIdFunction 返回传递的消息的唯一 ID,PubSub 可以通过配置 Option from WithMessageIdFn 使用任何此函数的实现。 参数:
- pmsg: 要计算 ID 的消息。
返回值:
- string: 消息 ID。
type Node ¶
type Node struct { ID peer.ID // 节点ID Status NodeStatus // 当前状态 LastSeen time.Time // 最后一次看到节点的时间 FailedAttempts int // 连续失败尝试次数 Score float64 // 节点评分 History []StatusChange // 状态变化历史 CheckInterval time.Duration // 检查间隔 SuccessiveSuccesses int // 连续成功次数 LastCheckTime time.Time // 上次检查时间 ConnectionQuality float64 // 连接质量 NetworkCondition []time.Duration // 网络延迟历史 }
Node 表示一个节点及其状态息
type NodeOption ¶
NodeOption 定义了一个函数类型,用于配置PubSub 参数:
- *Options: 需要配置的选项对象
返回值:
- error: 配置过程中的错误信息
func WithNodeDiscovery ¶ added in v0.0.3
func WithNodeDiscovery(d discovery.Discovery) NodeOption
WithNodeDiscovery 设置 Discovery 服务 参数:
- d: 要设置的 Discovery 服务实例
返回值:
- NodeOption: 返回一个配置函数
func WithSetD ¶
func WithSetD(d int) NodeOption
WithSetD 设置 GossipSub 主题网格的理想度数 参数:
- d: 要设置的理想度数
返回值:
- NodeOption: 返回一个配置函数
func WithSetDirectPeers ¶
func WithSetDirectPeers(peers []peer.AddrInfo) NodeOption
WithSetDirectPeers 设置直连对等节点列表 参数:
- peers: 要设置的直连对等节点列表
返回值:
- NodeOption: 返回一个配置函数
func WithSetDlo ¶
func WithSetDlo(dlo int) NodeOption
WithSetDlo 设置 GossipSub 主题网格中保持的最少节点数 参数:
- dlo: 要设置的最少节点数
返回值:
- NodeOption: 返回一个配置函数
func WithSetFollowupTime ¶
func WithSetFollowupTime(t time.Duration) NodeOption
WithSetFollowupTime 设置跟随时间 参数:
- t: 要设置的跟随时间
返回值:
- NodeOption: 返回一个配置函数
func WithSetGossipFactor ¶
func WithSetGossipFactor(f float64) NodeOption
WithSetGossipFactor 设置Gossip因子 参数:
- f: 要设置的Gossip因子
返回值:
- NodeOption: 返回一个配置函数
func WithSetHeartbeatInterval ¶
func WithSetHeartbeatInterval(interval time.Duration) NodeOption
WithSetHeartbeatInterval 设置心跳间隔 参数:
- interval: 要设置的心跳间隔
返回值:
- NodeOption: 返回一个配置函数
func WithSetLoadConfig ¶
func WithSetLoadConfig(load bool) NodeOption
WithSetLoadConfig 设置是否加载配置选项 参数:
- load: 是否加载配置
返回值:
- NodeOption: 返回一个配置函数
func WithSetMaxMessageSize ¶
func WithSetMaxMessageSize(size int) NodeOption
WithSetMaxMessageSize 设置最大消息大小 参数:
- size: 要设置的最大消息大小
返回值:
- NodeOption: 返回一个配置函数
func WithSetMaxPendingConns ¶
func WithSetMaxPendingConns(n int) NodeOption
WithSetMaxPendingConns 设置最大待处理连接数 参数:
- n: 要设置的最大待处理连接数
返回值:
- NodeOption: 返回一个配置函数
func WithSetMaxTransmissionSize ¶
func WithSetMaxTransmissionSize(size int) NodeOption
WithSetMaxTransmissionSize 设置最大传输大小 参数:
- size: 要设置的最大传输大小
返回值:
- NodeOption: 返回一个配置函数
func WithSetPubSubMode ¶
func WithSetPubSubMode(mode PubSubType) NodeOption
WithSetPubSubMode 设置发布订阅模式 参数:
- mode: 要设置的发布订阅模式
返回值:
- NodeOption: 返回一个配置函数
func WithSetSignMessages ¶
func WithSetSignMessages(sign bool) NodeOption
WithSetSignMessages 设置是否签名消息 参数:
- sign: 是否签名消息
返回值:
- NodeOption: 返回一个配置函数
func WithSetValidateMessages ¶
func WithSetValidateMessages(validate bool) NodeOption
WithSetValidateMessages 设置是否验证消息 参数:
- validate: 是否验证消息
返回值:
- NodeOption: 返回一个配置函数
type NodePubSub ¶
type NodePubSub struct {
// contains filtered or unexported fields
}
NodePubSub 表示分布式存储网络的主要结构
func NewNodePubSub ¶
func NewNodePubSub(ctx context.Context, host host.Host, opts ...NodeOption) (*NodePubSub, error)
NewNodePubSub 创建并返回一个新的 NodePubSub 实例 参数:
- ctx: 上下文,用于控制PubSub实例的生命周期
- host: libp2p主机,代表当前节点
- opts: 节点选项,用于自定义PubSub的行为
返回:
- *NodePubSub: 新创建的NodePubSub实例
- error: 如果创建过程中出现错误,返回相应的错误信息
func (*NodePubSub) BroadcastWithTopic ¶
func (pubsub *NodePubSub) BroadcastWithTopic(topic string, data []byte) error
BroadcastWithTopic 将消息广播到给定主题 参数:
- topic: 主题名称
- data: 要广播的消息数据
返回:
- error: 如果广播过程中出现错误,返回相应的错误信息
func (*NodePubSub) CancelPubsubWithTopic ¶
func (pubsub *NodePubSub) CancelPubsubWithTopic(name string) error
CancelPubsubWithTopic 取消给定名字的订阅 参数:
- name: 要取消的主题名称
返回:
- error: 如果取消过程中出现错误,返回相应的错误信息
func (*NodePubSub) CancelSubscribeWithTopic ¶
func (pubsub *NodePubSub) CancelSubscribeWithTopic(topic string) error
CancelSubscribeWithTopic 取消订阅给定主题 参数:
- topic: 要取消订阅的主题名称
返回:
- error: 如果取消订阅过程中出现错误,返回相应的错误信息
func (*NodePubSub) GetTopic ¶
func (pubsub *NodePubSub) GetTopic(name string) (*Topic, error)
GetTopic 根据给定的名称获取一个 topic 参数:
- name: 主题名称
返回:
- *Topic: 获取或创建的主题实例
- error: 如果获取或创建过程中出现错误,返回相应的错误信息
func (*NodePubSub) IsSubscribed ¶
func (pubsub *NodePubSub) IsSubscribed(topic string) bool
IsSubscribed 检查给定的主题是否已经订阅 参数:
- topic: 主题名称
返回:
- bool: 如果主题已订阅返回true,否则返回false
func (*NodePubSub) ListPeers ¶
func (pubsub *NodePubSub) ListPeers(topic string) []peer.ID
ListPeers 返回我们在给定主题中连接到的对等点列表 参数:
- topic: 主题名称
返回:
- []peer.ID: 与给定主题相关的对等点ID列表
func (*NodePubSub) NotifyNewPeer ¶ added in v0.0.4
func (pubsub *NodePubSub) NotifyNewPeer(peer peer.ID) error
NotifyNewPeer 通知系统有新的对等节点加入 参数:
- peer: 新加入节点的ID
返回值:
- error: 如果节点不满足要求则返回错误
func (*NodePubSub) Publish ¶
func (pubsub *NodePubSub) Publish(topic string, data []byte) error
Publish 向 topic 发布一条消息 参数:
- topic: 主题名称
- data: 要发布的消息数据
返回:
- error: 如果发布过程中出现错误,返回相应的错误信息
func (*NodePubSub) Pubsub ¶
func (pubsub *NodePubSub) Pubsub() *PubSub
Pubsub 返回 PubSub 实例 返回:
- *PubSub: 当前NodePubSub实例使用的PubSub实例
func (*NodePubSub) Subscribe ¶
func (pubsub *NodePubSub) Subscribe(topic string, subscribe bool) (*Subscription, error)
Subscribe 订阅一个 topic 参数:
- topic: 主题名称
- subscribe: 是否实际进行订阅操作
返回:
- *Subscription: 如果subscribe为true,返回订阅实例;否则返回nil
- error: 如果订阅过程中出现错误,返回相应的错误信息
func (*NodePubSub) SubscribeWithTopic ¶
func (pubsub *NodePubSub) SubscribeWithTopic(topic string, handler PubSubMsgHandler, subscribe bool) error
SubscribeWithTopic 订阅给定主题,并使用给定的订阅消息处理函数 参数:
- topic: 要订阅的主题名称
- handler: 用于处理接收到的消息的函数
- subscribe: 是否实际进行订阅操作
返回:
- error: 如果订阅过程中出现错误,返回相应的错误信息
type NodeStatus ¶
type NodeStatus int
NodeStatus 表示节点的状态
const ( Online NodeStatus = iota // 节点在线 Suspicious // 节点可疑 Offline // 节点离线 )
type NodeStatusTracker ¶
type NodeStatusTracker struct {
// contains filtered or unexported fields
}
NodeStatusTracker 用于跟踪和管理节点状态
func NewNodeStatusTracker ¶
func NewNodeStatusTracker(h host.Host, defaultCheckInterval time.Duration, offlineThreshold int, pingTimeout time.Duration) *NodeStatusTracker
NewNodeStatusTracker 创建一个新的 NodeStatusTracker 实例 参数:
- h: libp2p主机
- defaultCheckInterval: 默认检查间隔
- offlineThreshold: 离线阈值
- pingTimeout: ping超时时间
返回:
- *NodeStatusTracker: 新创建的NodeStatusTracker实例
func (*NodeStatusTracker) SubscribeStatusChanges ¶
func (nst *NodeStatusTracker) SubscribeStatusChanges() chan StatusChange
SubscribeStatusChanges 订阅状态变化事件 返回:
- chan StatusChange: 状态变化通道
func (*NodeStatusTracker) UnsubscribeStatusChanges ¶
func (nst *NodeStatusTracker) UnsubscribeStatusChanges(ch chan StatusChange)
UnsubscribeStatusChanges 取消订阅状态变化事件 参数:
- ch: 要取消订阅的通道
func (*NodeStatusTracker) UpdateNodeStatus ¶
func (nst *NodeStatusTracker) UpdateNodeStatus(pid peer.ID, status NodeStatus)
UpdateNodeStatus 允许外部组件更新节点状态 参数:
- pid: 要更新的节点ID
- status: 新的状态
type Option ¶
Option 是用于配置 PubSub 的选项函数类型
func WithAppSpecificRpcInspector ¶
WithAppSpecificRpcInspector 设置一个钩子,用于在处理传入的 RPC 之前检查它们。 检查器在处理已接受的 RPC 之前调用。如果检查器的错误为 nil,则按常规处理 RPC。否则,RPC 将被丢弃。 参数:
- inspector: 检查函数。
返回值:
- Option: 配置选项。
func WithDefaultValidator ¶
func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option
WithDefaultValidator 添加一个默认验证器,适用于所有主题 参数:
- val: interface{} 验证器
- opts: ...ValidatorOpt 验证器选项
返回值:
- Option 配置选项
func WithDirectConnectTicks ¶
WithDirectConnectTicks 是一个 gossipsub 路由器选项,用于设置尝试重新连接当前未连接的直接对等节点的心跳滴答次数。 参数:
- t: uint64 类型,表示心跳滴答次数。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithDirectPeers ¶
WithDirectPeers 是一个 gossipsub 路由器选项,用于指定具有直接对等关系的对等节点。 参数:
- pis: []peer.AddrInfo 类型,表示对等节点的信息列表。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithDiscovery ¶
func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option
WithDiscovery 提供用于引导和提供 peers 到 PubSub 的发现机制。 参数:
- d: 发现机制。
- opts: 可选的发现配置。
返回值:
- Option: 配置选项。
func WithEventTracer ¶
func WithEventTracer(tracer EventTracer) Option
WithEventTracer 提供 pubsub 系统的事件追踪器。 参数:
- tracer: 事件追踪器。
返回值:
- Option: 配置选项。
func WithFloodPublish ¶
WithFloodPublish 是一个 gossipsub 路由器选项,用于启用洪水发布。 参数:
- floodPublish: bool 类型,表示是否启用洪水发布。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithGossipSubParams ¶
func WithGossipSubParams(cfg GossipSubParams) Option
WithGossipSubParams 是一个 gossipsub 路由器选项,允许在实例化 gossipsub 路由器时设置自定义配置。 参数:
- cfg: GossipSubParams 类型,表示 gossipsub 参数配置。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithGossipSubProtocols ¶
func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option
WithGossipSubProtocols 是一个 gossipsub 路由器选项,用于配置自定义协议列表和功能测试函数 参数:
- protos: 协议列表
- feature: 功能测试函数
返回值:
- Option: 配置 gossipsub 协议和功能的选项函数
func WithMaxMessageSize ¶
WithMaxMessageSize 设置 pubsub 消息的全局最大消息大小。默认值是 1MiB (DefaultMaxMessageSize)。 警告 #1:确保更改 floodsub (FloodSubID) 和 gossipsub (GossipSubID) 的默认协议前缀。 警告 #2:减少默认的最大消息限制是可以的,但要确保您的应用程序消息不会超过新的限制。 参数:
- maxMessageSize: 最大消息大小。
返回值:
- Option: 配置选项。
func WithMessageAuthor ¶
WithMessageAuthor 设置出站消息的作者为给定的 peer ID(默认为主机的 ID)。 如果启用了消息签名,则私钥必须在主机的 peerstore 中可用。 参数:
- author: 消息作者的 ID。
返回值:
- Option: 配置选项。
func WithMessageIdFn ¶
func WithMessageIdFn(fn MsgIdFunction) Option
WithMessageIdFn 是一个选项,用于自定义为 pubsub 消息计算消息 ID 的方式。 默认的 ID 函数是 DefaultMsgIdFn(连接源和序列号),但它可以自定义为消息的哈希值。 参数:
- fn: 自定义的消息 ID 函数。
返回值:
- Option: 配置选项。
func WithMessageSignaturePolicy ¶
func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option
WithMessageSignaturePolicy 设置生产和验证消息签名的操作模式。 参数:
- policy: 签名策略。
返回值:
- Option: 配置选项。
func WithMessageSigning ¶
WithMessageSigning 启用或禁用消息签名(默认启用)。 不推荐在没有消息签名或没有验证的情况下使用。 参数:
- enabled: 是否启用消息签名。
返回值:
- Option: 配置选项。
func WithNoAuthor ¶
func WithNoAuthor() Option
WithNoAuthor 省略消息的作者和序列号数据,并禁用签名的使用。 不推荐与默认的消息 ID 函数一起使用,请参阅 WithMessageIdFn。 返回值:
- Option: 配置选项。
func WithPeerExchange ¶
WithPeerExchange 是一个 gossipsub 路由器选项,用于在 PRUNE 上启用对等节点交换。 参数:
- doPX: bool 类型,表示是否启用对等节点交换。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithPeerFilter ¶
func WithPeerFilter(filter PeerFilter) Option
WithPeerFilter 是一个选项,用于设置 pubsub peers 的过滤器。 默认的 peer 过滤器是 DefaultPeerFilter(总是返回 true),但它可以自定义为任何自定义实现。 参数:
- filter: 自定义的 peer 过滤器。
返回值:
- Option: 配置选项。
func WithPeerGater ¶
func WithPeerGater(params *PeerGaterParams) Option
WithPeerGater 是一个 gossipsub 路由器选项,用于启用反应性验证队列管理 参数:
- params: PeerGater 的参数
返回值:
- Option: PubSub 的选项函数
func WithPeerOutboundQueueSize ¶
WithPeerOutboundQueueSize 是一个选项,用于设置对 peer 的出站消息缓冲区大小。 当出站队列已满时,我们开始丢弃消息。 参数:
- size: 出站消息队列的大小。
返回值:
- Option: 配置选项。
func WithPeerScore ¶
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option
WithPeerScore 是一个 gossipsub 路由器选项,用于启用对等节点评分。 参数:
- params: *PeerScoreParams 类型,表示对等节点评分参数。
- thresholds: *PeerScoreThresholds 类型,表示对等节点评分阈值。
返回值:
- Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。
func WithPeerScoreInspect ¶
WithPeerScoreInspect 是一个 gossipsub 路由器选项,用于启用对等节点分数调试。 启用此选项后,将定期调用提供的函数,以便应用程序检查或转储已连接对等节点的分数。 提供的函数可以具有以下两种签名之一:
- PeerScoreInspectFn,接受对等节点 ID 到分数的映射。
- ExtendedPeerScoreInspectFn,接受对等节点 ID 到 PeerScoreSnapshots 的映射, 并允许检查单个分数组件以调试对等节点评分。
此选项必须在 WithPeerScore 选项之后传递。
func WithProtocolMatchFn ¶
func WithProtocolMatchFn(m ProtocolMatchFn) Option
WithProtocolMatchFn 设置用于协议选择的自定义匹配函数。 参数:
- m: 协议匹配函数。
返回值:
- Option: 配置选项。
func WithRawTracer ¶
WithRawTracer 添加一个原始追踪器到 pubsub 系统。 可以使用多次调用选项添加多个追踪器。 参数:
- tracer: 原始追踪器。
返回值:
- Option: 配置选项。
func WithSeenMessagesStrategy ¶
WithSeenMessagesStrategy 配置已看到消息缓存使用的查找/清理策略。 参数:
- strategy: 策略。
返回值:
- Option: 配置选项。
func WithSeenMessagesTTL ¶
WithSeenMessagesTTL 配置之前看到的消息 ID 可以被遗忘的时间。 参数:
- ttl: 生存时间。
返回值:
- Option: 配置选项。
func WithStrictSignatureVerification ¶
WithStrictSignatureVerification 是一个选项,用于启用或禁用严格的消息签名验证。 当启用时(这是默认设置),未签名的消息将被丢弃。 不推荐在没有消息签名或没有验证的情况下使用。 参数:
- required: 是否要求严格签名验证。
返回值:
- Option: 配置选项。
func WithSubscriptionFilter ¶
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option
WithSubscriptionFilter 是一个 pubsub 选项,用于指定感兴趣主题的订阅过滤器。 参数: - subFilter: 要应用的 SubscriptionFilter 返回值: - Option: 一个设置 SubscriptionFilter 的选项
func WithValidateQueueSize ¶
WithValidateQueueSize 设置验证队列的大小,默认大小为 32 参数:
- n: int 队列大小
返回值:
- Option 配置选项
func WithValidateThrottle ¶
WithValidateThrottle 设置活动验证 goroutine 的上限,默认值为 8192 参数:
- n: int 上限值
返回值:
- Option 配置选项
func WithValidateWorkers ¶
WithValidateWorkers 设置同步验证工作线程的数量,默认值为 CPU 数量 参数:
- n: int 线程数量
返回值:
- Option 配置选项
type Options ¶
type Options struct { FollowupTime time.Duration // 跟随时间,用于控制消息传播延迟 GossipFactor float64 // Gossip 因子,控制消息传播的概率 D int // GossipSub 主题网格的理想度数,每个节点维护的连接数 Dlo int // GossipSub 主题网格中保持的最少节点数,网格连接的下限 MaxPendingConns int // 最大待处理连接数,限制并发连接请求数量 MaxMessageSize int // 最大消息大小,限制单条消息的字节数 SignMessages bool // 是否签名消息,控制消息的安全性 ValidateMessages bool // 是否验证消息,控制消息的合法性检查 DirectPeers []peer.AddrInfo // 直连对等节点列表,保存需要直接连接的节点信息 HeartbeatInterval time.Duration // 心跳间隔,控制节点存活检测的频率 MaxTransmissionSize int // 最大传输大小,限制单次传输的字节数 LoadConfig bool // 是否加载配置选项,控制是否使用外部配置 PubSubMode PubSubType // 发布订阅模式,指定使用的协议类型 // contains filtered or unexported fields }
Options 定义了 PubSub 的配置选项
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions 返回一个带有默认配置的 Options 对象 返回值:
- *Options: 包含默认配置的 Options 对象
func (*Options) ApplyOptions ¶
func (opt *Options) ApplyOptions(opts ...NodeOption) error
ApplyOptions 应用给定的选项到 Options 对象 参数:
- opts: 可变参数,包含多个 NodeOption 函数
返回值:
- error: 如果应用选项时出现错误,返回相应的错误信息
func (*Options) GetNodeDiscovery ¶ added in v0.0.3
GetNodeDiscovery 获取配置的 Discovery 服务 返回值:
- discovery.Discovery: 当前配置的 Discovery 服务实例
func (*Options) GetPubSubMode ¶
func (o *Options) GetPubSubMode() PubSubType
GetPubSubMode 获取发布订阅模式 返回值:
- PubSubType: 当前设置的发布订阅模式
type PBTracer ¶
type PBTracer struct {
// contains filtered or unexported fields
}
PBTracer 是一个将事件写入文件的追踪器,事件以 protobuf 格式编码。
func NewPBTracer ¶
NewPBTracer 创建一个新的 PBTracer,将追踪信息写入文件。 参数:
- file: 文件路径
返回值:
- *PBTracer: PBTracer 对象
- error: 错误信息
func OpenPBTracer ¶
OpenPBTracer 创建一个新的 PBTracer,可以显式控制文件打开的标志和权限。 参数:
- file: 文件路径
- flags: 文件打开标志
- perm: 文件权限
返回值:
- *PBTracer: PBTracer 对象
- error: 错误信息
func (*PBTracer) Trace ¶
func (t *PBTracer) Trace(evt *pb.TraceEvent)
Trace 向追踪器添加一个事件 参数:
- evt: 要添加的事件
type PeerFilter ¶
PeerFilter 用于过滤 pubsub peers。对于给定的主题,它应该返回 true 表示接受。 参数:
- pid: peer 的 ID。
- topic: 主题。
返回值:
- bool: 是否接受该 peer。
type PeerGaterParams ¶
type PeerGaterParams struct { Threshold float64 // 当被限流/验证消息的比例超过此阈值时,启用 gater GlobalDecay float64 // 全局计数器衰减参数 SourceDecay float64 // 每 IP 计数器衰减参数 DecayInterval time.Duration // 衰减间隔 DecayToZero float64 // 计数器归零阈值 RetainStats time.Duration // 保留统计数据的时间 Quiet time.Duration // 安静期时间 DuplicateWeight float64 // 重复消息的权重 IgnoreWeight float64 // 忽略消息的权重 RejectWeight float64 // 拒绝消息的权重 TopicDeliveryWeights map[string]float64 // 优先主题的传递权重 }
PeerGaterParams 包含控制 peer gater 操作的参数
func DefaultPeerGaterParams ¶
func DefaultPeerGaterParams() *PeerGaterParams
DefaultPeerGaterParams 创建使用默认值的 PeerGaterParams 结构 返回值:
- *PeerGaterParams: 返回使用默认值的参数结构
func NewPeerGaterParams ¶
func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams
NewPeerGaterParams 创建新的 PeerGaterParams 结构,使用指定的阈值和衰减参数以及默认值 参数:
- threshold: 门控阈值
- globalDecay: 全局衰减参数
- sourceDecay: 来源衰减参数
返回值:
- *PeerGaterParams: 返回新创建的参数结构
func (*PeerGaterParams) WithTopicDeliveryWeights ¶
func (p *PeerGaterParams) WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams
WithTopicDeliveryWeights 设置优先主题的传递权重 参数:
- w: 主题权重的映射
返回值:
- *PeerGaterParams: 返回更新后的参数
type PeerMetadataStore ¶
type PeerMetadataStore interface { // Get 获取与对等节点关联的元数据; // 如果没有与对等节点关联的元数据,应返回 nil,而不是错误。 Get(context.Context, peer.ID) ([]byte, error) // Put 设置与对等节点关联的元数据。 Put(context.Context, peer.ID, []byte) error }
PeerMetadataStore 是一个接口,用于存储和检索每个对等节点的元数据
type PeerScoreInspectFn ¶
PeerScoreInspectFn 定义对等节点分数检查函数类型
type PeerScoreParams ¶
type PeerScoreParams struct { SkipAtomicValidation bool // 是否允许仅设置某些参数而不是所有参数 Topics map[string]*TopicScoreParams // 每个主题的分数参数 TopicScoreCap float64 // 主题分数上限 AppSpecificScore func(p peer.ID) float64 // 应用程序特定的对等节点分数 AppSpecificWeight float64 // 应用程序特定分数的权重 IPColocationFactorWeight float64 // IP 同位因素的权重 IPColocationFactorThreshold int // IP 同位因素阈值 IPColocationFactorWhitelist []*net.IPNet // IP 同位因素白名单 BehaviourPenaltyWeight float64 // 行为模式处罚的权重 BehaviourPenaltyThreshold float64 // 行为模式处罚的阈值 BehaviourPenaltyDecay float64 // 行为模式处罚的衰减 DecayInterval time.Duration // 参数计数器的衰减间隔 DecayToZero float64 // 计数器值低于该值时被视为 0 RetainScore time.Duration // 断开连接的对等节点记住计数器的时间 SeenMsgTTL time.Duration // 记住消息传递时间 }
PeerScoreParams 包含用于控制对等节点分数的参数
type PeerScoreSnapshot ¶
type PeerScoreSnapshot struct { Score float64 // 对等节点分数 Topics map[string]*TopicScoreSnapshot // 每个主题的分数快照 AppSpecificScore float64 // 应用程序特定的分数 IPColocationFactor float64 // IP 同位因素 BehaviourPenalty float64 // 行为模式处罚 }
PeerScoreSnapshot 包含对等节点分数快照
type PeerScoreThresholds ¶
type PeerScoreThresholds struct { SkipAtomicValidation bool // 是否允许仅设置某些参数而不是所有参数 GossipThreshold float64 // 低于该分数时抑制 Gossip 传播,应为负数 PublishThreshold float64 // 低于该分数时不应发布消息,应为负数且 <= GossipThreshold GraylistThreshold float64 // 低于该分数时完全抑制消息处理,应为负数且 <= PublishThreshold AcceptPXThreshold float64 // 低于该分数时将忽略 PX,应为正数,限于启动器和其他可信节点 OpportunisticGraftThreshold float64 // 低于该分数时触发机会性 grafting,应为正数且值小 }
PeerScoreThresholds 包含用于控制对等节点分数的参数
type ProtocolMatchFn ¶
ProtocolMatchFn 是一个函数类型,用于匹配协议ID
type ProvideKey ¶
ProvideKey 是一个函数,在发布新消息时提供私钥及其关联的对等节点 ID。
type PubOpt ¶
type PubOpt func(pub *PublishOptions) error
PubOpt 定义发布选项的类型。
func WithLocalPublication ¶
WithLocalPublication 返回一个发布选项,仅通知进程内的订阅者。 它阻止消息发布到网状对等节点。 参数: - local: bool 类型的标志,指示是否仅在本地发布消息。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 local 字段。
func WithMessageMetadata ¶
func WithMessageMetadata(messageID string, msgType pb.MessageMetadata_MessageType) PubOpt
WithMessageMetadata 设置消息的元信息。 参数: - messageID: string 类型,表示消息ID。 - msgType: MessageType 类型,表示消息的类型(请求或响应)。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的消息元信息。
func WithReadiness ¶
func WithReadiness(ready RouterReady) PubOpt
WithReadiness 返回一个发布选项,仅在路由器准备好时发布。 此选项仅在 PubSub 也使用 WithDiscovery 时有用。 参数: - ready: RouterReady 类型的回调函数,当路由器准备好时被调用。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 ready 字段。
func WithSecretKeyAndPeerId ¶
WithSecretKeyAndPeerId 返回一个发布选项,用于提供自定义私钥及其对应的对等节点 ID。 这个选项在我们希望从网络中的"虚拟"不可连接的对等节点发送消息时非常有用。 参数: - key: crypto.PrivKey 类型,自定义私钥。 - pid: peer.ID 类型,对应的对等节点 ID。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 customKey 字段。
func WithTargetMap ¶
WithTargetMap 设置目标节点列表。 参数: - targets: 目标节点的列表,类型为 []peer.ID。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 targetMap 字段。
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub 实现了发布-订阅系统。
func NewFloodSub ¶
NewFloodSub 返回一个使用FloodSubRouter的新PubSub对象 参数:
- ctx: 上下文
- h: 主机
- opts: 选项
返回值:
- *PubSub: 创建的PubSub实例
- error: 如果创建失败,返回错误
func NewFloodsubWithProtocols ¶
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)
NewFloodsubWithProtocols 返回一个新的启用floodsub的PubSub对象,使用指定的协议 参数:
- ctx: 上下文
- h: 主机
- ps: 协议列表
- opts: 选项
返回值:
- *PubSub: 创建的PubSub实例
- error: 如果创建失败,返回错误
func NewGossipSub ¶
NewGossipSub 返回一个新的使用默认 GossipSubRouter 作为路由器的 PubSub 对象。 参数:
- ctx: 上下文
- h: 主机
- opts: 选项
返回值:
- *PubSub: PubSub 对象
- error: 错误信息
func NewGossipSubWithRouter ¶
func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)
NewGossipSubWithRouter 返回一个使用给定路由器的新的 PubSub 对象。 参数:
- ctx: 上下文
- h: 主机
- rt: 路由器
- opts: 选项
返回值:
- *PubSub: PubSub 对象
- error: 错误信息
func NewPubSub ¶
NewPubSub 返回一个新的 PubSub 管理对象。 参数:
- ctx: 用于控制 PubSub 生命周期的上下文。
- h: libp2p 主机。
- rt: PubSub 路由器。
- opts: 可选配置项。
返回值:
- *PubSub: 新的 PubSub 对象。
- error: 如果有错误发生,返回错误。
func NewRandomSub ¶
NewRandomSub 返回一个使用 RandomSubRouter 作为路由器的新 PubSub 对象 参数:
- ctx: 上下文
- h: 主机
- size: 网络大小
- opts: 选项
返回值:
- *PubSub: PubSub 对象
- error: 错误
func (*PubSub) BlacklistPeer ¶
BlacklistPeer 将一个对等节点列入黑名单;所有来自此对等节点的消息将无条件丢弃。
func (*PubSub) Join ¶
Join 加入主题并返回 Topic 句柄。 每个主题应该只有一个 Topic 句柄,如果主题句柄已存在,Join 将返回错误。 参数:
- topic: 主题名称
- opts: 主题选项
返回值:
- *Topic: 主题句柄
- error: 如果发生错误,返回错误
func (*PubSub) NotifyNewPeer ¶ added in v0.0.4
NotifyNewPeer 通知系统有新的对等节点加入 参数:
- peer: 新加入节点的ID
返回值:
- error: 如果节点不满足要求则返回错误
func (*PubSub) RegisterTopicValidator ¶
func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error
RegisterTopicValidator 为主题注册一个验证器。 默认情况下,验证器是异步的,这意味着它们将在单独的 goroutine 中运行。 活动 goroutine 的数量由全局和每个主题验证器的节流控制;如果超过节流阈值,消息将被丢弃。
func (*PubSub) Subscribe ¶
func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)
Subscribe 返回给定主题的新订阅。 请注意,订阅不是即时操作。可能需要一些时间,订阅才能被 pubsub 主循环处理并传播给我们的对等节点。
已弃用:请使用 pubsub.Join() 和 topic.Subscribe()
func (*PubSub) UnregisterTopicValidator ¶
UnregisterTopicValidator 从主题中移除一个验证器。 如果没有验证器注册到该主题,则返回错误。
type PubSubMsgHandler ¶
type PubSubMsgHandler func(*Message)
PubSubMsgHandler 定义了处理其他节点发布消息的函数类型 参数:
- *Message: 接收到的消息对象
type PubSubRouter ¶
type PubSubRouter interface { // Protocols 返回路由器支持的协议列表 Protocols() []protocol.ID // Attach 被 PubSub 构造函数调用以将路由器附加到一个新初始化的 PubSub 实例 Attach(*PubSub) // AddPeer 通知路由器有新的 peer 已连接 AddPeer(peer.ID, protocol.ID) // RemovePeer 通知路由器有 peer 已断开 RemovePeer(peer.ID) // EnoughPeers 返回路由器是否需要更多的 peers 才能准备好发布新记录 // 参数: // - topic: 主题 // - suggested: 建议的 peer 数量(如果大于 0) EnoughPeers(topic string, suggested int) bool // AcceptFrom 在将消息推送到验证管道或处理控制信息之前调用,用于判断是否接受来自指定 peer 的消息 // 参数: // - peer.ID: 发送消息的 peer ID // 返回值: // - AcceptStatus: 接受状态(全部接受、仅接受控制消息或不接受) AcceptFrom(peer.ID) AcceptStatus // HandleRPC 处理 RPC 包裹中的控制消息 // 参数: // - rpc: 要处理的 RPC HandleRPC(*RPC) // Publish 转发已验证的新消息 // 参数: // - msg: 要转发的消息 Publish(*Message) // Join 通知路由器我们要接收和转发主题中的消息 // 参数: // - topic: 要加入的主题 Join(topic string) // Leave 通知路由器我们不再对主题感兴趣 // 参数: // - topic: 要离开的主题 Leave(topic string) }
PubSubRouter 是 PubSub 的消息路由组件
type PubSubType ¶
type PubSubType int
PubSubType 定义发布订阅的类型
const ( GossipSub PubSubType = iota // GossipSub 类型,基于 gossip 协议的发布订阅 FloodSub // FloodSub 类型,基于洪泛的发布订阅 RandomSub // RandomSub 类型,基于随机选择的发布订阅 )
type PublishOptions ¶
type PublishOptions struct {
// contains filtered or unexported fields
}
PublishOptions 表示发布选项。
type RandomSubRouter ¶
type RandomSubRouter struct {
// contains filtered or unexported fields
}
RandomSubRouter 是一个实现随机传播策略的路由器。 对于每条消息,它选择网络大小平方根个对等节点,最少为 RandomSubD,并将消息转发给它们。
func (*RandomSubRouter) AcceptFrom ¶
func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus
AcceptFrom 在处理控制信息或将消息推送到验证管道之前,对每个传入消息调用此方法 参数:
- peer.ID: 对等节点 ID
返回值:
- AcceptStatus: 接受状态
func (*RandomSubRouter) AddPeer ¶
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)
AddPeer 通知路由器一个新的对等节点已经连接 参数:
- p: 对等节点 ID
- proto: 协议 ID
func (*RandomSubRouter) Attach ¶
func (rs *RandomSubRouter) Attach(p *PubSub)
Attach 将路由器附加到一个初始化的 PubSub 实例 参数:
- p: PubSub 对象
func (*RandomSubRouter) EnoughPeers ¶
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool
EnoughPeers 返回路由器是否需要更多对等节点才能准备好发布新记录 参数:
- topic: 主题
- suggested: 建议的对等节点数
返回值:
- bool: 是否有足够的对等节点
func (*RandomSubRouter) HandleRPC ¶
func (rs *RandomSubRouter) HandleRPC(rpc *RPC)
HandleRPC 处理控制消息 参数:
- rpc: RPC 对象
func (*RandomSubRouter) Join ¶
func (rs *RandomSubRouter) Join(topic string)
Join 通知路由器我们想要接收和转发主题中的消息 参数:
- topic: 主题
func (*RandomSubRouter) Leave ¶
func (rs *RandomSubRouter) Leave(topic string)
Leave 通知路由器我们不再对主题感兴趣 参数:
- topic: 主题
func (*RandomSubRouter) Protocols ¶
func (rs *RandomSubRouter) Protocols() []protocol.ID
Protocols 返回路由器支持的协议列表 返回值:
- []protocol.ID: 协议列表
func (*RandomSubRouter) Publish ¶
func (rs *RandomSubRouter) Publish(msg *Message)
Publish 发布一条已验证的新消息 参数:
- msg: 消息对象
func (*RandomSubRouter) RemovePeer ¶
func (rs *RandomSubRouter) RemovePeer(p peer.ID)
RemovePeer 通知路由器一个对等节点已经断开连接 参数:
- p: 对等节点 ID
type RawTracer ¶
type RawTracer interface { // AddPeer 当一个新对等节点被添加时调用。 AddPeer(p peer.ID, proto protocol.ID) // RemovePeer 当一个对等节点被移除时调用。 RemovePeer(p peer.ID) // Join 当加入一个新主题时调用。 Join(topic string) // Leave 当放弃一个主题时调用。 Leave(topic string) // Graft 当一个新对等节点被添加到网格时调用(gossipsub)。 Graft(p peer.ID, topic string) // Prune 当一个对等节点被从网格中移除时调用(gossipsub)。 Prune(p peer.ID, topic string) // ValidateMessage 当消息首次进入验证管道时调用。 ValidateMessage(msg *Message) // DeliverMessage 当消息被传递时调用。 DeliverMessage(msg *Message) // RejectMessage 当消息被拒绝或忽略时调用。 // 参数 reason 是一个命名字符串 Reject*。 RejectMessage(msg *Message, reason string) // DuplicateMessage 当重复消息被丢弃时调用。 DuplicateMessage(msg *Message) // ThrottlePeer 当一个对等节点被对等节点限制器限制时调用。 ThrottlePeer(p peer.ID) // RecvRPC 当接收到一个传入的 RPC 时调用。 RecvRPC(rpc *RPC) // SendRPC 当发送一个 RPC 时调用。 SendRPC(rpc *RPC, p peer.ID) // DropRPC 当一个出站 RPC 被丢弃时调用,通常是因为队列已满。 DropRPC(rpc *RPC, p peer.ID) // UndeliverableMessage 当 Subscribe 的消费者未能足够快地读取消息且压力释放机制触发丢弃消息时调用。 UndeliverableMessage(msg *Message) }
RawTracer 是一个低级追踪接口,允许应用程序追踪 pubsub 子系统的内部操作。 请注意,追踪器是同步调用的,这意味着应用程序的追踪器必须注意不要阻塞或修改参数。 警告:此接口不是固定的,可能会根据系统需求添加新方法。
type RemoteTracer ¶
type RemoteTracer struct {
// contains filtered or unexported fields
}
RemoteTracer 是一个将追踪事件发送到远程对等节点的追踪器
func NewRemoteTracer ¶
NewRemoteTracer 构建一个 RemoteTracer,将追踪信息发送到由 pi 标识的对等节点。 参数:
- ctx: 上下文,用于控制生命周期和取消操作
- host: 本地主机,表示当前节点
- pi: 远程对等节点的地址信息,包括节点ID和地址
返回值:
- *RemoteTracer: 新创建的 RemoteTracer 对象
- error: 如果发生错误,返回错误信息
func (*RemoteTracer) Trace ¶
func (t *RemoteTracer) Trace(evt *pb.TraceEvent)
Trace 向追踪器添加一个事件 参数:
- evt: 要添加的事件
type RouterReady ¶
type RouterReady func(rt PubSubRouter, topic string) (bool, error)
RouterReady 是一个函数,用于决定路由器是否准备好发布。
func MinTopicSize ¶
func MinTopicSize(size int) RouterReady
MinTopicSize 返回一个函数,该函数根据主题大小检查路由器是否准备好发布。 参数:
- size: 建议的主题大小(对等节点数)
返回值:
- RouterReady: 一个函数类型,接收路由器和主题名称,返回布尔值和错误
type StatusChange ¶
type StatusChange struct { PeerID peer.ID // 节点ID OldStatus NodeStatus // 旧状态 NewStatus NodeStatus // 新状态 Timestamp time.Time // 状态变化时间戳 Score float64 // 节点评分 ConnectionQuality float64 // 连接质量 CheckInterval time.Duration // 检查间隔 FailedAttempts int // 连续失败尝试次数 NetworkLatency time.Duration // 网络延迟 }
StatusChange 表示节点状态的变化
type SubOpt ¶
type SubOpt func(sub *Subscription) error
SubOpt 订阅选项函数类型
func WithBufferSize ¶
WithBufferSize 是一个订阅选项,用于自定义订阅输出缓冲区的大小。 默认长度为 32,但可以配置以避免消费者读取速度不够快时丢失消息。
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription 处理特定主题订阅的详细信息。 对于给定的主题可能有多个订阅。
func (*Subscription) Cancel ¶
func (sub *Subscription) Cancel()
Cancel 关闭订阅。如果这是最后一个活动订阅,那么 pubsub 将向网络发送取消订阅公告。
func (*Subscription) Next ¶
func (sub *Subscription) Next(ctx context.Context) (*Message, error)
Next 返回订阅中的下一条消息。 参数: - ctx: context.Context 上下文,用于取消操作 返回值: - *Message: 下一条消息,如果有的话 - error: 错误信息,如果有的话
func (*Subscription) Topic ¶
func (sub *Subscription) Topic() string
Topic 返回与订阅关联的主题字符串。 返回值: - string: 订阅的主题字符串
type SubscriptionFilter ¶
type SubscriptionFilter interface { // CanSubscribe 返回 true 如果主题是感兴趣的并且我们可以订阅它 CanSubscribe(topic string) bool // FilterIncomingSubscriptions 用于所有包含订阅通知的 RPC。 // 它应仅过滤感兴趣的订阅,并且如果订阅过多(例如)可能会返回错误。 // 参数: // - from: 订阅来源的 peer.ID // - subs: 包含订阅通知的 RPC_SubOpts 列表 // 返回值: // - []*pb.RPC_SubOpts: 过滤后的订阅列表 // - error: 错误信息,如果有的话 FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) }
SubscriptionFilter 是一个函数,用于告诉我们是否有兴趣允许和跟踪给定主题的订阅。
每当收到另一个对等点的订阅通知时,都会咨询过滤器; 如果过滤器返回 false,则忽略通知。
当加入主题时,也会咨询过滤器;如果过滤器返回 false,则 Join 操作将导致错误。
func NewAllowlistSubscriptionFilter ¶
func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter
NewAllowlistSubscriptionFilter 创建一个订阅过滤器,该过滤器仅允许显式指定的主题用于本地订阅和传入对等订阅。 参数: - topics: 允许订阅的主题列表 返回值: - SubscriptionFilter: 一个新的允许列表订阅过滤器
func NewRegexpSubscriptionFilter ¶
func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter
NewRegexpSubscriptionFilter 创建一个订阅过滤器,该过滤器仅允许与正则表达式匹配的主题用于本地订阅和传入对等订阅。 参数: - rx: 用于匹配主题的正则表达式 返回值: - SubscriptionFilter: 一个新的正则表达式订阅过滤器
func WrapLimitSubscriptionFilter ¶
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter
WrapLimitSubscriptionFilter 包装一个订阅过滤器,在 RPC 消息中允许的订阅数量有硬限制。 参数: - filter: 内部使用的 SubscriptionFilter - limit: 订阅数量限制 返回值: - SubscriptionFilter: 包装后的订阅过滤器
type TimeCachedBlacklist ¶
type TimeCachedBlacklist struct {
// contains filtered or unexported fields
}
TimeCachedBlacklist 是一种使用时间缓存实现的黑名单
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic 表示 pubsub 主题的句柄。
func (*Topic) EventHandler ¶
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)
EventHandler 创建特定主题事件的句柄。 参数: - opts: ...TopicEventHandlerOpt 事件处理程序选项 返回值: - *TopicEventHandler: 创建的事件处理程序 - error: 错误信息,如果有的话
func (*Topic) Publish ¶
Publish 发布数据到主题。 注意,如果是响应消息,需要为 opts ...PubOpt 设置消息元数据 WithMessageMetadata(msgID, pb.MessageMetadata_RESPONSE) 参数: - ctx: 上下文,用于控制发布操作 - data: 要发布的数据 - opts: 发布选项 返回值: - error: 错误信息,如果有的话
func (*Topic) PublishWithReply ¶
func (t *Topic) PublishWithReply(ctx context.Context, data []byte, targetNodes ...peer.ID) ([]byte, error)
PublishWithReply 发送消息并等待响应
使用示例:
// 不指定目标节点 reply, err := topic.PublishWithReply(ctx, data) // 指定一个目标节点 reply, err := topic.PublishWithReply(ctx, data, peerID1) // 指定多个目标节点 reply, err := topic.PublishWithReply(ctx, data, peerID1, peerID2, peerID3)
参数: - ctx: context.Context 表示上下文,用于控制流程 - data: []byte 表示要发送的消息内容 - targetNodes: ...peer.ID 表示需要将消息发送到的目标节点列表(可选) 返回值: - []byte: 接收到的回复消息 - error: 如果出现错误,返回错误信息
func (*Topic) Relay ¶
func (t *Topic) Relay() (RelayCancelFunc, error)
Relay 启用主题的消息中继并返回引用取消函数。 随后的调用增加引用计数器。 要完全禁用中继,必须取消所有引用。 返回值: - RelayCancelFunc: 取消中继的函数 - error: 错误信息,如果有的话
func (*Topic) SetScoreParams ¶
func (t *Topic) SetScoreParams(p *TopicScoreParams) error
SetScoreParams 设置主题的评分参数,如果 pubsub 路由器支持对等评分。 参数: - p: *TopicScoreParams 评分参数 返回值: - error: 错误信息,如果有的话
type TopicEventHandler ¶
type TopicEventHandler struct {
// contains filtered or unexported fields
}
TopicEventHandler 用于管理特定主题事件。无需订阅即可接收事件。
func (*TopicEventHandler) NextPeerEvent ¶
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)
NextPeerEvent 返回有关订阅对等节点的下一个事件。 保证:给定对等节点的 Peer Join 和 Peer Leave 事件将按顺序触发。 参数: - ctx: 上下文,用于控制操作。 返回值: - PeerEvent: 下一个对等节点事件。 - error: 错误信息,如果有的话。
type TopicEventHandlerOpt ¶
type TopicEventHandlerOpt func(t *TopicEventHandler) error
TopicEventHandlerOpt 定义了一个用于设置 TopicEventHandler 选项的函数类型。
type TopicOpt ¶
TopicOpt 主题选项函数类型
func WithTopicMessageIdFn ¶
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt
WithTopicMessageIdFn 设置自定义 MsgIdFunction 用于生成消息 ID 参数:
- msgId: 消息 ID 函数
返回值:
- TopicOpt: 主题选项函数
type TopicScoreParams ¶
type TopicScoreParams struct { SkipAtomicValidation bool // 是否允许仅设置某些参数而不是所有参数 TopicWeight float64 // 主题权重 TimeInMeshWeight float64 // 在 mesh 中的时间权重 TimeInMeshQuantum time.Duration // 在 mesh 中的时间量子 TimeInMeshCap float64 // 在 mesh 中的时间上限 FirstMessageDeliveriesWeight float64 // 首次消息传递的权重 FirstMessageDeliveriesDecay float64 // 首次消息传递的衰减 FirstMessageDeliveriesCap float64 // 首次消息传递的上限 MeshMessageDeliveriesWeight float64 // mesh 消息传递的权重 MeshMessageDeliveriesDecay float64 // mesh 消息传递的衰减 MeshMessageDeliveriesCap float64 // mesh 消息传递的上限 MeshMessageDeliveriesThreshold float64 // mesh 消息传递的阈值 MeshMessageDeliveriesWindow time.Duration // mesh 消息传递的窗口 MeshMessageDeliveriesActivation time.Duration // mesh 消息传递的激活时间 MeshFailurePenaltyWeight float64 // mesh 失败处罚的权重 MeshFailurePenaltyDecay float64 // mesh 失败处罚的衰减 InvalidMessageDeliveriesWeight float64 // 无效消息传递的权重 InvalidMessageDeliveriesDecay float64 // 无效消息传递的衰减 }
TopicScoreParams 包含用于控制主题分数的参数
type TopicScoreSnapshot ¶
type TopicScoreSnapshot struct { TimeInMesh time.Duration // 在 mesh 中的时间 FirstMessageDeliveries float64 // 首次消息传递 MeshMessageDeliveries float64 // mesh 消息传递 InvalidMessageDeliveries float64 // 无效消息传递 }
TopicScoreSnapshot 包含主题分数快照
type ValidationError ¶
type ValidationError struct {
Reason string // 错误原因
}
ValidationError 表示消息验证失败时可能会发出的错误
type ValidatorEx ¶
ValidatorEx 是一个扩展的验证函数,返回枚举决策
func NewBasicSeqnoValidator ¶
func NewBasicSeqnoValidator(meta PeerMetadataStore) ValidatorEx
NewBasicSeqnoValidator 构造一个使用给定 PeerMetadataStore 的 BasicSeqnoValidator。 参数:
- meta: 用于存储对等节点元数据的接口实例
返回值:
- ValidatorEx: 返回一个扩展验证器函数
type ValidatorOpt ¶
type ValidatorOpt func(addVal *addValReq) error
ValidatorOpt 是 RegisterTopicValidator 的选项类型
func WithValidatorConcurrency ¶
func WithValidatorConcurrency(n int) ValidatorOpt
WithValidatorConcurrency 是一个选项,用于设置主题验证器的节流大小,默认值为 1024 参数:
- n: int 节流大小
返回值:
- ValidatorOpt 验证器选项
func WithValidatorInline ¶
func WithValidatorInline(inline bool) ValidatorOpt
WithValidatorInline 是一个选项,用于设置验证器是否内联执行 参数:
- inline: bool 是否内联执行
返回值:
- ValidatorOpt 验证器选项
func WithValidatorTimeout ¶
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt
WithValidatorTimeout 是一个选项,用于设置异步主题验证器的超时时间,默认无超时 参数:
- timeout: time.Duration 超时时间
返回值:
- ValidatorOpt 验证器选项
Source Files ¶
- backoff.go
- blacklist.go
- comm.go
- discovery.go
- doc.go
- floodsub.go
- gossip_tracer.go
- gossipsub.go
- gossipsub_feat.go
- log.go
- mcache.go
- midgen.go
- node_options.go
- node_pubsub.go
- node_status_tracker.go
- node_topicdiscovery.go
- peer_gater.go
- peer_notify.go
- pubsub.go
- randomsub.go
- score.go
- score_params.go
- sign.go
- subscription.go
- subscription_filter.go
- tag_tracer.go
- topic.go
- trace.go
- tracer.go
- validation.go
- validation_builtin.go