persistence

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkflowIDExists = errors.New("workflow ID already exists")

Functions

This section is empty.

Types

type Persister

type Persister interface {
	IsUniqueWorkflowID(ctx context.Context, workflowID string) error

	LogWorkflowStep(ctx context.Context, entry *WorkflowLogEntry) error
	LogWorkflowStatus(ctx context.Context, status WorkflowStatus) error

	// LoadOpenWorkflows returns all workflows in the given state.
	// On boot, the orchestrator will call this method with state=StateOpen to
	// recover all open workflows.
	LoadOpenWorkflows(ctx context.Context, workflowName string) ([]*WorkflowStatus, error)

	// LoadWorkflowSteps returns all log entries for the given workflowID.
	LoadWorkflowSteps(ctx context.Context, workflowID string) ([]*WorkflowLogEntry, error)
}

type SQLitePersister

type SQLitePersister struct {
	DB *sql.DB
}

func NewSQLitePersister

func NewSQLitePersister(dbPath string) (*SQLitePersister, error)

func (*SQLitePersister) IsUniqueWorkflowID

func (s *SQLitePersister) IsUniqueWorkflowID(ctx context.Context, workflowID string) error

func (*SQLitePersister) LoadOpenWorkflows

func (s *SQLitePersister) LoadOpenWorkflows(ctx context.Context, workflowName string) ([]*WorkflowStatus, error)

LoadWorkflows loads all workflows in a given state but have not reached a terminal state.

func (*SQLitePersister) LoadWorkflowSteps

func (s *SQLitePersister) LoadWorkflowSteps(ctx context.Context, workflowID string) ([]*WorkflowLogEntry, error)

Adjusted LoadWorkflowSteps method in SQLitePersister

func (*SQLitePersister) LogWorkflowStatus

func (s *SQLitePersister) LogWorkflowStatus(ctx context.Context, status WorkflowStatus) error

func (*SQLitePersister) LogWorkflowStep

func (s *SQLitePersister) LogWorkflowStep(ctx context.Context, entry *WorkflowLogEntry) error

type SlogDebugger

type SlogDebugger struct {
	// contains filtered or unexported fields
}

SlogDebugger is not a real persister, but a debugger that logs to slog. It is used for debugging purposes only and shows the log entries through slog.

func NewSlogDebugger

func NewSlogDebugger(logger *slog.Logger) *SlogDebugger

func (*SlogDebugger) IsUniqueWorkflowID

func (s *SlogDebugger) IsUniqueWorkflowID(ctx context.Context, workflowID string) error

func (*SlogDebugger) LoadOpenWorkflows

func (s *SlogDebugger) LoadOpenWorkflows(ctx context.Context, workflowName string) ([]*WorkflowStatus, error)

func (*SlogDebugger) LoadWorkflowSteps

func (s *SlogDebugger) LoadWorkflowSteps(ctx context.Context, workflowID string) ([]*WorkflowLogEntry, error)

func (*SlogDebugger) LogWorkflowStatus

func (s *SlogDebugger) LogWorkflowStatus(ctx context.Context, status WorkflowStatus) error

func (*SlogDebugger) LogWorkflowStep

func (s *SlogDebugger) LogWorkflowStep(ctx context.Context, entry *WorkflowLogEntry) error

type TaskState

type TaskState string
const (
	StateOpen      TaskState = "open"
	StateCompleted TaskState = "completed"
	StateFailed    TaskState = "failed"
	StatePanicked  TaskState = "panicked" // retryable
	StateTimedOut  TaskState = "timed_out"
)

type WorkflowLogEntry

type WorkflowLogEntry struct {
	WorkflowID    string
	WorkflowName  string
	ActivityName  string
	ActivityToken string
	ActivityState TaskState
	NodeID        int64
	Input         []byte
	Output        []byte
	Config        *string
	Error         *string
	Timestamp     time.Time
	Duration      time.Duration
}

WorkflowLogEntry represents a single append-only log entry for a workflow.

type WorkflowStatus

type WorkflowStatus struct {
	WorkflowID    string
	WorkflowName  string
	WorkflowState TaskState
	Timestamp     time.Time

	// NonRestorable indicates that the workflow is not recoverable on reboot.
	// By default, workflows are restorable.
	NonRestorable bool `json:"non_restorable"`

	ParentWorkflowID *string
}

WorkflowStatus represents an append-only log entry for a workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL