Documentation
¶
Overview ¶
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Index ¶
- Variables
- func AddGo(wg *sync.WaitGroup, funcs ...GoFunc) <-chan error
- func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error
- func ExecuteCmd(engine *CrawlEngine)
- func GetAllParserMethod(spider SpiderInterface) map[string]Parser
- func GetFunctionName(fn Parser) string
- func GetLogger(Name string) *logrus.Entry
- func GetMachineIp() string
- func GetUUID() string
- func Map2String(m interface{}) string
- func NewDefaultLimiter(limitRate int) *defaultLimiter
- func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *leakyBucketLimiterWithRdb
- func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
- func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
- func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
- func NewRequestCache() *requestCache
- func OptimalNumOfBits(n int64, p float64) int64
- func OptimalNumOfHashFunctions(n int64, m int64) int64
- type BaseSpider
- type CacheInterface
- type CheckMasterLive
- type Configuration
- type Context
- type ContextInterface
- type ContextOption
- type CrawlEngine
- func (e *CrawlEngine) Close()
- func (e *CrawlEngine) EventsWatcherRunner() error
- func (e *CrawlEngine) Execute()
- func (e *CrawlEngine) GetSpiders() *Spiders
- func (e *CrawlEngine) RegisterDownloadMiddlewares(middlewares MiddlewaresInterface)
- func (e *CrawlEngine) RegisterPipelines(pipeline PipelinesInterface)
- func (e *CrawlEngine) RegisterSpiders(spider SpiderInterface)
- func (e *CrawlEngine) Scheduler() error
- type DefaultFieldHook
- type DefualtHooks
- func (d *DefualtHooks) Error(params ...interface{}) error
- func (d *DefualtHooks) EventsWatcher(ch chan EventType) error
- func (d *DefualtHooks) Exit(params ...interface{}) error
- func (d *DefualtHooks) Heartbeat(params ...interface{}) error
- func (d *DefualtHooks) Start(params ...interface{}) error
- func (d *DefualtHooks) Stop(params ...interface{}) error
- type DistributeOptions
- func DistributedWithBloomN(bloomN uint) DistributeOptions
- func DistributedWithBloomP(bloomP float64) DistributeOptions
- func DistributedWithConnectionsSize(size int) DistributeOptions
- func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithGetqueueKey(keyFunc GetRDBKey) DistributeOptions
- func DistributedWithLimiterRate(rate int) DistributeOptions
- func DistributedWithRdbMaxRetry(retry int) DistributeOptions
- func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
- type DistributeStatistic
- func (s *DistributeStatistic) GetDownloadFail() uint64
- func (s *DistributeStatistic) GetErrorCount() uint64
- func (s *DistributeStatistic) GetItemScraped() uint64
- func (s *DistributeStatistic) GetRequestSent() uint64
- func (s *DistributeStatistic) GetStatsField(field StatsFieldType) uint64
- func (s *DistributeStatistic) IncrDownloadFail()
- func (s *DistributeStatistic) IncrErrorCount()
- func (s *DistributeStatistic) IncrItemScraped()
- func (s *DistributeStatistic) IncrRequestSent()
- func (s *DistributeStatistic) IncrStats(field StatsFieldType)
- func (s *DistributeStatistic) OutputStats() map[string]uint64
- func (s *DistributeStatistic) Reset() error
- type DistributeStatisticOption
- type DistributedHooks
- func (d *DistributedHooks) Error(params ...interface{}) error
- func (d *DistributedHooks) EventsWatcher(ch chan EventType) error
- func (d *DistributedHooks) Exit(params ...interface{}) error
- func (d *DistributedHooks) Heartbeat(params ...interface{}) error
- func (d *DistributedHooks) Start(params ...interface{}) error
- func (d *DistributedHooks) Stop(params ...interface{}) error
- type DistributedWorker
- func (w *DistributedWorker) Add(fingerprint []byte) error
- func (w *DistributedWorker) AddNode() error
- func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
- func (w *DistributedWorker) CheckMasterLive() (bool, error)
- func (w *DistributedWorker) DelNode() error
- func (w *DistributedWorker) DoDupeFilter(ctx *Context) (bool, error)
- func (w *DistributedWorker) Fingerprint(ctx *Context) ([]byte, error)
- func (w *DistributedWorker) GetLimter() LimitInterface
- func (w *DistributedWorker) Heartbeat() error
- func (w *DistributedWorker) SetMaster(flag bool)
- func (w *DistributedWorker) SetSpiders(spiders *Spiders)
- func (w *DistributedWorker) StopNode() error
- func (w *DistributedWorker) TestOrAdd(fingerprint []byte) (bool, error)
- type DistributedWorkerConfig
- type DistributedWorkerInterface
- type Downloader
- type DownloaderOption
- func DownloadWithClient(client http.Client) DownloaderOption
- func DownloadWithH2(h2 bool) DownloaderOption
- func DownloadWithTimeout(timeout time.Duration) DownloaderOption
- func DownloadWithTlsConfig(tls *tls.Config) DownloaderOption
- func DownloaderWithtransport(transport *http.Transport) DownloaderOption
- type EngineOption
- func EngineWithCache(cache CacheInterface) EngineOption
- func EngineWithDistributedWorker(woker DistributedWorkerInterface) EngineOption
- func EngineWithDownloader(downloader Downloader) EngineOption
- func EngineWithFilter(filter RFPDupeFilterInterface) EngineOption
- func EngineWithLimiter(limiter LimitInterface) EngineOption
- func EngineWithUniqueReq(uniqueReq bool) EngineOption
- type ErrorOption
- type EventHooksInterface
- type EventType
- type EventsWatcher
- type GetRDBKey
- type GoFunc
- type HandleError
- type Hook
- type ItemInterface
- type ItemMeta
- type ItemPipelines
- type LimitInterface
- type Middlewares
- type MiddlewaresBase
- type MiddlewaresInterface
- type Parser
- type PipelinesBase
- type PipelinesInterface
- type ProcessResponse
- type Proxy
- type RFPDupeFilter
- type RFPDupeFilterInterface
- type RdbNodes
- type RedirectError
- type Request
- type RequestMethod
- type RequestOption
- func RequestWithAllowRedirects(allowRedirects bool) RequestOption
- func RequestWithAllowedStatusCode(allowStatusCode []uint64) RequestOption
- func RequestWithMaxConnsPerHost(maxConnsPerHost int) RequestOption
- func RequestWithMaxRedirects(maxRedirects int) RequestOption
- func RequestWithParser(parser Parser) RequestOption
- func RequestWithRequestBody(body map[string]interface{}) RequestOption
- func RequestWithRequestBytesBody(body []byte) RequestOption
- func RequestWithRequestCookies(cookies map[string]string) RequestOption
- func RequestWithRequestHeader(header map[string]string) RequestOption
- func RequestWithRequestMeta(meta map[string]interface{}) RequestOption
- func RequestWithRequestParams(params map[string]string) RequestOption
- func RequestWithRequestProxy(proxy Proxy) RequestOption
- func RequestWithResponseWriter(write io.Writer) RequestOption
- func RequestWithTimeout(timeout time.Duration) RequestOption
- type Response
- type Settings
- type SpiderDownloader
- type SpiderInterface
- type Spiders
- type Statistic
- func (s *Statistic) GetDownloadFail() uint64
- func (s *Statistic) GetErrorCount() uint64
- func (s *Statistic) GetItemScraped() uint64
- func (s *Statistic) GetRequestSent() uint64
- func (s *Statistic) IncrDownloadFail()
- func (s *Statistic) IncrErrorCount()
- func (s *Statistic) IncrItemScraped()
- func (s *Statistic) IncrRequestSent()
- func (s *Statistic) OutputStats() map[string]uint64
- func (s *Statistic) Reset() error
- type StatisticInterface
- type StatsFieldType
- type WorkerConfigWithRdbCluster
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSpiderMiddleware 下载中间件处理异常 ErrSpiderMiddleware error = errors.New("handle spider middleware error") // ErrSpiderCrawls 抓取流程错误 ErrSpiderCrawls error = errors.New("handle spider crawl error") // ErrDuplicateSpiderName 爬虫名重复错误 ErrDuplicateSpiderName error = errors.New("register a duplicate spider name error") // ErrEmptySpiderName 爬虫名不能为空 ErrEmptySpiderName error = errors.New("register a empty spider name error") // ErrSpiderNotExist 爬虫实例不存在 ErrSpiderNotExist error = errors.New("not found spider") // ErrNotAllowStatusCode 不允许的状态码 ErrNotAllowStatusCode error = errors.New("not allow handle status code") // ErrGetCacheItem 获取item 错误 ErrGetCacheItem error = errors.New("getting item from cache error") // ErrGetHttpProxy 获取http代理错误 ErrGetHttpProxy error = errors.New("getting http proxy ") // ErrGetHttpsProxy 获取https代理错误 ErrGetHttpsProxy error = errors.New("getting https proxy ") // ErrParseSocksProxy 解析socks代理错误 ErrParseSocksProxy error = errors.New("parse socks proxy ") // ErrResponseRead 响应读取失败 ErrResponseRead error = errors.New("read response to buffer error") // ErrResponseParse 响应解析失败 ErrResponseParse error = errors.New("parse response error") )
var ProcessId string = uuid.New().String()
Functions ¶
func DefaultventsWatcher ¶ added in v0.4.1
func DefaultventsWatcher(ch chan EventType, hooker EventHooksInterface) error
DefaultventsWatcher 默认的事件监听器 ch 用于接收事件 hooker 事件处理实例化接口,比如DefualtHooks
func ExecuteCmd ¶ added in v0.4.1
func ExecuteCmd(engine *CrawlEngine)
ExecuteCmd adds all child commands to the root command and sets flags appropriately. This is called by main.main(). It only needs to happen once to the rootCmd.
func GetAllParserMethod ¶ added in v0.4.1
func GetAllParserMethod(spider SpiderInterface) map[string]Parser
GetAllParserMethod 获取spider实例所有的解析函数
func GetFunctionName ¶ added in v0.4.1
GetFunctionName 提取解析函数名
func NewDefaultLimiter ¶ added in v0.4.1
func NewDefaultLimiter(limitRate int) *defaultLimiter
NewDefaultLimiter 创建一个新的限速器 limitRate 最大请求速率
func NewLeakyBucketLimiterWithRdb ¶ added in v0.4.1
func NewLeakyBucketLimiterWithRdb(safetyLevel int, rdb redis.Cmdable, keyFunc GetRDBKey) *leakyBucketLimiterWithRdb
NewLeakyBucketLimiterWithRdb leakyBucketLimiterWithRdb 构造函数
func NewRdbClient ¶ added in v0.4.1
func NewRdbClient(config *DistributedWorkerConfig) *redis.Client
func NewRdbClusterCLient ¶ added in v0.4.1
func NewRdbClusterCLient(config *WorkerConfigWithRdbCluster) *redis.ClusterClient
func NewRdbConfig ¶ added in v0.4.1
func NewRdbConfig(config *DistributedWorkerConfig) *redis.Options
NewRdbConfig redis 配置构造函数
func OptimalNumOfBits ¶ added in v0.4.1
OptimalNumOfBits 计算位数组长度
func OptimalNumOfHashFunctions ¶ added in v0.4.1
OptimalNumOfHashFunctions 计算最优的布隆过滤器哈希函数个数
Types ¶
type BaseSpider ¶
BaseSpider base spider
func NewBaseSpider ¶
func NewBaseSpider(name string, feedUrls []string) *BaseSpider
func (*BaseSpider) ErrorHandler ¶
func (s *BaseSpider) ErrorHandler(err *HandleError)
func (*BaseSpider) Parser ¶
func (s *BaseSpider) Parser(resp *Context, item chan<- *ItemMeta, req chan<- *Context) error
Parser parses the response of a request; it sends items or new requests to the engine.
func (*BaseSpider) StartRequest ¶
func (s *BaseSpider) StartRequest(req chan<- *Context)
type CacheInterface ¶
type CacheInterface interface {
// contains filtered or unexported methods
}
CacheInterface request缓存组件
type CheckMasterLive ¶ added in v0.4.1
CheckMasterLive 检查所有的master节点是否都在线
type Configuration ¶
var Config *Configuration = nil
func (*Configuration) GetValue ¶ added in v0.4.1
func (c *Configuration) GetValue(key string) (interface{}, error)
type Context ¶
type Context struct { // Request 请求对象 Request *Request // Response 响应对象 Response *Response // CtxId context 唯一id由uuid生成 CtxId string // Error Error error // Cancel context.CancelFunc // Ref int64 // Items chan *ItemMeta Spider SpiderInterface // contains filtered or unexported fields }
Context 在引擎中的数据流通载体,负责单个抓取任务的生命周期维护
func NewContext ¶
func NewContext(request *Request, Spider SpiderInterface, opts ...ContextOption) *Context
NewContext 从内存池中构建context对象
type ContextInterface ¶ added in v0.4.1
type ContextInterface interface {
IsDone() bool
}
type ContextOption ¶
type ContextOption func(c *Context)
func WithContextId ¶ added in v0.4.1
func WithContextId(ctxId string) ContextOption
WithContextId 设置自定义的ctxId
func WithItemChannelSize ¶ added in v0.4.1
func WithItemChannelSize(size int) ContextOption
type CrawlEngine ¶ added in v0.4.1
type CrawlEngine struct {
// contains filtered or unexported fields
}
CrawlEngine 引擎是整个框架数据流调度核心
func (*CrawlEngine) EventsWatcherRunner ¶ added in v0.4.1
func (e *CrawlEngine) EventsWatcherRunner() error
EventsWatcherRunner 事件监听器运行组件
func (*CrawlEngine) GetSpiders ¶ added in v0.4.1
func (e *CrawlEngine) GetSpiders() *Spiders
GetSpiders 获取所有的已经注册到引擎的spider实例
func (*CrawlEngine) RegisterDownloadMiddlewares ¶ added in v0.4.1
func (e *CrawlEngine) RegisterDownloadMiddlewares(middlewares MiddlewaresInterface)
RegisterDownloadMiddlewares 注册下载中间件到引擎
func (*CrawlEngine) RegisterPipelines ¶ added in v0.4.1
func (e *CrawlEngine) RegisterPipelines(pipeline PipelinesInterface)
RegisterPipelines 注册pipelines到引擎
func (*CrawlEngine) RegisterSpiders ¶ added in v0.4.1
func (e *CrawlEngine) RegisterSpiders(spider SpiderInterface)
RegisterSpiders 将spider实例注册到引擎的 spiders
func (*CrawlEngine) Scheduler ¶ added in v0.4.1
func (e *CrawlEngine) Scheduler() error
Scheduler 调度器
type DefaultFieldHook ¶
type DefaultFieldHook struct { }
func (*DefaultFieldHook) Levels ¶
func (hook *DefaultFieldHook) Levels() []logrus.Level
type DefualtHooks ¶ added in v0.4.1
type DefualtHooks struct { }
func NewDefualtHooks ¶ added in v0.4.1
func NewDefualtHooks() *DefualtHooks
NewDefualtHooks 构建新的默认事件监听器
func (*DefualtHooks) Error ¶ added in v0.4.1
func (d *DefualtHooks) Error(params ...interface{}) error
Error 处理ERROR事件
func (*DefualtHooks) EventsWatcher ¶ added in v0.4.1
func (d *DefualtHooks) EventsWatcher(ch chan EventType) error
EventsWatcher DefualtHooks 的事件监听器
func (*DefualtHooks) Exit ¶ added in v0.4.1
func (d *DefualtHooks) Exit(params ...interface{}) error
Exit 处理EXIT事件
func (*DefualtHooks) Heartbeat ¶ added in v0.4.1
func (d *DefualtHooks) Heartbeat(params ...interface{}) error
Heartbeat 处理HEARTBEAT事件
func (*DefualtHooks) Start ¶ added in v0.4.1
func (d *DefualtHooks) Start(params ...interface{}) error
Start 处理START事件
func (*DefualtHooks) Stop ¶ added in v0.4.1
func (d *DefualtHooks) Stop(params ...interface{}) error
Stop 处理STOP事件
type DistributeOptions ¶ added in v0.4.1
type DistributeOptions func(w *DistributedWorkerConfig)
func DistributedWithBloomN ¶ added in v0.4.1
func DistributedWithBloomN(bloomN uint) DistributeOptions
DistributedWithBloomN 布隆过滤器数据规模
func DistributedWithBloomP ¶ added in v0.4.1
func DistributedWithBloomP(bloomP float64) DistributeOptions
DistributedWithBloomP 布隆过滤器容错率
func DistributedWithConnectionsSize ¶ added in v0.4.1
func DistributedWithConnectionsSize(size int) DistributeOptions
DistributedWithConnectionsSize rdb 连接池最大连接数
func DistributedWithGetBFKey ¶ added in v0.4.1
func DistributedWithGetBFKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetBFKey 布隆过滤器的key生成函数
func DistributedWithGetLimitKey ¶ added in v0.4.1
func DistributedWithGetLimitKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetLimitKey 限速器key的生成函数
func DistributedWithGetqueueKey ¶ added in v0.4.1
func DistributedWithGetqueueKey(keyFunc GetRDBKey) DistributeOptions
DistributedWithGetqueueKey 队列key生成函数
func DistributedWithLimiterRate ¶ added in v0.4.1
func DistributedWithLimiterRate(rate int) DistributeOptions
DistributedWithLimiterRate 分布式组件下限速器的限速值
func DistributedWithRdbMaxRetry ¶ added in v0.4.1
func DistributedWithRdbMaxRetry(retry int) DistributeOptions
DistributedWithRdbMaxRetry rdb失败重试次数
func DistributedWithRdbTimeout ¶ added in v0.4.1
func DistributedWithRdbTimeout(timeout time.Duration) DistributeOptions
DistributedWithRdbTimeout rdb超时时间设置
type DistributeStatistic ¶ added in v0.4.1
type DistributeStatistic struct {
// contains filtered or unexported fields
}
DistributeStatistic 分布式统计组件
func NewDistributeStatistic ¶ added in v0.4.1
func NewDistributeStatistic(statsPrefixKey string, rdb redis.Cmdable, wg *sync.WaitGroup, opts ...DistributeStatisticOption) *DistributeStatistic
NewDistributeStatistic 分布式数据统计组件构造函数
func (*DistributeStatistic) GetDownloadFail ¶ added in v0.4.1
func (s *DistributeStatistic) GetDownloadFail() uint64
GetDownloadFail 获取下载失败数
func (*DistributeStatistic) GetErrorCount ¶ added in v0.4.1
func (s *DistributeStatistic) GetErrorCount() uint64
GetErrorCount 获取错误数
func (*DistributeStatistic) GetItemScraped ¶ added in v0.4.1
func (s *DistributeStatistic) GetItemScraped() uint64
GetItemScraped 获取items的值
func (*DistributeStatistic) GetRequestSent ¶ added in v0.4.1
func (s *DistributeStatistic) GetRequestSent() uint64
GetRequestSent 获取request 量
func (*DistributeStatistic) GetStatsField ¶ added in v0.4.1
func (s *DistributeStatistic) GetStatsField(field StatsFieldType) uint64
GetStatsField 获取指定的数据指标值
func (*DistributeStatistic) IncrDownloadFail ¶ added in v0.4.1
func (s *DistributeStatistic) IncrDownloadFail()
IncrDownloadFail 累加下载失败的数量
func (*DistributeStatistic) IncrErrorCount ¶ added in v0.4.1
func (s *DistributeStatistic) IncrErrorCount()
IncrErrorCount 累加一条错误
func (*DistributeStatistic) IncrItemScraped ¶ added in v0.4.1
func (s *DistributeStatistic) IncrItemScraped()
IncrItemScraped 累加一条item
func (*DistributeStatistic) IncrRequestSent ¶ added in v0.4.1
func (s *DistributeStatistic) IncrRequestSent()
IncrRequestSent 累加一条request
func (*DistributeStatistic) IncrStats ¶ added in v0.4.1
func (s *DistributeStatistic) IncrStats(field StatsFieldType)
IncrStats 累加指定的统计指标
func (*DistributeStatistic) OutputStats ¶ added in v0.4.1
func (s *DistributeStatistic) OutputStats() map[string]uint64
OutputStats 格式化输出所有的数据指标
func (*DistributeStatistic) Reset ¶ added in v0.4.1
func (s *DistributeStatistic) Reset() error
Reset 重置各项指标 若afterResetTTL>0则为每一项指标设置ttl否则直接删除指标
type DistributeStatisticOption ¶ added in v0.4.1
type DistributeStatisticOption func(d *DistributeStatistic)
DistributeStatisticOption 分布式组件可选参数定义
func DistributeStatisticAfterResetTTL ¶ added in v0.4.1
func DistributeStatisticAfterResetTTL(ttl time.Duration) DistributeStatisticOption
DistributeStatisticAfterResetTTL 为分布式计数器设置重置之后的ttl
type DistributedHooks ¶ added in v0.4.1
type DistributedHooks struct {
// contains filtered or unexported fields
}
DistributedHooks 分布式事件监听器
func NewDistributedHooks ¶ added in v0.4.1
func NewDistributedHooks(worker DistributedWorkerInterface) *DistributedHooks
NewDistributedHooks 构建新的分布式监听器组件对象
func (*DistributedHooks) Error ¶ added in v0.4.1
func (d *DistributedHooks) Error(params ...interface{}) error
Error 用于处理分布式模式下的ERROR事件
func (*DistributedHooks) EventsWatcher ¶ added in v0.4.1
func (d *DistributedHooks) EventsWatcher(ch chan EventType) error
EventsWatcher 分布式模式下的事件监听器
func (*DistributedHooks) Exit ¶ added in v0.4.1
func (d *DistributedHooks) Exit(params ...interface{}) error
Exit 用于处理分布式模式下的Exit事件
func (*DistributedHooks) Heartbeat ¶ added in v0.4.1
func (d *DistributedHooks) Heartbeat(params ...interface{}) error
Heartbeat 用于处理分布式模式下的HEARTBEAT事件
func (*DistributedHooks) Start ¶ added in v0.4.1
func (d *DistributedHooks) Start(params ...interface{}) error
Start 用于处理分布式模式下的START事件
func (*DistributedHooks) Stop ¶ added in v0.4.1
func (d *DistributedHooks) Stop(params ...interface{}) error
Stop 用于处理分布式模式下的STOP事件
type DistributedWorker ¶ added in v0.4.1
type DistributedWorker struct {
// contains filtered or unexported fields
}
DistributedWorker 分布式组件,包含两个组件: request请求缓存队列,由各个节点上的引擎读队列消费, redis 队列缓存的是经过gob序列化之后的二进制数据; 布隆过滤器主要是用于去重。 该组件同时实现了RFPDupeFilterInterface 和CacheInterface
func NewDistributedWorker ¶ added in v0.4.1
func NewDistributedWorker(addr string, config *DistributedWorkerConfig) *DistributedWorker
NewDistributedWorker 构建redis单机模式下的分布式工作组件
func NewWorkerWithRdbCluster ¶ added in v0.4.1
func NewWorkerWithRdbCluster(config *WorkerConfigWithRdbCluster) *DistributedWorker
NewWorkerWithRdbCluster redis cluster模式下的分布式工作组件
func (*DistributedWorker) Add ¶ added in v0.4.1
func (w *DistributedWorker) Add(fingerprint []byte) error
Add 添加指纹到布隆过滤器
func (*DistributedWorker) AddNode ¶ added in v0.4.1
func (w *DistributedWorker) AddNode() error
AddNode 新增节点
func (*DistributedWorker) CheckAllNodesStop ¶ added in v0.4.1
func (w *DistributedWorker) CheckAllNodesStop() (bool, error)
CheckAllNodesStop 检查所有的节点是否都已经停止
func (*DistributedWorker) CheckMasterLive ¶ added in v0.4.1
func (w *DistributedWorker) CheckMasterLive() (bool, error)
CheckMasterLive 检查所有master 节点是否都在线
func (*DistributedWorker) DelNode ¶ added in v0.4.1
func (w *DistributedWorker) DelNode() error
DelNode 删除当前节点
func (*DistributedWorker) DoDupeFilter ¶ added in v0.4.1
func (w *DistributedWorker) DoDupeFilter(ctx *Context) (bool, error)
DoDupeFilter request去重处理,如果指纹已经存在则返回True,否则为False 指纹不存在的情况下会将指纹添加到缓存
func (*DistributedWorker) Fingerprint ¶ added in v0.4.1
func (w *DistributedWorker) Fingerprint(ctx *Context) ([]byte, error)
Fingerprint 生成request 对象的指纹
func (*DistributedWorker) GetLimter ¶ added in v0.4.1
func (w *DistributedWorker) GetLimter() LimitInterface
GetLimter 获取限速器
func (*DistributedWorker) Heartbeat ¶ added in v0.4.1
func (w *DistributedWorker) Heartbeat() error
Heartbeat 心跳包
func (*DistributedWorker) SetMaster ¶ added in v0.4.1
func (w *DistributedWorker) SetMaster(flag bool)
SetMaster 设置当前的节点是否为master
func (*DistributedWorker) SetSpiders ¶ added in v0.4.1
func (w *DistributedWorker) SetSpiders(spiders *Spiders)
SetSpiders 设置所有的已注册的spider
func (*DistributedWorker) StopNode ¶ added in v0.4.1
func (w *DistributedWorker) StopNode() error
StopNode 停止当前节点的活动
type DistributedWorkerConfig ¶ added in v0.4.1
type DistributedWorkerConfig struct { // RedisAddr redis 地址 RedisAddr string // RedisPasswd redis 密码 RedisPasswd string // RedisUsername redis 用户名 RedisUsername string // RedisDB redis 数据库索引 index RedisDB uint32 // RdbConnectionsSize 连接池大小 RdbConnectionsSize uint64 // RdbTimeout redis 超时时间 RdbTimeout time.Duration // RdbMaxRetry redis操作失败后的重试次数 RdbMaxRetry int // BloomP 布隆过滤器的容错率 BloomP float64 // BloomN 数据规模,比如1024 * 1024 BloomN uint // 并发量 LimiterRate int // GetqueueKey 生成队列key的函数,允许用户自定义 GetqueueKey GetRDBKey // GetBFKey 布隆过滤器对应的生成key的函数,允许用户自定义 GetBFKey GetRDBKey // contains filtered or unexported fields }
DistributedWorkerConfig 分布式组件的配置参数
func NewDistributedWorkerConfig ¶ added in v0.4.1
func NewDistributedWorkerConfig(username string, passwd string, db uint32, opts ...DistributeOptions) *DistributedWorkerConfig
NewDistributedWorkerConfig 新建分布式组件的配置
type DistributedWorkerInterface ¶ added in v0.4.1
type DistributedWorkerInterface interface { CacheInterface RFPDupeFilterInterface // AddNode 新增一个节点 AddNode() error // DelNode 删除当前的节点 DelNode() error // StopNode 停止当前的节点 StopNode() error // Heartbeat 心跳 Heartbeat() error // CheckAllNodesStop 检查所有的节点是否都已经停止 CheckAllNodesStop() (bool, error) // GetLimter 获取限速器 GetLimter() LimitInterface // CheckMasterLive 检测主节点是否还在线 CheckMasterLive() (bool, error) // SetMaster 是否将当前的节点设置为主节点 SetMaster(flag bool) // SetSpiders 设置已经注册的所有的spider SetSpiders(spiders *Spiders) }
DistributedWorkerInterface 分布式组件接口
type Downloader ¶
type Downloader interface { // Download 下载函数 Download(ctx *Context) (*Response, error) // CheckStatus 检查响应状态码的合法性 CheckStatus(statusCode uint64, allowStatus []uint64) bool }
Downloader 下载器接口
type DownloaderOption ¶
type DownloaderOption func(d *SpiderDownloader)
DownloaderOption 下载器可选参数函数
func DownloadWithClient ¶
func DownloadWithClient(client http.Client) DownloaderOption
DownloadWithClient 设置下载器的http.Client客户端
func DownloadWithH2 ¶ added in v0.4.1
func DownloadWithH2(h2 bool) DownloaderOption
DownloadWithH2 下载器是否开启http2
func DownloadWithTimeout ¶
func DownloadWithTimeout(timeout time.Duration) DownloaderOption
DownloadWithTimeout 设置下载器的网络请求超时时间
func DownloadWithTlsConfig ¶
func DownloadWithTlsConfig(tls *tls.Config) DownloaderOption
DownloadWithTlsConfig 设置下载器的tls
func DownloaderWithtransport ¶
func DownloaderWithtransport(transport *http.Transport) DownloaderOption
DownloaderWithtransport 为下载器设置 http.Transport
type EngineOption ¶
type EngineOption func(r *CrawlEngine)
EngineOption 引擎构造过程中的可选参数
func EngineWithCache ¶ added in v0.4.1
func EngineWithCache(cache CacheInterface) EngineOption
EngineWithCache 引擎使用的缓存组件
func EngineWithDistributedWorker ¶ added in v0.4.1
func EngineWithDistributedWorker(woker DistributedWorkerInterface) EngineOption
EngineWithDistributedWorker 引擎使用的分布式组件
func EngineWithDownloader ¶
func EngineWithDownloader(downloader Downloader) EngineOption
EngineWithDownloader 引擎使用的下载器组件
func EngineWithFilter ¶ added in v0.4.1
func EngineWithFilter(filter RFPDupeFilterInterface) EngineOption
EngineWithFilter 引擎使用的过滤去重组件
func EngineWithLimiter ¶ added in v0.4.1
func EngineWithLimiter(limiter LimitInterface) EngineOption
EngineWithLimiter 引擎使用的限速器
func EngineWithUniqueReq ¶
func EngineWithUniqueReq(uniqueReq bool) EngineOption
EngineWithUniqueReq 是否进行去重处理
type ErrorOption ¶
type ErrorOption func(e *HandleError)
ErrorOption HandleError 可选参数
func ErrorWithExtras ¶ added in v0.4.1
func ErrorWithExtras(extras map[string]interface{}) ErrorOption
ErrorWithExtras HandleError 添加额外的数据
type EventHooksInterface ¶ added in v0.4.1
type EventHooksInterface interface { // Start 处理引擎启动事件 Start(params ...interface{}) error // Stop 处理引擎停止事件 Stop(params ...interface{}) error // Error处理错误事件 Error(params ...interface{}) error // Exit 退出引擎事件 Exit(params ...interface{}) error // Heartbeat 心跳检查事件 Heartbeat(params ...interface{}) error // EventsWatcher 事件监听器 EventsWatcher(ch chan EventType) error }
EventHooksInterface 事件处理函数接口
type EventsWatcher ¶ added in v0.4.1
EventsWatcher 事件监听器
type HandleError ¶
type HandleError struct { // CtxId 上下文id CtxId string // Err 处理过程的错误 Err error // Extras 携带的额外信息 Extras map[string]interface{} }
HandleError 错误处理接口
func NewError ¶
func NewError(ctx *Context, err error, opts ...ErrorOption) *HandleError
NewError 构建新的HandleError实例
type ItemMeta ¶
type ItemMeta struct { // CtxId 对应的context id CtxId string // Item item对象 Item ItemInterface }
ItemMeta item元数据结构
type ItemPipelines ¶
type ItemPipelines []PipelinesInterface
func (ItemPipelines) Len ¶
func (p ItemPipelines) Len() int
func (ItemPipelines) Less ¶
func (p ItemPipelines) Less(i, j int) bool
func (ItemPipelines) Swap ¶
func (p ItemPipelines) Swap(i, j int)
type LimitInterface ¶ added in v0.4.1
type LimitInterface interface {
// contains filtered or unexported methods
}
LimitInterface 限速器接口
type Middlewares ¶
type Middlewares []MiddlewaresInterface
Middlewares 下载中间件队列
func (Middlewares) Less ¶
func (p Middlewares) Less(i, j int) bool
func (Middlewares) Swap ¶
func (p Middlewares) Swap(i, j int)
type MiddlewaresBase ¶
type MiddlewaresBase struct {
Priority int
}
type MiddlewaresInterface ¶
type MiddlewaresInterface interface { // GetPriority 获取优先级,数字越小优先级越高 GetPriority() int // ProcessRequest 处理request请求对象 // 此处用于增加请求头 // 按优先级执行 ProcessRequest(ctx *Context) error // ProcessResponse 用于处理请求成功之后的response // 执行顺序逆优先级,即优先级越高执行顺序越晚 ProcessResponse(ctx *Context, req chan<- *Context) error // GetName 获取中间件的名称 GetName() string }
MiddlewaresInterface 下载中间件的接口用于处理进入下载器之前的request对象 和下载之后的response
type Parser ¶
Parser 响应解析函数结构
func GetParserByName ¶ added in v0.4.1
func GetParserByName(spider SpiderInterface, name string) Parser
GetParserByName 通过函数名从spider实例中获取解析函数
type PipelinesBase ¶
type PipelinesBase struct {
Priority int
}
type PipelinesInterface ¶
type PipelinesInterface interface { // GetPriority 获取当前pipeline的优先级 GetPriority() int // ProcessItem item处理单元 ProcessItem(spider SpiderInterface, item *ItemMeta) error }
PipelinesInterface pipeline 接口 pipeline 主要用于处理item,例如数据存储、数据清洗 将多个pipeline注册到引擎可以实现责任链模式的数据处理
type ProcessResponse ¶
ProcessResponse 处理下载之后的response函数
type RFPDupeFilter ¶
type RFPDupeFilter struct {
// contains filtered or unexported fields
}
RFPDupeFilter 去重组件
func NewRFPDupeFilter ¶
func NewRFPDupeFilter(bloomP float64, bloomN uint) *RFPDupeFilter
NewRFPDupeFilter 新建去重组件 bloomP容错率 bloomN数据规模
func (*RFPDupeFilter) DoDupeFilter ¶
func (f *RFPDupeFilter) DoDupeFilter(ctx *Context) (bool, error)
DoDupeFilter 通过布隆过滤器对request对象进行去重处理
func (*RFPDupeFilter) Fingerprint ¶
func (f *RFPDupeFilter) Fingerprint(ctx *Context) ([]byte, error)
Fingerprint 计算指纹
type RFPDupeFilterInterface ¶
type RFPDupeFilterInterface interface { // Fingerprint request指纹计算 Fingerprint(ctx *Context) ([]byte, error) // DoDupeFilter request去重 DoDupeFilter(ctx *Context) (bool, error) }
RFPDupeFilterInterface request 对象指纹计算和布隆过滤器去重
type Request ¶
type Request struct { // Url 请求Url Url string `json:"url"` // Header 请求头 Header map[string]string `json:"header"` // Method 请求方式 Method RequestMethod `json:"method"` // Body 请求body Body []byte `json:"body"` // Params 请求url的参数 Params map[string]string `json:"params"` // Proxy 代理实例 Proxy *Proxy `json:"-"` // Cookies 请求携带的cookies Cookies map[string]string `json:"cookies"` // Meta 请求携带的额外的信息 Meta map[string]interface{} `json:"meta"` // AllowRedirects 是否允许跳转默认允许 AllowRedirects bool `json:"allowRedirects"` // MaxRedirects 最大的跳转次数 MaxRedirects int `json:"maxRedirects"` // Parser 该请求绑定的响应解析函数,必须是一个spider实例 Parser Parser `json:"-"` // MaxConnsPerHost 单个域名最大的连接数 MaxConnsPerHost int `json:"maxConnsPerHost"` // BodyReader 用于读取body BodyReader io.Reader `json:"-"` // ResponseWriter 响应读取到本地的接口 ResponseWriter io.Writer `json:"-"` // AllowStatusCode 允许的状态码 AllowStatusCode []uint64 `json:"allowStatusCode"` // Timeout 请求超时时间 Timeout time.Duration `json:"timeout"` }
Request 请求对象的结构
func NewRequest ¶
func NewRequest(url string, method RequestMethod, parser Parser, opts ...RequestOption) *Request
NewRequest 从Request对象内存池创建新的Request对象
func RequestFromMap ¶ added in v0.4.1
func RequestFromMap(src map[string]interface{}, opts ...RequestOption) *Request
RequestFromMap 从map创建requests
type RequestMethod ¶ added in v0.4.1
type RequestMethod string
RequestMethod 请求方式
const ( GET RequestMethod = "GET" POST RequestMethod = "POST" PUT RequestMethod = "PUT" DELETE RequestMethod = "DELETE" OPTIONS RequestMethod = "OPTIONS" HEAD RequestMethod = "HEAD" )
type RequestOption ¶ added in v0.4.1
type RequestOption func(r *Request)
RequestOption NewRequest 可选参数
func RequestWithAllowRedirects ¶
func RequestWithAllowRedirects(allowRedirects bool) RequestOption
RequestWithAllowRedirects 设置是否允许跳转 如果不允许则MaxRedirects=0
func RequestWithAllowedStatusCode ¶ added in v0.4.1
func RequestWithAllowedStatusCode(allowStatusCode []uint64) RequestOption
RequestWithAllowedStatusCode 设置AllowStatusCode
func RequestWithMaxConnsPerHost ¶
func RequestWithMaxConnsPerHost(maxConnsPerHost int) RequestOption
RequestWithMaxConnsPerHost 设置MaxConnsPerHost
func RequestWithMaxRedirects ¶
func RequestWithMaxRedirects(maxRedirects int) RequestOption
RequestWithMaxRedirects 设置最大的跳转次数 若maxRedirects <= 0则认为不允许跳转AllowRedirects = false
func RequestWithParser ¶ added in v0.4.1
func RequestWithParser(parser Parser) RequestOption
RequestWithParser 设置Parser
func RequestWithRequestBody ¶
func RequestWithRequestBody(body map[string]interface{}) RequestOption
RequestWithRequestBody 传入请求体到request
func RequestWithRequestBytesBody ¶ added in v0.4.1
func RequestWithRequestBytesBody(body []byte) RequestOption
RequestWithRequestBytesBody request绑定bytes body
func RequestWithRequestCookies ¶
func RequestWithRequestCookies(cookies map[string]string) RequestOption
RequestWithRequestCookies 设置cookie
func RequestWithRequestHeader ¶
func RequestWithRequestHeader(header map[string]string) RequestOption
RequestWithRequestHeader 设置请求头
func RequestWithRequestMeta ¶
func RequestWithRequestMeta(meta map[string]interface{}) RequestOption
RequestWithRequestMeta 设置 meta
func RequestWithRequestParams ¶
func RequestWithRequestParams(params map[string]string) RequestOption
RequestWithRequestParams 设置请求的url参数
func RequestWithRequestProxy ¶
func RequestWithRequestProxy(proxy Proxy) RequestOption
RequestWithRequestProxy 设置代理
func RequestWithResponseWriter ¶
func RequestWithResponseWriter(write io.Writer) RequestOption
RequestWithResponseWriter 设置ResponseWriter
func RequestWithTimeout ¶ added in v0.4.1
func RequestWithTimeout(timeout time.Duration) RequestOption
RequestWithTimeout 设置请求超时时间 若timeout<=0则认为没有超时时间
type Response ¶
type Response struct { // Status状态码 Status int // Header 响应头 Header map[string][]string // Header response header // Delay 请求延迟 Delay float64 // Delay the time of handle download request // ContentLength 响应体大小 ContentLength uint64 // ContentLength response content length // URL 请求url URL string // URL of request url // Buffer 响应体缓存 Buffer *bytes.Buffer // buffer read response buffer }
Response 请求响应体的结构
type SpiderDownloader ¶
type SpiderDownloader struct { // ProxyFunc 对单个请求进行代理设置 ProxyFunc func(req *http.Request) (*url.URL, error) // contains filtered or unexported fields }
SpiderDownloader tegenaria 爬虫下载器
func (*SpiderDownloader) CheckStatus ¶
func (d *SpiderDownloader) CheckStatus(statusCode uint64, allowStatus []uint64) bool
CheckStatus 检查状态码是否合法
type SpiderInterface ¶
type SpiderInterface interface { // StartRequest 通过GetFeedUrls()获取种子 // urls并构建初始请求 StartRequest(req chan<- *Context) // Parser 默认的请求响应解析函数 // 在解析过程中生成的新的请求可以推送到req channel Parser(resp *Context, req chan<- *Context) error // ErrorHandler 错误处理函数,允许在此过程中生成新的请求 // 并推送到req channel ErrorHandler(err *Context, req chan<- *Context) // GetName 获取spider名称 GetName() string // GetFeedUrls 获取种子urls GetFeedUrls() []string }
type Spiders ¶
type Spiders struct { // SpidersModules spider名称和spider实例的映射 SpidersModules map[string]SpiderInterface // Parsers parser函数名和函数的映射 // 用于序列化和反序列化 Parsers map[string]Parser }
Spiders 全局spiders管理器 用于接收注册的SpiderInterface实例
var SpidersList *Spiders
func (*Spiders) GetSpider ¶
func (s *Spiders) GetSpider(name string) (SpiderInterface, error)
GetSpider 通过爬虫名获取spider实例
func (*Spiders) Register ¶
func (s *Spiders) Register(spider SpiderInterface) error
Register spider实例注册到Spiders.SpidersModules
type Statistic ¶ added in v0.4.1
type Statistic struct { // ItemScraped items总数 ItemScraped uint64 `json:"items"` // RequestSent 请求总数 RequestSent uint64 `json:"requets"` // DownloadFail 下载失败总数 DownloadFail uint64 `json:"download_fail"` // ErrorCount 错误总数 ErrorCount uint64 `json:"errors"` // contains filtered or unexported fields }
Statistic 数据统计指标
func (*Statistic) GetDownloadFail ¶ added in v0.4.1
GetDownloadFail 获取下载失败的总数
func (*Statistic) GetErrorCount ¶ added in v0.4.1
GetErrorCount 获取捕获到错误总数
func (*Statistic) GetItemScraped ¶ added in v0.4.1
GetItemScraped 获取拿到的item总数
func (*Statistic) GetRequestSent ¶ added in v0.4.1
GetRequestSent 获取发送的请求总数
func (*Statistic) IncrDownloadFail ¶ added in v0.4.1
func (s *Statistic) IncrDownloadFail()
IncrDownloadFail 累加一条下载失败数据
func (*Statistic) IncrErrorCount ¶ added in v0.4.1
func (s *Statistic) IncrErrorCount()
IncrErrorCount 累加一条错误数据
func (*Statistic) IncrItemScraped ¶ added in v0.4.1
func (s *Statistic) IncrItemScraped()
IncrItemScraped 累加一条item
func (*Statistic) IncrRequestSent ¶ added in v0.4.1
func (s *Statistic) IncrRequestSent()
IncrRequestSent 累加一条request
func (*Statistic) OutputStats ¶ added in v0.4.1
OutputStats 格式化统计数据
type StatisticInterface ¶ added in v0.4.1
type StatisticInterface interface { // IncrItemScraped 累加一条item IncrItemScraped() // IncrRequestSent 累加一条request IncrRequestSent() // IncrDownloadFail 累加一条下载失败数据 IncrDownloadFail() // IncrErrorCount 累加一条错误数据 IncrErrorCount() // GetItemScraped 获取拿到的item总数 GetItemScraped() uint64 // GetRequestSent 获取发送request的总数 GetRequestSent() uint64 // GetErrorCount 获取处理过程中生成的error总数 GetErrorCount() uint64 // GetDownloadFail 获取下载失败的总数 GetDownloadFail() uint64 // OutputStats 格式化统计指标 OutputStats() map[string]uint64 // Reset 重置指标统计组件,所有指标归零 Reset() error // contains filtered or unexported methods }
StatisticInterface 数据统计组件接口
type StatsFieldType ¶ added in v0.4.1
type StatsFieldType string
StatsFieldType 统计指标的数据类型
const ( // RequestStats 发起的请求总数 RequestStats StatsFieldType = "requests" // ItemsStats 获取到的items总数 ItemsStats StatsFieldType = "items" // DownloadFailStats 请求失败总数 DownloadFailStats StatsFieldType = "download_fail" // ErrorStats 错误总数 ErrorStats StatsFieldType = "errors" )
type WorkerConfigWithRdbCluster ¶ added in v0.4.1
type WorkerConfigWithRdbCluster struct { *DistributedWorkerConfig RdbNodes }
WorkerConfigWithRdbCluster redis cluster 模式下的分布式组件配置参数
func NewWorkerConfigWithRdbCluster ¶ added in v0.4.1
func NewWorkerConfigWithRdbCluster(config *DistributedWorkerConfig, nodes RdbNodes) *WorkerConfigWithRdbCluster
NewWorkerConfigWithRdbCluster 构建 redis cluster 模式下的分布式组件配置参数