Documentation ¶
Overview ¶
Package tegenaria is a crawler framework written in Go.
tegenaria是一个基于golang开发的快速、高效率的网络爬虫框架
Index ¶
- Variables
- func AddGo(wg *sync.WaitGroup, funcs ...GoFunc) <-chan error
- func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error
- func ExecuteCmd(engine *CrawlEngine)
- func GetAllParserMethod(spider SpiderInterface) map[string]Parser
- func GetFunctionName(fn Parser) string
- func GetLogger(Name string) *logrus.Entry
- func GetMachineIp() string
- func GetUUID() string
- func Map2String(m interface{}) string
- func NewDefaultLimiter(limitRate int) *defaultLimiter
- func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *leakyBucketLimiterWithRdb
- func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
- func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
- func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
- func NewRequestCache() *requestCache
- func OptimalNumOfBits(n int64, p float64) int64
- func OptimalNumOfHashFunctions(n int64, m int64) int64
- type BaseSpider
- type CacheInterface
- type CheckMasterLive
- type Configuration
- type Context
- type ContextInterface
- type ContextOption
- type CrawlEngine
- func (e *CrawlEngine) Close()
- func (e *CrawlEngine) EventsWatcherRunner() error
- func (e *CrawlEngine) Execute()
- func (e *CrawlEngine) GetSpiders() *Spiders
- func (e *CrawlEngine) RegisterDownloadMiddlewares(middlewares MiddlewaresInterface)
- func (e *CrawlEngine) RegisterPipelines(pipeline PipelinesInterface)
- func (e *CrawlEngine) RegisterSpiders(spider SpiderInterface)
- func (e *CrawlEngine) Scheduler() error
- type DefaultFieldHook
- type DefualtHooks
- func (d *DefualtHooks) Error(params ...interface{}) error
- func (d *DefualtHooks) EventsWatcher(ch chan EventType) error
- func (d *DefualtHooks) Exit(params ...interface{}) error
- func (d *DefualtHooks) Heartbeat(params ...interface{}) error
- func (d *DefualtHooks) Start(params ...interface{}) error
- func (d *DefualtHooks) Stop(params ...interface{}) error
- type DistributeOptions
- func DistributedWithBloomN(bloomN uint) DistributeOptions
- func DistributedWithBloomP(bloomP float64) DistributeOptions
- func DistributedWithConnectionsSize(size int) DistributeOptions
- func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetqueueKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithLimiterRate(rate int) DistributeOptions
- func DistributedWithRdbMaxRetry(retry int) DistributeOptions
- func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
- type DistributeStatistic
- func (s *DistributeStatistic) GetDownloadFail() uint64
- func (s *DistributeStatistic) GetErrorCount() uint64
- func (s *DistributeStatistic) GetItemScraped() uint64
- func (s *DistributeStatistic) GetRequestSent() uint64
- func (s *DistributeStatistic) GetStatsField(field StatsFieldType) uint64
- func (s *DistributeStatistic) IncrDownloadFail()
- func (s *DistributeStatistic) IncrErrorCount()
- func (s *DistributeStatistic) IncrItemScraped()
- func (s *DistributeStatistic) IncrRequestSent()
- func (s *DistributeStatistic) IncrStats(field StatsFieldType)
- func (s *DistributeStatistic) OutputStats() map[string]uint64
- func (s *DistributeStatistic) Reset() error
- type DistributeStatisticOption
- type DistributedHooks
- func (d *DistributedHooks) Error(params ...interface{}) error
- func (d *DistributedHooks) EventsWatcher(ch chan EventType) error
- func (d *DistributedHooks) Exit(params ...interface{}) error
- func (d *DistributedHooks) Heartbeat(params ...interface{}) error
- func (d *DistributedHooks) Start(params ...interface{}) error
- func (d *DistributedHooks) Stop(params ...interface{}) error
- type DistributedWorker
- func (w *DistributedWorker) Add(fingerprint []byte) error
- func (w *DistributedWorker) AddNode() error
- func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
- func (w *DistributedWorker) CheckMasterLive() (bool, error)
- func (w *DistributedWorker) DelNode() error
- func (w *DistributedWorker) DoDupeFilter(ctx *Context) (bool, error)
- func (w *DistributedWorker) Fingerprint(ctx *Context) ([]byte, error)
- func (w *DistributedWorker) GetLimter() LimitInterface
- func (w *DistributedWorker) Heartbeat() error
- func (w *DistributedWorker) SetMaster(flag bool)
- func (w *DistributedWorker) SetSpiders(spiders *Spiders)
- func (w *DistributedWorker) StopNode() error
- func (w *DistributedWorker) TestOrAdd(fingerprint []byte) (bool, error)
- type DistributedWorkerConfig
- type DistributedWorkerInterface
- type Downloader
- type DownloaderOption
- func DownloadWithClient(client http.Client) DownloaderOption
- func DownloadWithH2(h2 bool) DownloaderOption
- func DownloadWithTimeout(timeout time.Duration) DownloaderOption
- func DownloadWithTlsConfig(tls *tls.Config) DownloaderOption
- func DownloaderWithtransport(transport *http.Transport) DownloaderOption
- type EngineOption
- func EngineWithCache(cache CacheInterface) EngineOption
- func EngineWithDistributedWorker(woker DistributedWorkerInterface) EngineOption
- func EngineWithDownloader(downloader Downloader) EngineOption
- func EngineWithFilter(filter RFPDupeFilterInterface) EngineOption
- func EngineWithLimiter(limiter LimitInterface) EngineOption
- func EngineWithUniqueReq(uniqueReq bool) EngineOption
- type ErrorOption
- type EventHooksInterface
- type EventType
- type EventsWatcher
- type GetRDBKey
- type GoFunc
- type HandleError
- type Hook
- type ItemInterface
- type ItemMeta
- type ItemPipelines
- type LimitInterface
- type Middlewares
- type MiddlewaresBase
- type MiddlewaresInterface
- type Parser
- type PipelinesBase
- type PipelinesInterface
- type ProcessResponse
- type Proxy
- type RFPDupeFilter
- type RFPDupeFilterInterface
- type RdbNodes
- type RedirectError
- type Request
- type RequestMethod
- type RequestOption
- func RequestWithAllowRedirects(allowRedirects bool) RequestOption
- func RequestWithAllowedStatusCode(allowStatusCode []uint64) RequestOption
- func RequestWithMaxConnsPerHost(maxConnsPerHost int) RequestOption
- func RequestWithMaxRedirects(maxRedirects int) RequestOption
- func RequestWithParser(parser Parser) RequestOption
- func RequestWithRequestBody(body map[string]interface{}) RequestOption
- func RequestWithRequestBytesBody(body []byte) RequestOption
- func RequestWithRequestCookies(cookies map[string]string) RequestOption
- func RequestWithRequestHeader(header map[string]string) RequestOption
- func RequestWithRequestMeta(meta map[string]interface{}) RequestOption
- func RequestWithRequestParams(params map[string]string) RequestOption
- func RequestWithRequestProxy(proxy Proxy) RequestOption
- func RequestWithResponseWriter(write io.Writer) RequestOption
- func RequestWithTimeout(timeout time.Duration) RequestOption
- type Response
- type Settings
- type SpiderDownloader
- type SpiderInterface
- type Spiders
- type Statistic
- func (s *Statistic) GetDownloadFail() uint64
- func (s *Statistic) GetErrorCount() uint64
- func (s *Statistic) GetItemScraped() uint64
- func (s *Statistic) GetRequestSent() uint64
- func (s *Statistic) IncrDownloadFail()
- func (s *Statistic) IncrErrorCount()
- func (s *Statistic) IncrItemScraped()
- func (s *Statistic) IncrRequestSent()
- func (s *Statistic) OutputStats() map[string]uint64
- func (s *Statistic) Reset() error
- type StatisticInterface
- type StatsFieldType
- type WorkerConfigWithRdbCluster
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSpiderMiddleware 下载中间件处理异常 ErrSpiderMiddleware error = errors.New("handle spider middleware error") // ErrSpiderCrawls 抓取流程错误 ErrSpiderCrawls error = errors.New("handle spider crawl error") // ErrDuplicateSpiderName 爬虫名重复错误 ErrDuplicateSpiderName error = errors.New("register a duplicate spider name error") // ErrEmptySpiderName 爬虫名不能为空 ErrEmptySpiderName error = errors.New("register a empty spider name error") // ErrSpiderNotExist 爬虫实例不存在 ErrSpiderNotExist error = errors.New("not found spider") // ErrNotAllowStatusCode 不允许的状态码 ErrNotAllowStatusCode error = errors.New("not allow handle status code") // ErrGetCacheItem 获取item 错误 ErrGetCacheItem error = errors.New("getting item from cache error") // ErrGetHttpProxy 获取http代理错误 ErrGetHttpProxy error = errors.New("getting http proxy ") // ErrGetHttpsProxy 获取https代理错误 ErrGetHttpsProxy error = errors.New("getting https proxy ") // ErrParseSocksProxy 解析socks代理错误 ErrParseSocksProxy error = errors.New("parse socks proxy ") // ErrResponseRead 响应读取失败 ErrResponseRead error = errors.New("read response to buffer error") // ErrResponseParse 响应解析失败 ErrResponseParse error = errors.New("parse response error") )
var ProcessId string = uuid.New().String()
Functions ¶
func DefaultventsWatcher ¶ added in v0.4.1
func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error
DefaultventsWatcher 默认的事件监听器。ch 用于接收事件;hooker 为事件处理接口(EventHooksInterface)的实例,比如DefualtHooks。
func ExecuteCmd ¶ added in v0.4.1
func ExecuteCmd(engine *CrawlEngine)
ExecuteCmd adds all child commands to the root command and sets flags appropriately. This is called by main.main(). It only needs to happen once for the rootCmd.
func GetAllParserMethod ¶ added in v0.4.1
func GetAllParserMethod(spider SpiderInterface) map[string]Parser
GetAllParserMethod 获取spider实例所有的解析函数
func GetFunctionName ¶ added in v0.4.1
func GetFunctionName(fn Parser) string
GetFunctionName 提取解析函数名
func NewDefaultLimiter ¶ added in v0.4.1
func NewDefaultLimiter(limitRate int) *defaultLimiter
NewDefaultLimiter 创建一个新的限速器 limitRate 最大请求速率
func NewLeakyBucketLimiterWithRdb ¶ added in v0.4.1
func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *leakyBucketLimiterWithRdb
NewLeakyBucketLimiterWithRdb leakyBucketLimiterWithRdb 构造函数
func NewRdbClient ¶ added in v0.4.1
func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
func NewRdbClusterCLient ¶ added in v0.4.1
func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
func NewRdbConfig ¶ added in v0.4.1
func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
NewRdbConfig redis 配置构造函数
func OptimalNumOfBits ¶ added in v0.4.1
func OptimalNumOfBits(n int64, p float64) int64
OptimalNumOfBits 计算布隆过滤器位数组的最优长度
func OptimalNumOfHashFunctions ¶ added in v0.4.1
func OptimalNumOfHashFunctions(n int64, m int64) int64
OptimalNumOfHashFunctions 计算最优的布隆过滤器哈希函数个数
Types ¶
type BaseSpider ¶
BaseSpider base spider
func NewBaseSpider ¶
func NewBaseSpider(name string, feedUrls []string) *BaseSpider
func (*BaseSpider) ErrorHandler ¶
func (s *BaseSpider) ErrorHandler(err *HandleError)
func (*BaseSpider) Parser ¶
func (s *BaseSpider) Parser(resp *Context, item chan<- *ItemMeta, req chan<- *Context) error
Parser parses the response of a request; it sends scraped items or new requests to the engine.
func (*BaseSpider) StartRequest ¶
func (s *BaseSpider) StartRequest(req chan<- *Context)
type CacheInterface ¶
type CacheInterface interface {
// contains filtered or unexported methods
}
CacheInterface request缓存组件
type CheckMasterLive ¶ added in v0.4.1
CheckMasterLive 检查所有的master节点是否都在线
type Configuration ¶
var Config *Configuration = nil
func (*Configuration) GetValue ¶ added in v0.4.1
func (c *Configuration) GetValue(key string) (interface{}, error)
type Context ¶
type Context struct { // Request 请求对象 Request *Request // Response 响应对象 Response *Response // CtxId context 唯一id由uuid生成 CtxId string // Error Error error // Cancel context.CancelFunc // Ref int64 // Items chan *ItemMeta Spider SpiderInterface // contains filtered or unexported fields }
Context 在引擎中的数据流通载体,负责单个抓取任务的生命周期维护
func NewContext ¶
func NewContext(request *Request, Spider SpiderInterface, opts ...ContextOption) *Context
NewContext 从内存池中构建context对象
type ContextInterface ¶ added in v0.4.1
type ContextInterface interface {
IsDone() bool
}
type ContextOption ¶
type ContextOption func(c *Context)
func WithContextId ¶ added in v0.4.1
func WithContextId(ctxId string) ContextOption
WithContextId 设置自定义的ctxId
func WithItemChannelSize ¶ added in v0.4.1
func WithItemChannelSize(size int) ContextOption
type CrawlEngine ¶ added in v0.4.1
type CrawlEngine struct {
// contains filtered or unexported fields
}
CrawlEngine 引擎是整个框架数据流调度核心
func (*CrawlEngine) EventsWatcherRunner ¶ added in v0.4.1
func (e *CrawlEngine) EventsWatcherRunner() error
EventsWatcherRunner 事件监听器运行组件
func (*CrawlEngine) GetSpiders ¶ added in v0.4.1
func (e *CrawlEngine) GetSpiders() *Spiders
GetSpiders 获取所有的已经注册到引擎的spider实例
func (*CrawlEngine) RegisterDownloadMiddlewares ¶ added in v0.4.1
func (e *CrawlEngine) RegisterDownloadMiddlewares(middlewares MiddlewaresInterface)
RegisterDownloadMiddlewares 注册下载中间件到引擎
func (*CrawlEngine) RegisterPipelines ¶ added in v0.4.1
func (e *CrawlEngine) RegisterPipelines(pipeline PipelinesInterface)
RegisterPipelines 注册pipelines到引擎
func (*CrawlEngine) RegisterSpiders ¶ added in v0.4.1
func (e *CrawlEngine) RegisterSpiders(spider SpiderInterface)
RegisterSpiders 将spider实例注册到引擎的 spiders
func (*CrawlEngine) Scheduler ¶ added in v0.4.1
func (e *CrawlEngine) Scheduler() error
Scheduler 调度器
type DefaultFieldHook ¶
type DefaultFieldHook struct { }
func (*DefaultFieldHook) Levels ¶
func (hook *DefaultFieldHook) Levels() []logrus.Level
type DefualtHooks ¶ added in v0.4.1
type DefualtHooks struct { }
func NewDefualtHooks ¶ added in v0.4.1
func NewDefualtHooks() *DefualtHooks
NewDefualtHooks 构建新的默认事件监听器
func (*DefualtHooks) Error ¶ added in v0.4.1
func (d *DefualtHooks) Error(params ...interface{}) error
Error 处理ERROR事件
func (*DefualtHooks) EventsWatcher ¶ added in v0.4.1
func (d *DefualtHooks) EventsWatcher(ch chan EventType) error
EventsWatcher DefualtHooks 的事件监听器
func (*DefualtHooks) Exit ¶ added in v0.4.1
func (d *DefualtHooks) Exit(params ...interface{}) error
Exit 处理EXIT事件
func (*DefualtHooks) Heartbeat ¶ added in v0.4.1
func (d *DefualtHooks) Heartbeat(params ...interface{}) error
Heartbeat 处理HEARTBEAT事件
func (*DefualtHooks) Start ¶ added in v0.4.1
func (d *DefualtHooks) Start(params ...interface{}) error
Start 处理START事件
func (*DefualtHooks) Stop ¶ added in v0.4.1
func (d *DefualtHooks) Stop(params ...interface{}) error
Stop 处理STOP事件
type DistributeOptions ¶ added in v0.4.1
type DistributeOptions func(w *DistributedWorkerConfig)
func DistributedWithBloomN ¶ added in v0.4.1
func DistributedWithBloomN(bloomN uint) DistributeOptions
DistributedWithBloomN 布隆过滤器数据规模
func DistributedWithBloomP ¶ added in v0.4.1
func DistributedWithBloomP(bloomP float64) DistributeOptions
DistributedWithBloomP 布隆过滤器容错率
func DistributedWithConnectionsSize ¶ added in v0.4.1
func DistributedWithConnectionsSize(size int) DistributeOptions
DistributedWithConnectionsSize rdb 连接池最大连接数
func DistributedWithGetBFKey ¶ added in v0.4.1
func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetBFKey 布隆过滤器的key生成函数
func DistributedWithGetLimitKey ¶ added in v0.4.1
func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetLimitKey 限速器key的生成函数
func DistributedWithGetqueueKey ¶ added in v0.4.1
func DistributedWithGetqueueKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetqueueKey 队列key生成函数
func DistributedWithLimiterRate ¶ added in v0.4.1
func DistributedWithLimiterRate(rate int) DistributeOptions
DistributedWithLimiterRate 分布式组件下限速器的限速值
func DistributedWithRdbMaxRetry ¶ added in v0.4.1
func DistributedWithRdbMaxRetry(retry int) DistributeOptions
DistributedWithRdbMaxRetry rdb失败重试次数
func DistributedWithRdbTimeout ¶ added in v0.4.1
func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
DistributedWithRdbTimeout rdb超时时间设置
type DistributeStatistic ¶ added in v0.4.1
type DistributeStatistic struct {
// contains filtered or unexported fields
}
DistributeStatistic 分布式统计组件
func NewDistributeStatistic ¶ added in v0.4.1
func NewDistributeStatistic(statsPrefixKey string, rdb redis.Cmdable, wg *sync.WaitGroup, opts ...DistributeStatisticOption) *DistributeStatistic
NewDistributeStatistic 分布式数据统计组件构造函数
func (*DistributeStatistic) GetDownloadFail ¶ added in v0.4.1
func (s *DistributeStatistic) GetDownloadFail() uint64
GetDownloadFail 获取下载失败数
func (*DistributeStatistic) GetErrorCount ¶ added in v0.4.1
func (s *DistributeStatistic) GetErrorCount() uint64
GetErrorCount 获取错误数
func (*DistributeStatistic) GetItemScraped ¶ added in v0.4.1
func (s *DistributeStatistic) GetItemScraped() uint64
GetItemScraped 获取items的值
func (*DistributeStatistic) GetRequestSent ¶ added in v0.4.1
func (s *DistributeStatistic) GetRequestSent() uint64
GetRequestSent 获取request 量
func (*DistributeStatistic) GetStatsField ¶ added in v0.4.1
func (s *DistributeStatistic) GetStatsField(field StatsFieldType) uint64
GetStatsField 获取指定的数据指标值
func (*DistributeStatistic) IncrDownloadFail ¶ added in v0.4.1
func (s *DistributeStatistic) IncrDownloadFail()
IncrDownloadFail 累加一条下载失败记录
func (*DistributeStatistic) IncrErrorCount ¶ added in v0.4.1
func (s *DistributeStatistic) IncrErrorCount()
IncrErrorCount 累加一条错误
func (*DistributeStatistic) IncrItemScraped ¶ added in v0.4.1
func (s *DistributeStatistic) IncrItemScraped()
IncrItemScraped 累加一条item
func (*DistributeStatistic) IncrRequestSent ¶ added in v0.4.1
func (s *DistributeStatistic) IncrRequestSent()
IncrRequestSent 累加一条request
func (*DistributeStatistic) IncrStats ¶ added in v0.4.1
func (s *DistributeStatistic) IncrStats(field StatsFieldType)
IncrStats 累加指定的统计指标
func (*DistributeStatistic) OutputStats ¶ added in v0.4.1
func (s *DistributeStatistic) OutputStats() map[string]uint64
OutputStats 格式化输出所有的数据指标
func (*DistributeStatistic) Reset ¶ added in v0.4.1
func (s *DistributeStatistic) Reset() error
Reset 重置各项指标:若afterResetTTL>0,则为每一项指标设置ttl;否则直接删除指标
type DistributeStatisticOption ¶ added in v0.4.1
type DistributeStatisticOption func(d *DistributeStatistic)
DistributeStatisticOption 分布式组件可选参数定义
func DistributeStatisticAfterResetTTL ¶ added in v0.4.1
func DistributeStatisticAfterResetTTL(ttl time.Duration) DistributeStatisticOption
DistributeStatisticAfterResetTTL 为分布式计数器设置重置之后的ttl(afterResetTTL)
type DistributedHooks ¶ added in v0.4.1
type DistributedHooks struct {
// contains filtered or unexported fields
}
DistributedHooks 分布式事件监听器
func NewDistributedHooks ¶ added in v0.4.1
func NewDistributedHooks(worker DistributedWorkerInterface) *DistributedHooks
NewDistributedHooks 构建新的分布式监听器组件对象
func (*DistributedHooks) Error ¶ added in v0.4.1
func (d *DistributedHooks) Error(params ...interface{}) error
Error 用于处理分布式模式下的ERROR事件
func (*DistributedHooks) EventsWatcher ¶ added in v0.4.1
func (d *DistributedHooks) EventsWatcher(ch chan EventType) error
EventsWatcher 分布式模式下的事件监听器
func (*DistributedHooks) Exit ¶ added in v0.4.1
func (d *DistributedHooks) Exit(params ...interface{}) error
Exit 用于处理分布式模式下的Exit事件
func (*DistributedHooks) Heartbeat ¶ added in v0.4.1
func (d *DistributedHooks) Heartbeat(params ...interface{}) error
Heartbeat 用于处理分布式模式下的HEARTBEAT事件
func (*DistributedHooks) Start ¶ added in v0.4.1
func (d *DistributedHooks) Start(params ...interface{}) error
Start 用于处理分布式模式下的START事件
func (*DistributedHooks) Stop ¶ added in v0.4.1
func (d *DistributedHooks) Stop(params ...interface{}) error
Stop 用于处理分布式模式下的STOP事件
type DistributedWorker ¶ added in v0.4.1
type DistributedWorker struct {
// contains filtered or unexported fields
}
DistributedWorker 分布式组件,包含两个子组件:(1) request请求缓存队列,由各个节点上的引擎读取并消费,redis 队列中缓存的是经过gob序列化之后的二进制数据;(2) 布隆过滤器,主要用于去重。该组件同时实现了RFPDupeFilterInterface和CacheInterface。
func NewDistributedWorker ¶ added in v0.4.1
func NewDistributedWorker(addr string, config *DistributedWorkerConfig) *DistributedWorker
NewDistributedWorker 构建redis单机模式下的分布式工作组件
func NewWorkerWithRdbCluster ¶ added in v0.4.1
func NewWorkerWithRdbCluster(config *WorkerConfigWithRdbCluster) *DistributedWorker
NewWorkerWithRdbCluster redis cluster模式下的分布式工作组件
func (*DistributedWorker) Add ¶ added in v0.4.1
func (w *DistributedWorker) Add(fingerprint []byte) error
Add 添加指纹到布隆过滤器
func (*DistributedWorker) AddNode ¶ added in v0.4.1
func (w *DistributedWorker) AddNode() error
AddNode 新增节点
func (*DistributedWorker) CheckAllNodesStop ¶ added in v0.4.1
func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
CheckAllNodesStop 检查所有的节点是否都已经停止
func (*DistributedWorker) CheckMasterLive ¶ added in v0.4.1
func (w *DistributedWorker) CheckMasterLive() (bool, error)
CheckMasterLive 检查所有master 节点是否都在线
func (*DistributedWorker) DelNode ¶ added in v0.4.1
func (w *DistributedWorker) DelNode() error
DelNode 删除当前节点
func (*DistributedWorker) DoDupeFilter ¶ added in v0.4.1
func (w *DistributedWorker) DoDupeFilter(ctx *Context) (bool, error)
DoDupeFilter request去重处理:如果指纹已经存在则返回true,否则返回false;指纹不存在的情况下会将指纹添加到缓存
func (*DistributedWorker) Fingerprint ¶ added in v0.4.1
func (w *DistributedWorker) Fingerprint(ctx *Context) ([]byte, error)
Fingerprint 生成request 对象的指纹
func (*DistributedWorker) GetLimter ¶ added in v0.4.1
func (w *DistributedWorker) GetLimter() LimitInterface
GetLimter 获取限速器
func (*DistributedWorker) Heartbeat ¶ added in v0.4.1
func (w *DistributedWorker) Heartbeat() error
Heartbeat 心跳包
func (*DistributedWorker) SetMaster ¶ added in v0.4.1
func (w *DistributedWorker) SetMaster(flag bool)
SetMaster 设置当前的节点是否为master
func (*DistributedWorker) SetSpiders ¶ added in v0.4.1
func (w *DistributedWorker) SetSpiders(spiders *Spiders)
SetSpiders 设置所有的已注册的spider
func (*DistributedWorker) StopNode ¶ added in v0.4.1
func (w *DistributedWorker) StopNode() error
StopNode 停止当前节点的活动
type DistributedWorkerConfig ¶ added in v0.4.1
type DistributedWorkerConfig struct { // RedisAddr redis 地址 RedisAddr string // RedisPasswd redis 密码 RedisPasswd string // RedisUsername redis 用户名 RedisUsername string // RedisDB redis 数据库索引 index RedisDB uint32 // RdbConnectionsSize 连接池大小 RdbConnectionsSize uint64 // RdbTimeout redis 超时时间 RdbTimeout time.Duration // RdbMaxRetry redis操作失败后的重试次数 RdbMaxRetry int // BloomP 布隆过滤器的容错率 BloomP float64 // BloomN 数据规模,比如1024 * 1024 BloomN uint // 并发量 LimiterRate int // GetqueueKey 生成队列key的函数,允许用户自定义 GetqueueKey GetRDBKey // GetBFKey 布隆过滤器对应的生成key的函数,允许用户自定义 GetBFKey GetRDBKey // contains filtered or unexported fields }
DistributedWorkerConfig 分布式组件的配置参数
func NewDistributedWorkerConfig ¶ added in v0.4.1
func NewDistributedWorkerConfig(username string, passwd string, db uint32, opts ...DistributeOptions) *DistributedWorkerConfig
NewDistributedWorkerConfig 新建分布式组件的配置
type DistributedWorkerInterface ¶ added in v0.4.1
type DistributedWorkerInterface interface { CacheInterface RFPDupeFilterInterface // AddNode 新增一个节点 AddNode() error // DelNode 删除当前的节点 DelNode() error // StopNode 停止当前的节点 StopNode() error // Heartbeat 心跳 Heartbeat() error // CheckAllNodesStop 检查所有的节点是否都已经停止 CheckAllNodesStop() (bool, error) // GetLimter 获取限速器 GetLimter() LimitInterface // CheckMasterLive 检测主节点是否还在线 CheckMasterLive() (bool, error) // SetMaster 是否将当前的节点设置为主节点 SetMaster(flag bool) // SetSpiders 设置已经注册的所有的spider SetSpiders(spiders *Spiders) }
DistributedWorkerInterface 分布式组件接口
type Downloader ¶
type Downloader interface { // Download 下载函数 Download(ctx *Context) (*Response, error) // CheckStatus 检查响应状态码的合法性 CheckStatus(statusCode uint64, allowStatus []uint64) bool }
Downloader 下载器接口
type DownloaderOption ¶
type DownloaderOption func(d *SpiderDownloader)
DownloaderOption 下载器可选参数函数
func DownloadWithClient ¶
func DownloadWithClient(client http.Client) DownloaderOption
DownloadWithClient 设置下载器的http.Client客户端
func DownloadWithH2 ¶ added in v0.4.1
func DownloadWithH2(h2 bool) DownloaderOption
DownloadWithH2 设置下载器是否开启http2
func DownloadWithTimeout ¶
func DownloadWithTimeout(timeout time.Duration) DownloaderOption
DownloadWithTimeout 设置下载器的网络请求超时时间
func DownloadWithTlsConfig ¶
func DownloadWithTlsConfig(tls *tls.Config) DownloaderOption
DownloadWithTlsConfig 设置下载器的tls
func DownloaderWithtransport ¶
func DownloaderWithtransport(transport *http.Transport) DownloaderOption
DownloaderWithtransport 为下载器设置 http.Transport
type EngineOption ¶
type EngineOption func(r *CrawlEngine)
EngineOption 引擎构造过程中的可选参数
func EngineWithCache ¶ added in v0.4.1
func EngineWithCache(cache CacheInterface) EngineOption
EngineWithCache 引擎使用的缓存组件
func EngineWithDistributedWorker ¶ added in v0.4.1
func EngineWithDistributedWorker(woker DistributedWorkerInterface) EngineOption
EngineWithDistributedWorker 引擎使用的分布式组件
func EngineWithDownloader ¶
func EngineWithDownloader(downloader Downloader) EngineOption
EngineWithDownloader 引擎使用的下载器组件
func EngineWithFilter ¶ added in v0.4.1
func EngineWithFilter(filter RFPDupeFilterInterface) EngineOption
EngineWithFilter 引擎使用的过滤去重组件
func EngineWithLimiter ¶ added in v0.4.1
func EngineWithLimiter(limiter LimitInterface) EngineOption
EngineWithLimiter 引擎使用的限速器
func EngineWithUniqueReq ¶
func EngineWithUniqueReq(uniqueReq bool) EngineOption
EngineWithUniqueReq 是否进行去重处理
type ErrorOption ¶
type ErrorOption func(e *HandleError)
ErrorOption HandleError 可选参数
func ErrorWithExtras ¶ added in v0.4.1
func ErrorWithExtras(extras map[string]interface{}) ErrorOption
ErrorWithExtras HandleError 添加额外的数据
type EventHooksInterface ¶ added in v0.4.1
type EventHooksInterface interface { // Start 处理引擎启动事件 Start(params ...interface{}) error // Stop 处理引擎停止事件 Stop(params ...interface{}) error // Error处理错误事件 Error(params ...interface{}) error // Exit 退出引擎事件 Exit(params ...interface{}) error // Heartbeat 心跳检查事件 Heartbeat(params ...interface{}) error // EventsWatcher 事件监听器 EventsWatcher(ch chan EventType) error }
EventHooksInterface 事件处理函数接口
type EventsWatcher ¶ added in v0.4.1
EventsWatcher 事件监听器
type HandleError ¶
type HandleError struct { // CtxId 上下文id CtxId string // Err 处理过程的错误 Err error // Extras 携带的额外信息 Extras map[string]interface{} }
HandleError 错误处理结构,携带上下文id、错误及额外信息
func NewError ¶
func NewError(ctx *Context, err error, opts ...ErrorOption) *HandleError
NewError 构建新的HandleError实例
type ItemMeta ¶
type ItemMeta struct { // CtxId 对应的context id CtxId string // Item item对象 Item ItemInterface }
ItemMeta item元数据结构
type ItemPipelines ¶
type ItemPipelines []PipelinesInterface
func (ItemPipelines) Len ¶
func (p ItemPipelines) Len() int
func (ItemPipelines) Less ¶
func (p ItemPipelines) Less(i, j int) bool
func (ItemPipelines) Swap ¶
func (p ItemPipelines) Swap(i, j int)
type LimitInterface ¶ added in v0.4.1
type LimitInterface interface {
// contains filtered or unexported methods
}
LimitInterface 限速器接口
type Middlewares ¶
type Middlewares []MiddlewaresInterface
Middlewares 下载中间件队列
func (Middlewares) Less ¶
func (p Middlewares) Less(i, j int) bool
func (Middlewares) Swap ¶
func (p Middlewares) Swap(i, j int)
type MiddlewaresBase ¶
type MiddlewaresBase struct {
Priority int
}
type MiddlewaresInterface ¶
type MiddlewaresInterface interface { // GetPriority 获取优先级,数字越小优先级越高 GetPriority() int // ProcessRequest 处理request请求对象 // 此处用于增加请求头 // 按优先级执行 ProcessRequest(ctx *Context) error // ProcessResponse 用于处理请求成功之后的response // 执行顺序与优先级相反,即优先级越高执行顺序越晚 ProcessResponse(ctx *Context, req chan<- *Context) error // GetName 获取中间件的名称 GetName() string }
MiddlewaresInterface 下载中间件的接口用于处理进入下载器之前的request对象 和下载之后的response
type Parser ¶
Parser 响应解析函数结构
func GetParserByName ¶ added in v0.4.1
func GetParserByName(spider SpiderInterface, name string) Parser
GetParserByName 通过函数名从spider实例中获取解析函数
type PipelinesBase ¶
type PipelinesBase struct {
Priority int
}
type PipelinesInterface ¶
type PipelinesInterface interface { // GetPriority 获取当前pipeline的优先级 GetPriority() int // ProcessItem item处理单元 ProcessItem(spider SpiderInterface, item *ItemMeta) error }
PipelinesInterface pipeline 接口 pipeline 主要用于处理item,例如数据存储、数据清洗 将多个pipeline注册到引擎可以实现责任链模式的数据处理
type ProcessResponse ¶
ProcessResponse 处理下载之后的response函数
type RFPDupeFilter ¶
type RFPDupeFilter struct {
// contains filtered or unexported fields
}
RFPDupeFilter 去重组件
func NewRFPDupeFilter ¶
func NewRFPDupeFilter(bloomP float64, bloomN uint) *RFPDupeFilter
NewRFPDupeFilter 新建去重组件 bloomP容错率 bloomN数据规模
func (*RFPDupeFilter) DoDupeFilter ¶
func (f *RFPDupeFilter) DoDupeFilter(ctx *Context) (bool, error)
DoDupeFilter 通过布隆过滤器对request对象进行去重处理
func (*RFPDupeFilter) Fingerprint ¶
func (f *RFPDupeFilter) Fingerprint(ctx *Context) ([]byte, error)
Fingerprint 计算指纹
type RFPDupeFilterInterface ¶
type RFPDupeFilterInterface interface { // Fingerprint request指纹计算 Fingerprint(ctx *Context) ([]byte, error) // DoDupeFilter request去重 DoDupeFilter(ctx *Context) (bool, error) }
RFPDupeFilterInterface request 对象指纹计算和布隆过滤器去重
type Request ¶
type Request struct { // Url 请求Url Url string `json:"url"` // Header 请求头 Header map[string]string `json:"header"` // Method 请求方式 Method RequestMethod `json:"method"` // Body 请求body Body []byte `json:"body"` // Params 请求url的参数 Params map[string]string `json:"params"` // Proxy 代理实例 Proxy *Proxy `json:"-"` // Cookies 请求携带的cookies Cookies map[string]string `json:"cookies"` // Meta 请求携带的额外的信息 Meta map[string]interface{} `json:"meta"` // AllowRedirects 是否允许跳转默认允许 AllowRedirects bool `json:"allowRedirects"` // MaxRedirects 最大的跳转次数 MaxRedirects int `json:"maxRedirects"` // Parser 该请求绑定的响应解析函数,必须是一个spider实例 Parser Parser `json:"-"` // MaxConnsPerHost 单个域名最大的连接数 MaxConnsPerHost int `json:"maxConnsPerHost"` // BodyReader 用于读取body BodyReader io.Reader `json:"-"` // ResponseWriter 响应读取到本地的接口 ResponseWriter io.Writer `json:"-"` // AllowStatusCode 允许的状态码 AllowStatusCode []uint64 `json:"allowStatusCode"` // Timeout 请求超时时间 Timeout time.Duration `json:"timeout"` }
Request 请求对象的结构
func NewRequest ¶
func NewRequest(url string, method RequestMethod, parser Parser, opts ...RequestOption) *Request
NewRequest 从Request对象内存池创建新的Request对象
func RequestFromMap ¶ added in v0.4.1
func RequestFromMap(src map[string]interface{}, opts ...RequestOption) *Request
RequestFromMap 从map创建requests
type RequestMethod ¶ added in v0.4.1
type RequestMethod string
RequestMethod 请求方式
const ( GET RequestMethod = "GET" POST RequestMethod = "POST" PUT RequestMethod = "PUT" DELETE RequestMethod = "DELETE" OPTIONS RequestMethod = "OPTIONS" HEAD RequestMethod = "HEAD" )
type RequestOption ¶ added in v0.4.1
type RequestOption func(r *Request)
RequestOption NewRequest 的可选参数
func RequestWithAllowRedirects ¶
func RequestWithAllowRedirects(allowRedirects bool) RequestOption
RequestWithAllowRedirects 设置是否允许跳转 如果不允许则MaxRedirects=0
func RequestWithAllowedStatusCode ¶ added in v0.4.1
func RequestWithAllowedStatusCode(allowStatusCode []uint64) RequestOption
RequestWithAllowedStatusCode 设置AllowStatusCode
func RequestWithMaxConnsPerHost ¶
func RequestWithMaxConnsPerHost(maxConnsPerHost int) RequestOption
RequestWithMaxConnsPerHost 设置MaxConnsPerHost
func RequestWithMaxRedirects ¶
func RequestWithMaxRedirects(maxRedirects int) RequestOption
RequestWithMaxRedirects 设置最大的跳转次数 若maxRedirects <= 0则认为不允许跳转AllowRedirects = false
func RequestWithParser ¶ added in v0.4.1
func RequestWithParser(parser Parser) RequestOption
RequestWithParser 设置Parser
func RequestWithRequestBody ¶
func RequestWithRequestBody(body map[string]interface{}) RequestOption
RequestWithRequestBody 传入请求体到request
func RequestWithRequestBytesBody ¶ added in v0.4.1
func RequestWithRequestBytesBody(body []byte) RequestOption
RequestWithRequestBytesBody request绑定bytes body
func RequestWithRequestCookies ¶
func RequestWithRequestCookies(cookies map[string]string) RequestOption
RequestWithRequestCookies 设置cookie
func RequestWithRequestHeader ¶
func RequestWithRequestHeader(header map[string]string) RequestOption
RequestWithRequestHeader 设置请求头
func RequestWithRequestMeta ¶
func RequestWithRequestMeta(meta map[string]interface{}) RequestOption
RequestWithRequestMeta 设置 meta
func RequestWithRequestParams ¶
func RequestWithRequestParams(params map[string]string) RequestOption
RequestWithRequestParams 设置请求的url参数
func RequestWithRequestProxy ¶
func RequestWithRequestProxy(proxy Proxy) RequestOption
RequestWithRequestProxy 设置代理
func RequestWithResponseWriter ¶
func RequestWithResponseWriter(write io.Writer) RequestOption
RequestWithResponseWriter 设置ResponseWriter
func RequestWithTimeout ¶ added in v0.4.1
func RequestWithTimeout(timeout time.Duration) RequestOption
RequestWithTimeout 设置请求超时时间 若timeout<=0则认为没有超时时间
type Response ¶
type Response struct { // Status 状态码 Status int // Header 响应头 Header map[string][]string // Header response header // Delay 请求延迟 Delay float64 // Delay is the time taken to handle the download request // ContentLength 响应体大小 ContentLength uint64 // ContentLength response content length // URL 请求url URL string // URL is the request URL // Buffer 响应体缓存 Buffer *bytes.Buffer // Buffer holds the response body }
Response 请求响应体的结构
type SpiderDownloader ¶
type SpiderDownloader struct { // ProxyFunc 对单个请求进行代理设置 ProxyFunc func(req *http.Request) (*url.URL, error) // contains filtered or unexported fields }
SpiderDownloader tegenaria 爬虫下载器
func (*SpiderDownloader) CheckStatus ¶
func (d *SpiderDownloader) CheckStatus(statusCode uint64, allowStatus []uint64) bool
CheckStatus 检查状态码是否合法
type SpiderInterface ¶
type SpiderInterface interface { // StartRequest 通过GetFeedUrls()获取种子 // urls并构建初始请求 StartRequest(req chan<- *Context) // Parser 默认的请求响应解析函数 // 在解析过程中生成的新的请求可以推送到req channel Parser(resp *Context, req chan<- *Context) error // ErrorHandler 错误处理函数,允许在此过程中生成新的请求 // 并推送到req channel ErrorHandler(err *Context, req chan<- *Context) // GetName 获取spider名称 GetName() string // GetFeedUrls 获取种子urls GetFeedUrls() []string }
type Spiders ¶
type Spiders struct { // SpidersModules spider名称和spider实例的映射 SpidersModules map[string]SpiderInterface // Parsers parser函数名和函数的映射 // 用于序列化和反序列化 Parsers map[string]Parser }
Spiders 全局spiders管理器 用于接收注册的SpiderInterface实例
var SpidersList *Spiders
func (*Spiders) GetSpider ¶
func (s *Spiders) GetSpider(name string) (SpiderInterface, error)
GetSpider 通过爬虫名获取spider实例
func (*Spiders) Register ¶
func (s *Spiders) Register(spider SpiderInterface) error
Register 将spider实例注册到Spiders.SpidersModules
type Statistic ¶ added in v0.4.1
type Statistic struct { // ItemScraped items总数 ItemScraped uint64 `json:"items"` // RequestSent 请求总数 RequestSent uint64 `json:"requets"` // DownloadFail 下载失败总数 DownloadFail uint64 `json:"download_fail"` // ErrorCount 错误总数 ErrorCount uint64 `json:"errors"` // contains filtered or unexported fields }
Statistic 数据统计指标
func (*Statistic) GetDownloadFail ¶ added in v0.4.1
GetDownloadFail 获取下载失败的总数
func (*Statistic) GetErrorCount ¶ added in v0.4.1
GetErrorCount 获取捕获到的错误总数
func (*Statistic) GetItemScraped ¶ added in v0.4.1
GetItemScraped 获取拿到的item总数
func (*Statistic) GetRequestSent ¶ added in v0.4.1
GetRequestSent 获取发送的请求总数
func (*Statistic) IncrDownloadFail ¶ added in v0.4.1
func (s *Statistic) IncrDownloadFail()
IncrDownloadFail 累加一条下载失败数据
func (*Statistic) IncrErrorCount ¶ added in v0.4.1
func (s *Statistic) IncrErrorCount()
IncrErrorCount 累加一条捕获到的错误
func (*Statistic) IncrItemScraped ¶ added in v0.4.1
func (s *Statistic) IncrItemScraped()
IncrItemScraped 累加一条item
func (*Statistic) IncrRequestSent ¶ added in v0.4.1
func (s *Statistic) IncrRequestSent()
IncrRequestSent 累加一条request
func (*Statistic) OutputStats ¶ added in v0.4.1
OutputStats 格式化统计数据
type StatisticInterface ¶ added in v0.4.1
type StatisticInterface interface { // IncrItemScraped 累加一条item IncrItemScraped() // IncrRequestSent 累加一条request IncrRequestSent() // IncrDownloadFail 累加一条下载失败数据 IncrDownloadFail() // IncrErrorCount 累加一条错误数据 IncrErrorCount() // GetItemScraped 获取拿到的item总数 GetItemScraped() uint64 // GetRequestSent 获取发送request的总数 GetRequestSent() uint64 // GetErrorCount 获取处理过程中生成的error总数 GetErrorCount() uint64 // GetDownloadFail 获取下载失败的总数 GetDownloadFail() uint64 // OutputStats 格式化统计指标 OutputStats() map[string]uint64 // Reset 重置指标统计组件,所有指标归零 Reset() error // contains filtered or unexported methods }
StatisticInterface 数据统计组件接口
type StatsFieldType ¶ added in v0.4.1
type StatsFieldType string
StatsFieldType 统计指标的数据类型
const ( // RequestStats 发起的请求总数 RequestStats StatsFieldType = "requests" // ItemsStats 获取到的items总数 ItemsStats StatsFieldType = "items" // DownloadFailStats 请求失败总数 DownloadFailStats StatsFieldType = "download_fail" // ErrorStats 错误总数 ErrorStats StatsFieldType = "errors" )
type WorkerConfigWithRdbCluster ¶ added in v0.4.1
type WorkerConfigWithRdbCluster struct { *DistributedWorkerConfig RdbNodes }
WorkerConfigWithRdbCluster redis cluster 模式下的分布式组件配置参数
func NewWorkerConfigWithRdbCluster ¶ added in v0.4.1
func NewWorkerConfigWithRdbCluster(config *DistributedWorkerConfig, nodes RdbNodes) *WorkerConfigWithRdbCluster
NewWorkerConfigWithRdbCluster 创建redis cluster模式下的分布式组件配置