pipeline

package
v0.0.0-...-6ec288f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2023 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// String constants naming this package's resource type.
// NOTE(review): presumably these are the type names the resource loader is
// registered under — confirm against the registration code.
const (
	// ResType is the string "stream".
	ResType    = "stream"
	// ResTypeOld is the string "pipeline"; the "Old" suffix suggests a legacy
	// name kept for backward compatibility — TODO confirm.
	ResTypeOld = "pipeline"
)

Variables

This section is empty.

Functions

func ExecuteCurrentStage

func ExecuteCurrentStage(ctx *ExecutionContext) (done bool, err error)

func GetDataResolver

func GetDataResolver() resolve.CompositeResolver

func NewResourceLoader

func NewResourceLoader(mapperFactory mapper.Factory, resolver resolve.CompositeResolver) resource.Loader

func Resume

func Resume(ctx *ExecutionContext) error

func ResumeCurrentStage

func ResumeCurrentStage(ctx *ExecutionContext) (done bool, err error)

Types

type BasicRemotePipelineProvider

type BasicRemotePipelineProvider struct {
}

func (*BasicRemotePipelineProvider) GetPipeline

func (*BasicRemotePipelineProvider) GetPipeline(streamURI string) (*DefinitionConfig, error)

TODO: this can be generalized and shared with flow.

type Definition

type Definition struct {
	// contains filtered or unexported fields
}

func NewDefinition

func NewDefinition(config *DefinitionConfig, mf mapper.Factory, resolver resolve.CompositeResolver) (*Definition, error)

func (*Definition) Cleanup

func (d *Definition) Cleanup() error

func (*Definition) Id

func (d *Definition) Id() string

func (*Definition) Metadata

func (d *Definition) Metadata() *metadata.IOMetadata

Metadata returns IO metadata for the pipeline

func (*Definition) Name

func (d *Definition) Name() string

type DefinitionConfig

type DefinitionConfig struct {
	Name     string               `json:"name"`
	Metadata *metadata.IOMetadata `json:"metadata"`
	Stages   []*StageConfig       `json:"stages"`
	// contains filtered or unexported fields
}

type ExecutionContext

type ExecutionContext struct {
	// contains filtered or unexported fields
}

func (*ExecutionContext) ActivityHost

func (eCtx *ExecutionContext) ActivityHost() activity.Host

func (*ExecutionContext) CancelTimer

func (eCtx *ExecutionContext) CancelTimer(repeating bool)

CancelTimer cancels the existing timer

func (*ExecutionContext) CreateTimer

func (eCtx *ExecutionContext) CreateTimer(interval time.Duration, callback support.TimerCallback, repeating bool) error

CreateTimer creates a timer, note: can only have one active timer at a time for an activity

func (*ExecutionContext) GetInput

func (eCtx *ExecutionContext) GetInput(name string) interface{}

func (*ExecutionContext) GetInputObject

func (eCtx *ExecutionContext) GetInputObject(input data.StructValue) error

func (*ExecutionContext) GetOutput

func (eCtx *ExecutionContext) GetOutput(name string) interface{}

func (*ExecutionContext) GetSetting

func (eCtx *ExecutionContext) GetSetting(setting string) (value interface{}, exists bool)

func (*ExecutionContext) GetSharedTempData

func (eCtx *ExecutionContext) GetSharedTempData() map[string]interface{}

func (*ExecutionContext) GetTracingContext

func (eCtx *ExecutionContext) GetTracingContext() trace.TracingContext

func (*ExecutionContext) HasTimer

func (eCtx *ExecutionContext) HasTimer(repeating bool) bool

HasTimer indicates if a timer already exists

func (*ExecutionContext) ID

func (eCtx *ExecutionContext) ID() string

func (*ExecutionContext) IOMetadata

func (eCtx *ExecutionContext) IOMetadata() *metadata.IOMetadata

func (*ExecutionContext) Logger

func (eCtx *ExecutionContext) Logger() log.Logger

func (*ExecutionContext) Name

func (eCtx *ExecutionContext) Name() string

func (*ExecutionContext) Reply

func (eCtx *ExecutionContext) Reply(replyData map[string]interface{}, err error)

func (*ExecutionContext) Return

func (eCtx *ExecutionContext) Return(returnData map[string]interface{}, err error)

func (*ExecutionContext) Scope

func (eCtx *ExecutionContext) Scope() data.Scope

func (*ExecutionContext) SetOutput

func (eCtx *ExecutionContext) SetOutput(name string, value interface{}) error

func (*ExecutionContext) SetOutputObject

func (eCtx *ExecutionContext) SetOutputObject(output data.StructValue) error

func (*ExecutionContext) Status

func (eCtx *ExecutionContext) Status() ExecutionStatus

func (*ExecutionContext) UpdateTimer

func (eCtx *ExecutionContext) UpdateTimer(repeating bool)

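UpdateTimer presumably updates the existing timer (the published comment here duplicates the CreateTimer description and should be confirmed against the implementation); note: can only have one active timer at a time for an activity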

func (*ExecutionContext) UpdateTimers

func (eCtx *ExecutionContext) UpdateTimers()

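UpdateTimers presumably updates the existing timers (the published comment here duplicates the CreateTimer description and should be confirmed against the implementation); note: can only have one active timer at a time for an activity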

type ExecutionStatus

// ExecutionStatus is an integer code describing the state of a Pipeline
// execution; the defined values are listed in the constant block below.
type ExecutionStatus int
const (
	// ExecStatusNotStarted indicates that the Pipeline execution has not started
	ExecStatusNotStarted ExecutionStatus = 0

	// ExecStatusActive indicates that the Pipeline execution is active
	ExecStatusActive ExecutionStatus = 100

	// ExecStatusStalled indicates that the Pipeline execution has stalled
	ExecStatusStalled ExecutionStatus = 400

	// ExecStatusCompleted indicates that the Pipeline execution has been completed
	ExecStatusCompleted ExecutionStatus = 500

	// ExecStatusCancelled indicates that the Pipeline execution has been cancelled
	ExecStatusCancelled ExecutionStatus = 600

	// ExecStatusFailed indicates that the Pipeline execution has failed
	ExecStatusFailed ExecutionStatus = 700
)

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance(definition *Definition, id string, single bool, outChannel channels.Channel, logger log.Logger) *Instance

func (*Instance) DoStep

func (inst *Instance) DoStep(ctx *ExecutionContext, resume bool) (hasWork bool, err error)

func (*Instance) Id

func (inst *Instance) Id() string

func (*Instance) PipelineId

func (inst *Instance) PipelineId() string

func (*Instance) Run

func (inst *Instance) Run(discriminator string, input map[string]interface{}) (output map[string]interface{}, status ExecutionStatus, err error)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager() *Manager

func (*Manager) GetPipeline

func (m *Manager) GetPipeline(uri string) (*Definition, error)

type MultiScope

// MultiScope is implemented by scopes that can look a value up in a specific
// scope selected by a ScopeId.
type MultiScope interface {
	// GetValueByScope looks up name in the scope identified by scopeId and
	// returns the value together with a flag reporting whether it exists.
	GetValueByScope(scopeId ScopeId, name string) (value interface{}, exists bool)
}

type MultiScopeResolver

type MultiScopeResolver struct {
	// contains filtered or unexported fields
}

func (*MultiScopeResolver) GetResolverInfo

func (r *MultiScopeResolver) GetResolverInfo() *resolve.ResolverInfo

func (*MultiScopeResolver) Resolve

func (r *MultiScopeResolver) Resolve(scope data.Scope, itemName, valueName string) (interface{}, error)

type ResourceLoader

type ResourceLoader struct {
	// contains filtered or unexported fields
}

func (*ResourceLoader) LoadResource

func (rl *ResourceLoader) LoadResource(config *resource.Config) (*resource.Resource, error)

type ScopeId

// ScopeId selects which scope a lookup targets; it is the selector argument
// of MultiScope.GetValueByScope.
type ScopeId int
const (
	// ScopeDefault is the zero value (0).
	ScopeDefault ScopeId = iota
	// ScopePipeline has value 1.
	// NOTE(review): presumably selects the pipeline-level scope — confirm.
	ScopePipeline
	// ScopePassthru has value 2.
	// NOTE(review): presumably selects pass-through values — confirm.
	ScopePassthru
)

func (ScopeId) String

func (t ScopeId) String() string

type SharedScope

type SharedScope struct {
	// contains filtered or unexported fields
}

func (*SharedScope) GetValue

func (s *SharedScope) GetValue(name string) (value interface{}, exists bool)

func (*SharedScope) SetValue

func (s *SharedScope) SetValue(name string, value interface{}) error

type Stage

type Stage struct {
	// contains filtered or unexported fields
}

func NewStage

func NewStage(config *StageConfig, mf mapper.Factory, resolver resolve.CompositeResolver) (*Stage, error)

type StageConfig

// StageConfig is the configuration from which NewStage builds a Stage. It
// embeds *activity.Config, so the activity configuration fields are promoted
// onto StageConfig.
type StageConfig struct {
	*activity.Config

	// Promotions is read from / written to the JSON key "addToPipeline" and is
	// omitted from the output when empty.
	// NOTE(review): presumably the names of values to promote into the
	// pipeline scope — confirm against the stage implementation.
	Promotions []string `json:"addToPipeline,omitempty"`
}

type StageInputScope

type StageInputScope struct {
	// contains filtered or unexported fields
}

StageInputScope is a basic implementation of a scope

func (*StageInputScope) GetValue

func (s *StageInputScope) GetValue(name string) (value interface{}, exists bool)

func (*StageInputScope) GetValueByScope

func (s *StageInputScope) GetValueByScope(scopeId ScopeId, name string) (value interface{}, exists bool)

func (*StageInputScope) SetValue

func (s *StageInputScope) SetValue(name string, value interface{}) error

type StageOutputScope

type StageOutputScope struct {
	// contains filtered or unexported fields
}

StageOutputScope is a basic implementation of a scope

func (*StageOutputScope) GetValue

func (s *StageOutputScope) GetValue(name string) (value interface{}, exists bool)

func (*StageOutputScope) GetValueByScope

func (s *StageOutputScope) GetValueByScope(scopeId ScopeId, name string) (value interface{}, exists bool)

func (*StageOutputScope) SetValue

func (s *StageOutputScope) SetValue(name string, value interface{}) error

type State

// State exposes a data scope plus shared data, tickers and timers that are
// keyed by activity. Instances are obtained from a StateManager via GetState.
type State interface {
	// GetScope returns the data scope associated with this state.
	GetScope() data.Scope

	// GetSharedData gets the activity instance specific shared data
	GetSharedData(act activity.Activity) map[string]interface{}

	// NewTicker returns a TickerHolder for act using the given interval, or an error.
	// NOTE(review): behavior when act already has a ticker is not visible here — confirm.
	NewTicker(act activity.Activity, interval time.Duration) (*TickerHolder, error)

	// GetTicker returns the TickerHolder for act and a flag that presumably
	// reports whether one exists — confirm.
	GetTicker(act activity.Activity) (*TickerHolder, bool)

	// RemoveTicker removes the ticker for act; the meaning of the bool result
	// is not visible here (presumably "a ticker was removed") — confirm.
	RemoveTicker(act activity.Activity) bool

	// NewTimer returns a TimerHolder for act using the given interval, or an error.
	// NOTE(review): behavior when act already has a timer is not visible here — confirm.
	NewTimer(act activity.Activity, interval time.Duration) (*TimerHolder, error)

	// GetTimer returns the TimerHolder for act and a flag that presumably
	// reports whether one exists — confirm.
	GetTimer(act activity.Activity) (*TimerHolder, bool)

	// RemoveTimer removes the timer for act; the meaning of the bool result
	// is not visible here (presumably "a timer was removed") — confirm.
	RemoveTimer(act activity.Activity) bool
}

type StateManager

// StateManager hands out State values by id. NewMultiStateManager and
// NewSimpleStateManager both return implementations of this interface.
type StateManager interface {
	// GetState returns the State for the given id.
	// NOTE(review): whether a State is created on first use, or shared across
	// ids, depends on the implementation and is not visible here — confirm.
	GetState(id string) State
}

func NewMultiStateManager

func NewMultiStateManager() StateManager

func NewSimpleStateManager

func NewSimpleStateManager() StateManager

type Status

// Status is an integer code describing the state of a Pipeline; the defined
// values are listed in the constant block below.
type Status int
const (
	// StatusNotStarted indicates that the Pipeline has not started
	StatusNotStarted Status = 0

	// StatusActive indicates that the Pipeline is active
	StatusActive Status = 100

	// StatusDone indicates that the Pipeline is done
	StatusDone Status = 500
)

type TickerHolder

type TickerHolder struct {
	// contains filtered or unexported fields
}

func (*TickerHolder) GetLastExecCtx

func (t *TickerHolder) GetLastExecCtx() *ExecutionContext

func (*TickerHolder) GetTicker

func (t *TickerHolder) GetTicker() *time.Ticker

func (*TickerHolder) SetLastExecCtx

func (t *TickerHolder) SetLastExecCtx(ctx *ExecutionContext)

type TimerHolder

type TimerHolder struct {
	// contains filtered or unexported fields
}

func (*TimerHolder) GetLastExecCtx

func (t *TimerHolder) GetLastExecCtx() *ExecutionContext

func (*TimerHolder) GetTimer

func (t *TimerHolder) GetTimer() *time.Timer

func (*TimerHolder) SetLastExecCtx

func (t *TimerHolder) SetLastExecCtx(ctx *ExecutionContext)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL