Documentation ¶
Index ¶
- Constants
- func ExecuteCurrentStage(ctx *ExecutionContext) (done bool, err error)
- func GetDataResolver() resolve.CompositeResolver
- func NewResourceLoader(mapperFactory mapper.Factory, resolver resolve.CompositeResolver) resource.Loader
- func Resume(ctx *ExecutionContext) error
- func ResumeCurrentStage(ctx *ExecutionContext) (done bool, err error)
- type BasicRemotePipelineProvider
- type Definition
- type DefinitionConfig
- type ExecutionContext
- func (eCtx *ExecutionContext) ActivityHost() activity.Host
- func (eCtx *ExecutionContext) CancelTimer(repeating bool)
- func (eCtx *ExecutionContext) CreateTimer(interval time.Duration, callback support.TimerCallback, repeating bool) error
- func (eCtx *ExecutionContext) GetInput(name string) interface{}
- func (eCtx *ExecutionContext) GetInputObject(input data.StructValue) error
- func (eCtx *ExecutionContext) GetOutput(name string) interface{}
- func (eCtx *ExecutionContext) GetSetting(setting string) (value interface{}, exists bool)
- func (eCtx *ExecutionContext) GetSharedTempData() map[string]interface{}
- func (eCtx *ExecutionContext) GetTracingContext() trace.TracingContext
- func (eCtx *ExecutionContext) HasTimer(repeating bool) bool
- func (eCtx *ExecutionContext) ID() string
- func (eCtx *ExecutionContext) IOMetadata() *metadata.IOMetadata
- func (eCtx *ExecutionContext) Logger() log.Logger
- func (eCtx *ExecutionContext) Name() string
- func (eCtx *ExecutionContext) Reply(replyData map[string]interface{}, err error)
- func (eCtx *ExecutionContext) Return(returnData map[string]interface{}, err error)
- func (eCtx *ExecutionContext) Scope() data.Scope
- func (eCtx *ExecutionContext) SetOutput(name string, value interface{}) error
- func (eCtx *ExecutionContext) SetOutputObject(output data.StructValue) error
- func (eCtx *ExecutionContext) Status() ExecutionStatus
- func (eCtx *ExecutionContext) UpdateTimer(repeating bool)
- func (eCtx *ExecutionContext) UpdateTimers()
- type ExecutionStatus
- type Instance
- func (inst *Instance) DoStep(ctx *ExecutionContext, resume bool) (hasWork bool, err error)
- func (inst *Instance) Id() string
- func (inst *Instance) PipelineId() string
- func (inst *Instance) Run(discriminator string, input map[string]interface{}) (output map[string]interface{}, status ExecutionStatus, err error)
- type Manager
- type MultiScope
- type MultiScopeResolver
- type ResourceLoader
- type ScopeId
- type SharedScope
- type Stage
- type StageConfig
- type StageInputScope
- type StageOutputScope
- type State
- type StateManager
- type Status
- type TickerHolder
- type TimerHolder
Constants ¶
View Source
const ( ResType = "stream" ResTypeOld = "pipeline" )
Variables ¶
This section is empty.
Functions ¶
func ExecuteCurrentStage ¶
func ExecuteCurrentStage(ctx *ExecutionContext) (done bool, err error)
func GetDataResolver ¶
func GetDataResolver() resolve.CompositeResolver
func NewResourceLoader ¶
func Resume ¶
func Resume(ctx *ExecutionContext) error
func ResumeCurrentStage ¶
func ResumeCurrentStage(ctx *ExecutionContext) (done bool, err error)
Types ¶
type BasicRemotePipelineProvider ¶
type BasicRemotePipelineProvider struct { }
func (*BasicRemotePipelineProvider) GetPipeline ¶
func (*BasicRemotePipelineProvider) GetPipeline(streamURI string) (*DefinitionConfig, error)
TODO: this can be generalized and shared with flow.
type Definition ¶
type Definition struct {
// contains filtered or unexported fields
}
func NewDefinition ¶
func NewDefinition(config *DefinitionConfig, mf mapper.Factory, resolver resolve.CompositeResolver) (*Definition, error)
func (*Definition) Cleanup ¶
func (d *Definition) Cleanup() error
func (*Definition) Id ¶
func (d *Definition) Id() string
func (*Definition) Metadata ¶
func (d *Definition) Metadata() *metadata.IOMetadata
Metadata returns IO metadata for the pipeline
func (*Definition) Name ¶
func (d *Definition) Name() string
type DefinitionConfig ¶
type DefinitionConfig struct {
	Name     string               `json:"name"`
	Metadata *metadata.IOMetadata `json:"metadata"`
	Stages   []*StageConfig       `json:"stages"`
	// contains filtered or unexported fields
}
type ExecutionContext ¶
type ExecutionContext struct {
// contains filtered or unexported fields
}
func (*ExecutionContext) ActivityHost ¶
func (eCtx *ExecutionContext) ActivityHost() activity.Host
func (*ExecutionContext) CancelTimer ¶
func (eCtx *ExecutionContext) CancelTimer(repeating bool)
CancelTimer cancels the existing timer
func (*ExecutionContext) CreateTimer ¶
func (eCtx *ExecutionContext) CreateTimer(interval time.Duration, callback support.TimerCallback, repeating bool) error
CreateTimer creates a timer. Note: an activity can have only one active timer at a time.
func (*ExecutionContext) GetInput ¶
func (eCtx *ExecutionContext) GetInput(name string) interface{}
func (*ExecutionContext) GetInputObject ¶
func (eCtx *ExecutionContext) GetInputObject(input data.StructValue) error
func (*ExecutionContext) GetOutput ¶
func (eCtx *ExecutionContext) GetOutput(name string) interface{}
func (*ExecutionContext) GetSetting ¶
func (eCtx *ExecutionContext) GetSetting(setting string) (value interface{}, exists bool)
func (*ExecutionContext) GetSharedTempData ¶
func (eCtx *ExecutionContext) GetSharedTempData() map[string]interface{}
func (*ExecutionContext) GetTracingContext ¶
func (eCtx *ExecutionContext) GetTracingContext() trace.TracingContext
func (*ExecutionContext) HasTimer ¶
func (eCtx *ExecutionContext) HasTimer(repeating bool) bool
HasTimer indicates if a timer already exists
func (*ExecutionContext) ID ¶
func (eCtx *ExecutionContext) ID() string
func (*ExecutionContext) IOMetadata ¶
func (eCtx *ExecutionContext) IOMetadata() *metadata.IOMetadata
func (*ExecutionContext) Logger ¶
func (eCtx *ExecutionContext) Logger() log.Logger
func (*ExecutionContext) Name ¶
func (eCtx *ExecutionContext) Name() string
func (*ExecutionContext) Reply ¶
func (eCtx *ExecutionContext) Reply(replyData map[string]interface{}, err error)
func (*ExecutionContext) Return ¶
func (eCtx *ExecutionContext) Return(returnData map[string]interface{}, err error)
func (*ExecutionContext) Scope ¶
func (eCtx *ExecutionContext) Scope() data.Scope
func (*ExecutionContext) SetOutput ¶
func (eCtx *ExecutionContext) SetOutput(name string, value interface{}) error
func (*ExecutionContext) SetOutputObject ¶
func (eCtx *ExecutionContext) SetOutputObject(output data.StructValue) error
func (*ExecutionContext) Status ¶
func (eCtx *ExecutionContext) Status() ExecutionStatus
func (*ExecutionContext) UpdateTimer ¶
func (eCtx *ExecutionContext) UpdateTimer(repeating bool)
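UpdateTimer updates the existing timer (the original comment here was copied verbatim from CreateTimer; confirm the exact behavior against the implementation). Note: an activity can have only one active timer at a time.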
func (*ExecutionContext) UpdateTimers ¶
func (eCtx *ExecutionContext) UpdateTimers()
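UpdateTimers updates the existing timers (the original comment here was copied verbatim from CreateTimer; confirm the exact behavior against the implementation). Note: an activity can have only one active timer at a time.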
type ExecutionStatus ¶
type ExecutionStatus int
const (
	// ExecStatusNotStarted indicates that the Pipeline execution has not started
	ExecStatusNotStarted ExecutionStatus = 0
	// ExecStatusActive indicates that the Pipeline execution is active
	ExecStatusActive ExecutionStatus = 100
	// ExecStatusStalled indicates that the Pipeline execution has stalled
	ExecStatusStalled ExecutionStatus = 400
	// ExecStatusCompleted indicates that the Pipeline execution has been completed
	ExecStatusCompleted ExecutionStatus = 500
	// ExecStatusCancelled indicates that the Pipeline execution has been cancelled
	ExecStatusCancelled ExecutionStatus = 600
	// ExecStatusFailed indicates that the Pipeline execution has failed
	ExecStatusFailed ExecutionStatus = 700
)
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
func NewInstance ¶
func (*Instance) DoStep ¶
func (inst *Instance) DoStep(ctx *ExecutionContext, resume bool) (hasWork bool, err error)
func (*Instance) PipelineId ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func NewManager() *Manager
func (*Manager) GetPipeline ¶
func (m *Manager) GetPipeline(uri string) (*Definition, error)
type MultiScope ¶
type MultiScopeResolver ¶
type MultiScopeResolver struct {
// contains filtered or unexported fields
}
func (*MultiScopeResolver) GetResolverInfo ¶
func (r *MultiScopeResolver) GetResolverInfo() *resolve.ResolverInfo
type ResourceLoader ¶
type ResourceLoader struct {
// contains filtered or unexported fields
}
func (*ResourceLoader) LoadResource ¶
type SharedScope ¶
type SharedScope struct {
// contains filtered or unexported fields
}
func (*SharedScope) GetValue ¶
func (s *SharedScope) GetValue(name string) (value interface{}, exists bool)
func (*SharedScope) SetValue ¶
func (s *SharedScope) SetValue(name string, value interface{}) error
type Stage ¶
type Stage struct {
// contains filtered or unexported fields
}
func NewStage ¶
func NewStage(config *StageConfig, mf mapper.Factory, resolver resolve.CompositeResolver) (*Stage, error)
type StageConfig ¶
type StageInputScope ¶
type StageInputScope struct {
// contains filtered or unexported fields
}
StageInputScope is a basic implementation of a scope.
func (*StageInputScope) GetValue ¶
func (s *StageInputScope) GetValue(name string) (value interface{}, exists bool)
func (*StageInputScope) GetValueByScope ¶
func (s *StageInputScope) GetValueByScope(scopeId ScopeId, name string) (value interface{}, exists bool)
func (*StageInputScope) SetValue ¶
func (s *StageInputScope) SetValue(name string, value interface{}) error
type StageOutputScope ¶
type StageOutputScope struct {
// contains filtered or unexported fields
}
StageOutputScope is a basic implementation of a scope.
func (*StageOutputScope) GetValue ¶
func (s *StageOutputScope) GetValue(name string) (value interface{}, exists bool)
func (*StageOutputScope) GetValueByScope ¶
func (s *StageOutputScope) GetValueByScope(scopeId ScopeId, name string) (value interface{}, exists bool)
func (*StageOutputScope) SetValue ¶
func (s *StageOutputScope) SetValue(name string, value interface{}) error
type State ¶
type State interface {
	GetScope() data.Scope
	GetSharedData(act activity.Activity) map[string]interface{}
	NewTicker(act activity.Activity, interval time.Duration) (*TickerHolder, error)
	GetTicker(act activity.Activity) (*TickerHolder, bool)
	RemoveTicker(act activity.Activity) bool
	NewTimer(act activity.Activity, interval time.Duration) (*TimerHolder, error)
	GetTimer(act activity.Activity) (*TimerHolder, bool)
	RemoveTimer(act activity.Activity) bool
}
type StateManager ¶
func NewMultiStateManager ¶
func NewMultiStateManager() StateManager
func NewSimpleStateManager ¶
func NewSimpleStateManager() StateManager
type TickerHolder ¶
type TickerHolder struct {
// contains filtered or unexported fields
}
func (*TickerHolder) GetLastExecCtx ¶
func (t *TickerHolder) GetLastExecCtx() *ExecutionContext
func (*TickerHolder) GetTicker ¶
func (t *TickerHolder) GetTicker() *time.Ticker
func (*TickerHolder) SetLastExecCtx ¶
func (t *TickerHolder) SetLastExecCtx(ctx *ExecutionContext)
type TimerHolder ¶
type TimerHolder struct {
// contains filtered or unexported fields
}
func (*TimerHolder) GetLastExecCtx ¶
func (t *TimerHolder) GetLastExecCtx() *ExecutionContext
func (*TimerHolder) GetTimer ¶
func (t *TimerHolder) GetTimer() *time.Timer
func (*TimerHolder) SetLastExecCtx ¶
func (t *TimerHolder) SetLastExecCtx(ctx *ExecutionContext)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.