Documentation ¶
Index ¶
- Constants
- Variables
- func CreateManagerServiceClient(config *config) (sparkv1.ManagerServiceClient, error)
- func NewSparkMetadataFromGrpcRequest(ctx context.Context, req *sparkv1.ExecuteJobRequest) sparkMetadata
- type Behaviour
- type Bindable
- type BindableConfig
- type Builder
- type BuilderChain
- type Chain
- type ChainCancelled
- type ChainCancelledOrComplete
- type ChainCompensate
- type ChainComplete
- type ChainFinalizer
- type ChainNode
- type ChainReport
- type ChainReportNode
- type ChainReportStage
- type ChainStage
- type ChainStageAny
- type CompleteContext
- type CompleteDefinitionFn
- type ConfigType
- type Context
- type DelegateCompleteDefinitionFn
- type DelegateStageDefinitionFn
- type ErrorOption
- type Gettable
- type IOHandler
- type InMemoryStageProgressHandler
- func (i *InMemoryStageProgressHandler) AddBehaviour() *Behaviour
- func (i *InMemoryStageProgressHandler) AssertStageCancelled(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) AssertStageCompleted(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) AssertStageFailed(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) AssertStageResult(jobKey, stageName string, expectedStageResult any)
- func (i *InMemoryStageProgressHandler) AssertStageSkipped(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) AssertStageStarted(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) AssertStageUnspecified(jobKey, stageName string)
- func (i *InMemoryStageProgressHandler) Get(jobKey, name string) (*sparkv1.StageStatus, error)
- func (i *InMemoryStageProgressHandler) GetResult(jobKey, name string) Bindable
- func (i *InMemoryStageProgressHandler) ResetBehaviour()
- func (i *InMemoryStageProgressHandler) Set(stageStatus *sparkv1.SetStageStatusRequest) error
- func (i *InMemoryStageProgressHandler) SetJobStatus(jobStatus *sparkv1.SetJobStatusRequest) error
- func (i *InMemoryStageProgressHandler) SetResult(result *sparkv1.SetStageResultRequest) error
- type InitContext
- type Input
- type Inputs
- type Logger
- type Option
- func WithConfiguration(b []byte, t ConfigType) Option
- func WithDelegateCompletion(delegate DelegateCompleteDefinitionFn) Option
- func WithDelegateStage(delegate DelegateStageDefinitionFn) Option
- func WithIOHandler(vh IOHandler) Option
- func WithLog(log Logger) Option
- func WithStageProgressHandler(sph StageProgressHandler) Option
- type ResultBehaviourParams
- type RetryConfig
- type Spark
- type SparkContext
- type StageBehaviourParams
- type StageContext
- type StageDefinitionFn
- type StageError
- type StageOption
- type StageOptionParams
- type StageProgressHandler
- type TestIOHandler
- type TestStageProgressHandler
- type Var
- type Worker
Constants ¶
const ( NoMimeType = "" MimeTypeJSON = "application/json" )
Variables ¶
var ( ErrStageDoesNotExist = errors.New("stage does not exists") ErrBindValueFailed = errors.New("bind value failed") ErrVariableNotFound = errors.New("variable not found") ErrStageNotFoundInNodeChain = errors.New("stage not found in the node chain") ErrConditionalStageSkipped = errors.New("conditional stage execution") ErrChainIsNotValid = errors.New("chain is not valid") ErrInputVariableNotFound = errors.New("input variable not found") )
var CompleteError = func(ctx CompleteContext) StageError { return NewStageError(errors.New("complete failed")) }
var CompleteSuccess = func(ctx CompleteContext) StageError { return nil }
Functions ¶
func CreateManagerServiceClient ¶
func CreateManagerServiceClient(config *config) (sparkv1.ManagerServiceClient, error)
func NewSparkMetadataFromGrpcRequest ¶
func NewSparkMetadataFromGrpcRequest(ctx context.Context, req *sparkv1.ExecuteJobRequest) sparkMetadata
Types ¶
type Behaviour ¶ added in v1.6.0
type Behaviour struct {
// contains filtered or unexported fields
}
func (*Behaviour) Set ¶ added in v1.6.0
func (b *Behaviour) Set(stageName string, status sparkv1.StageStatus, err error) *InMemoryStageProgressHandler
type BindableConfig ¶ added in v1.6.0
type Builder ¶
type Builder interface { NewChain(name string) BuilderChain ChainFinalizer }
Builder contract for the chain builder
type Chain ¶
type Chain interface {
// contains filtered or unexported methods
}
Chain finalizes a node in the chain, used internally to build a part of the chain
type ChainCancelled ¶
type ChainCancelled interface {
Cancelled(newNode Chain) ChainComplete
}
ChainCancelled contract the builder must implement for cancellation
type ChainCancelledOrComplete ¶
type ChainCancelledOrComplete interface { ChainCancelled ChainComplete }
ChainCancelledOrComplete allows defining only cancel or completion
type ChainCompensate ¶
type ChainCompensate interface {
Compensate(newNode Chain) ChainCancelledOrComplete
}
ChainCompensate contract the builder must implement for compensation
type ChainComplete ¶
type ChainComplete interface {
Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}
ChainComplete contract the builder must implement for completion
type ChainFinalizer ¶
type ChainFinalizer interface {
// contains filtered or unexported methods
}
ChainFinalizer finalizes the entire chain, used internally to build the chain
type ChainNode ¶
type ChainNode interface { ChainStage // must have at least 1 stage }
ChainNode a node in the chain
type ChainReport ¶
type ChainReport struct { Errors []error StageMap map[string]ChainReportStage NodeMap map[string]ChainReportNode }
type ChainReportNode ¶
type ChainReportStage ¶
type ChainStage ¶
type ChainStage interface {
Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}
ChainStage a stage in the chain node
type ChainStageAny ¶
type ChainStageAny interface { ChainStage ChainCompensate ChainCancelled ChainComplete }
ChainStageAny allows defining more stages and at least 1 of each compensate, cancelled or complete
type CompleteContext ¶
type CompleteContext interface { StageContext Output(variables ...*Var) error Name() string }
func NewCompleteContext ¶
func NewCompleteContext(ctx SparkContext, name string) CompleteContext
type CompleteDefinitionFn ¶
type CompleteDefinitionFn = func(ctx CompleteContext) StageError
type ConfigType ¶ added in v1.8.0
type ConfigType string
const ( ConfigTypeYaml ConfigType = "yaml" ConfigTypeJson ConfigType = "json" )
type Context ¶
type Context interface { Ctx() context.Context JobKey() string CorrelationID() string TransactionID() string LastActiveStage() *sparkv1.LastActiveStage }
func NewSparkMetadata ¶
type DelegateCompleteDefinitionFn ¶
type DelegateCompleteDefinitionFn = func(ctx CompleteContext, cb CompleteDefinitionFn) StageError
type DelegateStageDefinitionFn ¶
type DelegateStageDefinitionFn = func(ctx StageContext, cb StageDefinitionFn) (any, StageError)
type ErrorOption ¶
type ErrorOption = func(err *stageError) *stageError
func WithCancel ¶
func WithCancel() ErrorOption
func WithErrorCode ¶
func WithErrorCode(code uint32) ErrorOption
func WithFatal ¶
func WithFatal() ErrorOption
func WithMetadata ¶
func WithMetadata(metadata any) ErrorOption
func WithSkip ¶
func WithSkip() ErrorOption
type InMemoryStageProgressHandler ¶ added in v1.6.0
type InMemoryStageProgressHandler struct {
// contains filtered or unexported fields
}
func (*InMemoryStageProgressHandler) AddBehaviour ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) AddBehaviour() *Behaviour
func (*InMemoryStageProgressHandler) AssertStageCancelled ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageCancelled(jobKey, stageName string)
func (*InMemoryStageProgressHandler) AssertStageCompleted ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageCompleted(jobKey, stageName string)
func (*InMemoryStageProgressHandler) AssertStageFailed ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageFailed(jobKey, stageName string)
func (*InMemoryStageProgressHandler) AssertStageResult ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) AssertStageResult(jobKey, stageName string, expectedStageResult any)
func (*InMemoryStageProgressHandler) AssertStageSkipped ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageSkipped(jobKey, stageName string)
func (*InMemoryStageProgressHandler) AssertStageStarted ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageStarted(jobKey, stageName string)
func (*InMemoryStageProgressHandler) AssertStageUnspecified ¶ added in v1.10.0
func (i *InMemoryStageProgressHandler) AssertStageUnspecified(jobKey, stageName string)
func (*InMemoryStageProgressHandler) Get ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) Get(jobKey, name string) (*sparkv1.StageStatus, error)
func (*InMemoryStageProgressHandler) GetResult ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) GetResult(jobKey, name string) Bindable
func (*InMemoryStageProgressHandler) ResetBehaviour ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) ResetBehaviour()
func (*InMemoryStageProgressHandler) Set ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) Set(stageStatus *sparkv1.SetStageStatusRequest) error
func (*InMemoryStageProgressHandler) SetJobStatus ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) SetJobStatus(jobStatus *sparkv1.SetJobStatusRequest) error
func (*InMemoryStageProgressHandler) SetResult ¶ added in v1.6.0
func (i *InMemoryStageProgressHandler) SetResult(result *sparkv1.SetStageResultRequest) error
type InitContext ¶ added in v1.6.0
type InitContext interface {
Config() BindableConfig
}
type Logger ¶
type Option ¶
type Option = func(je *sparkOpts) *sparkOpts
func WithConfiguration ¶ added in v1.8.0
func WithConfiguration(b []byte, t ConfigType) Option
func WithDelegateCompletion ¶
func WithDelegateCompletion(delegate DelegateCompleteDefinitionFn) Option
WithDelegateCompletion delegates execution of all completion stages TODO support delegating single completion stage by name
func WithDelegateStage ¶
func WithDelegateStage(delegate DelegateStageDefinitionFn) Option
WithDelegateStage delegates execution of all stages TODO support delegating single stage by name
func WithIOHandler ¶
func WithStageProgressHandler ¶
func WithStageProgressHandler(sph StageProgressHandler) Option
type ResultBehaviourParams ¶ added in v1.6.0
type ResultBehaviourParams struct {
// contains filtered or unexported fields
}
type RetryConfig ¶
type RetryConfig struct {
// contains filtered or unexported fields
}
type Spark ¶
type Spark interface { BuildChain(b Builder) Chain Init(ctx InitContext) error Stop() }
Spark the contract a developer must implement in order to be accepted by a worker
type SparkContext ¶
type SparkContext interface { Context IOHandler() IOHandler StageProgressHandler() StageProgressHandler LastActiveStage() *sparkv1.LastActiveStage Log() Logger WithoutLastActiveStage() SparkContext // contains filtered or unexported methods }
func NewJobContext ¶
func NewJobContext(metadata Context, opts *sparkOpts) SparkContext
type StageBehaviourParams ¶ added in v1.6.0
type StageBehaviourParams struct {
// contains filtered or unexported fields
}
type StageContext ¶
type StageContext interface { Context Inputs(names ...string) Inputs Input(names string) Input StageResult(name string) Bindable Log() Logger Name() string }
func NewStageContext ¶
func NewStageContext(ctx SparkContext, name string) StageContext
type StageDefinitionFn ¶
type StageDefinitionFn = func(ctx StageContext) (any, StageError)
type StageError ¶
type StageError interface { Error() string Code() uint32 Metadata() map[string]any ErrorType() sparkv1.ErrorType ToErrorMessage() *sparkv1.Error }
func NewStageError ¶
func NewStageError(err error, opts ...ErrorOption) StageError
type StageOption ¶
type StageOption = func(StageOptionParams) StageError
func WithStageStatus ¶
func WithStageStatus(stageName string, status sparkv1.StageStatus) StageOption
type StageOptionParams ¶
type StageOptionParams interface { StageName() string StageProgressHandler() StageProgressHandler IOHandler() IOHandler Context() Context }
type StageProgressHandler ¶
type StageProgressHandler interface { Get(jobKey, name string) (*sparkv1.StageStatus, error) Set(stageStatus *sparkv1.SetStageStatusRequest) error GetResult(jobKey, name string) Bindable SetResult(resultResult *sparkv1.SetStageResultRequest) error SetJobStatus(jobStatus *sparkv1.SetJobStatusRequest) error }
type TestIOHandler ¶ added in v1.6.0
func NewInMemoryIOHandler ¶ added in v1.6.0
func NewInMemoryIOHandler(t *testing.T) TestIOHandler
type TestStageProgressHandler ¶ added in v1.10.0
type TestStageProgressHandler interface { StageProgressHandler AssertStageCompleted(jobKey, stageName string) AssertStageStarted(jobKey, stageName string) AssertStageSkipped(jobKey, stageName string) AssertStageCancelled(jobKey, stageName string) AssertStageFailed(jobKey, stageName string) AssertStageUnspecified(jobKey, stageName string) AddBehaviour() *Behaviour ResetBehaviour() AssertStageResult(jobKey, stageName string, expectedStageResult any) }
func NewInMemoryStageProgressHandler ¶ added in v1.6.0
func NewInMemoryStageProgressHandler(t *testing.T, seeds ...any) TestStageProgressHandler