Documentation ¶
Index ¶
- Constants
- Variables
- func NewBindable(value Value) *bindable
- type ActivityOptions
- type Bindable
- type BindableConfig
- type BindableMap
- type Builder
- type BuilderChain
- type Chain
- type ChainCancelled
- type ChainCancelledOrComplete
- type ChainCompensate
- type ChainComplete
- type ChainFinalizer
- type ChainNode
- type ChainReport
- type ChainReportNode
- type ChainReportStage
- type ChainStage
- type ChainStageAny
- type CompleteContext
- type CompleteDefinitionFn
- type CompleteStage
- type Config
- type ConfigType
- type Context
- type ErrorCode
- type ErrorOption
- type ExecuteSparkError
- type ExecuteSparkInputs
- type ExecuteSparkOutput
- type ExecuteStageRequest
- type Gettable
- type IOState
- type InitContext
- type Input
- type Inputs
- type InternalStageTracker
- type JobContext
- type JobMetadata
- type JobState
- type JobWorkflow
- type Logger
- type Node
- func (n *Node) ChainName() string
- func (n *Node) CompletionName() string
- func (n *Node) CountOfStages() int
- func (n *Node) HasCancellationStage() bool
- func (n *Node) HasCompensationStage() bool
- func (n *Node) HasCompletionStage() bool
- func (n *Node) IsCancel() bool
- func (n *Node) IsCompensate() bool
- func (n *Node) IsRoot() bool
- type Option
- type RetryConfig
- type RetryPolicy
- type Spark
- type SparkChain
- type SparkDataIO
- type SparkOpts
- type StackTraceItem
- type Stage
- type StageContext
- type StageDefinitionFn
- type StageError
- type StageOption
- type StageOptionParams
- type StageStatus
- type StageTracker
- type TemporalLogger
- type Value
- type Var
- type Worker
- type WorkflowOption
Constants ¶
const JobGetStageResultQuery = "GET_STAGE_RESULT"
Variables ¶
var ( ErrTargetNotPointer = errors.New("unable to set value of non-pointer") ErrUnableToBindUnknownMimeType = errors.New("unable to bind with unknown mime type") )
var ( ErrStageNotFoundInNodeChain = errors.New("stage not found in the Node SparkChain") ErrConditionalStageSkipped = errors.New("conditional Stage execution") ErrChainIsNotValid = errors.New("SparkChain is not valid") )
var ( DefaultRetryPolicy = &RetryPolicy{ InitialInterval: time.Second * 5, BackoffCoefficient: 2, MaximumInterval: time.Minute * 5, MaximumAttempts: 1, NonRetryableErrorTypes: nil, } DefaultActivityOptions = &ActivityOptions{ workflow.ActivityOptions{ StartToCloseTimeout: time.Minute * 10, }, } )
var CompleteError = func(ctx CompleteContext) StageError { return NewStageErrorWithCode(errorCodeInternal, errors.New("Complete failed")) }
var CompleteSuccess = func(ctx CompleteContext) StageError { return nil }
var (
ErrInvalidStageResultMimeType = errors.New("stage result expects mime-type of application/json")
)
var (
MimeJsonError = codec.MimeTypeJson.WithType("error")
)
Functions ¶
func NewBindable ¶ added in v1.20.0
func NewBindable(value Value) *bindable
Types ¶
type ActivityOptions ¶ added in v1.20.0
type ActivityOptions struct {
workflow.ActivityOptions
}
func (ActivityOptions) GetTemporalActivityOptions ¶ added in v1.20.0
func (ao ActivityOptions) GetTemporalActivityOptions() workflow.ActivityOptions
type Bindable ¶ added in v1.6.0
type Bindable interface { Bind(a any) error GetValue() (any, error) GetMimeType() string String() string }
func NewBindableError ¶ added in v1.20.0
type BindableConfig ¶ added in v1.6.0
type BindableMap ¶ added in v1.20.0
type BindableMap map[string]*bindable
type Builder ¶
type Builder interface { NewChain(name string) BuilderChain ChainFinalizer }
Builder contract for the SparkChain builder
type Chain ¶
type Chain interface {
// contains filtered or unexported methods
}
Chain finalizes a Node in the SparkChain, used internally to build a part of the SparkChain
type ChainCancelled ¶
type ChainCancelled interface {
Cancelled(newNode Chain) ChainComplete
}
ChainCancelled contract the builder must implement for cancellation
type ChainCancelledOrComplete ¶
type ChainCancelledOrComplete interface { ChainCancelled ChainComplete }
ChainCancelledOrComplete allows defining only Cancel or completion
type ChainCompensate ¶
type ChainCompensate interface {
Compensate(newNode Chain) ChainCancelledOrComplete
}
ChainCompensate contract the builder must implement for compensation
type ChainComplete ¶
type ChainComplete interface {
Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}
ChainComplete contract the builder must implement for completion
type ChainFinalizer ¶
type ChainFinalizer interface {
BuildChain() *SparkChain
}
ChainFinalizer finalizes the entire SparkChain, used internally to build the SparkChain
type ChainNode ¶
type ChainNode interface { ChainStage // must have at least 1 Stage }
ChainNode a Node in the SparkChain
type ChainReport ¶
type ChainReport struct { Errors []error StageMap map[string]ChainReportStage NodeMap map[string]ChainReportNode }
type ChainReportNode ¶
type ChainReportStage ¶
type ChainStage ¶
type ChainStage interface {
Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}
ChainStage a Stage in the SparkChain Node
type ChainStageAny ¶
type ChainStageAny interface { ChainStage ChainCompensate ChainCancelled ChainComplete }
ChainStageAny allows defining more Stages and at least 1 of each Compensate, cancelled or Complete
type CompleteContext ¶
type CompleteContext interface { StageContext Output(variables ...*Var) error Name() string }
func NewCompleteContext ¶
func NewCompleteContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) CompleteContext
type CompleteDefinitionFn ¶
type CompleteDefinitionFn = func(ctx CompleteContext) StageError
type CompleteStage ¶ added in v1.20.0
type ConfigType ¶ added in v1.8.0
type ConfigType string
const ( ConfigTypeYaml ConfigType = "yaml" ConfigTypeJson ConfigType = "json" )
type ErrorCode ¶ added in v1.28.0
type ErrorCode string
const (
ErrorCodeGeneric ErrorCode = "GENERIC"
)
type ErrorOption ¶
type ErrorOption = func(err *stageError) *stageError
func WithErrorCode ¶
func WithErrorCode(errorCode ErrorCode) ErrorOption
func WithMetadata ¶
func WithMetadata(metadata any) ErrorOption
type ExecuteSparkError ¶ added in v1.20.0
type ExecuteSparkError struct { StageName string `json:"stage_name"` ErrorCode ErrorCode `json:"error_code"` ErrorMessage string `json:"error_message,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` StackTrace []StackTraceItem `json:"stack_trace"` }
func (*ExecuteSparkError) Error ¶ added in v1.20.0
func (ese *ExecuteSparkError) Error() string
type ExecuteSparkInputs ¶ added in v1.20.0
type ExecuteSparkInputs BindableMap
type ExecuteSparkOutput ¶ added in v1.20.0
type ExecuteSparkOutput struct { Outputs BindableMap `json:"outputs,omitempty"` Error *ExecuteSparkError `json:"error,omitempty"` }
type ExecuteStageRequest ¶ added in v1.20.0
type InitContext ¶ added in v1.6.0
type InitContext interface {
Config() BindableConfig
}
func NewInitContext ¶ added in v1.20.0
func NewInitContext(opts *SparkOpts) InitContext
type InternalStageTracker ¶ added in v1.20.0
type InternalStageTracker interface { SetStageResult(name string, value Bindable) SetStageStatus(name string, status StageStatus) }
type JobContext ¶ added in v1.20.0
type JobContext struct { context.Context Metadata *JobMetadata }
func (*JobContext) CorrelationID ¶ added in v1.20.0
func (jc *JobContext) CorrelationID() string
func (*JobContext) JobKey ¶ added in v1.20.0
func (jc *JobContext) JobKey() string
func (*JobContext) TransactionID ¶ added in v1.20.0
func (jc *JobContext) TransactionID() string
type JobMetadata ¶ added in v1.20.0
type JobMetadata struct { SparkId string `json:"spark_id"` // id of the spark to execute Inputs ExecuteSparkInputs `json:"inputs"` // all inputs for the given spark id JobKeyValue string `json:"job_key"` CorrelationIdValue string `json:"correlation_id"` TransactionIdValue string `json:"transaction_id"` RootExecutionWorkflowId string `json:"execution_workflow_id"` // workflow id of the root execution to query RootExecutionRunId string `json:"execution_run_id"` // run id of the root execution to query JobExecutionWorkflowId string `json:"job_execution_workflow_id"` // workflow id of the root job workflow JobExecutionRunId string `json:"job_execution_run_id"` // run id of the root job workflow }
JobMetadata the context for the spark we want to execute on a module TODO this type should come from the Module Library
type JobState ¶ added in v1.20.0
type JobState struct { JobContext *JobMetadata StageResults map[string]Bindable }
type JobWorkflow ¶ added in v1.20.0
type JobWorkflow interface { Run(ctx workflow.Context, jmd *JobMetadata) (*ExecuteSparkOutput, error) ExecuteStageActivity(ctx context.Context, req *ExecuteStageRequest) (Bindable, StageError) ExecuteCompleteActivity(ctx context.Context, req *ExecuteStageRequest) (*ExecuteSparkOutput, StageError) }
func NewJobWorkflow ¶ added in v1.20.0
func NewJobWorkflow(ctx context.Context, sparkDataIO SparkDataIO, sparkId string, chain *SparkChain, opts ...WorkflowOption) JobWorkflow
type Logger ¶
type Node ¶ added in v1.20.0
type Node struct { Stages []*Stage Complete *CompleteStage Cancel *Node Compensate *Node Name string // contains filtered or unexported fields }
Node wraps all the Stages of a single SparkChain these are represented as one or more Stages but only one of each - cancellation - compensation - completion (finalizer)
func (*Node) CompletionName ¶ added in v1.20.0
func (*Node) CountOfStages ¶ added in v1.20.0
func (*Node) HasCancellationStage ¶ added in v1.20.0
func (*Node) HasCompensationStage ¶ added in v1.20.0
func (*Node) HasCompletionStage ¶ added in v1.20.0
func (*Node) IsCompensate ¶ added in v1.20.0
type Option ¶
func WithSparkConfig ¶ added in v1.20.0
type RetryConfig ¶
type RetryPolicy ¶ added in v1.20.0
type RetryPolicy struct { // Backoff interval for the first retry. If BackoffCoefficient is 1.0 then it is used for all retries. // If not set or set to 0, a default interval of 1s will be used. InitialInterval time.Duration `yaml:"initial_interval"` // Coefficient used to calculate the next retry backoff interval. // The next retry interval is previous interval multiplied by this coefficient. // Must be 1 or larger. Default is 2.0. BackoffCoefficient float64 `yaml:"backoff_coefficient"` // Maximum backoff interval between retries. Exponential backoff leads to interval increase. // This value is the cap of the interval. Default is 100x of initial interval. MaximumInterval time.Duration `yaml:"maximum_interval"` // Maximum number of attempts. When exceeded the retries stop even if not expired yet. // If not set or set to 0, it means unlimited, and rely on activity ScheduleToCloseTimeout to stop. MaximumAttempts int32 `yaml:"maximum_attempts"` // Non-Retriable errors. This is optional. Temporal server will stop retry if error type matches this list. // Note: // - cancellation is not a failure, so it won't be retried, // - only StartToClose or Heartbeat timeouts are retryable. NonRetryableErrorTypes []string `yaml:"non_retryable_error_types"` }
func (RetryPolicy) GetTemporalPolicy ¶ added in v1.20.0
func (rp RetryPolicy) GetTemporalPolicy() *temporal.RetryPolicy
type Spark ¶
type Spark interface { BuildChain(b Builder) Chain Init(ctx InitContext) error Stop() }
Spark the contract a developer must implement in order to be accepted by a worker
type SparkChain ¶ added in v1.20.0
type SparkChain struct { RootNode *Node // contains filtered or unexported fields }
SparkChain represents the entire SparkChain the RootNode is the main entry point of the entire SparkChain it holds its own children as a tree below the RootNode
func (*SparkChain) GetStageCompleteFunc ¶ added in v1.20.0
func (sc *SparkChain) GetStageCompleteFunc(name string) CompleteDefinitionFn
func (*SparkChain) GetStageFunc ¶ added in v1.20.0
func (sc *SparkChain) GetStageFunc(name string) StageDefinitionFn
type SparkDataIO ¶ added in v1.20.0
type SparkOpts ¶ added in v1.20.0
type SparkOpts struct {
// contains filtered or unexported fields
}
type StackTraceItem ¶ added in v1.25.0
type Stage ¶ added in v1.20.0
func (*Stage) ApplyConditionalExecutionOptions ¶ added in v1.20.0
func (s *Stage) ApplyConditionalExecutionOptions(ctx Context, stageName string) StageError
type StageContext ¶
type StageContext interface { Context Input(names string) Input StageResult(name string) Bindable Log() Logger Name() string }
func NewStageContext ¶
func NewStageContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) StageContext
type StageDefinitionFn ¶
type StageDefinitionFn = func(ctx StageContext) (any, StageError)
type StageError ¶
type StageError interface { ErrorCode() ErrorCode StageName() string Error() string Metadata() map[string]any GetRetryConfig() *RetryConfig // contains filtered or unexported methods }
func NewStageError ¶
func NewStageError(err error, opts ...ErrorOption) StageError
func NewStageErrorWithCode ¶ added in v1.28.0
func NewStageErrorWithCode(errorCode ErrorCode, err error, opts ...ErrorOption) StageError
type StageOption ¶
type StageOption = func(StageOptionParams) StageError
type StageOptionParams ¶
type StageStatus ¶
type StageStatus string
const ( StageStatus_STAGE_PENDING StageStatus = "STAGE_PENDING" StageStatus_STAGE_STARTED StageStatus = "STAGE_STARTED" StageStatus_STAGE_COMPLETED StageStatus = "STAGE_COMPLETED" StageStatus_STAGE_FAILED StageStatus = "STAGE_FAILED" StageStatus_STAGE_SKIPPED StageStatus = "STAGE_SKIPPED" StageStatus_STAGE_CANCELED StageStatus = "CANCELED" )
type StageTracker ¶ added in v1.20.0
type StageTracker interface { GetStageResult(name string) (data any, mime codec.MimeType, err StageError) AssertStageCompleted(stageName string) AssertStageStarted(stageName string) AssertStageSkipped(stageName string) AssertStageCancelled(stageName string) AssertStageFailed(stageName string) AssertStageResult(stageName string, expectedStageResult any) AssertStageOrder(stageNames ...string) }
type TemporalLogger ¶ added in v1.20.0
type TemporalLogger struct { }
Logger
func (*TemporalLogger) Debug ¶ added in v1.20.0
func (f *TemporalLogger) Debug(msg string, keyvals ...interface{})
func (*TemporalLogger) Error ¶ added in v1.20.0
func (f *TemporalLogger) Error(msg string, keyvals ...interface{})
func (*TemporalLogger) Info ¶ added in v1.20.0
func (f *TemporalLogger) Info(msg string, keyvals ...interface{})
func (*TemporalLogger) Warn ¶ added in v1.20.0
func (f *TemporalLogger) Warn(msg string, keyvals ...interface{})
type WorkflowOption ¶ added in v1.20.0
type WorkflowOption = func(je *workflowOpts) *workflowOpts
func WithStageTracker ¶ added in v1.20.0
func WithStageTracker(ist InternalStageTracker) WorkflowOption