Documentation ¶
Index ¶
- Variables
- type EmptyLogger
- func (e *EmptyLogger) Debug(args ...interface{})
- func (e *EmptyLogger) Error(args ...interface{})
- func (e *EmptyLogger) Info(args ...interface{})
- func (e *EmptyLogger) Warn(args ...interface{})
- func (e *EmptyLogger) WithError(err error) Logger
- func (e *EmptyLogger) WithField(field string, value interface{}) Logger
- type HasDefaultOptions
- type InStream
- type Job
- type Logger
- type NoErrAbortRunner
- type NoOpRunner
- type OnAdd
- type OnDone
- type OpenOpts
- type Opener
- type PassOnAddTarget
- type Pipeline
- type Runner
- type Selectable
- type Selector
- type Stage
- type StreamToOpt
- type ThenOpts
- type TransformFn
- type TransformStream
Constants ¶
This section is empty.
Variables ¶
var AbortTimeout = time.Second * 10
AbortTimeout is how long an abort may take before it is assumed to have timed out. A timed-out abort is logged as a warning.
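The timeout behaviour described above can be sketched with a select over an acknowledgement channel and time.After. This is only an illustration under assumptions: abortTimeout stands in for the package-level AbortTimeout, and waitForAbort, errAbortTimedOut, and shortTimeout are hypothetical names that are not part of the documented API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// abortTimeout stands in for the package-level AbortTimeout variable.
var abortTimeout = 10 * time.Second

// shortTimeout is a hypothetical value used to demonstrate the timeout path quickly.
const shortTimeout = 20 * time.Millisecond

// errAbortTimedOut is a hypothetical sentinel used only in this sketch.
var errAbortTimedOut = errors.New("abort timed out")

// waitForAbort waits for an abort acknowledgement on ack and gives up
// after timeout, returning errAbortTimedOut in that case.
func waitForAbort(ack <-chan error, timeout time.Duration) error {
	select {
	case err := <-ack:
		return err
	case <-time.After(timeout):
		return errAbortTimedOut
	}
}

func main() {
	ack := make(chan error, 1)
	ack <- nil // the runner acknowledged the abort without an error
	fmt.Println(waitForAbort(ack, abortTimeout))

	stuck := make(chan error) // nobody ever replies on this channel
	fmt.Println(waitForAbort(stuck, shortTimeout))
}
```

Because AbortTimeout is a package variable, a caller could presumably shorten it in tests by assigning a smaller duration before running a job.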
Functions ¶
This section is empty.
Types ¶
type EmptyLogger ¶
type EmptyLogger struct{}
EmptyLogger is the default Logger. It won't log anything. We recommend replacing it with something better.
func (*EmptyLogger) WithError ¶
func (e *EmptyLogger) WithError(err error) Logger
WithError placeholder
func (*EmptyLogger) WithField ¶
func (e *EmptyLogger) WithField(field string, value interface{}) Logger
WithField placeholder
type HasDefaultOptions ¶
HasDefaultOptions is an interface which a Runner can implement that allows it to specify default ThenOpts
type InStream ¶
type InStream struct {
// contains filtered or unexported fields
}
An InStream is a runner that will pass data from the input channel down to future runners
func NewInStream ¶
NewInStream builds an input stream for the specified input channel
func (*InStream) SkipAbortErr ¶
SkipAbortErr saves us having to send nil errors back on abort
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
A Job is a control structure for interacting with a Pipeline
func (*Job) Abort ¶
Abort aborts the job and cancels all running processors
It returns a channel of errors encountered while aborting
func (*Job) Run ¶
Run runs the Job and blocks until it has completed
It returns any error that occurred anywhere in the pipeline
func (*Job) RunAsync ¶
RunAsync runs the job and returns an error channel that will emit the result of the job and then be closed
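The channel contract described for RunAsync (one result, then close) can be consumed as sketched below. The library itself is not imported here: runAsync is a hypothetical stand-in that only mimics the documented behaviour, and it assumes that a nil result is also emitted on success.

```go
package main

import (
	"errors"
	"fmt"
)

// errStageFailed is a hypothetical error used only for this sketch.
var errStageFailed = errors.New("stage failed")

// runAsync is a stand-in with the behaviour the RunAsync doc describes:
// it returns a channel that emits one result and is then closed.
func runAsync(work func() error) <-chan error {
	out := make(chan error, 1)
	go func() {
		defer close(out)
		out <- work()
	}()
	return out
}

func main() {
	errs := runAsync(func() error { return errStageFailed })

	// The first receive yields the job result.
	err := <-errs
	fmt.Println("job finished:", err)

	// A second receive reports that the channel has been closed.
	_, open := <-errs
	fmt.Println("channel still open:", open)
}
```

Ranging over the returned channel works as well, since the loop ends once the channel is closed.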
type Logger ¶
type Logger interface {
	WithError(err error) Logger
	WithField(field string, value interface{}) Logger
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}
Logger is a logging interface which mirrors pico.Logger, apex.Logger, and logrus.Logger
var DefaultLogger Logger = &EmptyLogger{}
DefaultLogger is where ingest will log to. By default, it won't log. We recommend setting it to Logrus, or Apex, or your own.
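For readers who want to supply their own logger, here is a minimal sketch of a type that satisfies the Logger interface. The interface is copied locally so the example compiles on its own; writerLogger and logToString are hypothetical names, and the output format is an arbitrary choice.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// Logger is copied from the interface documented above.
type Logger interface {
	WithError(err error) Logger
	WithField(field string, value interface{}) Logger
	Debug(args ...interface{})
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
}

// writerLogger is a hypothetical Logger that writes one line per call to w.
type writerLogger struct {
	w      io.Writer
	fields string // accumulated " key=value" pairs
}

func (l *writerLogger) WithError(err error) Logger {
	return l.WithField("error", err)
}

func (l *writerLogger) WithField(field string, value interface{}) Logger {
	// Return a copy so the parent logger keeps its own fields.
	return &writerLogger{w: l.w, fields: l.fields + fmt.Sprintf(" %s=%v", field, value)}
}

func (l *writerLogger) log(level string, args ...interface{}) {
	fmt.Fprintf(l.w, "%s %s%s\n", level, fmt.Sprint(args...), l.fields)
}

func (l *writerLogger) Debug(args ...interface{}) { l.log("DEBUG", args...) }
func (l *writerLogger) Info(args ...interface{})  { l.log("INFO", args...) }
func (l *writerLogger) Warn(args ...interface{})  { l.log("WARN", args...) }
func (l *writerLogger) Error(args ...interface{}) { l.log("ERROR", args...) }

// logToString runs fn against a writerLogger and returns what was written.
func logToString(fn func(Logger)) string {
	var sb strings.Builder
	fn(&writerLogger{w: &sb})
	return sb.String()
}

func main() {
	var logger Logger = &writerLogger{w: os.Stdout}
	logger.WithField("stage", "open").Info("starting")
}
```

Since DefaultLogger is declared as a package variable of type Logger, a value like this could presumably be assigned to it before building a pipeline.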
type NoErrAbortRunner ¶
NoErrAbortRunner allows a runner to avoid having to send an error back on Abort
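Several types on this page (NoErrAbortRunner, OnAdd, OnDone, Selectable and others) are optional interfaces that a Runner may implement. The sketch below shows how such an interface is typically detected with a type assertion. It is an assumption-laden illustration: runner and noErrAbortRunner are local stand-ins, and the method set of NoErrAbortRunner is inferred from the SkipAbortErr() bool methods listed elsewhere on this page.

```go
package main

import "fmt"

// runner is a reduced stand-in for the package's Runner interface.
type runner interface {
	Name() string
}

// noErrAbortRunner is an assumed shape for NoErrAbortRunner.
type noErrAbortRunner interface {
	SkipAbortErr() bool
}

// quietRunner opts in to skipping the abort error.
type quietRunner struct{}

func (quietRunner) Name() string       { return "quiet" }
func (quietRunner) SkipAbortErr() bool { return true }

// plainRunner does not implement the optional interface.
type plainRunner struct{}

func (plainRunner) Name() string { return "plain" }

// skipsAbortErr reports whether r has opted out of sending an abort error.
func skipsAbortErr(r runner) bool {
	if s, ok := r.(noErrAbortRunner); ok {
		return s.SkipAbortErr()
	}
	return false
}

func main() {
	for _, r := range []runner{quietRunner{}, plainRunner{}} {
		fmt.Println(r.Name(), skipsAbortErr(r))
	}
}
```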
type NoOpRunner ¶
NoOpRunner allows a runner to specify that it shouldn't be added to the run pipeline at add time
type OnAdd ¶
OnAdd is an interface which a Runner can implement to allow it to hook in to being added to the pipeline
type OnDone ¶
OnDone is an interface which a Runner can implement to allow it to run code after the pipeline has completed running fully
type OpenOpts ¶
type OpenOpts struct {
	// If TempDir is specified, remote files will be downloaded to a temporary directory in full before
	// being emitted to the next stage
	TempDir string
	// Logger is the logger that the Opener will log to
	Logger Logger
	// StreamProgressTo is used to specify a channel which will receive a count of bytes for files that are being downloaded
	// Currently only works if TempDir is specified. Opening will NOT be blocked by this channel blocking
	StreamProgressTo chan int64
}
OpenOpts is used to configure how a file is opened
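The note that opening is not blocked by StreamProgressTo suggests a non-blocking send. One common way to get that behaviour is a select with a default case, sketched below. This is an assumption about a possible approach, not the library's actual implementation: reportProgress is a hypothetical helper, and dropping an update when the channel is full is a design choice made for this example.

```go
package main

import "fmt"

// reportProgress tries to send n on progress without blocking.
// It returns false if the channel is nil or not ready to receive.
func reportProgress(progress chan<- int64, n int64) bool {
	if progress == nil {
		return false
	}
	select {
	case progress <- n:
		return true
	default:
		return false // receiver is busy: drop the update instead of blocking
	}
}

func main() {
	progress := make(chan int64, 1)
	fmt.Println(reportProgress(progress, 4096)) // buffer has room
	fmt.Println(reportProgress(progress, 8192)) // buffer full, update dropped
	fmt.Println(<-progress)
}
```

A caller that wants to see most updates would read from the channel in its own goroutine or give the channel a generous buffer.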
type Opener ¶
type Opener struct {
	Opts OpenOpts
	// contains filtered or unexported fields
}
An Opener is a Runner that opens files
func (*Opener) OnPipelineDone ¶
OnPipelineDone implements ingest.OnDone for Opener
func (*Opener) SetSelection ¶
SetSelection implements ingest.Selectable for Opener
func (*Opener) SkipAbortErr ¶
SkipAbortErr saves us having to send nil errors back on abort
type PassOnAddTarget ¶
PassOnAddTarget is an interface which a Runner can implement to cause OnAdd hooks that would target it to target the previous targetable interface
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
A Pipeline is a sequence of ingest.Processors that are linked together
The processors will share common control channels (Err, Quit) which are managed via the ingest.Job (which is created by calling Build or Run on the Pipeline).
func Open ¶
Open is a shortcut to create a new Pipeline that starts with the specified path or file.
If the path is a directory, files can be selected from the directory using ingest.Select. If the path is a file, the file will be emitted to the next Processor.
func StartWith ¶
func StartWith(data interface{}) *Pipeline
StartWith will build a pipeline that emits the provided argument as a first step
func StreamFrom ¶
func StreamFrom(input chan interface{}) *Pipeline
StreamFrom will build a pipeline that reads events from the provided channel
func (*Pipeline) ForEach ¶
func (p *Pipeline) ForEach(fn TransformFn, nameArg ...string) *Pipeline
ForEach runs a transform function on each record in the pipeline.
The record returned will be forwarded to the later stages. Returning an error will cause the pipeline to fail. An optional name can be specified as a string argument and will be used for logging.
func (*Pipeline) StreamTo ¶
func (p *Pipeline) StreamTo(out chan interface{}, opt ...StreamToOpt) *Pipeline
StreamTo causes the pipeline to emit the records at that stage to the channel passed as an argument
An optional StreamToOpt can be passed to set a name for logging and to keep the out channel open (NoClose)
type Selectable ¶
Selectable is an interface that a Runner can implement which allows it to be filtered via the SetSelection method
type Selector ¶
type Selector struct {
// contains filtered or unexported fields
}
Selector is a NoOp runner that will configure the previous stage
func (*Selector) NoOpRunner ¶
NoOpRunner implements NoOpRunner interface for Selector
type Stage ¶
type Stage struct {
	In    chan interface{}
	Out   chan interface{}
	Abort <-chan chan error
}
A Stage is a control structure passed to an individual Runner
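To make the three channels concrete, here is a sketch of a run loop that reads from In, writes to Out, and watches Abort. The Stage struct is copied locally so the example compiles on its own. The protocol shown is assumed rather than documented: the runner replies with nil on the channel it receives from Abort, and it leaves closing Out to the pipeline. runDoubler is a hypothetical function.

```go
package main

import "fmt"

// Stage is copied from the struct documented above.
type Stage struct {
	In    chan interface{}
	Out   chan interface{}
	Abort <-chan chan error
}

// runDoubler doubles every int read from In and forwards it to Out.
// It returns when In is closed, on a bad record, or on abort.
func runDoubler(stage *Stage) error {
	for {
		select {
		case rec, ok := <-stage.In:
			if !ok {
				return nil // upstream finished
			}
			n, isInt := rec.(int)
			if !isInt {
				return fmt.Errorf("expected int, got %T", rec)
			}
			select {
			case stage.Out <- n * 2:
			case reply := <-stage.Abort:
				reply <- nil // acknowledge the abort (assumed protocol)
				return nil
			}
		case reply := <-stage.Abort:
			reply <- nil // acknowledge the abort (assumed protocol)
			return nil
		}
	}
}

func main() {
	abort := make(chan chan error)
	stage := &Stage{
		In:    make(chan interface{}, 2),
		Out:   make(chan interface{}, 2),
		Abort: abort,
	}
	stage.In <- 1
	stage.In <- 2
	close(stage.In)

	fmt.Println(runDoubler(stage))
	fmt.Println(<-stage.Out, <-stage.Out)
}
```

The nested select around the send keeps the runner responsive to an abort even when the downstream stage has stopped reading.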
type StreamToOpt ¶
type StreamToOpt struct {
	// Name is used for debugging and logging
	Name string
	// NoClose keeps the out channel open even once the stage before it has finished
	NoClose bool
}
StreamToOpt holds the optional settings for StreamTo
type ThenOpts ¶
type ThenOpts struct {
	// InBuffer is the size of the buffer for the input channel
	InBuffer int
	// OutBuffer is the size of the buffer for the output channel
	OutBuffer int
}
ThenOpts are an optional argument to configure the behavior of calling Then
type TransformFn ¶
type TransformFn func(rec interface{}) (res interface{}, err error)
A TransformFn is the function signature used by the TransformStream
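As an illustration of the signature, here is a small transform that upper-cases string records and rejects anything else. TransformFn is redeclared locally so the example compiles on its own. upperCase and applyAll are hypothetical names, and applyAll only approximates how a transform stage might apply the function record by record.

```go
package main

import (
	"fmt"
	"strings"
)

// TransformFn is copied from the type documented above.
type TransformFn func(rec interface{}) (res interface{}, err error)

// upperCase is a hypothetical transform: it upper-cases string records
// and returns an error for any other type.
func upperCase(rec interface{}) (interface{}, error) {
	s, ok := rec.(string)
	if !ok {
		return nil, fmt.Errorf("upperCase: expected string, got %T", rec)
	}
	return strings.ToUpper(s), nil
}

// applyAll applies fn to each record in order and stops at the first error.
func applyAll(fn TransformFn, recs []interface{}) ([]interface{}, error) {
	out := make([]interface{}, 0, len(recs))
	for _, rec := range recs {
		res, err := fn(rec)
		if err != nil {
			return out, err
		}
		out = append(out, res)
	}
	return out, nil
}

func main() {
	var fn TransformFn = upperCase

	out, err := applyAll(fn, []interface{}{"a", "b"})
	fmt.Println(out, err)

	_, err = applyAll(fn, []interface{}{"a", 42})
	fmt.Println(err)
}
```

Because upperCase matches the TransformFn signature, it could be passed to ForEach together with an optional name, following the ForEach signature listed above.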
type TransformStream ¶
type TransformStream struct {
// contains filtered or unexported fields
}
A TransformStream is a stream which will run a transform function on all entries before emitting them back out
func NewTransformStream ¶
func NewTransformStream(name string, transformFn TransformFn) *TransformStream
NewTransformStream builds a TransformStream with the specified name that applies transformFn to each record
func (*TransformStream) Name ¶
func (t *TransformStream) Name() string
Name implements Runner for TransformStream
func (*TransformStream) Run ¶
func (t *TransformStream) Run(stage *Stage) error
Run implements Runner for TransformStream
func (*TransformStream) SkipAbortErr ¶
func (t *TransformStream) SkipAbortErr() bool
SkipAbortErr saves us having to send nil errors back on abort