downloader

package
v0.0.0-...-f2cb11c Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2023 | License: Apache-2.0 | Imports: 29 | Imported by: 2

Documentation

Overview

Package downloader 多线程下载器, 重构版

Index

Constants

View Source
const (
	// InstanceStateStorageFormatJSON selects the JSON format.
	InstanceStateStorageFormatJSON = iota
	// InstanceStateStorageFormatProto3 selects the protobuf format.
	InstanceStateStorageFormatProto3
)
View Source
const (
	// CacheSize is the default download cache size.
	CacheSize = 8192
)
View Source
const (
	// DefaultAcceptRanges is the default Accept-Ranges value.
	DefaultAcceptRanges = "bytes"
)

Variables

View Source
var (
	// ContentRangeRE is the regular expression used for Content-Range values.
	// Its single capture group holds the digits that follow the "/".
	ContentRangeRE = regexp.MustCompile(`^.*? \d*?-\d*?/(\d*?)$`)
)
View Source
var (
	// ErrNoWokers is the "no workers" error.
	// NOTE(review): "Wokers" looks like a typo for "Workers"; the exported
	// name is left unchanged here because callers may reference it.
	ErrNoWokers = errors.New("no workers")
)
View Source
var (
	// MinParallelSize is the minimum amount of data for a single thread.
	MinParallelSize int64 = 128 * 1024 // 128 KiB
)

Functions

func DefaultDURLCheckFunc

func DefaultDURLCheckFunc(client *requester.HTTPClient, durl string) (contentLength int64, resp *http.Response, err error)

DefaultDURLCheckFunc 默认的 DURLCheckFunc

func DefaultLoadBalancerCompareFunc

func DefaultLoadBalancerCompareFunc(info map[string]string, subResp *http.Response) bool

DefaultLoadBalancerCompareFunc 检测负载均衡的服务器是否一致

func DoDownload

func DoDownload(durl string, savePath string, cfg *Config)

DoDownload 执行下载

func GetFileName

func GetFileName(uri string, client *requester.HTTPClient) (filename string, err error)

GetFileName 获取文件名

func GetStatusText

func GetStatusText(sc StatusCode) string

GetStatusText 根据状态码获取状态信息

func ParseContentRange

func ParseContentRange(contentRange string) (contentLength int64)

ParseContentRange 解析Content-Range

func RandomNumber

func RandomNumber(min, max int) int

RandomNumber 生成指定区间随机数

Types

type ByLeftDesc

// ByLeftDesc sorts by remaining download amount in descending order.
type ByLeftDesc struct {
	WorkerList // embedded worker list
}

ByLeftDesc 根据剩余下载量倒序排序

func (ByLeftDesc) Less

func (wl ByLeftDesc) Less(i, j int) bool

Less 实现倒序

type Config

// Config is the download configuration.
type Config struct {
	Mode                       transfer.RangeGenMode      // Range allocation mode for the download
	MaxParallel                int                        // maximum download concurrency
	CacheSize                  int                        // download buffer
	BlockSize                  int64                      // size of each Range block; only effective when RangeGenMode is RangeGenMode2
	MaxRate                    int64                      // maximum download speed limit
	InstanceStateStorageFormat InstanceStateStorageFormat // storage type for resume (breakpoint) data
	InstanceStatePath          string                     // path of the resume (breakpoint) data
	IsTest                     bool                       // whether this is a test download
	TryHTTP                    bool                       // whether to try using an http connection
}

Config 下载配置

func NewConfig

func NewConfig() *Config

NewConfig 返回默认配置

func (*Config) Copy

func (cfg *Config) Copy() *Config

Copy 拷贝新的配置

func (*Config) Fix

func (cfg *Config) Fix()

Fix 修复配置信息, 使其合法

type DURLCheckFunc

type DURLCheckFunc func(client *requester.HTTPClient, durl string) (contentLength int64, resp *http.Response, err error)

DURLCheckFunc 下载URL检测函数

type DownloadFirstInfo

// DownloadFirstInfo is the initial information that can be handed to a
// Downloader via SetFirstInfo.
// NOTE(review): the fields look like values taken from the first HTTP
// response of a download — confirm against NewDownloadFirstInfoByResp.
type DownloadFirstInfo struct {
	ContentLength int64
	ContentMD5    string
	ContentCRC32  string
	AcceptRanges  string
	Referer       string
}

func NewDownloadFirstInfoByResp

func NewDownloadFirstInfoByResp(contentLength int64, resp *http.Response) (dfi *DownloadFirstInfo)

func (*DownloadFirstInfo) Compare

func (dfi *DownloadFirstInfo) Compare(n *DownloadFirstInfo) bool

func (*DownloadFirstInfo) ToMap

func (dfi *DownloadFirstInfo) ToMap() map[string]string

ToMap 转换为map

func (*DownloadFirstInfo) ToMapByReflect

func (dfi *DownloadFirstInfo) ToMapByReflect() map[string]string

ToMapByReflect 用reflect转换为map

type DownloadStatusFunc

type DownloadStatusFunc func(status transfer.DownloadStatuser, workersCallback func(RangeWorkerFunc))

DownloadStatusFunc 下载状态处理函数

type Downloader

// Downloader performs a download.
type Downloader struct {
	// contains filtered or unexported fields
}

Downloader 下载

func NewDownloader

func NewDownloader(durl string, writer io.WriterAt, config *Config) (der *Downloader)

NewDownloader 初始化Downloader

func (*Downloader) AddLoadBalanceServer

func (der *Downloader) AddLoadBalanceServer(urls ...string)

AddLoadBalanceServer 增加负载均衡服务器

func (*Downloader) Cancel

func (der *Downloader) Cancel()

Cancel 取消

func (*Downloader) Execute

func (der *Downloader) Execute() error

Execute 开始任务

func (*Downloader) OnCancel

func (der *Downloader) OnCancel(onCancelEvent requester.Event)

OnCancel 设置取消下载事件

func (*Downloader) OnDownloadStatusEvent

func (der *Downloader) OnDownloadStatusEvent(f DownloadStatusFunc)

OnDownloadStatusEvent 设置状态处理函数

func (*Downloader) OnExecute

func (der *Downloader) OnExecute(onExecuteEvent requester.Event)

OnExecute 设置开始下载事件

func (*Downloader) OnFinish

func (der *Downloader) OnFinish(onFinishEvent requester.Event)

OnFinish 设置结束下载事件

func (*Downloader) OnPause

func (der *Downloader) OnPause(onPauseEvent requester.Event)

OnPause 设置暂停下载事件

func (*Downloader) OnResume

func (der *Downloader) OnResume(onResumeEvent requester.Event)

OnResume 设置恢复下载事件

func (*Downloader) OnSuccess

func (der *Downloader) OnSuccess(onSuccessEvent requester.Event)

OnSuccess 设置成功下载事件

func (*Downloader) Pause

func (der *Downloader) Pause()

Pause 暂停

func (*Downloader) Resume

func (der *Downloader) Resume()

Resume 恢复

func (*Downloader) SelectBlockSizeAndInitRangeGen

func (der *Downloader) SelectBlockSizeAndInitRangeGen(single bool, status *transfer.DownloadStatus, parallel int) (blockSize int64, initErr error)

SelectBlockSizeAndInitRangeGen 获取合适的 BlockSize, 和初始化 RangeGen

func (*Downloader) SelectCacheSize

func (der *Downloader) SelectCacheSize(confCacheSize int, blockSize int64) (cacheSize int)

SelectCacheSize 获取合适的 cacheSize

func (*Downloader) SelectParallel

func (der *Downloader) SelectParallel(single bool, maxParallel int, totalSize int64, instanceRangeList transfer.RangeList) (parallel int)

SelectParallel 获取合适的 parallel

func (*Downloader) SetClient

func (der *Downloader) SetClient(client *requester.HTTPClient)

SetClient 设置http客户端

func (*Downloader) SetDURLCheckFunc

func (der *Downloader) SetDURLCheckFunc(f DURLCheckFunc)

SetDURLCheckFunc 设置下载URL检测函数

func (*Downloader) SetFirstInfo

func (der *Downloader) SetFirstInfo(i *DownloadFirstInfo)

SetFirstInfo 设置初始信息 如果设置了此值, 将忽略检测url

func (*Downloader) SetLoadBalancerCompareFunc

func (der *Downloader) SetLoadBalancerCompareFunc(f LoadBalancerCompareFunc)

SetLoadBalancerCompareFunc 设置负载均衡检测函数

func (*Downloader) SetStatusCodeBodyCheckFunc

func (der *Downloader) SetStatusCodeBodyCheckFunc(f StatusCodeBodyCheckFunc)

SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效

type Fder

// Fder is the interface for obtaining an fd.
type Fder interface {
	Fd() uintptr
}

Fder 获取fd接口

type InstanceState

// InstanceState is the state, i.e. the resume (breakpoint) information.
type InstanceState struct {
	// contains filtered or unexported fields
}

InstanceState 状态, 断点续传信息

func NewInstanceState

func NewInstanceState(saveFile *os.File, format InstanceStateStorageFormat) *InstanceState

NewInstanceState 初始化InstanceState

func (*InstanceState) Close

func (is *InstanceState) Close() error

Close 关闭

func (*InstanceState) Get

func (is *InstanceState) Get() (eii *transfer.DownloadInstanceInfo)

Get 获取断点续传信息

func (*InstanceState) Put

Put 提交断点续传信息

type InstanceStateStorageFormat

type InstanceStateStorageFormat int

InstanceStateStorageFormat 断点续传储存类型

type LoadBalancerCompareFunc

type LoadBalancerCompareFunc func(info map[string]string, subResp *http.Response) bool

type LoadBalancerResponse

// LoadBalancerResponse is the load-balancing response state.
type LoadBalancerResponse struct {
	URL     string
	Referer string
}

LoadBalancerResponse 负载均衡响应状态

type LoadBalancerResponseList

// LoadBalancerResponseList is the load-balancing list.
type LoadBalancerResponseList struct {
	// contains filtered or unexported fields
}

LoadBalancerResponseList 负载均衡列表

func NewLoadBalancerResponseList

func NewLoadBalancerResponseList(lbr []*LoadBalancerResponse) *LoadBalancerResponseList

NewLoadBalancerResponseList 初始化负载均衡列表

func (*LoadBalancerResponseList) RandomGet

RandomGet 随机获取

func (*LoadBalancerResponseList) SequentialGet

func (lbrl *LoadBalancerResponseList) SequentialGet() *LoadBalancerResponse

SequentialGet 顺序获取

type Monitor

// Monitor is the thread monitor.
type Monitor struct {
	// contains filtered or unexported fields
}

Monitor 线程监控器

func NewMonitor

func NewMonitor() *Monitor

NewMonitor 初始化Monitor

func (*Monitor) Append

func (mt *Monitor) Append(worker *Worker)

Append 增加Worker

func (*Monitor) CompletedChan

func (mt *Monitor) CompletedChan() <-chan struct{}

CompletedChan 获取completed chan

func (*Monitor) DynamicSplitWorker

func (mt *Monitor) DynamicSplitWorker(worker *Worker)

DynamicSplitWorker 动态分配线程

func (*Monitor) Err

func (mt *Monitor) Err() error

Err 返回遇到的错误

func (*Monitor) Execute

func (mt *Monitor) Execute(cancelCtx context.Context)

Execute 执行任务

func (*Monitor) GetAllWorkersRange

func (mt *Monitor) GetAllWorkersRange() transfer.RangeList

GetAllWorkersRange 获取所有worker的范围

func (*Monitor) GetAvailableWorker

func (mt *Monitor) GetAvailableWorker() *Worker

GetAvailableWorker 获取空闲的worker

func (*Monitor) InitMonitorCapacity

func (mt *Monitor) InitMonitorCapacity(capacity int)

InitMonitorCapacity 初始化workers, 用于Append

func (*Monitor) IsLeftWorkersAllFailed

func (mt *Monitor) IsLeftWorkersAllFailed() bool

IsLeftWorkersAllFailed 剩下的线程是否全部失败

func (*Monitor) NumLeftWorkers

func (mt *Monitor) NumLeftWorkers() (num int)

NumLeftWorkers 剩余的worker数量

func (*Monitor) Pause

func (mt *Monitor) Pause()

Pause 暂停所有的下载

func (*Monitor) RangeWorker

func (mt *Monitor) RangeWorker(f RangeWorkerFunc)

RangeWorker 遍历worker

func (*Monitor) ResetFailedAndNetErrorWorkers

func (mt *Monitor) ResetFailedAndNetErrorWorkers()

ResetFailedAndNetErrorWorkers 重设部分网络错误的worker

func (*Monitor) ResetWorker

func (mt *Monitor) ResetWorker(worker *Worker)

ResetWorker 重设长时间无响应, 和下载速度为 0 的 Worker

func (*Monitor) Resume

func (mt *Monitor) Resume()

Resume 恢复所有的下载

func (*Monitor) SetInstanceState

func (mt *Monitor) SetInstanceState(instanceState *InstanceState)

SetInstanceState 设置状态

func (*Monitor) SetReloadWorker

func (mt *Monitor) SetReloadWorker(b bool)

SetReloadWorker 是否重载worker

func (*Monitor) SetStatus

func (mt *Monitor) SetStatus(status *transfer.DownloadStatus)

SetStatus 设置DownloadStatus

func (*Monitor) SetWorkers

func (mt *Monitor) SetWorkers(workers WorkerList)

SetWorkers 设置workers, 此操作会覆盖原有的workers

func (*Monitor) Status

func (mt *Monitor) Status() *transfer.DownloadStatus

Status 返回DownloadStatus

func (*Monitor) TryAddNewWork

func (mt *Monitor) TryAddNewWork()

TryAddNewWork 尝试加入新range

type RangeWorkerFunc

type RangeWorkerFunc func(key int, worker *Worker) bool

RangeWorkerFunc 遍历workers的函数

type ResetController

// ResetController is the network connection controller.
type ResetController struct {
	// contains filtered or unexported fields
}

ResetController 网络连接控制器

func NewResetController

func NewResetController(maxResetNum int) *ResetController

NewResetController 初始化*ResetController

func (*ResetController) AddResetNum

func (rc *ResetController) AddResetNum()

AddResetNum 增加连接

func (*ResetController) CanReset

func (rc *ResetController) CanReset() bool

CanReset 是否可以建立连接

type StatusCode

type StatusCode int

StatusCode 状态码

const (
	// StatusCodeInit: initialized.
	StatusCodeInit StatusCode = iota
	// StatusCodeSuccessed: succeeded.
	StatusCodeSuccessed
	// StatusCodePending: waiting for a response.
	StatusCodePending
	// StatusCodeDownloading: downloading.
	StatusCodeDownloading
	// StatusCodeWaitToWrite: waiting to write data.
	StatusCodeWaitToWrite
	// StatusCodeInternalError: internal error.
	StatusCodeInternalError
	// StatusCodeTooManyConnections: too many connections.
	StatusCodeTooManyConnections
	// StatusCodeNetError: network error.
	StatusCodeNetError
	// StatusCodeFailed: download failed.
	StatusCodeFailed
	// StatusCodePaused: paused.
	StatusCodePaused
	// StatusCodeReseted: the connection has been reset.
	StatusCodeReseted
	// StatusCodeCanceled: canceled.
	StatusCodeCanceled
)

type StatusCodeBodyCheckFunc

type StatusCodeBodyCheckFunc func(respBody io.Reader) error

StatusCodeBodyCheckFunc 响应状态码出错的检查函数

type Worker

// Worker is a unit of work.
type Worker struct {
	// contains filtered or unexported fields
}

Worker 工作单元

func NewWorker

func NewWorker(id int, durl string, writerAt io.WriterAt) *Worker

NewWorker 初始化Worker

func (*Worker) Cancel

func (wer *Worker) Cancel() error

Cancel 取消下载

func (*Worker) Canceled

func (wer *Worker) Canceled() bool

Canceled 是否已经取消

func (*Worker) ClearStatus

func (wer *Worker) ClearStatus()

ClearStatus 清空状态

func (*Worker) Completed

func (wer *Worker) Completed() bool

Completed 是否已经完成

func (*Worker) Err

func (wer *Worker) Err() error

Err 返回worker错误

func (*Worker) Execute

func (wer *Worker) Execute()

Execute 执行任务

func (*Worker) Failed

func (wer *Worker) Failed() bool

Failed 是否失败

func (*Worker) GetRange

func (wer *Worker) GetRange() *transfer.Range

GetRange 返回worker范围

func (*Worker) GetSpeedsPerSecond

func (wer *Worker) GetSpeedsPerSecond() int64

GetSpeedsPerSecond 获取每秒的速度

func (*Worker) GetStatus

func (wer *Worker) GetStatus() WorkerStatuser

GetStatus 返回下载状态

func (*Worker) ID

func (wer *Worker) ID() int

ID 返回worker ID

func (*Worker) Pause

func (wer *Worker) Pause()

Pause 暂停下载

func (*Worker) Reset

func (wer *Worker) Reset()

Reset 重设连接

func (*Worker) Resume

func (wer *Worker) Resume()

Resume 恢复下载

func (*Worker) SetAcceptRange

func (wer *Worker) SetAcceptRange(acceptRanges string)

SetAcceptRange 设置AcceptRange

func (*Worker) SetClient

func (wer *Worker) SetClient(c *requester.HTTPClient)

SetClient 设置http客户端

func (*Worker) SetDownloadStatus

func (wer *Worker) SetDownloadStatus(downloadStatus *transfer.DownloadStatus)

SetDownloadStatus 增加其他需要统计的数据

func (*Worker) SetRange

func (wer *Worker) SetRange(r *transfer.Range)

SetRange 设置请求范围

func (*Worker) SetReferer

func (wer *Worker) SetReferer(referer string)

SetReferer 设置来源

func (*Worker) SetTotalSize

func (wer *Worker) SetTotalSize(size int64)

SetTotalSize 设置整个文件的大小, worker请求range时会尝试获取该值, 如果不匹配, 则返回错误

func (*Worker) SetWriteMutex

func (wer *Worker) SetWriteMutex(mu *sync.Mutex)

SetWriteMutex 设置数据写锁

type WorkerList

type WorkerList []*Worker

WorkerList worker列表

func (WorkerList) Duplicate

func (wl WorkerList) Duplicate() WorkerList

Duplicate 构造新的列表

func (WorkerList) Len

func (wl WorkerList) Len() int

Len 返回长度

func (WorkerList) Swap

func (wl WorkerList) Swap(i, j int)

Swap 交换

type WorkerStatus

// WorkerStatus is the worker status.
type WorkerStatus struct {
	// contains filtered or unexported fields
}

WorkerStatus worker状态

func NewWorkerStatus

func NewWorkerStatus() *WorkerStatus

NewWorkerStatus 初始化WorkerStatus

func (*WorkerStatus) SetStatusCode

func (ws *WorkerStatus) SetStatusCode(sc StatusCode)

SetStatusCode 设置worker状态码

func (*WorkerStatus) StatusCode

func (ws *WorkerStatus) StatusCode() StatusCode

StatusCode 返回状态码

func (*WorkerStatus) StatusText

func (ws *WorkerStatus) StatusText() string

StatusText 返回状态信息

type WorkerStatuser

// WorkerStatuser describes a status.
type WorkerStatuser interface {
	StatusCode() StatusCode // status code
	StatusText() string
}

WorkerStatuser 状态

type Writer

// Writer is the downloader's data output interface.
type Writer interface {
	io.WriterAt
}

Writer 下载器数据输出接口

func NewDownloaderWriterByFilename

func NewDownloaderWriterByFilename(name string, flag int, perm os.FileMode) (writer Writer, file *os.File, err error)

NewDownloaderWriterByFilename 创建下载器数据输出接口, 类似于os.OpenFile

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL