sparkv1

package
v1.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const JobGetStageResultQuery = "GET_STAGE_RESULT"

Variables

View Source
// Sentinel errors related to binding values.
var (
	// ErrTargetNotPointer reports that a value could not be set because the
	// target is not a pointer.
	ErrTargetNotPointer            = errors.New("unable to set value of non-pointer")
	// ErrUnableToBindUnknownMimeType reports that binding was attempted with a
	// mime type that is not known.
	ErrUnableToBindUnknownMimeType = errors.New("unable to bind with unknown mime type")
)
View Source
// Sentinel errors related to the SparkChain and its Stages.
var (
	// ErrStageNotFoundInNodeChain reports that a stage was not found in the
	// Node SparkChain.
	ErrStageNotFoundInNodeChain = errors.New("stage not found in the Node SparkChain")
	// ErrConditionalStageSkipped is, going by its name, the marker for a Stage
	// skipped by conditional execution; the message itself only says
	// "conditional Stage execution".
	ErrConditionalStageSkipped  = errors.New("conditional Stage execution")
	// ErrChainIsNotValid reports that the SparkChain is not valid.
	ErrChainIsNotValid          = errors.New("SparkChain is not valid")
)
View Source
// Package-level defaults for retries and activity options.
var (
	// DefaultRetryPolicy uses a 5 second initial interval, a backoff
	// coefficient of 2, a 5 minute maximum interval and a single attempt.
	// NonRetryableErrorTypes is left at its zero value (nil).
	DefaultRetryPolicy = &RetryPolicy{
		InitialInterval:    5 * time.Second,
		BackoffCoefficient: 2,
		MaximumInterval:    5 * time.Minute,
		MaximumAttempts:    1,
	}

	// DefaultActivityOptions sets only a 10 minute StartToCloseTimeout on the
	// embedded workflow.ActivityOptions.
	DefaultActivityOptions = &ActivityOptions{
		ActivityOptions: workflow.ActivityOptions{
			StartToCloseTimeout: 10 * time.Minute,
		},
	}
)
View Source
// CompleteError has the shape of a CompleteDefinitionFn and always fails: it
// ignores its context and returns a StageError built from the error
// "Complete failed".
var CompleteError = func(_ CompleteContext) StageError {
	failure := errors.New("Complete failed")
	return NewStageError(failure)
}
View Source
// CompleteSuccess has the shape of a CompleteDefinitionFn and always
// succeeds: it ignores its context and returns a nil StageError.
var CompleteSuccess = func(CompleteContext) StageError {
	return nil
}
View Source
var (
	ErrInvalidStageResultMimeType = errors.New("stage result expects mime-type of application/json")
)
View Source
var (
	MimeJsonError = codec.MimeTypeJson.WithType("error")
)

Functions

func NewBindable added in v1.20.0

func NewBindable(value Value) *bindable

Types

type ActivityOptions added in v1.20.0

type ActivityOptions struct {
	workflow.ActivityOptions
}

func (ActivityOptions) GetTemporalActivityOptions added in v1.20.0

func (ao ActivityOptions) GetTemporalActivityOptions() workflow.ActivityOptions

type Bindable added in v1.6.0

type Bindable interface {
	Bind(a any) error
	GetValue() (any, error)
	GetMimeType() string
	String() string
}

func NewBindableError added in v1.20.0

func NewBindableError(err error) Bindable

type BindableConfig added in v1.6.0

type BindableConfig interface {
	Bind(a any) error
	Raw() ([]byte, error)
}

type BindableMap added in v1.20.0

type BindableMap map[string]*bindable

type Builder

type Builder interface {
	NewChain(name string) BuilderChain
	ChainFinalizer
}

Builder is the contract for the SparkChain builder.

func NewBuilder

func NewBuilder() Builder

NewBuilder is the main entry point to the builder.

type BuilderChain

type BuilderChain interface {
	ChainNode
}

BuilderChain is the root of a SparkChain.

type Chain

type Chain interface {
	// contains filtered or unexported methods
}

Chain finalizes a Node in the SparkChain; it is used internally to build part of the SparkChain.

type ChainCancelled

type ChainCancelled interface {
	Cancelled(newNode Chain) ChainComplete
}

ChainCancelled is the contract the builder must implement for cancellation.

type ChainCancelledOrComplete

type ChainCancelledOrComplete interface {
	ChainCancelled
	ChainComplete
}

ChainCancelledOrComplete allows defining only cancellation or completion.

type ChainCompensate

type ChainCompensate interface {
	Compensate(newNode Chain) ChainCancelledOrComplete
}

ChainCompensate is the contract the builder must implement for compensation.

type ChainComplete

type ChainComplete interface {
	Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}

ChainComplete is the contract the builder must implement for completion.

type ChainFinalizer

type ChainFinalizer interface {
	BuildChain() *SparkChain
}

ChainFinalizer finalizes the entire SparkChain; it is used internally to build the SparkChain.

type ChainNode

type ChainNode interface {
	ChainStage // must have at least 1 Stage
}

ChainNode is a Node in the SparkChain.

type ChainReport

type ChainReport struct {
	Errors   []error
	StageMap map[string]ChainReportStage
	NodeMap  map[string]ChainReportNode
}

type ChainReportNode

type ChainReportNode struct {
	Name          string
	CanCompensate bool
	CanCancel     bool
	// contains filtered or unexported fields
}

type ChainReportStage

type ChainReportStage struct {
	Name  string
	Crumb string
}

type ChainStage

type ChainStage interface {
	Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}

ChainStage is a Stage in a SparkChain Node.

type ChainStageAny

type ChainStageAny interface {
	ChainStage
	ChainCompensate
	ChainCancelled
	ChainComplete
}

ChainStageAny allows defining more Stages, plus at least one of Compensate, Cancelled, or Complete.

type CompleteContext

type CompleteContext interface {
	StageContext
	Output(variables ...*Var) error
	Name() string
}

func NewCompleteContext

func NewCompleteContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) CompleteContext

type CompleteDefinitionFn

type CompleteDefinitionFn = func(ctx CompleteContext) StageError

type CompleteStage added in v1.20.0

type CompleteStage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

type Config

// Config is the top-level configuration structure; every field carries a
// yaml tag naming the key it is read from.
//
// NOTE(review): Name is tagged with the capitalised key "Name" while every
// other key is lower-case, and Server is read from the "plugin" key rather
// than a "server" key. Confirm both are intentional before relying on them.
type Config struct {
	Id         string          `yaml:"id"`
	Name       string          `yaml:"Name"`
	QueueGroup string          `yaml:"queue_group"`
	Health     *configHealth   `yaml:"health"`
	Server     *configServer   `yaml:"plugin"`
	Log        *configLog      `yaml:"logging"`
	App        *configApp      `yaml:"app"`
	Temporal   *configTemporal `yaml:"temporal"`
}

type ConfigType added in v1.8.0

type ConfigType string
const (
	ConfigTypeYaml ConfigType = "yaml"
	ConfigTypeJson ConfigType = "json"
)

type Context

type Context interface {
	JobKey() string
	CorrelationID() string
	TransactionID() string
}

func NewJobContext

func NewJobContext(metadata Context, opts *SparkOpts) Context

func NewSparkMetadata

func NewSparkMetadata(jobKey, correlationID, transactionID string, logger Logger) Context

type ErrorOption

type ErrorOption = func(err *stageError) *stageError

func WithMetadata

func WithMetadata(metadata any) ErrorOption

func WithRetry

func WithRetry(times uint, backoffMultiplier uint, firstBackoffWait time.Duration) ErrorOption

type ExecuteSparkError added in v1.20.0

type ExecuteSparkError struct {
	ErrorMessage string         `json:"error_message,omitempty"`
	Metadata     map[string]any `json:"metadata,omitempty"`
}

func (*ExecuteSparkError) Error added in v1.20.0

func (ese *ExecuteSparkError) Error() string

type ExecuteSparkInputs added in v1.20.0

type ExecuteSparkInputs BindableMap

type ExecuteSparkOutput added in v1.20.0

type ExecuteSparkOutput struct {
	Outputs BindableMap        `json:"outputs,omitempty"`
	Error   *ExecuteSparkError `json:"error,omitempty"`
}

type ExecuteStageRequest added in v1.20.0

type ExecuteStageRequest struct {
	StageName  string
	Inputs     ExecuteSparkInputs
	WorkflowId string
	RunId      string
}

type Gettable added in v1.6.0

type Gettable interface {
	Get(name string) Bindable
}

type IOState added in v1.20.0

type IOState interface {
	GetVar(varName string) any
}

type InitContext added in v1.6.0

type InitContext interface {
	Config() BindableConfig
}

func NewInitContext added in v1.20.0

func NewInitContext(opts *SparkOpts) InitContext

type Input

type Input interface {
	Bindable
}

type Inputs

type Inputs interface {
	Get(name string) Bindable
}

type InternalStageTracker added in v1.20.0

type InternalStageTracker interface {
	SetStageResult(name string, value Bindable)
	SetStageStatus(name string, status StageStatus)
}

type JobContext added in v1.20.0

type JobContext struct {
	context.Context
	Metadata *JobMetadata
}

func (*JobContext) CorrelationID added in v1.20.0

func (jc *JobContext) CorrelationID() string

func (*JobContext) JobKey added in v1.20.0

func (jc *JobContext) JobKey() string

func (*JobContext) TransactionID added in v1.20.0

func (jc *JobContext) TransactionID() string

type JobMetadata added in v1.20.0

type JobMetadata struct {
	SparkId            string             `json:"spark_id"` // id of the spark to execute
	Inputs             ExecuteSparkInputs `json:"inputs"`   // all inputs for the given spark id
	JobKeyValue        string             `json:"job_key"`
	CorrelationIdValue string             `json:"correlation_id"`
	TransactionIdValue string             `json:"transaction_id"`

	RootExecutionWorkflowId string `json:"execution_workflow_id"`     // workflow id of the root execution to query
	RootExecutionRunId      string `json:"execution_run_id"`          // run id of the root execution to query
	JobExecutionWorkflowId  string `json:"job_execution_workflow_id"` // workflow id of the root job workflow
	JobExecutionRunId       string `json:"job_execution_run_id"`      // run id of the root job workflow
}

JobMetadata is the context for the spark we want to execute on a module. TODO: this type should come from the Module Library.

type JobState added in v1.20.0

type JobState struct {
	JobContext   *JobMetadata
	StageResults map[string]Bindable
}

type JobWorkflow added in v1.20.0

type JobWorkflow interface {
	Run(ctx workflow.Context, jmd *JobMetadata) (*ExecuteSparkOutput, error)
	ExecuteStageActivity(ctx context.Context, req *ExecuteStageRequest) (Bindable, StageError)
	ExecuteCompleteActivity(ctx context.Context, req *ExecuteStageRequest) (*ExecuteSparkOutput, StageError)
}

func NewJobWorkflow added in v1.20.0

func NewJobWorkflow(ctx context.Context, sparkDataIO SparkDataIO, sparkId string, chain *SparkChain, opts ...WorkflowOption) JobWorkflow

type Logger

type Logger interface {
	Info(format string, v ...any)
	Warn(format string, v ...any)
	Debug(format string, v ...any)
	Error(err error, format string, v ...any)
	AddFields(k string, v any) Logger
}

func NewLogger

func NewLogger() Logger

type Node added in v1.20.0

type Node struct {
	Stages     []*Stage
	Complete   *CompleteStage
	Cancel     *Node
	Compensate *Node
	Name       string
	// contains filtered or unexported fields
}

Node wraps all the Stages of a single SparkChain. These are represented as one or more Stages, but only one of each of the following: cancellation, compensation, and completion (finalizer).

func (*Node) ChainName added in v1.20.0

func (n *Node) ChainName() string

func (*Node) CompletionName added in v1.20.0

func (n *Node) CompletionName() string

func (*Node) CountOfStages added in v1.20.0

func (n *Node) CountOfStages() int

func (*Node) HasCancellationStage added in v1.20.0

func (n *Node) HasCancellationStage() bool

func (*Node) HasCompensationStage added in v1.20.0

func (n *Node) HasCompensationStage() bool

func (*Node) HasCompletionStage added in v1.20.0

func (n *Node) HasCompletionStage() bool

func (*Node) IsCancel added in v1.20.0

func (n *Node) IsCancel() bool

func (*Node) IsCompensate added in v1.20.0

func (n *Node) IsCompensate() bool

func (*Node) IsRoot added in v1.20.0

func (n *Node) IsRoot() bool

type Option

type Option = func(je *SparkOpts) *SparkOpts

func WithSparkConfig added in v1.20.0

func WithSparkConfig(cfg any) Option

type RetryConfig

type RetryConfig struct {
	Times             uint          `json:"times" yaml:"times"`
	FirstBackoffWait  time.Duration `json:"first_backoff_wait" yaml:"first_backoff_wait"`
	BackoffMultiplier uint          `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}

type RetryPolicy added in v1.20.0

// RetryPolicy describes retry backoff settings; every field carries a yaml
// tag naming the key it is read from.
//
// NOTE(review): the defaults quoted in the field comments below (1s, 2.0,
// 100x) read like Temporal's own retry-policy defaults; confirm that they are
// what applies when a field is left unset here.
type RetryPolicy struct {
	// Backoff interval for the first retry. If BackoffCoefficient is 1.0 then it is used for all retries.
	// If not set or set to 0, a default interval of 1s will be used.
	InitialInterval time.Duration `yaml:"initial_interval"`

	// Coefficient used to calculate the next retry backoff interval.
	// The next retry interval is previous interval multiplied by this coefficient.
	// Must be 1 or larger. Default is 2.0.
	BackoffCoefficient float64 `yaml:"backoff_coefficient"`

	// Maximum backoff interval between retries. Exponential backoff leads to interval increase.
	// This value is the cap of the interval. Default is 100x of initial interval.
	MaximumInterval time.Duration `yaml:"maximum_interval"`

	// Maximum number of attempts. When exceeded the retries stop even if not expired yet.
	// If not set or set to 0, it means unlimited, and rely on activity ScheduleToCloseTimeout to stop.
	MaximumAttempts int32 `yaml:"maximum_attempts"`

	// Non-retryable error types. This is optional. Temporal server will stop retrying if the error type matches this list.
	// Note:
	//  - cancellation is not a failure, so it won't be retried,
	//  - only StartToClose or Heartbeat timeouts are retryable.
	NonRetryableErrorTypes []string `yaml:"non_retryable_error_types"`
}

func (RetryPolicy) GetTemporalPolicy added in v1.20.0

func (rp RetryPolicy) GetTemporalPolicy() *temporal.RetryPolicy

type Spark

type Spark interface {
	BuildChain(b Builder) Chain
	Init(ctx InitContext) error
	Stop()
}

Spark is the contract a developer must implement in order to be accepted by a worker.

type SparkChain added in v1.20.0

type SparkChain struct {
	RootNode *Node
	// contains filtered or unexported fields
}

SparkChain represents the entire SparkChain. The RootNode is the main entry point of the entire SparkChain; it holds its own children as a tree below the RootNode.

func (*SparkChain) GetStageCompleteFunc added in v1.20.0

func (sc *SparkChain) GetStageCompleteFunc(name string) CompleteDefinitionFn

func (*SparkChain) GetStageFunc added in v1.20.0

func (sc *SparkChain) GetStageFunc(name string) StageDefinitionFn

type SparkDataIO added in v1.20.0

type SparkDataIO interface {
	GetStageResult(workflowId, runId, stageName string) (Bindable, error)
}

type SparkOpts added in v1.20.0

type SparkOpts struct {
	// contains filtered or unexported fields
}

type Stage added in v1.20.0

type Stage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

func (*Stage) ApplyConditionalExecutionOptions added in v1.20.0

func (s *Stage) ApplyConditionalExecutionOptions(ctx Context, stageName string) StageError

type StageContext

type StageContext interface {
	Input(names string) Input
	StageResult(name string) Bindable
	Log() Logger
	Name() string
}

func NewStageContext

func NewStageContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) StageContext

type StageDefinitionFn

type StageDefinitionFn = func(ctx StageContext) (any, StageError)

type StageError

type StageError interface {
	Error() string
	Metadata() map[string]any
	GetRetryConfig() *RetryConfig
}

func NewStageError

func NewStageError(err error, opts ...ErrorOption) StageError

type StageOption

type StageOption = func(StageOptionParams) StageError

type StageOptionParams

type StageOptionParams interface {
	StageName() string
	Context() Context
}

type StageStatus

// StageStatus is the string-valued status of a Stage.
type StageStatus string
// Known StageStatus values.
const (
	StageStatus_STAGE_PENDING   StageStatus = "STAGE_PENDING"
	StageStatus_STAGE_STARTED   StageStatus = "STAGE_STARTED"
	StageStatus_STAGE_COMPLETED StageStatus = "STAGE_COMPLETED"
	StageStatus_STAGE_FAILED    StageStatus = "STAGE_FAILED"
	StageStatus_STAGE_SKIPPED   StageStatus = "STAGE_SKIPPED"
	// NOTE(review): this value is "CANCELED", without the "STAGE_" prefix that
	// every other status value carries. Confirm whether that is intentional
	// (e.g. it must match an external value) before changing it.
	StageStatus_STAGE_CANCELED  StageStatus = "CANCELED"
)

type StageTracker added in v1.20.0

type StageTracker interface {
	GetStageResult(name string) (data any, mime codec.MimeType, err StageError)
	AssertStageCompleted(stageName string)
	AssertStageStarted(stageName string)
	AssertStageSkipped(stageName string)
	AssertStageCancelled(stageName string)
	AssertStageFailed(stageName string)
	AssertStageResult(stageName string, expectedStageResult any)
	AssertStageOrder(stageNames ...string)
}

type TemporalLogger added in v1.20.0

type TemporalLogger struct {
}

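TemporalLogger is a logger whose Debug, Info, Warn, and Error methods take a message followed by key/value pairs.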

func (*TemporalLogger) Debug added in v1.20.0

func (f *TemporalLogger) Debug(msg string, keyvals ...interface{})

func (*TemporalLogger) Error added in v1.20.0

func (f *TemporalLogger) Error(msg string, keyvals ...interface{})

func (*TemporalLogger) Info added in v1.20.0

func (f *TemporalLogger) Info(msg string, keyvals ...interface{})

func (*TemporalLogger) Warn added in v1.20.0

func (f *TemporalLogger) Warn(msg string, keyvals ...interface{})

type Value added in v1.20.0

type Value struct {
	Value    any    `json:"value"`
	Raw      any    `json:"raw"` // TODO Remove, only used for debug
	MimeType string `json:"mime_type"`
}

type Var

type Var struct {
	Name     string
	MimeType codec.MimeType
	Value    any
}

func NewVar

func NewVar(name string, mimeType codec.MimeType, value any) *Var

type Worker

type Worker interface {
	Run()
}

func NewSparkWorker

func NewSparkWorker(ctx context.Context, spark Spark, options ...Option) (Worker, error)

type WorkflowOption added in v1.20.0

type WorkflowOption = func(je *workflowOpts) *workflowOpts

func WithStageTracker added in v1.20.0

func WithStageTracker(ist InternalStageTracker) WorkflowOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL