datacatalog

package
v0.13.3

Warning: This package is not in the latest version of its module.
Published: Jul 20, 2021 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DatasetIDToIdentifier added in v0.7.0

func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier

func EventCatalogMetadata added in v0.7.0

func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, sourceID *core.TaskExecutionIdentifier) *core.CatalogMetadata

Given the Catalog Information (returned from a Catalog call), returns the CatalogMetadata that is populated in the event.

func GenerateArtifactTagName

func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error)

Generate a tag by hashing the input values
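A minimal sketch of input-hash tagging, using only the standard library. It does not reproduce the package's implementation: the `generateTagName` helper, the `map[string]string` stand-in for `*core.LiteralMap`, the choice of SHA-256, and the `cached` prefix are all assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"sort"
)

// tagPrefix is a hypothetical prefix chosen for this sketch.
const tagPrefix = "cached"

// generateTagName hashes the inputs in sorted key order, so the same inputs
// always yield the same tag regardless of map iteration order.
func generateTagName(inputs map[string]string) string {
	keys := make([]string, 0, len(inputs))
	for k := range inputs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		// Length-prefix each field so that ("ab", "c") and ("a", "bc")
		// do not collapse into the same byte stream.
		fmt.Fprintf(h, "%d:%s=%d:%s;", len(k), k, len(inputs[k]), inputs[k])
	}
	return tagPrefix + "-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

func main() {
	a := generateTagName(map[string]string{"x": "1", "y": "2"})
	b := generateTagName(map[string]string{"y": "2", "x": "1"})
	fmt.Println(a == b) // identical inputs map to an identical tag
}
```

Sorting the keys before hashing is what makes the tag deterministic; without it, two calls with the same inputs could hash the fields in a different order.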

func GenerateDatasetIDForTask

func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)

Get the DatasetID for a task. NOTE: the version of the task is a combination of both the discoverable_version and the task signature. This is because the interface may have changed even if the discoverable_version hasn't.

func GenerateTaskOutputsFromArtifact

func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, artifact *datacatalog.Artifact) (*core.LiteralMap, error)

Transform the artifact Data into task execution outputs as a literal map

func GetArtifactMetadataForSource added in v0.7.0

func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata

func GetDatasetMetadataForSource added in v0.7.0

func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata

Understanding Catalog Identifiers

- DatasetID represents the ID of the dataset. For Flyte this represents the ID of the generating task and the version calculated as the hash of the interface & cache version. Refer to `GenerateDatasetIDForTask`.
- TaskID is the same as the DatasetID + name: (DatasetID - namespace) + task version, which is stored in the metadata.
- ExecutionID is stored only in the metadata (project and domain available after Jul-2020).
- NodeExecID = Execution ID + Node ID (available after Jul-2020).
- TaskExecID is the same as the NodeExecID + attempt (attempt is available in the metadata after Jul-2020).

func GetOrDefault added in v0.7.0

func GetOrDefault(m map[string]string, key, defaultValue string) string

Returns the value stored in the map for the given key, or the default value if the key is not found.
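A function with this signature and documented behavior could look like the sketch below. The body is an assumption based only on the description above; in particular, treating a key that is present with an empty value as "found" is a choice made here, not something the documentation states.

```go
package main

import "fmt"

// GetOrDefault returns m[key] when the key is present, else defaultValue.
// A key mapped to the empty string counts as present in this sketch.
func GetOrDefault(m map[string]string, key, defaultValue string) string {
	if v, ok := m[key]; ok {
		return v
	}
	return defaultValue
}

func main() {
	md := map[string]string{"project": "demo"}
	fmt.Println(GetOrDefault(md, "project", "unknown"))
	fmt.Println(GetOrDefault(md, "domain", "unknown"))
}
```

Reading from a nil map is safe in Go, so callers do not need to check the map before the lookup.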

func GetSourceFromMetadata added in v0.7.0

func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) *core.TaskExecutionIdentifier

Returns the source TaskExecutionIdentifier from the catalog metadata. For any information that is not available, it returns Unknown. This is because, as of July 2020, Catalog does not have all the information. After the first deployment of this code, Catalog will have this information and the "unknown" values can be phased out.
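The fallback behavior described above can be sketched with plain maps. Everything here is hypothetical: the `sourceID` struct stands in for `core.TaskExecutionIdentifier`, the metadata key names are invented for illustration, and which map holds which field is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

const unknown = "unknown"

// sourceID is a simplified, hypothetical stand-in for a task execution identifier.
type sourceID struct {
	TaskVersion   string
	Project       string
	Domain        string
	ExecutionName string
	NodeID        string
	Attempt       uint32
}

// lookup returns the stored value, or "unknown" when the key is absent.
func lookup(m map[string]string, key string) string {
	if v, ok := m[key]; ok {
		return v
	}
	return unknown
}

// sourceFromMetadata rebuilds the source identifier from metadata maps,
// filling every field that older catalog entries lack with "unknown".
func sourceFromMetadata(datasetMd, artifactMd map[string]string) sourceID {
	attempt, err := strconv.ParseUint(artifactMd["attempt"], 10, 32)
	if err != nil {
		attempt = 0 // missing or malformed attempt falls back to zero
	}
	return sourceID{
		TaskVersion:   lookup(datasetMd, "task-version"),
		Project:       lookup(artifactMd, "exec-project"),
		Domain:        lookup(artifactMd, "exec-domain"),
		ExecutionName: lookup(artifactMd, "exec-name"),
		NodeID:        lookup(artifactMd, "exec-node"),
		Attempt:       uint32(attempt),
	}
}

func main() {
	old := sourceFromMetadata(nil, map[string]string{"exec-name": "abc123"})
	fmt.Printf("%+v\n", old)
}
```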

Types

type CatalogClient

type CatalogClient struct {
	// contains filtered or unexported fields
}

This is the client that caches task executions to the DataCatalog service.

func NewDataCatalog

func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error)

Create a new DataCatalog client for task execution caching.

func (*CatalogClient) CreateArtifact

func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, outputs *core.LiteralMap, md *datacatalog.Metadata) (*datacatalog.Artifact, error)

func (*CatalogClient) CreateDataset

func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)

func (*CatalogClient) Get

func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)

Get the cached task execution from Catalog. These are the steps taken:
- Verify there is a Dataset created for the Task
- Lookup the Artifact that is tagged with the hash of the input values
- The artifactData contains the literal values that serve as the task outputs

func (*CatalogClient) GetArtifactByTag

func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)

Helper method to retrieve an artifact by the tag

func (*CatalogClient) GetDataset

func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)

Helper method to retrieve a dataset that is associated with the task

func (*CatalogClient) Put

func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)

Catalog the task execution as a cached Artifact. We associate an Artifact as the cached data by tagging the Artifact with the hash of the input values.

The steps taken to cache an execution:
- Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task
- Create an Artifact with the execution data that belongs to the dataset
- Tag the Artifact with a hash generated by the input values
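A minimal in-memory sketch of the caching steps above. The `memCatalog` type, its `put` method, and the `artifact-N` ID scheme are hypothetical; the real client talks to the DataCatalog service and works with `catalog.Key` and `io.OutputReader` instead of strings and maps.

```go
package main

import "fmt"

// memCatalog is a hypothetical in-memory stand-in for the catalog service.
type memCatalog struct {
	datasets  map[string]map[string]string // dataset ID -> tag -> artifact ID
	artifacts map[string]map[string]string // artifact ID -> outputs
}

func newMemCatalog() *memCatalog {
	return &memCatalog{
		datasets:  map[string]map[string]string{},
		artifacts: map[string]map[string]string{},
	}
}

// put caches one execution and returns the new artifact's ID.
func (c *memCatalog) put(datasetID, inputTag string, outputs map[string]string) string {
	// Step 1: ensure the dataset exists; creating it twice is a no-op.
	tags, ok := c.datasets[datasetID]
	if !ok {
		tags = map[string]string{}
		c.datasets[datasetID] = tags
	}
	// Step 2: create an artifact holding the execution data.
	artifactID := fmt.Sprintf("artifact-%d", len(c.artifacts)+1)
	c.artifacts[artifactID] = outputs
	// Step 3: tag the artifact with the hash of the input values.
	tags[inputTag] = artifactID
	return artifactID
}

func main() {
	c := newMemCatalog()
	id := c.put("proj/dev/task/v1", "tag-abc", map[string]string{"y": "42"})
	fmt.Println(id, c.datasets["proj/dev/task/v1"]["tag-abc"] == id)
}
```

Keeping dataset creation idempotent means repeated executions of the same task version share one dataset and differ only in their tagged artifacts.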
