store

package
v2.14.0-beta0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Status string constants. Any change to this set must also be reflected in
// ValidStatuses and in the database enum `workflow_status`.
// NOTE(review): presumably these are the values stored in the Status fields of
// WorkflowExecution and WorkflowExecutionStep — confirm against the store code.
const (
	StatusStarted            = "started"
	StatusErrored            = "errored"
	StatusTimeout            = "timeout"
	StatusCompleted          = "completed"
	StatusCompletedEarlyExit = "completed_early_exit"
)

Note: any update to the enum above should be reflected in ValidStatuses and the database enum `workflow_status`.

Variables

Functions

This section is empty.

Types

type DBStore

// DBStore is a postgres-backed data store that persists workflow progress.
// Construct it with NewDBStore; its fields are not exported.
type DBStore struct {
	// contains filtered or unexported fields
}

`DBStore` is a postgres-backed data store that persists workflow progress.

func NewDBStore

func NewDBStore(ds sqlutil.DataSource, lggr logger.Logger, clock clockwork.Clock) *DBStore

func (*DBStore) Add

func (d *DBStore) Add(ctx context.Context, state *WorkflowExecution) error

`Add` creates the relevant workflow_execution and workflow_step entries to persist the passed-in WorkflowExecution.

func (*DBStore) Get

func (d *DBStore) Get(ctx context.Context, executionID string) (WorkflowExecution, error)

`Get` fetches the WorkflowExecution identified by executionID from the database.

func (*DBStore) GetUnfinished

func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error)

func (*DBStore) UpdateStatus

func (d *DBStore) UpdateStatus(ctx context.Context, executionID string, status string) error

`UpdateStatus` updates the status of the given workflow execution.

func (*DBStore) UpsertStep

func (d *DBStore) UpsertStep(ctx context.Context, stepState *WorkflowExecutionStep) (WorkflowExecution, error)

`UpsertStep` updates the given step. This corresponds to an insert or an update, depending on whether a step with the ref already exists.

type StepOutput

// StepOutput is the output recorded for a workflow step; it is the type of
// WorkflowExecutionStep.Outputs.
type StepOutput struct {
	// Err is the error associated with the step's output.
	// NOTE(review): presumably nil when the step succeeded — confirm.
	Err error
	// Value is the value associated with the step's output.
	// NOTE(review): unclear whether Value may be set together with Err — confirm.
	Value values.Value
}

type Store

// Store is the persistence interface for workflow executions and their steps.
// DBStore exposes methods with these same signatures.
type Store interface {
	// Add persists the given workflow execution.
	Add(ctx context.Context, state *WorkflowExecution) error
	// UpsertStep inserts the step, or updates it if a step with the same ref
	// already exists, and returns a WorkflowExecution.
	// NOTE(review): presumably the returned value is the parent execution after
	// the upsert — confirm.
	UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error)
	// UpdateStatus updates the status of the workflow execution identified by
	// executionID.
	UpdateStatus(ctx context.Context, executionID string, status string) error
	// Get fetches the workflow execution identified by executionID.
	Get(ctx context.Context, executionID string) (WorkflowExecution, error)
	// GetUnfinished returns a slice of workflow executions.
	// NOTE(review): presumably only executions that have not finished, paged by
	// offset and limit — confirm semantics and ordering.
	GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error)
}

type WorkflowExecution

// WorkflowExecution is the record of a workflow execution handled by Store:
// its identifiers, status, timestamps, and steps.
type WorkflowExecution struct {
	// Steps holds the execution's steps, keyed by string.
	// NOTE(review): presumably keyed by WorkflowExecutionStep.Ref — confirm.
	Steps map[string]*WorkflowExecutionStep
	// ExecutionID is the identifier passed to Store.Get and Store.UpdateStatus.
	ExecutionID string
	// WorkflowID identifies the workflow.
	// NOTE(review): presumably the workflow this execution belongs to — confirm.
	WorkflowID string

	// Status is the execution's status string.
	// NOTE(review): presumably one of the Status* constants — confirm.
	Status string
	// CreatedAt, UpdatedAt and FinishedAt are optional timestamps.
	// NOTE(review): presumably nil when not yet set (e.g. FinishedAt while the
	// execution is still running) — confirm.
	CreatedAt  *time.Time
	UpdatedAt  *time.Time
	FinishedAt *time.Time
}

type WorkflowExecutionStep

// WorkflowExecutionStep is the record of a single step within a workflow
// execution; it is the element type of WorkflowExecution.Steps and the
// argument to Store.UpsertStep.
type WorkflowExecutionStep struct {
	// ExecutionID identifies an execution.
	// NOTE(review): presumably the parent WorkflowExecution.ExecutionID — confirm.
	ExecutionID string
	// Ref is the step's reference; UpsertStep updates rather than inserts when
	// a step with this ref already exists.
	Ref string
	// Status is the step's status string.
	// NOTE(review): presumably one of the Status* constants — confirm.
	Status string

	// Inputs is the step's input map.
	Inputs *values.Map
	// Outputs is the step's recorded output (error and/or value).
	Outputs StepOutput

	// UpdatedAt is an optional timestamp.
	// NOTE(review): presumably the last modification time, nil if unset — confirm.
	UpdatedAt *time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL