uploads

package
v2.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2025 License: MIT Imports: 38 Imported by: 0

Documentation

Overview

Package uploads 提供文件上传相关的功能实现

Package uploads 提供文件上传相关的功能实现

Package uploads 提供了文件上传相关的功能

Package uploads 实现文件上传相关功能

Package uploads 提供文件上传相关的功能实现

Index

Constants

View Source
const (
	MaxSessions = 3 // 允许的最大并发会话数
)

Variables

View Source
var (
	// StreamSendingToNetworkProtocol 定义了发送任务到网络的协议标识符
	StreamSendingToNetworkProtocol = fmt.Sprintf("defs@stream/sending/network/%s", version)
	// StreamForwardToNetworkProtocol 定义了转发任务到网络的协议标识符
	StreamForwardToNetworkProtocol = fmt.Sprintf("defs@stream/forward/network/%s", version)
)

AllowedTopics 定义了系统支持的所有主题列表

Functions

func CalculateShards

func CalculateShards(size int64, opt *fscfg.Options) (int64, int64, error)

CalculateShards 根据文件大小和存储选项计算数据分片和奇偶校验分片的数量 参数:

  • size: int64 文件大小,单位为字节
  • opt: *fscfg.Options 存储选项,包含存储模式和其他参数

返回值:

  • int64: 数据分片数
  • int64: 奇偶校验分片数
  • error: 如果发生错误,返回错误信息

func CreateFileAssetRecord

func CreateFileAssetRecord(db *badgerhold.Store, fileRecord *pb.UploadFileRecord) error

CreateFileAssetRecord 创建一个新的文件资产记录 参数:

  • db: *badgerhold.Store 数据库实例
  • fileRecord: *pb.UploadFileRecord 上传文件记录信息

返回值:

  • error: 如果创建过程中发生错误,返回错误信息;否则返回 nil

func CreateUploadFileRecord

func CreateUploadFileRecord(
	db *badgerhold.Store,
	taskID string,
	fileID string,
	name string,
	fileMeta *pb.FileMeta,
	fileSecurity *pb.FileSecurity,
	status pb.UploadStatus,
) error

CreateUploadFileRecord 创建上传文件记录并保存到数据库 该方法用于初始化一个新的文件上传记录,并将其保存到持久化存储中

参数:

  • db: 上传文件数据库存储接口
  • taskID: 上传任务的唯一标识
  • fileID: 文件的唯一标识
  • name: 文件名称/路径
  • fileMeta: 文件的元数据信息
  • fileSecurity: 文件的安全相关信息
  • status: 文件上传状态

返回值:

  • error: 如果创建或存储过程中发生错误则返回相应错误,否则返回 nil

func CreateUploadSegmentRecord

func CreateUploadSegmentRecord(
	db *badgerhold.Store,
	taskID string,
	segmentID string,
	index int64,
	size int64,
	checksum uint32,
	content []byte,
	isRsCodes bool,
	status pb.SegmentUploadStatus,
) error

CreateUploadSegmentRecord 创建上传分片记录并保存到数据库 该方法用于初始化一个新的分片上传记录,并将其保存到持久化存储中

参数:

  • db: 数据库存储接口
  • taskID: 上传任务的唯一标识
  • segmentID: 分片的唯一标识
  • index: 分片在文件中的索引位置
  • size: 分片大小
  • checksum: 分片的CRC32校验和
  • content: 分片的加密后内容
  • isRsCodes: 是否为纠删码冗余分片
  • status: 分片的上传状态

返回值:

  • error: 如果创建或存储过程中发生错误则返回相应错误,否则返回 nil

func GetUploadProgress

func GetUploadProgress(db *badgerhold.Store, taskID string) (*bitset.BitSet, error)

GetUploadProgress 获取上传任务的进度信息 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识

返回值:

  • *bitset.BitSet: 表示上传进度的位图
  • error: 如果获取过程中发生错误则返回相应错误,否则返回 nil

func HandleDeleteFileSegmentRequestPubSub

func HandleDeleteFileSegmentRequestPubSub(
	ctx context.Context,
	opt *fscfg.Options,
	db *database.DB,
	fs afero.Afero,
	nps *pubsub.NodePubSub,
	upload *UploadManager,
	res *pubsub.Message,
) error

HandleDeleteFileSegmentRequestPubSub 处理删除文件切片请求主题 参数:

  • ctx: context.Context 上下文对象,用于控制请求的生命周期
  • opt: *fscfg.Options 配置选项,包含系统配置信息
  • db: *database.DB 数据库实例,用于数据持久化
  • fs: afero.Afero 文件系统实例,用于文件操作
  • nps: *pubsub.NodePubSub 发布订阅系统,用于消息通信
  • upload: *UploadManager 上传管理器实例,处理上传相关逻辑
  • res: *pubsub.Message 接收到的消息,包含请求数据

返回值:

  • error 返回处理过程中的错误信息

功能:

  • 处理删除文件切片信息的请求
  • 解析请求数据并删除对应的文件切片存储记录

func InitializeUploadManager

func InitializeUploadManager(lc fx.Lifecycle, input InitializeUploadManagerInput) error

InitializeUploadManager 初始化 UploadManager 并设置相关的生命周期钩子 参数:

  • lc: fx.Lifecycle 用于管理应用生命周期的对象
  • input: InitializeUploadManagerInput 包含初始化 UploadManager 所需的输入参数

返回值:

  • error 如果初始化过程中发生错误,则返回相应的错误信息

func NewFileMeta

func NewFileMeta(f *os.File) (*pb.FileMeta, error)

NewFileMeta 创建并初始化一个新的 FileMeta 实例,提供文件的基本元数据信息 参数:

  • file: *os.File 文件对象,用于读取文件信息

返回值:

  • *pb.FileMeta: 新创建的 FileMeta 实例,包含文件的基本元数据
  • error: 如果发生错误,返回错误信息

func NewFileSecurity

func NewFileSecurity(fileID string, privKey *ecdsa.PrivateKey, secret []byte) (*pb.FileSecurity, error)

NewFileSecurity 创建并初始化一个新的FileSecurity实例,封装了文件的安全和权限相关的信息 参数:

  • fileID: string 文件ID
  • privKey: *ecdsa.PrivateKey 私钥
  • secret: []byte 需要共享的秘密

返回:

  • *pb.FileSecurity: 包含文件安全信息的结构体
  • error: 错误信息

func NewFileSegment

func NewFileSegment(db *badgerhold.Store, taskID string, fileID string, file *os.File, pk []byte, dataShards, parityShards int64) error

NewFileSegment 创建并初始化一个新的 FileSegment 实例,提供分片的详细信息及其上传状态。 参数:

  • db: *badgerhold.Store 数据库实例
  • taskID: string 任务ID,用于标识上传任务
  • fileID: string 文件ID,用于标识文件
  • file: *os.File 待处理的文件对象
  • pk: []byte 用于加密的公钥
  • dataShards: int64 数据分片数量
  • parityShards: int64 奇偶校验分片数量

返回值:

  • error: 如果发生错误,返回错误信息

func NewUploadFile

func NewUploadFile(opt *fscfg.Options, db *database.DB, scheme *shamir.ShamirScheme,
	name string,
	ownerPriv *ecdsa.PrivateKey,
	onSegmentsReady func(taskID string),
	taskStatus *SegmentStatus,
	errChan chan error,
) (*pb.UploadOperationInfo, error)

NewUploadFile 创建并初始化一个新的 UploadFile 实例 参数:

  • opt: *fscfg.Options 存储选项
  • db: *database.DB 数据库实例
  • scheme: *shamir.ShamirScheme Shamir 秘密共享方案
  • name: string 文件名
  • ownerPriv: *ecdsa.PrivateKey 文件所有者的私钥
  • onSegmentsReady: func(taskID string) 完成回调函数

返回值:

  • *pb.UploadOperationInfo: 上传操作信息
  • error: 错误信息

func ProcessFileSegments

func ProcessFileSegments(
	db *badgerhold.Store,
	taskID string,
	fileID string,
	file *os.File,
	pk []byte,
	dataShards,
	parityShards int64,
	onComplete func(),
)

ProcessFileSegments 异步处理文件分片 该方法会在后台进行文件的分片、编码和存储操作

参数:

  • db: *badgerhold.Store 数据库实例
  • taskID: string 任务ID
  • fileID: string 文件ID
  • file: *os.File 文件对象
  • pk: []byte 用于加密的公钥
  • dataShards: int64 数据分片数
  • parityShards: int64 奇偶校验分片数
  • onComplete: func() 完成回调函数

func QueryFileAssets

func QueryFileAssets(db *badgerhold.Store, pubkeyHash []byte, start, pageSize int, query string, options ...database.QueryOption) ([]*pb.FileAssetRecord, uint64, int, int, error)

QueryFileAssets 查询文件资产记录 参数:

  • db: BadgerDB存储实例,用于数据持久化
  • pubkeyHash: 所有者的公钥哈希,用于权限验证
  • start: 起始记录索引
  • pageSize: 每页的最大记录数
  • query: 查询条件字符串
  • options: 额外的查询选项,用于设置查询条件

返回值:

  • []*pb.FileAssetRecord: 查询到的文件资产记录切片
  • uint64: 符合查询条件的总记录数
  • int: 当前页数
  • int: 每页的最大记录数
  • error: 如果查询过程中发生错误,返回错误信息

func RegisterUploadPubsubProtocol

func RegisterUploadPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)

RegisterUploadPubsubProtocol 注册所有上传相关的PubSub协议处理器 参数:

  • lc: fx.Lifecycle 应用生命周期管理器
  • input: RegisterPubsubProtocolInput 注册所需的输入参数

返回值:

  • error: 如果注册过程中出现错误,返回相应的错误信息

func RegisterUploadStreamProtocol

func RegisterUploadStreamProtocol(lc fx.Lifecycle, input RegisterStreamProtocolInput)

RegisterUploadStreamProtocol 注册上传流协议 参数:

  • lc: fx.Lifecycle 类型,用于管理组件的生命周期
  • input: RegisterStreamProtocolInput 类型,包含注册所需的所有依赖项

返回值: 无

func RequestDeleteFileSegmentPubSub

func RequestDeleteFileSegmentPubSub(
	ctx context.Context,
	nps *pubsub.NodePubSub,
	fileID string,
	pubkeyHash []byte,
) error

RequestDeleteFileSegmentPubSub 发送删除文件片段请求 参数:

  • ctx: 上下文,用于控制请求的生命周期
  • nps: 发布订阅系统,用于节点之间的消息传递
  • fileID: 文件唯一标识
  • pubkeyHash: 所有者的公钥哈希

返回值:

  • error: 如果请求过程中出现错误,返回相应的错误信息

func UpdateSegmentUploadInfo

func UpdateSegmentUploadInfo(
	db *badgerhold.Store,
	taskID string,
	index int64,
	status pb.SegmentUploadStatus,
) error

UpdateSegmentUploadInfo 更新文件片段的上传信息 该方法用于更新文件片段的上传状态、节点ID和上传时间

参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • index: int64 片段索引
  • status: pb.SegmentUploadStatus 片段上传状态

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

func UpdateUploadFileHashTable

func UpdateUploadFileHashTable(db *badgerhold.Store, taskID string, sliceTable map[int64]*pb.HashTable) error

UpdateUploadFileHashTable 更新上传文件的哈希表 该方法仅在文件处于编码状态时更新哈希表,并将状态改为待上传 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • sliceTable: map[int64]*pb.HashTable 分片哈希表

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

func UpdateUploadFileStatus

func UpdateUploadFileStatus(db *badgerhold.Store, taskID string, status pb.UploadStatus) error

UpdateUploadFileStatus 更新上传文件的状态 参数:

  • db: *badgerhold.Store 数据库存储接口
  • taskID: string 上传任务的唯一标识
  • status: pb.UploadStatus 新的文件状态

返回值:

  • error: 如果更新过程中发生错误则返回相应错误,否则返回 nil

func ValidateAndUpdateUploadStatus

func ValidateAndUpdateUploadStatus(db *badgerhold.Store, fileRecord *pb.UploadFileRecord, progress bitset.BitSet) error

ValidateAndUpdateUploadStatus 校验并更新上传状态 该方法用于开始上传前的状态校验,会根据实际进度强制更新任务状态: 1. 如果所有分片已完成,则更新为已完成状态并返回错误 2. 如果还有未完成分片,则更新为上传中状态

参数:

  • db: *badgerhold.Store 数据库存储接口
  • fileRecord: *pb.UploadFileRecord 文件记录
  • progress: bitset.BitSet 上传进度位图

返回值:

  • error: 如果所有分片已完成或发生其他错误则返回错误信息,否则返回 nil

Types

type InitializeUploadManagerInput

type InitializeUploadManagerInput struct {
	fx.In

	Upload *UploadManager
}

InitializeUploadManagerInput 定义了初始化 UploadManager 所需的输入参数

type NewUploadManagerInput

type NewUploadManagerInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期和取消操作
	Opt          *fscfg.Options        // 文件存储选项配置,包含各种系统设置和参数
	DB           *database.DB          // 持久化存储,用于本地数据的存储和检索
	FS           afero.Afero           // 文件文件系统接口,提供跨平台的文件操作能力统接口
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
}

NewUploadManagerInput 定义了创建 UploadManager 所需的输入参数

type NewUploadManagerOutput

type NewUploadManagerOutput struct {
	fx.Out

	Upload *UploadManager // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等
}

NewUploadManagerOutput 定义了 NewUploadManager 函数的输出

func NewUploadManager

func NewUploadManager(lc fx.Lifecycle, input NewUploadManagerInput) (out NewUploadManagerOutput, err error)

NewUploadManager 创建并初始化一个新的 UploadManager 实例 参数:

  • lc: fx.Lifecycle 用于管理应用生命周期的对象
  • input: NewUploadManagerInput 包含创建 UploadManager 所需的输入参数

返回值:

  • out: NewUploadManagerOutput 包含创建的 UploadManager 实例
  • err: error 如果创建过程中发生错误,则返回相应的错误信息

type RegisterPubsubProtocolInput

type RegisterPubsubProtocolInput struct {
	fx.In

	Ctx    context.Context    // 全局上下文,用于管理整个应用的生命周期
	Opt    *fscfg.Options     // 文件存储配置选项
	DB     *database.DB       // 本地数据存储实例
	FS     afero.Afero        // 文件系统接口
	Host   host.Host          // libp2p网络主机实例
	NPS    *pubsub.NodePubSub // 发布订阅系统
	Upload *UploadManager     // 上传管理器实例
}

RegisterPubsubProtocolInput 定义了注册PubsubProtocol所需的输入参数

type RegisterStreamProtocolInput

type RegisterStreamProtocolInput struct {
	fx.In

	Ctx          context.Context       // 全局上下文,用于管理整个应用的生命周期和取消操作
	Opt          *fscfg.Options        // 文件存储选项配置,包含各种系统设置和参数
	DB           *database.DB          // 持久化存储,用于本地数据的存储和检索
	FS           afero.Afero           // 文件系统接口,提供跨平台的文件操作能力
	Host         host.Host             // libp2p网络主机实例
	RoutingTable *kbucket.RoutingTable // 路由表,用于管理对等节点和路由
	NPS          *pubsub.NodePubSub    // 发布订阅系统,用于节点之间的消息传递
	Upload       *UploadManager        // 上传管理器,用于处理和管理文件上传任务,包括任务调度、状态跟踪等
}

RegisterStreamProtocolInput 定义了注册流协议所需的输入参数

type SegmentStatus

type SegmentStatus struct {
	// contains filtered or unexported fields
}

SegmentStatus 用于跟踪文件是否准备就绪的状态

func NewSegmentStatus

func NewSegmentStatus(locker sync.Locker) *SegmentStatus

NewSegmentStatus 初始化并返回一个 SegmentStatus 实例 参数:

  • locker: sync.Locker 用于同步的互斥锁

返回值:

  • *SegmentStatus: 初始化后的 SegmentStatus 实例

func (*SegmentStatus) GetState

func (s *SegmentStatus) GetState() bool

GetState 获取当前状态 返回值:

  • bool: 当前的状态值

func (*SegmentStatus) SetState

func (s *SegmentStatus) SetState(state bool)

SetState 设置状态,并通知所有等待的goroutine 参数:

  • state: bool 要设置的新状态

func (*SegmentStatus) WaitForSpecificState

func (s *SegmentStatus) WaitForSpecificState(targetState bool)

WaitForSpecificState 阻塞当前goroutine,直到达到指定状态 参数:

  • targetState: bool 要等待的目标状态

func (*SegmentStatus) WaitForStateChange

func (s *SegmentStatus) WaitForStateChange()

WaitForStateChange 阻塞当前goroutine,直到状态发生变化

type StreamProtocol

type StreamProtocol struct {
	// contains filtered or unexported fields
}

StreamProtocol 定义了流协议的结构体

type Topic

type Topic int

Topic 定义了允许的主题类型

const (
	PubSubDeleteFileSegmentRequestTopic Topic = iota // 删除文件切片请求主题
)

定义主题类型常量

func (Topic) String

func (t Topic) String() string

String 将Topic转换为对应的字符串表示 返回值:

  • string: 主题对应的字符串

type UploadManager

type UploadManager struct {
	// contains filtered or unexported fields
}

UploadManager 管理所有上传任务,提供文件上传的统一入口和管理功能 它负责协调上传任务的执行,管理任务的生命周期,以及通知任务的状态更新和错误事件

func (*UploadManager) Cancel

func (m *UploadManager) Cancel() context.CancelFunc

Cancel 获取任务的取消函数 返回值:

  • context.CancelFunc: 任务的取消函数

func (*UploadManager) CancelUpload

func (m *UploadManager) CancelUpload(taskID string) error

CancelUpload 取消上传 参数:

  • taskID: 要取消的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果取消过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) Context

func (m *UploadManager) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*UploadManager) DB

func (m *UploadManager) DB() *database.DB

DB 返回数据库存储 返回值:

  • *badgerhold.Store: 数据库存储

func (*UploadManager) DeleteUpload

func (m *UploadManager) DeleteUpload(taskID string) error

DeleteUpload 删除上传任务 参数:

  • taskID: 要删除的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果删除过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) ErrChan

func (m *UploadManager) ErrChan() <-chan error

ErrChan 返回错误通知通道 返回值:

  • <-chan error: 只读的通道,用于接收错误通知

func (*UploadManager) FS

func (m *UploadManager) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*UploadManager) GetAllUploadFilesSummaries

func (m *UploadManager) GetAllUploadFilesSummaries() ([]*pb.UploadFilesSummaries, error)

GetAllUploadFilesSummaries 获取所有上传记录的概要信息 返回值:

  • []*pb.UploadFilesSummaries: 包含所有上传记录概要信息的切片,每个元素包含任务ID、文件名、大小等信息
  • error: 如果获取过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) GetErrChan

func (m *UploadManager) GetErrChan() <-chan error

GetErrChan 返回错误通道 返回值:

  • <-chan error: 只读的通道,用于接收错误信息

func (*UploadManager) Host

func (m *UploadManager) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*UploadManager) IsMaxConcurrencyReached

func (m *UploadManager) IsMaxConcurrencyReached() bool

IsMaxConcurrencyReached 检查是否达到上传允许的最大并发数 返回值:

  • bool: 如果达到最大并发数返回 true,否则返回 false

func (*UploadManager) LoadExistingTasks

func (m *UploadManager) LoadExistingTasks() error

LoadExistingTasks 从数据库加载现有的上传任务 返回值:

  • error: 如果加载过程中发生错误,返回相应的错误信息

func (*UploadManager) ManagerChannelEvents

func (m *UploadManager) ManagerChannelEvents()

ManagerChannelEvents 处理上传管理器的通道事件 此方法启动一个新的goroutine来持续监听和处理各种通道事件

func (*UploadManager) NewUpload

func (m *UploadManager) NewUpload(
	path string,
	ownerPriv *ecdsa.PrivateKey,
	immediate ...bool,
) (*pb.UploadOperationInfo, error)

NewUpload 创建一个新的上传任务 参数:

  • path: 文件路径,要上传的文件的完整路径
  • ownerPriv: 所有者的私钥,用于签名和权限验证
  • immediate: 是否立即执行上传(可选参数,默认为 false)

返回:

  • *pb.UploadOperationInfo: 上传操作信息,包含任务ID等信息
  • error: 错误信息,如果创建失败则返回错误原因

func (*UploadManager) NodePubSub

func (m *UploadManager) NodePubSub() *pubsub.NodePubSub

NodePubSub 返回发布订阅系统 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*UploadManager) NotifyError

func (m *UploadManager) NotifyError(err string, args ...interface{})

NotifyError 通知错误信息 参数:

  • m: 上传管理器对象
  • err: 要通知的错误信息
  • args: 格式化参数

func (*UploadManager) NotifyStatus

func (m *UploadManager) NotifyStatus(status *pb.UploadChan)

NotifyStatus 通知上传状态 参数:

  • m: 上传管理器对象
  • status: 要通知的状态信息

func (*UploadManager) Options

func (m *UploadManager) Options() *fscfg.Options

