metadata

package
v0.0.0-...-7f2278f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package metadata contains types to record/retrieve metadata stored in MLMD for individual pipeline steps.

Package metadata contains types to record/retrieve metadata stored in MLMD for individual pipeline steps.

Index

Constants

View Source
const (
	ImporterExecutionTypeName = "system.ImporterExecution"
)

Variables

This section is empty.

Functions

func GenerateOutputURI

func GenerateOutputURI(pipelineRoot string, paths []string, preserveQueryString bool) string

GenerateOutputURI appends the specified paths to the pipeline root. It may be configured to preserve the query part of the pipeline root by splitting it off and appending it back to the full URI.

func PbValueToText

func PbValueToText(v *structpb.Value) (string, error)

func SchemaToArtifactType

func SchemaToArtifactType(schema string) (*pb.ArtifactType, error)

func StringValue

func StringValue(s string) *pb.Value

func StructValueToMLMDValue

func StructValueToMLMDValue(v *structpb.Value) (*pb.Value, error)

func UnmarshalRuntimeArtifact

func UnmarshalRuntimeArtifact(bytes []byte) (*pipelinespec.RuntimeArtifact, error)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an MLMD service client.

func NewClient

func NewClient(serverAddress, serverPort string) (*Client, error)

NewClient creates a Client given the MLMD server address and port.

func (*Client) CreateExecution

func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)

CreateExecution creates a new MLMD execution under the specified Pipeline.

func (*Client) FindMatchedArtifact

func (c *Client) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error)

func (*Client) GetArtifactName

func (c *Client) GetArtifactName(ctx context.Context, artifactId int64) (string, error)

func (*Client) GetArtifacts

func (c *Client) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error)

GetArtifacts returns the MLMD artifacts with the given IDs.

func (*Client) GetDAG

func (c *Client) GetDAG(ctx context.Context, executionID int64) (*DAG, error)

func (*Client) GetEventsByArtifactIDs

func (c *Client) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)

GetEventsByArtifactIDs returns the MLMD events associated with the given artifact IDs.

func (*Client) GetExecution

func (c *Client) GetExecution(ctx context.Context, id int64) (*Execution, error)

func (*Client) GetExecutions

func (c *Client) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error)

GetExecutions returns the MLMD executions with the given IDs.

func (*Client) GetExecutionsInDAG

func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)

GetExecutionsInDAG gets all executions in the DAG and organizes them into a map keyed by task name.

func (*Client) GetInputArtifactsByExecutionID

func (c *Client) GetInputArtifactsByExecutionID(ctx context.Context, executionID int64) (inputs map[string]*pipelinespec.ArtifactList, err error)

func (*Client) GetOrInsertArtifactType

func (c *Client) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)

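GetOrInsertArtifactType returns the ID of the MLMD artifact type for the given schema. TODO: consider batching these requests. TODO(lingqinggan): artifact types should be created during initialization, and only those types should be allowed; currently, users may create any artifact type.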

func (*Client) GetOutputArtifactsByExecutionId

func (c *Client) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)

GetOutputArtifactsByExecutionId returns a map of the output artifacts recorded for the given execution ID. TODO: support multiple artifacts per output, probably through the v2 engine.

func (*Client) GetPipeline

func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error)

GetPipeline returns the current pipeline represented by the specified pipeline name and run ID.

func (*Client) GetPipelineFromExecution

func (c *Client) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error)

func (*Client) PrePublishExecution

func (c *Client) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)

PrePublishExecution updates an existing MLMD execution with Pod info.

func (*Client) PublishExecution

func (c *Client) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error

PublishExecution publishes the specified execution with the given output parameters, artifacts and state.

func (*Client) PutDAGExecutionState

func (c *Client) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) error

PutDAGExecutionState sets the DAG execution with the given ID to the provided state.

func (*Client) RecordArtifact

func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error)

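RecordArtifact records the given runtime artifact in MLMD under the output name, schema, and artifact state provided, and returns it as an OutputArtifact.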

func (*Client) UpdateDAGExecutionsState

func (c *Client) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) error

UpdateDAGExecutionsState checks the statuses of all tasks in the given DAG and, based on them, updates the DAG to the corresponding status if necessary.

type ClientInterface

type ClientInterface interface {
	GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error)
	GetDAG(ctx context.Context, executionID int64) (*DAG, error)
	PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error
	CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)
	PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)
	GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error)
	GetExecution(ctx context.Context, id int64) (*Execution, error)
	GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error)
	GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)
	UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error)
	PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error)
	GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)
	GetArtifactName(ctx context.Context, artifactId int64) (string, error)
	GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error)
	GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)
	RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error)
	GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)
	FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error)
}

type DAG

type DAG struct {
	Execution *Execution
}

DAG represents a Kubeflow Pipelines DAG.

func (*DAG) Info

func (d *DAG) Info() string

Info returns identifying information about the DAG, for use in error messages.

type Execution

type Execution struct {
	// contains filtered or unexported fields
}

Execution is a handle for the current execution.

func NewExecution

func NewExecution(e *pb.Execution) *Execution

NewExecution wraps a pb.Execution in an Execution. This is a workaround: usually you should obtain an Execution directly from this metadata package rather than constructing one from an ml_metadata Execution.

func (*Execution) FingerPrint

func (e *Execution) FingerPrint() string

func (*Execution) GetExecution

func (e *Execution) GetExecution() *pb.Execution

func (*Execution) GetID

func (e *Execution) GetID() int64

func (*Execution) GetParameters

func (e *Execution) GetParameters() (inputs, outputs map[string]*structpb.Value, err error)

func (*Execution) GetPipeline

func (e *Execution) GetPipeline() *Pipeline

func (*Execution) String

func (e *Execution) String() string

func (*Execution) TaskName

func (e *Execution) TaskName() string

type ExecutionConfig

type ExecutionConfig struct {
	TaskName         string
	Name             string // optional, MLMD execution name. When provided, this needs to be unique among all MLMD executions.
	ExecutionType    ExecutionType
	NotTriggered     bool  // optional, not triggered executions will have CANCELED state.
	ParentDagID      int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG.
	InputParameters  map[string]*structpb.Value
	OutputParameters map[string]*pipelinespec.DagOutputsSpec_DagOutputParameterSpec
	OutputArtifacts  map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec
	InputArtifactIDs map[string][]int64
	IterationIndex   *int // Index of the iteration.

	// ContainerExecution custom properties
	Image, CachedMLMDExecutionID, FingerPrint string
	PodName, PodUID, Namespace                string

	// DAGExecution custom properties
	IterationCount *int // Number of iterations for an iterator DAG.
}

ExecutionConfig represents the input parameters and artifacts to an Execution.

func GenerateExecutionConfig

func GenerateExecutionConfig(executorInput *pipelinespec.ExecutorInput) (*ExecutionConfig, error)

type ExecutionType

type ExecutionType string
const (
	ContainerExecutionTypeName ExecutionType = "system.ContainerExecution"
	DagExecutionTypeName       ExecutionType = "system.DAGExecution"
)

type FakeClient

type FakeClient struct {
}

func NewFakeClient

func NewFakeClient() *FakeClient

func (*FakeClient) CreateExecution

func (c *FakeClient) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)

func (*FakeClient) FindMatchedArtifact

func (c *FakeClient) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error)

func (*FakeClient) GetArtifactName

func (c *FakeClient) GetArtifactName(ctx context.Context, artifactId int64) (string, error)

func (*FakeClient) GetArtifacts

func (c *FakeClient) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error)

func (*FakeClient) GetDAG

func (c *FakeClient) GetDAG(ctx context.Context, executionID int64) (*DAG, error)

func (*FakeClient) GetEventsByArtifactIDs

func (c *FakeClient) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)

func (*FakeClient) GetExecution

func (c *FakeClient) GetExecution(ctx context.Context, id int64) (*Execution, error)

func (*FakeClient) GetExecutions

func (c *FakeClient) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error)

func (*FakeClient) GetExecutionsInDAG

func (c *FakeClient) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)

func (*FakeClient) GetOrInsertArtifactType

func (c *FakeClient) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)

func (*FakeClient) GetOutputArtifactsByExecutionId

func (c *FakeClient) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)

func (*FakeClient) GetPipeline

func (c *FakeClient) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot string, storeSessionInfo string) (*Pipeline, error)

func (*FakeClient) GetPipelineFromExecution

func (c *FakeClient) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error)

func (*FakeClient) PrePublishExecution

func (c *FakeClient) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)

func (*FakeClient) PublishExecution

func (c *FakeClient) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error

func (*FakeClient) PutDAGExecutionState

func (c *FakeClient) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error)

func (*FakeClient) RecordArtifact

func (c *FakeClient) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error)

func (*FakeClient) UpdateDAGExecutionsState

func (c *FakeClient) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error)

type InputArtifact

type InputArtifact struct {
	Artifact *pb.Artifact
}

InputArtifact is a wrapper around an MLMD artifact used as component inputs.

type OutputArtifact

type OutputArtifact struct {
	Name     string
	Artifact *pb.Artifact
	Schema   string
}

OutputArtifact represents a schema and an MLMD artifact for output artifacts produced by a component.

func (*OutputArtifact) Marshal

func (oa *OutputArtifact) Marshal() ([]byte, error)

func (*OutputArtifact) ToRuntimeArtifact

func (oa *OutputArtifact) ToRuntimeArtifact() (*pipelinespec.RuntimeArtifact, error)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a handle for the current pipeline.

func (*Pipeline) GetCtxID

func (p *Pipeline) GetCtxID() int64

func (*Pipeline) GetPipelineRoot

func (p *Pipeline) GetPipelineRoot() string

func (*Pipeline) GetRunCtxID

func (p *Pipeline) GetRunCtxID() int64

func (*Pipeline) GetStoreSessionInfo

func (p *Pipeline) GetStoreSessionInfo() string

type ServerConfig

type ServerConfig struct {
	Address string
	Port    string
}

func DefaultConfig

func DefaultConfig() *ServerConfig

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL