downloader

package
v0.0.0-...-e1fd43b Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2020 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Package downloader 多线程下载器, 重构版

Index

Constants

View Source
const (
	//CacheSize 默认的下载缓存
	CacheSize = 8192
)

Variables

View Source
var (
	// ContentRangeRE Content-Range 正则
	ContentRangeRE = regexp.MustCompile(`^.*? \d*?-\d*?/(\d*?)$`)
)
View Source
var (
	//ErrNoWokers no workers
	ErrNoWokers = errors.New("no workers")
)
View Source
var (
	// MinParallelSize 单个线程最小的数据量
	MinParallelSize int64 = 128 * 1024 // 128kb
)

Functions

func DoDownload

func DoDownload(durl string, savePath string, cfg *Config)

DoDownload 执行下载

func GetFileName

func GetFileName(uri string, client *requester.HTTPClient) (filename string, err error)

GetFileName 获取文件名

func GetStatusText

func GetStatusText(sc StatusCode) string

GetStatusText 根据状态码获取状态信息

func ParseContentRange

func ParseContentRange(contentRange string) (contentLength int64)

ParseContentRange 解析Content-Range

func RandomNumber

func RandomNumber(min, max int) int

RandomNumber 生成指定区间随机数

func ServerEqual

func ServerEqual(resp, subResp *http.Response) bool

ServerEqual 检测负载均衡的服务器是否一致

Types

type ByLeftDesc

type ByLeftDesc struct {
	WorkerList
}

ByLeftDesc 根据剩余下载量倒序排序

func (ByLeftDesc) Less

func (wl ByLeftDesc) Less(i, j int) bool

Less 实现倒序

type Config

type Config struct {
	MaxParallel       int    // 最大下载并发量
	CacheSize         int    // 下载缓冲
	InstanceStatePath string // 断点续传信息路径
	IsTest            bool   // 是否测试下载
	// contains filtered or unexported fields
}

Config 下载配置

func NewConfig

func NewConfig() *Config

NewConfig 返回默认配置

func (*Config) Copy

func (cfg *Config) Copy() *Config

Copy 拷贝新的配置

func (*Config) Fix

func (cfg *Config) Fix()

Fix 修复配置信息, 使其合法

type DlStatus

type DlStatus interface {
	TotalSize() int64
	Downloaded() int64
	SpeedsPerSecond() int64
	TimeElapsed() time.Duration
}

DlStatus 下载状态接口

type DownloadStatus

type DownloadStatus struct {
	// contains filtered or unexported fields
}

DownloadStatus 下载状态及统计信息

func NewDownloadStatus

func NewDownloadStatus() *DownloadStatus

NewDownloadStatus 初始化DownloadStatus

func (*DownloadStatus) Add

func (ds *DownloadStatus) Add(i int64)

Add 实现Adder接口

func (*DownloadStatus) AddDownloaded

func (ds *DownloadStatus) AddDownloaded(d int64)

AddDownloaded 增加已下载数据量

func (*DownloadStatus) AddSpeedsDownloaded

func (ds *DownloadStatus) AddSpeedsDownloaded(d int64)

AddSpeedsDownloaded 增加已下载数据量, 用于统计速度

func (*DownloadStatus) Downloaded

func (ds *DownloadStatus) Downloaded() int64

Downloaded 返回已下载数据量

func (*DownloadStatus) MaxSpeeds

func (ds *DownloadStatus) MaxSpeeds() int64

MaxSpeeds 返回最大速度

func (*DownloadStatus) ResetMaxSpeeds

func (ds *DownloadStatus) ResetMaxSpeeds()

ResetMaxSpeeds 清空最大速度统计

func (*DownloadStatus) SpeedsDownloaded

func (ds *DownloadStatus) SpeedsDownloaded() int64

SpeedsDownloaded 返回用于统计速度的已下载数据量

func (*DownloadStatus) SpeedsPerSecond

func (ds *DownloadStatus) SpeedsPerSecond() int64

SpeedsPerSecond 返回每秒速度

func (*DownloadStatus) StoreMaxSpeeds

func (ds *DownloadStatus) StoreMaxSpeeds(speeds int64)

StoreMaxSpeeds 储存最大速度, 原子操作

func (*DownloadStatus) StoreSpeedsPerSecond

func (ds *DownloadStatus) StoreSpeedsPerSecond(speeds int64)

StoreSpeedsPerSecond 储存速度, 原子操作

func (*DownloadStatus) TimeElapsed

func (ds *DownloadStatus) TimeElapsed() time.Duration

TimeElapsed 返回花费的时间

func (*DownloadStatus) TotalSize

func (ds *DownloadStatus) TotalSize() int64

TotalSize 返回总大小

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

Downloader 下载

func NewDownloader

func NewDownloader(durl string, writer io.WriterAt, config *Config) (der *Downloader)

NewDownloader 初始化Downloader

func (*Downloader) AddLoadBalanceServer

func (der *Downloader) AddLoadBalanceServer(urls ...string)

AddLoadBalanceServer 增加负载均衡服务器

func (*Downloader) Cancel

func (der *Downloader) Cancel()

Cancel 取消

func (*Downloader) Execute

func (der *Downloader) Execute() error

Execute 开始任务

func (*Downloader) GetDownloadStatusChan

func (der *Downloader) GetDownloadStatusChan() <-chan DlStatus

GetDownloadStatusChan 获取下载统计信息

func (*Downloader) OnCancel

func (der *Downloader) OnCancel(onCancelEvent requester.Event)

OnCancel 设置取消下载事件

func (*Downloader) OnExecute

func (der *Downloader) OnExecute(onExecuteEvent requester.Event)

OnExecute 设置开始下载事件

func (*Downloader) OnFinish

func (der *Downloader) OnFinish(onFinishEvent requester.Event)

OnFinish 设置结束下载事件

func (*Downloader) OnPause

func (der *Downloader) OnPause(onPauseEvent requester.Event)

OnPause 设置暂停下载事件

func (*Downloader) OnResume

func (der *Downloader) OnResume(onResumeEvent requester.Event)

OnResume 设置恢复下载事件

func (*Downloader) OnSuccess

func (der *Downloader) OnSuccess(onSuccessEvent requester.Event)

OnSuccess 设置成功下载事件

func (*Downloader) Pause

func (der *Downloader) Pause()

Pause 暂停

func (*Downloader) PrintAllWorkers

func (der *Downloader) PrintAllWorkers()

PrintAllWorkers 输出所有的worker

func (*Downloader) Resume

func (der *Downloader) Resume()

Resume 恢复

func (*Downloader) SetClient

func (der *Downloader) SetClient(client *requester.HTTPClient)

SetClient 设置http客户端

func (*Downloader) SetStatusCodeBodyCheckFunc

func (der *Downloader) SetStatusCodeBodyCheckFunc(f func(respBody io.Reader) error)

SetStatusCodeBodyCheckFunc 设置响应状态码出错的检查函数, 当FirstCheckMethod不为HEAD时才有效

func (*Downloader) TryHTTP

func (der *Downloader) TryHTTP(t bool)

TryHTTP 尝试使用 http 连接

type InstanceInfo

type InstanceInfo struct {
	DlStatus *DownloadStatus
	Ranges   RangeList
}

InstanceInfo 状态详细信息, 用于导出状态文件

type InstanceState

type InstanceState struct {
	// contains filtered or unexported fields
}

InstanceState 状态, 断点续传信息

func NewInstanceState

func NewInstanceState(saveFile *os.File) *InstanceState

NewInstanceState 初始化InstanceState

func (*InstanceState) Close

func (is *InstanceState) Close() error

Close 关闭

func (*InstanceState) Get

func (is *InstanceState) Get() (eii *InstanceInfo)

Get 拉取信息

func (*InstanceState) Put

func (is *InstanceState) Put(eii *InstanceInfo)

Put 提交信息

type LoadBalancerResponse

type LoadBalancerResponse struct {
	URL     string
	Referer string
}

LoadBalancerResponse 负载均衡响应状态

type LoadBalancerResponseList

type LoadBalancerResponseList struct {
	// contains filtered or unexported fields
}

LoadBalancerResponseList 负载均衡列表

func NewLoadBalancerResponseList

func NewLoadBalancerResponseList(lbr []*LoadBalancerResponse) *LoadBalancerResponseList

NewLoadBalancerResponseList 初始化负载均衡列表

func (*LoadBalancerResponseList) RandomGet

func (lbrl *LoadBalancerResponseList) RandomGet() *LoadBalancerResponse

RandomGet 随机获取

func (*LoadBalancerResponseList) SequentialGet

func (lbrl *LoadBalancerResponseList) SequentialGet() *LoadBalancerResponse

SequentialGet 顺序获取

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor 线程监控器

func NewMonitor

func NewMonitor() *Monitor

NewMonitor 初始化Monitor

func (*Monitor) AllCompleted

func (mt *Monitor) AllCompleted() <-chan struct{}

AllCompleted 全部完成则发送消息

func (*Monitor) Append

func (mt *Monitor) Append(worker *Worker)

Append 增加Worker

func (*Monitor) CompletedChan

func (mt *Monitor) CompletedChan() <-chan struct{}

CompletedChan 获取completed chan

func (*Monitor) Err

func (mt *Monitor) Err() error

Err 返回遇到的错误

func (*Monitor) Execute

func (mt *Monitor) Execute(cancelCtx context.Context)

Execute 执行任务

func (*Monitor) GetAllWorkersRange

func (mt *Monitor) GetAllWorkersRange() (ranges []*Range)

GetAllWorkersRange 获取所有worker的范围

func (*Monitor) GetAvaliableWorker

func (mt *Monitor) GetAvaliableWorker() *Worker

GetAvaliableWorker 获取空闲的worker

func (*Monitor) GetSpeedsPerSecondFunc

func (mt *Monitor) GetSpeedsPerSecondFunc() func() int64

GetSpeedsPerSecondFunc 获取每秒的速度, 返回获取速度的函数

func (*Monitor) InitMonitorCapacity

func (mt *Monitor) InitMonitorCapacity(capacity int)

InitMonitorCapacity 初始化workers, 用于Append

func (*Monitor) IsLeftWorkersAllFailed

func (mt *Monitor) IsLeftWorkersAllFailed() bool

IsLeftWorkersAllFailed 剩下的线程是否全部失败

func (*Monitor) NumLeftWorkers

func (mt *Monitor) NumLeftWorkers() (num int)

NumLeftWorkers 剩余的worker数量

func (*Monitor) Pause

func (mt *Monitor) Pause()

Pause 暂停所有的下载

func (*Monitor) RangeWorker

func (mt *Monitor) RangeWorker(f func(key int, worker *Worker) bool)

RangeWorker 遍历worker

func (*Monitor) ResetFailedAndNetErrorWorkers

func (mt *Monitor) ResetFailedAndNetErrorWorkers()

ResetFailedAndNetErrorWorkers 重设部分网络错误的worker

func (*Monitor) Resume

func (mt *Monitor) Resume()

Resume 恢复所有的下载

func (*Monitor) SetInstanceState

func (mt *Monitor) SetInstanceState(instanceState *InstanceState)

SetInstanceState 设置状态

func (*Monitor) SetReloadWorker

func (mt *Monitor) SetReloadWorker(b bool)

SetReloadWorker 是否重载worker

func (*Monitor) SetStatus

func (mt *Monitor) SetStatus(status *DownloadStatus)

SetStatus 设置DownloadStatus

func (*Monitor) SetWorkers

func (mt *Monitor) SetWorkers(workers WorkerList)

SetWorkers 设置workers, 此操作会覆盖原有的workers

func (*Monitor) ShowWorkers

func (mt *Monitor) ShowWorkers() string

ShowWorkers 返回所有worker的状态

func (*Monitor) Status

func (mt *Monitor) Status() *DownloadStatus

Status 返回DownloadStatus

type Range

type Range struct {
	Begin int64
	End   int64
}

Range 请求范围

func (*Range) AddBegin

func (r *Range) AddBegin(i int64) (newi int64)

AddBegin 增加Begin, 原子操作

func (*Range) Len

func (r *Range) Len() int64

Len 长度

func (*Range) LoadBegin

func (r *Range) LoadBegin() int64

LoadBegin 读取Begin, 原子操作

func (*Range) LoadEnd

func (r *Range) LoadEnd() int64

LoadEnd 读取End, 原子操作

func (*Range) StoreBegin

func (r *Range) StoreBegin(end int64)

StoreBegin 储存Begin, 原子操作

func (*Range) StoreEnd

func (r *Range) StoreEnd(end int64)

StoreEnd 储存End, 原子操作

func (*Range) String

func (r *Range) String() string

type RangeList

type RangeList []*Range

RangeList 请求范围列表

func (*RangeList) Len

func (rl *RangeList) Len() int64

Len 获取所有的Range长度

type ResetController

type ResetController struct {
	// contains filtered or unexported fields
}

ResetController 网络连接控制器

func NewResetController

func NewResetController(maxResetNum int) *ResetController

NewResetController 初始化*ResetController

func (*ResetController) AddResetNum

func (rc *ResetController) AddResetNum()

AddResetNum 增加连接

func (*ResetController) CanReset

func (rc *ResetController) CanReset() bool

CanReset 是否可以建立连接

type Status

type Status interface {
	StatusCode() StatusCode //状态码
	StatusText() string
}

Status 状态

type StatusCode

type StatusCode int

StatusCode 状态码

const (
	//StatusCodeInit 初始化
	StatusCodeInit StatusCode = iota
	//StatusCodeSuccessed 成功
	StatusCodeSuccessed
	//StatusCodePending 等待响应
	StatusCodePending
	//StatusCodeDownloading 下载中
	StatusCodeDownloading
	//StatusCodeWaitToWrite 等待写入数据
	StatusCodeWaitToWrite
	//StatusCodeInternalError 内部错误
	StatusCodeInternalError
	//StatusCodeTooManyConnections 连接数太多
	StatusCodeTooManyConnections
	//StatusCodeNetError 网络错误
	StatusCodeNetError
	//StatusCodeFailed 下载失败
	StatusCodeFailed
	//StatusCodePaused 已暂停
	StatusCodePaused
	//StatusCodeReseted 已重设连接
	StatusCodeReseted
	//StatusCodeCanceled 已取消
	StatusCodeCanceled
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker 工作单元

func NewWorker

func NewWorker(id int, durl string, writerAt io.WriterAt) *Worker

NewWorker 初始化Worker

func (*Worker) Cancel

func (wer *Worker) Cancel() error

Cancel 取消下载

func (*Worker) Canceled

func (wer *Worker) Canceled() bool

Canceled 是否已经取消

func (*Worker) CleanStatus

func (wer *Worker) CleanStatus()

CleanStatus 清空状态

func (*Worker) Completed

func (wer *Worker) Completed() bool

Completed 是否已经完成

func (*Worker) Err

func (wer *Worker) Err() error

Err 返回worker错误

func (*Worker) Execute

func (wer *Worker) Execute()

Execute 执行任务

func (*Worker) Failed

func (wer *Worker) Failed() bool

Failed 是否失败

func (*Worker) GetRange

func (wer *Worker) GetRange() *Range

GetRange 返回worker范围

func (*Worker) GetSpeedsPerSecond

func (wer *Worker) GetSpeedsPerSecond() int64

GetSpeedsPerSecond 获取每秒的速度

func (*Worker) GetStatus

func (wer *Worker) GetStatus() Status

GetStatus 返回下载状态

func (*Worker) ID

func (wer *Worker) ID() int

ID 返回worker ID

func (*Worker) Pause

func (wer *Worker) Pause()

Pause 暂停下载

func (*Worker) Reset

func (wer *Worker) Reset()

Reset 重设连接

func (*Worker) Resume

func (wer *Worker) Resume()

Resume 恢复下载

func (*Worker) SetCacheSize

func (wer *Worker) SetCacheSize(size int)

SetCacheSize 设置下载缓存

func (*Worker) SetClient

func (wer *Worker) SetClient(c *requester.HTTPClient)

SetClient 设置http客户端

func (*Worker) SetDownloadStatus

func (wer *Worker) SetDownloadStatus(downloadStatus *DownloadStatus)

SetDownloadStatus 增加其他需要统计的数据

func (*Worker) SetRange

func (wer *Worker) SetRange(acceptRanges string, r Range)

SetRange 设置请求范围

func (*Worker) SetReferer

func (wer *Worker) SetReferer(referer string)

SetReferer 设置来源

func (*Worker) SetWriteMutex

func (wer *Worker) SetWriteMutex(mu *sync.Mutex)

SetWriteMutex 设置数据写锁

type WorkerList

type WorkerList []*Worker

WorkerList worker列表

func (WorkerList) Duplicate

func (wl WorkerList) Duplicate() WorkerList

Duplicate 构造新的列表

func (WorkerList) Len

func (wl WorkerList) Len() int

Len 返回长度

func (WorkerList) Swap

func (wl WorkerList) Swap(i, j int)

Swap 交换

type WorkerStatus

type WorkerStatus struct {
	// contains filtered or unexported fields
}

WorkerStatus worker状态

func NewWorkerStatus

func NewWorkerStatus() *WorkerStatus

NewWorkerStatus 初始化WorkerStatus

func (*WorkerStatus) SetStatusCode

func (ws *WorkerStatus) SetStatusCode(sc StatusCode)

SetStatusCode 设置worker状态码

func (*WorkerStatus) StatusCode

func (ws *WorkerStatus) StatusCode() StatusCode

StatusCode 返回状态码

func (*WorkerStatus) StatusText

func (ws *WorkerStatus) StatusText() string

StatusText 返回状态信息

Directories

Path Synopsis
cachepool
Package cachepool []byte缓存池

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL