dag

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package dag provides a way to describe a directed acyclic graph (DAG) of work to be done. You start with a root node and then add further nodes to it. Each node's outputs are automatically connected to the inputs of the nodes that follow it.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing work lookup and registration failures.
// NOTE(review): presumably returned by WorkRepository implementations — confirm against callers.
// NOTE(review): neither message uses format verbs, so errors.New would be the idiomatic constructor.
var (
	ErrWorkNotFound      = fmt.Errorf("work not found")      // the requested work does not exist
	ErrWorkAlreadyExists = fmt.Errorf("work already exists") // the work being added is already present
)
View Source
// MaxTime is the time.Time built from 1<<63-62135596801 Unix seconds plus
// 999999999 nanoseconds. 62135596800 is the number of seconds between
// January 1 of year 1 (the zero time.Time) and the Unix epoch, so the seconds
// argument is the largest int64 minus that offset.
// NOTE(review): presumably used as a "far future" sentinel when searching for
// the earliest timestamp in a DAG — confirm against callers.
var MaxTime = time.Unix(1<<63-62135596801, 999999999)

Functions

func AllNodesFinished added in v0.5.12

func AllNodesFinished(ctx context.Context, nodes []Node[IOSpec]) (bool, error)

func GetDagStartTime added in v0.3.0

func GetDagStartTime[T any](ctx context.Context, dag []Node[T]) (time.Time, error)

func GetEndTimeIfDagComplete added in v0.3.0

func GetEndTimeIfDagComplete[T any](ctx context.Context, dag []Node[T]) (time.Time, error)

Types

type ExecutionInfo

// ExecutionInfo describes a job: its external ID, its stdout and stderr, and
// its status. All four fields are plain strings.
type ExecutionInfo struct {
	ID     string // External ID of the job
	Stdout string // Stdout of the job
	Stderr string // Stderr of the job
	Status string // Status of the job
}

type IOSpec

// IOSpec is a generic input/output specification for a DAG.
// It is read-only: every method is a getter with no parameters.
type IOSpec interface {
	NodeName() string // The node ID from the graph spec (not the internal node ID)
	ID() string       // The node's input ID from the graph spec
	CID() string      // NOTE(review): presumably a content identifier for the data — confirm
	Context() string  // NOTE(review): meaning is not documented here — confirm
	Path() string     // NOTE(review): presumably the path of the input/output — confirm
	IsRoot() bool     // NOTE(review): presumably true when the spec belongs to the root node — confirm
}

IOSpec is a generic input/output specification for a DAG.

func NewIOSpec

func NewIOSpec(name, id, value, path string, root bool, context string) IOSpec

func NilFunc added in v0.3.0

func NilFunc(ctx context.Context, inputs []IOSpec, statusChan chan NodeResult) []IOSpec

type Node

// Node is one vertex of the graph, generic over the input/output type T.
// Every method except ID takes a context.Context and can return an error.
// NOTE(review): the method descriptions below are inferred from names and
// signatures only; confirm against the implementation.
type Node[T any] interface {
	// ID returns the node's int32 identifier.
	ID() int32
	// Get returns the node's NodeRepresentation.
	Get(context.Context) (NodeRepresentation[T], error)
	// AddChild presumably records the given node as a child of this one.
	AddChild(context.Context, Node[T]) error
	// AddParentChildRelationship presumably links both directions in one call
	// (this node as parent, the argument as child) — TODO confirm.
	AddParentChildRelationship(context.Context, Node[T]) error
	// AddParent presumably records the given node as a parent of this one.
	AddParent(context.Context, Node[T]) error
	// AddInput presumably adds a value to the node's inputs.
	AddInput(context.Context, T) error
	// AddOutput presumably adds a value to the node's outputs.
	AddOutput(context.Context, T) error
	// SetMetadata presumably stores the given NodeMetadata on the node.
	SetMetadata(context.Context, NodeMetadata) error
	// SetResults presumably stores the given NodeResult on the node.
	SetResults(context.Context, NodeResult) error
}

func FilterForRootNodes added in v0.3.0

func FilterForRootNodes(ctx context.Context, dags []Node[IOSpec]) ([]Node[IOSpec], error)

func NewNode

func NewNode(ctx context.Context, persistence db.NodePersistence, workRepo WorkRepository[IOSpec], n NodeSpec[IOSpec]) (Node[IOSpec], error)

func NodeMapToList added in v0.3.0

func NodeMapToList[T any](dags map[T]Node[IOSpec]) (nodes []Node[IOSpec])

type NodeExecutor added in v0.5.3

// NodeExecutor executes a single Node.
type NodeExecutor[T any] interface {
	// Execute takes a context, an execution ID, and the node to execute.
	// It returns no value.
	// NOTE(review): how results and failures are reported is not visible
	// here (possibly via Node.SetResults / Node.SetMetadata) — confirm.
	Execute(ctx context.Context, executionId uuid.UUID, node Node[T])
}

func NewNodeExecutor added in v0.5.3

func NewNodeExecutor[T any](ctx context.Context, analytics analytics.AnalyticsRepository) (NodeExecutor[T], error)

type NodeMetadata

// NodeMetadata contains metadata about a node: creation, start, and end
// timestamps plus an execution status string.
// NOTE(review): Status is a plain string although the package also declares a
// Status enum with a String method; presumably this field holds that string
// form — confirm.
type NodeMetadata struct {
	CreatedAt time.Time
	StartedAt time.Time
	EndedAt   time.Time
	Status    string // Status of the execution
}

NodeMetadata contains metadata about a node.

type NodeRepresentation added in v0.3.0

// NodeRepresentation is the plain-data view of a node, as returned by
// Node.Get. It bundles the node's identity, its work function, its graph
// links, its input and output data, and its metadata and results.
// NOTE(review): Go initialism style would spell the first field ID, but it is
// exported, so renaming it would break callers.
type NodeRepresentation[T any] struct {
	Id          int32        // Keep track of the node's ID, useful during debugging
	Name        string       // Name of the node
	QueueItemID uuid.UUID    // ID of the queue item
	Work        Work[T]      // The work to be done by the node
	Children    []Node[T]    // Children of the node
	Parents     []Node[T]    // Parents of the node
	Inputs      []T          // Input data
	Outputs     []T          // Output data (which is fed into the inputs of its children)
	Metadata    NodeMetadata // Metadata about the node
	Results     NodeResult   // Result of the node
}

type NodeResult added in v0.3.0

// NodeResult is the outcome of an execution: its external ID, its stdout and
// stderr, and whether it was skipped. It is the element type of the status
// channel passed to Work and the argument of Node.SetResults.
type NodeResult struct {
	ID      string // External ID of the execution
	StdOut  string // Stdout of the execution
	StdErr  string // Stderr of the execution
	Skipped bool   // Whether the execution was skipped
}

type NodeSpec added in v0.3.0

// NodeSpec is the description from which a node is created: an owner ID, a
// name, and the work function. It is the argument of NewNode and
// NodeStore.NewNode.
// NOTE(review): what kind of entity OwnerID refers to is not visible here — confirm.
type NodeSpec[T any] struct {
	OwnerID uuid.UUID
	Name    string
	Work    Work[T]
}

type NodeStore added in v0.3.0

// NodeStore creates nodes and retrieves them by identifier.
type NodeStore[T any] interface {
	// NewNode creates a Node from the given NodeSpec.
	NewNode(context.Context, NodeSpec[T]) (Node[T], error)
	// GetNode returns the Node for the given int32 key.
	// NOTE(review): the key is presumably the value returned by Node.ID — confirm.
	GetNode(context.Context, int32) (Node[T], error)
}

func NewNodeStore added in v0.3.0

type Status added in v0.3.0

// Status is an int64-backed enumeration with the values Queued, Started, and
// Finished, numbered from 0 via iota.
// NOTE(review): presumably describes the lifecycle of a node's execution — confirm.
type Status int64
const (
	// Queued is the zero value (0).
	Queued Status = iota
	// Started has the value 1.
	Started
	// Finished has the value 2.
	Finished
)

func (Status) String added in v0.3.0

func (s Status) String() string

type Work

// Work is a function type: it takes a context, a slice of inputs of type T,
// and a channel of NodeResult, and returns a slice of outputs of type T.
// NOTE(review): presumably the function reports its result on statusChan;
// who closes the channel is not visible here — confirm.
type Work[T any] func(ctx context.Context, inputs []T, statusChan chan NodeResult) []T

Work is shorthand for a function that accepts inputs and returns outputs.

type WorkRepository added in v0.3.0

// WorkRepository stores Work functions under int32 keys.
// NOTE(review): the key is presumably a node ID, and ErrWorkNotFound /
// ErrWorkAlreadyExists are presumably the errors these methods return — confirm.
type WorkRepository[T any] interface {
	// Get returns the Work stored under the given key.
	Get(context.Context, int32) (Work[T], error)
	// Set stores the given Work under the given key.
	Set(context.Context, int32, Work[T]) error
}

func NewInMemWorkRepository added in v0.3.0

func NewInMemWorkRepository[T any]() WorkRepository[T]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL