downloads

package
v2.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2025 License: MIT Imports: 41 Imported by: 0

Documentation

Overview

package downloads_ 提供了文件下载相关的功能

package downloads_ 实现文件下载相关功能

Index

Constants

View Source
const (
	MaxSessions = 10 // 允许的最大并发会话数
)

Variables

AllowedTopics 定义了系统支持的所有主题列表

View Source
var (
	// StreamRequestSegmentProtocol 定义了请求数据片段的协议标识符
	StreamRequestSegmentProtocol = fmt.Sprintf("defs@stream/request/segment/%s", version)
)

Functions

func CreateDownloadFileRecord

func CreateDownloadFileRecord(
	db *badgerhold.Store,
	taskID string,
	fileID string,
	pubkeyHash []byte,
	firstKeyShare []byte,
	tempStorage string,
	fileMeta *pb.FileMeta,
	sliceTable map[int64]*pb.HashTable,
	status pb.DownloadStatus,
) (*pb.DownloadFileRecord, error)

CreateDownloadFileRecord 创建下载文件记录并保存到数据库 参数:

  • db: 数据库存储接口
  • taskID: 下载任务的唯一标识
  • fileID: 文件的唯一标识
  • pubkeyHash: 所有者的公钥哈希
  • firstKeyShare: 恢复密钥的第一个分片
  • tempStorage: 临时存储路径
  • fileMeta: 文件的元数据信息
  • sliceTable: 分片哈希表
  • status: 下载任务的初始状态

返回值:

  • *pb.DownloadFileRecord: 创建的下载文件记录对象
  • error: 如果创建或存储过程中发生错误则返回相应错误

功能:

  • 创建新的下载文件记录
  • 初始化文件和分片记录
  • 分批处理并保存分片记录
  • 使用事务确保数据一致性

func DecompressAndDecryptSegmentContent

func DecompressAndDecryptSegmentContent(shareOne, shareTwo []byte, compressedData []byte) ([]byte, error)

DecompressAndDecryptSegmentContent 解压并解密片段内容 参数:

  • shareOne: 第一个密钥分片
  • shareTwo: 第二个密钥分片
  • compressedData: 压缩并加密的数据内容

返回值:

  • []byte: 解压并解密后的原始数据
  • error: 解压或解密失败时返回错误信息

功能:

  • 对压缩的加密数据进行解压缩
  • 使用密钥分片恢复解密密钥
  • 使用AES-GCM模式解密数据
  • 返回解密后的原始数据

func GetDownloadProgressAndPending

func GetDownloadProgressAndPending(db *badgerhold.Store, taskID string) (*bitset.BitSet, map[int64]struct{}, error)

GetDownloadProgressAndPending 获取下载任务的进度和待下载片段信息 功能: 根据任务ID获取下载进度位图和待下载片段集合,用于跟踪下载状态

参数: - db: 数据库存储实例 - taskID: 下载任务ID

返回值: - *bitset.BitSet: 下载进度位图,每个位表示一个数据片段的下载状态(1表示已完成,0表示未完成) - map[int64]struct{}: 待下载片段的索引集合,key为片段索引 - error: 错误信息

func GetFileInfoResponse

func GetFileInfoResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, error)

GetFileInfoResponse 根据文件ID获取文件信息响应 参数:

  • db: 数据库存储接口
  • taskID: 任务的唯一标识符
  • fileID: 文件的唯一标识符
  • pubkeyHash: 公钥哈希

返回值:

  • *pb.FileInfoResponse: 文件信息响应
  • error: 操作过程中可能发生的错误

功能:

  • 获取文件片段存储记录
  • 构建文件元数据
  • 生成文件信息响应

func GetInitialDownloadResponse

func GetInitialDownloadResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, *pb.DownloadPubSubManifestResponse, error)

GetInitialDownloadResponse 根据文件ID获取首次下载响应 参数:

  • db: 数据库存储接口
  • taskID: 任务的唯一标识符
  • fileID: 文件的唯一标识符
  • pubkeyHash: 公钥哈希

返回值:

  • *pb.DownloadPubSubFileInfoResponse: 文件摘要响应
  • *pb.DownloadPubSubManifestResponse: 索引清单响应
  • error: 操作过程中可能发生的错误

功能:

  • 获取文件片段存储记录
  • 构建文件元数据
  • 生成文件信息响应
  • 生成索引清单响应

func GetListDownloadSegments

func GetListDownloadSegments(db *badgerhold.Store, taskID string) ([]*pb.DownloadSegmentRecord, error)

GetListDownloadSegments 获取下载任务的所有片段记录 功能: 根据任务ID获取所有下载片段的详细信息

参数: - db: 数据库存储实例 - taskID: 下载任务ID

返回值: - []*pb.DownloadSegmentRecord: 下载片段记录列表,包含每个片段的详细信息 - error: 错误信息

func GetManifestResponse

func GetManifestResponse(
	db *database.DB,
	taskID string,
	fileID string,
	pubkeyHash []byte,
	requestedSegmentIds []string,
) (*pb.DownloadPubSubManifestResponse, error)

GetManifestResponse 根据文件ID获取索引清单响应 参数:

  • db: 数据库存储接口
  • taskID: 任务的唯一标识符
  • fileID: 文件的唯一标识符
  • pubkeyHash: 公钥哈希
  • requestedSegmentIds: 请求下载的片段ID列表

返回值:

  • *pb.DownloadManifestResponse: 索引清单响应
  • error: 如果获取过程中发生错误则返回相应错误

功能:

  • 获取文件片段存储记录
  • 过滤请求的片段
  • 构建可用片段映射
  • 生成索引清单响应

func GetPendingSegments

func GetPendingSegments(db *badgerhold.Store, taskID string) ([]string, error)

GetPendingSegments 获取未完成的下载片段ID列表 功能: 获取指定任务中所有未完成下载的非校验片段ID

参数: - db: 数据库存储实例 - taskID: 下载任务ID

返回值: - []string: 未完成片段的ID列表 - error: 错误信息

func GetSegmentContent

func GetSegmentContent(
	db *database.DB,
	taskID string,
	fileID string,
	segmentId string,
	segmentIndex int64,
	pubkeyHash []byte,
	requestedSegmentIds []string,
) (*pb.SegmentContentResponse, error)

GetSegmentContent 根据文件ID、片段ID和片段索引获取单个文件片段的内容 参数:

  • db: 数据库存储接口
  • taskID: 任务的唯一标识符
  • fileID: 文件的唯一标识符
  • segmentId: 请求下载的片段ID
  • segmentIndex: 请求下载的片段索引
  • pubkeyHash: 公钥哈希
  • requestedSegmentIds: 请求下载的片段ID列表

返回值:

  • *pb.SegmentContentResponse: 包含请求片段内容的响应对象
  • error: 如果获取过程中发生错误则返回相应错误

功能:

  • 获取指定片段的存储记录
  • 构建可用片段映射
  • 生成片段内容响应

func GetSegmentStats

func GetSegmentStats(db *badgerhold.Store, taskID string) (*struct {
	DataSegments struct {
		Total     int
		Completed int
		Failed    map[int64]string
	}
	ParitySegments struct {
		Completed int
		Pending   map[int64]string
	}
}, error)

GetSegmentStats 获取下载任务的片段统计信息 功能: 统计下载任务中数据片段和校验片段的完成情况

参数: - db: 数据库存储实例 - taskID: 下载任务ID

返回值: - *struct{}: 包含数据片段和校验片段统计信息的结构体,包括总数、完成数、失败数等 - error: 错误信息

func GetSegmentStorageData

func GetSegmentStorageData(db *database.DB, hostID string, taskID string, fileID string, segmentID string) (*pb.SegmentContentResponse, error)

GetSegmentStorageData 获取文件片段的存储数据 功能: 根据给定的标识信息获取文件片段的完整内容和元数据

参数: - db: 数据库实例 - hostID: 主机ID - taskID: 任务ID - fileID: 文件ID - segmentID: 片段ID

返回值: - *pb.SegmentContentResponse: 包含片段内容和元数据的完整响应对象 - error: 错误信息

func GetSegmentsForRecovery

func GetSegmentsForRecovery(db *badgerhold.Store, taskID string) (map[int64]string, error)

GetSegmentsForRecovery 获取用于数据恢复的片段信息 功能: 根据下载失败的数据片段数量,选择合适的校验片段用于数据恢复

参数: - db: 数据库存储实例 - taskID: 下载任务ID

返回值: - map[int64]string: 需要下载的片段映射,key为片段索引,value为片段ID - error: 错误信息

func HandleDownloadContentRequestPubSub

func HandleDownloadContentRequestPubSub(
	ctx context.Context,
	opt *fscfg.Options,
	db *database.DB,
	fs afero.Afero,
	nps *pubsub.NodePubSub,
	download *DownloadManager,
	res *pubsub.Message,
)

HandleDownloadContentRequestPubSub 处理片段内容请求 参数:

  • ctx: 上下文对象
  • opt: 配置选项
  • db: 数据库实例
  • fs: 文件系统实例
  • nps: 发布订阅系统
  • download: 下载管理器
  • res: 接收到的消息

返回值: void

功能:

  • 解析片段内容请求数据
  • 获取请求的片段内容
  • 发送片段内容响应给请求节点

func HandleDownloadManifestRequestPubSub

func HandleDownloadManifestRequestPubSub(
	ctx context.Context,
	opt *fscfg.Options,
	db *database.DB,
	fs afero.Afero,
	nps *pubsub.NodePubSub,
	download *DownloadManager,
	res *pubsub.Message,
)

HandleDownloadManifestRequestPubSub 处理索引清单请求 参数:

  • ctx: 上下文对象
  • opt: 配置选项
  • db: 数据库实例
  • fs: 文件系统实例
  • nps: 发布订阅系统
  • download: 下载管理器
  • res: 接收到的消息

返回值: void

功能:

  • 解析索引清单请求数据
  • 获取请求的文件片段清单
  • 发送清单响应给请求节点

func HandleDownloadManifestResponsePubSub

func HandleDownloadManifestResponsePubSub(
	ctx context.Context,
	opt *fscfg.Options,
	nps *pubsub.NodePubSub,
	db *database.DB,
	fs afero.Afero,
	download *DownloadManager,
	res *pubsub.Message,
)

HandleDownloadManifestResponsePubSub 处理索引清单响应 参数:

  • ctx: 上下文,用于控制处理过程的生命周期
  • opt: 文件存储选项配置
  • nps: 发布订阅系统
  • db: 数据库存储实例
  • fs: 文件系统接口
  • download: 下载管理器实例
  • res: 接收到的流请求消息

功能:

  • 解析索引清单响应数据
  • 更新下载任务的节点和可用片段信息
  • 记录任务更新日志
  • 验证响应数据的有效性

func HandleFileInfoRequestPubSub

func HandleFileInfoRequestPubSub(
	ctx context.Context,
	opt *fscfg.Options,
	db *database.DB,
	fs afero.Afero,
	host host.Host,
	nps *pubsub.NodePubSub,
	download *DownloadManager,
	res *pubsub.Message,
)

HandleFileInfoRequestPubSub 处理文件信息请求 参数:

  • ctx: 上下文对象
  • opt: 配置选项
  • db: 数据库实例
  • fs: 文件系统实例
  • host: libp2p网络主机
  • nps: 发布订阅系统
  • download: 下载管理器
  • res: 接收到的消息

返回值: void

功能:

  • 解析并验证文件信息请求
  • 建立与请求节点的连接
  • 获取文件元数据和分片信息
  • 发送响应消息给请求节点

func InitializeDownloadManager

func InitializeDownloadManager(lc fx.Lifecycle, input InitializeDownloadManagerInput) error

InitializeDownloadManager 初始化 DownloadManager 并设置相关的生命周期钩子 功能: 初始化下载管理器并设置其生命周期钩子函数 参数:

  • lc: 生命周期管理器
  • input: 初始化下载管理器所需的输入参数

返回值:

  • error: 初始化过程中的错误信息,如果成功则为nil

func NewDownloadFile

func NewDownloadFile(ctx context.Context, db *database.DB, h host.Host, nps *pubsub.NodePubSub,
	taskID string,
	fileID string,
	pubkeyHash []byte,
	firstKeyShare []byte,
) (*pb.DownloadOperationInfo, error)

NewDownloadFile 创建并初始化一个新的下载文件实例 参数:

  • ctx: 上下文对象,用于控制请求的生命周期
  • db: 数据库实例,用于存储下载记录
  • h: libp2p主机实例,用于网络通信
  • nps: 发布订阅系统实例,用于网络寻址
  • fileID: 要下载的文件ID
  • ownerPriv: 文件所有者的私钥,用于身份验证和加密

返回值:

  • *pb.DownloadOperationInfo: 下载操作信息,包含任务ID、文件路径等
  • error: 错误信息

功能:

  • 生成下载任务ID和密钥分片
  • 通过P2P网络获取文件信息
  • 创建下载记录并保存到数据库

func QueryDownloadTask

func QueryDownloadTask(db *badgerhold.Store, start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, error)

QueryDownloadTask 执行下载任务的基础查询 参数:

  • db: *badgerhold.Store 数据库存储实例
  • start: int 查询的起始位置
  • pageSize: int 每页显示的记录数
  • options: ...database.QueryOption 可选的查询条件

返回值:

  • []*pb.DownloadFileRecord: 下载任务记录列表
  • uint64: 符合查询条件的总记录数
  • error: 如果查询过程中发生错误,返回相应错误

func RegisterDownloadPubsubProtocol

func RegisterDownloadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)

RegisterDownloadPubsubProtocol 注册所有下载相关的PubSub协议处理器 参数:

  • lc: 应用生命周期管理器
  • input: 注册所需的输入参数

返回值:

  • error: 注册失败返回错误,成功返回nil

功能:

  • 订阅文件信息请求主题
  • 订阅索引清单请求主题
  • 订阅片段内容请求主题
  • 订阅索引清单响应主题
  • 为每个主题注册对应的处理函数

func RegisterDownloadStreamProtocol

func RegisterDownloadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)

RegisterDownloadStreamProtocol 注册下载流协议 参数:

  • lc: 生命周期管理器
  • input: 注册所需的依赖项

返回值: error - 注册过程中的错误信息

功能:

  • 创建流协议实例
  • 注册请求和发送数据片段的处理器
  • 管理协议的生命周期

func ReplyToMessage

func ReplyToMessage(ctx context.Context, topic *pubsub.Topic, messageID string, replyData []byte) error

ReplyToMessage 用于回复收到的消息 参数:

  • ctx: 上下文
  • topic: 主题实例
  • messageID: 原始消息ID
  • replyData: 回复的数据内容

返回值:

  • error: 回复失败返回错误,成功返回nil

功能:

  • 构造回复消息
  • 发布回复消息到指定主题
  • 记录错误日志

func RequestContentPubSub

func RequestContentPubSub(
	ctx context.Context,
	h host.Host,
	nps *pubsub.NodePubSub,
	taskID string,
	fileID string,
	pubkeyHash []byte,
	segmentId string,
	segmentIndex int64,
	requestedSegmentIds []string,
) (*pb.SegmentContentResponse, error)

RequestContentPubSub 发送文件片段内容请求并等待响应 参数:

  • ctx: 上下文,用于控制请求的生命周期
  • h: libp2p网络主机实例
  • nps: 发布订阅系统
  • taskID: 任务唯一标识
  • fileID: 文件唯一标识
  • pubkeyHash: 所有者的公钥哈希
  • segmentId: 请求的片段ID
  • segmentIndex: 片段索引
  • requestedSegmentIds: 请求下载的片段ID列表

返回值:

  • *pb.SegmentContentResponse: 片段内容响应对象,包含片段数据和元信息
  • error: 错误信息,如果请求失败则返回错误原因

功能:

  • 构造并发送片段内容请求
  • 等待并解析响应数据
  • 返回片段内容响应对象
  • 记录请求和响应日志

func RequestFileInfoPubSub

func RequestFileInfoPubSub(
	ctx context.Context,
	host host.Host,
	nps *pubsub.NodePubSub,
	taskID string,
	fileID string,
	pubkeyHash []byte,
) (*pb.DownloadPubSubFileInfoResponse, error)

RequestFileInfoPubSub 发送文件信息请求并等待响应 参数:

  • ctx: 上下文,用于控制请求的生命周期
  • host: libp2p网络主机实例
  • nps: 发布订阅系统
  • taskID: 任务唯一标识
  • fileID: 文件唯一标识
  • pubkeyHash: 所有者的公钥哈希

返回值:

  • *pb.DownloadPubSubFileInfoResponse: 文件信息响应对象,包含文件元数据
  • error: 错误信息,如果请求失败则返回错误原因

功能:

  • 构造并发送文件信息请求
  • 等待并解析响应数据
  • 返回文件信息响应对象

func RequestManifestPubSub

func RequestManifestPubSub(
	ctx context.Context,
	host host.Host,
	nps *pubsub.NodePubSub,
	taskID string,
	fileID string,
	pubkeyHash []byte,
	requestedSegmentIds []string,
) error

RequestManifestPubSub 发送索引清单请求 参数:

  • ctx: 上下文,用于控制请求的生命周期
  • host: libp2p网络主机实例
  • nps: 发布订阅系统
  • taskID: 任务唯一标识
  • fileID: 文件唯一标识
  • pubkeyHash: 所有者的公钥哈希
  • requestedSegmentIds: 请求下载的片段ID列表

返回值:

  • error: 错误信息,如果请求失败则返回错误原因

功能:

  • 构造并发送索引清单请求
  • 不等待响应直接返回
  • 记录请求日志信息

func UpdateDownloadFileStatus

func UpdateDownloadFileStatus(db *badgerhold.Store, taskID string, status pb.DownloadStatus) error

UpdateDownloadFileStatus 更新下载文件的状态 参数:

  • db: 数据库存储接口
  • taskID: 下载任务的唯一标识
  • status: 新的文件状态

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

功能:

  • 获取当前文件记录
  • 更新文件状态
  • 保存更新后的记录

func UpdateSegmentNodes

func UpdateSegmentNodes(db *badgerhold.Store, taskID string, peerID string, availableSlices map[int64]string) (map[int64]string, error)

UpdateSegmentNodes 更新片段的节点信息并返回未完成的片段索引 参数:

  • db: 数据库存储实例
  • taskID: 任务ID
  • peerID: 节点ID
  • availableSlices: 节点可用的片段映射,key为片段索引,value为片段ID

返回值:

  • map[int64]string: 未完成片段的映射,key为片段索引,value为片段ID
  • error: 错误信息

func ValidateAndStoreSegment

func ValidateAndStoreSegment(
	db *badgerhold.Store,
	shareOne []byte,
	response *pb.SegmentContentResponse,
) error

ValidateAndStoreSegment 验证并存储下载的文件片段 功能: 对下载的文件片段进行完整性验证,包括签名验证、解密、校验和验证,并将验证通过的片段存储到数据库

参数: - db: 数据库存储实例 - shareOne: 第一个密钥分片 - response: 下载响应数据

返回值: - error: 错误信息

func VerifySegmentChecksum

func VerifySegmentChecksum(content []byte, expectedChecksum uint32) error

VerifySegmentChecksum 验证片段校验和 参数:

  • content: 需要验证的内容数据
  • expectedChecksum: 期望的校验和值

返回值:

  • error: 校验和不匹配返回错误,匹配返回nil

功能:

  • 计算内容的CRC32校验和
  • 验证计算的校验和与期望值是否匹配
  • 确保数据完整性

func VerifySegmentSignature

func VerifySegmentSignature(p *pb.SegmentContentResponse) error

VerifySegmentSignature 验证片段签名 参数:

  • p: 片段内容响应对象,包含需要验证的数据和签名

返回值:

  • error: 验证失败返回错误,验证成功返回nil

功能:

  • 验证片段内容的签名是否有效
  • 使用ECDSA算法进行签名验证
  • 验证签名数据的完整性和真实性

Types

type DownloadManager

type DownloadManager struct {
	// contains filtered or unexported fields
}

DownloadManager 管理所有下载任务,提供文件下载的统一入口和管理功能

func (*DownloadManager) Cancel

func (m *DownloadManager) Cancel() context.CancelFunc

Cancel 获取任务的取消函数 返回值:

  • context.CancelFunc: 任务的取消函数

func (*DownloadManager) CancelDownload

func (m *DownloadManager) CancelDownload(taskID string) error

CancelDownload 取消下载任务 参数:

  • taskID: 要取消的任务ID

返回值:

  • error: 如果取消过程中发生错误,返回错误信息

func (*DownloadManager) Context

func (m *DownloadManager) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*DownloadManager) DB

func (m *DownloadManager) DB() *database.DB

DB 返回数据库存储 返回值:

  • *badgerhold.Store: 数据库存储

func (*DownloadManager) DeleteDownload

func (m *DownloadManager) DeleteDownload(taskID string) error

DeleteDownload 删除下载任务 参数:

  • taskID: 要删除的任务ID

返回值:

  • error: 如果删除过程中发生错误,返回错误信息

func (*DownloadManager) ErrChan

func (m *DownloadManager) ErrChan() <-chan error

ErrChan 返回错误通知通道 返回值:

  • <-chan error: 只读的通道,用于接收错误通知

func (*DownloadManager) FS

func (m *DownloadManager) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*DownloadManager) Host

func (m *DownloadManager) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*DownloadManager) IsFileDownloading

func (m *DownloadManager) IsFileDownloading(fileID string) bool

IsFileDownloading 检查指定文件是否正在下载中 功能: 检查指定文件是否处于下载相关状态(包括获取信息、等待下载、下载中和暂停状态) 参数:

  • fileID: 要检查的文件ID

返回值:

  • bool: 如果文件正在下载中则返回true,否则返回false

func (*DownloadManager) IsMaxConcurrencyReached

func (m *DownloadManager) IsMaxConcurrencyReached() bool

IsMaxConcurrencyReached 检查是否达到下载允许的最大并发数 返回值:

  • bool: 如果达到最大并发数则返回true,否则返回false

func (*DownloadManager) LoadExistingTasks

func (m *DownloadManager) LoadExistingTasks() error

LoadExistingTasks 从数据库加载现有的下载任务 功能: 从数据库中加载并恢复所有现有的下载任务 返回值:

  • error: 加载过程中的错误信息,如果成功则为nil

func (*DownloadManager) ManagerChannelEvents

func (m *DownloadManager) ManagerChannelEvents()

ManagerChannelEvents 处理下载管理器的通道事件 功能:

  • 启动一个goroutine监听下载管理器的通道事件
  • 处理上下文取消和新的下载任务请求
  • 通过select实现多路复用监听

func (*DownloadManager) NewDownload

func (m *DownloadManager) NewDownload(
	ownerPriv *ecdsa.PrivateKey,
	fileID string,
) (string, error)

NewDownload 创建新的下载任务 参数:

  • fileID: 文件唯一标识,要下载的文件ID
  • ownerPriv: 所有者的私钥,用于签名和权限验证

返回值:

  • string: 下载任务ID
  • error: 如果创建失败则返回错误原因

func (*DownloadManager) NewShareDownload

func (m *DownloadManager) NewShareDownload(
	ownerPriv *ecdsa.PrivateKey,
	fileID string,
	firstKeyShare []byte,
) (string, error)

NewShareDownload 创建新的共享下载任务 参数:

  • fileID: 文件唯一标识,要下载的文件ID
  • ownerPriv: 所有者的私钥,用于签名和权限验证

返回值:

  • string: 下载任务ID
  • error: 如果创建失败则返回错误原因

func (*DownloadManager) NodePubSub

func (m *DownloadManager) NodePubSub() *pubsub.NodePubSub

NodePubSub 返回发布订阅系统 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*DownloadManager) NotifyError

func (m *DownloadManager) NotifyError(err string, args ...interface{})

NotifyError 通知错误信息 参数:

  • m: 上传管理器对象
  • err: 要通知的错误信息
  • args: 格式化参数

func (*DownloadManager) NotifyStatus

func (m *DownloadManager) NotifyStatus(status *pb.DownloadChan)

NotifyStatus 通知上传状态 参数:

  • m: 上传管理器对象
  • status: 要通知的状态信息

func (*DownloadManager) Options

func (m *DownloadManager) Options() *fscfg.Options

Options 返回文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*DownloadManager) PauseDownload

func (m *DownloadManager) PauseDownload(taskID string) error

PauseDownload 暂停下载任务 参数:

  • taskID: 要暂停的任务ID

返回值:

  • error: 如果暂停过程中发生错误,返回错误信息

func (*DownloadManager) QueryDownload

func (m *DownloadManager) QueryDownload(start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, int, int, error)

QueryDownload 查询下载任务记录并返回分页信息 参数:

  • start: 查询的起始位置(从0开始)
  • pageSize: 每页显示的记录数
  • options: 可选的查询条件(如状态过滤、时间范围等)

返回值:

  • []*pb.DownloadFileRecord: 下载任务记录列表
  • uint64: 符合查询条件的总记录数
  • int: 当前页码(从1开始)
  • int: 每页大小
  • error: 如果查询���败则返回错误原因

func (*DownloadManager) ResumeDownload

func (m *DownloadManager) ResumeDownload(taskID string) error

ResumeDownload 继续下载任务 参数:

  • taskID: 要继续下载的任务ID

返回值:

  • error: 如果继续下载过程中发生错误,返回错误信息

func (*DownloadManager) RoutingTable

func (m *DownloadManager) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取客户端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*DownloadManager) StatusChan

func (m *DownloadManager) StatusChan() <-chan *pb.DownloadChan

StatusChan 返回下载状态和进度通知通道 返回值:

  • <-chan *pb.UploadChan: 只读的通道,用于接收下载状态和进度通知

func (*DownloadManager) TriggerDownload

func (m *DownloadManager) TriggerDownload(taskID string) error

TriggerDownload 触发指定任务ID的下载操作 参数:

  • taskID: 要触发下载的任务ID

返回值:

  • error: 如果触发失败则返回错误原因

type DownloadTask

type DownloadTask struct {
	// contains filtered or unexported fields
}

DownloadTask 描述一个文件下载任务

func NewDownloadTask

func NewDownloadTask(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero,
	host host.Host, routingTable *kbucket.RoutingTable, nps *pubsub.NodePubSub,
	statusChan chan *pb.DownloadChan, errChan chan error, taskID string,
) (*DownloadTask, error)

NewDownloadTask 创建并初始化一个新的文件下载任务实例 参数:

  • ctx: context.Context 用于管理任务生命周期的上下文
  • opt: *fscfg.Options 文件存储配置选项
  • db: *database.DB 数据库实例
  • host: host.Host libp2p网络主机实例
  • routingTable: *kbucket.RoutingTable 路由表
  • nps: *pubsub.NodePubSub 发布订阅系统
  • statusChan: chan *pb.DownloadChan 状态更新通道
  • errChan: chan error 错误通知通道
  • taskID: string 任务唯一标识符

返回值:

  • *DownloadTask: 创建的下载任务实例
  • error: 如果创建过程中发生错误,返回相应的错误信息

func (*DownloadTask) Cancel

func (t *DownloadTask) Cancel() error

Cancel 取消下载任务 返回值:

  • error: 错误信息

功能:

  • 使用事务取消下载任务
  • 清理任务相关资源
  • 记录取消操作日志

func (*DownloadTask) Cleanup

func (t *DownloadTask) Cleanup()

Cleanup 清理任务资源

func (*DownloadTask) Close

func (t *DownloadTask) Close()

Close 关闭任务并释放资源

func (*DownloadTask) Context

func (t *DownloadTask) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*DownloadTask) DB

func (t *DownloadTask) DB() *badgerhold.Store

DB 获取持久化存储 返回值:

  • *badgerhold.Store: 持久化存储实例

func (*DownloadTask) Delete

func (t *DownloadTask) Delete() error

Delete 删除下载任务相关的数据 该方法用于删除下载任务的所有相关数据

返回值:

  • error: 如果删除过程中发生错误,返回相应的错误信息

func (*DownloadTask) FS

func (t *DownloadTask) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*DownloadTask) ForceCancel

func (t *DownloadTask) ForceCancel() error

ForceCancel 强制触发任务取消 取消当前下载任务,如果通道已满则先清空再写入

func (*DownloadTask) ForceDelete

func (t *DownloadTask) ForceDelete() error

ForceDelete 强制触发任务删除 删除当前下载任务及相关资源,如果通道已满则先清空再写入

func (*DownloadTask) ForceFileFinalize

func (t *DownloadTask) ForceFileFinalize() error

ForceFileFinalize 强制触发文件完成处理 处理文件下载完成后的操作,如果通道已满则先清空再写入

func (*DownloadTask) ForceNetworkTransfer

func (t *DownloadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error

ForceNetworkTransfer 强制触发网络传输 向目标节点传输文件片段,支持重试机制

func (*DownloadTask) ForceNodeDispatch

func (t *DownloadTask) ForceNodeDispatch() error

ForceNodeDispatch 强制触发节点分发 以节点为单位从队列中读取文件片段进行分发,如果通道已满则先清空再写入

func (*DownloadTask) ForcePause

func (t *DownloadTask) ForcePause() error

ForcePause 强制触发任务暂停 暂停当前下载任务,如果通道已满则先清空再写入

func (*DownloadTask) ForceSegmentIndex

func (t *DownloadTask) ForceSegmentIndex() error

ForceSegmentIndex 强制触发片段索引请求 请求文件片段的索引信息,如果通道已满则先清空再写入

func (*DownloadTask) ForceSegmentMerge

func (t *DownloadTask) ForceSegmentMerge() error

ForceSegmentMerge 强制触发片段合并 合并已下载的文件片段,如果通道已满则先清空再写入

func (*DownloadTask) ForceSegmentProcess

func (t *DownloadTask) ForceSegmentProcess() error

ForceSegmentProcess 强制触发片段处理 将文件片段整合并写入队列,如果通道已满则先清空再写入

func (*DownloadTask) ForceSegmentVerify

func (t *DownloadTask) ForceSegmentVerify() error

ForceSegmentVerify 强制触发片段验证 验证已传输片段的完整性,如果通道已满则先清空再写入

func (*DownloadTask) GetProgress

func (t *DownloadTask) GetProgress() (int64, error)

GetProgress 获取下载进度的百分比 返回值:

  • int64: 下载进度百分比(0-100)

功能:

  • 使用BitSet跟踪片段完成状态
  • 计算当前下载进度百分比
  • 记录进度日志
  • 返回进度值

func (*DownloadTask) Host

func (t *DownloadTask) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*DownloadTask) NodePubSub

func (t *DownloadTask) NodePubSub() *pubsub.NodePubSub

NodePubSub 获取存储网络 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*DownloadTask) NotifySegmentStatus

func (t *DownloadTask) NotifySegmentStatus(status *pb.DownloadChan)

NotifySegmentStatus 通知片段状态更新 向外部通知文件片段的处理状态,超时后记录警告日志

func (*DownloadTask) NotifyTaskError

func (t *DownloadTask) NotifyTaskError(err error)

NotifyTaskError 通知任务错误 向外部通知任务执行过程中的错误,超时后记录警告日志

func (*DownloadTask) Options

func (t *DownloadTask) Options() *fscfg.Options

Options 获取文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*DownloadTask) Pause

func (t *DownloadTask) Pause() error

Pause 暂停下载任务 返回值:

  • error: 错误信息

功能:

  • 使用事务暂停下载任务
  • 清理任务相关资源
  • 记录暂停操作日志

func (*DownloadTask) RoutingTable

func (t *DownloadTask) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*DownloadTask) Start

func (t *DownloadTask) Start() error

Start 开始下载任务 返回值:

  • error: 错误信息

功能:

  • 启动多个协程处理下载任务
  • 初始化任务通道和定时器
  • 强制更新索引清单
  • 记录任务开始日志

func (*DownloadTask) TaskID

func (t *DownloadTask) TaskID() string

TaskID 获取任务ID 返回值:

  • string: 任务的唯一标识符

type InitializeDownloadManagerInput

type InitializeDownloadManagerInput struct {
	fx.In

	Download *DownloadManager // 下载管理器实例
}

InitializeDownloadManagerInput 定义了初始化 DownloadManager 所需的输入参数

type NewDownloadManagerInput

type NewDownloadManagerInput struct {
	fx.In

	Ctx          context.Context       // 上下文
	Opt          *fscfg.Options        // 文件存储选项配置
	DB           *database.DB          // 数据库存储
	FS           afero.Afero           // 文件系统接口
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
}

NewDownloadManagerInput 定义了创建 DownloadManager 所需的输入参数

type NewDownloadManagerOutput

type NewDownloadManagerOutput struct {
	fx.Out

	Download *DownloadManager // 下载管理器实例
}

NewDownloadManagerOutput 定义了 NewDownloadManager 函数的输出

func NewDownloadManager

func NewDownloadManager(lc fx.Lifecycle, input NewDownloadManagerInput) (out NewDownloadManagerOutput, err error)

NewDownloadManager 创建并初始化一个新的 DownloadManager 实例 功能: 根据提供的参数创建并初始化一个新的下载管理器实例 参数:

  • lc: 生命周期管理器
  • input: 创建下载管理器所需的输入参数

返回值:

  • NewDownloadManagerOutput: 包含创建的下载管理器实例的输出结构
  • error: 创建过程中的错误信息,如果成功则为nil

type RegisterPubsubProtocolInput

type RegisterPubsubProtocolInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期
	Opt          *fscfg.Options        // 文件存储配置选项
	DB           *database.DB          // 本地数据存储实例
	FS           afero.Afero           // 文件系统接口
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统
	Download     *DownloadManager      // 下载管理器实例
}

RegisterPubsubProtocolInput 定义了注册PubsubProtocol所需的输入参数

type RegisterStreamProtocolInput

type RegisterStreamProtocolInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期和取消操作
	Opt          *fscfg.Options        // 文件存储选项配置,包含各种系统设置和参数
	DB           *database.DB          // 持久化存储,用于本地数据的存储和检索
	FS           afero.Afero           // 文件系统接口,提供跨平台的文件操作能力
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
	DM           *DownloadManager      // 下载管理器,用于处理和管理文件下载任务
}

RegisterStreamProtocolInput 定义了注册流协议所需的输入参数

type StreamProtocol

type StreamProtocol struct {
	// contains filtered or unexported fields
}

StreamProtocol 定义了流协议的结构体

type Topic

type Topic int

Topic 定义了允许的主题类型

const (
	PubSubFileInfoRequestTopic          Topic = iota // 文件信息请求主题
	PubSubDownloadManifestRequestTopic               // 下载清单请求主题
	PubSubDownloadManifestResponseTopic              // 下载清单响应主题
	PubSubDownloadContentRequestTopic                // 下载内容请求主题
)

定义主题类型常量

func (Topic) String

func (t Topic) String() string

String 将Topic转换为对应的字符串表示 返回值:

  • string: 主题对应的字符串

功能:

  • 将Topic枚举值转换为对应的字符串格式
  • 用于主题订阅和发布时的标识

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL