Documentation ¶
Index ¶
- func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
- func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, ...) *core.CatalogMetadata
- func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error)
- func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)
- func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, ...) (*core.LiteralMap, error)
- func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetOrDefault(m map[string]string, key, defaultValue string) string
- func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) *core.TaskExecutionIdentifier
- type CatalogClient
- func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error)
- func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, ...) (*datacatalog.Artifact, error)
- func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
- func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)
- func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
- func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
- func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, ...) (catalog.Status, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DatasetIDToIdentifier ¶ added in v0.7.0
func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
func EventCatalogMetadata ¶ added in v0.7.0
func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, sourceID *core.TaskExecutionIdentifier) *core.CatalogMetadata
Given the Catalog Information (returned from a Catalog call), returns the CatalogMetadata that is populated in the event.
func GenerateArtifactTagName ¶
func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error)
Generates a tag by hashing the input values.
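To illustrate the idea of deriving a tag from the input values, here is a minimal sketch. It is not the package's implementation: it assumes inputs are a plain `map[string]string` rather than a `*core.LiteralMap`, uses SHA-256 as the hash, and the `cached` prefix and the `generateTagName` name are hypothetical. The key point is that the inputs are hashed in a deterministic order, so equal inputs always map to the same tag.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"sort"
)

// tagPrefix is a hypothetical prefix chosen for this sketch.
const tagPrefix = "cached"

// generateTagName hashes the inputs in sorted-key order so that equal
// input sets always produce the same tag, regardless of map iteration order.
func generateTagName(inputs map[string]string) string {
	keys := make([]string, 0, len(inputs))
	for k := range inputs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		// Length-prefix each field so that adjacent fields cannot be confused.
		fmt.Fprintf(h, "%d:%s=%d:%s;", len(k), k, len(inputs[k]), inputs[k])
	}
	return tagPrefix + "-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

func main() {
	a := generateTagName(map[string]string{"x": "1", "y": "2"})
	b := generateTagName(map[string]string{"y": "2", "x": "1"})
	// Both calls describe the same inputs, so the tags are equal.
	fmt.Println(a == b)
}
```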
func GenerateDatasetIDForTask ¶
func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)
Gets the DatasetID for a task. NOTE: the version of the task is a combination of both the discoverable_version and the task signature, because the interface may have changed even if the discoverable_version has not.
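The note about versioning can be made concrete with a small sketch. It assumes a simplified interface type (variable name to type name) instead of `core.TypedInterface`, and the `datasetVersion` helper and its output format are hypothetical. It shows why the signature is folded into the version: changing the interface changes the dataset version even when the discoverable version string stays the same.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// taskInterface is a simplified stand-in for a typed task interface:
// it maps variable names to type names.
type taskInterface struct {
	Inputs  map[string]string
	Outputs map[string]string
}

// canonical renders variables in sorted order so the result is deterministic.
func canonical(vars map[string]string) string {
	names := make([]string, 0, len(vars))
	for n := range vars {
		names = append(names, n)
	}
	sort.Strings(names)
	var sb strings.Builder
	for _, n := range names {
		sb.WriteString(n)
		sb.WriteByte(':')
		sb.WriteString(vars[n])
		sb.WriteByte(';')
	}
	return sb.String()
}

// datasetVersion combines the user-supplied discoverable version with a
// short hash of the interface signature (format is an assumption).
func datasetVersion(discoverableVersion string, iface taskInterface) string {
	sig := "in{" + canonical(iface.Inputs) + "}out{" + canonical(iface.Outputs) + "}"
	sum := sha256.Sum256([]byte(sig))
	return discoverableVersion + "-" + hex.EncodeToString(sum[:8])
}

func main() {
	before := taskInterface{Inputs: map[string]string{"x": "int"}, Outputs: map[string]string{"y": "int"}}
	after := taskInterface{Inputs: map[string]string{"x": "string"}, Outputs: map[string]string{"y": "int"}}
	// Same discoverable version, different interface: the versions differ.
	fmt.Println(datasetVersion("1.0", before) != datasetVersion("1.0", after))
}
```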
func GenerateTaskOutputsFromArtifact ¶
func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, artifact *datacatalog.Artifact) (*core.LiteralMap, error)
Transforms the artifact data into task execution outputs, returned as a literal map.
func GetArtifactMetadataForSource ¶ added in v0.7.0
func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
func GetDatasetMetadataForSource ¶ added in v0.7.0
func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
Understanding Catalog Identifiers
- DatasetID represents the ID of the dataset. For Flyte this represents the ID of the generating task and the version calculated as the hash of the interface & cache version. Refer to `GenerateDatasetIDForTask`.
- TaskID is the same as the DatasetID + name: (DatasetID - namespace) + task version, which is stored in the metadata.
- ExecutionID is stored only in the metadata (project and domain available after Jul-2020).
- NodeExecID = ExecutionID + Node ID (available after Jul-2020).
- TaskExecID is the same as the NodeExecID + attempt (attempt is available in the metadata), after Jul-2020.
func GetOrDefault ¶ added in v0.7.0
func GetOrDefault(m map[string]string, key, defaultValue string) string
Returns the value stored in the map for the given key, or the default value if the key is not found.
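The described behavior can be sketched as follows. This is an illustration of the documented contract, not the package source; in particular, whether the real function treats an empty string value as "not found" is not stated here, and this sketch returns it unchanged.

```go
package main

import "fmt"

// getOrDefault returns m[key] when the key is present, else defaultValue.
// Indexing a nil map is safe in Go, so a nil m always yields defaultValue.
func getOrDefault(m map[string]string, key, defaultValue string) string {
	if v, ok := m[key]; ok {
		return v
	}
	return defaultValue
}

func main() {
	md := map[string]string{"task-version": "v1"}
	fmt.Println(getOrDefault(md, "task-version", "unknown")) // prints v1
	fmt.Println(getOrDefault(md, "exec-name", "unknown"))    // prints unknown
}
```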
func GetSourceFromMetadata ¶ added in v0.7.0
func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) *core.TaskExecutionIdentifier
Returns the source TaskExecutionIdentifier from the catalog metadata. For any information that is not available, it returns Unknown. This is because, as of July 2020, Catalog does not have all the information. After the first deployment of this code it will have this information, and the "unknown" values can be phased out.
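A rough sketch of the fallback behavior, under several assumptions: metadata is modeled as plain `map[string]string`, the `taskExecutionSource` struct stands in for `core.TaskExecutionIdentifier`, and every metadata key name (`task-version`, `exec-project`, and so on) is hypothetical. Which fields live in dataset metadata versus artifact metadata is also an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// unknown is the placeholder used when the catalog has no value recorded.
const unknown = "unknown"

// taskExecutionSource is a simplified stand-in for a task execution identifier.
type taskExecutionSource struct {
	Project, Domain, ExecutionName, NodeID, TaskVersion string
	Attempt                                             uint32
}

// lookup returns the stored value or the unknown placeholder.
func lookup(m map[string]string, key string) string {
	if v, ok := m[key]; ok {
		return v
	}
	return unknown
}

// sourceFromMetadata rebuilds the source identifier, filling gaps with
// "unknown" (or attempt 0) for entries written before the fields existed.
func sourceFromMetadata(datasetMd, artifactMd map[string]string) taskExecutionSource {
	attempt, err := strconv.ParseUint(artifactMd["exec-attempt"], 10, 32)
	if err != nil {
		attempt = 0 // missing or malformed attempt falls back to 0
	}
	return taskExecutionSource{
		Project:       lookup(artifactMd, "exec-project"),
		Domain:        lookup(artifactMd, "exec-domain"),
		ExecutionName: lookup(artifactMd, "exec-name"),
		NodeID:        lookup(artifactMd, "exec-node"),
		TaskVersion:   lookup(datasetMd, "task-version"),
		Attempt:       uint32(attempt),
	}
}

func main() {
	// An older entry that only recorded the execution name.
	old := sourceFromMetadata(nil, map[string]string{"exec-name": "e1"})
	fmt.Printf("%+v\n", old)
}
```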
Types ¶
type CatalogClient ¶
type CatalogClient struct {
// contains filtered or unexported fields
}
This is the client that caches task executions in the DataCatalog service.
func NewDataCatalog ¶
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error)
Creates a new DataCatalog client for task execution caching.
func (*CatalogClient) CreateArtifact ¶
func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, outputs *core.LiteralMap, md *datacatalog.Metadata) (*datacatalog.Artifact, error)
func (*CatalogClient) CreateDataset ¶
func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
func (*CatalogClient) Get ¶
func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)
Get the cached task execution from Catalog. These are the steps taken:
- Verify there is a Dataset created for the Task
- Lookup the Artifact that is tagged with the hash of the input values
- The artifactData contains the literal values that serve as the task outputs
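The lookup steps above can be sketched against an in-memory stand-in for the catalog. Everything here is hypothetical (the `memoryCatalog`, `dataset`, and `artifact` types, the string dataset ID, and the error value); the real client talks to the DataCatalog service and returns a `catalog.Entry`. The sketch only shows the order of the checks: dataset first, then the tag, then the outputs.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound signals a cache miss in this sketch.
var errNotFound = errors.New("not found")

// artifact holds the cached outputs of one task execution.
type artifact struct {
	Outputs map[string]string
}

// dataset groups artifacts for one task; tags are hashes of input values.
type dataset struct {
	artifactsByTag map[string]artifact
}

// memoryCatalog is an in-memory stand-in for the catalog service.
type memoryCatalog struct {
	datasets map[string]*dataset
}

// get follows the documented steps and returns the cached outputs.
func (c *memoryCatalog) get(datasetID, inputTag string) (map[string]string, error) {
	// Step 1: verify a dataset exists for the task.
	ds, ok := c.datasets[datasetID]
	if !ok {
		return nil, fmt.Errorf("dataset %q: %w", datasetID, errNotFound)
	}
	// Step 2: look up the artifact tagged with the hash of the inputs.
	a, ok := ds.artifactsByTag[inputTag]
	if !ok {
		return nil, fmt.Errorf("tag %q: %w", inputTag, errNotFound)
	}
	// Step 3: the artifact data serves as the task outputs.
	return a.Outputs, nil
}

func main() {
	c := &memoryCatalog{datasets: map[string]*dataset{
		"proj/dev/my_task/v1": {artifactsByTag: map[string]artifact{
			"tag-abc": {Outputs: map[string]string{"out": "42"}},
		}},
	}}
	outputs, err := c.get("proj/dev/my_task/v1", "tag-abc")
	fmt.Println(outputs, err)
	_, err = c.get("proj/dev/my_task/v1", "tag-other")
	fmt.Println(errors.Is(err, errNotFound))
}
```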
func (*CatalogClient) GetArtifactByTag ¶
func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
Helper method to retrieve an artifact by the tag
func (*CatalogClient) GetDataset ¶
func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
Helper method to retrieve a dataset that is associated with the task
func (*CatalogClient) Put ¶
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)
Catalog the task execution as a cached Artifact. We associate an Artifact as the cached data by tagging the Artifact with the hash of the input values.
The steps taken to cache an execution:
- Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task
- Create an Artifact with the execution data that belongs to the dataset
- Tag the Artifact with a hash generated by the input values
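The caching steps above can be sketched with an in-memory stand-in. The `memoryCatalog` type, its methods, and the string identifiers are hypothetical and are not the DataCatalog API. One further assumption: this sketch simply repoints an existing tag at the newest artifact, while the real service may handle a repeated tag differently.

```go
package main

import "fmt"

// artifact holds the outputs of one task execution.
type artifact struct {
	Outputs map[string]string
}

// dataset stores artifacts plus tags that point at them.
type dataset struct {
	artifacts []artifact     // every artifact created in this dataset
	tags      map[string]int // tag name -> index into artifacts
}

// memoryCatalog is an in-memory stand-in for the catalog service.
type memoryCatalog struct {
	datasets map[string]*dataset
}

// put caches outputs under the tag derived from the input values.
func (c *memoryCatalog) put(datasetID, inputTag string, outputs map[string]string) {
	// Step 1: ensure a dataset exists for the task (proj/domain/name/version).
	ds, ok := c.datasets[datasetID]
	if !ok {
		ds = &dataset{tags: map[string]int{}}
		c.datasets[datasetID] = ds
	}
	// Step 2: create an artifact holding the execution data.
	ds.artifacts = append(ds.artifacts, artifact{Outputs: outputs})
	// Step 3: tag the artifact with the hash of the input values.
	ds.tags[inputTag] = len(ds.artifacts) - 1
}

// lookup resolves a tag back to the cached outputs.
func (c *memoryCatalog) lookup(datasetID, inputTag string) (map[string]string, bool) {
	ds, ok := c.datasets[datasetID]
	if !ok {
		return nil, false
	}
	i, ok := ds.tags[inputTag]
	if !ok {
		return nil, false
	}
	return ds.artifacts[i].Outputs, true
}

func main() {
	c := &memoryCatalog{datasets: map[string]*dataset{}}
	c.put("proj/dev/my_task/v1", "tag-abc", map[string]string{"out": "42"})
	outputs, hit := c.lookup("proj/dev/my_task/v1", "tag-abc")
	fmt.Println(outputs, hit)
}
```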