spark_v1

package
v1.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2022 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
// NoMimeType is the empty MIME type string.
const NoMimeType = ""

// MimeTypeJSON is the MIME type string for JSON content.
const MimeTypeJSON = "application/json"

Variables

View Source
// Sentinel errors exported by the package. They are created with errors.New,
// so callers should match them with errors.Is rather than by message text.
var (
	// ErrStageDoesNotExist reports that a stage does not exist.
	ErrStageDoesNotExist = errors.New("stage does not exist")
	// ErrBindValueFailed reports that binding a value failed.
	ErrBindValueFailed = errors.New("bind value failed")
	// ErrVariableNotFound reports that a variable was not found.
	ErrVariableNotFound = errors.New("variable not found")
	// ErrStageNotFoundInNodeChain reports that a stage was not found in the node chain.
	ErrStageNotFoundInNodeChain = errors.New("stage not found in the node chain")
	// ErrConditionalStageSkipped carries the message "conditional stage execution".
	// NOTE(review): the name suggests a skipped conditional stage, but the
	// message does not say so; confirm the intended wording.
	ErrConditionalStageSkipped = errors.New("conditional stage execution")
	// ErrChainIsNotValid reports that a chain is not valid.
	ErrChainIsNotValid = errors.New("chain is not valid")
	// ErrInputVariableNotFound reports that an input variable was not found.
	ErrInputVariableNotFound = errors.New("input variable not found")
)
View Source
// CompleteError is a completion function that ignores its context and always
// returns a StageError built from the error "complete failed".
var CompleteError = func(_ CompleteContext) StageError {
	failure := errors.New("complete failed")
	return NewStageError(failure)
}
View Source
// CompleteSuccess is a completion function that ignores its context and
// always returns a nil StageError.
var CompleteSuccess = func(_ CompleteContext) StageError {
	// Literal nil keeps the returned interface value truly nil.
	return nil
}

Functions

func CreateManagerServiceClient

func CreateManagerServiceClient(config *config) (sparkv1.ManagerServiceClient, error)

func NewSparkMetadataFromGrpcRequest

func NewSparkMetadataFromGrpcRequest(ctx context.Context, req *sparkv1.ExecuteJobRequest) sparkMetadata

Types

type Behaviour added in v1.6.0

// Behaviour has no exported fields in this listing; it is used through its
// Set and SetResult methods, both of which return an
// *InMemoryStageProgressHandler.
type Behaviour struct {
	// contains filtered or unexported fields
}

func (*Behaviour) Set added in v1.6.0

func (b *Behaviour) Set(stageName string, status sparkv1.StageStatus, err error) *InMemoryStageProgressHandler

func (*Behaviour) SetResult added in v1.6.0

func (b *Behaviour) SetResult(jobKey, stageName string, err error) *InMemoryStageProgressHandler

type Bindable added in v1.6.0

// Bindable describes a value that offers three accessors: Bind, Raw and String.
type Bindable interface {
	// Bind accepts a target of any type and returns an error.
	// NOTE(review): presumably populates a from the held value; confirm
	// against the implementation.
	Bind(a any) error
	// Raw returns a byte slice together with an error.
	Raw() ([]byte, error)
	// String returns a string form of the value.
	String() string
}

type BindableConfig added in v1.6.0

// BindableConfig declares Bind and Raw with the same signatures as Bindable,
// but without the String method.
type BindableConfig interface {
	// Bind accepts a target of any type and returns an error.
	// NOTE(review): presumably populates a from the configuration; confirm.
	Bind(a any) error
	// Raw returns a byte slice together with an error.
	Raw() ([]byte, error)
}

type Builder

// Builder exposes NewChain and embeds ChainFinalizer.
type Builder interface {
	// NewChain takes a chain name and returns a BuilderChain.
	NewChain(name string) BuilderChain
	// ChainFinalizer is embedded; its methods are unexported.
	ChainFinalizer
}

Builder is the contract for the chain builder.

type BuilderChain

// BuilderChain embeds ChainNode and declares no methods of its own.
type BuilderChain interface {
	ChainNode
}

BuilderChain is the root of a chain.

type Chain

// Chain has no exported methods in this listing. It is the parameter type of
// Compensate and Cancelled and the return type of Complete.
type Chain interface {
	// contains filtered or unexported methods
}

Chain finalizes a node in the chain, used internally to build a part of the chain

type ChainCancelled

// ChainCancelled provides the Cancelled step of the chain builder.
type ChainCancelled interface {
	// Cancelled takes a Chain and returns a ChainComplete, so the only builder
	// call available on the result is Complete.
	Cancelled(newNode Chain) ChainComplete
}

ChainCancelled is the contract the builder must implement for cancellation.

type ChainCancelledOrComplete

// ChainCancelledOrComplete combines ChainCancelled and ChainComplete, so it
// exposes both the Cancelled and the Complete methods.
type ChainCancelledOrComplete interface {
	ChainCancelled
	ChainComplete
}

ChainCancelledOrComplete allows defining only cancel or completion

type ChainCompensate

// ChainCompensate provides the Compensate step of the chain builder.
type ChainCompensate interface {
	// Compensate takes a Chain and returns a ChainCancelledOrComplete, so
	// either Cancelled or Complete can be called on the result.
	Compensate(newNode Chain) ChainCancelledOrComplete
}

ChainCompensate is the contract the builder must implement for compensation.

type ChainComplete

// ChainComplete provides the Complete step of the chain builder.
type ChainComplete interface {
	// Complete takes a CompleteDefinitionFn plus zero or more StageOption
	// values and returns a Chain.
	Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}

ChainComplete is the contract the builder must implement for completion.

type ChainFinalizer

// ChainFinalizer has no exported methods in this listing; it is embedded in
// Builder.
type ChainFinalizer interface {
	// contains filtered or unexported methods
}

ChainFinalizer finalizes the entire chain, used internally to build the chain

type ChainNode

// ChainNode embeds ChainStage, so the only method it exposes is Stage.
type ChainNode interface {
	ChainStage // must have at least 1 stage
}

ChainNode is a node in the chain.

type ChainReport

// ChainReport groups a list of errors with two string-keyed lookups, one of
// ChainReportStage values and one of ChainReportNode values.
// NOTE(review): the map keys are presumably stage and node names; confirm.
type ChainReport struct {
	Errors   []error                     // errors recorded in the report
	StageMap map[string]ChainReportStage // stage entries keyed by string
	NodeMap  map[string]ChainReportNode  // node entries keyed by string
}

type ChainReportNode

// ChainReportNode is the value type stored in ChainReport.NodeMap.
type ChainReportNode struct {
	Name          string // name of the node
	CanCompensate bool   // NOTE(review): presumably set when the node defines a compensate step; confirm
	CanCancel     bool   // NOTE(review): presumably set when the node defines a cancelled step; confirm
	// contains filtered or unexported fields
}

type ChainReportStage

// ChainReportStage is the value type stored in ChainReport.StageMap.
type ChainReportStage struct {
	Name  string // name of the stage
	Crumb string // NOTE(review): meaning is not shown in this listing; confirm against the source
}

type ChainStage

// ChainStage provides the Stage step of the chain builder.
type ChainStage interface {
	// Stage takes a stage name, a StageDefinitionFn and zero or more
	// StageOption values. It returns a ChainStageAny, on which Stage,
	// Compensate, Cancelled or Complete can be called next.
	Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}

ChainStage is a stage in a chain node.

type ChainStageAny

// ChainStageAny combines ChainStage, ChainCompensate, ChainCancelled and
// ChainComplete, so it exposes Stage, Compensate, Cancelled and Complete.
type ChainStageAny interface {
	ChainStage
	ChainCompensate
	ChainCancelled
	ChainComplete
}

ChainStageAny allows defining more stages and at least 1 of each compensate, cancelled or complete

type CompleteContext

// CompleteContext is the context type received by a CompleteDefinitionFn.
// It embeds StageContext and adds Output.
type CompleteContext interface {
	StageContext
	// Output takes zero or more *Var values and returns an error.
	// NOTE(review): presumably publishes them as job outputs; confirm.
	Output(variables ...*Var) error
	// Name returns a string. StageContext declares the same method, so this
	// is a redeclaration with an identical signature.
	Name() string
}

func NewCompleteContext

func NewCompleteContext(ctx SparkContext, name string) CompleteContext

type CompleteDefinitionFn

// CompleteDefinitionFn is an alias for a function that receives a
// CompleteContext and returns a StageError.
type CompleteDefinitionFn = func(ctx CompleteContext) StageError

type ConfigType added in v1.8.0

// ConfigType is a string-based type naming a configuration format.
type ConfigType string

// ConfigTypeYaml is the ConfigType with the value "yaml".
const ConfigTypeYaml ConfigType = "yaml"

// ConfigTypeJson is the ConfigType with the value "json".
const ConfigTypeJson ConfigType = "json"

type Context

// Context exposes a standard library context together with job, correlation
// and transaction identifiers and the last active stage.
type Context interface {
	// Ctx returns a context.Context.
	Ctx() context.Context
	// JobKey returns the job key as a string.
	JobKey() string
	// CorrelationID returns the correlation ID as a string.
	CorrelationID() string
	// TransactionID returns the transaction ID as a string.
	TransactionID() string
	// LastActiveStage returns a *sparkv1.LastActiveStage.
	// NOTE(review): whether the result can be nil is not visible here; confirm.
	LastActiveStage() *sparkv1.LastActiveStage
}

func NewSparkMetadata

func NewSparkMetadata(ctx context.Context, jobKey, correlationID, transactionID string, lastActiveStage *sparkv1.LastActiveStage) Context

type DelegateCompleteDefinitionFn

// DelegateCompleteDefinitionFn is an alias for a function that receives a
// CompleteContext and a CompleteDefinitionFn callback and returns a StageError.
type DelegateCompleteDefinitionFn = func(ctx CompleteContext, cb CompleteDefinitionFn) StageError

type DelegateStageDefinitionFn

// DelegateStageDefinitionFn is an alias for a function that receives a
// StageContext and a StageDefinitionFn callback and returns a value of any
// type together with a StageError.
type DelegateStageDefinitionFn = func(ctx StageContext, cb StageDefinitionFn) (any, StageError)

type ErrorOption

// ErrorOption is an alias for a function that takes a *stageError and returns
// a *stageError. The stageError type is unexported, so options are obtained
// from constructors such as WithCancel, WithErrorCode, WithFatal,
// WithMetadata, WithRetry and WithSkip.
type ErrorOption = func(err *stageError) *stageError

func WithCancel

func WithCancel() ErrorOption

func WithErrorCode

func WithErrorCode(code uint32) ErrorOption

func WithFatal

func WithFatal() ErrorOption

func WithMetadata

func WithMetadata(metadata any) ErrorOption

func WithRetry

func WithRetry(times uint, backoffMillis time.Duration) ErrorOption

func WithSkip

func WithSkip() ErrorOption

type Gettable added in v1.6.0

// Gettable exposes lookup of a Bindable by name.
type Gettable interface {
	// Get takes a name and returns a Bindable.
	Get(name string) Bindable
}

type IOHandler

// IOHandler reads inputs and writes outputs; every method takes a job key as
// its first argument.
type IOHandler interface {
	// Inputs takes a job key and zero or more input names and returns an Inputs.
	Inputs(jobKey string, names ...string) Inputs
	// Input takes a job key and a single input name and returns an Input.
	Input(jobKey, name string) Input
	// Output takes a job key and zero or more *Var values and returns an error.
	Output(jobKey string, variables ...*Var) error
}

type InMemoryStageProgressHandler added in v1.6.0

// InMemoryStageProgressHandler has no exported fields in this listing; it is
// used through its methods (AddBehaviour, the AssertStage* helpers, Get,
// GetResult, ResetBehaviour, Set, SetJobStatus and SetResult).
type InMemoryStageProgressHandler struct {
	// contains filtered or unexported fields
}

func (*InMemoryStageProgressHandler) AddBehaviour added in v1.6.0

func (i *InMemoryStageProgressHandler) AddBehaviour() *Behaviour

func (*InMemoryStageProgressHandler) AssertStageCancelled added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageCancelled(jobKey, stageName string)

func (*InMemoryStageProgressHandler) AssertStageCompleted added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageCompleted(jobKey, stageName string)

func (*InMemoryStageProgressHandler) AssertStageFailed added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageFailed(jobKey, stageName string)

func (*InMemoryStageProgressHandler) AssertStageResult added in v1.6.0

func (i *InMemoryStageProgressHandler) AssertStageResult(jobKey, stageName string, expectedStageResult any)

func (*InMemoryStageProgressHandler) AssertStageSkipped added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageSkipped(jobKey, stageName string)

func (*InMemoryStageProgressHandler) AssertStageStarted added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageStarted(jobKey, stageName string)

func (*InMemoryStageProgressHandler) AssertStageUnspecified added in v1.10.0

func (i *InMemoryStageProgressHandler) AssertStageUnspecified(jobKey, stageName string)

func (*InMemoryStageProgressHandler) Get added in v1.6.0

func (i *InMemoryStageProgressHandler) Get(jobKey, name string) (*sparkv1.StageStatus, error)

func (*InMemoryStageProgressHandler) GetResult added in v1.6.0

func (i *InMemoryStageProgressHandler) GetResult(jobKey, name string) Bindable

func (*InMemoryStageProgressHandler) ResetBehaviour added in v1.6.0

func (i *InMemoryStageProgressHandler) ResetBehaviour()

func (*InMemoryStageProgressHandler) Set added in v1.6.0

func (*InMemoryStageProgressHandler) SetJobStatus added in v1.6.0

func (i *InMemoryStageProgressHandler) SetJobStatus(jobStatus *sparkv1.SetJobStatusRequest) error

func (*InMemoryStageProgressHandler) SetResult added in v1.6.0

type InitContext added in v1.6.0

// InitContext is the argument type of Spark.Init.
type InitContext interface {
	// Config returns a BindableConfig.
	Config() BindableConfig
}

type Input

// Input embeds Bindable and declares no methods of its own.
type Input interface {
	Bindable
}

type Inputs

// Inputs exposes lookup of a Bindable by name; its method set is identical to
// that of Gettable.
type Inputs interface {
	// Get takes a name and returns a Bindable.
	Get(name string) Bindable
}

type Logger

// Logger is the logging interface used by the package.
type Logger interface {
	// Info takes a format string and variadic arguments.
	Info(format string, v ...any)
	// Warn takes a format string and variadic arguments.
	Warn(format string, v ...any)
	// Debug takes a format string and variadic arguments.
	Debug(format string, v ...any)
	// Error takes an error in addition to a format string and variadic arguments.
	Error(err error, format string, v ...any)
	// AddFields takes a single key/value pair and returns a Logger.
	// NOTE(review): whether the receiver is mutated or a new Logger is
	// returned is not visible here; confirm.
	AddFields(k string, v any) Logger
}

func NewLogger

func NewLogger() Logger

type Option

// Option is an alias for a function that takes a *sparkOpts and returns a
// *sparkOpts. The sparkOpts type is unexported, so options are obtained from
// constructors such as WithConfiguration, WithIOHandler and WithLog.
type Option = func(je *sparkOpts) *sparkOpts

func WithConfiguration added in v1.8.0

func WithConfiguration(b []byte, t ConfigType) Option

func WithDelegateCompletion

func WithDelegateCompletion(delegate DelegateCompleteDefinitionFn) Option

WithDelegateCompletion delegates execution of all completion stages. TODO: support delegating a single completion stage by name.

func WithDelegateStage

func WithDelegateStage(delegate DelegateStageDefinitionFn) Option

WithDelegateStage delegates execution of all stages. TODO: support delegating a single stage by name.

func WithIOHandler

func WithIOHandler(vh IOHandler) Option

func WithLog

func WithLog(log Logger) Option

func WithStageProgressHandler

func WithStageProgressHandler(sph StageProgressHandler) Option

type ResultBehaviourParams added in v1.6.0

// ResultBehaviourParams has no exported fields in this listing.
type ResultBehaviourParams struct {
	// contains filtered or unexported fields
}

type RetryConfig

// RetryConfig has no exported fields in this listing.
// NOTE(review): presumably populated by WithRetry; confirm against the source.
type RetryConfig struct {
	// contains filtered or unexported fields
}

type Spark

// Spark is the interface passed to NewSparkWorker.
type Spark interface {
	// BuildChain takes a Builder and returns a Chain.
	BuildChain(b Builder) Chain
	// Init takes an InitContext and returns an error.
	Init(ctx InitContext) error
	// Stop takes no arguments and returns nothing.
	Stop()
}

Spark is the contract a developer must implement in order to be accepted by a worker.

type SparkContext

// SparkContext embeds Context and adds access to the IOHandler, the
// StageProgressHandler and a Logger.
type SparkContext interface {
	Context
	// IOHandler returns an IOHandler.
	IOHandler() IOHandler
	// StageProgressHandler returns a StageProgressHandler.
	StageProgressHandler() StageProgressHandler
	// LastActiveStage returns a *sparkv1.LastActiveStage. The embedded Context
	// declares the same method, so this is a redeclaration with an identical
	// signature.
	LastActiveStage() *sparkv1.LastActiveStage
	// Log returns a Logger.
	Log() Logger
	// WithoutLastActiveStage returns a SparkContext.
	// NOTE(review): presumably a copy whose last active stage is cleared; confirm.
	WithoutLastActiveStage() SparkContext
	// contains filtered or unexported methods
}

func NewJobContext

func NewJobContext(metadata Context, opts *sparkOpts) SparkContext

type StageBehaviourParams added in v1.6.0

// StageBehaviourParams has no exported fields in this listing.
type StageBehaviourParams struct {
	// contains filtered or unexported fields
}

type StageContext

// StageContext is the context type received by a StageDefinitionFn. It embeds
// Context and adds access to inputs, stage results, a logger and a name.
type StageContext interface {
	Context
	// Inputs takes zero or more input names and returns an Inputs.
	Inputs(names ...string) Inputs
	// Input takes a single input name and returns an Input.
	Input(name string) Input
	// StageResult takes a stage name and returns a Bindable.
	// NOTE(review): presumably the stored result of that stage; confirm.
	StageResult(name string) Bindable
	// Log returns a Logger.
	Log() Logger
	// Name returns a string.
	// NOTE(review): presumably the name of the current stage; confirm.
	Name() string
}

func NewStageContext

func NewStageContext(ctx SparkContext, name string) StageContext

type StageDefinitionFn

// StageDefinitionFn is an alias for a function that receives a StageContext
// and returns a value of any type together with a StageError.
type StageDefinitionFn = func(ctx StageContext) (any, StageError)

type StageError

// StageError is the error type returned by stage and completion functions.
// Because it declares Error() string, every StageError also satisfies the
// built-in error interface.
type StageError interface {
	// Error returns the error message.
	Error() string
	// Code returns a uint32 code.
	Code() uint32
	// Metadata returns a map from string keys to values of any type.
	Metadata() map[string]any
	// ErrorType returns a sparkv1.ErrorType.
	ErrorType() sparkv1.ErrorType
	// ToErrorMessage returns a *sparkv1.Error.
	ToErrorMessage() *sparkv1.Error
}

func NewStageError

func NewStageError(err error, opts ...ErrorOption) StageError

type StageOption

// StageOption is an alias for a function that receives StageOptionParams and
// returns a StageError.
type StageOption = func(StageOptionParams) StageError

func WithStageStatus

func WithStageStatus(stageName string, status sparkv1.StageStatus) StageOption

type StageOptionParams

// StageOptionParams is the argument type received by a StageOption.
type StageOptionParams interface {
	// StageName returns the stage name as a string.
	StageName() string
	// StageProgressHandler returns a StageProgressHandler.
	StageProgressHandler() StageProgressHandler
	// IOHandler returns an IOHandler.
	IOHandler() IOHandler
	// Context returns a Context.
	Context() Context
}

type StageProgressHandler

// StageProgressHandler reads and writes stage status, stage results and job
// status.
type StageProgressHandler interface {
	// Get takes a job key and a name and returns a *sparkv1.StageStatus and an error.
	Get(jobKey, name string) (*sparkv1.StageStatus, error)
	// Set takes a *sparkv1.SetStageStatusRequest and returns an error.
	Set(stageStatus *sparkv1.SetStageStatusRequest) error
	// GetResult takes a job key and a name and returns a Bindable.
	GetResult(jobKey, name string) Bindable
	// SetResult takes a *sparkv1.SetStageResultRequest and returns an error.
	SetResult(stageResult *sparkv1.SetStageResultRequest) error
	// SetJobStatus takes a *sparkv1.SetJobStatusRequest and returns an error.
	SetJobStatus(jobStatus *sparkv1.SetJobStatusRequest) error
}

type TestIOHandler added in v1.6.0

// TestIOHandler embeds IOHandler and adds SetVar. NewInMemoryIOHandler
// returns a value of this type.
type TestIOHandler interface {
	IOHandler
	// SetVar takes a job key and a *Var and returns nothing.
	// NOTE(review): presumably seeds a variable for later Input/Inputs calls; confirm.
	SetVar(jobKey string, v *Var)
}

func NewInMemoryIOHandler added in v1.6.0

func NewInMemoryIOHandler(t *testing.T) TestIOHandler

type TestStageProgressHandler added in v1.10.0

// TestStageProgressHandler embeds StageProgressHandler and adds assertion and
// behaviour helpers. NewInMemoryStageProgressHandler returns a value of this
// type.
//
// The AssertStage* methods each take a job key and a stage name and return
// nothing.
// NOTE(review): failures are presumably reported through the *testing.T given
// to NewInMemoryStageProgressHandler; confirm.
type TestStageProgressHandler interface {
	StageProgressHandler
	AssertStageCompleted(jobKey, stageName string)
	AssertStageStarted(jobKey, stageName string)
	AssertStageSkipped(jobKey, stageName string)
	AssertStageCancelled(jobKey, stageName string)
	AssertStageFailed(jobKey, stageName string)
	AssertStageUnspecified(jobKey, stageName string)
	// AddBehaviour returns a *Behaviour.
	AddBehaviour() *Behaviour
	// ResetBehaviour takes no arguments and returns nothing.
	ResetBehaviour()
	// AssertStageResult additionally takes the expected stage result, of any type.
	AssertStageResult(jobKey, stageName string, expectedStageResult any)
}

func NewInMemoryStageProgressHandler added in v1.6.0

func NewInMemoryStageProgressHandler(t *testing.T, seeds ...any) TestStageProgressHandler

type Var

// Var is a named value with a MIME type. Pointers to Var are accepted by the
// Output methods and by TestIOHandler.SetVar.
type Var struct {
	Name     string // name of the variable
	MimeType string // MIME type string; NOTE(review): presumably NoMimeType or MimeTypeJSON — confirm
	Value    any    // value of any type
}

func NewVar

func NewVar(name, mimeType string, value any) *Var

type Worker

// Worker is the interface returned by NewSparkWorker.
type Worker interface {
	// Execute takes a Context and returns a StageError.
	Execute(ctx Context) StageError
	// Run takes no arguments and returns nothing.
	// NOTE(review): whether it blocks is not visible here; confirm.
	Run()
	// LocalContext takes a job key, a correlation ID and a transaction ID and
	// returns a Context.
	LocalContext(jobKey, correlationID, transactionID string) Context
}

func NewSparkWorker

func NewSparkWorker(ctx context.Context, spark Spark, options ...Option) (Worker, error)

Directories

Path Synopsis
Package spark_v1_mock is a generated GoMock package.
Package spark_v1_mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL