Documentation ¶
Overview ¶
Package metadata contains types to record/retrieve metadata stored in MLMD for individual pipeline steps.
Package metadata contains types to record/retrieve metadata stored in MLMD for individual pipeline steps.
Index ¶
- Constants
- func GenerateOutputURI(pipelineRoot string, paths []string, preserveQueryString bool) string
- func PbValueToText(v *structpb.Value) (string, error)
- func SchemaToArtifactType(schema string) (*pb.ArtifactType, error)
- func StringValue(s string) *pb.Value
- func StructValueToMLMDValue(v *structpb.Value) (*pb.Value, error)
- func TextToPbValue(text string, t pipelinespec.ParameterType_ParameterTypeEnum) (*structpb.Value, error)
- func UnmarshalRuntimeArtifact(bytes []byte) (*pipelinespec.RuntimeArtifact, error)
- type Client
- func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)
- func (c *Client) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error)
- func (c *Client) GetArtifactName(ctx context.Context, artifactId int64) (string, error)
- func (c *Client) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error)
- func (c *Client) GetDAG(ctx context.Context, executionID int64) (*DAG, error)
- func (c *Client) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)
- func (c *Client) GetExecution(ctx context.Context, id int64) (*Execution, error)
- func (c *Client) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error)
- func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)
- func (c *Client) GetInputArtifactsByExecutionID(ctx context.Context, executionID int64) (inputs map[string]*pipelinespec.ArtifactList, err error)
- func (c *Client) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)
- func (c *Client) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)
- func (c *Client) GetPipeline(ctx context.Context, ...) (*Pipeline, error)
- func (c *Client) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error)
- func (c *Client) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)
- func (c *Client) PublishExecution(ctx context.Context, execution *Execution, ...) error
- func (c *Client) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) error
- func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, ...) (*OutputArtifact, error)
- func (c *Client) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) error
- type ClientInterface
- type DAG
- type Execution
- func (e *Execution) FingerPrint() string
- func (e *Execution) GetExecution() *pb.Execution
- func (e *Execution) GetID() int64
- func (e *Execution) GetParameters() (inputs, outputs map[string]*structpb.Value, err error)
- func (e *Execution) GetPipeline() *Pipeline
- func (e *Execution) String() string
- func (e *Execution) TaskName() string
- type ExecutionConfig
- type ExecutionType
- type FakeClient
- func (c *FakeClient) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)
- func (c *FakeClient) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error)
- func (c *FakeClient) GetArtifactName(ctx context.Context, artifactId int64) (string, error)
- func (c *FakeClient) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error)
- func (c *FakeClient) GetDAG(ctx context.Context, executionID int64) (*DAG, error)
- func (c *FakeClient) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)
- func (c *FakeClient) GetExecution(ctx context.Context, id int64) (*Execution, error)
- func (c *FakeClient) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error)
- func (c *FakeClient) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)
- func (c *FakeClient) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)
- func (c *FakeClient) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)
- func (c *FakeClient) GetPipeline(ctx context.Context, ...) (*Pipeline, error)
- func (c *FakeClient) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error)
- func (c *FakeClient) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)
- func (c *FakeClient) PublishExecution(ctx context.Context, execution *Execution, ...) error
- func (c *FakeClient) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error)
- func (c *FakeClient) RecordArtifact(ctx context.Context, outputName, schema string, ...) (*OutputArtifact, error)
- func (c *FakeClient) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error)
- type InputArtifact
- type OutputArtifact
- type Pipeline
- type ServerConfig
Constants ¶
const (
ImporterExecutionTypeName = "system.ImporterExecution"
)
Variables ¶
This section is empty.
Functions ¶
func GenerateOutputURI ¶
GenerateOutputURI appends the specified paths to the pipeline root. It may be configured to preserve the query part of the pipeline root by splitting it off and appending it back to the full URI.
func SchemaToArtifactType ¶
func SchemaToArtifactType(schema string) (*pb.ArtifactType, error)
func StringValue ¶
func TextToPbValue ¶
func TextToPbValue(text string, t pipelinespec.ParameterType_ParameterTypeEnum) (*structpb.Value, error)
func UnmarshalRuntimeArtifact ¶
func UnmarshalRuntimeArtifact(bytes []byte) (*pipelinespec.RuntimeArtifact, error)
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an MLMD service client.
func (*Client) CreateExecution ¶
func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)
CreateExecution creates a new MLMD execution under the specified Pipeline.
func (*Client) FindMatchedArtifact ¶
func (*Client) GetArtifactName ¶
func (*Client) GetArtifacts ¶
GetArtifacts ...
func (*Client) GetEventsByArtifactIDs ¶
func (c *Client) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error)
GetEventsByArtifactIDs ...
func (*Client) GetExecution ¶
func (*Client) GetExecutions ¶
GetExecutions ...
func (*Client) GetExecutionsInDAG ¶
func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error)
GetExecutionsInDAG gets all executions in the DAG, and organize them into a map, keyed by task name.
func (*Client) GetInputArtifactsByExecutionID ¶
func (c *Client) GetInputArtifactsByExecutionID(ctx context.Context, executionID int64) (inputs map[string]*pipelinespec.ArtifactList, err error)
func (*Client) GetOrInsertArtifactType ¶
func (c *Client) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error)
TODO consider batching these requests TODO(lingqinggan): need to create artifact types during initiation, and only allow these types. Currently we allow users to create any artifact type.
func (*Client) GetOutputArtifactsByExecutionId ¶
func (c *Client) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)
GetOutputArtifactsByExecutionId ... TODO: Support multiple artifacts someday, probably through the v2 engine.
func (*Client) GetPipeline ¶
func (c *Client) GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error)
GetPipeline returns the current pipeline represented by the specified pipeline name and run ID.
func (*Client) GetPipelineFromExecution ¶
func (*Client) PrePublishExecution ¶
func (c *Client) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)
PrePublishExecution updates an existing MLMD execution with Pod info.
func (*Client) PublishExecution ¶
func (c *Client) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error
PublishExecution publishes the specified execution with the given output parameters, artifacts and state.
func (*Client) PutDAGExecutionState ¶
func (c *Client) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) error
PutDAGExecutionState updates the given DAG Id to the state provided.
func (*Client) RecordArtifact ¶
func (c *Client) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error)
RecordArtifact ...
type ClientInterface ¶
type ClientInterface interface { GetPipeline(ctx context.Context, pipelineName, runID, namespace, runResource, pipelineRoot, storeSessionInfo string) (*Pipeline, error) GetDAG(ctx context.Context, executionID int64) (*DAG, error) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Execution, error) GetExecution(ctx context.Context, id int64) (*Execution, error) GetPipelineFromExecution(ctx context.Context, id int64) (*Pipeline, error) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pipeline, filter bool) (executionsMap map[string]*Execution, err error) UpdateDAGExecutionsState(ctx context.Context, dag *DAG, pipeline *Pipeline) (err error) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error) GetArtifactName(ctx context.Context, artifactId int64) (string, error) GetArtifacts(ctx context.Context, ids []int64) ([]*pb.Artifact, error) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error) GetOrInsertArtifactType(ctx context.Context, schema string) (typeID int64, err error) FindMatchedArtifact(ctx context.Context, artifactToMatch *pb.Artifact, pipelineContextId int64) (matchedArtifact *pb.Artifact, err error) }
type Execution ¶
type Execution struct {
// contains filtered or unexported fields
}
Execution is a handle for the current execution.
func NewExecution ¶
A hacky way to get Execution from pb.Execution, usually you should get an Execution from this metadata package directly without using ml_metadata.Execution
func (*Execution) FingerPrint ¶
func (*Execution) GetExecution ¶
func (*Execution) GetParameters ¶
func (*Execution) GetPipeline ¶
type ExecutionConfig ¶
type ExecutionConfig struct { TaskName string Name string // optional, MLMD execution name. When provided, this needs to be unique among all MLMD executions. ExecutionType ExecutionType NotTriggered bool // optional, not triggered executions will have CANCELED state. ParentDagID int64 // parent DAG execution ID. Only the root DAG does not have a parent DAG. InputParameters map[string]*structpb.Value OutputParameters map[string]*pipelinespec.DagOutputsSpec_DagOutputParameterSpec OutputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec InputArtifactIDs map[string][]int64 IterationIndex *int // Index of the iteration. // ContainerExecution custom properties Image, CachedMLMDExecutionID, FingerPrint string PodName, PodUID, Namespace string // DAGExecution custom properties IterationCount *int // Number of iterations for an iterator DAG. }
ExecutionConfig represents the input parameters and artifacts to an Execution.
func GenerateExecutionConfig ¶
func GenerateExecutionConfig(executorInput *pipelinespec.ExecutorInput) (*ExecutionConfig, error)
type ExecutionType ¶
type ExecutionType string
const ( ContainerExecutionTypeName ExecutionType = "system.ContainerExecution" DagExecutionTypeName ExecutionType = "system.DAGExecution" )
type FakeClient ¶
type FakeClient struct { }
func NewFakeClient ¶
func NewFakeClient() *FakeClient
func (*FakeClient) CreateExecution ¶
func (c *FakeClient) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error)
func (*FakeClient) FindMatchedArtifact ¶
func (*FakeClient) GetArtifactName ¶
func (*FakeClient) GetArtifacts ¶
func (*FakeClient) GetEventsByArtifactIDs ¶
func (*FakeClient) GetExecution ¶
func (*FakeClient) GetExecutions ¶
func (*FakeClient) GetExecutionsInDAG ¶
func (*FakeClient) GetOrInsertArtifactType ¶
func (*FakeClient) GetOutputArtifactsByExecutionId ¶
func (c *FakeClient) GetOutputArtifactsByExecutionId(ctx context.Context, executionId int64) (map[string]*OutputArtifact, error)
func (*FakeClient) GetPipeline ¶
func (*FakeClient) GetPipelineFromExecution ¶
func (*FakeClient) PrePublishExecution ¶
func (c *FakeClient) PrePublishExecution(ctx context.Context, execution *Execution, config *ExecutionConfig) (*Execution, error)
func (*FakeClient) PublishExecution ¶
func (c *FakeClient) PublishExecution(ctx context.Context, execution *Execution, outputParameters map[string]*structpb.Value, outputArtifacts []*OutputArtifact, state pb.Execution_State) error
func (*FakeClient) PutDAGExecutionState ¶
func (c *FakeClient) PutDAGExecutionState(ctx context.Context, executionID int64, state pb.Execution_State) (err error)
func (*FakeClient) RecordArtifact ¶
func (c *FakeClient) RecordArtifact(ctx context.Context, outputName, schema string, runtimeArtifact *pipelinespec.RuntimeArtifact, state pb.Artifact_State, bucketConfig *objectstore.Config) (*OutputArtifact, error)
func (*FakeClient) UpdateDAGExecutionsState ¶
type InputArtifact ¶
InputArtifact is a wrapper around an MLMD artifact used as component inputs.
type OutputArtifact ¶
OutputArtifact represents a schema and an MLMD artifact for output artifacts produced by a component.
func (*OutputArtifact) Marshal ¶
func (oa *OutputArtifact) Marshal() ([]byte, error)
func (*OutputArtifact) ToRuntimeArtifact ¶
func (oa *OutputArtifact) ToRuntimeArtifact() (*pipelinespec.RuntimeArtifact, error)
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a handle for the current pipeline.
func (*Pipeline) GetPipelineRoot ¶
func (*Pipeline) GetRunCtxID ¶
func (*Pipeline) GetStoreSessionInfo ¶
type ServerConfig ¶
func DefaultConfig ¶
func DefaultConfig() *ServerConfig