processor

package
v0.0.0-...-b7db0ac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2017 License: MPL-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//RandomSource ...
	RandomSource = rand.NewSource(time.Now().UnixNano())
	//Random ...
	Random = rand.New(RandomSource)

	//RandomMax ...
	RandomMax = func(max int) int {
		rand.Seed(time.Now().UnixNano())
		return rand.Intn(max)
	}
)
View Source
var (
	//TestServer ...
	TestServer *rizo.RequestRecordingServer
)

Functions

func AfterTest

func AfterTest()

func BeforeTest

func BeforeTest()

func CreatePlanFromURLList

func CreatePlanFromURLList(config *config.Configuration) core.Plan

CreatePlanFromURLList ...

Types

type Control

type Control interface {
	Start(*config.Configuration) (*ExecutionID, error)
	Stop(*ExecutionID) statistics.AggregatorSnapShot
	Status(*ExecutionID) statistics.AggregatorSnapShot
	History() []*ExecutionID
	Events() <-chan string
}

Control ...

func NewControl

func NewControl(bar ProgressBar, registry core.Registry) Control

NewControl ...

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller ...

func (*Controller) Events

func (instance *Controller) Events() <-chan string

Events ...

func (*Controller) History

func (instance *Controller) History() []*ExecutionID

History ...

func (*Controller) Start

func (instance *Controller) Start(config *config.Configuration) (*ExecutionID, error)

Start ...

func (*Controller) Status

Status ...

func (*Controller) Stop

Stop ... A1

type ExecutionBranch

type ExecutionBranch interface {
	Execute() error
}

ExecutionBranch ...

type ExecutionID

type ExecutionID struct {
	// contains filtered or unexported fields
}

ExecutionID ...

func NewExecutionID

func NewExecutionID() ExecutionID

NewExecutionID ...

func (ExecutionID) String

func (id ExecutionID) String() string

String ...

type Host

type Host interface {
	SetControl(*Control)
}

Host ...

type JobDurationStream

type JobDurationStream struct {
	// contains filtered or unexported fields
}

JobDurationStream ...

func CreateJobDurationStream

func CreateJobDurationStream(stream JobStream, duration time.Duration) *JobDurationStream

CreateJobDurationStream ...

func (*JobDurationStream) HasNext

func (instance *JobDurationStream) HasNext() bool

HasNext ...

func (*JobDurationStream) Next

func (instance *JobDurationStream) Next() core.Job

Next ...

func (*JobDurationStream) Progress

func (instance *JobDurationStream) Progress() int

Progress ...

func (*JobDurationStream) Reset

func (instance *JobDurationStream) Reset()

Reset ...

func (*JobDurationStream) Size

func (instance *JobDurationStream) Size() int

Size ...

type JobIterationStream

type JobIterationStream struct {
	// contains filtered or unexported fields
}

JobIterationStream ...

func CreateJobIterationStream

func CreateJobIterationStream(stream JobRevolvingStream, jobCount int, iterations int) *JobIterationStream

CreateJobIterationStream ...

func (*JobIterationStream) HasNext

func (instance *JobIterationStream) HasNext() bool

HasNext ...

func (*JobIterationStream) Next

func (instance *JobIterationStream) Next() core.Job

Next ...

func (*JobIterationStream) Progress

func (instance *JobIterationStream) Progress() int

Progress ...

func (*JobIterationStream) Reset

func (instance *JobIterationStream) Reset()

Reset ...

func (*JobIterationStream) Size

func (instance *JobIterationStream) Size() int

Size ...

type JobRandomStream

type JobRandomStream struct {
	// contains filtered or unexported fields
}

JobRandomStream ...

func CreateJobRandomStream

func CreateJobRandomStream(items []core.Job) *JobRandomStream

CreateJobRandomStream ...

func (*JobRandomStream) HasNext

func (instance *JobRandomStream) HasNext() bool

HasNext ...

func (*JobRandomStream) Next

func (instance *JobRandomStream) Next() core.Job

Next ...

func (*JobRandomStream) Progress

func (instance *JobRandomStream) Progress() int

Progress ...

func (*JobRandomStream) Reset

func (instance *JobRandomStream) Reset()

Reset ...

func (*JobRandomStream) Size

func (instance *JobRandomStream) Size() int

Size ...

type JobRevolvingStream

type JobRevolvingStream struct {
	// contains filtered or unexported fields
}

JobRevolvingStream ...

func CreateJobRevolvingStream

func CreateJobRevolvingStream(stream JobStream) *JobRevolvingStream

CreateJobRevolvingStream ...

func (JobRevolvingStream) HasNext

func (instance JobRevolvingStream) HasNext() bool

HasNext ...

func (JobRevolvingStream) Next

func (instance JobRevolvingStream) Next() core.Job

Next ...

func (JobRevolvingStream) Progress

func (instance JobRevolvingStream) Progress() int

Progress ...

func (JobRevolvingStream) Reset

func (instance JobRevolvingStream) Reset()

Reset ...

func (JobRevolvingStream) Size

func (instance JobRevolvingStream) Size() int

Size ...

type JobSequentialStream

type JobSequentialStream struct {
	// contains filtered or unexported fields
}

JobSequentialStream ...

func CreateJobSequentialStream

func CreateJobSequentialStream(items []core.Job) *JobSequentialStream

CreateJobSequentialStream ...

func (*JobSequentialStream) HasNext

func (instance *JobSequentialStream) HasNext() bool

HasNext ...

func (*JobSequentialStream) Next

func (instance *JobSequentialStream) Next() core.Job

Next ...

func (*JobSequentialStream) Progress

func (instance *JobSequentialStream) Progress() int

Progress ...

func (*JobSequentialStream) Reset

func (instance *JobSequentialStream) Reset()

Reset ...

func (*JobSequentialStream) Size

func (instance *JobSequentialStream) Size() int

Size ...

type JobStream

type JobStream interface {
	HasNext() bool
	Next() core.Job
	Reset()
	Progress() int
	Size() int
}

JobStream ...

func CreateJobStream

func CreateJobStream(jobs []core.Job, config *config.Configuration) JobStream

CreateJobStream ...

type ListRingRevolver

type ListRingRevolver struct {
	Lists map[string]ListStream
}

ListRingRevolver ...

func NewListRingRevolver

func NewListRingRevolver(data map[string][]map[string]interface{}) *ListRingRevolver

NewListRingRevolver ...

func (*ListRingRevolver) Values

func (instance *ListRingRevolver) Values() map[string]interface{}

Values ...

type ListStream

type ListStream interface {
	HasNext() bool
	Next() map[string]interface{}
	Reset()
	Size() int
}

ListStream ...

type PlanExecutionContext

type PlanExecutionContext struct {
	Plan         core.Plan
	Lists        *ListRingRevolver
	Config       *config.Configuration
	Publisher    chan core.ExecutionResult
	PlanContext  core.ExtractionResult
	JobContexts  map[int]core.ExtractionResult
	StepContexts map[int]map[int]core.ExtractionResult
	Bar          ProgressBar
	// contains filtered or unexported fields
}

PlanExecutionContext encapsulates the runtime state in order to execute a plan

type PlanExecutor

type PlanExecutor struct {
	Config *config.Configuration
	Bar    ProgressBar

	Publisher    chan core.ExecutionResult
	Lists        *ListRingRevolver
	Plan         core.Plan
	PlanContext  core.ExtractionResult
	JobContexts  map[int]core.ExtractionResult
	StepContexts map[int]map[int]core.ExtractionResult
	// contains filtered or unexported fields
}

PlanExecutor ...

func CreatePlanExecutor

func CreatePlanExecutor(config *config.Configuration, bar ProgressBar, registry core.Registry, aggregator statistics.AggregatorInterfaceToRenameLater, publisher chan core.ExecutionResult) *PlanExecutor

CreatePlanExecutor ...

func (*PlanExecutor) Execute

func (instance *PlanExecutor) Execute() error

Execute ...

type ProgressBar

type ProgressBar interface {
	Set(progress int) error
}

ProgressBar ...

type RevolvingListStream

type RevolvingListStream struct {
	// contains filtered or unexported fields
}

RevolvingListStream ...

func NewRevolvingListStream

func NewRevolvingListStream(data []map[string]interface{}) *RevolvingListStream

NewRevolvingListStream ...

func (*RevolvingListStream) HasNext

func (instance *RevolvingListStream) HasNext() bool

HasNext ...

func (*RevolvingListStream) Next

func (instance *RevolvingListStream) Next() map[string]interface{}

Next ...

func (*RevolvingListStream) Reset

func (instance *RevolvingListStream) Reset()

Reset ...

func (*RevolvingListStream) Size

func (instance *RevolvingListStream) Size() int

Size ...

type StepDelayStream

type StepDelayStream struct {
	// contains filtered or unexported fields
}

StepDelayStream ...

func CreateStepDelayStream

func CreateStepDelayStream(stream StepStream, delay time.Duration) StepDelayStream

CreateStepDelayStream ...

func (StepDelayStream) HasNext

func (instance StepDelayStream) HasNext() bool

HasNext ...

func (StepDelayStream) Next

func (instance StepDelayStream) Next() core.Step

Next ...

func (StepDelayStream) Progress

func (instance StepDelayStream) Progress() int

Progress ...

func (StepDelayStream) Reset

func (instance StepDelayStream) Reset()

Reset ...

func (StepDelayStream) Size

func (instance StepDelayStream) Size() int

Size ...

type StepSequentialStream

type StepSequentialStream struct {
	// contains filtered or unexported fields
}

StepSequentialStream ...

func CreateStepSequentialStream

func CreateStepSequentialStream(items []core.Step) *StepSequentialStream

CreateStepSequentialStream ...

func (*StepSequentialStream) HasNext

func (instance *StepSequentialStream) HasNext() bool

HasNext ...

func (*StepSequentialStream) Next

func (instance *StepSequentialStream) Next() core.Step

Next ...

func (*StepSequentialStream) Progress

func (instance *StepSequentialStream) Progress() int

Progress ...

func (*StepSequentialStream) Reset

func (instance *StepSequentialStream) Reset()

Reset ...

func (*StepSequentialStream) Size

func (instance *StepSequentialStream) Size() int

Size ...

type StepStream

type StepStream interface {
	HasNext() bool
	Next() core.Step
	Reset()
	Progress() int
	Size() int
}

StepStream ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL