ingest

v0.0.0-...-2ac6bf7
Published: May 13, 2016 License: Apache-2.0 Imports: 9 Imported by: 0

README

Ingest

Data ingestion framework for Go

Deprecated: This was used to experiment with ideas that were used in urbint/ingest.

Documentation

Constants

This section is empty.

Variables

var AbortTimeout = time.Second * 10

AbortTimeout is the duration after which an abort is assumed to have timed out. A timeout is logged as a warning.
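
The timeout behaviour can be pictured as a select over the abort result and a timer. The following is a minimal sketch under stated assumptions, not the package's code: `waitForAbort` and `errAbortTimedOut` are hypothetical names, and the source does not say what the package does on timeout beyond logging a warning.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errAbortTimedOut is a hypothetical sentinel used only in this sketch.
var errAbortTimedOut = errors.New("abort timed out")

// waitForAbort waits for a result on done and gives up after timeout.
// It illustrates the idea behind AbortTimeout; it is not the package's code.
func waitForAbort(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errAbortTimedOut
	}
}

func main() {
	done := make(chan error) // nothing ever replies on this channel
	err := waitForAbort(done, 10*time.Millisecond)
	fmt.Println(err) // abort timed out
}
```

Because AbortTimeout is an exported variable, a caller could presumably shorten or lengthen it before starting a job.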

Functions

This section is empty.

Types

type EmptyLogger

type EmptyLogger struct{}

EmptyLogger is the default Logger. It won't log anything. We recommend you set it to something better.

func (*EmptyLogger) Debug

func (e *EmptyLogger) Debug(args ...interface{})

Debug placeholder

func (*EmptyLogger) Error

func (e *EmptyLogger) Error(args ...interface{})

Error placeholder

func (*EmptyLogger) Info

func (e *EmptyLogger) Info(args ...interface{})

Info placeholder

func (*EmptyLogger) Warn

func (e *EmptyLogger) Warn(args ...interface{})

Warn placeholder

func (*EmptyLogger) WithError

func (e *EmptyLogger) WithError(err error) Logger

WithError placeholder

func (*EmptyLogger) WithField

func (e *EmptyLogger) WithField(field string, value interface{}) Logger

WithField placeholder

type HasDefaultOptions

type HasDefaultOptions interface {
	Runner
	DefaultOpts() ThenOpts
}

HasDefaultOptions is an interface which a Runner can implement that allows it to specify default ThenOpts

type InStream

type InStream struct {
	// contains filtered or unexported fields
}

An InStream is a runner that will pass data from the input channel down to future runners

func NewInStream

func NewInStream(name string, in chan interface{}) *InStream

NewInStream builds an input stream for the specified input channel

func (*InStream) Name

func (i *InStream) Name() string

Name implements Runner for InStream

func (*InStream) Run

func (i *InStream) Run(stage *Stage) error

Run implements Runner for InStream

func (*InStream) SkipAbortErr

func (i *InStream) SkipAbortErr() bool

SkipAbortErr saves us having to send nil errors back on abort

type Job

type Job struct {
	// contains filtered or unexported fields
}

A Job is a control structure for interacting with a Pipeline

func NewJob

func NewJob(pipeline Pipeline) *Job

NewJob builds a job with the specified pipeline

func (*Job) Abort

func (j *Job) Abort() <-chan error

Abort aborts the job and cancels all running processors

It returns a channel of errors encountered while aborting

func (*Job) Error

func (j *Job) Error() error

Error returns any error that the Job encountered while running

func (*Job) Run

func (j *Job) Run() error

Run runs the Job and blocks until it has completed

It returns any error that occurred anywhere in the pipeline

func (*Job) RunAsync

func (j *Job) RunAsync() <-chan error

RunAsync runs the job and returns an error channel that will emit the result of the job and then be closed
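
The "emit one result, then close" contract described for RunAsync is a common Go pattern. The sketch below shows that pattern in isolation; `runAsync` is a hypothetical helper written for illustration and makes no claim about how Job implements it.

```go
package main

import (
	"errors"
	"fmt"
)

// runAsync is a hypothetical helper, not the package's implementation.
// It runs fn in a goroutine, emits fn's result once, then closes the channel.
func runAsync(fn func() error) <-chan error {
	out := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		defer close(out)
		out <- fn()
	}()
	return out
}

func main() {
	errs := runAsync(func() error { return errors.New("stage failed") })
	for err := range errs { // the loop ends once the channel is closed
		fmt.Println("job finished:", err)
	}
}
```

A consumer can therefore either receive once or range over the channel; both terminate.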

func (*Job) Start

func (j *Job) Start() *Job

Start starts the job but does not block

It returns itself for a chainable API

func (*Job) Wait

func (j *Job) Wait() error

Wait waits for the job to complete and returns any error encountered

type Logger

type Logger interface {
	WithError(err error) Logger
	WithField(field string, value interface{}) Logger
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}

Logger is a logging interface which mirrors pico.Logger, apex.Logger, and logrus.Logger

var DefaultLogger Logger = &EmptyLogger{}

DefaultLogger is where ingest will log to. By default, it won't log. We recommend setting it to Logrus, or Apex, or your own.
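
For readers who want "your own" logger, here is a minimal sketch of a type that satisfies the Logger interface using only the standard library. The interface is copied locally so the example compiles by itself; `StdLogger`, `emit` and `newBufferLogger` are hypothetical names, and the output format is an arbitrary choice.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is copied from the documented interface so the sketch is self-contained.
type Logger interface {
	WithError(err error) Logger
	WithField(field string, value interface{}) Logger
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}

// StdLogger is a hypothetical adapter over the standard library logger.
type StdLogger struct {
	out    *log.Logger
	fields string // accumulated " key=value" pairs
}

func (l *StdLogger) WithError(err error) Logger { return l.WithField("error", err) }

func (l *StdLogger) WithField(field string, value interface{}) Logger {
	// Return a copy so the parent logger is not mutated.
	return &StdLogger{out: l.out, fields: l.fields + fmt.Sprintf(" %s=%v", field, value)}
}

// emit writes one line: level, message, then any accumulated fields.
func (l *StdLogger) emit(level string, args []interface{}) {
	l.out.Print(level, " ", fmt.Sprint(args...), l.fields)
}

func (l *StdLogger) Debug(args ...interface{}) { l.emit("DEBUG", args) }
func (l *StdLogger) Info(args ...interface{})  { l.emit("INFO", args) }
func (l *StdLogger) Warn(args ...interface{})  { l.emit("WARN", args) }
func (l *StdLogger) Error(args ...interface{}) { l.emit("ERROR", args) }

// newBufferLogger returns a StdLogger that writes to an in-memory buffer.
func newBufferLogger() (*StdLogger, *bytes.Buffer) {
	buf := &bytes.Buffer{}
	return &StdLogger{out: log.New(buf, "", 0)}, buf
}

func main() {
	logger, buf := newBufferLogger()
	var l Logger = logger // compile-time check that StdLogger satisfies Logger
	l.WithField("path", "a.csv").Info("file opened")
	fmt.Print(buf.String()) // INFO file opened path=a.csv
}
```

With the real package, a value like this could be assigned to DefaultLogger, since that variable is declared with type Logger.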

type NoErrAbortRunner

type NoErrAbortRunner interface {
	Runner
	SkipAbortErr() bool
}

NoErrAbortRunner allows a runner to avoid having to send an error back on Abort

type NoOpRunner

type NoOpRunner interface {
	Runner
	NoOpRunner() bool
}

NoOpRunner allows a runner to specify that it shouldn't be added to the run pipeline at add time
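
Optional interfaces such as NoOpRunner are typically detected with a type assertion. The sketch below illustrates that mechanism with local stand-ins for the documented types; `worker`, `marker` and `runnable` are hypothetical, the Stage stand-in is reduced to its two data channels, and the package's actual add-time logic is not shown in the source.

```go
package main

import "fmt"

// Stage, Runner and NoOpRunner are local stand-ins mirroring the documented types.
type Stage struct {
	In, Out chan interface{}
}

type Runner interface {
	Name() string
	Run(*Stage) error
}

type NoOpRunner interface {
	Runner
	NoOpRunner() bool
}

// worker is a plain Runner.
type worker struct{}

func (worker) Name() string     { return "worker" }
func (worker) Run(*Stage) error { return nil }

// marker additionally opts out of the run pipeline.
type marker struct{}

func (marker) Name() string     { return "marker" }
func (marker) Run(*Stage) error { return nil }
func (marker) NoOpRunner() bool { return true }

// runnable is a hypothetical helper that keeps only the runners that should execute.
func runnable(all []Runner) []Runner {
	var keep []Runner
	for _, r := range all {
		if n, ok := r.(NoOpRunner); ok && n.NoOpRunner() {
			continue // handled at add time, never run
		}
		keep = append(keep, r)
	}
	return keep
}

func main() {
	for _, r := range runnable([]Runner{worker{}, marker{}}) {
		fmt.Println(r.Name()) // worker
	}
}
```

The same assertion style would apply to the other optional interfaces on this page (HasDefaultOptions, OnAdd, OnDone, Selectable).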

type OnAdd

type OnAdd interface {
	Runner
	OnAdd(prevRunner Runner)
}

OnAdd is an interface which a Runner can implement to allow it to hook in to being added to the pipeline

type OnDone

type OnDone interface {
	Runner
	OnPipelineDone() error
}

OnDone is an interface which a Runner can implement to allow it to run code after the pipeline has completed running fully

type OpenOpts

type OpenOpts struct {
	// If TempDir is specified, remote files will be downloaded to a temporary directory in full before
	// being emitted to the next stage
	TempDir string

	// Logger is the logger that the Opener will log to
	Logger Logger

	// StreamProgressTo is used to specify a channel which will receive a count of bytes for files that are being downloaded
	// Currently only works if TempDir is specified. Opening will NOT be blocked by this channel blocking
	StreamProgressTo chan int64
}

OpenOpts is used to configure how a file is opened
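
The StreamProgressTo comment says that opening is not blocked when the channel blocks. One way to get that property is a non-blocking send, sketched below. This is an assumption about the technique, not a description of the package's code; `reportProgress` is a hypothetical helper, and dropping updates when the receiver is busy is a design choice of this sketch.

```go
package main

import "fmt"

// reportProgress is a hypothetical helper: it offers n to progress and
// drops the update instead of waiting when nobody is ready to receive.
// A nil channel is never ready, so the default branch is taken.
func reportProgress(progress chan int64, n int64) bool {
	select {
	case progress <- n:
		return true
	default:
		return false // receiver is busy; do not stall the download
	}
}

func main() {
	buffered := make(chan int64, 1)
	fmt.Println(reportProgress(buffered, 1024)) // true: the buffer has room
	fmt.Println(reportProgress(buffered, 2048)) // false: the buffer is full
}
```

A consumer that cares about every update would need a sufficiently large buffer or a fast receiver.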

type Opener

type Opener struct {
	Opts OpenOpts
	// contains filtered or unexported fields
}

An Opener is a Runner that opens files

func NewOpener

func NewOpener(path string, opts ...OpenOpts) *Opener

NewOpener builds an Opener which will open the specified path

func (*Opener) Name

func (o *Opener) Name() string

Name implements the Runner interface for the Opener

func (*Opener) OnPipelineDone

func (o *Opener) OnPipelineDone() error

OnPipelineDone implements ingest.OnDone for Opener

func (*Opener) Run

func (o *Opener) Run(stage *Stage) error

Run implements Runner for Opener

func (*Opener) SetSelection

func (o *Opener) SetSelection(selection ...string)

SetSelection implements ingest.Selectable for Opener

func (*Opener) SkipAbortErr

func (o *Opener) SkipAbortErr() bool

SkipAbortErr saves us having to send nil errors back on abort

type PassOnAddTarget

type PassOnAddTarget interface {
	Runner
	PassOnAddTarget() bool
}

PassOnAddTarget is an interface which a Runner can implement to cause OnAdd hooks that would target it to target the previous targetable interface

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

A Pipeline is a sequence of ingest.Processors that are linked together

The processors will share common control channels (Err, Quit) which are managed via the ingest.Job (which is created by calling Build or Run on the Pipeline).

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline instantiates a new pipeline for use

func Open

func Open(path string, opts ...OpenOpts) *Pipeline

Open is a shortcut to create a new Pipeline that starts with the specified path or file.

If the path is a directory, files can be selected from the directory using ingest.Select. If the path is a file, the file will be emitted to the next Processor.

func StartWith

func StartWith(data interface{}) *Pipeline

StartWith will build a pipeline that emits the provided argument as a first step

func StreamFrom

func StreamFrom(input chan interface{}) *Pipeline

StreamFrom will build a pipeline that reads events from the provided channel

func (*Pipeline) Build

func (p *Pipeline) Build() *Job

Build builds the pipeline and returns a Job control structure

func (*Pipeline) ForEach

func (p *Pipeline) ForEach(fn TransformFn, nameArg ...string) *Pipeline

ForEach runs a transform function on each record in the pipeline.

The record returned will be forwarded to the later stages. Returning an error will cause the pipeline to fail. An optional name can be specified as a string argument and will be used for logging.

func (*Pipeline) StreamTo

func (p *Pipeline) StreamTo(out chan interface{}, opt ...StreamToOpt) *Pipeline

StreamTo causes the pipeline to emit the records at that stage to the channel passed as an argument

An optional name can be specified via the Name field of StreamToOpt

func (*Pipeline) Then

func (p *Pipeline) Then(runner Runner, opts ...ThenOpts) *Pipeline

Then queues the provided processor to operate on the data emitted by the previous stage.

Then returns the pipeline for a chainable API

type Runner

type Runner interface {
	Name() string
	Run(*Stage) error
}

Runner is an interface which can be processed by a pipeline
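
A custom Runner only needs Name and Run. The sketch below implements one against local copies of the documented Stage and Runner declarations. `Doubler` is hypothetical, and two points are assumptions because the source does not spell them out: that an abort request carries a reply channel for the acknowledgement, and that the runner does not close Out itself.

```go
package main

import "fmt"

// Stage and Runner are local copies of the documented declarations.
type Stage struct {
	In    chan interface{}
	Out   chan interface{}
	Abort <-chan chan error
}

type Runner interface {
	Name() string
	Run(*Stage) error
}

// Doubler is a hypothetical Runner that doubles every int it receives.
type Doubler struct{}

func (Doubler) Name() string { return "doubler" }

func (Doubler) Run(stage *Stage) error {
	for {
		select {
		case rec, ok := <-stage.In:
			if !ok {
				return nil // input exhausted
			}
			n, isInt := rec.(int)
			if !isInt {
				return fmt.Errorf("doubler: unexpected record %v", rec)
			}
			stage.Out <- n * 2
		case reply := <-stage.Abort:
			// Assumption: the abort request carries a channel for the acknowledgement.
			reply <- nil
			return nil
		}
	}
}

func main() {
	stage := &Stage{In: make(chan interface{}, 2), Out: make(chan interface{}, 2)}
	stage.In <- 1
	stage.In <- 2
	close(stage.In)

	var r Runner = Doubler{}
	if err := r.Run(stage); err != nil {
		fmt.Println("error:", err)
	}
	close(stage.Out) // closed here by the caller, only to end the loop below
	for v := range stage.Out {
		fmt.Println(v) // 2, then 4
	}
}
```

With the real package, such a value would be passed to Then on a Pipeline.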

type Selectable

type Selectable interface {
	Runner
	SetSelection(selection ...string)
}

Selectable is an interface that a Runner can implement which allows it to be filtered via the SetSelection method

type Selector

type Selector struct {
	// contains filtered or unexported fields
}

Selector is a NoOp runner that will configure the previous stage

func Select

func Select(selection ...string) *Selector

Select builds a NoOp Selector runner that will configure the previous stage

func (*Selector) Name

func (s *Selector) Name() string

Name implements Runner for Selector

func (*Selector) NoOpRunner

func (s *Selector) NoOpRunner() bool

NoOpRunner implements NoOpRunner interface for Selector

func (*Selector) OnAdd

func (s *Selector) OnAdd(prevRunner Runner)

OnAdd implements OnAdd interface for Selector

func (*Selector) Run

func (s *Selector) Run(stage *Stage) error

Run implements Runner for Selector

type Stage

type Stage struct {
	In    chan interface{}
	Out   chan interface{}
	Abort <-chan chan error
}

A Stage is a control structure passed to an individual Runner

func NewStage

func NewStage() *Stage

NewStage builds a blank Stage.

It is mostly to facilitate testing and rarely called directly

type StreamToOpt

type StreamToOpt struct {
	// Name is used for debugging and logging
	Name string
	// NoClose keeps the out channel open even once the stage before it has finished
	NoClose bool
}

StreamToOpt holds the optional settings for StreamTo
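
The effect of NoClose can be sketched as a forwarding loop that closes the output channel only when the option is unset. This is an illustration under assumptions, not the package's implementation; `forward` is a hypothetical helper and StreamToOpt is copied locally.

```go
package main

import "fmt"

// StreamToOpt is a local copy of the documented struct.
type StreamToOpt struct {
	Name    string
	NoClose bool
}

// forward is a hypothetical helper: it copies records from in to out and
// closes out afterwards unless NoClose is set.
func forward(in <-chan interface{}, out chan interface{}, opt StreamToOpt) {
	for rec := range in {
		out <- rec
	}
	if !opt.NoClose {
		close(out)
	}
}

func main() {
	in := make(chan interface{}, 2)
	out := make(chan interface{}, 2)
	in <- "a"
	in <- "b"
	close(in)

	forward(in, out, StreamToOpt{Name: "sink"})
	for rec := range out { // terminates because forward closed out
		fmt.Println(rec)
	}
}
```

Setting NoClose is useful when several pipelines write to the same channel and the caller wants to close it once all of them are done.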

type ThenOpts

type ThenOpts struct {
	// InBuffer is the size of the buffer for the input channel
	InBuffer int
	// OutBuffer is the size of the buffer for the output channel
	OutBuffer int
}

ThenOpts are an optional argument to configure the behavior of calling Then
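
The two sizes most plausibly map to channel capacities. The sketch below shows that mapping with a local copy of ThenOpts; `newChannels` is a hypothetical helper, and how the package actually wires stage channels together is not documented here.

```go
package main

import "fmt"

// ThenOpts is a local copy of the documented struct.
type ThenOpts struct {
	InBuffer  int
	OutBuffer int
}

// newChannels is a hypothetical helper showing how the two sizes
// could translate into channel capacities.
func newChannels(opts ThenOpts) (in, out chan interface{}) {
	return make(chan interface{}, opts.InBuffer), make(chan interface{}, opts.OutBuffer)
}

func main() {
	in, out := newChannels(ThenOpts{InBuffer: 8})
	fmt.Println(cap(in), cap(out)) // 8 0: a zero size yields an unbuffered channel
}
```

A larger input buffer lets a fast upstream stage run ahead of a slow runner, at the cost of holding more records in memory.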

type TransformFn

type TransformFn func(rec interface{}) (res interface{}, err error)

A TransformFn is the function signature used by the TransformStream
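
A function matching this signature might look like the following. The type is copied locally so the example compiles without the package; `upper` is a hypothetical transform chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// TransformFn is a local copy of the documented function type.
type TransformFn func(rec interface{}) (res interface{}, err error)

// upper is a hypothetical transform: it upper-cases string records
// and rejects anything else with an error.
func upper(rec interface{}) (interface{}, error) {
	s, ok := rec.(string)
	if !ok {
		return nil, fmt.Errorf("upper: expected string, got %T", rec)
	}
	return strings.ToUpper(s), nil
}

func main() {
	var fn TransformFn = upper
	res, err := fn("abc")
	fmt.Println(res, err) // ABC <nil>
	_, err = fn(42)
	fmt.Println(err) // upper: expected string, got int
}
```

Because records travel as interface{}, each transform has to assert the concrete type it expects. Returning an error, per the ForEach documentation, causes the pipeline to fail.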

type TransformStream

type TransformStream struct {
	// contains filtered or unexported fields
}

A TransformStream is a stream which will run a transform function on all entries before emitting them back out

func NewTransformStream

func NewTransformStream(name string, transformFn TransformFn) *TransformStream

NewTransformStream builds a TransformStream with the specified name and transform function

func (*TransformStream) Name

func (t *TransformStream) Name() string

Name implements Runner for TransformStream

func (*TransformStream) Run

func (t *TransformStream) Run(stage *Stage) error

Run implements Runner for TransformStream

func (*TransformStream) SkipAbortErr

func (t *TransformStream) SkipAbortErr() bool

SkipAbortErr saves us having to send nil errors back on abort
