pubsub

package module
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2024 License: Apache-2.0 Imports: 43 Imported by: 0

README

DEP2P PubSub

DEP2P PubSub 是一个基于 libp2p 的分布式发布订阅系统,提供了灵活的消息发布和订阅功能。除了传统的发布订阅模式外,还创新性地提供了请求-响应模式。

主要特性

1. 多种发布订阅模式
  • GossipSub: 基于 gossip 协议的高效消息传播
  • FloodSub: 简单的消息洪泛机制
  • RandomSub: 随机选择节点进行消息传播
2. 创新的请求-响应模式 (PublishWithReply)

除了传统的单向消息发布,本系统创新性地提供了 PublishWithReply 功能,支持发送消息并等待响应:

// 发送消息并等待响应
reply, err := topic.PublishWithReply(ctx, message)
// 指定目标节点发送消息并等待响应
reply, err := topic.PublishWithReply(ctx, message, targetPeerIDs...)
3. 与传统 PubSub 的对比:
特性 传统 PubSub DEP2P PubSub
消息流向 单向(发布->订阅) 双向(支持请求-响应)
响应处理 需要额外实现 内置支持
目标指定 广播给所有订阅者 可指定特定节点
3. 高级特性
  • 消息追踪: 支持消息全链路追踪
  • 节点发现: 自动发现网络中的其他节点
  • 状态监控: 实时监控节点状态和网络质量
  • 安全机制: 支持消息签名和验证

使用示例

基本发布订阅:
// 创建主题
topic, err := ps.Join("test-topic")
// 订阅消息
sub, err := topic.Subscribe()
msg, err := sub.Next(ctx)
// 发布消息
err = topic.Publish(ctx, []byte("Hello World"))
请求-响应模式:
// 发送方:
reply, err := topic.PublishWithReply(ctx, []byte("Request"))
fmt.Printf("收到响应: %s\n", reply)

// 接收方:
sub, err := topic.Subscribe()
msg, err := sub.Next(ctx)

// 发送响应
replyMsg := append([]byte("Reply: "), msg.Data...)
err = topic.Publish(ctx, replyMsg, WithMessageMetadata(
    msg.GetMetadata().GetMessageID(), 
    pb.MessageMetadata_RESPONSE,
))

性能优化

  • 支持消息缓存和批处理
  • 动态调整心跳间隔
  • 智能节点评分系统
  • 自适应的网络质量监控

配置选项

提供丰富的配置选项以适应不同场景:

options := &Options{
    SignMessages:     true,  // 启用消息签名
    ValidateMessages: true,  // 启用消息验证
    MaxMessageSize:   1024 * 1024, // 最大消息大小
    HeartbeatInterval: 500 * time.Millisecond,
    PubSubMode:       GossipSub, // 使用 GossipSub 模式
}

注意事项

  1. 合理配置消息大小限制
  2. 根据网络规模调整心跳间隔
  3. 在大规模网络中建议启用消息签名和验证
  4. 注意处理超时和错误情况

订阅响应模式详解

1. 工作原理

订阅响应模式基于以下核心机制:

  • 消息ID追踪: 每个请求消息都有唯一的 UUID,用于关联请求和响应
  • 消息类型标记: 通过 MessageMetadata 区分请求(REQUEST)和响应(RESPONSE)
  • 响应通道管理: 为每个请求维护一个专用的响应通道
  • 超时控制: 内置超时机制,避免无限等待响应
  • 重试机制: 支持自动重试失败的请求
2. 高级用法
2.1 定向请求响应
// 向特定节点发送请求
reply, err := topic.PublishWithReply(ctx, data, targetPeerID1, targetPeerID2)

// 批量处理多个响应
replies := make([][]byte, 0)
for i := 0; i < expectedResponses; i++ {
    reply, err := topic.PublishWithReply(ctx, data)
    if err == nil {
        replies = append(replies, reply)
    }
}
2.2 超时和重试控制
// 使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 带重试的请求发送
reply, err := topic.PublishWithReply(ctx, data)
if err == context.DeadlineExceeded {
    // 处理超时情况
}
2.3 响应处理模式
// 接收方处理请求并响应
sub, err := topic.Subscribe()
msg, err := sub.Next(ctx)

// 检查消息类型
if msg.GetMetadata().GetType() == pb.MessageMetadata_REQUEST {
    // 处理请求并发送响应
    response := processRequest(msg.Data)
    err = topic.Publish(ctx, response, WithMessageMetadata(
        msg.GetMetadata().GetMessageID(),
        pb.MessageMetadata_RESPONSE,
    ))
}
3. 性能优化建议
3.1 网络优化
  • 连接池管理: 维护活跃连接,减少建立连接的开销
  • 消息压缩: 对大型消息进行压缩传输
  • 批量处理: 合并多个小请求,减少网络往返
3.2 超时设置
const (
    MinTimeout = 1 * time.Second
    MaxTimeout = 30 * time.Second
    DefaultTimeout = 5 * time.Second
)

// 根据网络状况动态调整超时时间
timeout := calculateDynamicTimeout(networkLatency)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
3.3 错误处理策略
// 实现渐进式重试
for attempt := 1; attempt <= maxRetries; attempt++ {
    reply, err := topic.PublishWithReply(ctx, data)
    if err == nil {
        break
    }
    // 指数退避
    backoff := time.Duration(attempt * attempt) * baseDelay
    time.Sleep(backoff)
}
4. 应用场景
4.1 分布式服务发现
  • 节点能力查询
  • 服务状态检查
  • 资源可用性探测
4.2 分布式协调
  • 领导者选举
  • 配置同步
  • 状态协商
4.3 数据同步
  • 增量数据同步
  • 状态验证
  • 数据一致性检查
5. 监控和调试
5.1 内置指标
  • 请求响应延迟
  • 成功/失败率
  • 重试次数
  • 超时统计
5.2 日志追踪
// 启用详细日志
options := []PubOpt{
    WithMessageTracing(true),
    WithPerformanceMetrics(true),
}
6. 安全考虑
6.1 消息安全
  • 支持消息签名验证
  • 可选的消息加密
  • 防重放攻击保护
6.2 访问控制
  • 基于节点ID的权限控制
  • 请求频率限制
  • 消息大小限制
7. 最佳实践
  1. 错误处理

    • 始终设置合理的超时时间
    • 实现优雅的降级策略
    • 处理所有可能的错误情况
  2. 性能优化

    • 适当的缓存策略
    • 批量处理请求
    • 合理的重试策略
  3. 资源管理

    • 及时清理过期的响应通道
    • 控制并发请求数量
    • 监控资源使用情况
  4. 测试建议

    • 进行压力测试
    • 模拟网络延迟和故障
    • 验证错误恢复机制

贡献指南

欢迎提交 Issue 和 Pull Request。在提交代码前,请确保:

  1. 通过所有测试用例
  2. 遵循代码规范
  3. 更新相关文档
  4. 添加必要的测试用例

Documentation

Overview

pubsub 包提供了消息传播的发布/订阅模式,也称为覆盖多播。 实现提供基于主题的 pubsub,支持可插拔的路由算法。

该库的主要接口是 PubSub 对象。 可以使用以下构造函数创建此对象:

- NewFloodSub 创建一个使用 floodsub 路由算法的实例。

- NewGossipSub 创建一个使用 gossipsub 路由算法的实例。

- NewRandomSub 创建一个使用 randomsub 路由算法的实例。

此外,还有一个通用构造函数,用于创建具有自定义 PubSubRouter 接口的 pubsub 实例。 目前,此过程保留供包内部使用。

一旦构造了 PubSub 实例,需要与对等节点建立一些连接; 实现依赖于环境对等发现,将引导和活动对等发现留给客户端。

要向某个主题发布消息,请使用 Publish;您不需要订阅主题即可发布。

要订阅主题,请使用 Subscribe;这将为您提供一个订阅接口,通过该接口可以推送新消息。

Package pubsub 提供了发布订阅系统的实现

package pubsub 定义了分布式存储网络的核心功能

package pubsub 提供了发布订阅功能的实现

Index

Constants

View Source
const (
	MinBackoffDelay        = 100 * time.Millisecond // 最小退避延迟时间
	MaxBackoffDelay        = 10 * time.Second       // 最大退避延迟时间
	TimeToLive             = 10 * time.Minute       // 退避信息的存活时间
	BackoffCleanupInterval = 1 * time.Minute        // 清理退避信息的间隔时间
	BackoffMultiplier      = 2                      // 退避时间的倍增因子
	MaxBackoffJitterCoff   = 100                    // 最大退避抖动系数
	MaxBackoffAttempts     = 4                      // 最大退避尝试次数
)

定义了一些常量,用于控制退避算法的参数

View Source
const (
	FloodSubID              = protocol.ID("/floodsub/1.0.0") // FloodSub协议ID
	FloodSubTopicSearchSize = 5                              // FloodSub主题搜索大小
)

常量定义

View Source
const (
	// GossipSubID_v10 是 GossipSub 协议的版本 1.0.0 的协议 ID。
	// 它与 GossipSubID_v11 一起发布以实现向后兼容。
	GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")

	// GossipSubID_v11 是 GossipSub 协议的版本 1.1.0 的协议 ID。
	// 参见规范了解 v1.1.0 与 v1.0.0 的详细比较:
	// https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md
	GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)
View Source
const (
	// 协议支持基本的 GossipSub 网格 -- 与 gossipsub-v1.0 兼容
	GossipSubFeatureMesh = iota
	// 协议支持在修剪时的对等节点交换 -- 与 gossipsub-v1.1 兼容
	GossipSubFeaturePX
)
View Source
const (
	DefaultDecayInterval = time.Second // 默认的衰减间隔
	DefaultDecayToZero   = 0.01        // 默认的衰减到零值
)
View Source
const (
	// StrictSign 生成签名并期望验证传入的签名
	StrictSign = msgSigning | msgVerification
	// StrictNoSign 不生成签名,并丢弃和惩罚携带签名的传入消息
	StrictNoSign = msgVerification
	// LaxSign 生成签名,并仅在存在签名时验证传入的签名
	// 已弃用:建议严格启用或严格禁用签名。
	LaxSign = msgSigning
	// LaxNoSign 不生成签名,并仅在存在签名时验证传入的签名
	// 已弃用:建议严格启用或严格禁用签名。
	LaxNoSign = 0
)

常量表示严格的签名和验证策略

View Source
const (
	RejectBlacklstedPeer      = "blacklisted peer"        // 被列入黑名单的对等节点
	RejectBlacklistedSource   = "blacklisted source"      // 被列入黑名单的来源
	RejectMissingSignature    = "missing signature"       // 缺少签名
	RejectUnexpectedSignature = "unexpected signature"    // 意外的签名
	RejectUnexpectedAuthInfo  = "unexpected auth info"    // 意外的身份验证信息
	RejectInvalidSignature    = "invalid signature"       // 无效的签名
	RejectValidationQueueFull = "validation queue full"   // 验证队列已满
	RejectValidationThrottled = "validation throttled"    // 验证被限制
	RejectValidationFailed    = "validation failed"       // 验证失败
	RejectValidationIgnored   = "validation ignored"      // 验证被忽略
	RejectSelfOrigin          = "self originated message" // 自己发起的消息
)

拒绝消息的原因常量

View Source
const (
	// ValidationAccept 表示消息验证通过,应该被接受并交付给应用程序并转发到网络
	ValidationAccept = ValidationResult(0)
	// ValidationReject 表示消息验证失败,不应该交付给应用程序或转发到网络,并且转发该消息的对等节点应该被惩罚
	ValidationReject = ValidationResult(1)
	// ValidationIgnore 表示消息应该被忽略,不交付给应用程序或转发到网络,但与 ValidationReject 不同,转发该消息的对等节点不会被惩罚
	ValidationIgnore = ValidationResult(2)
)
View Source
const DefaultMaxMessageSize = 1 << 20

DefaultMaxMessageSize 定义默认的最大消息大小为1MB

View Source
const (
	DefaultPubsubProtocol = "/dep2p/pubsub/1.0.0"
)

DefaultPubsubProtocol 定义了默认的pubsub协议版本

View Source
const (
	// RandomSubID 是 RandomSub 路由器使用的协议 ID
	RandomSubID = protocol.ID("/randomsub/1.0.0")
)
View Source
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")

RemoteTracerProtoID 是远程追踪协议的 ID

View Source
const SignPrefix = "libp2p-pubsub:"

SignPrefix 是签名前缀常量

Variables

View Source
var (

	// DiscoveryPollInitialDelay 是发现系统在首次启动后等待的时间
	DiscoveryPollInitialDelay = 0 * time.Millisecond
	// DiscoveryPollInterval 是发现系统在检查是否需要更多对等点之间等待的大致时间
	DiscoveryPollInterval = 1 * time.Second
)
View Source
var (
	// D 设置 GossipSub 主题网格的最佳度。
	GossipSubD = 6
	// Dlo 设置我们在 GossipSub 主题网格中保持的对等节点的下限。
	GossipSubDlo = 5
	// Dhi 设置我们在 GossipSub 主题网格中保持的对等节点的上限。
	GossipSubDhi = 12
	// Dscore 影响在由于过度订阅而修剪网格时如何选择对等节点。
	GossipSubDscore = 4
	// Dout 设置在主题网格中维护的出站连接的配额。
	GossipSubDout = 2
	// HistoryLength 控制用于 gossip 的消息缓存的大小。
	GossipSubHistoryLength = 5
	// HistoryGossip 控制我们将在 IHAVE gossip 消息中广告的缓存消息 ID 的数量。
	GossipSubHistoryGossip = 3
	// Dlazy 影响我们在每个心跳期间将 gossip 发送到的对等节点数量。
	GossipSubDlazy = 6
	// GossipFactor 影响我们在每个心跳期间将 gossip 发送到的对等节点数量。
	GossipSubGossipFactor = 0.25
	// GossipRetransmission 控制在开始忽略对等节点之前允许对等节点通过 IWANT gossip 请求相同消息 ID 的次数。
	GossipSubGossipRetransmission = 3
	// HeartbeatInitialDelay 是路由器初始化后心跳计时器开始之前的短暂延迟。
	GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
	// HeartbeatInterval 控制心跳之间的时间。
	GossipSubHeartbeatInterval = 1 * time.Second
	// FanoutTTL 控制我们跟踪 fanout 状态的时间。
	GossipSubFanoutTTL = 60 * time.Second
	// PrunePeers 控制修剪对等节点交换中的对等节点数量。
	GossipSubPrunePeers = 16
	// PruneBackoff 控制修剪对等节点的回退时间。
	GossipSubPruneBackoff = time.Minute
	// UnsubscribeBackoff 控制取消订阅主题时使用的回退时间。
	GossipSubUnsubscribeBackoff = 10 * time.Second
	// Connectors 控制通过 PX 获取的对等节点的活动连接尝试数量。
	GossipSubConnectors = 8
	// MaxPendingConnections 设置通过 px 尝试的对等节点的最大挂起连接数。
	GossipSubMaxPendingConnections = 128
	// ConnectionTimeout 控制连接尝试的超时时间。
	GossipSubConnectionTimeout = 30 * time.Second
	// DirectConnectTicks 是尝试重新连接当前未连接的直接对等节点的心跳滴答次数。
	GossipSubDirectConnectTicks uint64 = 300
	// DirectConnectInitialDelay 是在打开与直接对等节点的连接之前的初始延迟。
	GossipSubDirectConnectInitialDelay = time.Second
	// OpportunisticGraftTicks 是尝试通过机会性移植改善网格的心跳滴答次数。
	GossipSubOpportunisticGraftTicks uint64 = 60
	// OpportunisticGraftPeers 是机会性移植的对等节点数量。
	GossipSubOpportunisticGraftPeers = 2
	// GraftFloodThreshold 是在最后一次修剪后经过的时间内 GRAFT 提供的额外分数惩罚。
	GossipSubGraftFloodThreshold = 10 * time.Second
	// MaxIHaveLength 是包含在 IHAVE 消息中的最大消息数量。
	GossipSubMaxIHaveLength = 5000
	// MaxIHaveMessages 是在心跳期间从对等节点接受的最大 IHAVE 消息数量。
	GossipSubMaxIHaveMessages = 10
	// IWantFollowupTime 是在 IHAVE 广告之后通过 IWANT 请求消息的等待时间。
	GossipSubIWantFollowupTime = 3 * time.Second
)

定义 gossipsub 的默认参数。

View Source
var (
	DefaultPeerGaterRetainStats     = 6 * time.Hour                        // 保留统计数据的默认时间
	DefaultPeerGaterQuiet           = time.Minute                          // 安静期的默认时间
	DefaultPeerGaterDuplicateWeight = 0.125                                // 重复消息的权重
	DefaultPeerGaterIgnoreWeight    = 1.0                                  // 忽略消息的权重
	DefaultPeerGaterRejectWeight    = 16.0                                 // 拒绝消息的权重
	DefaultPeerGaterThreshold       = 0.33                                 // 门控阈值
	DefaultPeerGaterGlobalDecay     = ScoreParameterDecay(2 * time.Minute) // 全局衰减参数
	DefaultPeerGaterSourceDecay     = ScoreParameterDecay(time.Hour)       // 来源衰减参数
)

默认参数定义

View Source
var (
	// TimeCacheDuration 指定消息ID会被记住多长时间
	TimeCacheDuration = 120 * time.Second

	// TimeCacheStrategy 指定 seen messages cache 的查找/清理策略
	TimeCacheStrategy = timecache.Strategy_FirstSeen

	// ErrSubscriptionCancelled 当订阅被取消后调用 Next() 时可能返回的错误
	ErrSubscriptionCancelled = errors.New("subscription cancelled")
)
View Source
var (
	// GossipSubConnTagBumpMessageDelivery 表示每次一个对等节点首次在一个主题中传递消息时,添加到连接管理器标签中的量。
	// 每次对等节点首次在一个主题中传递消息时,我们将此标签增加该量,直到最大值 GossipSubConnTagMessageDeliveryCap。
	// 注意,传递标签会随着时间衰减,在每个 GossipSubConnTagDecayInterval 期间减少 GossipSubConnTagDecayAmount。
	GossipSubConnTagBumpMessageDelivery = 1

	// GossipSubConnTagDecayInterval 是连接管理器标签衰减的时间间隔。
	GossipSubConnTagDecayInterval = 10 * time.Minute

	// GossipSubConnTagDecayAmount 是在每个衰减间隔期间从衰减标签值中减去的量。
	GossipSubConnTagDecayAmount = 1

	// GossipSubConnTagMessageDeliveryCap 是用于跟踪消息传递的连接管理器标签的最大值。
	GossipSubConnTagMessageDeliveryCap = 15
)
View Source
var ErrEmptyPeerID = errors.New("空的对等节点 ID")

ErrEmptyPeerID 表示如果提供了一个空的对等节点 ID,将返回此错误。

View Source
var ErrNilSignKey = errors.New("空的私钥")

ErrNilSignKey 表示如果提供了一个空的私钥,将返回此错误。

View Source
var ErrTooManySubscriptions = errors.New("too many subscriptions")

ErrTooManySubscriptions 可能由 SubscriptionFilter 返回,以表示订阅过多无法处理的错误

View Source
var ErrTopicClosed = errors.New("主题已关闭,请打开一个新的主题")

ErrTopicClosed 表示如果在主题关闭后使用 Topic,将返回此错误。

View Source
var GossipSubDefaultProtocols = []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}

GossipSubDefaultProtocols 是默认的 gossipsub 路由器协议列表

View Source
var MinTraceBatchSize = 16

MinTraceBatchSize 设置最小批处理大小,默认为 16。

View Source
var (
	// RandomSubD 是 RandomSub 路由器使用的最小对等节点数
	RandomSubD = 6
)
View Source
var TraceBufferSize = 1 << 16

TraceBufferSize 设置追踪缓冲区大小,默认为 64K。

Functions

func Debug

func Debug(args ...interface{})

Debug 记录调试级别的日志 参数:

  • args: 日志参数

func Debugf

func Debugf(format string, args ...interface{})

Debugf 记录格式化的调试级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func DefaultMsgIdFn

func DefaultMsgIdFn(pmsg *pb.Message) string

DefaultMsgIdFn 返回传入消息的唯一 ID 参数:

  • pmsg: 传入的消息

返回值:

  • string: 消息的唯一 ID

func DefaultPeerFilter

func DefaultPeerFilter(pid peer.ID, topic string) bool

DefaultPeerFilter 接受所有主题的所有 peers 参数:

  • pid: peer ID
  • topic: 主题

返回值:

  • bool: 是否接受该 peer

func Error

func Error(args ...interface{})

Error 记录错误级别的日志 参数:

  • args: 日志参数

func Errorf

func Errorf(format string, args ...interface{})

Errorf 记录格式化的错误级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func Fatal

func Fatal(args ...interface{})

Fatal 记录致命级别的日志 参数:

  • args: 日志参数

func Fatalf

func Fatalf(format string, args ...interface{})

Fatalf 记录格式化的致命级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func FilterSubscriptions

func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts

FilterSubscriptions 过滤并去重订阅列表。 filter 应返回 true 如果一个主题是感兴趣的。 参数: - subs: 包含订阅通知的 RPC_SubOpts 列表 - filter: 用于确定是否感兴趣的过滤函数 返回值: - []*pb.RPC_SubOpts: 过滤后的订阅列表

func GetLevel

func GetLevel() logrus.Level

GetLevel 获取当前的日志级别

func GossipSubDefaultFeatures

func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool

GossipSubDefaultFeatures 是默认 gossipsub 协议的功能测试函数 参数:

  • feat: 要测试的功能
  • proto: 协议ID

返回值:

  • bool: 如果协议支持该功能,则返回 true

func Info

func Info(args ...interface{})

Info 记录信息级别的日志 参数:

  • args: 日志参数

func Infof

func Infof(format string, args ...interface{})

Infof 记录格式化的信息级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func LogMessage

func LogMessage(level logrus.Level, args ...interface{})

LogMessage 记录指定级别的日志

func Logf

func Logf(level logrus.Level, format string, args ...interface{})

Logf 记录格式化的指定级别日志

func Panic

func Panic(args ...interface{})

Panic 记录 panic 级别的日志 参数:

  • args: 日志参数

func Panicf

func Panicf(format string, args ...interface{})

Panicf 记录格式化的 panic 级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func ParseLevel

func ParseLevel(level string) (logrus.Level, error)

ParseLevel 解析字符串表示的日志级别

func Print

func Print(args ...interface{})

Print 记录信息级别的日志 参数:

  • args: 日志参数

func Printf

func Printf(format string, args ...interface{})

Printf 记录格式化的信息级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func Println

func Println(args ...interface{})

Println 记录信息级别的日志,并在末尾添加换行符 参数:

  • args: 日志参数

func ScoreParameterDecay

func ScoreParameterDecay(decay time.Duration) float64

ScoreParameterDecay 计算参数的衰减因子,假设 DecayInterval 为 1s 并且值在低于 0.01 时衰减到零 参数:

  • decay: 衰减时间

返回值:

  • float64: 衰减因子

func ScoreParameterDecayWithBase

func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64

ScoreParameterDecayWithBase 使用基准 DecayInterval 计算参数的衰减因子 参数:

  • decay: 衰减时间
  • base: 基准衰减间隔
  • decayToZero: 衰减到零值

返回值:

  • float64: 衰减因子

func SetLevel

func SetLevel(level logrus.Level)

SetLevel 设置日志级别 参数:

  • level: 日志级别

func SetOutput

func SetOutput(output *os.File)

SetOutput 设置日志输出 参数:

  • output: 输出文件

func SetReportCaller

func SetReportCaller(reportCaller bool)

SetReportCaller 设置是否在日志中报告调用者信息

func Trace

func Trace(args ...interface{})

Trace 记录跟踪级别的日志

func Tracef

func Tracef(format string, args ...interface{})

Tracef 记录格式化的跟踪级别日志

func Warn

func Warn(args ...interface{})

Warn 记录警告级别的日志 参数:

  • args: 日志参数

func Warnf

func Warnf(format string, args ...interface{})

Warnf 记录格式化的警告级别日志 参数:

  • format: 格式化字符串
  • args: 格式化参数

func WithError

func WithError(err error) *logrus.Entry

WithError 创建一个包含错误信息的日志条目

func WithField

func WithField(key string, value interface{}) *logrus.Entry

WithField 创建一个带有单个字段的日志条目 参数:

  • key: 字段键
  • value: 字段值

返回值:

  • *logrus.Entry: 带有单个字段的日志条目

func WithFields

func WithFields(fields logrus.Fields) *logrus.Entry

WithFields 创建一个带有字段的日志条目 参数:

  • fields: 日志字段

返回值:

  • *logrus.Entry: 带有字段的日志条目

Types

type AcceptStatus

type AcceptStatus int

AcceptStatus 是表示是否接受传入 RPC 的枚举

const (
	// AcceptNone 表示丢弃传入的 RPC
	AcceptNone AcceptStatus = iota
	// AcceptControl 表示仅接受传入 RPC 的控制消息
	AcceptControl
	// AcceptAll 表示接受传入 RPC 的全部内容
	AcceptAll
)

type BackoffConnectorFactory

type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)

BackoffConnectorFactory 创建一个附加到给定主机的 BackoffConnector

type BasicSeqnoValidator

type BasicSeqnoValidator struct {
	// contains filtered or unexported fields
}

BasicSeqnoValidator 是一个基本验证器,可用作默认验证器,忽略超出已见缓存窗口的重放消息。 验证器使用消息序号作为对等节点特定的 nonce 来决定是否应传播消息,比较对等节点元数据存储中的最大 nonce。 这有助于确保无论已见缓存跨度和网络直径如何,网络中都不会存在无限传播的消息。 它要求 pubsub 实例化时具有严格的消息签名策略,并且序号未被禁用,即不支持匿名模式。

警告:请参阅 https://github.com/libp2p/rust-libp2p/issues/3453 简而言之:rust 当前通过发出随机序号违反了规范,这带来了互操作性风险。 我们预计此问题将在不久的将来得到解决,但如果您处于与(较旧的)rust 节点混合的环境中,请牢记这一点。

type Blacklist

type Blacklist interface {
	Add(peer.ID) bool      // 将节点添加到黑名单
	Contains(peer.ID) bool // 检查节点是否在黑名单中
}

Blacklist 是一个接口,定义了对等节点黑名单的方法

func NewMapBlacklist

func NewMapBlacklist() Blacklist

NewMapBlacklist 创建一个新的 MapBlacklist 实例

func NewTimeCachedBlacklist

func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error)

NewTimeCachedBlacklist 创建一个带有指定过期时间的新 TimeCachedBlacklist 实例 参数:

  • expiry: 黑名单条目的过期时间

返回值:

  • Blacklist: 初始化后的 TimeCachedBlacklist 实例
  • error: 如果有错误发生则返回错误

type CacheEntry

type CacheEntry struct {
	// contains filtered or unexported fields
}

CacheEntry 表示消息缓存条目。

type DefaultLogger

type DefaultLogger struct {
	*logrus.Logger // 嵌入 logrus.Logger
}

DefaultLogger 是默认的日志记录器实现

type DiscoverOpt

type DiscoverOpt func(*discoverOptions) error

DiscoverOpt 是一个函数类型,用于配置发现选项

func WithDiscoverConnector

func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt

WithDiscoverConnector 添加一个自定义连接器,该连接器处理发现子系统如何连接到对等节点 参数:

  • connFactory: 退避连接器工厂,用于创建一个自定义的连接器

返回值:

  • DiscoverOpt: 配置发现选项的函数,用于设置发现配置

func WithDiscoveryOpts

func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt

WithDiscoveryOpts 传递 libp2p 发现选项到 PubSub 发现子系统 参数:

  • opts: 发现选项,这些选项将被应用到发现子系统中

返回值:

  • DiscoverOpt: 配置发现选项的函数,用于设置发现配置

type EventTracer

type EventTracer interface {
	// Trace 方法用于记录一个追踪事件。
	Trace(evt *pb.TraceEvent)
}

EventTracer 是一个通用的事件追踪器接口。 这是一个高级追踪接口,它传递由 pb/trace.proto 中定义的追踪事件。

type EventType

type EventType int

EventType 表示事件类型。

const (
	PeerJoin  EventType = iota // 对等节点加入事件
	PeerLeave                  // 对等节点离开事件
)

type ExtendedPeerScoreInspectFn

type ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)

ExtendedPeerScoreInspectFn 定义扩展的对等节点分数检查函数类型

type FloodSubRouter

type FloodSubRouter struct {
	// contains filtered or unexported fields
}

FloodSubRouter 结构体定义了FloodSub路由器

func (*FloodSubRouter) AcceptFrom

func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus

AcceptFrom 决定是否接受来自特定对等节点的消息 参数:

  • peer: 对等节点ID

返回值:

  • AcceptStatus: 接受状态

func (*FloodSubRouter) AddPeer

func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID)

AddPeer 添加对等节点到FloodSubRouter 参数:

  • p: 对等节点ID
  • proto: 协议ID

func (*FloodSubRouter) Attach

func (fs *FloodSubRouter) Attach(p *PubSub)

Attach 将FloodSubRouter附加到PubSub实例 参数:

  • p: PubSub实例

func (*FloodSubRouter) EnoughPeers

func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool

EnoughPeers 检查是否有足够的对等节点来支持特定主题。 参数:

  • topic: 主题名称
  • suggested: 建议的对等节点数量

返回值:

  • bool: 是否有足够的对等节点支持该主题

func (*FloodSubRouter) HandleRPC

func (fs *FloodSubRouter) HandleRPC(rpc *RPC)

HandleRPC 处理接收到的RPC消息 参数:

  • rpc: RPC消息

func (*FloodSubRouter) Join

func (fs *FloodSubRouter) Join(topic string)

Join 加入主题 参数:

  • topic: 主题

func (*FloodSubRouter) Leave

func (fs *FloodSubRouter) Leave(topic string)

Leave 离开主题 参数:

  • topic: 主题

func (*FloodSubRouter) Protocols

func (fs *FloodSubRouter) Protocols() []protocol.ID

Protocols 返回FloodSubRouter支持的协议列表 返回值:

  • []protocol.ID: 支持的协议列表

func (*FloodSubRouter) Publish

func (fs *FloodSubRouter) Publish(msg *Message)

Publish 发布消息到主题 参数:

  • msg: 消息

func (*FloodSubRouter) RemovePeer

func (fs *FloodSubRouter) RemovePeer(p peer.ID)

RemovePeer 从FloodSubRouter中移除对等节点 参数:

  • p: 对等节点ID

type GossipSubFeature

type GossipSubFeature int

GossipSubFeature 是一个功能判别枚举

type GossipSubFeatureTest

type GossipSubFeatureTest = func(GossipSubFeature, protocol.ID) bool

GossipSubFeatureTest 是一个功能测试函数;它接受一个功能和协议ID,并且如果协议支持该功能则返回 true

type GossipSubParams

type GossipSubParams struct {

	// D 设置 GossipSub 主题网格的理想度数。例如,如果 D == 6,
	// 每个节点希望在他们订阅的每个主题中保持大约六个节点在他们的网格中。
	// D 应该设置在 Dlo 和 Dhi 之间的某个值。
	D int

	// Dlo 设置 GossipSub 主题网格中保持的最少节点数。
	// 如果我们拥有的节点少于 Dlo,我们将在下一个心跳中尝试添加更多节点到网格中。
	Dlo int

	// Dhi 设置 GossipSub 主题网格中保持的最多节点数。
	// 如果我们拥有的节点多于 Dhi,我们将在下一个心跳中选择一些节点从网格中移除。
	Dhi int

	// Dscore 影响由于过度订阅而修剪网格时选择保留哪些节点。
	// 保留的节点中至少 Dscore 个将是高分节点,而其余的节点将随机选择。
	Dscore int

	// Dout 设置在主题网格中保持的出站连接的配额。
	// 当网格因过度订阅而被修剪时,我们确保至少与 Dout 个幸存节点保持出站连接。
	// 这可以防止 sybil 攻击者通过大量的入站连接压倒我们的网格。
	//
	// Dout 必须设置在 Dlo 之下,并且不能超过 D / 2。
	Dout int

	// HistoryLength 控制用于 gossip 的消息缓存的大小。
	// 消息缓存将在 HistoryLength 个心跳内记住消息。
	HistoryLength int

	// HistoryGossip 控制我们将在 IHAVE gossip 消息中通告的缓存消息 ID 的数量。
	// 当被要求提供我们看到的消息 ID 时,我们将只返回来自最近 HistoryGossip 次心跳的那些消息。
	// HistoryGossip 和 HistoryLength 之间的差距使我们避免广告即将过期的消息。
	//
	// HistoryGossip 必须小于或等于 HistoryLength 以避免运行时 panic。
	HistoryGossip int

	// Dlazy 影响我们在每次心跳时将 gossip 发送给多少节点。
	// 我们将至少向 Dlazy 个网格外部的节点发送 gossip。实际的数量可能更多,
	// 取决于 GossipFactor 和我们连接的节点数。
	Dlazy int

	// GossipFactor 影响我们在每次心跳时将 gossip 发送给多少节点。
	// 我们将 gossip 发送给 GossipFactor * (非网格节点的总数),或者 Dlazy,
	// 以较大者为准。
	GossipFactor float64

	// GossipRetransmission 控制在开始忽略节点之前,我们允许节点通过 IWANT gossip 请求相同消息 ID 的次数。
	// 这样设计是为了防止节点通过请求消息浪费我们的资源。
	GossipRetransmission int

	// HeartbeatInitialDelay 是在路由器初始化后,心跳定时器开始之前的短暂延迟。
	HeartbeatInitialDelay time.Duration

	// HeartbeatInterval 控制心跳之间的时间间隔。
	HeartbeatInterval time.Duration

	// SlowHeartbeatWarning 是心跳处理时间超过该阈值时触发警告的持续时间;这表明节点可能过载。
	SlowHeartbeatWarning float64

	// FanoutTTL 控制我们保持 fanout 状态的时间。如果自上次我们发布到一个我们没有订阅的主题以来,
	// 已经过了 FanoutTTL,我们将删除该主题的 fanout 映射。
	FanoutTTL time.Duration

	// PrunePeers 控制在 prune Peer eXchange 中包含的节点数量。
	// 当我们修剪一个符合 PX 条件的节点(得分良好等)时,我们会尝试向他们发送我们知道的最多 PrunePeers 个节点的签名节点记录。
	PrunePeers int

	// PruneBackoff 控制被修剪节点的退避时间。这是节点在被修剪后重新尝试加入我们网格之前必须等待的时间。
	// 当修剪节点时,我们会向他们发送我们设置的 PruneBackoff 值,以便他们知道最短等待时间。
	// 运行旧版本的节点可能不会发送退避时间,所以如果我们收到没有退避时间的 prune 消息,
	// 我们将在重新加入前等待至少 PruneBackoff 时间。
	PruneBackoff time.Duration

	// UnsubscribeBackoff 控制取消订阅主题时使用的退避时间。
	// 节点在此期间不应重新订阅该主题。
	UnsubscribeBackoff time.Duration

	// Connectors 控制通过 PX 获取的节点的活动连接尝试数量。
	Connectors int

	// MaxPendingConnections 设置通过 PX 尝试连接的节点的最大挂起连接数。
	MaxPendingConnections int

	// ConnectionTimeout 控制连接尝试的超时时间。
	ConnectionTimeout time.Duration

	// DirectConnectTicks 是尝试重新连接当前未连接的直接节点的心跳刻度数。
	DirectConnectTicks uint64

	// DirectConnectInitialDelay 是开始与直接节点建立连接之前的初始延迟。
	DirectConnectInitialDelay time.Duration

	// OpportunisticGraftTicks 是通过机会性 grafting 改善网格的心跳刻度数。
	// 每当经过 OpportunisticGraftTicks 次心跳时,我们将尝试选择一些高分网格节点来替换低分的节点,
	// 如果我们网格节点的中位数得分低于阈值(见 https://godoc.org/bpfs#PeerScoreThresholds)。
	OpportunisticGraftTicks uint64

	// OpportunisticGraftPeers 是要机会性 graft 的节点数。
	OpportunisticGraftPeers int

	// 如果在上次 PRUNE 之后 GraftFloodThreshold 时间内收到 GRAFT,
	// 则对节点应用额外的得分惩罚,通过 P7 来实现。
	GraftFloodThreshold time.Duration

	// MaxIHaveLength 是 IHAVE 消息中包含的最大消息数量。
	// 还控制在一个心跳内,我们从节点接受和请求的 IHAVE ids 的最大数量,以防止 IHAVE 泛滥。
	// 如果您的系统在 HistoryGossip 心跳中推送了超过 5000 条消息,您应调整该值;
	// 默认情况下,这意味着 1666 条消息/秒。
	MaxIHaveLength int

	// MaxIHaveMessages 是在一个心跳内从节点接受的 IHAVE 消息的最大数量。
	MaxIHaveMessages int

	// IWantFollowupTime 是在 IHAVE 广告后等待通过 IWANT 请求的消息的时间。
	// 如果在此窗口内未收到消息,则声明违反承诺,路由器可能会应用行为惩罚。
	IWantFollowupTime time.Duration
}

GossipSubParams 定义了所有 gossipsub 特定的参数。

func DefaultGossipSubParams

func DefaultGossipSubParams() GossipSubParams

DefaultGossipSubParams 返回默认的 gossipsub 参数。 返回值:

  • GossipSubParams: gossipsub 参数

type GossipSubRouter

type GossipSubRouter struct {
	// contains filtered or unexported fields
}

GossipSubRouter 是一个实现 gossipsub 协议的路由器。 对于我们加入的每个主题,我们维护一个消息流过的覆盖层;这是 mesh map。 对于我们发布但没有加入的每个主题,我们维护一个对等节点列表,用于在覆盖层中注入我们的消息;这是 fanout map。 如果我们没有发布任何消息到 fanout 主题的 fanout 对等节点列表在 GossipSubFanoutTTL 之后将过期。

func DefaultGossipSubRouter

func DefaultGossipSubRouter(h host.Host) *GossipSubRouter

DefaultGossipSubRouter 返回一个带有默认参数的新的 GossipSubRouter。 参数:

  • h: 主机

返回值:

  • *GossipSubRouter: GossipSubRouter 对象

func (*GossipSubRouter) AcceptFrom

func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus

AcceptFrom 检查是否接受来自对等节点的消息。 参数:

  • p: peer.ID 类型,表示对等节点的 ID。

返回值:

  • AcceptStatus: 返回接受状态,表示是否接受消息。

func (*GossipSubRouter) AddPeer

func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID)

AddPeer 添加对等节点。 参数:

  • p: peer.ID 类型,表示对等节点的 ID。
  • proto: protocol.ID 类型,表示协议 ID。

func (*GossipSubRouter) Attach

func (gs *GossipSubRouter) Attach(p *PubSub)

Attach 将 GossipSubRouter 附加到 PubSub 实例。 参数:

  • p: PubSub 实例

func (*GossipSubRouter) EnoughPeers

func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool

EnoughPeers 检查主题是否有足够的对等节点。 参数:

  • topic: string 类型,表示主题名称。
  • suggested: int 类型,表示建议的对等节点数量。

返回值:

  • bool: 返回布尔值,表示是否有足够的对等节点。

func (*GossipSubRouter) HandleRPC

func (gs *GossipSubRouter) HandleRPC(rpc *RPC)

HandleRPC 处理 RPC 消息。 参数:

  • rpc: *RPC 类型,表示要处理的 RPC 消息。

func (*GossipSubRouter) Join

func (gs *GossipSubRouter) Join(topic string)

Join 加入主题。 参数:

  • topic: string 类型,表示主题名称。

func (*GossipSubRouter) Leave

func (gs *GossipSubRouter) Leave(topic string)

Leave 离开主题。 参数:

  • topic: string 类型,表示主题名称。

func (*GossipSubRouter) Protocols

func (gs *GossipSubRouter) Protocols() []protocol.ID

Protocols 返回协议列表。 返回值:

  • []protocol.ID: 协议列表

func (*GossipSubRouter) Publish

func (gs *GossipSubRouter) Publish(msg *Message)

Publish 发布消息。 参数:

  • msg: *Message 类型,表示要发布的消息。

func (*GossipSubRouter) RemovePeer

func (gs *GossipSubRouter) RemovePeer(p peer.ID)

RemovePeer 移除对等节点。 参数:

  • p: peer.ID 类型,表示对等节点的 ID。

func (*GossipSubRouter) WithDefaultTagTracer

func (gs *GossipSubRouter) WithDefaultTagTracer() Option

WithDefaultTagTracer 返回 GossipSubRouter 的标签跟踪器作为 PubSub 选项。 这对于 GossipSubRouter 在外部实例化并作为依赖项注入 GossipSub 构造函数的情况很有用。 这允许标签跟踪器也作为 PubSub 选项依赖项注入 GossipSub 构造函数。

type JSONTracer

type JSONTracer struct {
	// contains filtered or unexported fields
}

JSONTracer 是一个将事件写入文件的追踪器,事件以 ndjson 格式编码。

func NewJSONTracer

func NewJSONTracer(file string) (*JSONTracer, error)

NewJSONTracer 创建一个新的 JSONTracer,将追踪信息写入文件。 参数:

  • file: 文件路径

返回值:

  • *JSONTracer: JSONTracer 对象
  • error: 错误信息

func OpenJSONTracer

func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error)

OpenJSONTracer 创建一个新的 JSONTracer,可以显式控制文件打开的标志和权限。 参数:

  • file: 文件路径
  • flags: 文件打开标志
  • perm: 文件权限

返回值:

  • *JSONTracer: JSONTracer 对象
  • error: 错误信息

func (*JSONTracer) Close

func (t *JSONTracer) Close()

Close 关闭追踪器

func (*JSONTracer) Trace

func (t *JSONTracer) Trace(evt *pb.TraceEvent)

Trace 向追踪器添加一个事件 参数:

  • evt: 要添加的事件

type MapBlacklist

type MapBlacklist map[peer.ID]struct{}

MapBlacklist 是一种使用map实现的黑名单

func (MapBlacklist) Add

func (b MapBlacklist) Add(p peer.ID) bool

Add 将节点添加到 MapBlacklist 中 参数:

  • p: 需要添加到黑名单的节点ID

返回值:

  • bool: 添加操作是否成功

func (MapBlacklist) Contains

func (b MapBlacklist) Contains(p peer.ID) bool

Contains 检查节点是否在 MapBlacklist 中 参数:

  • p: 需要检查的节点ID

返回值:

  • bool: 节点是否在黑名单中

type Message

type Message struct {
	*pb.Message               // 嵌入的 Protocol Buffers 生成的消息结构体
	ID            string      // 消息的唯一标识符
	ReceivedFrom  peer.ID     // 发送该消息的节点ID
	ValidatorData interface{} // 验证器相关数据,可能包含验证消息的元数据
	Local         bool        // 指示消息是否是本地生成的
}

Message 表示一个消息

func (*Message) GetFrom

func (m *Message) GetFrom() peer.ID

GetFrom 获取消息的发送者 返回值:

  • peer.ID: 消息的发送者 ID

type MessageCache

type MessageCache struct {
	// contains filtered or unexported fields
}

MessageCache 是一个滑动窗口缓存,记住消息长达一定的历史长度。

func NewMessageCache

func NewMessageCache(gossip, history int) *MessageCache

NewMessageCache 创建一个滑动窗口缓存,记住消息长达 `history` 个插槽。 当查询要通告的消息时,缓存仅返回最后 `gossip` 个插槽中的消息。 `gossip` 参数必须小于或等于 `history`,否则该函数会引发 panic。 在通过 IHAVE gossip 通告消息和通过 IWANT 命令获取消息之间的反应时间之间存在松弛。

func (*MessageCache) Get

func (mc *MessageCache) Get(mid string) (*Message, bool)

Get 从缓存中获取消息。 参数:

  • mid: 消息 ID

返回值:

  • *Message: 消息指针
  • bool: 是否存在该消息

func (*MessageCache) GetForPeer

func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool)

GetForPeer 从缓存中获取对等节点的消息。 参数:

  • mid: 消息 ID
  • p: 对等节点 ID

返回值:

  • *Message: 消息指针
  • int: 对等节点事务计数
  • bool: 是否存在该消息

func (*MessageCache) GetGossipIDs

func (mc *MessageCache) GetGossipIDs(topic string) []string

GetGossipIDs 获取给定主题的 gossip 消息 ID 列表。 参数:

  • topic: 主题名称

返回值:

  • []string: gossip 消息 ID 列表

func (*MessageCache) Put

func (mc *MessageCache) Put(msg *Message)

Put 将消息放入缓存。 参数:

  • msg: 要放入缓存的消息

func (*MessageCache) SetMsgIdFn

func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string)

SetMsgIdFn 设置消息 ID 生成函数。 参数:

  • msgID: 消息 ID 生成函数

func (*MessageCache) Shift

func (mc *MessageCache) Shift()

Shift 移动缓存窗口,丢弃最旧的插槽并腾出空间存储新的消息。

type MessageMetadataOpt

type MessageMetadataOpt struct {
	// contains filtered or unexported fields
}

MessageMetadataOpt 表示消息元信息的选项。

type MessageSignaturePolicy

type MessageSignaturePolicy uint8

MessageSignaturePolicy 描述是否生成、期望和/或验证签名的策略。

type MsgIdFunction

type MsgIdFunction func(pmsg *pb.Message) string

MsgIdFunction 返回传递的消息的唯一 ID,PubSub 可以通过配置 Option from WithMessageIdFn 使用任何此函数的实现。 参数:

  • pmsg: 要计算 ID 的消息。

返回值:

  • string: 消息 ID。

type Node

type Node struct {
	ID                  peer.ID         // 节点ID
	Status              NodeStatus      // 当前状态
	LastSeen            time.Time       // 最后一次看到节点的时间
	FailedAttempts      int             // 连续失败尝试次数
	Score               float64         // 节点评分
	History             []StatusChange  // 状态变化历史
	CheckInterval       time.Duration   // 检查间隔
	SuccessiveSuccesses int             // 连续成功次数
	LastCheckTime       time.Time       // 上次检查时间
	ConnectionQuality   float64         // 连接质量
	NetworkCondition    []time.Duration // 网络延迟历史
}

Node 表示一个节点及其状态息

type NodeOption

type NodeOption func(*Options) error

NodeOption 定义了一个函数类型,用于配置PubSub 参数:

  • *Options: 需要配置的选项对象

返回值:

  • error: 配置过程中的错误信息

func WithNodeDiscovery added in v0.0.3

func WithNodeDiscovery(d discovery.Discovery) NodeOption

WithNodeDiscovery 设置 Discovery 服务 参数:

  • d: 要设置的 Discovery 服务实例

返回值:

  • NodeOption: 返回一个配置函数

func WithSetD

func WithSetD(d int) NodeOption

WithSetD 设置 GossipSub 主题网格的理想度数 参数:

  • d: 要设置的理想度数

返回值:

  • NodeOption: 返回一个配置函数

func WithSetDirectPeers

func WithSetDirectPeers(peers []peer.AddrInfo) NodeOption

WithSetDirectPeers 设置直连对等节点列表 参数:

  • peers: 要设置的直连对等节点列表

返回值:

  • NodeOption: 返回一个配置函数

func WithSetDlo

func WithSetDlo(dlo int) NodeOption

WithSetDlo 设置 GossipSub 主题网格中保持的最少节点数 参数:

  • dlo: 要设置的最少节点数

返回值:

  • NodeOption: 返回一个配置函数

func WithSetFollowupTime

func WithSetFollowupTime(t time.Duration) NodeOption

WithSetFollowupTime 设置跟随时间 参数:

  • t: 要设置的跟随时间

返回值:

  • NodeOption: 返回一个配置函数

func WithSetGossipFactor

func WithSetGossipFactor(f float64) NodeOption

WithSetGossipFactor 设置Gossip因子 参数:

  • f: 要设置的Gossip因子

返回值:

  • NodeOption: 返回一个配置函数

func WithSetHeartbeatInterval

func WithSetHeartbeatInterval(interval time.Duration) NodeOption

WithSetHeartbeatInterval 设置心跳间隔 参数:

  • interval: 要设置的心跳间隔

返回值:

  • NodeOption: 返回一个配置函数

func WithSetLoadConfig

func WithSetLoadConfig(load bool) NodeOption

WithSetLoadConfig 设置是否加载配置选项 参数:

  • load: 是否加载配置

返回值:

  • NodeOption: 返回一个配置函数

func WithSetMaxMessageSize

func WithSetMaxMessageSize(size int) NodeOption

WithSetMaxMessageSize 设置最大消息大小 参数:

  • size: 要设置的最大消息大小

返回值:

  • NodeOption: 返回一个配置函数

func WithSetMaxPendingConns

func WithSetMaxPendingConns(n int) NodeOption

WithSetMaxPendingConns 设置最大待处理连接数 参数:

  • n: 要设置的最大待处理连接数

返回值:

  • NodeOption: 返回一个配置函数

func WithSetMaxTransmissionSize

func WithSetMaxTransmissionSize(size int) NodeOption

WithSetMaxTransmissionSize 设置最大传输大小 参数:

  • size: 要设置的最大传输大小

返回值:

  • NodeOption: 返回一个配置函数

func WithSetPubSubMode

func WithSetPubSubMode(mode PubSubType) NodeOption

WithSetPubSubMode 设置发布订阅模式 参数:

  • mode: 要设置的发布订阅模式

返回值:

  • NodeOption: 返回一个配置函数

func WithSetSignMessages

func WithSetSignMessages(sign bool) NodeOption

WithSetSignMessages 设置是否签名消息 参数:

  • sign: 是否签名消息

返回值:

  • NodeOption: 返回一个配置函数

func WithSetValidateMessages

func WithSetValidateMessages(validate bool) NodeOption

WithSetValidateMessages 设置是否验证消息 参数:

  • validate: 是否验证消息

返回值:

  • NodeOption: 返回一个配置函数

type NodePubSub

type NodePubSub struct {
	// contains filtered or unexported fields
}

NodePubSub 表示分布式存储网络的主要结构

func NewNodePubSub

func NewNodePubSub(ctx context.Context, host host.Host, opts ...NodeOption) (*NodePubSub, error)

NewNodePubSub 创建并返回一个新的 NodePubSub 实例 参数:

  • ctx: 上下文,用于控制PubSub实例的生命周期
  • host: libp2p主机,代表当前节点
  • opts: 节点选项,用于自定义PubSub的行为

返回:

  • *NodePubSub: 新创建的NodePubSub实例
  • error: 如果创建过程中出现错误,返回相应的错误信息

func (*NodePubSub) BroadcastWithTopic

func (pubsub *NodePubSub) BroadcastWithTopic(topic string, data []byte) error

BroadcastWithTopic 将消息广播到给定主题 参数:

  • topic: 主题名称
  • data: 要广播的消息数据

返回:

  • error: 如果广播过程中出现错误,返回相应的错误信息

func (*NodePubSub) CancelPubsubWithTopic

func (pubsub *NodePubSub) CancelPubsubWithTopic(name string) error

CancelPubsubWithTopic 取消给定名字的订阅 参数:

  • name: 要取消的主题名称

返回:

  • error: 如果取消过程中出现错误,返回相应的错误信息

func (*NodePubSub) CancelSubscribeWithTopic

func (pubsub *NodePubSub) CancelSubscribeWithTopic(topic string) error

CancelSubscribeWithTopic 取消订阅给定主题 参数:

  • topic: 要取消订阅的主题名称

返回:

  • error: 如果取消订阅过程中出现错误,返回相应的错误信息

func (*NodePubSub) GetTopic

func (pubsub *NodePubSub) GetTopic(name string) (*Topic, error)

GetTopic 根据给定的名称获取一个 topic 参数:

  • name: 主题名称

返回:

  • *Topic: 获取或创建的主题实例
  • error: 如果获取或创建过程中出现错误,返回相应的错误信息

func (*NodePubSub) IsSubscribed

func (pubsub *NodePubSub) IsSubscribed(topic string) bool

IsSubscribed 检查给定的主题是否已经订阅 参数:

  • topic: 主题名称

返回:

  • bool: 如果主题已订阅返回true,否则返回false

func (*NodePubSub) ListPeers

func (pubsub *NodePubSub) ListPeers(topic string) []peer.ID

ListPeers 返回我们在给定主题中连接到的对等点列表 参数:

  • topic: 主题名称

返回:

  • []peer.ID: 与给定主题相关的对等点ID列表

func (*NodePubSub) NotifyNewPeer added in v0.0.4

func (pubsub *NodePubSub) NotifyNewPeer(peer peer.ID) error

NotifyNewPeer 通知系统有新的对等节点加入 参数:

  • peer: 新加入节点的ID

返回值:

  • error: 如果节点不满足要求则返回错误

func (*NodePubSub) Publish

func (pubsub *NodePubSub) Publish(topic string, data []byte) error

Publish 向 topic 发布一条消息 参数:

  • topic: 主题名称
  • data: 要发布的消息数据

返回:

  • error: 如果发布过程中出现错误,返回相应的错误信息

func (*NodePubSub) Pubsub

func (pubsub *NodePubSub) Pubsub() *PubSub

Pubsub 返回 PubSub 实例 返回:

  • *PubSub: 当前NodePubSub实例使用的PubSub实例

func (*NodePubSub) Subscribe

func (pubsub *NodePubSub) Subscribe(topic string, subscribe bool) (*Subscription, error)

Subscribe 订阅一个 topic 参数:

  • topic: 主题名称
  • subscribe: 是否实际进行订阅操作

返回:

  • *Subscription: 如果subscribe为true,返回订阅实例;否则返回nil
  • error: 如果订阅过程中出现错误,返回相应的错误信息

func (*NodePubSub) SubscribeWithTopic

func (pubsub *NodePubSub) SubscribeWithTopic(topic string, handler PubSubMsgHandler, subscribe bool) error

SubscribeWithTopic 订阅给定主题,并使用给定的订阅消息处理函数 参数:

  • topic: 要订阅的主题名称
  • handler: 用于处理接收到的消息的函数
  • subscribe: 是否实际进行订阅操作

返回:

  • error: 如果订阅过程中出现错误,返回相应的错误信息

type NodeStatus

type NodeStatus int

NodeStatus 表示节点的状态

const (
	Online     NodeStatus = iota // 节点在线
	Suspicious                   // 节点可疑
	Offline                      // 节点离线
)

type NodeStatusTracker

type NodeStatusTracker struct {
	// contains filtered or unexported fields
}

NodeStatusTracker 用于跟踪和管理节点状态

func NewNodeStatusTracker

func NewNodeStatusTracker(h host.Host, defaultCheckInterval time.Duration, offlineThreshold int, pingTimeout time.Duration) *NodeStatusTracker

NewNodeStatusTracker 创建一个新的 NodeStatusTracker 实例 参数:

  • h: libp2p主机
  • defaultCheckInterval: 默认检查间隔
  • offlineThreshold: 离线阈值
  • pingTimeout: ping超时时间

返回:

  • *NodeStatusTracker: 新创建的NodeStatusTracker实例

func (*NodeStatusTracker) Start

func (nst *NodeStatusTracker) Start()

Start 启动节点状态跟踪器

func (*NodeStatusTracker) Stop

func (nst *NodeStatusTracker) Stop()

Stop 停止节点状跟踪器

func (*NodeStatusTracker) SubscribeStatusChanges

func (nst *NodeStatusTracker) SubscribeStatusChanges() chan StatusChange

SubscribeStatusChanges 订阅状态变化事件 返回:

  • chan StatusChange: 状态变化通道

func (*NodeStatusTracker) UnsubscribeStatusChanges

func (nst *NodeStatusTracker) UnsubscribeStatusChanges(ch chan StatusChange)

UnsubscribeStatusChanges 取消订阅状态变化事件 参数:

  • ch: 要取消订阅的通道

func (*NodeStatusTracker) UpdateNodeStatus

func (nst *NodeStatusTracker) UpdateNodeStatus(pid peer.ID, status NodeStatus)

UpdateNodeStatus 允许外部组件更新节点状态 参数:

  • pid: 要更新的节点ID
  • status: 新的状态

type Option

type Option func(*PubSub) error

Option 是用于配置 PubSub 的选项函数类型

func WithAppSpecificRpcInspector

func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option

WithAppSpecificRpcInspector 设置一个钩子,用于在处理传入的 RPC 之前检查它们。 检查器在处理已接受的 RPC 之前调用。如果检查器的错误为 nil,则按常规处理 RPC。否则,RPC 将被丢弃。 参数:

  • inspector: 检查函数。

返回值:

  • Option: 配置选项。

func WithBlacklist

func WithBlacklist(b Blacklist) Option

WithBlacklist 提供黑名单的实现;默认是 MapBlacklist。 参数:

  • b: 黑名单实现。

返回值:

  • Option: 配置选项。

func WithDefaultValidator

func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option

WithDefaultValidator 添加一个默认验证器,适用于所有主题 参数:

  • val: interface{} 验证器
  • opts: ...ValidatorOpt 验证器选项

返回值:

  • Option 配置选项

func WithDirectConnectTicks

func WithDirectConnectTicks(t uint64) Option

WithDirectConnectTicks 是一个 gossipsub 路由器选项,用于设置尝试重新连接当前未连接的直接对等节点的心跳滴答次数。 参数:

  • t: uint64 类型,表示心跳滴答次数。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithDirectPeers

func WithDirectPeers(pis []peer.AddrInfo) Option

WithDirectPeers 是一个 gossipsub 路由器选项,用于指定具有直接对等关系的对等节点。 参数:

  • pis: []peer.AddrInfo 类型,表示对等节点的信息列表。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithDiscovery

func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option

WithDiscovery 提供用于引导和提供 peers 到 PubSub 的发现机制。 参数:

  • d: 发现机制。
  • opts: 可选的发现配置。

返回值:

  • Option: 配置选项。

func WithEventTracer

func WithEventTracer(tracer EventTracer) Option

WithEventTracer 提供 pubsub 系统的事件追踪器。 参数:

  • tracer: 事件追踪器。

返回值:

  • Option: 配置选项。

func WithFloodPublish

func WithFloodPublish(floodPublish bool) Option

WithFloodPublish 是一个 gossipsub 路由器选项,用于启用洪水发布。 参数:

  • floodPublish: bool 类型,表示是否启用洪水发布。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithGossipSubParams

func WithGossipSubParams(cfg GossipSubParams) Option

WithGossipSubParams 是一个 gossipsub 路由器选项,允许在实例化 gossipsub 路由器时设置自定义配置。 参数:

  • cfg: GossipSubParams 类型,表示 gossipsub 参数配置。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithGossipSubProtocols

func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option

WithGossipSubProtocols 是一个 gossipsub 路由器选项,用于配置自定义协议列表和功能测试函数 参数:

  • protos: 协议列表
  • feature: 功能测试函数

返回值:

  • Option: 配置 gossipsub 协议和功能的选项函数

func WithMaxMessageSize

func WithMaxMessageSize(maxMessageSize int) Option

WithMaxMessageSize 设置 pubsub 消息的全局最大消息大小。默认值是 1MiB (DefaultMaxMessageSize)。 警告 #1:确保更改 floodsub (FloodSubID) 和 gossipsub (GossipSubID) 的默认协议前缀。 警告 #2:减少默认的最大消息限制是可以的,但要确保您的应用程序消息不会超过新的限制。 参数:

  • maxMessageSize: 最大消息大小。

返回值:

  • Option: 配置选项。

func WithMessageAuthor

func WithMessageAuthor(author peer.ID) Option

WithMessageAuthor 设置出站消息的作者为给定的 peer ID(默认为主机的 ID)。 如果启用了消息签名,则私钥必须在主机的 peerstore 中可用。 参数:

  • author: 消息作者的 ID。

返回值:

  • Option: 配置选项。

func WithMessageIdFn

func WithMessageIdFn(fn MsgIdFunction) Option

WithMessageIdFn 是一个选项,用于自定义为 pubsub 消息计算消息 ID 的方式。 默认的 ID 函数是 DefaultMsgIdFn(连接源和序列号),但它可以自定义为消息的哈希值。 参数:

  • fn: 自定义的消息 ID 函数。

返回值:

  • Option: 配置选项。

func WithMessageSignaturePolicy

func WithMessageSignaturePolicy(policy MessageSignaturePolicy) Option

WithMessageSignaturePolicy 设置生产和验证消息签名的操作模式。 参数:

  • policy: 签名策略。

返回值:

  • Option: 配置选项。

func WithMessageSigning

func WithMessageSigning(enabled bool) Option

WithMessageSigning 启用或禁用消息签名(默认启用)。 不推荐在没有消息签名或没有验证的情况下使用。 参数:

  • enabled: 是否启用消息签名。

返回值:

  • Option: 配置选项。

func WithNoAuthor

func WithNoAuthor() Option

WithNoAuthor 省略消息的作者和序列号数据,并禁用签名的使用。 不推荐与默认的消息 ID 函数一起使用,请参阅 WithMessageIdFn。 返回值:

  • Option: 配置选项。

func WithPeerExchange

func WithPeerExchange(doPX bool) Option

WithPeerExchange 是一个 gossipsub 路由器选项,用于在 PRUNE 上启用对等节点交换。 参数:

  • doPX: bool 类型,表示是否启用对等节点交换。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithPeerFilter

func WithPeerFilter(filter PeerFilter) Option

WithPeerFilter 是一个选项,用于设置 pubsub peers 的过滤器。 默认的 peer 过滤器是 DefaultPeerFilter(总是返回 true),但它可以自定义为任何自定义实现。 参数:

  • filter: 自定义的 peer 过滤器。

返回值:

  • Option: 配置选项。

func WithPeerGater

func WithPeerGater(params *PeerGaterParams) Option

WithPeerGater 是一个 gossipsub 路由器选项,用于启用反应性验证队列管理 参数:

  • params: PeerGater 的参数

返回值:

  • Option: PubSub 的选项函数

func WithPeerOutboundQueueSize

func WithPeerOutboundQueueSize(size int) Option

WithPeerOutboundQueueSize 是一个选项,用于设置对 peer 的出站消息缓冲区大小。 当出站队列已满时,我们开始丢弃消息。 参数:

  • size: 出站消息队列的大小。

返回值:

  • Option: 配置选项。

func WithPeerScore

func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option

WithPeerScore 是一个 gossipsub 路由器选项,用于启用对等节点评分。 参数:

  • params: *PeerScoreParams 类型,表示对等节点评分参数。
  • thresholds: *PeerScoreThresholds 类型,表示对等节点评分阈值。

返回值:

  • Option: 返回一个 Option 类型的函数,用于配置 gossipsub 路由器。

func WithPeerScoreInspect

func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option

WithPeerScoreInspect 是一个 gossipsub 路由器选项,用于启用对等节点分数调试。 启用此选项后,将定期调用提供的函数,以便应用程序检查或转储已连接对等节点的分数。 提供的函数可以具有以下两种签名之一:

  • PeerScoreInspectFn,接受对等节点 ID 到分数的映射。
  • ExtendedPeerScoreInspectFn,接受对等节点 ID 到 PeerScoreSnapshots 的映射, 并允许检查单个分数组件以调试对等节点评分。

此选项必须在 WithPeerScore 选项之后传递。

func WithProtocolMatchFn

func WithProtocolMatchFn(m ProtocolMatchFn) Option

WithProtocolMatchFn 设置用于协议选择的自定义匹配函数。 参数:

  • m: 协议匹配函数。

返回值:

  • Option: 配置选项。

func WithRawTracer

func WithRawTracer(tracer RawTracer) Option

WithRawTracer 添加一个原始追踪器到 pubsub 系统。 可以使用多次调用选项添加多个追踪器。 参数:

  • tracer: 原始追踪器。

返回值:

  • Option: 配置选项。

func WithRetry

func WithRetry(retries int) Option

WithRetry 设置发送消息的重试次数 参数: - retries: int 表示重试次数

返回值:

  • Option: 配置选项。

func WithSeenMessagesStrategy

func WithSeenMessagesStrategy(strategy timecache.Strategy) Option

WithSeenMessagesStrategy 配置已看到消息缓存使用的查找/清理策略。 参数:

  • strategy: 策略。

返回值:

  • Option: 配置选项。

func WithSeenMessagesTTL

func WithSeenMessagesTTL(ttl time.Duration) Option

WithSeenMessagesTTL 配置之前看到的消息 ID 可以被遗忘的时间。 参数:

  • ttl: 生存时间。

返回值:

  • Option: 配置选项。

func WithStrictSignatureVerification

func WithStrictSignatureVerification(required bool) Option

WithStrictSignatureVerification 是一个选项,用于启用或禁用严格的消息签名验证。 当启用时(这是默认设置),未签名的消息将被丢弃。 不推荐在没有消息签名或没有验证的情况下使用。 参数:

  • required: 是否要求严格签名验证。

返回值:

  • Option: 配置选项。

func WithSubscriptionFilter

func WithSubscriptionFilter(subFilter SubscriptionFilter) Option

WithSubscriptionFilter 是一个 pubsub 选项,用于指定感兴趣主题的订阅过滤器。 参数: - subFilter: 要应用的 SubscriptionFilter 返回值: - Option: 一个设置 SubscriptionFilter 的选项

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout 设置等待回复的超时时间 参数: - d: time.Duration 表示超时时间

返回值:

  • Option: 配置选项。

func WithValidateQueueSize

func WithValidateQueueSize(n int) Option

WithValidateQueueSize 设置验证队列的大小,默认大小为 32 参数:

  • n: int 队列大小

返回值:

  • Option 配置选项

func WithValidateThrottle

func WithValidateThrottle(n int) Option

WithValidateThrottle 设置活动验证 goroutine 的上限,默认值为 8192 参数:

  • n: int 上限值

返回值:

  • Option 配置选项

func WithValidateWorkers

func WithValidateWorkers(n int) Option

WithValidateWorkers 设置同步验证工作线程的数量,默认值为 CPU 数量 参数:

  • n: int 线程数量

返回值:

  • Option 配置选项

type Options

type Options struct {
	FollowupTime        time.Duration   // 跟随时间,用于控制消息传播延迟
	GossipFactor        float64         // Gossip 因子,控制消息传播的概率
	D                   int             // GossipSub 主题网格的理想度数,每个节点维护的连接数
	Dlo                 int             // GossipSub 主题网格中保持的最少节点数,网格连接的下限
	MaxPendingConns     int             // 最大待处理连接数,限制并发连接请求数量
	MaxMessageSize      int             // 最大消息大小,限制单条消息的字节数
	SignMessages        bool            // 是否签名消息,控制消息的安全性
	ValidateMessages    bool            // 是否验证消息,控制消息的合法性检查
	DirectPeers         []peer.AddrInfo // 直连对等节点列表,保存需要直接连接的节点信息
	HeartbeatInterval   time.Duration   // 心跳间隔,控制节点存活检测的频率
	MaxTransmissionSize int             // 最大传输大小,限制单次传输的字节数
	LoadConfig          bool            // 是否加载配置选项,控制是否使用外部配置
	PubSubMode          PubSubType      // 发布订阅模式,指定使用的协议类型
	// contains filtered or unexported fields
}

Options 定义了 PubSub 的配置选项

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions 返回一个带有默认配置的 Options 对象 返回值:

  • *Options: 包含默认配置的 Options 对象

func (*Options) ApplyOptions

func (opt *Options) ApplyOptions(opts ...NodeOption) error

ApplyOptions 应用给定的选项到 Options 对象 参数:

  • opts: 可变参数,包含多个 NodeOption 函数

返回值:

  • error: 如果应用选项时出现错误,返回相应的错误信息

func (*Options) GetD

func (o *Options) GetD() int

GetD 获取 GossipSub 主题网格的理想度数 返回值:

  • int: 当前设置的理想度数

func (*Options) GetDirectPeers

func (o *Options) GetDirectPeers() []peer.AddrInfo

GetDirectPeers 获取直连对等节点列表 返回值:

  • []peer.AddrInfo: 当前设置的直连对等节点列表

func (*Options) GetDlo

func (o *Options) GetDlo() int

GetDlo 获取 GossipSub 主题网格中保持的最少节点数 返回值:

  • int: 当前设置的最少节点数

func (*Options) GetFollowupTime

func (o *Options) GetFollowupTime() time.Duration

GetFollowupTime 获取跟随时间 返回值:

  • time.Duration: 当前设置的跟随时间

func (*Options) GetGossipFactor

func (o *Options) GetGossipFactor() float64

GetGossipFactor 获取Gossip因子 返回值:

  • float64: 当前设置的Gossip因子

func (*Options) GetHeartbeatInterval

func (o *Options) GetHeartbeatInterval() time.Duration

GetHeartbeatInterval 获取心跳间隔 返回值:

  • time.Duration: 当前设置的心跳间隔

func (*Options) GetLoadConfig

func (o *Options) GetLoadConfig() bool

GetLoadConfig 获取是否加载配置选项 返回值:

  • bool: 当前是否设置为加载配置

func (*Options) GetMaxMessageSize

func (o *Options) GetMaxMessageSize() int

GetMaxMessageSize 获取最大消息大小 返回值:

  • int: 当前设置的最大消息大小

func (*Options) GetMaxPendingConns

func (o *Options) GetMaxPendingConns() int

GetMaxPendingConns 获取最大待处理连接数 返回值:

  • int: 当前设置的最大待处理连接数

func (*Options) GetMaxTransmissionSize

func (o *Options) GetMaxTransmissionSize() int

GetMaxTransmissionSize 获取最大传输大小 返回值:

  • int: 当前设置的最大传输大小

func (*Options) GetNodeDiscovery added in v0.0.3

func (opt *Options) GetNodeDiscovery() discovery.Discovery

GetNodeDiscovery 获取配置的 Discovery 服务 返回值:

  • discovery.Discovery: 当前配置的 Discovery 服务实例

func (*Options) GetPubSubMode

func (o *Options) GetPubSubMode() PubSubType

GetPubSubMode 获取发布订阅模式 返回值:

  • PubSubType: 当前设置的发布订阅模式

func (*Options) GetSignMessages

func (o *Options) GetSignMessages() bool

GetSignMessages 获取是否签名消息 返回值:

  • bool: 当前是否设置为签名消息

func (*Options) GetValidateMessages

func (o *Options) GetValidateMessages() bool

GetValidateMessages 获取是否验证消息 返回值:

  • bool: 当前是否设置为验证消息

type PBTracer

type PBTracer struct {
	// contains filtered or unexported fields
}

PBTracer 是一个将事件写入文件的追踪器,事件以 protobuf 格式编码。

func NewPBTracer

func NewPBTracer(file string) (*PBTracer, error)

NewPBTracer 创建一个新的 PBTracer,将追踪信息写入文件。 参数:

  • file: 文件路径

返回值:

  • *PBTracer: PBTracer 对象
  • error: 错误信息

func OpenPBTracer

func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error)

OpenPBTracer 创建一个新的 PBTracer,可以显式控制文件打开的标志和权限。 参数:

  • file: 文件路径
  • flags: 文件打开标志
  • perm: 文件权限

返回值:

  • *PBTracer: PBTracer 对象
  • error: 错误信息

func (*PBTracer) Close

func (t *PBTracer) Close()

Close 关闭追踪器

func (*PBTracer) Trace

func (t *PBTracer) Trace(evt *pb.TraceEvent)

Trace 向追踪器添加一个事件 参数:

  • evt: 要添加的事件

type PeerEvent

type PeerEvent struct {
	Type EventType // 事件类型
	Peer peer.ID   // 对等节点 ID
}

PeerEvent 表示对等节点事件。

type PeerFilter

type PeerFilter func(pid peer.ID, topic string) bool

PeerFilter 用于过滤 pubsub peers。对于给定的主题,它应该返回 true 表示接受。 参数:

  • pid: peer 的 ID。
  • topic: 主题。

返回值:

  • bool: 是否接受该 peer。

type PeerGaterParams

type PeerGaterParams struct {
	Threshold            float64            // 当被限流/验证消息的比例超过此阈值时,启用 gater
	GlobalDecay          float64            // 全局计数器衰减参数
	SourceDecay          float64            // 每 IP 计数器衰减参数
	DecayInterval        time.Duration      // 衰减间隔
	DecayToZero          float64            // 计数器归零阈值
	RetainStats          time.Duration      // 保留统计数据的时间
	Quiet                time.Duration      // 安静期时间
	DuplicateWeight      float64            // 重复消息的权重
	IgnoreWeight         float64            // 忽略消息的权重
	RejectWeight         float64            // 拒绝消息的权重
	TopicDeliveryWeights map[string]float64 // 优先主题的传递权重
}

PeerGaterParams 包含控制 peer gater 操作的参数

func DefaultPeerGaterParams

func DefaultPeerGaterParams() *PeerGaterParams

DefaultPeerGaterParams 创建使用默认值的 PeerGaterParams 结构 返回值:

  • *PeerGaterParams: 返回使用默认值的参数结构

func NewPeerGaterParams

func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams

NewPeerGaterParams 创建新的 PeerGaterParams 结构,使用指定的阈值和衰减参数以及默认值 参数:

  • threshold: 门控阈值
  • globalDecay: 全局衰减参数
  • sourceDecay: 来源衰减参数

返回值:

  • *PeerGaterParams: 返回新创建的参数结构

func (*PeerGaterParams) WithTopicDeliveryWeights

func (p *PeerGaterParams) WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams

WithTopicDeliveryWeights 设置优先主题的传递权重 参数:

  • w: 主题权重的映射

返回值:

  • *PeerGaterParams: 返回更新后的参数

type PeerMetadataStore

type PeerMetadataStore interface {
	// Get 获取与对等节点关联的元数据;
	// 如果没有与对等节点关联的元数据,应返回 nil,而不是错误。
	Get(context.Context, peer.ID) ([]byte, error)
	// Put 设置与对等节点关联的元数据。
	Put(context.Context, peer.ID, []byte) error
}

PeerMetadataStore 是一个接口,用于存储和检索每个对等节点的元数据

type PeerScoreInspectFn

type PeerScoreInspectFn = func(map[peer.ID]float64)

PeerScoreInspectFn 定义对等节点分数检查函数类型

type PeerScoreParams

type PeerScoreParams struct {
	SkipAtomicValidation        bool                         // 是否允许仅设置某些参数而不是所有参数
	Topics                      map[string]*TopicScoreParams // 每个主题的分数参数
	TopicScoreCap               float64                      // 主题分数上限
	AppSpecificScore            func(p peer.ID) float64      // 应用程序特定的对等节点分数
	AppSpecificWeight           float64                      // 应用程序特定分数的权重
	IPColocationFactorWeight    float64                      // IP 同位因素的权重
	IPColocationFactorThreshold int                          // IP 同位因素阈值
	IPColocationFactorWhitelist []*net.IPNet                 // IP 同位因素白名单
	BehaviourPenaltyWeight      float64                      // 行为模式处罚的权重
	BehaviourPenaltyThreshold   float64                      // 行为模式处罚的阈值
	BehaviourPenaltyDecay       float64                      // 行为模式处罚的衰减
	DecayInterval               time.Duration                // 参数计数器的衰减间隔
	DecayToZero                 float64                      // 计数器值低于该值时被视为 0
	RetainScore                 time.Duration                // 断开连接的对等节点记住计数器的时间
	SeenMsgTTL                  time.Duration                // 记住消息传递时间
}

PeerScoreParams 包含用于控制对等节点分数的参数

type PeerScoreSnapshot

type PeerScoreSnapshot struct {
	Score              float64                        // 对等节点分数
	Topics             map[string]*TopicScoreSnapshot // 每个主题的分数快照
	AppSpecificScore   float64                        // 应用程序特定的分数
	IPColocationFactor float64                        // IP 同位因素
	BehaviourPenalty   float64                        // 行为模式处罚
}

PeerScoreSnapshot 包含对等节点分数快照

type PeerScoreThresholds

type PeerScoreThresholds struct {
	SkipAtomicValidation        bool    // 是否允许仅设置某些参数而不是所有参数
	GossipThreshold             float64 // 低于该分数时抑制 Gossip 传播,应为负数
	PublishThreshold            float64 // 低于该分数时不应发布消息,应为负数且 <= GossipThreshold
	GraylistThreshold           float64 // 低于该分数时完全抑制消息处理,应为负数且 <= PublishThreshold
	AcceptPXThreshold           float64 // 低于该分数时将忽略 PX,应为正数,限于启动器和其他可信节点
	OpportunisticGraftThreshold float64 // 低于该分数时触发机会性 grafting,应为正数且值小
}

PeerScoreThresholds 包含用于控制对等节点分数的参数

type ProtocolMatchFn

type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool

ProtocolMatchFn 是一个函数类型,用于匹配协议ID

type ProvideKey

type ProvideKey func() (crypto.PrivKey, peer.ID)

ProvideKey 是一个函数,在发布新消息时提供私钥及其关联的对等节点 ID。

type PubOpt

type PubOpt func(pub *PublishOptions) error

PubOpt 定义发布选项的类型。

func WithLocalPublication

func WithLocalPublication(local bool) PubOpt

WithLocalPublication 返回一个发布选项,仅通知进程内的订阅者。 它阻止消息发布到网状对等节点。 参数: - local: bool 类型的标志,指示是否仅在本地发布消息。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 local 字段。

func WithMessageMetadata

func WithMessageMetadata(messageID string, msgType pb.MessageMetadata_MessageType) PubOpt

WithMessageMetadata 设置消息的元信息。 参数: - messageID: string 类型,表示消息ID。 - msgType: MessageType 类型,表示消息的类型(请求或响应)。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的消息元信息。

func WithReadiness

func WithReadiness(ready RouterReady) PubOpt

WithReadiness 返回一个发布选项,仅在路由器准备好时发布。 此选项仅在 PubSub 也使用 WithDiscovery 时有用。 参数: - ready: RouterReady 类型的回调函数,当路由器准备好时被调用。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 ready 字段。

func WithSecretKeyAndPeerId

func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt

WithSecretKeyAndPeerId 返回一个发布选项,用于提供自定义私钥及其对应的对等节点 ID。 这个选项在我们希望从网络中的"虚拟"不可连接的对等节点发送消息时非常有用。 参数: - key: crypto.PrivKey 类型,自定义私钥。 - pid: peer.ID 类型,对应的对等节点 ID。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 customKey 字段。

func WithTargetMap

func WithTargetMap(targets []peer.ID) PubOpt

WithTargetMap 设置目标节点列表。 参数: - targets: 目标节点的列表,类型为 []peer.ID。 返回值: - PubOpt: 返回一个发布选项函数,用于设置 PublishOptions 中的 targetMap 字段。

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub 实现了发布-订阅系统。

func NewFloodSub

func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewFloodSub 返回一个使用FloodSubRouter的新PubSub对象 参数:

  • ctx: 上下文
  • h: 主机
  • opts: 选项

返回值:

  • *PubSub: 创建的PubSub实例
  • error: 如果创建失败,返回错误

func NewFloodsubWithProtocols

func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error)

NewFloodsubWithProtocols 返回一个新的启用floodsub的PubSub对象,使用指定的协议 参数:

  • ctx: 上下文
  • h: 主机
  • ps: 协议列表
  • opts: 选项

返回值:

  • *PubSub: 创建的PubSub实例
  • error: 如果创建失败,返回错误

func NewGossipSub

func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error)

NewGossipSub 返回一个新的使用默认 GossipSubRouter 作为路由器的 PubSub 对象。 参数:

  • ctx: 上下文
  • h: 主机
  • opts: 选项

返回值:

  • *PubSub: PubSub 对象
  • error: 错误信息

func NewGossipSubWithRouter

func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)

NewGossipSubWithRouter 返回一个使用给定路由器的新的 PubSub 对象。 参数:

  • ctx: 上下文
  • h: 主机
  • rt: 路由器
  • opts: 选项

返回值:

  • *PubSub: PubSub 对象
  • error: 错误信息

func NewPubSub

func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error)

NewPubSub 返回一个新的 PubSub 管理对象。 参数:

  • ctx: 用于控制 PubSub 生命周期的上下文。
  • h: libp2p 主机。
  • rt: PubSub 路由器。
  • opts: 可选配置项。

返回值:

  • *PubSub: 新的 PubSub 对象。
  • error: 如果有错误发生,返回错误。

func NewRandomSub

func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error)

NewRandomSub 返回一个使用 RandomSubRouter 作为路由器的新 PubSub 对象 参数:

  • ctx: 上下文
  • h: 主机
  • size: 网络大小
  • opts: 选项

返回值:

  • *PubSub: PubSub 对象
  • error: 错误

func (*PubSub) BlacklistPeer

func (p *PubSub) BlacklistPeer(pid peer.ID)

BlacklistPeer 将一个对等节点列入黑名单;所有来自此对等节点的消息将无条件丢弃。

func (*PubSub) GetTopics

func (p *PubSub) GetTopics() []string

GetTopics 返回此节点订阅的主题。

func (*PubSub) Join

func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error)

Join 加入主题并返回 Topic 句柄。 每个主题应该只有一个 Topic 句柄,如果主题句柄已存在,Join 将返回错误。 参数:

  • topic: 主题名称
  • opts: 主题选项

返回值:

  • *Topic: 主题句柄
  • error: 如果发生错误,返回错误

func (*PubSub) ListPeers

func (p *PubSub) ListPeers(topic string) []peer.ID

ListPeers 返回我们在给定主题中连接的对等节点列表。

func (*PubSub) NotifyNewPeer added in v0.0.4

func (ps *PubSub) NotifyNewPeer(peer peer.ID) error

NotifyNewPeer 通知系统有新的对等节点加入 参数:

  • peer: 新加入节点的ID

返回值:

  • error: 如果节点不满足要求则返回错误

func (*PubSub) RegisterTopicValidator

func (p *PubSub) RegisterTopicValidator(topic string, val interface{}, opts ...ValidatorOpt) error

RegisterTopicValidator 为主题注册一个验证器。 默认情况下,验证器是异步的,这意味着它们将在单独的 goroutine 中运行。 活动 goroutine 的数量由全局和每个主题验证器的节流控制;如果超过节流阈值,消息将被丢弃。

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error)

Subscribe 返回给定主题的新订阅。 请注意,订阅不是即时操作。可能需要一些时间,订阅才能被 pubsub 主循环处理并传播给我们的对等节点。

已弃用:请使用 pubsub.Join() 和 topic.Subscribe()

func (*PubSub) UnregisterTopicValidator

func (p *PubSub) UnregisterTopicValidator(topic string) error

UnregisterTopicValidator 从主题中移除一个验证器。 如果没有验证器注册到该主题,则返回错误。

type PubSubMsgHandler

type PubSubMsgHandler func(*Message)

PubSubMsgHandler 定义了处理其他节点发布消息的函数类型 参数:

  • *Message: 接收到的消息对象

type PubSubRouter

type PubSubRouter interface {
	// Protocols 返回路由器支持的协议列表
	Protocols() []protocol.ID
	// Attach 被 PubSub 构造函数调用以将路由器附加到一个新初始化的 PubSub 实例
	Attach(*PubSub)
	// AddPeer 通知路由器有新的 peer 已连接
	AddPeer(peer.ID, protocol.ID)
	// RemovePeer 通知路由器有 peer 已断开
	RemovePeer(peer.ID)
	// EnoughPeers 返回路由器是否需要更多的 peers 才能准备好发布新记录
	// 参数:
	//   - topic: 主题
	//   - suggested: 建议的 peer 数量(如果大于 0)
	EnoughPeers(topic string, suggested int) bool
	// AcceptFrom 在将消息推送到验证管道或处理控制信息之前调用,用于判断是否接受来自指定 peer 的消息
	// 参数:
	//   - peer.ID: 发送消息的 peer ID
	// 返回值:
	//   - AcceptStatus: 接受状态(全部接受、仅接受控制消息或不接受)
	AcceptFrom(peer.ID) AcceptStatus
	// HandleRPC 处理 RPC 包裹中的控制消息
	// 参数:
	//   - rpc: 要处理的 RPC
	HandleRPC(*RPC)
	// Publish 转发已验证的新消息
	// 参数:
	//   - msg: 要转发的消息
	Publish(*Message)
	// Join 通知路由器我们要接收和转发主题中的消息
	// 参数:
	//   - topic: 要加入的主题
	Join(topic string)
	// Leave 通知路由器我们不再对主题感兴趣
	// 参数:
	//   - topic: 要离开的主题
	Leave(topic string)
}

PubSubRouter 是 PubSub 的消息路由组件

type PubSubType

type PubSubType int

PubSubType 定义发布订阅的类型

const (
	GossipSub PubSubType = iota // GossipSub 类型,基于 gossip 协议的发布订阅
	FloodSub                    // FloodSub 类型,基于洪泛的发布订阅
	RandomSub                   // RandomSub 类型,基于随机选择的发布订阅
)

type PublishOptions

type PublishOptions struct {
	// contains filtered or unexported fields
}

PublishOptions 表示发布选项。

type RPC

type RPC struct {
	pb.RPC
	// contains filtered or unexported fields
}

RPC 表示一个 RPC 消息

type RandomSubRouter

type RandomSubRouter struct {
	// contains filtered or unexported fields
}

RandomSubRouter 是一个实现随机传播策略的路由器。 对于每条消息,它选择网络大小平方根个对等节点,最少为 RandomSubD,并将消息转发给它们。

func (*RandomSubRouter) AcceptFrom

func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus

AcceptFrom 在处理控制信息或将消息推送到验证管道之前,对每个传入消息调用此方法 参数:

  • peer.ID: 对等节点 ID

返回值:

  • AcceptStatus: 接受状态

func (*RandomSubRouter) AddPeer

func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID)

AddPeer 通知路由器一个新的对等节点已经连接 参数:

  • p: 对等节点 ID
  • proto: 协议 ID

func (*RandomSubRouter) Attach

func (rs *RandomSubRouter) Attach(p *PubSub)

Attach 将路由器附加到一个初始化的 PubSub 实例 参数:

  • p: PubSub 对象

func (*RandomSubRouter) EnoughPeers

func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool

EnoughPeers 返回路由器是否需要更多对等节点才能准备好发布新记录 参数:

  • topic: 主题
  • suggested: 建议的对等节点数

返回值:

  • bool: 是否有足够的对等节点

func (*RandomSubRouter) HandleRPC

func (rs *RandomSubRouter) HandleRPC(rpc *RPC)

HandleRPC 处理控制消息 参数:

  • rpc: RPC 对象

func (*RandomSubRouter) Join

func (rs *RandomSubRouter) Join(topic string)

Join 通知路由器我们想要接收和转发主题中的消息 参数:

  • topic: 主题

func (*RandomSubRouter) Leave

func (rs *RandomSubRouter) Leave(topic string)

Leave 通知路由器我们不再对主题感兴趣 参数:

  • topic: 主题

func (*RandomSubRouter) Protocols

func (rs *RandomSubRouter) Protocols() []protocol.ID

Protocols 返回路由器支持的协议列表 返回值:

  • []protocol.ID: 协议列表

func (*RandomSubRouter) Publish

func (rs *RandomSubRouter) Publish(msg *Message)

Publish 发布一条已验证的新消息 参数:

  • msg: 消息对象

func (*RandomSubRouter) RemovePeer

func (rs *RandomSubRouter) RemovePeer(p peer.ID)

RemovePeer 通知路由器一个对等节点已经断开连接 参数:

  • p: 对等节点 ID

type RawTracer

type RawTracer interface {
	// AddPeer 当一个新对等节点被添加时调用。
	AddPeer(p peer.ID, proto protocol.ID)
	// RemovePeer 当一个对等节点被移除时调用。
	RemovePeer(p peer.ID)
	// Join 当加入一个新主题时调用。
	Join(topic string)
	// Leave 当放弃一个主题时调用。
	Leave(topic string)
	// Graft 当一个新对等节点被添加到网格时调用(gossipsub)。
	Graft(p peer.ID, topic string)
	// Prune 当一个对等节点被从网格中移除时调用(gossipsub)。
	Prune(p peer.ID, topic string)
	// ValidateMessage 当消息首次进入验证管道时调用。
	ValidateMessage(msg *Message)
	// DeliverMessage 当消息被传递时调用。
	DeliverMessage(msg *Message)
	// RejectMessage 当消息被拒绝或忽略时调用。
	// 参数 reason 是一个命名字符串 Reject*。
	RejectMessage(msg *Message, reason string)
	// DuplicateMessage 当重复消息被丢弃时调用。
	DuplicateMessage(msg *Message)
	// ThrottlePeer 当一个对等节点被对等节点限制器限制时调用。
	ThrottlePeer(p peer.ID)
	// RecvRPC 当接收到一个传入的 RPC 时调用。
	RecvRPC(rpc *RPC)
	// SendRPC 当发送一个 RPC 时调用。
	SendRPC(rpc *RPC, p peer.ID)
	// DropRPC 当一个出站 RPC 被丢弃时调用,通常是因为队列已满。
	DropRPC(rpc *RPC, p peer.ID)
	// UndeliverableMessage 当 Subscribe 的消费者未能足够快地读取消息且压力释放机制触发丢弃消息时调用。
	UndeliverableMessage(msg *Message)
}

RawTracer 是一个低级追踪接口,允许应用程序追踪 pubsub 子系统的内部操作。 请注意,追踪器是同步调用的,这意味着应用程序的追踪器必须注意不要阻塞或修改参数。 警告:此接口不是固定的,可能会根据系统需求添加新方法。

type RelayCancelFunc

type RelayCancelFunc func()

RelayCancelFunc 中继取消函数类型

type RemoteTracer

type RemoteTracer struct {
	// contains filtered or unexported fields
}

RemoteTracer 是一个将追踪事件发送到远程对等节点的追踪器

func NewRemoteTracer

func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error)

NewRemoteTracer 构建一个 RemoteTracer,将追踪信息发送到由 pi 标识的对等节点。 参数:

  • ctx: 上下文,用于控制生命周期和取消操作
  • host: 本地主机,表示当前节点
  • pi: 远程对等节点的地址信息,包括节点ID和地址

返回值:

  • *RemoteTracer: 新创建的 RemoteTracer 对象
  • error: 如果发生错误,返回错误信息

func (*RemoteTracer) Close

func (t *RemoteTracer) Close()

Close 关闭追踪器

func (*RemoteTracer) Trace

func (t *RemoteTracer) Trace(evt *pb.TraceEvent)

Trace 向追踪器添加一个事件 参数:

  • evt: 要添加的事件

type RouterReady

type RouterReady func(rt PubSubRouter, topic string) (bool, error)

RouterReady 是一个函数,用于决定路由器是否准备好发布。

func MinTopicSize

func MinTopicSize(size int) RouterReady

MinTopicSize 返回一个函数,该函数根据主题大小检查路由器是否准备好发布。 参数:

  • size: 建议的主题大小(对等节点数)

返回值:

  • RouterReady: 一个函数类型,接收路由器和主题名称,返回布尔值和错误

type StatusChange

type StatusChange struct {
	PeerID            peer.ID       // 节点ID
	OldStatus         NodeStatus    // 旧状态
	NewStatus         NodeStatus    // 新状态
	Timestamp         time.Time     // 状态变化时间戳
	Score             float64       // 节点评分
	ConnectionQuality float64       // 连接质量
	CheckInterval     time.Duration // 检查间隔
	FailedAttempts    int           // 连续失败尝试次数
	NetworkLatency    time.Duration // 网络延迟
}

StatusChange 表示节点状态的变化

type SubOpt

type SubOpt func(sub *Subscription) error

SubOpt 订阅选项函数类型

func WithBufferSize

func WithBufferSize(size int) SubOpt

WithBufferSize 是一个订阅选项,用于自定义订阅输出缓冲区的大小。 默认长度为 32,但可以配置以避免消费者读取速度不够快时丢失消息。

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription 处理特定主题订阅的详细信息。 对于给定的主题可能有多个订阅。

func (*Subscription) Cancel

func (sub *Subscription) Cancel()

Cancel 关闭订阅。如果这是最后一个活动订阅,那么 pubsub 将向网络发送取消订阅公告。

func (*Subscription) Next

func (sub *Subscription) Next(ctx context.Context) (*Message, error)

Next 返回订阅中的下一条消息。 参数: - ctx: context.Context 上下文,用于取消操作 返回值: - *Message: 下一条消息,如果有的话 - error: 错误信息,如果有的话

func (*Subscription) Topic

func (sub *Subscription) Topic() string

Topic 返回与订阅关联的主题字符串。 返回值: - string: 订阅的主题字符串

type SubscriptionFilter

type SubscriptionFilter interface {
	// CanSubscribe 返回 true 如果主题是感兴趣的并且我们可以订阅它
	CanSubscribe(topic string) bool

	// FilterIncomingSubscriptions 用于所有包含订阅通知的 RPC。
	// 它应仅过滤感兴趣的订阅,并且如果订阅过多(例如)可能会返回错误。
	// 参数:
	// - from: 订阅来源的 peer.ID
	// - subs: 包含订阅通知的 RPC_SubOpts 列表
	// 返回值:
	// - []*pb.RPC_SubOpts: 过滤后的订阅列表
	// - error: 错误信息,如果有的话
	FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error)
}

SubscriptionFilter 是一个函数,用于告诉我们是否有兴趣允许和跟踪给定主题的订阅。

每当收到另一个对等点的订阅通知时,都会咨询过滤器; 如果过滤器返回 false,则忽略通知。

当加入主题时,也会咨询过滤器;如果过滤器返回 false,则 Join 操作将导致错误。

func NewAllowlistSubscriptionFilter

func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter

NewAllowlistSubscriptionFilter 创建一个订阅过滤器,该过滤器仅允许显式指定的主题用于本地订阅和传入对等订阅。 参数: - topics: 允许订阅的主题列表 返回值: - SubscriptionFilter: 一个新的允许列表订阅过滤器

func NewRegexpSubscriptionFilter

func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter

NewRegexpSubscriptionFilter 创建一个订阅过滤器,该过滤器仅允许与正则表达式匹配的主题用于本地订阅和传入对等订阅。 参数: - rx: 用于匹配主题的正则表达式 返回值: - SubscriptionFilter: 一个新的正则表达式订阅过滤器

func WrapLimitSubscriptionFilter

func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter

WrapLimitSubscriptionFilter 包装一个订阅过滤器,在 RPC 消息中允许的订阅数量有硬限制。 参数: - filter: 内部使用的 SubscriptionFilter - limit: 订阅数量限制 返回值: - SubscriptionFilter: 包装后的订阅过滤器

type TimeCachedBlacklist

type TimeCachedBlacklist struct {
	// contains filtered or unexported fields
}

TimeCachedBlacklist 是一种使用时间缓存实现的黑名单

func (*TimeCachedBlacklist) Add

func (b *TimeCachedBlacklist) Add(p peer.ID) bool

Add 将节点添加到 TimeCachedBlacklist 中 参数:

  • p: 需要添加到黑名单的节点ID

返回值:

  • bool: 添加操作是否成功

func (*TimeCachedBlacklist) Contains

func (b *TimeCachedBlacklist) Contains(p peer.ID) bool

Contains 检查节点是否在 TimeCachedBlacklist 中 参数:

  • p: 需要检查的节点ID

返回值:

  • bool: 节点是否在黑名单中

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic 表示 pubsub 主题的句柄。

func (*Topic) Close

func (t *Topic) Close() error

Close 关闭主题。返回错误,除非没有活动的事件处理程序或订阅。 如果主题已经关闭,则不会返回错误。 返回值: - error: 错误信息,如果有的话。

func (*Topic) EventHandler

func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error)

EventHandler 创建特定主题事件的句柄。 参数: - opts: ...TopicEventHandlerOpt 事件处理程序选项 返回值: - *TopicEventHandler: 创建的事件处理程序 - error: 错误信息,如果有的话

func (*Topic) ListPeers

func (t *Topic) ListPeers() []peer.ID

ListPeers 返回我们在给定主题中连接的对等节点列表。 返回值: - []peer.ID: 对等节点列表。

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error

Publish 发布数据到主题。 注意,如果是响应消息,需要为 opts ...PubOpt 设置消息元数据 WithMessageMetadata(msgID, pb.MessageMetadata_RESPONSE) 参数: - ctx: 上下文,用于控制发布操作 - data: 要发布的数据 - opts: 发布选项 返回值: - error: 错误信息,如果有的话

func (*Topic) PublishWithReply

func (t *Topic) PublishWithReply(ctx context.Context, data []byte, targetNodes ...peer.ID) ([]byte, error)

PublishWithReply 发送消息并等待响应

使用示例:

// 不指定目标节点
reply, err := topic.PublishWithReply(ctx, data)

// 指定一个目标节点
reply, err := topic.PublishWithReply(ctx, data, peerID1)

// 指定多个目标节点
reply, err := topic.PublishWithReply(ctx, data, peerID1, peerID2, peerID3)

参数: - ctx: context.Context 表示上下文,用于控制流程 - data: []byte 表示要发送的消息内容 - targetNodes: ...peer.ID 表示需要将消息发送到的目标节点列表(可选) 返回值: - []byte: 接收到的回复消息 - error: 如果出现错误,返回错误信息

func (*Topic) Relay

func (t *Topic) Relay() (RelayCancelFunc, error)

Relay 启用主题的消息中继并返回引用取消函数。 随后的调用增加引用计数器。 要完全禁用中继,必须取消所有引用。 返回值: - RelayCancelFunc: 取消中继的函数 - error: 错误信息,如果有的话

func (*Topic) SetScoreParams

func (t *Topic) SetScoreParams(p *TopicScoreParams) error

SetScoreParams 设置主题的评分参数,如果 pubsub 路由器支持对等评分。 参数: - p: *TopicScoreParams 评分参数 返回值: - error: 错误信息,如果有的话

func (*Topic) String

func (t *Topic) String() string

String 返回与 t 关联的主题。 返回值: - string: 主题名称

func (*Topic) Subscribe

func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error)

Subscribe 返回该主题的新订阅。 注意订阅并不是瞬时操作。在 pubsub 主循环处理并传播给我们的对等节点之前,可能需要一些时间。 参数: - opts: ...SubOpt 订阅选项 返回值: - *Subscription: 创建的订阅 - error: 错误信息,如果有的话

type TopicEventHandler

type TopicEventHandler struct {
	// contains filtered or unexported fields
}

TopicEventHandler 用于管理特定主题事件。无需订阅即可接收事件。

func (*TopicEventHandler) Cancel

func (t *TopicEventHandler) Cancel()

Cancel 关闭主题事件处理程序。

func (*TopicEventHandler) NextPeerEvent

func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error)

NextPeerEvent 返回有关订阅对等节点的下一个事件。 保证:给定对等节点的 Peer Join 和 Peer Leave 事件将按顺序触发。 参数: - ctx: 上下文,用于控制操作。 返回值: - PeerEvent: 下一个对等节点事件。 - error: 错误信息,如果有的话。

type TopicEventHandlerOpt

type TopicEventHandlerOpt func(t *TopicEventHandler) error

TopicEventHandlerOpt 定义了一个用于设置 TopicEventHandler 选项的函数类型。

type TopicOpt

type TopicOpt func(t *Topic) error

TopicOpt 主题选项函数类型

func WithTopicMessageIdFn

func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt

WithTopicMessageIdFn 设置自定义 MsgIdFunction 用于生成消息 ID 参数:

  • msgId: 消息 ID 函数

返回值:

  • TopicOpt: 主题选项函数

type TopicOptions

type TopicOptions struct{}

TopicOptions 主题选项结构体(占位符)

type TopicScoreParams

type TopicScoreParams struct {
	SkipAtomicValidation            bool          // 是否允许仅设置某些参数而不是所有参数
	TopicWeight                     float64       // 主题权重
	TimeInMeshWeight                float64       // 在 mesh 中的时间权重
	TimeInMeshQuantum               time.Duration // 在 mesh 中的时间量子
	TimeInMeshCap                   float64       // 在 mesh 中的时间上限
	FirstMessageDeliveriesWeight    float64       // 首次消息传递的权重
	FirstMessageDeliveriesDecay     float64       // 首次消息传递的衰减
	FirstMessageDeliveriesCap       float64       // 首次消息传递的上限
	MeshMessageDeliveriesWeight     float64       // mesh 消息传递的权重
	MeshMessageDeliveriesDecay      float64       // mesh 消息传递的衰减
	MeshMessageDeliveriesCap        float64       // mesh 消息传递的上限
	MeshMessageDeliveriesThreshold  float64       // mesh 消息传递的阈值
	MeshMessageDeliveriesWindow     time.Duration // mesh 消息传递的窗口
	MeshMessageDeliveriesActivation time.Duration // mesh 消息传递的激活时间
	MeshFailurePenaltyWeight        float64       // mesh 失败处罚的权重
	MeshFailurePenaltyDecay         float64       // mesh 失败处罚的衰减
	InvalidMessageDeliveriesWeight  float64       // 无效消息传递的权重
	InvalidMessageDeliveriesDecay   float64       // 无效消息传递的衰减
}

TopicScoreParams 包含用于控制主题分数的参数

type TopicScoreSnapshot

type TopicScoreSnapshot struct {
	TimeInMesh               time.Duration // 在 mesh 中的时间
	FirstMessageDeliveries   float64       // 首次消息传递
	MeshMessageDeliveries    float64       // mesh 消息传递
	InvalidMessageDeliveries float64       // 无效消息传递
}

TopicScoreSnapshot 包含主题分数快照

type ValidationError

type ValidationError struct {
	Reason string // 错误原因
}

ValidationError 表示消息验证失败时可能会发出的错误

func (ValidationError) Error

func (e ValidationError) Error() string

Error 返回验证错误的原因

type ValidationResult

type ValidationResult int

ValidationResult 表示扩展验证器的决策结果

type Validator

type Validator func(context.Context, peer.ID, *Message) bool

Validator 是一个验证消息的函数,返回二元决策:接受或拒绝

type ValidatorEx

type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult

ValidatorEx 是一个扩展的验证函数,返回枚举决策

func NewBasicSeqnoValidator

func NewBasicSeqnoValidator(meta PeerMetadataStore) ValidatorEx

NewBasicSeqnoValidator 构造一个使用给定 PeerMetadataStore 的 BasicSeqnoValidator。 参数:

  • meta: 用于存储对等节点元数据的接口实例

返回值:

  • ValidatorEx: 返回一个扩展验证器函数

type ValidatorOpt

type ValidatorOpt func(addVal *addValReq) error

ValidatorOpt 是 RegisterTopicValidator 的选项类型

func WithValidatorConcurrency

func WithValidatorConcurrency(n int) ValidatorOpt

WithValidatorConcurrency 是一个选项,用于设置主题验证器的节流大小,默认值为 1024 参数:

  • n: int 节流大小

返回值:

  • ValidatorOpt 验证器选项

func WithValidatorInline

func WithValidatorInline(inline bool) ValidatorOpt

WithValidatorInline 是一个选项,用于设置验证器是否内联执行 参数:

  • inline: bool 是否内联执行

返回值:

  • ValidatorOpt 验证器选项

func WithValidatorTimeout

func WithValidatorTimeout(timeout time.Duration) ValidatorOpt

WithValidatorTimeout 是一个选项,用于设置异步主题验证器的超时时间,默认无超时 参数:

  • timeout: time.Duration 超时时间

返回值:

  • ValidatorOpt 验证器选项

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL