package module
v0.0.0-...-8a66ba0

Published: Oct 29, 2016 License: MIT Imports: 21 Imported by: 0




An easy-to-use, distributed, RESTful crawler framework


go get -u github.com/buptmiao/eago


  • Eago works like scrapy, but it is more lightweight and effective.

  • Eago provides a RESTful API through which users can monitor eago's statistics, add new crawler jobs, control the crawler, and so on.

  • Eago can be deployed as a cluster. An eago cluster consists of one master and multiple slavers, and the master node is auto-discovered.


You can run eago like this without any crawlers:

package main

import (
	"github.com/buptmiao/eago"
)

func main() {
	node := eago.GetNodeInstance()
	cluster := eago.GetClusterInstance()

	// Discover blocks until a master node is found,
	// or until this node becomes the master itself.
	cluster.Discover()

	// Start the HTTP server.
	eago.NewHttpServer(node).Serve()
}

Run the example:

go run demo.go -c /yourpath/config.toml 

Monitor eago's status through the REST API:

curl -XGET localhost:12002?pretty

{
    "ClusterName": "eagles",
    "Running": false,
    "Begin At": "",
    "Elapse": "",
    "NodeNumber": 1,
    "Master": {
        "NodeName": "miao_shard1",
        "IP": "",
        "Port": 12001
    },
    "slavers": null,
    "CrawlerStatistics": {},
    "Message": "You know, for data"
}

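A client can decode this reply with encoding/json. The following is a minimal sketch, not part of eago: the status and nodeInfo types are local stand-ins that mirror a subset of the JSON keys in the sample reply, and parseStatus is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeInfo and status are local stand-ins that mirror some of the JSON keys
// in the sample reply; they are not the package's own types.
type nodeInfo struct {
	NodeName string
	IP       string
	Port     uint16
}

type status struct {
	ClusterName string    `json:"ClusterName"`
	Running     bool      `json:"Running"`
	NodeNumber  int       `json:"NodeNumber"`
	Master      *nodeInfo `json:"Master"`
	Message     string    `json:"Message"`
}

// parseStatus decodes the body of a status reply.
func parseStatus(body []byte) (*status, error) {
	var s status
	if err := json.Unmarshal(body, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	body := []byte(`{"ClusterName":"eagles","Running":false,"NodeNumber":1,` +
		`"Master":{"NodeName":"miao_shard1","IP":"","Port":12001},` +
		`"Message":"You know, for data"}`)
	s, err := parseStatus(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.ClusterName, s.Master.NodeName, s.Master.Port)
}
```

Keys that the stand-in struct does not declare, such as "Begin At", are simply ignored by the decoder.
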
Write a crawler

See the demo, which crawls BBS web pages. The crawler implements multiple Parsers that analyze web pages and extract key information.
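To give a feel for what a Parser looks like, here is a self-contained sketch. It does not import eago: UrlRequest, UrlResponse and Parser are local stand-ins that copy only the documented shapes, and linkParser, the regular expression and the parser name "page" are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// Local stand-ins that mirror the documented types (only the fields used here).
type UrlRequest struct {
	Url    string
	Parser string
}

type UrlResponse struct {
	Src  *UrlRequest
	Body string
}

// Parser has the same signature as the Parser type in the API index.
type Parser func(resp *UrlResponse) (urls []*UrlRequest)

var hrefRe = regexp.MustCompile(`href="([^"]+)"`)

// linkParser returns one follow-up request per href attribute found in the
// body, each routed to a hypothetical parser named "page".
func linkParser(resp *UrlResponse) (urls []*UrlRequest) {
	for _, m := range hrefRe.FindAllStringSubmatch(resp.Body, -1) {
		urls = append(urls, &UrlRequest{Url: m[1], Parser: "page"})
	}
	return urls
}

func main() {
	var p Parser = linkParser
	resp := &UrlResponse{
		Src:  &UrlRequest{Url: "http://example.com"},
		Body: `<a href="/a">A</a> <a href="/b">B</a>`,
	}
	for _, r := range p(resp) {
		fmt.Println(r.Url, r.Parser)
	}
}
```

With the real package, a function of this shape would presumably be attached to a crawler through AddParser.
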

More updates will come




const (
	KeeperPeriod        = time.Second * 5
	HeartBeatInterval   = time.Millisecond
	MonitorMasterPeriod = time.Second * 12
)

const (
	Pretty        = "pretty"
	StartSuccess  = "Start Successfully"
	StopSuccess   = "Stop Successfully"
	AuthFailed    = "You do not have the authorization!"
	AddSuccess    = "Add Successfully"
	GetStatFailed = "Fail to get Statistic Info"
	Success       = "Success!"
)

const (
	KeyForStore = "url:%s"
	Expiration  = time.Second * 3600 * 24 * 7
)

const (
	STOP    = "stop"
	RUNNING = "running"
)

const (
	KeyForCrawledUrls = "crawledurls"
)

const (
	Message = "You know, for data"
)


var (
	Debug *log.Logger
	Log   *log.Logger
	Error *log.Logger
)

var (
	ErrNotClusterMember = errors.New("Not the cluster member")
	ErrNotMaster        = errors.New("I am not the master, thank you!")
	ErrNoneMaster       = errors.New("Master is not found")
	ErrDistributeUrl    = errors.New("Distribute the url to the wrong node")
)

var Configs = new(config)

var OneCluster sync.Once

var OneNode sync.Once

Every node has a DefaultNode, which is a singleton.

var Stat = NewStatistic()


func AddUrlToCrawl

func AddUrlToCrawl(c *gin.Context)

func ArbitrateConfigs

func ArbitrateConfigs(c *config)

func Authorize

func Authorize(c *gin.Context) bool

Authorize verifies the operator's authorization.

func DumpHttp

func DumpHttp(req *UrlRequest)

DumpHttp is used to test HTTP requests.

func GBKtoUTF

func GBKtoUTF(input string) (string, error)

func GetProfile

func GetProfile(c *gin.Context)

func Help

func Help(c *gin.Context)

func KeyForCrawlByDay

func KeyForCrawlByDay() string

func KeyForUrlStore

func KeyForUrlStore(url string) string

func KillMaster

func KillMaster(c *gin.Context)

func LoadConfig

func LoadConfig()

func NewCluster

func NewCluster()

func NewNode

func NewNode()

func Response

func Response(c *gin.Context, v interface{})

Response replies with v as JSON, checking whether the URL's query string contains the pretty parameter.
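One plausible reading of this behavior is: indent the JSON when the query string carries the pretty key, and send it compact otherwise. The sketch below illustrates that reading with the standard library's net/http rather than gin, so it is an illustration and not the package's implementation; respond and render are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// respond writes v as JSON. If the query string contains the "pretty" key
// (with or without a value), the output is indented.
func respond(w http.ResponseWriter, r *http.Request, v interface{}) {
	var (
		out []byte
		err error
	)
	if _, pretty := r.URL.Query()["pretty"]; pretty {
		out, err = json.MarshalIndent(v, "", "    ")
	} else {
		out, err = json.Marshal(v)
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(out)
}

// render runs respond against an in-memory request and returns the body.
func render(target string, v interface{}) string {
	rec := httptest.NewRecorder()
	respond(rec, httptest.NewRequest(http.MethodGet, target, nil), v)
	return rec.Body.String()
}

func main() {
	v := map[string]string{"Message": "You know, for data"}
	fmt.Println(render("/", v))
	fmt.Println(render("/?pretty", v))
}
```
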

func RestartCrawler

func RestartCrawler(c *gin.Context)

func StartCrawler

func StartCrawler(c *gin.Context)

func StopCrawler

func StopCrawler(c *gin.Context)


type Cluster

type Cluster struct {
	ClusterName string
	Local       *Node
	Master      *NodeInfo
	// Nodes describes the slavers' status: if true, the slaver is active;
	// otherwise, the slaver is down.
	Nodes map[*NodeInfo]bool
	// contains filtered or unexported fields
}

var DefaultCluster *Cluster

func GetClusterInstance

func GetClusterInstance() *Cluster

func (*Cluster) AddNode

func (c *Cluster) AddNode(node *NodeInfo)

func (*Cluster) BecomeMaster

func (c *Cluster) BecomeMaster()

BecomeMaster makes the current node the master and starts the tasks that belong to the master.

func (*Cluster) BecomeSlaver

func (c *Cluster) BecomeSlaver()

Functions below are for slavers.

func (*Cluster) Discover

func (c *Cluster) Discover()

Discover scans the node list and calls the Join RPC method on each node. If Join returns an error, the remote node is not the master; otherwise, the master is set to that node. If no node in the list is the master, the local node makes itself the master.
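The discovery rule can be sketched as a small function. This is an illustration under stated assumptions, not eago's code: NodeInfo is a reduced local stand-in, the join callback stands in for the Join RPC, and skipping the local node while scanning is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// NodeInfo is a reduced local stand-in for the documented type.
type NodeInfo struct {
	NodeName string
}

var errNotMaster = errors.New("not the master")

// joinFunc stands in for the Join RPC: nil means the remote node is the master.
type joinFunc func(local, remote *NodeInfo) error

// discover tries join on every candidate in order. The first candidate that
// accepts the join is returned as the master; if none does, local is returned,
// meaning the local node should become the master itself.
func discover(local *NodeInfo, nodeList []*NodeInfo, join joinFunc) *NodeInfo {
	for _, remote := range nodeList {
		if remote.NodeName == local.NodeName {
			continue // assumption: a node does not try to join itself
		}
		if err := join(local, remote); err == nil {
			return remote
		}
	}
	return local
}

func main() {
	local := &NodeInfo{NodeName: "node1"}
	nodes := []*NodeInfo{local, {NodeName: "node2"}, {NodeName: "node3"}}
	// Hypothetical cluster in which node3 is currently the master.
	join := func(_, remote *NodeInfo) error {
		if remote.NodeName == "node3" {
			return nil
		}
		return errNotMaster
	}
	fmt.Println(discover(local, nodes, join).NodeName)     // node3
	fmt.Println(discover(local, nodes[:2], join).NodeName) // node1
}
```
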

func (*Cluster) GetNode

func (c *Cluster) GetNode(url string) string

GetNode returns a node name from the node list by hashing the url.
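A common way to map a URL to a node is to hash it and take the result modulo the number of nodes. The sketch below assumes FNV-1a and a plain modulo; the hash that eago actually uses is not stated here, and pickNode and the node names other than miao_shard1 are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickNode maps url to one of nodes by hashing the url.
// The same url always maps to the same node while the list is unchanged.
func pickNode(nodes []string, url string) string {
	if len(nodes) == 0 {
		return ""
	}
	h := fnv.New32a()
	h.Write([]byte(url)) // Write on a hash.Hash never returns an error
	return nodes[h.Sum32()%uint32(len(nodes))]
}

func main() {
	nodes := []string{"miao_shard1", "miao_shard2", "miao_shard3"}
	for _, u := range []string{"http://example.com/a", "http://example.com/b"} {
		fmt.Println(u, "->", pickNode(nodes, u))
	}
}
```

With a plain modulo, adding or removing a node remaps most URLs; consistent hashing would reduce that at the cost of more code.
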

func (*Cluster) IsMember

func (c *Cluster) IsMember(node *NodeInfo) bool

IsMember checks the node and returns true if it has joined the cluster.

func (*Cluster) MonitorMaster

func (c *Cluster) MonitorMaster()

MonitorMaster checks for heartbeat packets from the master. If no heartbeat arrives for 12 seconds, it stops the world and discovers the new master. Once a new master is selected, it restarts the world.

func (*Cluster) PushRequest

func (c *Cluster) PushRequest(req *UrlRequest)

func (*Cluster) ResetTimer

func (c *Cluster) ResetTimer()

func (*Cluster) RestartDistributor

func (c *Cluster) RestartDistributor()

func (*Cluster) StartDistributor

func (c *Cluster) StartDistributor()

func (*Cluster) StartKeeper

func (c *Cluster) StartKeeper()

StartKeeper lets the master monitor the slavers; if a slaver is down, it is removed. This func is invoked only by the master.

func (*Cluster) StopDistributor

func (c *Cluster) StopDistributor()

func (*Cluster) StopKeeper

func (c *Cluster) StopKeeper()

StopKeeper stops the keeper by closing the chan.

func (*Cluster) StopTheWorld

func (c *Cluster) StopTheWorld()

func (*Cluster) UpdateSlaverStatus

func (c *Cluster) UpdateSlaverStatus(node *NodeInfo, v bool)

type Crawler

type Crawler struct {
	// Name is the unique identifier of the crawler.
	Name string
	// SeedUrls are the entry points of the website to crawl.
	SeedUrls []string
	// Depth defines how deep into the website to crawl.
	Depth int32
	// If InSite is true, the crawler only crawls pages
	// that have the same host address.
	InSite bool
	// Timeout is the timeout of each request to the target website.
	Timeout int32
	// Retry is the number of times a failed request is retried.
	Retry int32
	// TTL is the interval between two consecutive URL fetches by the fetcher.
	TTL int32

	ParserMap map[string]Parser

	// MetaData holds extra data for the HTTP request, such as Header and PostForm.
	MetaData map[string]interface{}
	// contains filtered or unexported fields
}

Crawler implements the main work of the node and defines some basic settings. If the current node is a slaver, the node manages three entities: fetcher, extractor and reporter. If the current node is the master, a distributor is added as well.

func NewCrawler

func NewCrawler(name string) *Crawler

func (*Crawler) AddParser

func (c *Crawler) AddParser(name string, p Parser) *Crawler

func (*Crawler) AddSeedUrls

func (c *Crawler) AddSeedUrls(urls ...string) *Crawler

AddSeedUrls sets the URLs of the crawler.

func (*Crawler) GetParam

func (c *Crawler) GetParam(key string) interface{}

func (*Crawler) GetParser

func (c *Crawler) GetParser(name string) Parser

func (*Crawler) PostRequest

func (c *Crawler) PostRequest(url, parser string, cookieJar int) *UrlRequest

func (*Crawler) Request

func (c *Crawler) Request(url, parser string, cookieJar int) *UrlRequest

func (*Crawler) SetDepth

func (c *Crawler) SetDepth(depth int32) *Crawler

Set the depth of the crawler

func (*Crawler) SetInSite

func (c *Crawler) SetInSite(inSite bool) *Crawler

Set the InSite of the crawler

func (*Crawler) SetParam

func (c *Crawler) SetParam(key string, value interface{}) *Crawler

func (*Crawler) SetRetry

func (c *Crawler) SetRetry(retry int32) *Crawler

Set the Retry of the crawler

func (*Crawler) SetStorage

func (c *Crawler) SetStorage(st Storage) *Crawler

To customize the storage strategy.

func (*Crawler) SetTTL

func (c *Crawler) SetTTL(ttl int32) *Crawler

Set the TTL of the crawler

func (*Crawler) SetTimeout

func (c *Crawler) SetTimeout(to int32) *Crawler

Set the Timeout of the crawler

func (*Crawler) StartWith

func (c *Crawler) StartWith(call func() []*UrlRequest) *Crawler
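Because the Crawler setters return *Crawler, calls can be chained. The sketch below shows the pattern on a reduced local stand-in type that copies a few documented method signatures; the method bodies and the concrete values are assumptions made for illustration.

```go
package main

import "fmt"

// Crawler is a reduced local stand-in with a few of the documented fields.
type Crawler struct {
	Name     string
	SeedUrls []string
	Depth    int32
	InSite   bool
	Retry    int32
}

func NewCrawler(name string) *Crawler { return &Crawler{Name: name} }

// Each setter returns the receiver so that calls can be chained.
func (c *Crawler) AddSeedUrls(urls ...string) *Crawler {
	c.SeedUrls = append(c.SeedUrls, urls...)
	return c
}

func (c *Crawler) SetDepth(depth int32) *Crawler { c.Depth = depth; return c }

func (c *Crawler) SetInSite(inSite bool) *Crawler { c.InSite = inSite; return c }

func (c *Crawler) SetRetry(retry int32) *Crawler { c.Retry = retry; return c }

func main() {
	c := NewCrawler("bbs").
		AddSeedUrls("http://example.com/").
		SetDepth(2).
		SetInSite(true).
		SetRetry(3)
	fmt.Printf("%+v\n", *c)
}
```
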

type CrawlerStatistic

type CrawlerStatistic struct {
	Name               string `json:"CrawlerName"`
	CrawledUrlsCount   uint64 `json:"CrawledUrlsCount"`
	TotalCount         uint64 `json:"TotalUrlsCount"`
	ToCrawledUrlsCount uint64 `json:"ToCrawledUrlsCount"`
}

CrawlerStatistic holds the crawler's basic info, which the RESTful API uses to monitor the current state of the crawler.

func NewCrawlerStatistic

func NewCrawlerStatistic(name string) *CrawlerStatistic

type DefaultStore

type DefaultStore struct {
}

DefaultStore stores the response in Redis by default.

func NewDefaultStore

func NewDefaultStore(r *RedisClient) *DefaultStore

func (*DefaultStore) Store

func (d *DefaultStore) Store(resp *UrlResponse)

type Distributor

type Distributor struct {
	Requests RequestChan
	// contains filtered or unexported fields
}

func NewDistributor

func NewDistributor() *Distributor

func (*Distributor) Restart

func (r *Distributor) Restart()

Restart restarts the Distributor.

func (*Distributor) Run

func (r *Distributor) Run()

func (*Distributor) Status

func (r *Distributor) Status() string

func (*Distributor) Stop

func (r *Distributor) Stop()

type Extractor

type Extractor struct {
	// contains filtered or unexported fields
}

func NewExtractor

func NewExtractor(in ResponseChan, out RequestChan) *Extractor

func (*Extractor) Restart

func (e *Extractor) Restart()

Restart restarts the Extractor.

func (*Extractor) Run

func (e *Extractor) Run()

func (*Extractor) Status

func (e *Extractor) Status() string

func (*Extractor) Stop

func (e *Extractor) Stop()

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher is an executor that does some kind of job.

func NewFetcher

func NewFetcher(in chan []*UrlRequest, out chan []*UrlResponse) *Fetcher

func (*Fetcher) Add

func (f *Fetcher) Add(req *UrlRequest)

func (*Fetcher) Restart

func (f *Fetcher) Restart()

Restart restarts the Fetcher.

func (*Fetcher) Run

func (f *Fetcher) Run()

Run loops until the stop signal is received. A request is handled every 'ttl' seconds.

func (*Fetcher) Status

func (f *Fetcher) Status() string

func (*Fetcher) Stop

func (f *Fetcher) Stop()

Stop stops the Fetcher. In fact, it just sends the STOP signal to the Fetcher itself. In general, it is invoked by the upper level.

type HttpServer

type HttpServer struct {
	// contains filtered or unexported fields
}

func NewHttpServer

func NewHttpServer(node *Node) *HttpServer

func (*HttpServer) Register

func (h *HttpServer) Register()

func (*HttpServer) Serve

func (h *HttpServer) Serve()

type Node

type Node struct {
	Info *NodeInfo
	// contains filtered or unexported fields
}

There is only one Node instance per Go process.

var DefaultNode *Node

func GetNodeInstance

func GetNodeInstance() *Node

func (*Node) AddCrawler

func (n *Node) AddCrawler(c *Crawler)

func (*Node) AddRequest

func (n *Node) AddRequest(req *UrlRequest)

func (*Node) GetCrawler

func (n *Node) GetCrawler(name string) *Crawler

func (*Node) GetName

func (n *Node) GetName() string

func (*Node) GetStatistic

func (n *Node) GetStatistic() (*Statistic, error)

func (*Node) IsMaster

func (n *Node) IsMaster() bool

func (*Node) RemCrawler

func (n *Node) RemCrawler(name string)

func (*Node) Restart

func (n *Node) Restart()

func (*Node) Start

func (n *Node) Start()

func (*Node) Stop

func (n *Node) Stop()

type NodeInfo

type NodeInfo struct {
	NodeName string
	IP       string
	Port     uint16
}

NodeInfo contains the basic info of a node.

func NewNodeInfo

func NewNodeInfo(name string, ip string, port uint16) *NodeInfo

NewNodeInfo is the constructor of NodeInfo.

type Parser

type Parser func(resp *UrlResponse) (urls []*UrlRequest)

type RedisClient

type RedisClient struct {
	Clients map[string]*redis.Client
	// contains filtered or unexported fields
}

var DefaultRedisClient *RedisClient

func GetRedisClient

func GetRedisClient() *RedisClient

func (*RedisClient) AddClient

func (r *RedisClient) AddClient(name string, re *RedisInstance)

func (*RedisClient) GetClient

func (r *RedisClient) GetClient(key string) *redis.Client

type RedisInstance

type RedisInstance struct {
	Host string
	DB   int64
	Pool int
}

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

func NewReporter

func NewReporter(pop RequestChan) *Reporter

func (*Reporter) Restart

func (r *Reporter) Restart()

Restart restarts the Reporter.

func (*Reporter) Run

func (r *Reporter) Run()

func (*Reporter) Status

func (r *Reporter) Status() string

func (*Reporter) Stop

func (r *Reporter) Stop()

type ResponseChan

type ResponseChan chan []*UrlResponse

func NewResponseChan

func NewResponseChan() ResponseChan

type RpcClient

type RpcClient struct {
	// contains filtered or unexported fields
}

func NewRpcClient

func NewRpcClient() *RpcClient

func (*RpcClient) AddClient

func (r *RpcClient) AddClient(node *NodeInfo)

func (*RpcClient) Distribute

func (r *RpcClient) Distribute(req *UrlRequest) error

Distribute is the client-side RPC method used by the master to distribute the request to the slavers.

func (*RpcClient) Join

func (r *RpcClient) Join(local, node *NodeInfo) error

The invoker should send the local NodeInfo to the remote node.

func (*RpcClient) KeepAlive

func (r *RpcClient) KeepAlive(remote *NodeInfo) error

KeepAlive is the client-side RPC method used by the master to detect the slavers' status.

func (*RpcClient) RemClient

func (r *RpcClient) RemClient(node *NodeInfo)

func (*RpcClient) ReportRequest

func (r *RpcClient) ReportRequest(req *UrlRequest) error

ReportRequest is the client-side RPC method used by slavers to report new requests to the master.

func (*RpcClient) SyncStatistic

func (r *RpcClient) SyncStatistic(node *NodeInfo) (*Statistic, error)

SyncStatistic is the client-side RPC method used by a slaver to sync the statistic info.

type RpcServer

type RpcServer struct {
	// contains filtered or unexported fields
}

func NewRpcServer

func NewRpcServer() *RpcServer

func (*RpcServer) Distribute

func (r *RpcServer) Distribute(req *UrlRequest) error

Distribute is the server-side RPC method on a slaver. It receives the request distributed from the master and adds it to the crawler to fetch content.

func (*RpcServer) Join

func (r *RpcServer) Join(remote *NodeInfo) error

Join is the server-side RPC method on either a master or a slaver. If the node is the master, it adds the remote node and returns nil; otherwise it returns an error.

func (*RpcServer) KeepAlive

func (r *RpcServer) KeepAlive(remote *NodeInfo) error

KeepAlive is the server-side RPC method on a slaver. It responds to the KeepAlive request.

func (*RpcServer) Register

func (r *RpcServer) Register()

Register registers all the RPC services, which may be invoked by either the master or a slaver.

func (*RpcServer) RegisterType

func (r *RpcServer) RegisterType()

func (*RpcServer) ReportRequest

func (r *RpcServer) ReportRequest(req *UrlRequest) error

ReportRequest is the server-side RPC method on the master. It receives requests from slavers and stores them for distribution.

func (*RpcServer) Start

func (r *RpcServer) Start()

func (*RpcServer) Stop

func (r *RpcServer) Stop()

func (*RpcServer) SyncStatistic

func (r *RpcServer) SyncStatistic(node *NodeInfo) (*Statistic, error)

SyncStatistic is the server-side RPC method on the master. It returns the statistic information to the remote node and checks the node's info.

type SlaverStatus

type SlaverStatus struct {
	Alive bool `json:"alive"`
}

type Statistic

type Statistic struct {
	ClusterName string                       `json:"ClusterName"`
	Running     bool                         `json:"Running"`
	BeginAt     string                       `json:"Begin At"`
	Elapse      string                       `json:"Elapse"`
	NodeNum     int                          `json:"NodeNumber"`
	Master      *NodeInfo                    `json:"Master"`
	Slavers     []*SlaverStatus              `json:"slavers"`
	Crawler     map[string]*CrawlerStatistic `json:"CrawlerStatistics"`
	Message     string                       `json:"Message"`
}

Statistic is the statistic information of the cluster. It is updated by the master node; when slavers need it, they must call the SyncStatistic RPC to sync it.

func NewStatistic

func NewStatistic() *Statistic

func (*Statistic) AddCrawledCount

func (s *Statistic) AddCrawledCount(name string)

func (*Statistic) AddCrawlerStatistic

func (s *Statistic) AddCrawlerStatistic(name string) *Statistic

func (*Statistic) AddNode

func (s *Statistic) AddNode(Node *NodeInfo) *Statistic

func (*Statistic) AddTotalCount

func (s *Statistic) AddTotalCount(name string)

func (*Statistic) BeginNow

func (s *Statistic) BeginNow() *Statistic

BeginNow records the current time, at which the crawler begins.

func (*Statistic) GetCrawlerStatistic

func (s *Statistic) GetCrawlerStatistic(name string) *CrawlerStatistic

func (*Statistic) GetStatistic

func (s *Statistic) GetStatistic() *Statistic

GetStatistic gets the current info of the crawler cluster. It is always invoked by the master node.

func (*Statistic) RemCrawlerStatistic

func (s *Statistic) RemCrawlerStatistic(name string) *Statistic

func (*Statistic) SetClusterName

func (s *Statistic) SetClusterName(name string) *Statistic

func (*Statistic) SetMaster

func (s *Statistic) SetMaster(Node *NodeInfo) *Statistic

func (*Statistic) Stop

func (s *Statistic) Stop() *Statistic

func (*Statistic) UpdateNodeAlive

func (s *Statistic) UpdateNodeAlive(Node *NodeInfo, v bool) *Statistic

type Storage

type Storage interface {
	Store(resp *UrlResponse)
}

You can customize the storage strategy in your application by implementing the Storage interface.
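As an illustration of a custom strategy, the sketch below keeps page bodies in memory instead of Redis. It does not import eago: UrlRequest, UrlResponse and Storage are reduced local stand-ins for the documented types, and memoryStore is a hypothetical implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Reduced local stand-ins for the documented types.
type UrlRequest struct{ Url string }

type UrlResponse struct {
	Src  *UrlRequest
	Body string
}

type Storage interface {
	Store(resp *UrlResponse)
}

// memoryStore keeps response bodies in a map keyed by the request URL.
// The mutex makes it safe to call Store from several goroutines.
type memoryStore struct {
	mu    sync.Mutex
	pages map[string]string
}

func newMemoryStore() *memoryStore {
	return &memoryStore{pages: make(map[string]string)}
}

// Store implements Storage.
func (m *memoryStore) Store(resp *UrlResponse) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pages[resp.Src.Url] = resp.Body
}

// get returns the stored body for url, if any.
func (m *memoryStore) get(url string) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	body, ok := m.pages[url]
	return body, ok
}

func main() {
	store := newMemoryStore()
	var st Storage = store // memoryStore satisfies the interface
	st.Store(&UrlResponse{Src: &UrlRequest{Url: "http://example.com"}, Body: "<html></html>"})
	body, ok := store.get("http://example.com")
	fmt.Println(body, ok)
}
```

With the real package, such a value would presumably be handed to a crawler through SetStorage.
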

type UrlRequest

type UrlRequest struct {
	Url    string
	Method string
	Headers http.Header
	// Params holds URL-encoded key-value pairs.
	Params  string
	Proxy   string

	Node      string
	CookieJar int

	Crawler string
	Parser  string

	Depth int32
	Retry int32
}

func NewUrlRequest

func NewUrlRequest(url, method, crawler, parser string, cookie int) *UrlRequest

func (*UrlRequest) Incr

func (u *UrlRequest) Incr() *UrlRequest

func (*UrlRequest) SetDepth

func (u *UrlRequest) SetDepth(depth int32) *UrlRequest

func (*UrlRequest) SetHeader

func (u *UrlRequest) SetHeader(header http.Header) *UrlRequest

func (*UrlRequest) SetParams

func (u *UrlRequest) SetParams(params string) *UrlRequest

func (*UrlRequest) SetProxy

func (u *UrlRequest) SetProxy(proxy string) *UrlRequest

func (*UrlRequest) SetRetry

func (u *UrlRequest) SetRetry(retry int32) *UrlRequest

func (*UrlRequest) ToRequest

func (u *UrlRequest) ToRequest() (*http.Request, error)

type UrlResponse

type UrlResponse struct {
	Src  *UrlRequest
	Resp *http.Response
	Body string
}

func NewResponse

func NewResponse(req *UrlRequest, resp *http.Response) *UrlResponse

type Worker

type Worker interface {
	Run()
	Stop()
	Restart()
}

Worker defines three methods: Run, Stop and Restart. Fetcher, Extractor, Reporter and Distributor implement this interface. Every worker runs a loop that performs a specific task in one goroutine until Stop is invoked.

