Documentation ¶
Index ¶
- Constants
- Variables
- func DownloadImages(textIn string, c *http.Client, fs FileStore) (string, error)
- type Config
- type DefaultRotator
- type Discollector
- func (d *Discollector) GetPlugin(name string) (*Plugin, error)
- func (d *Discollector) ListPlugins() []string
- func (d *Discollector) PluginForEntrypoint(url string, blacklist []string) (*Plugin, *HandlerOpts, error)
- func (d *Discollector) Shutdown(ctx context.Context)
- func (d *Discollector) Start(workers int) error
- type ErrorReporter
- type FileStore
- type Handler
- type HandlerOpts
- type HandlerResponse
- type Limiter
- type LocalFS
- type MemMetastore
- type MemQueue
- func (mq *MemQueue) CompleteScrape(ctx context.Context, scrapeID uuid.UUID) error
- func (mq *MemQueue) Error(ctx context.Context, qt *QueuedTask) error
- func (mq *MemQueue) Finish(ctx context.Context, qt *QueuedTask) error
- func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)
- func (mq *MemQueue) Push(ctx context.Context, tasks []*QueuedTask) error
- func (mq *MemQueue) Status(ctx context.Context, scrapeID uuid.UUID) (*ScrapeStatus, error)
- type Metastore
- type NilLimiter
- type OptionFn
- func WithErrorReporter(er ErrorReporter) OptionFn
- func WithFileStore(fs FileStore) OptionFn
- func WithLimiter(l Limiter) OptionFn
- func WithMetastore(ms Metastore) OptionFn
- func WithPlugins(p ...*Plugin) OptionFn
- func WithQueue(q Queue) OptionFn
- func WithRotator(ro Rotator) OptionFn
- func WithWriter(w Writer) OptionFn
- type Plugin
- type Queue
- type QueuedTask
- type RateLimit
- type Registry
- type ReporterOpts
- type Reservation
- type Resolver
- type Rotator
- type ScheduleRequest
- type Scheduler
- type Scrape
- type ScrapeSchedule
- type ScrapeStatus
- type StdoutReporter
- type StdoutWriter
- type StubFS
- type Task
- type Worker
- type Writer
Constants ¶
const (
	FullScrape  = "full_scrape"
	DeltaScrape = "delta_scrape"
)
Variables ¶
var (
	ErrPluginUnregistered         = errors.New("discollect: plugin not registered")
	ErrHandlerNotFound            = errors.New("discollect: handler not found for route")
	ErrNoValidPluginForEntrypoint = errors.New("discollect: no plugin found for entrypoint")
)
var ErrCompletedScrape = errors.New("completed scrape")
var (
	// ErrRateLimitExceeded is returned when the rate limit is exceeded
	ErrRateLimitExceeded = errors.New("discollect: rate limit exceeded")
)
Functions ¶
func DownloadImages ¶
func DownloadImages(textIn string, c *http.Client, fs FileStore) (string, error)
Types ¶
type Config ¶
type Config struct {
	// friendly identifier for this config
	Type string
	// Entrypoints is used to start a scrape
	Entrypoints []string
	// Since is used to convey delta information
	Since time.Time
	// Countries is a list of countries this scrape can be executed from,
	// in two-letter ISO 3166-2 form
	// nil if unused
	Countries []string
}
Config is a specific configuration of a given plugin
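For instance, a full scrape of a single site restricted to two countries might be configured like this sketch (the URL and country codes are illustrative):

cfg := &Config{
	Type:        FullScrape,
	Entrypoints: []string{"https://blog.example.com/"},
	Countries:   []string{"US", "DE"},
}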
type DefaultRotator ¶
type DefaultRotator struct {
// contains filtered or unexported fields
}
DefaultRotator is a no-op rotator that does no proxy rotation
func NewDefaultRotator ¶
func NewDefaultRotator() *DefaultRotator
NewDefaultRotator provisions a new default rotator
type Discollector ¶
type Discollector struct {
// contains filtered or unexported fields
}
A Discollector ties every element of Discollect together
func (*Discollector) GetPlugin ¶
func (d *Discollector) GetPlugin(name string) (*Plugin, error)
GetPlugin returns the plugin with the given name
func (*Discollector) ListPlugins ¶
func (d *Discollector) ListPlugins() []string
ListPlugins lists all registered plugins
func (*Discollector) PluginForEntrypoint ¶
func (d *Discollector) PluginForEntrypoint(url string, blacklist []string) (*Plugin, *HandlerOpts, error)
PluginForEntrypoint returns the first plugin that matches the given entrypoint
func (*Discollector) Shutdown ¶
func (d *Discollector) Shutdown(ctx context.Context)
Shutdown spins down all the workers after allowing them to finish their current tasks
func (*Discollector) Start ¶
func (d *Discollector) Start(workers int) error
Start starts the scraping loops
type ErrorReporter ¶
type ErrorReporter interface {
Report(ctx context.Context, ro *ReporterOpts, err error)
}
An ErrorReporter is used to forward faulty handler runs to a semi-permanent sink for later analysis. Generally this is a service such as Sentry or Bugsnag, but it may also be a simpler DB backend, like Postgres. An ErrorReporter should discard any calls with err == nil.
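As a sketch, a minimal ErrorReporter backed by the standard library logger might look like this (LogReporter is hypothetical; it ignores ReporterOpts, whose fields are not shown in this excerpt):

// LogReporter is a hypothetical ErrorReporter that writes errors to the
// standard logger.
type LogReporter struct{}

// Report logs non-nil errors and, per the interface contract, discards
// any calls where err == nil.
func (LogReporter) Report(ctx context.Context, ro *ReporterOpts, err error) {
	if err == nil {
		return
	}
	log.Printf("discollect: handler error: %v", err)
}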
type FileStore ¶
A FileStore shoves files somewhere and returns a link at which they can be retrieved
type Handler ¶
type Handler func(ctx context.Context, ho *HandlerOpts, t *Task) *HandlerResponse
A Handler can handle an individual Task
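As a sketch, a Handler for an article page might look like the following. It uses the Client from HandlerOpts and the Response/ErrorResponse helpers documented below; the URL shape and datum structure are purely illustrative:

// articleHandler is a hypothetical Handler: it fetches the task URL and
// returns the page body as a single datum, with no follow-up tasks.
func articleHandler(ctx context.Context, ho *HandlerOpts, t *Task) *HandlerResponse {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, t.URL, nil)
	if err != nil {
		return ErrorResponse(err)
	}

	resp, err := ho.Client.Do(req)
	if err != nil {
		return ErrorResponse(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return ErrorResponse(err)
	}

	// One datum extracted from this page; a real handler would parse the
	// body, and could also return child Tasks to crawl further.
	return Response([]interface{}{map[string]string{
		"url":  t.URL,
		"body": string(body),
	}})
}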
type HandlerOpts ¶
type HandlerOpts struct {
	Config *Config
	// RouteParams are Capture Groups from the Route regexp
	RouteParams []string
	FileStore   FileStore
	Client      *http.Client
}
HandlerOpts are passed to a Handler
type HandlerResponse ¶
A HandlerResponse is returned from a Handler
func ErrorResponse ¶
func ErrorResponse(err error) *HandlerResponse
ErrorResponse is a helper for returning an error from a Handler
func NilResponse ¶
func NilResponse() *HandlerResponse
func Response ¶
func Response(facts []interface{}, tasks ...*Task) *HandlerResponse
Response is shorthand for a successful response
type Limiter ¶
type Limiter interface {
	// Reserve returns a Reservation that indicates how long the caller must
	// wait before the event can happen. The Limiter takes this Reservation
	// into account when allowing future events.
	Reserve(rl *RateLimit, url string, scrapeID uuid.UUID) (Reservation, error)
}
A Limiter is used for per-site and per-config rate limits. It is abstracted into an interface so that distributed rate limiting is practical.
type LocalFS ¶
type LocalFS struct {
// contains filtered or unexported fields
}
LocalFS is both a FileStore implementation backed by the filesystem and an http.Handler that will serve the images back up
func NewLocalFS ¶
NewLocalFS creates a LocalFS set up to save files to path and serve from staticPath
type MemMetastore ¶
type MemMetastore struct{}
MemMetastore is a metastore that only stores information in memory. TODO: allow this to function again.
type MemQueue ¶
type MemQueue struct {
// contains filtered or unexported fields
}
A MemQueue is a super simple Queue backed by an array and a mutex
func (*MemQueue) CompleteScrape ¶
func (mq *MemQueue) CompleteScrape(ctx context.Context, scrapeID uuid.UUID) error
func (*MemQueue) Error ¶
func (mq *MemQueue) Error(ctx context.Context, qt *QueuedTask) error
func (*MemQueue) Finish ¶
func (mq *MemQueue) Finish(ctx context.Context, qt *QueuedTask) error
Finish is a no-op for the MemQueue
func (*MemQueue) Pop ¶
func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)
Pop pops a single task off the left side of the array
func (*MemQueue) Push ¶
func (mq *MemQueue) Push(ctx context.Context, tasks []*QueuedTask) error
func (*MemQueue) Status ¶
func (mq *MemQueue) Status(ctx context.Context, scrapeID uuid.UUID) (*ScrapeStatus, error)
type Metastore ¶
type Metastore interface {
	// StartScrapes selects a number of currently STOPPED scrapes, moves them
	// to RUNNING and returns their details
	StartScrapes(ctx context.Context, limit int) ([]*Scrape, error)

	// ListScrapes is used to list and filter scrapes, for both session
	// resumption and UI purposes
	ListScrapes(ctx context.Context, statusFilter string, limit, offset int) ([]*Scrape, error)

	// FindMissingSchedules adds scrapes that should be run to the future set
	FindMissingSchedules(ctx context.Context, limit int) ([]*ScheduleRequest, error)

	InsertSchedule(context.Context, *ScheduleRequest, []*ScrapeSchedule) error

	// EndScrape marks a scrape as SUCCESS and records the number of datums
	// and tasks returned
	EndScrape(ctx context.Context, id uuid.UUID, datums, retries, tasks int) error

	// ErrorScrape marks a scrape as ERRORED and adds the error to its list
	ErrorScrape(ctx context.Context, id uuid.UUID, err error) error
}
A Metastore is used to store the history of all scrape runs and enough meta information to allow session resumption on restart of hydrocarbon
type NilLimiter ¶
type NilLimiter struct{}
A NilLimiter is a Limiter that doesn't restrict anything
func (*NilLimiter) Reserve ¶
func (*NilLimiter) Reserve(rl *RateLimit, url string, scrapeID uuid.UUID) (Reservation, error)
Reserve returns a dummy reservation that always waits one second
type OptionFn ¶
type OptionFn func(d *Discollector) error
An OptionFn is used to pass options to a Discollector
func WithErrorReporter ¶
func WithErrorReporter(er ErrorReporter) OptionFn
WithErrorReporter sets the ErrorReporter for the Discollector
func WithFileStore ¶
func WithFileStore(fs FileStore) OptionFn
WithFileStore sets the FileStore for the Discollector
func WithLimiter ¶
func WithLimiter(l Limiter) OptionFn
WithLimiter sets the Limiter for the Discollector
func WithMetastore ¶
func WithMetastore(ms Metastore) OptionFn
WithMetastore sets the Metastore for the Discollector
func WithPlugins ¶
func WithPlugins(p ...*Plugin) OptionFn
WithPlugins sets the Plugins for the Discollector
func WithQueue ¶
func WithQueue(q Queue) OptionFn
WithQueue sets the Queue for the Discollector
func WithRotator ¶
func WithRotator(ro Rotator) OptionFn
WithRotator sets the Rotator for the Discollector
func WithWriter ¶
func WithWriter(w Writer) OptionFn
WithWriter sets the Writer for the Discollector
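Putting the options together, construction might look like the sketch below. The New constructor and the usability of a zero-value MemQueue are assumptions; neither is shown in this excerpt.

d, err := New( // assumed constructor: New(opts ...OptionFn) (*Discollector, error)
	WithPlugins(myPlugin), // myPlugin is assumed to be defined elsewhere
	WithQueue(&MemQueue{}),
	WithWriter(&StdoutWriter{}),
	WithErrorReporter(StdoutReporter{}),
)
if err != nil {
	log.Fatal(err)
}

go func() {
	if err := d.Start(4); err != nil { // run four workers
		log.Fatal(err)
	}
}()

// later: let in-flight tasks finish, then stop
d.Shutdown(context.Background())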
type Plugin ¶
type Plugin struct {
	Name    string
	Configs []*Config
	// RateLimit is set per-plugin
	RateLimit *RateLimit
	// Entrypoints is a list of valid entrypoint patterns for this plugin; it
	// can easily just be `.*`, especially if an entrypoint merits further
	// testing via the ConfigCreator. These patterns get compiled into
	// regexps at boot.
	Entrypoints []string
	// A ConfigCreator is used to validate submitted entrypoints and convert
	// them into a fully valid Config, as well as returning the normalized title
	ConfigCreator func(url string, ho *HandlerOpts) (string, *Config, error)
	// the Scheduler looks into the past and tells the future
	Scheduler func(*ScheduleRequest) ([]*ScrapeSchedule, error)
	// Routes maps Route regexps to Handlers
	Routes map[string]Handler
}
A Plugin is capable of running scrapes, ideally of a common type or against a single site
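For illustration, a minimal Plugin wiring these fields together might look like this sketch (the domain, regexps, and the articleHandler/indexHandler funcs are hypothetical):

var examplePlugin = &Plugin{
	Name:      "example-blog",
	RateLimit: &RateLimit{PerIP: 1, PerScrape: 2, PerDomain: 1},
	Entrypoints: []string{
		`https://blog\.example\.com/.*`,
	},
	ConfigCreator: func(url string, ho *HandlerOpts) (string, *Config, error) {
		// validate the submitted URL, then return the normalized title
		// and a fully valid Config
		return "Example Blog", &Config{
			Type:        FullScrape,
			Entrypoints: []string{url},
		}, nil
	},
	Scheduler: NeverSchedule,
	Routes: map[string]Handler{
		// capture groups become HandlerOpts.RouteParams
		`https://blog\.example\.com/posts/(.+)`: articleHandler,
		`https://blog\.example\.com/?`:          indexHandler,
	},
}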
type Queue ¶
type Queue interface {
	Pop(ctx context.Context) (*QueuedTask, error)
	Push(ctx context.Context, tasks []*QueuedTask) error
	Finish(ctx context.Context, qt *QueuedTask) error
	Error(ctx context.Context, qt *QueuedTask) error
	Status(ctx context.Context, scrapeID uuid.UUID) (*ScrapeStatus, error)
	CompleteScrape(ctx context.Context, scrapeID uuid.UUID) error
}
A Queue is used to submit and retrieve individual tasks
type QueuedTask ¶
type QueuedTask struct {
	// set by the TaskQueue
	TaskID   uuid.UUID `json:"task_id"`
	ScrapeID uuid.UUID `json:"scrape_id"`
	QueuedAt time.Time `json:"queued_at"`
	Config   *Config   `json:"config"`
	Plugin   string    `json:"plugin"`
	Retries  int       `json:"retries"`
	Task     *Task     `json:"task"`
}
A QueuedTask is the struct for a task that goes on the Queue
type RateLimit ¶
type RateLimit struct {
	// PerIP is the rate at which a single IP can make requests, per second
	PerIP float64
	// PerScrape is the rate the entire scrape can operate at, per second
	PerScrape float64
	// PerDomain is the rate per domain, per second, using the publicsuffix
	// list to differentiate domains
	PerDomain float64
}
RateLimit is a wrapper struct around a variety of per-config rate limits
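For example, a conservative limit allowing one request per second from any single IP, two per second across the whole scrape, and one per second against any one domain:

rl := &RateLimit{
	PerIP:     1,
	PerScrape: 2,
	PerDomain: 1,
}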
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
A Registry stores and indexes all available plugins
func NewRegistry ¶
NewRegistry indexes a list of plugins and precomputes the routing table
func (*Registry) HandlerFor ¶
HandlerFor is the core "router" used to point Tasks to an individual Handler
type ReporterOpts ¶
ReporterOpts is used to attach additional information to an error
type Reservation ¶
type Reservation interface {
	// Cancel indicates that the reservation holder will not perform the
	// reserved action and reverses the effects of this Reservation on the
	// rate limit as much as possible, considering that other reservations
	// may have already been made.
	Cancel()

	// OK returns whether the limiter can provide the requested number of
	// tokens within the maximum wait time. If OK is false, Delay returns
	// InfDuration, and Cancel does nothing.
	OK() bool

	// Delay returns the duration for which the reservation holder must wait
	// before taking the reserved action. Zero duration means act immediately.
	// InfDuration means the limiter cannot grant the tokens requested in this
	// Reservation within the maximum wait time.
	Delay() time.Duration
}
A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.
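A sketch of how a caller might consume a Reservation before issuing a request (lim, rl, and scrapeID are assumed to be in scope):

res, err := lim.Reserve(rl, "https://blog.example.com/posts/1", scrapeID)
if err != nil {
	return err
}
if !res.OK() {
	return ErrRateLimitExceeded
}
time.Sleep(res.Delay()) // zero duration means act immediately

// ... perform the request; if abandoning it instead, call res.Cancel()
// so the tokens can be returned to the limiter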
type Resolver ¶
type Resolver struct {
// contains filtered or unexported fields
}
Resolver watches for scrapes that should be marked complete.
type Rotator ¶
Rotator is a proxy rotator interface capable of rotating and rate limiting between many IPs. TODO(fortytw2): this interface is totally wrong, needs rate limits in it
type ScheduleRequest ¶
type ScheduleRequest struct {
	Plugin        string
	FeedID        uuid.UUID
	LatestScrapes []*Scrape
	LatestDatums  interface{}
}
A ScheduleRequest is used to ask for future schedules
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
A Scheduler initiates new scrapes according to plugin-level schedules
type Scrape ¶
type Scrape struct {
	ID               uuid.UUID `json:"id"`
	FeedID           uuid.UUID `json:"feed_id"`
	CreatedAt        time.Time `json:"created_at"`
	ScheduledStartAt time.Time `json:"scheduled_start_at"`
	StartedAt        time.Time `json:"started_at"`
	EndedAt          time.Time `json:"ended_at"`
	State            string    `json:"state"`
	Errors           []string  `json:"errors"`
	TotalDatums      int       `json:"total_datums"`
	TotalRetries     int       `json:"total_retries"`
	TotalTasks       int       `json:"total_tasks"`
	Plugin           string    `json:"plugin"`
	Config           *Config   `json:"config"`
}
type ScrapeSchedule ¶
A ScrapeSchedule adds to the future
func DefaultScheduler ¶
func DefaultScheduler(sr *ScheduleRequest) ([]*ScrapeSchedule, error)
DefaultScheduler uses a simple heuristic to predict when to next scrape.
func NeverSchedule ¶
func NeverSchedule(sr *ScheduleRequest) ([]*ScrapeSchedule, error)
NeverSchedule simply never schedules another scrape
type ScrapeStatus ¶
type ScrapeStatus struct {
	TotalTasks     int `json:"total_tasks,omitempty"`
	InFlightTasks  int `json:"in_flight_tasks,omitempty"`
	CompletedTasks int `json:"completed_tasks,omitempty"`
	RetriedTasks   int `json:"retried_tasks,omitempty"`
}
ScrapeStatus is returned from a Queue with information about a specific scrape
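For example, progress for a running scrape could be polled through any Queue implementation (q, ctx, and scrapeID are assumed to be in scope):

status, err := q.Status(ctx, scrapeID)
if err != nil {
	return err
}
fmt.Printf("%d/%d tasks completed, %d in flight, %d retried\n",
	status.CompletedTasks, status.TotalTasks,
	status.InFlightTasks, status.RetriedTasks)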
type StdoutReporter ¶
type StdoutReporter struct{}
StdoutReporter writes all errors to Stdout
func (StdoutReporter) Report ¶
func (StdoutReporter) Report(_ context.Context, ro *ReporterOpts, err error)
Report prints out the error
type StdoutWriter ¶
type StdoutWriter struct{}
StdoutWriter fmt.Printfs to stdout
func (*StdoutWriter) Close ¶
func (sw *StdoutWriter) Close() error
Close is a no-op function so the StdoutWriter works
type StubFS ¶
type StubFS struct {
URL string
}
type Task ¶
type Task struct {
	URL string `json:"url"`
	// Extra can be used to send information from a parent task to its children
	Extra map[string]json.RawMessage `json:"extra,omitempty"`
	// Timeout is the timeout a single task should have attached to it
	// defaults to 15s
	Timeout time.Duration
}
A Task generally maps to a single HTTP request, but sometimes more than one may be made
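A sketch of a parent Handler passing state to a child task via Extra; the "page" key and URL are illustrative:

page, err := json.Marshal(2)
if err != nil {
	return ErrorResponse(err)
}
child := &Task{
	URL:     "https://blog.example.com/posts?page=2",
	Extra:   map[string]json.RawMessage{"page": page},
	Timeout: 30 * time.Second, // override the 15s default
}
return Response(nil, child)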
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
A Worker is a single-threaded worker that pulls one task from the queue at a time and processes it to completion