tegenaria

package module
v0.4.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2023 License: MIT Imports: 36 Imported by: 1

README

Tegenaria crawl framework

codecov go workflow CodeQL
tegenaria是一个基于golang开发的快速、高效率的网络爬虫框架

特性

  • 支持分布式

  • 支持自定义分布式组件

  • 支持自定义的事件监控

  • 支持命令行控制

安装

  1. go 版本要求>1.17
go get -u github.com/wetrycode/tegenaria@latest
  2. 在您的项目中导入
import "github.com/wetrycode/tegenaria"

快速开始

查看实例demo example

文档

TODO

  • 管理WEB API

Contribution

Feel free to PR and raise issues.
Send me an email directly, vforfreedom96@gmail.com

License

MIT © geebytes

Documentation

Overview

Package tegenaria is a crawler framework based on golang

tegenaria是一个基于golang开发的快速、高效率的网络爬虫框架

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSpiderMiddleware 下载中间件处理异常
	ErrSpiderMiddleware error = errors.New("handle spider middleware error")
	// ErrSpiderCrawls 抓取流程错误
	ErrSpiderCrawls error = errors.New("handle spider crawl error")
	// ErrDuplicateSpiderName 爬虫名重复错误
	ErrDuplicateSpiderName error = errors.New("register a duplicate spider name error")
	// ErrEmptySpiderName 爬虫名不能为空
	ErrEmptySpiderName error = errors.New("register a empty spider name error")
	// ErrSpiderNotExist 爬虫实例不存在
	ErrSpiderNotExist error = errors.New("not found spider")
	// ErrNotAllowStatusCode 不允许的状态码
	ErrNotAllowStatusCode error = errors.New("not allow handle status code")
	// ErrGetCacheItem 获取item 错误
	ErrGetCacheItem error = errors.New("getting item from cache error")
	// ErrGetHttpProxy 获取http代理错误
	ErrGetHttpProxy error = errors.New("getting http proxy ")
	// ErrGetHttpsProxy 获取https代理错误
	ErrGetHttpsProxy error = errors.New("getting https proxy ")
	// ErrParseSocksProxy 解析socks代理错误
	ErrParseSocksProxy error = errors.New("parse socks proxy ")
	// ErrResponseRead 响应读取失败
	ErrResponseRead error = errors.New("read response to buffer error")
	// ErrResponseParse 响应解析失败
	ErrResponseParse error = errors.New("parse response error")
)
View Source
var ProcessId string = uuid.New().String()

Functions

func AddGo added in v0.4.1

func AddGo(wg *sync.WaitGroup, funcs ...GoFunc) <-chan error

AddGo 向指定的wg添加协程函数 使用chan error 传递异常

func DefaultventsWatcher added in v0.4.1

func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error

DefaultventsWatcher 默认的事件监听器 ch 用于接收事件 hooker 事件处理实例化接口,比如DefualtHooks

func ExecuteCmd added in v0.4.1

func ExecuteCmd(engine *CrawlEngine)

ExecuteCmd adds all child commands to the root command and sets flags appropriately. It is called by main.main() and only needs to happen once for the rootCmd.

func GetAllParserMethod added in v0.4.1

func GetAllParserMethod(spider SpiderInterface) map[string]Parser

GetAllParserMethod 获取spider实例所有的解析函数

func GetFunctionName added in v0.4.1

func GetFunctionName(fn Parser) string

GetFunctionName 提取解析函数名

func GetLogger

func GetLogger(Name string) *logrus.Entry

func GetMachineIp added in v0.4.1

func GetMachineIp() string

GetMachineIp 获取本机ip

func GetUUID

func GetUUID() string

func Map2String added in v0.4.1

func Map2String(m interface{}) string

Map2String 将map转为string

func NewDefaultLimiter added in v0.4.1

func NewDefaultLimiter(limitRate int) *defaultLimiter

NewDefaultLimiter 创建一个新的限速器 limitRate 最大请求速率

func NewLeakyBucketLimiterWithRdb added in v0.4.1

func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *leakyBucketLimiterWithRdb

NewLeakyBucketLimiterWithRdb leakyBucketLimiterWithRdb 构造函数

func NewRdbClient added in v0.4.1

func NewRdbClient(config *DistributedWorkerConfig) *redis.Client

func NewRdbClusterCLient added in v0.4.1

func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient

func NewRdbConfig added in v0.4.1

func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options

NewRdbConfig redis 配置构造函数

func NewRequestCache

func NewRequestCache() *requestCache

NewRequestCache get a new requestCache

func OptimalNumOfBits added in v0.4.1

func OptimalNumOfBits(n int64, p float64) int64

OptimalNumOfBits 计算位数组长度

func OptimalNumOfHashFunctions added in v0.4.1

func OptimalNumOfHashFunctions(n int64, m int64) int64

OptimalNumOfHashFunctions 计算最优的布隆过滤器哈希函数个数

Types

type BaseSpider

type BaseSpider struct {
	// Name spider name
	Name string

	// FeedUrls feed urls
	FeedUrls []string
}

BaseSpider base spider

func NewBaseSpider

func NewBaseSpider(name string, feedUrls []string) *BaseSpider

func (*BaseSpider) ErrorHandler

func (s *BaseSpider) ErrorHandler(err *HandleError)

func (*BaseSpider) Parser

func (s *BaseSpider) Parser(resp *Context, item chan<- *ItemMeta, req chan<- *Context) error

Parser parses the response of a request; it sends items or new requests to the engine.

func (*BaseSpider) StartRequest

func (s *BaseSpider) StartRequest(req chan<- *Context)

type CacheInterface

type CacheInterface interface {
	// contains filtered or unexported methods
}

CacheInterface request缓存组件

type CheckMasterLive added in v0.4.1

type CheckMasterLive func() (bool, error)

CheckMasterLive 检查所有的master节点是否都在线

type Configuration

type Configuration struct {
	// Log *Logger `ymal:"log"`
	*viper.Viper
}
var Config *Configuration = nil

func (*Configuration) GetValue added in v0.4.1

func (c *Configuration) GetValue(key string) (interface{}, error)

type Context

type Context struct {
	// Request 请求对象
	Request *Request

	// Response 响应对象
	Response *Response

	// CtxId context 唯一id由uuid生成
	CtxId string

	// Error
	Error error

	//
	Cancel context.CancelFunc
	//
	Ref int64

	//
	Items chan *ItemMeta

	Spider SpiderInterface
	// contains filtered or unexported fields
}

Context 在引擎中的数据流通载体,负责单个抓取任务的生命周期维护

func NewContext

func NewContext(request *Request, Spider SpiderInterface, opts ...ContextOption) *Context

NewContext 从内存池中构建context对象

func (*Context) Close added in v0.4.1

func (c *Context) Close()

Close 关闭context

func (*Context) Deadline

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline

func (*Context) Done

func (c *Context) Done() <-chan struct{}

func (*Context) Err

func (c *Context) Err() error

func (Context) GetCtxId

func (c Context) GetCtxId() string

func (*Context) Value

func (c *Context) Value(key interface{}) interface{}

type ContextInterface added in v0.4.1

type ContextInterface interface {
	IsDone() bool
}

type ContextOption

type ContextOption func(c *Context)

func WithContext

func WithContext(ctx context.Context) ContextOption

WithContext 设置父context

func WithContextId added in v0.4.1

func WithContextId(ctxId string) ContextOption

WithContextId 设置自定义的ctxId

func WithItemChannelSize added in v0.4.1

func WithItemChannelSize(size int) ContextOption

type CrawlEngine added in v0.4.1

type CrawlEngine struct {
	// contains filtered or unexported fields
}

CrawlEngine 引擎是整个框架数据流调度核心

func NewEngine added in v0.4.1

func NewEngine(opts ...EngineOption) *CrawlEngine

NewEngine 构建新的引擎

func (*CrawlEngine) Close added in v0.4.1

func (e *CrawlEngine) Close()

Close 关闭引擎

func (*CrawlEngine) EventsWatcherRunner added in v0.4.1

func (e *CrawlEngine) EventsWatcherRunner() error

EventsWatcherRunner 事件监听器运行组件

func (*CrawlEngine) Execute added in v0.4.1

func (e *CrawlEngine) Execute()

Execute 通过命令行启动spider

func (*CrawlEngine) GetSpiders added in v0.4.1

func (e *CrawlEngine) GetSpiders() *Spiders

GetSpiders 获取所有的已经注册到引擎的spider实例

func (*CrawlEngine) RegisterDownloadMiddlewares added in v0.4.1

func (e *CrawlEngine) RegisterDownloadMiddlewares(middlewares MiddlewaresInterface)

RegisterDownloadMiddlewares 注册下载中间件到引擎

func (*CrawlEngine) RegisterPipelines added in v0.4.1

func (e *CrawlEngine) RegisterPipelines(pipeline PipelinesInterface)

RegisterPipelines 注册pipelines到引擎

func (*CrawlEngine) RegisterSpiders added in v0.4.1

func (e *CrawlEngine) RegisterSpiders(spider SpiderInterface)

RegisterSpiders 将spider实例注册到引擎的 spiders

func (*CrawlEngine) Scheduler added in v0.4.1

func (e *CrawlEngine) Scheduler() error

Scheduler 调度器

type DefaultFieldHook

type DefaultFieldHook struct {
}

func (*DefaultFieldHook) Fire

func (hook *DefaultFieldHook) Fire(entry *logrus.Entry) error

func (*DefaultFieldHook) Levels

func (hook *DefaultFieldHook) Levels() []logrus.Level

type DefualtHooks added in v0.4.1

type DefualtHooks struct {
}

func NewDefualtHooks added in v0.4.1

func NewDefualtHooks() *DefualtHooks

NewDefualtHooks 构建新的默认事件监听器

func (*DefualtHooks) Error added in v0.4.1

func (d *DefualtHooks) Error(params ...interface{}) error

Error 处理ERROR事件

func (*DefualtHooks) EventsWatcher added in v0.4.1

func (d *DefualtHooks) EventsWatcher(ch chan EventType) error

EventsWatcher DefualtHooks 的事件监听器

func (*DefualtHooks) Exit added in v0.4.1

func (d *DefualtHooks) Exit(params ...interface{}) error

Exit 处理EXIT事件

func (*DefualtHooks) Heartbeat added in v0.4.1

func (d *DefualtHooks) Heartbeat(params ...interface{}) error

Heartbeat 处理HEARTBEAT事件

func (*DefualtHooks) Start added in v0.4.1

func (d *DefualtHooks) Start(params ...interface{}) error

Start 处理START事件

func (*DefualtHooks) Stop added in v0.4.1

func (d *DefualtHooks) Stop(params ...interface{}) error

Stop 处理STOP事件

type DistributeOptions added in v0.4.1

type DistributeOptions func(w *DistributedWorkerConfig)

func DistributedWithBloomN added in v0.4.1

func DistributedWithBloomN(bloomN uint) DistributeOptions

DistributedWithBloomN 布隆过滤器数据规模

func DistributedWithBloomP added in v0.4.1

func DistributedWithBloomP(bloomP float64) DistributeOptions

DistributedWithBloomP 布隆过滤器容错率

func DistributedWithConnectionsSize added in v0.4.1

func DistributedWithConnectionsSize(size int) DistributeOptions

DistributedWithConnectionsSize rdb 连接池最大连接数

func DistributedWithGetBFKey added in v0.4.1

func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetBFKey 布隆过滤器的key生成函数

func DistributedWithGetLimitKey added in v0.4.1

func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetLimitKey 限速器key的生成函数

func DistributedWithGetqueueKey added in v0.4.1

func DistributedWithGetqueueKey(keyFunc GetRDBKey) DistributeOptions

DistributedWithGetqueueKey 队列key生成函数

func DistributedWithLimiterRate added in v0.4.1

func DistributedWithLimiterRate(rate int) DistributeOptions

DistributedWithLimiterRate 分布式组件下限速器的限速值

func DistributedWithRdbMaxRetry added in v0.4.1

func DistributedWithRdbMaxRetry(retry int) DistributeOptions

DistributedWithRdbMaxRetry rdb失败重试次数

func DistributedWithRdbTimeout added in v0.4.1

func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions

DistributedWithRdbTimeout rdb超时时间设置

type DistributeStatistic added in v0.4.1

type DistributeStatistic struct {
	// contains filtered or unexported fields
}

DistributeStatistic 分布式统计组件

func NewDistributeStatistic added in v0.4.1

func NewDistributeStatistic(statsPrefixKey string, rdb redis.Cmdable, wg *sync.WaitGroup, opts ...DistributeStatisticOption) *DistributeStatistic

NewDistributeStatistic 分布式数据统计组件构造函数

func (*DistributeStatistic) GetDownloadFail added in v0.4.1

func (s *DistributeStatistic) GetDownloadFail() uint64

GetDownloadFail 获取下载失败数

func (*DistributeStatistic) GetErrorCount added in v0.4.1

func (s *DistributeStatistic) GetErrorCount() uint64

GetErrorCount 获取错误数

func (*DistributeStatistic) GetItemScraped added in v0.4.1

func (s *DistributeStatistic) GetItemScraped() uint64

GetItemScraped 获取items的值

func (*DistributeStatistic) GetRequestSent added in v0.4.1

func (s *DistributeStatistic) GetRequestSent() uint64

GetRequestSent 获取request 量

func (*DistributeStatistic) GetStatsField added in v0.4.1

func (s *DistributeStatistic) GetStatsField(field StatsFieldType) uint64

GetStatsField 获取指定的数据指标值

func (*DistributeStatistic) IncrDownloadFail added in v0.4.1

func (s *DistributeStatistic) IncrDownloadFail()

IncrDownloadFail 累加一条下载失败数据

func (*DistributeStatistic) IncrErrorCount added in v0.4.1

func (s *DistributeStatistic) IncrErrorCount()

IncrErrorCount 累加一条错误

func (*DistributeStatistic) IncrItemScraped added in v0.4.1

func (s *DistributeStatistic) IncrItemScraped()

IncrItemScraped 累加一条item

func (*DistributeStatistic) IncrRequestSent added in v0.4.1

func (s *DistributeStatistic) IncrRequestSent()

IncrRequestSent 累加一条request

func (*DistributeStatistic) IncrStats added in v0.4.1

func (s *DistributeStatistic) IncrStats(field StatsFieldType)

IncrStats 累加指定的统计指标

func (*DistributeStatistic) OutputStats added in v0.4.1

func (s *DistributeStatistic) OutputStats() map[string]uint64

OutputStats 格式化输出所有的数据指标

func (*DistributeStatistic) Reset added in v0.4.1

func (s *DistributeStatistic) Reset() error

Reset 重置各项指标 若afterResetTTL>0则为每一项指标设置ttl否则直接删除指标

type DistributeStatisticOption added in v0.4.1

type DistributeStatisticOption func(d *DistributeStatistic)

DistributeStatisticOption 分布式组件可选参数定义

func DistributeStatisticAfterResetTTL added in v0.4.1

func DistributeStatisticAfterResetTTL(ttl time.Duration) DistributeStatisticOption

DistributeStatisticAfterResetTTL 为分布式计数器设置重置之前的ttl

type DistributedHooks added in v0.4.1

type DistributedHooks struct {
	// contains filtered or unexported fields
}

DistributedHooks 分布式事件监听器

func NewDistributedHooks added in v0.4.1

func NewDistributedHooks(worker DistributedWorkerInterface) *DistributedHooks

NewDistributedHooks 构建新的分布式监听器组件对象

func (*DistributedHooks) Error added in v0.4.1

func (d *DistributedHooks) Error(params ...interface{}) error

Error 用于处理分布式模式下的ERROR事件

func (*DistributedHooks) EventsWatcher added in v0.4.1

func (d *DistributedHooks) EventsWatcher(ch chan EventType) error

EventsWatcher 分布式模式下的事件监听器

func (*DistributedHooks) Exit added in v0.4.1

func (d *DistributedHooks) Exit(params ...interface{}) error

Exit 用于处理分布式模式下的Exit事件

func (*DistributedHooks) Heartbeat added in v0.4.1

func (d *DistributedHooks) Heartbeat(params ...interface{}) error

Heartbeat 用于处理分布式模式下的HEARTBEAT事件

func (*DistributedHooks) Start added in v0.4.1

func (d *DistributedHooks) Start(params ...interface{}) error

Start 用于处理分布式模式下的START事件

func (*DistributedHooks) Stop added in v0.4.1

func (d *DistributedHooks) Stop(params ...interface{}) error

Stop 用于处理分布式模式下的STOP事件

type DistributedWorker added in v0.4.1

type DistributedWorker struct {
	// contains filtered or unexported fields
}

DistributedWorker 分布式组件,包含两个组件: request请求缓存队列,由各个节点上的引擎读队列消费, redis 队列缓存的是经过gob序列化之后的二进制数据; 布隆过滤器,主要用于去重。该组件同时实现了RFPDupeFilterInterface 和CacheInterface

func NewDistributedWorker added in v0.4.1

func NewDistributedWorker(addr string, config *DistributedWorkerConfig) *DistributedWorker

NewDistributedWorker 构建redis单机模式下的分布式工作组件

func NewWorkerWithRdbCluster added in v0.4.1

func NewWorkerWithRdbCluster(config *WorkerConfigWithRdbCluster) *DistributedWorker

NewWorkerWithRdbCluster redis cluster模式下的分布式工作组件

func (*DistributedWorker) Add added in v0.4.1

func (w *DistributedWorker) Add(fingerprint []byte) error

Add 添加指纹到布隆过滤器

func (*DistributedWorker) AddNode added in v0.4.1

func (w *DistributedWorker) AddNode() error

AddNode 新增节点

func (*DistributedWorker) CheckAllNodesStop added in v0.4.1

func (w *DistributedWorker) CheckAllNodesStop() (bool, error)

CheckAllNodesStop 检查所有的节点是否都已经停止

func (*DistributedWorker) CheckMasterLive added in v0.4.1

func (w *DistributedWorker) CheckMasterLive() (bool, error)

CheckMasterLive 检查所有master 节点是否都在线

func (*DistributedWorker) DelNode added in v0.4.1

func (w *DistributedWorker) DelNode() error

DelNode 删除当前节点

func (*DistributedWorker) DoDupeFilter added in v0.4.1

func (w *DistributedWorker) DoDupeFilter(ctx *Context) (bool, error)

DoDupeFilter request去重处理,如果指纹已经存在则返回True,否则为False 指纹不存在的情况下会将指纹添加到缓存

func (*DistributedWorker) Fingerprint added in v0.4.1

func (w *DistributedWorker) Fingerprint(ctx *Context) ([]byte, error)

Fingerprint 生成request 对象的指纹

func (*DistributedWorker) GetLimter added in v0.4.1

func (w *DistributedWorker) GetLimter() LimitInterface

GetLimter 获取限速器

func (*DistributedWorker) Heartbeat added in v0.4.1

func (w *DistributedWorker) Heartbeat() error

Heartbeat 心跳包

func (*DistributedWorker) SetMaster added in v0.4.1

func (w *DistributedWorker) SetMaster(flag bool)

SetMaster 设置当前的节点是否为master

func (*DistributedWorker) SetSpiders added in v0.4.1

func (w *DistributedWorker) SetSpiders(spiders *Spiders)

SetSpiders 设置所有的已注册的spider

func (*DistributedWorker) StopNode added in v0.4.1

func (w *DistributedWorker) StopNode() error

StopNode 停止当前节点的活动

func (*DistributedWorker) TestOrAdd added in v0.4.1

func (w *DistributedWorker) TestOrAdd(fingerprint []byte) (bool, error)

TestOrAdd 如果指纹已经存在则返回True,否则为False 指纹不存在的情况下会将指纹添加到缓存

type DistributedWorkerConfig added in v0.4.1

type DistributedWorkerConfig struct {
	// RedisAddr redis 地址
	RedisAddr string
	// RedisPasswd redis 密码
	RedisPasswd string
	// RedisUsername redis 用户名
	RedisUsername string
	// RedisDB redis 数据库索引 index
	RedisDB uint32
	// RdbConnectionsSize 连接池大小
	RdbConnectionsSize uint64
	// RdbTimeout redis 超时时间
	RdbTimeout time.Duration
	// RdbMaxRetry redis操作失败后的重试次数
	RdbMaxRetry int
	// BloomP 布隆过滤器的容错率
	BloomP float64
	// BloomN 数据规模,比如1024 * 1024
	BloomN uint
	// 并发量
	LimiterRate int
	// GetqueueKey 生成队列key的函数,允许用户自定义
	GetqueueKey GetRDBKey
	// GetBFKey 布隆过滤器对应的生成key的函数,允许用户自定义
	GetBFKey GetRDBKey
	// contains filtered or unexported fields
}

DistributedWorkerConfig 分布式组件的配置参数

func NewDistributedWorkerConfig added in v0.4.1

func NewDistributedWorkerConfig(username string, passwd string, db uint32, opts ...DistributeOptions) *DistributedWorkerConfig

NewDistributedWorkerConfig 新建分布式组件的配置

type DistributedWorkerInterface added in v0.4.1

type DistributedWorkerInterface interface {
	CacheInterface
	RFPDupeFilterInterface
	// AddNode 新增一个节点
	AddNode() error
	// DelNode 删除当前的节点
	DelNode() error
	// StopNode 停止当前的节点
	StopNode() error
	// Heartbeat 心跳
	Heartbeat() error
	// CheckAllNodesStop 检查所有的节点是否都已经停止
	CheckAllNodesStop() (bool, error)
	// GetLimter 获取限速器
	GetLimter() LimitInterface
	// CheckMasterLive 检测主节点是否还在线
	CheckMasterLive() (bool, error)
	// SetMaster 是否将当前的节点设置为主节点
	SetMaster(flag bool)
	// SetSpiders 设置已经注册的所有的spider
	SetSpiders(spiders *Spiders)
}

DistributedWorkerInterface 分布式组件接口

type Downloader

type Downloader interface {
	// Download 下载函数
	Download(ctx *Context) (*Response, error)

	// CheckStatus 检查响应状态码的合法性
	CheckStatus(statusCode uint64, allowStatus []uint64) bool
}

Downloader 下载器接口

func NewDownloader

func NewDownloader(opts ...DownloaderOption) Downloader

NewDownloader 构建新的下载器

type DownloaderOption

type DownloaderOption func(d *SpiderDownloader)

DownloaderOption 下载器可选参数函数

func DownloadWithClient

func DownloadWithClient(client http.Client) DownloaderOption

DownloadWithClient 设置下载器的http.Client客户端

func DownloadWithH2 added in v0.4.1

func DownloadWithH2(h2 bool) DownloaderOption

DownloadWithH2 下载器是否开启http2

func DownloadWithTimeout

func DownloadWithTimeout(timeout time.Duration) DownloaderOption

DownloadWithTimeout 设置下载器的网络请求超时时间

func DownloadWithTlsConfig

func DownloadWithTlsConfig(tls *tls.Config) DownloaderOption

DownloadWithTlsConfig 设置下载器的tls

func DownloaderWithtransport

func DownloaderWithtransport(transport *http.Transport) DownloaderOption

DownloaderWithtransport 为下载器设置 http.Transport

type EngineOption

type EngineOption func(r *CrawlEngine)

EngineOption 引擎构造过程中的可选参数

func EngineWithCache added in v0.4.1

func EngineWithCache(cache CacheInterface) EngineOption

EngineWithCache 引擎使用的缓存组件

func EngineWithDistributedWorker added in v0.4.1

func EngineWithDistributedWorker(woker DistributedWorkerInterface) EngineOption

EngineWithDistributedWorker 引擎使用的的分布式组件

func EngineWithDownloader

func EngineWithDownloader(downloader Downloader) EngineOption

EngineWithDownloader 引擎使用的下载器组件

func EngineWithFilter added in v0.4.1

func EngineWithFilter(filter RFPDupeFilterInterface) EngineOption

EngineWithFilter 引擎使用的过滤去重组件

func EngineWithLimiter added in v0.4.1

func EngineWithLimiter(limiter LimitInterface) EngineOption

EngineWithLimiter 引擎使用的限速器

func EngineWithUniqueReq

func EngineWithUniqueReq(uniqueReq bool) EngineOption

EngineWithUniqueReq 是否进行去重处理

type ErrorOption

type ErrorOption func(e *HandleError)

ErrorOption HandleError 可选参数

func ErrorWithExtras added in v0.4.1

func ErrorWithExtras(extras map[string]interface{}) ErrorOption

ErrorWithExtras HandleError 添加额外的数据

type EventHooksInterface added in v0.4.1

type EventHooksInterface interface {
	// Start 处理引擎启动事件
	Start(params ...interface{}) error
	// Stop 处理引擎停止事件
	Stop(params ...interface{}) error
	// Error处理错误事件
	Error(params ...interface{}) error
	// Exit 退出引擎事件
	Exit(params ...interface{}) error
	// Heartbeat 心跳检查事件
	Heartbeat(params ...interface{}) error
	// EventsWatcher 事件监听器
	EventsWatcher(ch chan EventType) error
}

EventHooksInterface 事件处理函数接口

type EventType added in v0.4.1

type EventType int

EventType 事件类型

const (
	// START 启动
	START EventType = iota
	// HEARTBEAT 心跳
	HEARTBEAT
	// STOP 停止
	STOP
	// ERROR 错误
	ERROR
	// EXIT 退出
	EXIT
)

type EventsWatcher added in v0.4.1

type EventsWatcher func(ch chan EventType) error

EventsWatcher 事件监听器

type GetRDBKey added in v0.4.1

type GetRDBKey func(params ...interface{}) (string, time.Duration)

GetRDBKey 获取缓存rdb key和ttl

type GoFunc added in v0.4.1

type GoFunc func() error

GoFunc 协程函数

type HandleError

type HandleError struct {
	// CtxId 上下文id
	CtxId string
	// Err 处理过程的错误
	Err error
	// Extras 携带的额外信息
	Extras map[string]interface{}
}

HandleError 错误处理接口

func NewError

func NewError(ctx *Context, err error, opts ...ErrorOption) *HandleError

NewError 构建新的HandleError实例

func (*HandleError) Error

func (e *HandleError) Error() string

Error 获取HandleError错误信息

type Hook added in v0.4.1

type Hook func(params ...interface{}) error

Hook 事件处理函数类型

type ItemInterface

type ItemInterface interface {
}

ItemInterface item实例接口

type ItemMeta

type ItemMeta struct {
	// CtxId 对应的context id
	CtxId string
	// Item item对象
	Item ItemInterface
}

ItemMeta item元数据结构

func NewItem

func NewItem(ctx *Context, item ItemInterface) *ItemMeta

NewItem 构建新的ItemMeta对象

type ItemPipelines

type ItemPipelines []PipelinesInterface

func (ItemPipelines) Len

func (p ItemPipelines) Len() int

func (ItemPipelines) Less

func (p ItemPipelines) Less(i, j int) bool

func (ItemPipelines) Swap

func (p ItemPipelines) Swap(i, j int)

type LimitInterface added in v0.4.1

type LimitInterface interface {
	// contains filtered or unexported methods
}

LimitInterface 限速器接口

type Middlewares

type Middlewares []MiddlewaresInterface

Middlewares 下载中间件队列

func (Middlewares) Len

func (p Middlewares) Len() int

实现sort接口

func (Middlewares) Less

func (p Middlewares) Less(i, j int) bool

func (Middlewares) Swap

func (p Middlewares) Swap(i, j int)

type MiddlewaresBase

type MiddlewaresBase struct {
	Priority int
}

type MiddlewaresInterface

type MiddlewaresInterface interface {
	// GetPriority 获取优先级,数字越小优先级越高
	GetPriority() int

	// ProcessRequest 处理request请求对象
	// 此处用于增加请求头
	// 按优先级执行
	ProcessRequest(ctx *Context) error

	// ProcessResponse 用于处理请求成功之后的response
	// 执行顺序与优先级相反,即优先级越高执行顺序越晚
	ProcessResponse(ctx *Context, req chan<- *Context) error

	// GetName 获取中间件的名称
	GetName() string
}

MiddlewaresInterface 下载中间件的接口用于处理进入下载器之前的request对象 和下载之后的response

type Parser

type Parser func(resp *Context, req chan<- *Context) error

Parser 响应解析函数结构

func GetParserByName added in v0.4.1

func GetParserByName(spider SpiderInterface, name string) Parser

GetParserByName 通过函数名从spider实例中获取解析函数

type PipelinesBase

type PipelinesBase struct {
	Priority int
}

type PipelinesInterface

type PipelinesInterface interface {
	// GetPriority 获取当前pipeline的优先级
	GetPriority() int
	// ProcessItem item处理单元
	ProcessItem(spider SpiderInterface, item *ItemMeta) error
}

PipelinesInterface pipeline 接口 pipeline 主要用于处理item,例如数据存储、数据清洗 将多个pipeline注册到引擎可以实现责任链模式的数据处理

type ProcessResponse

type ProcessResponse func(ctx *Context) error

ProcessResponse 处理下载之后的response函数

type Proxy

type Proxy struct {
	// ProxyUrl 代理链接
	ProxyUrl string
}

Proxy 代理数据结构

type RFPDupeFilter

type RFPDupeFilter struct {
	// contains filtered or unexported fields
}

RFPDupeFilter 去重组件

func NewRFPDupeFilter

func NewRFPDupeFilter(bloomP float64, bloomN uint) *RFPDupeFilter

NewRFPDupeFilter 新建去重组件 bloomP容错率 bloomN数据规模

func (*RFPDupeFilter) DoDupeFilter

func (f *RFPDupeFilter) DoDupeFilter(ctx *Context) (bool, error)

DoDupeFilter 通过布隆过滤器对request对象进行去重处理

func (*RFPDupeFilter) Fingerprint

func (f *RFPDupeFilter) Fingerprint(ctx *Context) ([]byte, error)

Fingerprint 计算指纹

type RFPDupeFilterInterface

type RFPDupeFilterInterface interface {
	// Fingerprint request指纹计算
	Fingerprint(ctx *Context) ([]byte, error)

	// DoDupeFilter request去重
	DoDupeFilter(ctx *Context) (bool, error)
}

RFPDupeFilterInterface request 对象指纹计算和布隆过滤器去重

type RdbNodes added in v0.4.1

type RdbNodes []string

RdbNodes redis cluster 节点地址

type RedirectError

type RedirectError struct {
	RedirectNum int
}

RedirectError 重定向错误

func (*RedirectError) Error

func (e *RedirectError) Error() string

Error 获取RedirectError错误

type Request

type Request struct {
	// Url 请求Url
	Url string `json:"url"`
	// Header 请求头
	Header map[string]string `json:"header"`
	// Method 请求方式
	Method RequestMethod `json:"method"`
	// Body 请求body
	Body []byte `json:"body"`
	// Params 请求url的参数
	Params map[string]string `json:"params"`
	// Proxy 代理实例
	Proxy *Proxy `json:"-"`
	// Cookies 请求携带的cookies
	Cookies map[string]string `json:"cookies"`
	// Meta 请求携带的额外的信息
	Meta map[string]interface{} `json:"meta"`
	// AllowRedirects 是否允许跳转默认允许
	AllowRedirects bool `json:"allowRedirects"`
	// MaxRedirects 最大的跳转次数
	MaxRedirects int `json:"maxRedirects"`
	// Parser 该请求绑定的响应解析函数,必须是一个spider实例
	Parser Parser `json:"-"`
	// MaxConnsPerHost 单个域名最大的连接数
	MaxConnsPerHost int `json:"maxConnsPerHost"`
	// BodyReader 用于读取body
	BodyReader io.Reader `json:"-"`
	// ResponseWriter 响应读取到本地的接口
	ResponseWriter io.Writer `json:"-"`
	// AllowStatusCode 允许的状态码
	AllowStatusCode []uint64 `json:"allowStatusCode"`
	// Timeout 请求超时时间
	Timeout time.Duration `json:"timeout"`
}

Request 请求对象的结构

func NewRequest

func NewRequest(url string, method RequestMethod, parser Parser, opts ...RequestOption) *Request

NewRequest 从Request对象内存池创建新的Request对象

func RequestFromMap added in v0.4.1

func RequestFromMap(src map[string]interface{}, opts ...RequestOption) *Request

RequestFromMap 从map创建requests

func (*Request) ToMap added in v0.4.1

func (r *Request) ToMap() (map[string]interface{}, error)

ToMap 将request对象转为map

type RequestMethod added in v0.4.1

type RequestMethod string

RequestMethod 请求方式

const (
	GET     RequestMethod = "GET"
	POST    RequestMethod = "POST"
	PUT     RequestMethod = "PUT"
	DELETE  RequestMethod = "DELETE"
	OPTIONS RequestMethod = "OPTIONS"
	HEAD    RequestMethod = "HEAD"
)

type RequestOption added in v0.4.1

type RequestOption func(r *Request)

RequestOption NewRequest 可选参数

func RequestWithAllowRedirects

func RequestWithAllowRedirects(allowRedirects bool) RequestOption

RequestWithAllowRedirects 设置是否允许跳转 如果不允许则MaxRedirects=0

func RequestWithAllowedStatusCode added in v0.4.1

func RequestWithAllowedStatusCode(allowStatusCode []uint64) RequestOption

RequestWithAllowedStatusCode 设置AllowStatusCode

func RequestWithMaxConnsPerHost

func RequestWithMaxConnsPerHost(maxConnsPerHost int) RequestOption

RequestWithMaxConnsPerHost 设置MaxConnsPerHost

func RequestWithMaxRedirects

func RequestWithMaxRedirects(maxRedirects int) RequestOption

RequestWithMaxRedirects 设置最大的跳转次数 若maxRedirects <= 0则认为不允许跳转AllowRedirects = false

func RequestWithParser added in v0.4.1

func RequestWithParser(parser Parser) RequestOption

RequestWithParser 设置Parser

func RequestWithRequestBody

func RequestWithRequestBody(body map[string]interface{}) RequestOption

RequestWithRequestBody 传入请求体到request

func RequestWithRequestBytesBody added in v0.4.1

func RequestWithRequestBytesBody(body []byte) RequestOption

RequestWithRequestBytesBody request绑定bytes body

func RequestWithRequestCookies

func RequestWithRequestCookies(cookies map[string]string) RequestOption

RequestWithRequestCookies 设置cookie

func RequestWithRequestHeader

func RequestWithRequestHeader(header map[string]string) RequestOption

RequestWithRequestHeader 设置请求头

func RequestWithRequestMeta

func RequestWithRequestMeta(meta map[string]interface{}) RequestOption

RequestWithRequestMeta 设置 meta

func RequestWithRequestParams

func RequestWithRequestParams(params map[string]string) RequestOption

RequestWithRequestParams 设置请求的url参数

func RequestWithRequestProxy

func RequestWithRequestProxy(proxy Proxy) RequestOption

RequestWithRequestProxy 设置代理

func RequestWithResponseWriter

func RequestWithResponseWriter(write io.Writer) RequestOption

RequestWithResponseWriter 设置ResponseWriter

func RequestWithTimeout added in v0.4.1

func RequestWithTimeout(timeout time.Duration) RequestOption

RequestWithTimeout 设置请求超时时间 若timeout<=0则认为没有超时时间

type Response

type Response struct {
	// Status状态码
	Status int
	// Header 响应头
	Header map[string][]string // Header response header
	// Delay 请求延迟
	Delay float64 // Delay the time of handle download request
	// ContentLength 响应体大小
	ContentLength uint64 // ContentLength response content length
	// URL 请求url
	URL string // URL of request url
	// Buffer 响应体缓存
	Buffer *bytes.Buffer // buffer read response buffer
}

Response 请求响应体的结构

func NewResponse

func NewResponse() *Response

NewResponse 从内存池中创建新的response对象

func (*Response) Json

func (r *Response) Json() (map[string]interface{}, error)

Json 将响应数据转为json

func (*Response) String

func (r *Response) String() string

String 将响应数据转为string

type Settings

type Settings interface {
	// GetValue 获取指定的参数值
	GetValue(key string) (interface{}, error)
}

type SpiderDownloader

type SpiderDownloader struct {

	// ProxyFunc 对单个请求进行代理设置
	ProxyFunc func(req *http.Request) (*url.URL, error)
	// contains filtered or unexported fields
}

SpiderDownloader tegenaria 爬虫下载器

func (*SpiderDownloader) CheckStatus

func (d *SpiderDownloader) CheckStatus(statusCode uint64, allowStatus []uint64) bool

CheckStatus 检查状态码是否合法

func (*SpiderDownloader) Download

func (d *SpiderDownloader) Download(ctx *Context) (*Response, error)

Download 处理request请求

type SpiderInterface

type SpiderInterface interface {
	// StartRequest 通过GetFeedUrls()获取种子
	// urls并构建初始请求
	StartRequest(req chan<- *Context)

	// Parser 默认的请求响应解析函数
	// 在解析过程中生成的新的请求可以推送到req channel
	Parser(resp *Context, req chan<- *Context) error

	// ErrorHandler 错误处理函数,允许在此过程中生成新的请求
	// 并推送到req channel
	ErrorHandler(err *Context, req chan<- *Context)

	// GetName 获取spider名称
	GetName() string
	// GetFeedUrls 获取种子urls
	GetFeedUrls() []string
}

type Spiders

type Spiders struct {
	// SpidersModules spider名称和spider实例的映射
	SpidersModules map[string]SpiderInterface
	// Parsers parser函数名和函数的映射
	// 用于序列化和反序列化
	Parsers map[string]Parser
}

Spiders 全局spiders管理器 用于接收注册的SpiderInterface实例

var SpidersList *Spiders

func NewSpiders

func NewSpiders() *Spiders

NewSpiders 构建Spiders实例

func (*Spiders) GetSpider

func (s *Spiders) GetSpider(name string) (SpiderInterface, error)

GetSpider 通过爬虫名获取spider实例

func (*Spiders) Register

func (s *Spiders) Register(spider SpiderInterface) error

Register spider实例注册到Spiders.SpidersModules

type Statistic added in v0.4.1

type Statistic struct {
	// ItemScraped items总数
	ItemScraped uint64 `json:"items"`
	// RequestSent 请求总数
	RequestSent uint64 `json:"requets"`
	// DownloadFail 下载失败总数
	DownloadFail uint64 `json:"download_fail"`
	// ErrorCount 错误总数
	ErrorCount uint64 `json:"errors"`
	// contains filtered or unexported fields
}

Statistic 数据统计指标

func NewStatistic added in v0.4.1

func NewStatistic() *Statistic

NewStatistic 默认统计数据组件构造函数

func (*Statistic) GetDownloadFail added in v0.4.1

func (s *Statistic) GetDownloadFail() uint64

GetDownloadFail 获取下载失败的总数

func (*Statistic) GetErrorCount added in v0.4.1

func (s *Statistic) GetErrorCount() uint64

GetErrorCount 获取捕获到错误总数

func (*Statistic) GetItemScraped added in v0.4.1

func (s *Statistic) GetItemScraped() uint64

GetItemScraped 获取拿到的item总数

func (*Statistic) GetRequestSent added in v0.4.1

func (s *Statistic) GetRequestSent() uint64

GetRequestSent 获取发送的请求总数

func (*Statistic) IncrDownloadFail added in v0.4.1

func (s *Statistic) IncrDownloadFail()

IncrDownloadFail 累加一条下载失败数据

func (*Statistic) IncrErrorCount added in v0.4.1

func (s *Statistic) IncrErrorCount()

IncrErrorCount 累加捕获到的数量

func (*Statistic) IncrItemScraped added in v0.4.1

func (s *Statistic) IncrItemScraped()

IncrItemScraped 累加一条item

func (*Statistic) IncrRequestSent added in v0.4.1

func (s *Statistic) IncrRequestSent()

IncrRequestSent 累加一条request

func (*Statistic) OutputStats added in v0.4.1

func (s *Statistic) OutputStats() map[string]uint64

OutputStats 格式化统计数据

func (*Statistic) Reset added in v0.4.1

func (s *Statistic) Reset() error

Reset 重置统计数据

type StatisticInterface added in v0.4.1

type StatisticInterface interface {
	// IncrItemScraped 累加一条item
	IncrItemScraped()
	// IncrRequestSent 累加一条request
	IncrRequestSent()
	// IncrDownloadFail 累加一条下载失败数据
	IncrDownloadFail()
	// IncrErrorCount 累加一条错误数据
	IncrErrorCount()
	// GetItemScraped 获取拿到的item总数
	GetItemScraped() uint64
	// GetRequestSent 获取发送request的总数
	GetRequestSent() uint64
	// GetErrorCount 获取处理过程中生成的error总数
	GetErrorCount() uint64
	// GetDownloadFail 获取下载失败的总数
	GetDownloadFail() uint64
	// OutputStats 格式化统计指标
	OutputStats() map[string]uint64
	// Reset 重置指标统计组件,所有指标归零
	Reset() error
	// contains filtered or unexported methods
}

StatisticInterface 数据统计组件接口

type StatsFieldType added in v0.4.1

type StatsFieldType string

StatsFieldType 统计指标的数据类型

const (
	// RequestStats 发起的请求总数
	RequestStats StatsFieldType = "requests"
	// ItemsStats 获取到的items总数
	ItemsStats StatsFieldType = "items"
	// DownloadFailStats 请求失败总数
	DownloadFailStats StatsFieldType = "download_fail"
	// ErrorStats 错误总数
	ErrorStats StatsFieldType = "errors"
)

type WorkerConfigWithRdbCluster added in v0.4.1

type WorkerConfigWithRdbCluster struct {
	*DistributedWorkerConfig
	RdbNodes
}

WorkerConfigWithRdbCluster redis cluster 模式下的分布式组件配置参数

func NewWorkerConfigWithRdbCluster added in v0.4.1

func NewWorkerConfigWithRdbCluster(config *DistributedWorkerConfig, nodes RdbNodes) *WorkerConfigWithRdbCluster

NewWorkerConfigWithRdbCluster redis cluster模式的分布式组件

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL