Documentation ¶
Overview ¶
Package uploads 提供文件上传相关的功能实现
Package uploads 提供文件上传相关的功能实现 ¶
Package uploads 提供了文件上传相关的功能 ¶
Package uploads 实现文件上传相关功能 ¶
Package uploads 提供文件上传相关的功能实现
Index ¶
- Constants
- Variables
- func CalculateShards(size int64, opt *fscfg.Options) (int64, int64, error)
- func CreateFileAssetRecord(db *badgerhold.Store, fileRecord *pb.UploadFileRecord) error
- func CreateUploadFileRecord(db *badgerhold.Store, taskID string, fileID string, name string, ...) error
- func CreateUploadSegmentRecord(db *badgerhold.Store, taskID string, segmentID string, index int64, size int64, ...) error
- func GetUploadProgress(db *badgerhold.Store, taskID string) (*bitset.BitSet, error)
- func HandleDeleteFileSegmentRequestPubSub(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, ...) error
- func InitializeUploadManager(lc fx.Lifecycle, input InitializeUploadManagerInput) error
- func NewFileMeta(f *os.File) (*pb.FileMeta, error)
- func NewFileSecurity(fileID string, privKey *ecdsa.PrivateKey, secret []byte) (*pb.FileSecurity, error)
- func NewFileSegment(db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, ...) error
- func NewUploadFile(opt *fscfg.Options, db *database.DB, scheme *shamir.ShamirScheme, name string, ...) (*pb.UploadOperationInfo, error)
- func ProcessFileSegments(db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, ...)
- func QueryFileAssets(db *badgerhold.Store, pubkeyHash []byte, start, pageSize int, query string, ...) ([]*pb.FileAssetRecord, uint64, int, int, error)
- func RegisterUploadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
- func RegisterUploadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)
- func RequestDeleteFileSegmentPubSub(ctx context.Context, nps *pubsub.NodePubSub, fileID string, pubkeyHash []byte) error
- func UpdateSegmentUploadInfo(db *badgerhold.Store, taskID string, index int64, ...) error
- func UpdateUploadFileHashTable(db *badgerhold.Store, taskID string, sliceTable map[int64]*pb.HashTable) error
- func UpdateUploadFileStatus(db *badgerhold.Store, taskID string, status pb.UploadStatus) error
- func ValidateAndUpdateUploadStatus(db *badgerhold.Store, fileRecord *pb.UploadFileRecord, progress bitset.BitSet) error
- type InitializeUploadManagerInput
- type NewUploadManagerInput
- type NewUploadManagerOutput
- type RegisterPubsubProtocolInput
- type RegisterStreamProtocolInput
- type SegmentStatus
- type StreamProtocol
- type Topic
- type UploadManager
- func (m *UploadManager) Cancel() context.CancelFunc
- func (m *UploadManager) CancelUpload(taskID string) error
- func (m *UploadManager) Context() context.Context
- func (m *UploadManager) DB() *database.DB
- func (m *UploadManager) DeleteUpload(taskID string) error
- func (m *UploadManager) ErrChan() <-chan error
- func (m *UploadManager) FS() afero.Afero
- func (m *UploadManager) GetAllUploadFilesSummaries() ([]*pb.UploadFilesSummaries, error)
- func (m *UploadManager) GetErrChan() <-chan error
- func (m *UploadManager) Host() host.Host
- func (m *UploadManager) IsMaxConcurrencyReached() bool
- func (m *UploadManager) LoadExistingTasks() error
- func (m *UploadManager) ManagerChannelEvents()
- func (m *UploadManager) NewUpload(path string, ownerPriv *ecdsa.PrivateKey, immediate ...bool) (*pb.UploadOperationInfo, error)
- func (m *UploadManager) NodePubSub() *pubsub.NodePubSub
- func (m *UploadManager) NotifyError(err string, args ...interface{})
- func (m *UploadManager) NotifyStatus(status *pb.UploadChan)
- func (m *UploadManager) Options() *fscfg.Options
- func (m *UploadManager) PauseUpload(taskID string) error
- func (m *UploadManager) ResumeUpload(taskID string) error
- func (m *UploadManager) RoutingTable() *kbucket.RoutingTable
- func (m *UploadManager) StatusChan() <-chan *pb.UploadChan
- func (m *UploadManager) TriggerForward(payload *pb.FileSegmentStorage)
- func (m *UploadManager) TriggerUpload(taskID string) error
- type UploadTask
- func (t *UploadTask) Cancel() error
- func (t *UploadTask) Cleanup()
- func (t *UploadTask) Close()
- func (t *UploadTask) Context() context.Context
- func (t *UploadTask) DB() *badgerhold.Store
- func (t *UploadTask) Delete() error
- func (t *UploadTask) FS() afero.Afero
- func (t *UploadTask) ForceCancel() error
- func (t *UploadTask) ForceDelete() error
- func (t *UploadTask) ForceFileFinalize() error
- func (t *UploadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error
- func (t *UploadTask) ForceNodeDispatch() error
- func (t *UploadTask) ForcePause() error
- func (t *UploadTask) ForceSegmentProcess() error
- func (t *UploadTask) ForceSegmentVerify() error
- func (t *UploadTask) GetProgress() (int64, error)
- func (t *UploadTask) GetShardCounts() (totalShards, parityShards int64, err error)
- func (t *UploadTask) GetTotalSize() (int64, error)
- func (t *UploadTask) Host() host.Host
- func (t *UploadTask) NodePubSub() *pubsub.NodePubSub
- func (t *UploadTask) NotifySegmentStatus(status *pb.UploadChan)
- func (t *UploadTask) NotifyTaskError(err error)
- func (t *UploadTask) Options() *fscfg.Options
- func (t *UploadTask) Pause() error
- func (t *UploadTask) RoutingTable() *kbucket.RoutingTable
- func (t *UploadTask) Scheme() *shamir.ShamirScheme
- func (t *UploadTask) Start() error
- func (t *UploadTask) TaskID() string
Constants ¶
const (
MaxSessions = 3 // 允许的最大并发会话数
)
Variables ¶
var ( // StreamSendingToNetworkProtocol 定义了发送任务到网络的协议标识符 StreamSendingToNetworkProtocol = fmt.Sprintf("defs@stream/sending/network/%s", version) // StreamForwardToNetworkProtocol 定义了转发任务到网络的协议标识符 StreamForwardToNetworkProtocol = fmt.Sprintf("defs@stream/forward/network/%s", version) )
var AllowedTopics = []Topic{ PubSubDeleteFileSegmentRequestTopic, }
AllowedTopics 定义了系统支持的所有主题列表
Functions ¶
func CalculateShards ¶
CalculateShards 根据文件大小和存储选项计算数据分片和奇偶校验分片的数量 参数:
- size: int64 文件大小,单位为字节
- opt: *fscfg.Options 存储选项,包含存储模式和其他参数
返回值:
- int64: 数据分片数
- int64: 奇偶校验分片数
- error: 如果发生错误,返回错误信息
func CreateFileAssetRecord ¶
func CreateFileAssetRecord(db *badgerhold.Store, fileRecord *pb.UploadFileRecord) error
CreateFileAssetRecord 创建一个新的文件资产记录 参数:
- db: *badgerhold.Store 数据库实例
- fileRecord: *pb.UploadFileRecord 上传文件记录信息
返回值:
- error: 如果创建过程中发生错误,返回错误信息;否则返回 nil
func CreateUploadFileRecord ¶
func CreateUploadFileRecord( db *badgerhold.Store, taskID string, fileID string, name string, fileMeta *pb.FileMeta, fileSecurity *pb.FileSecurity, status pb.UploadStatus, ) error
CreateUploadFileRecord 创建上传文件记录并保存到数据库 该方法用于初始化一个新的文件上传记录,并将其保存到持久化存储中
参数:
- db: 上传文件数据库存储接口
- taskID: 上传任务的唯一标识
- fileID: 文件的唯一标识
- name: 文件名称/路径
- fileMeta: 文件的元数据信息
- fileSecurity: 文件的安全相关信息
- status: 文件上传状态
返回值:
- error: 如果创建或存储过程中发生错误则返回相应错误,否则返回 nil
func CreateUploadSegmentRecord ¶
func CreateUploadSegmentRecord( db *badgerhold.Store, taskID string, segmentID string, index int64, size int64, checksum uint32, content []byte, isRsCodes bool, status pb.SegmentUploadStatus, ) error
CreateUploadSegmentRecord 创建上传分片记录并保存到数据库 该方法用于初始化一个新的分片上传记录,并将其保存到持久化存储中
参数:
- db: 数据库存储接口
- taskID: 上传任务的唯一标识
- segmentID: 分片的唯一标识
- index: 分片在文件中的索引位置
- size: 分片大小
- checksum: 分片的CRC32校验和
- content: 分片的加密后内容
- isRsCodes: 是否为纠删码冗余分片
- status: 分片的上传状态
返回值:
- error: 如果创建或存储过程中发生错误则返回相应错误,否则返回 nil
func GetUploadProgress ¶
GetUploadProgress 获取上传任务的进度信息 参数:
- db: *badgerhold.Store 数据库存储接口
- taskID: string 上传任务的唯一标识
返回值:
- *bitset.BitSet: 表示上传进度的位图
- error: 如果获取过程中发生错误则返回相应错误,否则返回 nil
func HandleDeleteFileSegmentRequestPubSub ¶
func HandleDeleteFileSegmentRequestPubSub( ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, nps *pubsub.NodePubSub, upload *UploadManager, res *pubsub.Message, ) error
HandleDeleteFileSegmentRequestPubSub 处理删除文件切片请求主题 参数:
- ctx: context.Context 上下文对象,用于控制请求的生命周期
- opt: *fscfg.Options 配置选项,包含系统配置信息
- db: *database.DB 数据库实例,用于数据持久化
- fs: afero.Afero 文件系统实例,用于文件操作
- nps: *pubsub.NodePubSub 发布订阅系统,用于消息通信
- upload: *UploadManager 上传管理器实例,处理上传相关逻辑
- res: *pubsub.Message 接收到的消息,包含请求数据
返回值:
- error 返回处理过程中的错误信息
功能:
- 处理删除文件切片信息的请求
- 解析请求数据并删除对应的文件切片存储记录
func InitializeUploadManager ¶
func InitializeUploadManager(lc fx.Lifecycle, input InitializeUploadManagerInput) error
InitializeUploadManager 初始化 UploadManager 并设置相关的生命周期钩子 参数:
- lc: fx.Lifecycle 用于管理应用生命周期的对象
- input: InitializeUploadManagerInput 包含初始化 UploadManager 所需的输入参数
返回值:
- error 如果初始化过程中发生错误,则返回相应的错误信息
func NewFileMeta ¶
NewFileMeta 创建并初始化一个新的 FileMeta 实例,提供文件的基本元数据信息 参数:
- file: *os.File 文件对象,用于读取文件信息
返回值:
- *pb.FileMeta: 新创建的 FileMeta 实例,包含文件的基本元数据
- error: 如果发生错误,返回错误信息
func NewFileSecurity ¶
func NewFileSecurity(fileID string, privKey *ecdsa.PrivateKey, secret []byte) (*pb.FileSecurity, error)
NewFileSecurity 创建并初始化一个新的FileSecurity实例,封装了文件的安全和权限相关的信息 参数:
- fileID: string 文件ID
- privKey: *ecdsa.PrivateKey 私钥
- secret: []byte 需要共享的秘密
返回:
- *pb.FileSecurity: 包含文件安全信息的结构体
- error: 错误信息
func NewFileSegment ¶
func NewFileSegment(db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, dataShards, parityShards int64) error
NewFileSegment 创建并初始化一个新的 FileSegment 实例,提供分片的详细信息及其上传状态。 参数:
- db: *badgerhold.Store 数据库实例
- taskID: string 任务ID,用于标识上传任务
- fileID: string 文件ID,用于标识文件
- file: *os.File 待处理的文件对象
- pk: []byte 用于加密的公钥
- dataShards: int64 数据分片数量
- parityShards: int64 奇偶校验分片数量
返回值:
- error: 如果发生错误,返回错误信息
func NewUploadFile ¶
func NewUploadFile(opt *fscfg.Options, db *database.DB, scheme *shamir.ShamirScheme, name string, ownerPriv *ecdsa.PrivateKey, onSegmentsReady func(taskID string), taskStatus *SegmentStatus, errChan chan error, ) (*pb.UploadOperationInfo, error)
NewUploadFile 创建并初始化一个新的 UploadFile 实例 参数:
- opt: *fscfg.Options 存储选项
- db: *database.DB 数据库实例
- scheme: *shamir.ShamirScheme Shamir 秘密共享方案
- name: string 文件名
- ownerPriv: *ecdsa.PrivateKey 文件所有者的私钥
- onSegmentsReady: func(taskID string) 完成回调函数
返回值:
- *pb.UploadOperationInfo: 上传操作信息
- error: 错误信息
func ProcessFileSegments ¶
func ProcessFileSegments( db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, dataShards, parityShards int64, onComplete func(), )
ProcessFileSegments 异步处理文件分片 该方法会在后台进行文件的分片、编码和存储操作
参数:
- db: *badgerhold.Store 数据库实例
- taskID: string 任务ID
- fileID: string 文件ID
- file: *os.File 文件对象
- pk: []byte 用于加密的公钥
- dataShards: int64 数据分片数
- parityShards: int64 奇偶校验分片数
- onComplete: func() 完成回调函数
func QueryFileAssets ¶
func QueryFileAssets(db *badgerhold.Store, pubkeyHash []byte, start, pageSize int, query string, options ...database.QueryOption) ([]*pb.FileAssetRecord, uint64, int, int, error)
QueryFileAssets 查询文件资产记录 参数:
- db: BadgerDB存储实例,用于数据持久化
- pubkeyHash: 所有者的公钥哈希,用于权限验证
- start: 起始记录索引
- pageSize: 每页的最大记录数
- query: 查询条件字符串
- options: 额外的查询选项,用于设置查询条件
返回值:
- []*pb.FileAssetRecord: 查询到的文件资产记录切片
- uint64: 符合查询条件的总记录数
- int: 当前页数
- int: 每页的最大记录数
- error: 如果查询过程中发生错误,返回错误信息
func RegisterUploadPubsubProtocol ¶
func RegisterUploadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
RegisterUploadPubsubProtocol 注册所有上传相关的PubSub协议处理器 参数:
- lc: fx.Lifecycle 应用生命周期管理器
- input: RegisterPubsubProtocolInput 注册所需的输入参数
返回值:
- error: 如果注册过程中出现错误,返回相应的错误信息
func RegisterUploadStreamProtocol ¶
func RegisterUploadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)
RegisterUploadStreamProtocol 注册上传流协议 参数:
- lc: fx.Lifecycle 类型,用于管理组件的生命周期
- input: RegisterStreamProtocolInput 类型,包含注册所需的所有依赖项
返回值: 无
func RequestDeleteFileSegmentPubSub ¶
func RequestDeleteFileSegmentPubSub( ctx context.Context, nps *pubsub.NodePubSub, fileID string, pubkeyHash []byte, ) error
RequestDeleteFileSegmentPubSub 发送删除文件片段请求 参数:
- ctx: 上下文,用于控制请求的生命周期
- nps: 发布订阅系统,用于节点之间的消息传递
- fileID: 文件唯一标识
- pubkeyHash: 所有者的公钥哈希
返回值:
- error: 如果请求过程中出现错误,返回相应的错误信息
func UpdateSegmentUploadInfo ¶
func UpdateSegmentUploadInfo( db *badgerhold.Store, taskID string, index int64, status pb.SegmentUploadStatus, ) error
UpdateSegmentUploadInfo 更新文件片段的上传信息 该方法用于更新文件片段的上传状态、节点ID和上传时间
参数:
- db: *badgerhold.Store 数据库存储接口
- taskID: string 上传任务的唯一标识
- index: int64 片段索引
- status: pb.SegmentUploadStatus 片段上传状态
返回值:
- error: 如果更新过程中发生错误则返回相应错误,否则返回 nil
func UpdateUploadFileHashTable ¶
func UpdateUploadFileHashTable(db *badgerhold.Store, taskID string, sliceTable map[int64]*pb.HashTable) error
UpdateUploadFileHashTable 更新上传文件的哈希表 该方法仅在文件处于编码状态时更新哈希表,并将状态改为待上传 参数:
- db: *badgerhold.Store 数据库存储接口
- taskID: string 上传任务的唯一标识
- sliceTable: map[int64]*pb.HashTable 分片哈希表
返回值:
- error: 如果更新过程中发生错误则返回相应错误,否则返回 nil
func UpdateUploadFileStatus ¶
func UpdateUploadFileStatus(db *badgerhold.Store, taskID string, status pb.UploadStatus) error
UpdateUploadFileStatus 更新上传文件的状态 参数:
- db: *badgerhold.Store 数据库存储接口
- taskID: string 上传任务的唯一标识
- status: pb.UploadStatus 新的文件状态
返回值:
- error: 如果更新过程中发生错误则返回相应错误,否则返回 nil
func ValidateAndUpdateUploadStatus ¶
func ValidateAndUpdateUploadStatus(db *badgerhold.Store, fileRecord *pb.UploadFileRecord, progress bitset.BitSet) error
ValidateAndUpdateUploadStatus 校验并更新上传状态 该方法用于开始上传前的状态校验,会根据实际进度强制更新任务状态: 1. 如果所有分片已完成,则更新为已完成状态并返回错误 2. 如果还有未完成分片,则更新为上传中状态
参数:
- db: *badgerhold.Store 数据库存储接口
- fileRecord: *pb.UploadFileRecord 文件记录
- progress: bitset.BitSet 上传进度位图
返回值:
- error: 如果所有分片已完成或发生其他错误则返回错误信息,否则返回 nil
Types ¶
type InitializeUploadManagerInput ¶
type InitializeUploadManagerInput struct { fx.In Upload *UploadManager }
InitializeUploadManagerInput 定义了初始化 UploadManager 所需的输入参数
type NewUploadManagerInput ¶
type NewUploadManagerInput struct { fx.In Ctx context.Context // 全局上下文,用于管理整个应用的生命周期和取消操作 Opt *fscfg.Options // 文件存储选项配置,包含各种系统设置和参数 DB *database.DB // 持久化存储,用于本地数据的存储和检索 FS afero.Afero // 文件文件系统接口,提供跨平台的文件操作能力统接口 Host host.Host // libp2p网络主机实例 RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由 NPS *pubsub.NodePubSub // 发布订阅系统,用于节点之间的消息传递 }
NewUploadManagerInput 定义了创建 UploadManager 所需的输入参数
type NewUploadManagerOutput ¶
type NewUploadManagerOutput struct { fx.Out Upload *UploadManager // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等 }
NewUploadManagerOutput 定义了 NewUploadManager 函数的输出
func NewUploadManager ¶
func NewUploadManager(lc fx.Lifecycle, input NewUploadManagerInput) (out NewUploadManagerOutput, err error)
NewUploadManager 创建并初始化一个新的 UploadManager 实例 参数:
- lc: fx.Lifecycle 用于管理应用生命周期的对象
- input: NewUploadManagerInput 包含创建 UploadManager 所需的输入参数
返回值:
- out: NewUploadManagerOutput 包含创建的 UploadManager 实例
- err: error 如果创建过程中发生错误,则返回相应的错误信息
type RegisterPubsubProtocolInput ¶
type RegisterPubsubProtocolInput struct { fx.In Ctx context.Context // 全局上下文,用于管理整个应用的生命周期 Opt *fscfg.Options // 文件存储配置选项 DB *database.DB // 本地数据存储实例 FS afero.Afero // 文件系统接口 Host host.Host // libp2p网络主机实例 NPS *pubsub.NodePubSub // 发布订阅系统 Upload *UploadManager // 上传管理器实例 }
RegisterPubsubProtocolInput 定义了注册PubsubProtocol所需的输入参数
type RegisterStreamProtocolInput ¶
type RegisterStreamProtocolInput struct { fx.In Ctx context.Context // 全局上下文,用于管理整个应用的生命周期和取消操作 Opt *fscfg.Options // 文件存储选项配置,包含各种系统设置和参数 DB *database.DB // 持久化存储,用于本地数据的存储和检索 FS afero.Afero // 文件系统接口,提供跨平台的文件操作能力 Host host.Host // libp2p网络主机实例 RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由 NPS *pubsub.NodePubSub // 发布订阅系统,用于节点之间的消息传递 Upload *UploadManager // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等 }
RegisterStreamProtocolInput 定义了注册流协议所需的输入参数
type SegmentStatus ¶
type SegmentStatus struct {
// contains filtered or unexported fields
}
SegmentStatus 用于跟踪文件是否准备就绪的状态
func NewSegmentStatus ¶
func NewSegmentStatus(locker sync.Locker) *SegmentStatus
NewSegmentStatus 初始化并返回一个 SegmentStatus 实例 参数:
- locker: sync.Locker 用于同步的互斥锁
返回值:
- *SegmentStatus: 初始化后的 SegmentStatus 实例
func (*SegmentStatus) GetState ¶
func (s *SegmentStatus) GetState() bool
GetState 获取当前状态 返回值:
- bool: 当前的状态值
func (*SegmentStatus) SetState ¶
func (s *SegmentStatus) SetState(state bool)
SetState 设置状态,并通知所有等待的goroutine 参数:
- state: bool 要设置的新状态
func (*SegmentStatus) WaitForSpecificState ¶
func (s *SegmentStatus) WaitForSpecificState(targetState bool)
WaitForSpecificState 阻塞当前goroutine,直到达到指定状态 参数:
- targetState: bool 要等待的目标状态
func (*SegmentStatus) WaitForStateChange ¶
func (s *SegmentStatus) WaitForStateChange()
WaitForStateChange 阻塞当前goroutine,直到状态发生变化
type StreamProtocol ¶
type StreamProtocol struct {
// contains filtered or unexported fields
}
StreamProtocol 定义了流协议的结构体
type UploadManager ¶
type UploadManager struct {
// contains filtered or unexported fields
}
UploadManager 管理所有上传任务,提供文件上传的统一入口和管理功能 它负责协调上传任务的执行,管理任务的生命周期,以及通知任务的状态更新和错误事件
func (*UploadManager) Cancel ¶
func (m *UploadManager) Cancel() context.CancelFunc
Cancel 获取任务的取消函数 返回值:
- context.CancelFunc: 任务的取消函数
func (*UploadManager) CancelUpload ¶
func (m *UploadManager) CancelUpload(taskID string) error
CancelUpload 取消上传 参数:
- taskID: 要取消的任务ID,用于标识具体的上传任务
返回值:
- error: 如果取消过程中发生错误,返回错误信息;否则返回 nil
func (*UploadManager) Context ¶
func (m *UploadManager) Context() context.Context
Context 获取任务的上下文 返回值:
- context.Context: 任务的上下文对象
func (*UploadManager) DB ¶
func (m *UploadManager) DB() *database.DB
DB 返回数据库存储 返回值:
- *badgerhold.Store: 数据库存储
func (*UploadManager) DeleteUpload ¶
func (m *UploadManager) DeleteUpload(taskID string) error
DeleteUpload 删除上传任务 参数:
- taskID: 要删除的任务ID,用于标识具体的上传任务
返回值:
- error: 如果删除过程中发生错误,返回错误信息;否则返回 nil
func (*UploadManager) ErrChan ¶
func (m *UploadManager) ErrChan() <-chan error
ErrChan 返回错误通知通道 返回值:
- <-chan error: 只读的通道,用于接收错误通知
func (*UploadManager) FS ¶
func (m *UploadManager) FS() afero.Afero
FS 返回文件系统接口 返回值:
- afero.Afero: 文件系统接口
func (*UploadManager) GetAllUploadFilesSummaries ¶
func (m *UploadManager) GetAllUploadFilesSummaries() ([]*pb.UploadFilesSummaries, error)
GetAllUploadFilesSummaries 获取所有上传记录的概要信息 返回值:
- []*pb.UploadFilesSummaries: 包含所有上传记录概要信息的切片,每个元素包含任务ID、文件名、大小等信息
- error: 如果获取过程中发生错误,返回错误信息;否则返回 nil
func (*UploadManager) GetErrChan ¶
func (m *UploadManager) GetErrChan() <-chan error
GetErrChan 返回错误通道 返回值:
- <-chan error: 只读的通道,用于接收错误信息
func (*UploadManager) Host ¶
func (m *UploadManager) Host() host.Host
Host 获取网络主机实例 返回值:
- host.Host: 网络主机实例
func (*UploadManager) IsMaxConcurrencyReached ¶
func (m *UploadManager) IsMaxConcurrencyReached() bool
IsMaxConcurrencyReached 检查是否达到上传允许的最大并发数 返回值:
- bool: 如果达到最大并发数返回 true,否则返回 false
func (*UploadManager) LoadExistingTasks ¶
func (m *UploadManager) LoadExistingTasks() error
LoadExistingTasks 从数据库加载现有的上传任务 返回值:
- error: 如果加载过程中发生错误,返回相应的错误信息
func (*UploadManager) ManagerChannelEvents ¶
func (m *UploadManager) ManagerChannelEvents()
ManagerChannelEvents 处理上传管理器的通道事件 此方法启动一个新的goroutine来持续监听和处理各种通道事件
func (*UploadManager) NewUpload ¶
func (m *UploadManager) NewUpload( path string, ownerPriv *ecdsa.PrivateKey, immediate ...bool, ) (*pb.UploadOperationInfo, error)
NewUpload 创建一个新的上传任务 参数:
- path: 文件路径,要上传的文件的完整路径
- ownerPriv: 所有者的私钥,用于签名和权限验证
- immediate: 是否立即执行上传(可选参数,默认为 false)
返回:
- *pb.UploadOperationInfo: 上传操作信息,包含任务ID等信息
- error: 错误信息,如果创建失败则返回错误原因
func (*UploadManager) NodePubSub ¶
func (m *UploadManager) NodePubSub() *pubsub.NodePubSub
NodePubSub 返回发布订阅系统 返回值:
- *pubsub.NodePubSub: 发布订阅系统
func (*UploadManager) NotifyError ¶
func (m *UploadManager) NotifyError(err string, args ...interface{})
NotifyError 通知错误信息 参数:
- m: 上传管理器对象
- err: 要通知的错误信息
- args: 格式化参数
func (*UploadManager) NotifyStatus ¶
func (m *UploadManager) NotifyStatus(status *pb.UploadChan)
NotifyStatus 通知上传状态 参数:
- m: 上传管理器对象
- status: 要通知的状态信息
func (*UploadManager) Options ¶
func (m *UploadManager) Options() *fscfg.Options
Options 返回文件存储选项配置 返回值:
- *fscfg.Options: 文件存储选项配置
func (*UploadManager) PauseUpload ¶
func (m *UploadManager) PauseUpload(taskID string) error
PauseUpload 暂停上传 参数:
- taskID: 要暂停的任务ID,用于标识具体的上传任务
返回值:
- error: 如果暂停过程中发生错误,返回错误信息;否则返回 nil
func (*UploadManager) ResumeUpload ¶
func (m *UploadManager) ResumeUpload(taskID string) error
ResumeUpload 继续上传 参数:
- taskID: 要继续上传的任务ID,用于标识具体的上传任务
返回值:
- error: 如果继续上传过程中发生错误,返回相应的错误信息
func (*UploadManager) RoutingTable ¶
func (m *UploadManager) RoutingTable() *kbucket.RoutingTable
RoutingTable 获取客户端实例 返回值:
- *kbucket.RoutingTable : 路由表实例
func (*UploadManager) StatusChan ¶
func (m *UploadManager) StatusChan() <-chan *pb.UploadChan
StatusChan 返回上传状态和进度通知通道 返回值:
- <-chan *pb.UploadChan: 只读的通道,用于接收上传状态和进度通知
func (*UploadManager) TriggerForward ¶
func (m *UploadManager) TriggerForward(payload *pb.FileSegmentStorage)
TriggerForward 触发转发操作 参数:
- payload: *pb.FileSegmentStorage 要转发的文件段存储信息
func (*UploadManager) TriggerUpload ¶
func (m *UploadManager) TriggerUpload(taskID string) error
TriggerUpload 触发上传操作 参数:
- taskID: 要上传的任务ID,用于标识具体的上传任务
返回值:
- error: 如果触发过程中发生错误,返回错误信息;否则返回 nil
type UploadTask ¶
type UploadTask struct {
// contains filtered or unexported fields
}
UploadTask 描述一个文件上传任务,包括文件信息和上传状态
func NewUploadTask ¶
func NewUploadTask(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero, host host.Host, routingTable *kbucket.RoutingTable, nps *pubsub.NodePubSub, scheme *shamir.ShamirScheme, statusChan chan *pb.UploadChan, errChan chan error, taskID string, ) *UploadTask
NewUploadTask 创建并初始化一个新的文件上传任务实例 参数:
- ctx: context.Context 用于管理任务生命周期的上下文
- opt: *fscfg.Options 文件存储配置选项
- db: *database.DB 数据库实例
- fs: afero.Afero 文件系统接口
- host: host.Host libp2p网络主机实例
- routingTable: *kbucket.RoutingTable 路由表
- nps: *pubsub.NodePubSub 发布订阅系统
- scheme: *shamir.ShamirScheme Shamir秘密共享方案实例
- statusChan: chan *pb.UploadChan 状态更新通道
- taskID: string 任务唯一标识符
返回值:
- *UploadTask: 创建的上传任务实例
func (*UploadTask) Cancel ¶
func (t *UploadTask) Cancel() error
Cancel 取消上传任务 返回值:
- error: 如果取消过程中发生错误,返回相应的错误信息
func (*UploadTask) Context ¶
func (t *UploadTask) Context() context.Context
Context 获取任务的上下文 返回值:
- context.Context: 任务的上下文对象
func (*UploadTask) DB ¶
func (t *UploadTask) DB() *badgerhold.Store
DB 获取持久化存储 返回值:
- *badgerhold.Store: 持久化存储实例
func (*UploadTask) Delete ¶
func (t *UploadTask) Delete() error
Delete 删除上传任务相关的数据 该方法用于删除上传任务的所有相关数据
返回值:
- error: 如果删除过程中发生错误,返回相应的错误信息
func (*UploadTask) ForceCancel ¶
func (t *UploadTask) ForceCancel() error
ForceCancel 强制触发任务取消 取消当前上传任务,如果通道已满则先清空再写入
func (*UploadTask) ForceDelete ¶
func (t *UploadTask) ForceDelete() error
ForceDelete 强制触发任务删除 删除当前上传任务及相关资源,如果通道已满则先清空再写入
func (*UploadTask) ForceFileFinalize ¶
func (t *UploadTask) ForceFileFinalize() error
ForceFileFinalize 强制触发文件完成处理 处理文件上传完成后的操作,如果通道已满则先清空再写入
func (*UploadTask) ForceNetworkTransfer ¶
func (t *UploadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error
ForceNetworkTransfer 强制触发网络传输 向目标节点传输文件片段,支持重试机制
func (*UploadTask) ForceNodeDispatch ¶
func (t *UploadTask) ForceNodeDispatch() error
ForceNodeDispatch 强制触发节点分发 以节点为单位从队列中读取文件片段进行分发,如果通道已满则先清空再写入
func (*UploadTask) ForcePause ¶
func (t *UploadTask) ForcePause() error
ForcePause 强制触发任务暂停 暂停当前上传任务,如果通道已满则先清空再写入
func (*UploadTask) ForceSegmentProcess ¶
func (t *UploadTask) ForceSegmentProcess() error
ForceSegmentProcess 强制触发片段处理 将文件片段整合并写入队列,如果通道已满则先清空再写入
func (*UploadTask) ForceSegmentVerify ¶
func (t *UploadTask) ForceSegmentVerify() error
ForceSegmentVerify 强制触发片段验证 验证已传输片段的完整性,如果通道已满则先清空再写入
func (*UploadTask) GetProgress ¶
func (t *UploadTask) GetProgress() (int64, error)
GetProgress 计算并返回当前上传进度 返回值:
- int64: 返回0-100之间的进度值
func (*UploadTask) GetShardCounts ¶
func (t *UploadTask) GetShardCounts() (totalShards, parityShards int64, err error)
GetShardCounts 返回文件的总分片数和冗余分片数 该方法统计文件的分片信息,包括总分片数和冗余分片数
返回值:
- totalShards: int64 总分片数
- parityShards: int64 冗余分片数
- error: 如果获取分片数失败,返回相应的错误信息
func (*UploadTask) GetTotalSize ¶
func (t *UploadTask) GetTotalSize() (int64, error)
GetTotalSize 返回文件大小加上奇偶校验片段的总大小 该方法计算文件的总大小,包括原始文件和奇偶校验片段
返回值:
- int64: 文件总大小(包括原始文件大小和奇偶校验片段大小)
- error: 如果计算过程中发生错误,返回相应的错误信息
func (*UploadTask) Host ¶
func (t *UploadTask) Host() host.Host
Host 获取网络主机实例 返回值:
- host.Host: 网络主机实例
func (*UploadTask) NodePubSub ¶
func (t *UploadTask) NodePubSub() *pubsub.NodePubSub
NodePubSub 获取存储网络 返回值:
- *pubsub.NodePubSub: 发布订阅系统
func (*UploadTask) NotifySegmentStatus ¶
func (t *UploadTask) NotifySegmentStatus(status *pb.UploadChan)
NotifySegmentStatus 通知片段状态更新 向外部通知文件片段的处理状态,超时后记录警告日志
func (*UploadTask) NotifyTaskError ¶
func (t *UploadTask) NotifyTaskError(err error)
NotifyTaskError 通知任务错误 向外部通知任务执行过程中的错误,超时后记录警告日志
func (*UploadTask) Options ¶
func (t *UploadTask) Options() *fscfg.Options
Options 获取文件存储选项配置 返回值:
- *fscfg.Options: 文件存储选项配置
func (*UploadTask) Pause ¶
func (t *UploadTask) Pause() error
Pause 暂停上传任务 该方法用于暂停正在进行的上传任务
返回值:
- error: 如果暂停过程中发生错误,返回相应的错误信息
func (*UploadTask) RoutingTable ¶
func (t *UploadTask) RoutingTable() *kbucket.RoutingTable
RoutingTable 获取端实例 返回值:
- *kbucket.RoutingTable : 路由表实例
func (*UploadTask) Scheme ¶
func (t *UploadTask) Scheme() *shamir.ShamirScheme
Scheme 返回 Shamir 秘密共享方案 返回值:
- *shamir.ShamirScheme: Shamir 秘密共享方案
func (*UploadTask) Start ¶
func (t *UploadTask) Start() error
Start 开始或恢复上传任务 该方法会启动一个goroutine来处理上传任务的各种事件
返回值:
- error: 如果开始上传过程中发生错误,返回相应的错误信息
func (*UploadTask) TaskID ¶
func (t *UploadTask) TaskID() string
TaskID 获取任务ID 返回值:
- string: 任务的唯一标识符
Source Files ¶
- file.go
- file_meta.go
- file_security.go
- file_segment.go
- manager.go
- manager_channel.go
- manager_events.go
- network_stream.go
- pubsub.go
- pubsub_local.go
- pubsub_net.go
- segment_status.go
- serve.go
- store_storage_segment.go
- store_upload_asset.go
- store_upload_segment.go
- strore_upload_file.go
- task.go
- task_channel.go
- task_segment.go
- task_serve.go
- task_status.go