Options 返回文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*UploadManager) PauseUpload

func (m *UploadManager) PauseUpload(taskID string) error

PauseUpload 暂停上传 参数:

  • taskID: 要暂停的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果暂停过程中发生错误,返回错误信息;否则返回 nil

func (*UploadManager) ResumeUpload

func (m *UploadManager) ResumeUpload(taskID string) error

ResumeUpload 继续上传 参数:

  • taskID: 要继续上传的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果继续上传过程中发生错误,返回相应的错误信息

func (*UploadManager) RoutingTable

func (m *UploadManager) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取客户端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*UploadManager) StatusChan

func (m *UploadManager) StatusChan() <-chan *pb.UploadChan

StatusChan 返回上传状态和进度通知通道 返回值:

  • <-chan *pb.UploadChan: 只读的通道,用于接收上传状态和进度通知

func (*UploadManager) TriggerForward

func (m *UploadManager) TriggerForward(payload *pb.FileSegmentStorage)

TriggerForward 触发转发操作 参数:

  • payload: *pb.FileSegmentStorage 要转发的文件段存储信息

func (*UploadManager) TriggerUpload

func (m *UploadManager) TriggerUpload(taskID string) error

TriggerUpload 触发上传操作 参数:

  • taskID: 要上传的任务ID,用于标识具体的上传任务

返回值:

  • error: 如果触发过程中发生错误,返回错误信息;否则返回 nil

type UploadTask

type UploadTask struct {
	// contains filtered or unexported fields
}

UploadTask 描述一个文件上传任务,包括文件信息和上传状态

func NewUploadTask

func NewUploadTask(ctx context.Context, opt *fscfg.Options, db *database.DB, fs afero.Afero,
	host host.Host, routingTable *kbucket.RoutingTable, nps *pubsub.NodePubSub,
	scheme *shamir.ShamirScheme, statusChan chan *pb.UploadChan, errChan chan error, taskID string,
) *UploadTask

NewUploadTask 创建并初始化一个新的文件上传任务实例 参数:

  • ctx: context.Context 用于管理任务生命周期的上下文
  • opt: *fscfg.Options 文件存储配置选项
  • db: *database.DB 数据库实例
  • fs: afero.Afero 文件系统接口
  • host: host.Host libp2p网络主机实例
  • routingTable: *kbucket.RoutingTable 路由表
  • nps: *pubsub.NodePubSub 发布订阅系统
  • scheme: *shamir.ShamirScheme Shamir秘密共享方案实例
  • statusChan: chan *pb.UploadChan 状态更新通道
  • taskID: string 任务唯一标识符

返回值:

  • *UploadTask: 创建的上传任务实例

func (*UploadTask) Cancel

func (t *UploadTask) Cancel() error

Cancel 取消上传任务 返回值:

  • error: 如果取消过程中发生错误,返回相应的错误信息

func (*UploadTask) Cleanup

func (t *UploadTask) Cleanup()

Cleanup 清理任务资源

func (*UploadTask) Close

func (t *UploadTask) Close()

Close 关闭任务并释放资源

func (*UploadTask) Context

func (t *UploadTask) Context() context.Context

Context 获取任务的上下文 返回值:

  • context.Context: 任务的上下文对象

func (*UploadTask) DB

func (t *UploadTask) DB() *badgerhold.Store

DB 获取持久化存储 返回值:

  • *badgerhold.Store: 持久化存储实例

func (*UploadTask) Delete

func (t *UploadTask) Delete() error

Delete 删除上传任务相关的数据 该方法用于删除上传任务的所有相关数据

返回值:

  • error: 如果删除过程中发生错误,返回相应的错误信息

func (*UploadTask) FS

func (t *UploadTask) FS() afero.Afero

FS 返回文件系统接口 返回值:

  • afero.Afero: 文件系统接口

func (*UploadTask) ForceCancel

func (t *UploadTask) ForceCancel() error

ForceCancel 强制触发任务取消 取消当前上传任务,如果通道已满则先清空再写入

func (*UploadTask) ForceDelete

func (t *UploadTask) ForceDelete() error

ForceDelete 强制触发任务删除 删除当前上传任务及相关资源,如果通道已满则先清空再写入

func (*UploadTask) ForceFileFinalize

func (t *UploadTask) ForceFileFinalize() error

ForceFileFinalize 强制触发文件完成处理 处理文件上传完成后的操作,如果通道已满则先清空再写入

func (*UploadTask) ForceNetworkTransfer

func (t *UploadTask) ForceNetworkTransfer(peerSegments map[peer.ID][]string) error

ForceNetworkTransfer 强制触发网络传输 向目标节点传输文件片段,支持重试机制

func (*UploadTask) ForceNodeDispatch

func (t *UploadTask) ForceNodeDispatch() error

ForceNodeDispatch 强制触发节点分发 以节点为单位从队列中读取文件片段进行分发,如果通道已满则先清空再写入

func (*UploadTask) ForcePause

func (t *UploadTask) ForcePause() error

ForcePause 强制触发任务暂停 暂停当前上传任务,如果通道已满则先清空再写入

func (*UploadTask) ForceSegmentProcess

func (t *UploadTask) ForceSegmentProcess() error

ForceSegmentProcess 强制触发片段处理 将文件片段整合并写入队列,如果通道已满则先清空再写入

func (*UploadTask) ForceSegmentVerify

func (t *UploadTask) ForceSegmentVerify() error

ForceSegmentVerify 强制触发片段验证 验证已传输片段的完整性,如果通道已满则先清空再写入

func (*UploadTask) GetProgress

func (t *UploadTask) GetProgress() (int64, error)

GetProgress 计算并返回当前上传进度 返回值:

  • int64: 返回0-100之间的进度值

func (*UploadTask) GetShardCounts

func (t *UploadTask) GetShardCounts() (totalShards, parityShards int64, err error)

GetShardCounts 返回文件的总分片数和冗余分片数 该方法统计文件的分片信息,包括总分片数和冗余分片数

返回值:

  • totalShards: int64 总分片数
  • parityShards: int64 冗余分片数
  • error: 如果获取分片数失败,返回相应的错误信息

func (*UploadTask) GetTotalSize

func (t *UploadTask) GetTotalSize() (int64, error)

GetTotalSize 返回文件大小加上奇偶校验片段的总大小 该方法计算文件的总大小,包括原始文件和奇偶校验片段

返回值:

  • int64: 文件总大小(包括原始文件大小和奇偶校验片段大小)
  • error: 如果计算过程中发生错误,返回相应的错误信息

func (*UploadTask) Host

func (t *UploadTask) Host() host.Host

Host 获取网络主机实例 返回值:

  • host.Host: 网络主机实例

func (*UploadTask) NodePubSub

func (t *UploadTask) NodePubSub() *pubsub.NodePubSub

NodePubSub 获取存储网络 返回值:

  • *pubsub.NodePubSub: 发布订阅系统

func (*UploadTask) NotifySegmentStatus

func (t *UploadTask) NotifySegmentStatus(status *pb.UploadChan)

NotifySegmentStatus 通知片段状态更新 向外部通知文件片段的处理状态,超时后记录警告日志

func (*UploadTask) NotifyTaskError

func (t *UploadTask) NotifyTaskError(err error)

NotifyTaskError 通知任务错误 向外部通知任务执行过程中的错误,超时后记录警告日志

func (*UploadTask) Options

func (t *UploadTask) Options() *fscfg.Options

Options 获取文件存储选项配置 返回值:

  • *fscfg.Options: 文件存储选项配置

func (*UploadTask) Pause

func (t *UploadTask) Pause() error

Pause 暂停上传任务 该方法用于暂停正在进行的上传任务

返回值:

  • error: 如果暂停过程中发生错误,返回相应的错误信息

func (*UploadTask) RoutingTable

func (t *UploadTask) RoutingTable() *kbucket.RoutingTable

RoutingTable 获取端实例 返回值:

  • *kbucket.RoutingTable : 路由表实例

func (*UploadTask) Scheme

func (t *UploadTask) Scheme() *shamir.ShamirScheme

Scheme 返回 Shamir 秘密共享方案 返回值:

  • *shamir.ShamirScheme: Shamir 秘密共享方案

func (*UploadTask) Start

func (t *UploadTask) Start() error

Start 开始或恢复上传任务 该方法会启动一个goroutine来处理上传任务的各种事件

返回值:

  • error: 如果开始上传过程中发生错误,返回相应的错误信息

func (*UploadTask) TaskID

func (t *UploadTask) TaskID() string

TaskID 获取任务ID 返回值:

  • string: 任务的唯一标识符

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL