jewel_crawler

package module
v0.7.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2021 License: MIT Imports: 11 Imported by: 0

README

jewl-crawler 简介

jewl-crawler 是一个分布式爬虫框架

jewel-crawler 可以用来网站数据采集,支持通过接口、静态页面、动态页面、以及图片等资源抓取任务,通过Redis进行任务的流转,支持多层次抓取需求,比如支持新闻类网站以列表和详情页两层数据流转采集。

架构集成了GraphQuery方便对数据进行解析,go-readability适用于大部分的文章采集。

安装与使用

go get -u github.com/SunMaybo/jewel-crawler

栗子

项目启动

通过全局SetLogLevel方法设置日志级别

使用go-redis启动Redis,Concurrent为支持最大并发数量,防止因为goroutine开启过大造成内存开销巨大。

Queue为使用的队列名字,实践中我们通过依据:图片、视频、文档、静态页面、动态页面、接口等对任务进行划分,依据是请求耗时和数据大小。

	jewel_crawler.SetLogLevel("info")
	engine := jewel_crawler.New(&jewel_crawler.Config{
		Queue:      config.Queue,
		Concurrent: config.Concurrent,
		Redis:      &redis.Options{
			Addr:         config.Redis.Addr,
			MinIdleConns: config.Redis.Idle,
			PoolSize:     config.Redis.Active,
			DB:           config.Redis.DB,
			Password:     config.Redis.Password,
			Username:     config.Redis.Name,
		},
	})

注册抓取模版

engine.Pipeline.AddCrawler("default_html_crawler", &crawler.DefaultHtmlCrawler{})

实现抓取模版

抓取模版需要实现接口方法并注册

type Crawler interface {
	Collect(event CollectEvent) (string, error)
	Parser(event ParserEvent) (map[string]interface{}, error)
	Storage(event StorageEvent) error
}

文章抓取的模版

其中event中封装了抓取组建,并且可以制定抓取数据最大大小,防止数据过大撑爆内存

type DefaultHtmlCrawler struct {
}

func (dhc *DefaultHtmlCrawler) Collect(event CollectEvent) (string, error) {
	s := spider.NewShtmlSpider(1 * 1024 * 1024)
	resp, err := s.Do(spider.Request{
		Url:     event.Task.CrawlerUrl,
		Method:  event.Task.Method,
		Param:   event.Task.Param,
		Headers: event.Task.Header,
		Timeout: event.Task.Timeout,
	})
	if err != nil {
		return "", err
	}
	return resp.GetContent(), nil
}
func (dhc *DefaultHtmlCrawler) Parser(event ParserEvent) (map[string]interface{}, error) {
	data, err := event.ReadabilityParser(event.Content, event.Task.CrawlerUrl)
	if err != nil {
		return nil, err
	}
	return data, nil
}
func (dhc *DefaultHtmlCrawler) Storage(event StorageEvent) error {
	logs.S.Info(event.Data)
	return nil
}

抓取组件

接口采集
apiSpider := event.ApiSpider(1024 * 1024)
	resp, err := apiSpider.Do(spider.Request{
		Url:     event.Task.CrawlerUrl,
		Method:  event.Task.Method,
		Param:   event.Task.Param,
		Headers: event.Task.Header,
		Timeout: event.Task.Timeout,
		ProxyCallBack: func() string {
			return "http://127.0.0.1:7890"
		},
	})
静态页面采集

静态页面采集返回的数据进行自动转码UTF-8,解决不同网站编码不同带来乱码问题,以及返回RedirectUrl最大重定向深度为10

shtmlSpider := event.ShtmlSpider(1024 * 1024)
	resp, err := shtmlSpider.Do(spider.Request{
		Url:    event.Task.CrawlerUrl,
		Method: "GET",
		ProxyCallBack: func() string {
			return proxy.GetRandomProxy()
		},
	})
    fmt.Println(resp.GetContent())
	fmt.Println(resp.GetCharset())
	fmt.Println(resp.RedirectUrl)
动态页面采集

动态采集使用chromedp底层依赖于Chrome浏览器需要在启动Chrome浏览器的环境中进行启用。

dhtmlSpider := event.DhtmlSpider(1024 * 1024)
	resp, err := dhtmlSpider.Do(spider.Request{
		Url:    event.Task.CrawlerUrl,
		Method: "GET",
		ProxyCallBack: func() string {
			return proxy.GetRandomProxy()
		},
	})
文件采集

目前仅支持图片采集自动解析图片格式

sp := spider.NewFileSpider(50 * 1024 * 1024)
	resp, err := sp.Do(spider.Request{
		Url:     event.Task.CrawlerUrl,
		Method:  "GET",
		Timeout: 60 * time.Second,
		ProxyCallBack: func() string {
			return proxy.GetRandomProxy()
		},
	})
	
fmt.Println(resp.GetCharset()) //图片格式

任务下发

其中TinyExtras 用户任务小数据的转移,较大数据临时存储通过temp_storage_id 进行关联,数据存储在redis中

childImage := task.ChildTask{
			CrawlerName: ImgCrawler,
			CrawlerUrl:  img,
			ContentType: "json",
			TempStorageId: "",
			Method:      "GET",
			Index:       i,
			TinyExtras: map[string]interface{}{
				"path": p.ImagePath(img),
			},
		}
event.Task.Next(context.Background(), event.Queue, childImage)
较大数据临时存储
event.TempStorage.Set(context.Background(),"id",temp.Temp{},1*time.Second)
event.TempStorage.Get(context.Background(),"id")
event.TempStorage.Clear(context.Background(),"id")

使用GraphQuery进行解析

patternTotal := "{\n  total `css(\".cta-link\");regex(\"[0-9]+\")`\n}"
totalSecondInter, _ := event.Parser(respTotal.GetContent(), patternTotal)

启动任务

config.CrawlerEngine.Start(context.Background(), config.MaxRetry)

其它

分布式锁支持
阻塞锁
lock := s.Config.CrawlerEngine.NewMutex()
lock.Name = "lock"   \\锁名字
lock.Timeout = 1*time.Second  \\超时时间设置
lock.Lock()
defer lock.UnLock()
非阻塞锁
lock := s.Config.CrawlerEngine.NewMutex()
lock.Name = "lock"   \\锁名字
lock.Timeout = 1*time.Second  \\超时时间设置
lock.NLock()
defer lock.UnLock()
全局偏移量存储
lock := s.Config.CrawlerEngine.NewMutex()
lock.Name = "lock"   \\锁名字
lock.SetOffset("1")
offset,err:=lock.GetOffset()

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogLevel

func SetLogLevel(level string)

Types

type Config

type Config struct {
	Redis         *redis.Options
	Queue         string
	ConsumerQueue string
	Concurrent    int
}

type CrawlerEngine

type CrawlerEngine struct {
	Pipeline *crawler.PipeLine

	Concurrent int

	RateLimiter *simpleratelimit.RateLimiter
	CallBack    func(task task.Task, err error)
	// contains filtered or unexported fields
}

func New

func New(cfg *Config) *CrawlerEngine

func (*CrawlerEngine) Close

func (p *CrawlerEngine) Close() error

func (*CrawlerEngine) NewMutex added in v0.1.6

func (p *CrawlerEngine) NewMutex() *sync.Mutex

func (*CrawlerEngine) Push

func (p *CrawlerEngine) Push(ctx context.Context, queue string, task task.Task) error

func (*CrawlerEngine) Start

func (p *CrawlerEngine) Start(ctx context.Context, maxExecuteCount int)

开启

func (*CrawlerEngine) StartThread added in v0.7.5

func (p *CrawlerEngine) StartThread(ctx context.Context, maxExecuteCount int)

开启

Directories

Path Synopsis
uuid
Package uuid provides implementation of Universally Unique Identifier (UUID).
Package uuid provides implementation of Universally Unique Identifier (UUID).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL