pipeline

package
v0.33.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 24, 2021 License: GPL-2.0, GPL-3.0 Imports: 18 Imported by: 0

Documentation

Overview

Package pipeline provides models to represent pipeline files and runs

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectReferences added in v0.32.32

func CollectReferences(references []Reference) ([]*string, []map[string]interface{}, []string)

func ParseArguments added in v0.32.2

func ParseArguments(
	middlewareArguments interface{},
	middlewareIdentifier string,
	run *Run,
) bool

ParseArguments transfers an unstructured map of arguments into a struct
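
The idea of transferring an unstructured map into a struct can be sketched as follows. This is a minimal illustration, not the package's implementation: it assumes a JSON round trip through encoding/json as the decoding mechanism, and shellArguments and decodeArguments are hypothetical names that do not exist in the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// shellArguments is a hypothetical middleware-specific struct (not part of the package).
type shellArguments struct {
	Command string   `json:"command"`
	Args    []string `json:"args"`
}

// decodeArguments copies the entry stored under identifier into target by
// round-tripping it through JSON. It reports whether an entry was found and decoded.
// Assumption: the real ParseArguments may use a different decoding mechanism.
func decodeArguments(target interface{}, identifier string, arguments map[string]interface{}) bool {
	raw, ok := arguments[identifier]
	if !ok {
		return false
	}
	encoded, err := json.Marshal(raw)
	if err != nil {
		return false
	}
	return json.Unmarshal(encoded, target) == nil
}

func main() {
	arguments := map[string]interface{}{
		"shell": map[string]interface{}{
			"command": "echo",
			"args":    []interface{}{"hello"},
		},
	}
	var parsed shellArguments
	ok := decodeArguments(&parsed, "shell", arguments)
	fmt.Println(ok, parsed.Command, parsed.Args)
}
```

The boolean result mirrors the signature above, where the caller only learns whether suitable arguments were found.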

func ParseArgumentsIncludingParents added in v0.32.2

func ParseArgumentsIncludingParents(
	middlewareArguments interface{},
	middlewareIdentifier string,
	run *Run,
) bool

ParseArgumentsIncludingParents is like ParseArguments, but will traverse through the run's parents if no suitable arguments have been found

Types

type Arguments added in v0.32.2

type Arguments = map[string]interface{}

Arguments maps the string key corresponding to each middleware's identifier to its specific arguments

type DataConnection added in v0.32.32

type DataConnection struct {
	Source *Run
	Target *Run
	Label  *string
}

func NewDataConnection added in v0.32.32

func NewDataConnection(source *Run, target *Run, label string) *DataConnection

type DefaultSettings

type DefaultSettings struct {
	Command string
	Dir     string
}

DefaultSettings are file-level options (to be refined in future)

type Definition added in v0.32.2

type Definition struct {
	BuiltIn             bool
	DefinitionArguments map[string]interface{}
	FileName            string
	Public              bool
}

Definition defines default arguments for a pipeline identifier

func NewDefinition added in v0.32.2

func NewDefinition(
	arguments map[string]interface{},
	pipelineFileName string,
	isPublic bool,
	isBuiltIn bool,
) *Definition

NewDefinition creates a new Definition

type DefinitionsLookup added in v0.32.2

type DefinitionsLookup = map[string][]Definition

DefinitionsLookup maps identifiers to their definitions

func MergePipelineDefinitions

func MergePipelineDefinitions(definition1 DefinitionsLookup, definition2 DefinitionsLookup) DefinitionsLookup

MergePipelineDefinitions merges two definition lookups into a single one
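
A merge of two lookups might look like the sketch below. It uses a simplified stand-in for Definition and assumes that definitions sharing an identifier are concatenated, with the first lookup's entries coming first; the documentation above does not say how the package resolves such collisions.

```go
package main

import "fmt"

// definition is a simplified stand-in for the package's Definition type.
type definition struct {
	FileName string
	Public   bool
}

// definitionsLookup mirrors the shape of DefinitionsLookup.
type definitionsLookup = map[string][]definition

// mergeLookups returns a new lookup containing the definitions of both inputs.
// Assumption: definitions with the same identifier are concatenated, first before second.
func mergeLookups(first, second definitionsLookup) definitionsLookup {
	merged := make(definitionsLookup, len(first)+len(second))
	for identifier, definitions := range first {
		merged[identifier] = append(merged[identifier], definitions...)
	}
	for identifier, definitions := range second {
		merged[identifier] = append(merged[identifier], definitions...)
	}
	return merged
}

func main() {
	a := definitionsLookup{"build": {{FileName: "a.yaml", Public: true}}}
	b := definitionsLookup{
		"build": {{FileName: "b.yaml"}},
		"test":  {{FileName: "b.yaml"}},
	}
	merged := mergeLookups(a, b)
	fmt.Println(len(merged["build"]), len(merged["test"]))
}
```

Building a fresh map leaves both inputs untouched, which is convenient when the same lookup is merged more than once.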

type File

type File struct {
	Default  DefaultSettings
	Path     string
	FileName string
	Hooks    HookDefinitions
	Import   []string
	// in pipeline files, each command can have arbitrary parameters
	// it may also have "steps"
	// each step can be either a string referencing another pipeline
	// or a dictionary containing additional parameters
	Public  map[string]map[string]interface{}
	Private map[string]map[string]interface{}
}

File is a representation of a yaml pipeline file as a Go struct

Note that this struct does not have exactly the same structure as the yaml file

type FileImportSkeleton added in v0.32.2

type FileImportSkeleton struct {
	Import []string
}

FileImportSkeleton is a very basic representation of a yaml pipeline file concerned only with import declarations

type HookDefinitions

type HookDefinitions = map[string][]string

HookDefinitions are not currently used

type Logger added in v0.32.2

type Logger struct {
	Indentation int

	ErrorCallback func(error)
	// contains filtered or unexported fields
}

Logger keeps track of a pipeline run's log entries while several pipeline runs might be executing asynchronously. It needs to:

  - implement the io.Reader interface
  - be non-blocking, even when its output is not yet being read
  - keep a running total of the number of log entries at each log level, as well as all errors
  - be nestable in the sense that the output of another logger can be slotted in
  - preserve the order of log entries, including those slotted in

func NewClosedLoggerWithResult added in v0.32.2

func NewClosedLoggerWithResult(buffer *bytes.Buffer) *Logger

NewClosedLoggerWithResult creates a new Logger that is already closed with the specified result

func NewLogger added in v0.32.2

func NewLogger(run *Run, indentation int) *Logger

NewLogger creates a new Logger

func (*Logger) AddReaderEntry added in v0.32.2

func (logger *Logger) AddReaderEntry(reader io.Reader)

AddReaderEntry adds an entry that will write the entire contents of the provided reader before proceeding to the next entry

func (*Logger) AddWriteCloserEntry added in v0.32.2

func (logger *Logger) AddWriteCloserEntry() io.WriteCloser

AddWriteCloserEntry adds an entry that will write everything written to the writer to the log before proceeding to the next entry

You must close the io.WriteCloser to indicate that you are done writing, so that the log can move on.
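
Why closing matters can be illustrated with io.Pipe from the standard library, used here as a stand-in for the writer returned by AddWriteCloserEntry (an assumption, since the Logger's internals are not shown). The reader only sees the end of the stream once the writer has been closed.

```go
package main

import (
	"fmt"
	"io"
)

// collectThroughPipe writes the messages to a pipe from a goroutine and
// closes the writer when done. io.ReadAll returns only after that close.
func collectThroughPipe(messages []string) (string, error) {
	reader, writer := io.Pipe()
	go func() {
		// Without this Close, the ReadAll below would block forever.
		defer writer.Close()
		for _, message := range messages {
			if _, err := io.WriteString(writer, message); err != nil {
				writer.CloseWithError(err)
				return
			}
		}
	}()
	data, err := io.ReadAll(reader)
	return string(data), err
}

func main() {
	output, err := collectThroughPipe([]string{"step 1\n", "step 2\n"})
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Print(output)
}
```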

func (*Logger) AllErrorMessages added in v0.32.2

func (logger *Logger) AllErrorMessages() []string

AllErrorMessages returns all errors logged up to this point

func (*Logger) Close added in v0.32.2

func (logger *Logger) Close()

Close finalizes the log, preventing further entries from being logged

func (*Logger) Closed added in v0.32.2

func (logger *Logger) Closed() bool

Closed indicates whether the log has been closed

func (*Logger) Debug added in v0.32.2

func (logger *Logger) Debug(logFields ...fields.LogEntryField)

Debug adds an appropriate entry for a debug message

func (*Logger) DebugCount added in v0.32.2

func (logger *Logger) DebugCount() int

DebugCount is the total number of debug logs encountered

func (*Logger) Error added in v0.32.2

func (logger *Logger) Error(err error, logFields ...fields.LogEntryField)

Error adds an appropriate entry for an encountered error

func (*Logger) ErrorCount added in v0.32.2

func (logger *Logger) ErrorCount() int

ErrorCount is the total number of error logs encountered

func (*Logger) Info added in v0.32.2

func (logger *Logger) Info(logFields ...fields.LogEntryField)

Info adds an appropriate entry for non-critical information

func (*Logger) InfoCount added in v0.32.2

func (logger *Logger) InfoCount() int

InfoCount is the total number of info logs encountered

func (*Logger) LastError added in v0.32.2

func (logger *Logger) LastError() error

LastError returns the most recent error level log entry

func (*Logger) Level added in v0.32.2

func (logger *Logger) Level() logrus.Level

Level indicates the log level

func (*Logger) PossibleError added in v0.32.2

func (logger *Logger) PossibleError(err error)

PossibleError does nothing if it is passed nil and otherwise logs the provided error

func (*Logger) PossibleErrorWithExplanation added in v0.32.2

func (logger *Logger) PossibleErrorWithExplanation(err error, explanation string)

PossibleErrorWithExplanation does nothing if it is passed nil and otherwise logs the provided error with an additional explanation

func (*Logger) Read added in v0.32.2

func (logger *Logger) Read(p []byte) (int, error)

Read writes an entry, or part of an entry, to the provided buffer

io.EOF indicates that the log is closed and all log entries have been read.

func (*Logger) SetLevel added in v0.32.2

func (logger *Logger) SetLevel(level logrus.Level)

SetLevel sets the logger's log level

func (*Logger) StderrOutput added in v0.32.2

func (logger *Logger) StderrOutput(message string, logFields ...fields.LogEntryField)

StderrOutput adds an appropriate entry for non-trivial stderr output to the logs

func (*Logger) String added in v0.32.2

func (logger *Logger) String() string

String returns the logger's total output as a string

Note that this will consume the output, competing with other reads, so String should only be called once and not in conjunction with Read.

func (*Logger) Summary added in v0.32.2

func (logger *Logger) Summary() string

Summary returns a human-readable short description of the logged warnings and errors

func (*Logger) Trace added in v0.32.2

func (logger *Logger) Trace(logFields ...fields.LogEntryField)

Trace adds an appropriate entry for a trace message

func (*Logger) TraceCount added in v0.32.2

func (logger *Logger) TraceCount() int

TraceCount is the total number of trace logs encountered

func (*Logger) Warn added in v0.32.2

func (logger *Logger) Warn(logFields ...fields.LogEntryField)

Warn adds an appropriate entry for an encountered warning

func (*Logger) WarnCount added in v0.32.2

func (logger *Logger) WarnCount() int

WarnCount is the total number of warning logs encountered

type Reference added in v0.32.2

type Reference = map[*string]Arguments

Reference is a map containing a single value indexed by the pipeline's identifier (possibly nil)
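
Because a Reference holds exactly one entry, reading it amounts to taking the first key and value from the map. The following sketch mirrors the type shapes locally; unpackReference is a hypothetical helper, not a function of the package.

```go
package main

import "fmt"

// arguments and reference mirror the shapes of Arguments and Reference.
type arguments = map[string]interface{}
type reference = map[*string]arguments

// unpackReference returns the single identifier (possibly nil) and its arguments.
// For an empty reference it returns nil, nil.
func unpackReference(ref reference) (*string, arguments) {
	for identifier, args := range ref {
		return identifier, args
	}
	return nil, nil
}

func main() {
	name := "build"
	named := reference{&name: {"shell": "make"}}
	anonymous := reference{nil: {"shell": "echo hello"}}

	identifier, args := unpackReference(named)
	fmt.Println(*identifier, args["shell"])

	identifier, args = unpackReference(anonymous)
	fmt.Println(identifier == nil, args["shell"])
}
```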

type Run

type Run struct {

	// Identifier is a unique name for the pipeline to be executed
	//
	// Note that anonymous pipes without an identifier can have invocation arguments, but no definition
	Identifier *string
	Id         string
	// Definition references the definition matching the pipeline identifier, if any
	Definition *Definition
	// InvocationArguments are passed to the pipe at the time of invocation / run creation
	InvocationArguments map[string]interface{}

	// Stdin is a data stream through which the run's input is passed
	Stdin *datastream.ComposableDataStream
	// Stdout is a data stream through which the run's output is passed
	Stdout *datastream.ComposableDataStream
	// Stderr is a data stream through which the run's stderr output is passed
	Stderr *datastream.ComposableDataStream
	// ExitCode is the exit code of the run's shell command, if any
	ExitCode *int

	// Log is the dedicated logger for this run
	//
	// We need to organize our logs by run, so that the order of entries remains consistent
	// during parallel execution of several pipelines.
	Log *Logger

	// Parent is the run that started this run, if any
	Parent *Run

	// LogClosingWaitGroup will keep the Log available to be written to, even if the run's shell command has completed
	//
	// This is needed if further log entries might have to be added after shell command execution.
	LogClosingWaitGroup *sync.WaitGroup

	// StartWaitGroup defers the start of the run's shell command execution
	//
	// Middleware might use this to ensure that the shell command is only started
	// when all required data (e.g. environment variables set by another run) is available.
	StartWaitGroup *sync.WaitGroup
	// contains filtered or unexported fields
}

Run contains everything needed to actually execute the invocation of a pipe

The middleware operates on these objects, triggering further runs or shell invocations. There are three steps to this process:

  1. Setup: In the setup phase the arguments, connections between inputs and outputs, etc. of each run are defined.
  2. Finalization: After the setup, Close() is called to prevent any further changes to input/output connections.
  3. Execution: The shell command is executed and data is piped through the defined input/output connections. Note that some middleware might start additional runs in the execution phase. For example, the `when` middleware for conditional execution will trigger runs based on whether the result of previous runs satisfies a certain condition.
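
The three phases can be modelled in a few lines. The sketch below uses a hypothetical sketchRun type rather than the package's Run, and it assumes that changing connections after closing, or executing before closing, is reported as an error; the real package may behave differently.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sketchRun is a hypothetical, heavily simplified stand-in for Run.
type sketchRun struct {
	inputs []string
	closed bool
}

// connect registers an input connection; only allowed during setup.
func (run *sketchRun) connect(input string) error {
	if run.closed {
		return errors.New("run is closed; connections can no longer change")
	}
	run.inputs = append(run.inputs, input)
	return nil
}

// close ends the setup phase.
func (run *sketchRun) close() {
	run.closed = true
}

// execute pipes the connected inputs; only allowed after finalization.
func (run *sketchRun) execute() (string, error) {
	if !run.closed {
		return "", errors.New("run must be closed before execution")
	}
	return strings.Join(run.inputs, " | "), nil
}

func main() {
	run := &sketchRun{}
	_ = run.connect("cat file") // 1. setup
	_ = run.connect("sort")
	run.close()                     // 2. finalization
	fmt.Println(run.connect("uniq")) // rejected: setup is over
	fmt.Println(run.execute())       // 3. execution
}
```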

func NewRun added in v0.32.2

func NewRun(
	identifier *string,
	invocationArguments map[string]interface{},
	definition *Definition,
	parent *Run,
) (*Run, error)

NewRun creates a new Run with the specified identifier, invocation arguments, definition and parent

All passed parameters are optional.

func (*Run) AddCancelHook

func (run *Run) AddCancelHook(cancelHook func() error)

AddCancelHook adds a hook that will be executed when the run is cancelled

Use this to implement cancel functionality in middleware.

func (*Run) ArgumentAtPath

func (run *Run) ArgumentAtPath(path ...string) (interface{}, error)

ArgumentAtPath returns the value of the run's arguments at the specified path

func (*Run) ArgumentAtPathIncludingParents

func (run *Run) ArgumentAtPathIncludingParents(path ...string) (interface{}, error)

ArgumentAtPathIncludingParents looks up the argument path within the run's arguments or, failing that, its parents

ArgumentAtPathIncludingParents will keep traversing parents until a value is found
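
The lookup with parent fallback can be sketched on plain nested maps. The node type and both functions are hypothetical stand-ins, and the error messages are illustrative rather than those of the package.

```go
package main

import (
	"fmt"
)

// node is a hypothetical stand-in for Run, reduced to arguments and a parent.
type node struct {
	arguments map[string]interface{}
	parent    *node
}

// argumentAtPath walks the nested maps along path and returns the value found there.
func argumentAtPath(arguments map[string]interface{}, path ...string) (interface{}, error) {
	var current interface{} = arguments
	for _, key := range path {
		level, ok := current.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("cannot descend into %q: not a map", key)
		}
		current, ok = level[key]
		if !ok {
			return nil, fmt.Errorf("no value at key %q", key)
		}
	}
	return current, nil
}

// argumentAtPathIncludingParents tries the node first, then each ancestor in turn.
func argumentAtPathIncludingParents(start *node, path ...string) (interface{}, error) {
	for current := start; current != nil; current = current.parent {
		if value, err := argumentAtPath(current.arguments, path...); err == nil {
			return value, nil
		}
	}
	return nil, fmt.Errorf("no value at path %v in run or parents", path)
}

func main() {
	parent := &node{arguments: map[string]interface{}{
		"env": map[string]interface{}{"HOME": "/root"},
	}}
	child := &node{arguments: map[string]interface{}{}, parent: parent}

	fmt.Println(argumentAtPath(child.arguments, "env", "HOME"))
	fmt.Println(argumentAtPathIncludingParents(child, "env", "HOME"))
}
```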

func (*Run) ArgumentsCopy

func (run *Run) ArgumentsCopy() map[string]interface{}

ArgumentsCopy is a deep copy of the run's arguments that can be safely mutated
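
The difference a deep copy makes shows up as soon as nested values are mutated. Below is a minimal sketch of a recursive copy over maps and slices; deepCopy is a hypothetical helper, and it assumes arguments consist only of nested maps, slices and scalar values.

```go
package main

import "fmt"

// deepCopy recursively copies nested maps and slices; other values are returned as-is.
// Assumption: arguments contain only map[string]interface{}, []interface{} and scalars.
func deepCopy(value interface{}) interface{} {
	switch typed := value.(type) {
	case map[string]interface{}:
		copied := make(map[string]interface{}, len(typed))
		for key, nested := range typed {
			copied[key] = deepCopy(nested)
		}
		return copied
	case []interface{}:
		copied := make([]interface{}, len(typed))
		for index, nested := range typed {
			copied[index] = deepCopy(nested)
		}
		return copied
	default:
		return typed
	}
}

func main() {
	original := map[string]interface{}{
		"env": map[string]interface{}{"MODE": "production"},
	}
	copied := deepCopy(original).(map[string]interface{})
	copied["env"].(map[string]interface{})["MODE"] = "test"

	// The original is unaffected by changes to the copy.
	fmt.Println(original["env"].(map[string]interface{})["MODE"])
}
```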

func (*Run) Cancel

func (run *Run) Cancel() error

Cancel cancels the run without waiting for execution to complete

func (*Run) Cancelled

func (run *Run) Cancelled() bool

Cancelled indicates whether the run has been cancelled

func (*Run) Close

func (run *Run) Close()

Close closes the Run's input, output and stderr data streams and its log, which is required for execution and completion

It is fine to call Close multiple times. After the first, subsequent calls will have no effect.
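
One common way to make a close operation safe to repeat is sync.Once, sketched below. Whether the package uses sync.Once or another guard is not stated here, so treat this as an illustration of the behaviour only; onceCloser is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser is a hypothetical type whose close method only takes effect once.
type onceCloser struct {
	once       sync.Once
	closeCount int
}

// close performs the cleanup on the first call; later calls do nothing.
func (closer *onceCloser) close() {
	closer.once.Do(func() {
		closer.closeCount++
	})
}

func main() {
	closer := &onceCloser{}
	closer.close()
	closer.close()
	closer.close()
	fmt.Println("cleanup ran", closer.closeCount, "time(s)")
}
```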

func (*Run) Completed

func (run *Run) Completed() bool

Completed indicates whether the run has finished executing, logging etc.

func (*Run) DisplayString added in v0.32.32

func (run *Run) DisplayString() string

func (*Run) GraphGroup added in v0.32.32

func (run *Run) GraphGroup() string

func (*Run) GraphLabel added in v0.32.32

func (run *Run) GraphLabel() string

func (*Run) HaveArgumentAtPath added in v0.32.32

func (run *Run) HaveArgumentAtPath(path ...string) bool

HaveArgumentAtPath indicates whether the run's arguments contain a value at the specified path

func (*Run) RemoveArgumentAtPath added in v0.32.12

func (run *Run) RemoveArgumentAtPath(path ...string) error

RemoveArgumentAtPath removes the run's argument at the specified path

func (*Run) SetArgumentAtPath

func (run *Run) SetArgumentAtPath(value interface{}, path ...string) error

SetArgumentAtPath overwrites the run's argument at the specified path, creating additional levels of nesting if required
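
Creating intermediate levels on demand can be sketched as follows. setAtPath is a hypothetical helper operating on a plain map, and it assumes that an existing non-map value in the middle of the path is reported as an error; the package may instead overwrite it.

```go
package main

import (
	"errors"
	"fmt"
)

// setAtPath stores value at the nested path, creating missing levels along the way.
// Assumption: a non-map value in the middle of the path is an error.
func setAtPath(arguments map[string]interface{}, value interface{}, path ...string) error {
	if len(path) == 0 {
		return errors.New("path must not be empty")
	}
	current := arguments
	for _, key := range path[:len(path)-1] {
		next, exists := current[key]
		if !exists {
			created := map[string]interface{}{}
			current[key] = created
			current = created
			continue
		}
		nested, ok := next.(map[string]interface{})
		if !ok {
			return fmt.Errorf("value at %q is not a map", key)
		}
		current = nested
	}
	current[path[len(path)-1]] = value
	return nil
}

func main() {
	arguments := map[string]interface{}{}
	if err := setAtPath(arguments, "/tmp", "shell", "options", "dir"); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println(arguments)
}
```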

func (*Run) SetArguments

func (run *Run) SetArguments(value map[string]interface{})

SetArguments overwrites the run's arguments entirely

func (*Run) String

func (run *Run) String() string

String returns a string description of the run suitable for logging

The value will keep changing until the run has completed

func (*Run) Wait

func (run *Run) Wait()

Wait halts execution until the run has completed
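
The two blocking points mentioned in this documentation, a deferred start (as described for StartWaitGroup) and waiting for completion, can be sketched with a sync.WaitGroup and a channel. The gatedRun type is hypothetical, and using a channel to signal completion is an assumption, not the package's confirmed mechanism.

```go
package main

import (
	"fmt"
	"sync"
)

// gatedRun is a hypothetical stand-in for Run with a start gate and a completion signal.
type gatedRun struct {
	startGate sync.WaitGroup
	done      chan struct{}
	output    string
}

func newGatedRun() *gatedRun {
	return &gatedRun{done: make(chan struct{})}
}

// start launches the command in the background; it only runs once the gate is released.
func (run *gatedRun) start(command func() string) {
	go func() {
		run.startGate.Wait()
		run.output = command()
		close(run.done)
	}()
}

// wait blocks until the command has completed.
func (run *gatedRun) wait() {
	<-run.done
}

func main() {
	run := newGatedRun()
	run.startGate.Add(1) // e.g. middleware still has to provide data

	environment := ""
	run.start(func() string { return "GREETING=" + environment })

	environment = "hello" // the required data becomes available
	run.startGate.Done()  // release the gate

	run.wait()
	fmt.Println(run.output)
}
```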
