Documentation ¶
Overview ¶
package downloads_ 提供了文件下载相关的功能
package downloads_ 实现文件下载相关功能
Index ¶
- Constants
- Variables
- func CreateDownloadFileRecord(db *badgerhold.Store, taskID string, fileID string, pubkeyHash []byte, ...) (*pb.DownloadFileRecord, error)
- func DecompressAndDecryptSegmentContent(shareOne, shareTwo []byte, compressedData []byte) ([]byte, error)
- func GetDownloadProgressAndPending(db *badgerhold.Store, taskID string) (*bitset.BitSet, map[int64]struct{}, error)
- func GetFileInfoResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, error)
- func GetInitialDownloadResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, *pb.DownloadPubSubManifestResponse, error)
- func GetListDownloadSegments(db *badgerhold.Store, taskID string) ([]*pb.DownloadSegmentRecord, error)
- func GetManifestResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte, ...) (*pb.DownloadPubSubManifestResponse, error)
- func GetPendingSegments(db *badgerhold.Store, taskID string) ([]string, error)
- func GetSegmentContent(db *database.DB, taskID string, fileID string, segmentId string, ...) (*pb.SegmentContentResponse, error)
- func GetSegmentStats(db *badgerhold.Store, taskID string) (..., error)
- func GetSegmentStorageData(db *database.DB, hostID string, taskID string, fileID string, segmentID string) (*pb.SegmentContentResponse, error)
- func GetSegmentsForRecovery(db *badgerhold.Store, taskID string) (map[int64]string, error)
- func HandleDownloadContentRequestPubSub(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, ...)
- func HandleDownloadManifestRequestPubSub(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, ...)
- func HandleDownloadManifestResponsePubSub(ctx context.Context, opt *fscfg.Options, nps *pubsub.NodePubSub, ...)
- func HandleFileInfoRequestPubSub(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, ...)
- func InitializeDownloadManager(lc fx.Lifecycle, input InitializeDownloadManagerInput) error
- func NewDownloadFile(ctx context.Context, db *database.DB, h host.Host, nps *pubsub.NodePubSub, ...) (*pb.DownloadOperationInfo, error)
- func QueryDownloadTask(db *badgerhold.Store, start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, error)
- func RegisterDownloadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
- func RegisterDownloadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)
- func ReplyToMessage(ctx context.Context, topic *pubsub.Topic, messageID string, replyData []byte) error
- func RequestContentPubSub(ctx context.Context, h host.Host, nps *pubsub.NodePubSub, taskID string, ...) (*pb.SegmentContentResponse, error)
- func RequestFileInfoPubSub(ctx context.Context, host host.Host, nps *pubsub.NodePubSub, taskID string, ...) (*pb.DownloadPubSubFileInfoResponse, error)
- func RequestManifestPubSub(ctx context.Context, host host.Host, nps *pubsub.NodePubSub, taskID string, ...) error
- func UpdateDownloadFileStatus(db *badgerhold.Store, taskID string, status pb.DownloadStatus) error
- func UpdateSegmentNodes(db *badgerhold.Store, taskID string, peerID string, ...) (map[int64]string, error)
- func ValidateAndStoreSegment(db *badgerhold.Store, shareOne []byte, response *pb.SegmentContentResponse) error
- func VerifySegmentChecksum(content []byte, expectedChecksum uint32) error
- func VerifySegmentSignature(p *pb.SegmentContentResponse) error
- type DownloadManager
- func (m *DownloadManager) Cancel() context.CancelFunc
- func (m *DownloadManager) CancelDownload(taskID string) error
- func (m *DownloadManager) Context() context.Context
- func (m *DownloadManager) DB() *database.DB
- func (m *DownloadManager) DeleteDownload(taskID string) error
- func (m *DownloadManager) ErrChan() <-chan error
- func (m *DownloadManager) FS() afero.Afero
- func (m *DownloadManager) Host() host.Host
- func (m *DownloadManager) IsFileDownloading(fileID string) bool
- func (m *DownloadManager) IsMaxConcurrencyReached() bool
- func (m *DownloadManager) LoadExistingTasks() error
- func (m *DownloadManager) ManagerChannelEvents()
- func (m *DownloadManager) NewDownload(ownerPriv *ecdsa.PrivateKey, fileID string) (string, error)
- func (m *DownloadManager) NewShareDownload(ownerPriv *ecdsa.PrivateKey, fileID string, firstKeyShare []byte) (string, error)
- func (m *DownloadManager) NodePubSub() *pubsub.NodePubSub
- func (m *DownloadManager) NotifyError(err string, args ...interface{})
- func (m *DownloadManager) NotifyStatus(status *pb.DownloadChan)
- func (m *DownloadManager) Options() *fscfg.Options
- func (m *DownloadManager) PauseDownload(taskID string) error
- func (m *DownloadManager) QueryDownload(start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, int, int, error)
- func (m *DownloadManager) ResumeDownload(taskID string) error
- func (m *DownloadManager) RoutingTable() *kbucket.RoutingTable
- func (m *DownloadManager) StatusChan() <-chan *pb.DownloadChan
- func (m *DownloadManager) TriggerDownload(taskID string) error
- type DownloadTask
- func (t *DownloadTask) Cancel() error
- func (t *DownloadTask) Cleanup()
- func (t *DownloadTask) Close()
- func (t *DownloadTask) Context() context.Context
- func (t *DownloadTask) DB() *badgerhold.Store
- func (t *DownloadTask) Delete() error
- func (t *DownloadTask) FS() afero.Afero
- func (t *DownloadTask) ForceCancel() error
- func (t *DownloadTask) ForceDelete() error
- func (t *DownloadTask) ForceFileFinalize() error
- func (t *DownloadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error
- func (t *DownloadTask) ForceNodeDispatch() error
- func (t *DownloadTask) ForcePause() error
- func (t *DownloadTask) ForceSegmentIndex() error
- func (t *DownloadTask) ForceSegmentMerge() error
- func (t *DownloadTask) ForceSegmentProcess() error
- func (t *DownloadTask) ForceSegmentVerify() error
- func (t *DownloadTask) GetProgress() (int64, error)
- func (t *DownloadTask) Host() host.Host
- func (t *DownloadTask) NodePubSub() *pubsub.NodePubSub
- func (t *DownloadTask) NotifySegmentStatus(status *pb.DownloadChan)
- func (t *DownloadTask) NotifyTaskError(err error)
- func (t *DownloadTask) Options() *fscfg.Options
- func (t *DownloadTask) Pause() error
- func (t *DownloadTask) RoutingTable() *kbucket.RoutingTable
- func (t *DownloadTask) Start() error
- func (t *DownloadTask) TaskID() string
- type InitializeDownloadManagerInput
- type NewDownloadManagerInput
- type NewDownloadManagerOutput
- type RegisterPubsubProtocolInput
- type RegisterStreamProtocolInput
- type StreamProtocol
- type Topic
Constants ¶
const (
MaxSessions = 10 // 允许的最大并发会话数
)
Variables ¶
var AllowedTopics = []Topic{ PubSubFileInfoRequestTopic, PubSubDownloadManifestRequestTopic, PubSubDownloadManifestResponseTopic, PubSubDownloadContentRequestTopic, }
AllowedTopics 定义了系统支持的所有主题列表
var ( // StreamRequestSegmentProtocol 定义了请求数据片段的协议标识符 StreamRequestSegmentProtocol = fmt.Sprintf("defs@stream/request/segment/%s", version) )
Functions ¶
func CreateDownloadFileRecord ¶
func CreateDownloadFileRecord( db *badgerhold.Store, taskID string, fileID string, pubkeyHash []byte, firstKeyShare []byte, tempStorage string, fileMeta *pb.FileMeta, sliceTable map[int64]*pb.HashTable, status pb.DownloadStatus, ) (*pb.DownloadFileRecord, error)
CreateDownloadFileRecord 创建下载文件记录并保存到数据库 参数:
- db: 数据库存储接口
- taskID: 下载任务的唯一标识
- fileID: 文件的唯一标识
- pubkeyHash: 所有者的公钥哈希
- firstKeyShare: 恢复密钥的第一个分片
- tempStorage: 临时存储路径
- fileMeta: 文件的元数据信息
- sliceTable: 分片哈希表
- status: 下载任务的初始状态
返回值:
- *pb.DownloadFileRecord: 创建的下载文件记录对象
- error: 如果创建或存储过程中发生错误则返回相应错误
功能:
- 创建新的下载文件记录
- 初始化文件和分片记录
- 分批处理并保存分片记录
- 使用事务确保数据一致性
func DecompressAndDecryptSegmentContent ¶
func DecompressAndDecryptSegmentContent(shareOne, shareTwo []byte, compressedData []byte) ([]byte, error)
DecompressAndDecryptSegmentContent 解压并解密片段内容 参数:
- shareOne: 第一个密钥分片
- shareTwo: 第二个密钥分片
- compressedData: 压缩并加密的数据内容
返回值:
- []byte: 解压并解密后的原始数据
- error: 解压或解密失败时返回错误信息
功能:
- 对压缩的加密数据进行解压缩
- 使用密钥分片恢复解密密钥
- 使用AES-GCM模式解密数据
- 返回解密后的原始数据
func GetDownloadProgressAndPending ¶
func GetDownloadProgressAndPending(db *badgerhold.Store, taskID string) (*bitset.BitSet, map[int64]struct{}, error)
GetDownloadProgressAndPending 获取下载任务的进度和待下载片段信息 功能: 根据任务ID获取下载进度位图和待下载片段集合,用于跟踪下载状态
参数: - db: 数据库存储实例 - taskID: 下载任务ID
返回值: - *bitset.BitSet: 下载进度位图,每个位表示一个数据片段的下载状态(1表示已完成,0表示未完成) - map[int64]struct{}: 待下载片段的索引集合,key为片段索引 - error: 错误信息
func GetFileInfoResponse ¶
func GetFileInfoResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, error)
GetFileInfoResponse 根据文件ID获取文件信息响应 参数:
- db: 数据库存储接口
- taskID: 任务的唯一标识符
- fileID: 文件的唯一标识符
- pubkeyHash: 公钥哈希
返回值:
- *pb.FileInfoResponse: 文件信息响应
- error: 操作过程中可能发生的错误
功能:
- 获取文件片段存储记录
- 构建文件元数据
- 生成文件信息响应
func GetInitialDownloadResponse ¶
func GetInitialDownloadResponse(db *database.DB, taskID string, fileID string, pubkeyHash []byte) (*pb.DownloadPubSubFileInfoResponse, *pb.DownloadPubSubManifestResponse, error)
GetInitialDownloadResponse 根据文件ID获取首次下载响应 参数:
- db: 数据库存储接口
- taskID: 任务的唯一标识符
- fileID: 文件的唯一标识符
- pubkeyHash: 公钥哈希
返回值:
- *pb.DownloadPubSubFileInfoResponse: 文件摘要响应
- *pb.DownloadPubSubManifestResponse: 索引清单响应
- error: 操作过程中可能发生的错误
功能:
- 获取文件片段存储记录
- 构建文件元数据
- 生成文件信息响应
- 生成索引清单响应
func GetListDownloadSegments ¶
func GetListDownloadSegments(db *badgerhold.Store, taskID string) ([]*pb.DownloadSegmentRecord, error)
GetListDownloadSegments 获取下载任务的所有片段记录 功能: 根据任务ID获取所有下载片段的详细信息
参数: - db: 数据库存储实例 - taskID: 下载任务ID
返回值: - []*pb.DownloadSegmentRecord: 下载片段记录列表,包含每个片段的详细信息 - error: 错误信息
func GetManifestResponse ¶
func GetManifestResponse( db *database.DB, taskID string, fileID string, pubkeyHash []byte, requestedSegmentIds []string, ) (*pb.DownloadPubSubManifestResponse, error)
GetManifestResponse 根据文件ID获取索引清单响应 参数:
- db: 数据库存储接口
- taskID: 任务的唯一标识符
- fileID: 文件的唯一标识符
- pubkeyHash: 公钥哈希
- requestedSegmentIds: 请求下载的片段ID列表
返回值:
- *pb.DownloadManifestResponse: 索引清单响应
- error: 如果获取过程中发生错误则返回相应错误
功能:
- 获取文件片段存储记录
- 过滤请求的片段
- 构建可用片段映射
- 生成索引清单响应
func GetPendingSegments ¶
func GetPendingSegments(db *badgerhold.Store, taskID string) ([]string, error)
GetPendingSegments 获取未完成的下载片段ID列表 功能: 获取指定任务中所有未完成下载的非校验片段ID
参数: - db: 数据库存储实例 - taskID: 下载任务ID
返回值: - []string: 未完成片段的ID列表 - error: 错误信息
func GetSegmentContent ¶
func GetSegmentContent( db *database.DB, taskID string, fileID string, segmentId string, segmentIndex int64, pubkeyHash []byte, requestedSegmentIds []string, ) (*pb.SegmentContentResponse, error)
GetSegmentContent 根据文件ID、片段ID和片段索引获取单个文件片段的内容 参数:
- db: 数据库存储接口
- taskID: 任务的唯一标识符
- fileID: 文件的唯一标识符
- segmentId: 请求下载的片段ID
- segmentIndex: 请求下载的片段索引
- pubkeyHash: 公钥哈希
- requestedSegmentIds: 请求下载的片段ID列表
返回值:
- *pb.SegmentContentResponse: 包含请求片段内容的响应对象
- error: 如果获取过程中发生错误则返回相应错误
功能:
- 获取指定片段的存储记录
- 构建可用片段映射
- 生成片段内容响应
func GetSegmentStats ¶
func GetSegmentStats(db *badgerhold.Store, taskID string) (*struct { DataSegments struct { Total int Completed int Failed map[int64]string } ParitySegments struct { Completed int Pending map[int64]string } }, error)
GetSegmentStats 获取下载任务的片段统计信息 功能: 统计下载任务中数据片段和校验片段的完成情况
参数: - db: 数据库存储实例 - taskID: 下载任务ID
返回值: - *struct{}: 包含数据片段和校验片段统计信息的结构体,包括总数、完成数、失败数等 - error: 错误信息
func GetSegmentStorageData ¶
func GetSegmentStorageData(db *database.DB, hostID string, taskID string, fileID string, segmentID string) (*pb.SegmentContentResponse, error)
GetSegmentStorageData 获取文件片段的存储数据 功能: 根据给定的标识信息获取文件片段的完整内容和元数据
参数: - db: 数据库实例 - hostID: 主机ID - taskID: 任务ID - fileID: 文件ID - segmentID: 片段ID
返回值: - *pb.SegmentContentResponse: 包含片段内容和元数据的完整响应对象 - error: 错误信息
func GetSegmentsForRecovery ¶
GetSegmentsForRecovery 获取用于数据恢复的片段信息 功能: 根据下载失败的数据片段数量,选择合适的校验片段用于数据恢复
参数: - db: 数据库存储实例 - taskID: 下载任务ID
返回值: - map[int64]string: 需要下载的片段映射,key为片段索引,value为片段ID - error: 错误信息
func HandleDownloadContentRequestPubSub ¶
func HandleDownloadContentRequestPubSub( ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, nps *pubsub.NodePubSub, download *DownloadManager, res *pubsub.Message, )
HandleDownloadContentRequestPubSub 处理片段内容请求 参数:
- ctx: 上下文对象
- opt: 配置选项
- db: 数据库实例
- fs: 文件系统实例
- nps: 发布订阅系统
- download: 下载管理器
- res: 接收到的消息
返回值: void
功能:
- 解析片段内容请求数据
- 获取请求的片段内容
- 发送片段内容响应给请求节点
func HandleDownloadManifestRequestPubSub ¶
func HandleDownloadManifestRequestPubSub( ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, nps *pubsub.NodePubSub, download *DownloadManager, res *pubsub.Message, )
HandleDownloadManifestRequestPubSub 处理索引清单请求 参数:
- ctx: 上下文对象
- opt: 配置选项
- db: 数据库实例
- fs: 文件系统实例
- nps: 发布订阅系统
- download: 下载管理器
- res: 接收到的消息
返回值: void
功能:
- 解析索引清单请求数据
- 获取请求的文件片段清单
- 发送清单响应给请求节点
func HandleDownloadManifestResponsePubSub ¶
func HandleDownloadManifestResponsePubSub( ctx context.Context, opt *fscfg.Options, nps *pubsub.NodePubSub, db *database.DB, fs afero.Afero, download *DownloadManager, res *pubsub.Message, )
HandleDownloadManifestResponsePubSub 处理索引清单响应 参数:
- ctx: 上下文,用于控制处理过程的生命周期
- opt: 文件存储选项配置
- nps: 发布订阅系统
- db: 数据库存储实例
- fs: 文件系统接口
- download: 下载管理器实例
- res: 接收到的流请求消息
功能:
- 解析索引清单响应数据
- 更新下载任务的节点和可用片段信息
- 记录任务更新日志
- 验证响应数据的有效性
func HandleFileInfoRequestPubSub ¶
func HandleFileInfoRequestPubSub( ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, host host.Host, nps *pubsub.NodePubSub, download *DownloadManager, res *pubsub.Message, )
HandleFileInfoRequestPubSub 处理文件信息请求 参数:
- ctx: 上下文对象
- opt: 配置选项
- db: 数据库实例
- fs: 文件系统实例
- host: libp2p网络主机
- nps: 发布订阅系统
- download: 下载管理器
- res: 接收到的消息
返回值: void
功能:
- 解析并验证文件信息请求
- 建立与请求节点的连接
- 获取文件元数据和分片信息
- 发送响应消息给请求节点
func InitializeDownloadManager ¶
func InitializeDownloadManager(lc fx.Lifecycle, input InitializeDownloadManagerInput) error
InitializeDownloadManager 初始化 DownloadManager 并设置相关的生命周期钩子 功能: 初始化下载管理器并设置其生命周期钩子函数 参数:
- lc: 生命周期管理器
- input: 初始化下载管理器所需的输入参数
返回值:
- error: 初始化过程中的错误信息,如果成功则为nil
func NewDownloadFile ¶
func NewDownloadFile(ctx context.Context, db *database.DB, h host.Host, nps *pubsub.NodePubSub, taskID string, fileID string, pubkeyHash []byte, firstKeyShare []byte, ) (*pb.DownloadOperationInfo, error)
NewDownloadFile 创建并初始化一个新的下载文件实例 参数:
- ctx: 上下文对象,用于控制请求的生命周期
- db: 数据库实例,用于存储下载记录
- h: libp2p主机实例,用于网络通信
- nps: 发布订阅系统实例,用于网络寻址
- fileID: 要下载的文件ID
- ownerPriv: 文件所有者的私钥,用于身份验证和加密
返回值:
- *pb.DownloadOperationInfo: 下载操作信息,包含任务ID、文件路径等
- error: 错误信息
功能:
- 生成下载任务ID和密钥分片
- 通过P2P网络获取文件信息
- 创建下载记录并保存到数据库
func QueryDownloadTask ¶
func QueryDownloadTask(db *badgerhold.Store, start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, error)
QueryDownloadTask 执行下载任务的基础查询 参数:
- db: *badgerhold.Store 数据库存储实例
- start: int 查询的起始位置
- pageSize: int 每页显示的记录数
- options: ...database.QueryOption 可选的查询条件
返回值:
- []*pb.DownloadFileRecord: 下载任务记录列表
- uint64: 符合查询条件的总记录数
- error: 如果查询过程中发生错误,返回相应错误
func RegisterDownloadPubsubProtocol ¶
func RegisterDownloadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
RegisterDownloadPubsubProtocol 注册所有下载相关的PubSub协议处理器 参数:
- lc: 应用生命周期管理器
- input: 注册所需的输入参数
返回值:
- error: 注册失败返回错误,成功返回nil
功能:
- 订阅文件信息请求主题
- 订阅索引清单请求主题
- 订阅片段内容请求主题
- 订阅索引清单响应主题
- 为每个主题注册对应的处理函数
func RegisterDownloadStreamProtocol ¶
func RegisterDownloadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)
RegisterDownloadStreamProtocol 注册下载流协议 参数:
- lc: 生命周期管理器
- input: 注册所需的依赖项
返回值: error - 注册过程中的错误信息
功能:
- 创建流协议实例
- 注册请求和发送数据片段的处理器
- 管理协议的生命周期
func ReplyToMessage ¶
func ReplyToMessage(ctx context.Context, topic *pubsub.Topic, messageID string, replyData []byte) error
ReplyToMessage 用于回复收到的消息 参数:
- ctx: 上下文
- topic: 主题实例
- messageID: 原始消息ID
- replyData: 回复的数据内容
返回值:
- error: 回复失败返回错误,成功返回nil
功能:
- 构造回复消息
- 发布回复消息到指定主题
- 记录错误日志
func RequestContentPubSub ¶
func RequestContentPubSub( ctx context.Context, h host.Host, nps *pubsub.NodePubSub, taskID string, fileID string, pubkeyHash []byte, segmentId string, segmentIndex int64, requestedSegmentIds []string, ) (*pb.SegmentContentResponse, error)
RequestContentPubSub 发送文件片段内容请求并等待响应 参数:
- ctx: 上下文,用于控制请求的生命周期
- h: libp2p网络主机实例
- nps: 发布订阅系统
- taskID: 任务唯一标识
- fileID: 文件唯一标识
- pubkeyHash: 所有者的公钥哈希
- segmentId: 请求的片段ID
- segmentIndex: 片段索引
- requestedSegmentIds: 请求下载的片段ID列表
返回值:
- *pb.SegmentContentResponse: 片段内容响应对象,包含片段数据和元信息
- error: 错误信息,如果请求失败则返回错误原因
功能:
- 构造并发送片段内容请求
- 等待并解析响应数据
- 返回片段内容响应对象
- 记录请求和响应日志
func RequestFileInfoPubSub ¶
func RequestFileInfoPubSub( ctx context.Context, host host.Host, nps *pubsub.NodePubSub, taskID string, fileID string, pubkeyHash []byte, ) (*pb.DownloadPubSubFileInfoResponse, error)
RequestFileInfoPubSub 发送文件信息请求并等待响应 参数:
- ctx: 上下文,用于控制请求的生命周期
- host: libp2p网络主机实例
- nps: 发布订阅系统
- taskID: 任务唯一标识
- fileID: 文件唯一标识
- pubkeyHash: 所有者的公钥哈希
返回值:
- *pb.DownloadPubSubFileInfoResponse: 文件信息响应对象,包含文件元数据
- error: 错误信息,如果请求失败则返回错误原因
功能:
- 构造并发送文件信息请求
- 等待并解析响应数据
- 返回文件信息响应对象
func RequestManifestPubSub ¶
func RequestManifestPubSub( ctx context.Context, host host.Host, nps *pubsub.NodePubSub, taskID string, fileID string, pubkeyHash []byte, requestedSegmentIds []string, ) error
RequestManifestPubSub 发送索引清单请求 参数:
- ctx: 上下文,用于控制请求的生命周期
- host: libp2p网络主机实例
- nps: 发布订阅系统
- taskID: 任务唯一标识
- fileID: 文件唯一标识
- pubkeyHash: 所有者的公钥哈希
- requestedSegmentIds: 请求下载的片段ID列表
返回值:
- error: 错误信息,如果请求失败则返回错误原因
功能:
- 构造并发送索引清单请求
- 不等待响应直接返回
- 记录请求日志信息
func UpdateDownloadFileStatus ¶
func UpdateDownloadFileStatus(db *badgerhold.Store, taskID string, status pb.DownloadStatus) error
UpdateDownloadFileStatus 更新下载文件的状态 参数:
- db: 数据库存储接口
- taskID: 下载任务的唯一标识
- status: 新的文件状态
返回值:
- error: 如果更新过程中发生错误则返回相应错误,否则返回 nil
功能:
- 获取当前文件记录
- 更新文件状态
- 保存更新后的记录
func UpdateSegmentNodes ¶
func UpdateSegmentNodes(db *badgerhold.Store, taskID string, peerID string, availableSlices map[int64]string) (map[int64]string, error)
UpdateSegmentNodes 更新片段的节点信息并返回未完成的片段索引 参数:
- db: 数据库存储实例
- taskID: 任务ID
- peerID: 节点ID
- availableSlices: 节点可用的片段映射,key为片段索引,value为片段ID
返回值:
- map[int64]string: 未完成片段的映射,key为片段索引,value为片段ID
- error: 错误信息
func ValidateAndStoreSegment ¶
func ValidateAndStoreSegment( db *badgerhold.Store, shareOne []byte, response *pb.SegmentContentResponse, ) error
ValidateAndStoreSegment 验证并存储下载的文件片段 功能: 对下载的文件片段进行完整性验证,包括签名验证、解密、校验和验证,并将验证通过的片段存储到数据库
参数: - db: 数据库存储实例 - shareOne: 第一个密钥分片 - response: 下载响应数据
返回值: - error: 错误信息
func VerifySegmentChecksum ¶
VerifySegmentChecksum 验证片段校验和 参数:
- content: 需要验证的内容数据
- expectedChecksum: 期望的校验和值
返回值:
- error: 校验和不匹配返回错误,匹配返回nil
功能:
- 计算内容的CRC32校验和
- 验证计算的校验和与期望值是否匹配
- 确保数据完整性
func VerifySegmentSignature ¶
func VerifySegmentSignature(p *pb.SegmentContentResponse) error
VerifySegmentSignature 验证片段签名 参数:
- p: 片段内容响应对象,包含需要验证的数据和签名
返回值:
- error: 验证失败返回错误,验证成功返回nil
功能:
- 验证片段内容的签名是否有效
- 使用ECDSA算法进行签名验证
- 验证签名数据的完整性和真实性
Types ¶
type DownloadManager ¶
type DownloadManager struct {
// contains filtered or unexported fields
}
DownloadManager 管理所有下载任务,提供文件下载的统一入口和管理功能
func (*DownloadManager) Cancel ¶
func (m *DownloadManager) Cancel() context.CancelFunc
Cancel 获取任务的取消函数 返回值:
- context.CancelFunc: 任务的取消函数
func (*DownloadManager) CancelDownload ¶
func (m *DownloadManager) CancelDownload(taskID string) error
CancelDownload 取消下载任务 参数:
- taskID: 要取消的任务ID
返回值:
- error: 如果取消过程中发生错误,返回错误信息
func (*DownloadManager) Context ¶
func (m *DownloadManager) Context() context.Context
Context 获取任务的上下文 返回值:
- context.Context: 任务的上下文对象
func (*DownloadManager) DB ¶
func (m *DownloadManager) DB() *database.DB
DB 返回数据库存储 返回值:
- *badgerhold.Store: 数据库存储
func (*DownloadManager) DeleteDownload ¶
func (m *DownloadManager) DeleteDownload(taskID string) error
DeleteDownload 删除下载任务 参数:
- taskID: 要删除的任务ID
返回值:
- error: 如果删除过程中发生错误,返回错误信息
func (*DownloadManager) ErrChan ¶
func (m *DownloadManager) ErrChan() <-chan error
ErrChan 返回错误通知通道 返回值:
- <-chan error: 只读的通道,用于接收错误通知
func (*DownloadManager) FS ¶
func (m *DownloadManager) FS() afero.Afero
FS 返回文件系统接口 返回值:
- afero.Afero: 文件系统接口
func (*DownloadManager) Host ¶
func (m *DownloadManager) Host() host.Host
Host 获取网络主机实例 返回值:
- host.Host: 网络主机实例
func (*DownloadManager) IsFileDownloading ¶
func (m *DownloadManager) IsFileDownloading(fileID string) bool
IsFileDownloading 检查指定文件是否正在下载中 功能: 检查指定文件是否处于下载相关状态(包括获取信息、等待下载、下载中和暂停状态) 参数:
- fileID: 要检查的文件ID
返回值:
- bool: 如果文件正在下载中则返回true,否则返回false
func (*DownloadManager) IsMaxConcurrencyReached ¶
func (m *DownloadManager) IsMaxConcurrencyReached() bool
IsMaxConcurrencyReached 检查是否达到下载允许的最大并发数 返回值:
- bool: 如果达到最大并发数则返回true,否则返回false
func (*DownloadManager) LoadExistingTasks ¶
func (m *DownloadManager) LoadExistingTasks() error
LoadExistingTasks 从数据库加载现有的下载任务 功能: 从数据库中加载并恢复所有现有的下载任务 返回值:
- error: 加载过程中的错误信息,如果成功则为nil
func (*DownloadManager) ManagerChannelEvents ¶
func (m *DownloadManager) ManagerChannelEvents()
ManagerChannelEvents 处理下载管理器的通道事件 功能:
- 启动一个goroutine监听下载管理器的通道事件
- 处理上下文取消和新的下载任务请求
- 通过select实现多路复用监听
func (*DownloadManager) NewDownload ¶
func (m *DownloadManager) NewDownload( ownerPriv *ecdsa.PrivateKey, fileID string, ) (string, error)
NewDownload 创建新的下载任务 参数:
- fileID: 文件唯一标识,要下载的文件ID
- ownerPriv: 所有者的私钥,用于签名和权限验证
返回值:
- string: 下载任务ID
- error: 如果创建失败则返回错误原因
func (*DownloadManager) NewShareDownload ¶
func (m *DownloadManager) NewShareDownload( ownerPriv *ecdsa.PrivateKey, fileID string, firstKeyShare []byte, ) (string, error)
NewShareDownload 创建新的共享下载任务 参数:
- fileID: 文件唯一标识,要下载的文件ID
- ownerPriv: 所有者的私钥,用于签名和权限验证
返回值:
- string: 下载任务ID
- error: 如果创建失败则返回错误原因
func (*DownloadManager) NodePubSub ¶
func (m *DownloadManager) NodePubSub() *pubsub.NodePubSub
NodePubSub 返回发布订阅系统 返回值:
- *pubsub.NodePubSub: 发布订阅系统
func (*DownloadManager) NotifyError ¶
func (m *DownloadManager) NotifyError(err string, args ...interface{})
NotifyError 通知错误信息 参数:
- m: 上传管理器对象
- err: 要通知的错误信息
- args: 格式化参数
func (*DownloadManager) NotifyStatus ¶
func (m *DownloadManager) NotifyStatus(status *pb.DownloadChan)
NotifyStatus 通知上传状态 参数:
- m: 上传管理器对象
- status: 要通知的状态信息
func (*DownloadManager) Options ¶
func (m *DownloadManager) Options() *fscfg.Options
Options 返回文件存储选项配置 返回值:
- *fscfg.Options: 文件存储选项配置
func (*DownloadManager) PauseDownload ¶
func (m *DownloadManager) PauseDownload(taskID string) error
PauseDownload 暂停下载任务 参数:
- taskID: 要暂停的任务ID
返回值:
- error: 如果暂停过程中发生错误,返回错误信息
func (*DownloadManager) QueryDownload ¶
func (m *DownloadManager) QueryDownload(start, pageSize int, options ...database.QueryOption) ([]*pb.DownloadFileRecord, uint64, int, int, error)
QueryDownload 查询下载任务记录并返回分页信息 参数:
- start: 查询的起始位置(从0开始)
- pageSize: 每页显示的记录数
- options: 可选的查询条件(如状态过滤、时间范围等)
返回值:
- []*pb.DownloadFileRecord: 下载任务记录列表
- uint64: 符合查询条件的总记录数
- int: 当前页码(从1开始)
- int: 每页大小
- error: 如果查询���败则返回错误原因
func (*DownloadManager) ResumeDownload ¶
func (m *DownloadManager) ResumeDownload(taskID string) error
ResumeDownload 继续下载任务 参数:
- taskID: 要继续下载的任务ID
返回值:
- error: 如果继续下载过程中发生错误,返回错误信息
func (*DownloadManager) RoutingTable ¶
func (m *DownloadManager) RoutingTable() *kbucket.RoutingTable
RoutingTable 获取客户端实例 返回值:
- *kbucket.RoutingTable : 路由表实例
func (*DownloadManager) StatusChan ¶
func (m *DownloadManager) StatusChan() <-chan *pb.DownloadChan
StatusChan 返回下载状态和进度通知通道 返回值:
- <-chan *pb.UploadChan: 只读的通道,用于接收下载状态和进度通知
func (*DownloadManager) TriggerDownload ¶
func (m *DownloadManager) TriggerDownload(taskID string) error
TriggerDownload 触发指定任务ID的下载操作 参数:
- taskID: 要触发下载的任务ID
返回值:
- error: 如果触发失败则返回错误原因
type DownloadTask ¶
type DownloadTask struct {
// contains filtered or unexported fields
}
DownloadTask 描述一个文件下载任务
func NewDownloadTask ¶
func NewDownloadTask(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, host host.Host, routingTable *kbucket.RoutingTable, nps *pubsub.NodePubSub, statusChan chan *pb.DownloadChan, errChan chan error, taskID string, ) (*DownloadTask, error)
NewDownloadTask 创建并初始化一个新的文件下载任务实例 参数:
- ctx: context.Context 用于管理任务生命周期的上下文
- opt: *fscfg.Options 文件存储配置选项
- db: *database.DB 数据库实例
- host: host.Host libp2p网络主机实例
- routingTable: *kbucket.RoutingTable 路由表
- nps: *pubsub.NodePubSub 发布订阅系统
- statusChan: chan *pb.DownloadChan 状态更新通道
- errChan: chan error 错误通知通道
- taskID: string 任务唯一标识符
返回值:
- *DownloadTask: 创建的下载任务实例
- error: 如果创建过程中发生错误,返回相应的错误信息
func (*DownloadTask) Cancel ¶
func (t *DownloadTask) Cancel() error
Cancel 取消下载任务 返回值:
- error: 错误信息
功能:
- 使用事务取消下载任务
- 清理任务相关资源
- 记录取消操作日志
func (*DownloadTask) Context ¶
func (t *DownloadTask) Context() context.Context
Context 获取任务的上下文 返回值:
- context.Context: 任务的上下文对象
func (*DownloadTask) DB ¶
func (t *DownloadTask) DB() *badgerhold.Store
DB 获取持久化存储 返回值:
- *badgerhold.Store: 持久化存储实例
func (*DownloadTask) Delete ¶
func (t *DownloadTask) Delete() error
Delete 删除下载任务相关的数据 该方法用于删除下载任务的所有相关数据
返回值:
- error: 如果删除过程中发生错误,返回相应的错误信息
func (*DownloadTask) FS ¶
func (t *DownloadTask) FS() afero.Afero
FS 返回文件系统接口 返回值:
- afero.Afero: 文件系统接口
func (*DownloadTask) ForceCancel ¶
func (t *DownloadTask) ForceCancel() error
ForceCancel 强制触发任务取消 取消当前下载任务,如果通道已满则先清空再写入
func (*DownloadTask) ForceDelete ¶
func (t *DownloadTask) ForceDelete() error
ForceDelete 强制触发任务删除 删除当前下载任务及相关资源,如果通道已满则先清空再写入
func (*DownloadTask) ForceFileFinalize ¶
func (t *DownloadTask) ForceFileFinalize() error
ForceFileFinalize 强制触发文件完成处理 处理文件下载完成后的操作,如果通道已满则先清空再写入
func (*DownloadTask) ForceNetworkTransfer ¶
func (t *DownloadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error
ForceNetworkTransfer 强制触发网络传输 向目标节点传输文件片段,支持重试机制
func (*DownloadTask) ForceNodeDispatch ¶
func (t *DownloadTask) ForceNodeDispatch() error
ForceNodeDispatch 强制触发节点分发 以节点为单位从队列中读取文件片段进行分发,如果通道已满则先清空再写入
func (*DownloadTask) ForcePause ¶
func (t *DownloadTask) ForcePause() error
ForcePause 强制触发任务暂停 暂停当前下载任务,如果通道已满则先清空再写入
func (*DownloadTask) ForceSegmentIndex ¶
func (t *DownloadTask) ForceSegmentIndex() error
ForceSegmentIndex 强制触发片段索引请求 请求文件片段的索引信息,如果通道已满则先清空再写入
func (*DownloadTask) ForceSegmentMerge ¶
func (t *DownloadTask) ForceSegmentMerge() error
ForceSegmentMerge 强制触发片段合并 合并已下载的文件片段,如果通道已满则先清空再写入
func (*DownloadTask) ForceSegmentProcess ¶
func (t *DownloadTask) ForceSegmentProcess() error
ForceSegmentProcess 强制触发片段处理 将文件片段整合并写入队列,如果通道已满则先清空再写入
func (*DownloadTask) ForceSegmentVerify ¶
func (t *DownloadTask) ForceSegmentVerify() error
ForceSegmentVerify 强制触发片段验证 验证已传输片段的完整性,如果通道已满则先清空再写入
func (*DownloadTask) GetProgress ¶
func (t *DownloadTask) GetProgress() (int64, error)
GetProgress 获取下载进度的百分比 返回值:
- int64: 下载进度百分比(0-100)
功能:
- 使用BitSet跟踪片段完成状态
- 计算当前下载进度百分比
- 记录进度日志
- 返回进度值
func (*DownloadTask) Host ¶
func (t *DownloadTask) Host() host.Host
Host 获取网络主机实例 返回值:
- host.Host: 网络主机实例
func (*DownloadTask) NodePubSub ¶
func (t *DownloadTask) NodePubSub() *pubsub.NodePubSub
NodePubSub 获取存储网络 返回值:
- *pubsub.NodePubSub: 发布订阅系统
func (*DownloadTask) NotifySegmentStatus ¶
func (t *DownloadTask) NotifySegmentStatus(status *pb.DownloadChan)
NotifySegmentStatus 通知片段状态更新 向外部通知文件片段的处理状态,超时后记录警告日志
func (*DownloadTask) NotifyTaskError ¶
func (t *DownloadTask) NotifyTaskError(err error)
NotifyTaskError 通知任务错误 向外部通知任务执行过程中的错误,超时后记录警告日志
func (*DownloadTask) Options ¶
func (t *DownloadTask) Options() *fscfg.Options
Options 获取文件存储选项配置 返回值:
- *fscfg.Options: 文件存储选项配置
func (*DownloadTask) Pause ¶
func (t *DownloadTask) Pause() error
Pause 暂停下载任务 返回值:
- error: 错误信息
功能:
- 使用事务暂停下载任务
- 清理任务相关资源
- 记录暂停操作日志
func (*DownloadTask) RoutingTable ¶
func (t *DownloadTask) RoutingTable() *kbucket.RoutingTable
RoutingTable 获取端实例 返回值:
- *kbucket.RoutingTable : 路由表实例
func (*DownloadTask) Start ¶
func (t *DownloadTask) Start() error
Start 开始下载任务 返回值:
- error: 错误信息
功能:
- 启动多个协程处理下载任务
- 初始化任务通道和定时器
- 强制更新索引清单
- 记录任务开始日志
func (*DownloadTask) TaskID ¶
func (t *DownloadTask) TaskID() string
TaskID 获取任务ID 返回值:
- string: 任务的唯一标识符
type InitializeDownloadManagerInput ¶
type InitializeDownloadManagerInput struct { fx.In Download *DownloadManager // 下载管理器实例 }
InitializeDownloadManagerInput 定义了初始化 DownloadManager 所需的输入参数
type NewDownloadManagerInput ¶
type NewDownloadManagerInput struct { fx.In Ctx context.Context // 上下文 Opt *fscfg.Options // 文件存储选项配置 DB *database.DB // 数据库存储 FS afero.Afero // 文件系统接口 Host host.Host // libp2p网络主机实例 RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由 NPS *pubsub.NodePubSub // 发布订阅系统,用于节点之间的消息传递 }
NewDownloadManagerInput 定义了创建 DownloadManager 所需的输入参数
type NewDownloadManagerOutput ¶
type NewDownloadManagerOutput struct { fx.Out Download *DownloadManager // 下载管理器实例 }
NewDownloadManagerOutput 定义了 NewDownloadManager 函数的输出
func NewDownloadManager ¶
func NewDownloadManager(lc fx.Lifecycle, input NewDownloadManagerInput) (out NewDownloadManagerOutput, err error)
NewDownloadManager 创建并初始化一个新的 DownloadManager 实例 功能: 根据提供的参数创建并初始化一个新的下载管理器实例 参数:
- lc: 生命周期管理器
- input: 创建下载管理器所需的输入参数
返回值:
- NewDownloadManagerOutput: 包含创建的下载管理器实例的输出结构
- error: 创建过程中的错误信息,如果成功则为nil
type RegisterPubsubProtocolInput ¶
type RegisterPubsubProtocolInput struct { fx.In Ctx context.Context // 全局上下文,用于管理整个应用的生命周期 Opt *fscfg.Options // 文件存储配置选项 DB *database.DB // 本地数据存储实例 FS afero.Afero // 文件系统接口 Host host.Host // libp2p网络主机实例 RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由 NPS *pubsub.NodePubSub // 发布订阅系统 Download *DownloadManager // 下载管理器实例 }
RegisterPubsubProtocolInput 定义了注册PubsubProtocol所需的输入参数
type RegisterStreamProtocolInput ¶
type RegisterStreamProtocolInput struct { fx.In Ctx context.Context // 全局上下文,用于管理整个应用的生命周期和取消操作 Opt *fscfg.Options // 文件存储选项配置,包含各种系统设置和参数 DB *database.DB // 持久化存储,用于本地数据的存储和检索 FS afero.Afero // 文件系统接口,提供跨平台的文件操作能力 Host host.Host // libp2p网络主机实例 RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由 NPS *pubsub.NodePubSub // 发布订阅系统,用于节点之间的消息传递 DM *DownloadManager // 下载管理器,用于处理和管理文件下载任务 }
RegisterStreamProtocolInput 定义了注册流协议所需的输入参数
type StreamProtocol ¶
type StreamProtocol struct {
// contains filtered or unexported fields
}
StreamProtocol 定义了流协议的结构体