Documentation ¶
Index ¶
- func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
- func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, ...) *core.CatalogMetadata
- func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap, cacheIgnoreInputVars []string) (string, error)
- func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)
- func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, ...) (*core.LiteralMap, error)
- func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetOrDefault(m map[string]string, key, defaultValue string) string
- func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) (*core.TaskExecutionIdentifier, error)
- type CatalogClient
- func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, ...) (catalog.Status, error)
- func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
- func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)
- func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
- func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
- func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, ...) (*datacatalog.Reservation, error)
- func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, ...) (catalog.Status, error)
- func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error
- func (m *CatalogClient) Update(ctx context.Context, key catalog.Key, reader io.OutputReader, ...) (catalog.Status, error)
- func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, ...) (catalog.Status, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DatasetIDToIdentifier ¶
func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
func EventCatalogMetadata ¶
func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, sourceID *core.TaskExecutionIdentifier) *core.CatalogMetadata
Given the Catalog Information (returned from a Catalog call), returns the CatalogMetadata that is populated in the event.
func GenerateArtifactTagName ¶
func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap, cacheIgnoreInputVars []string) (string, error)
Generate a tag by hashing the input values which are not in cacheIgnoreInputVars
func GenerateDatasetIDForTask ¶
Get the DataSetID for a task. NOTE: the version of the task is a combination of both the discoverable_version and the task signature. This is because the interface may of changed even if the discoverable_version hadn't.
func GenerateTaskOutputsFromArtifact ¶
func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, artifact *datacatalog.Artifact) (*core.LiteralMap, error)
Transform the artifact Data into task execution outputs as a literal map
func GetArtifactMetadataForSource ¶
func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
func GetDatasetMetadataForSource ¶
func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
Understanding Catalog Identifiers DatasetID represents the ID of the dataset. For Kozmo this represents the ID of the generating task and the version calculated as the hash of the interface & cache version. refer to `GenerateDatasetIDForTask` TaskID is the same as the DatasetID + name: (DataSetID - namespace) + task version which is stored in the metadata ExecutionID is stored only in the metadata (project and domain available after Jul-2020) NodeExecID = Execution ID + Node ID (available after Jul-2020) TaskExecID is the same as the NodeExecutionID + attempt (attempt is available in Metadata) after Jul-2020
func GetOrDefault ¶
Returns a default value, if the given key is not found in the map, else returns the value in the map
func GetSourceFromMetadata ¶
func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) (*core.TaskExecutionIdentifier, error)
GetSourceFromMetadata returns the Source TaskExecutionIdentifier from the catalog metadata For all the information not available it returns Unknown. This is because as of July-2020 Catalog does not have all the information. After the first deployment of this code, it will have this and the "unknown's" can be phased out
Types ¶
type CatalogClient ¶
type CatalogClient struct {
// contains filtered or unexported fields
}
CatalogClient is the client that caches task executions to DataCatalog service.
func NewDataCatalog ¶
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error)
NewDataCatalog creates a new Datacatalog client for task execution caching
func (*CatalogClient) CreateArtifact ¶
func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error)
CreateArtifact creates an Artifact in datacatalog including its associated ArtifactData and tags it with a hash of the provided input values for retrieval.
func (*CatalogClient) CreateDataset ¶
func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
CreateDataset creates a Dataset in datacatalog including the associated metadata.
func (*CatalogClient) Get ¶
Get the cached task execution from Catalog. These are the steps taken: - Verify there is a Dataset created for the Task - Lookup the Artifact that is tagged with the hash of the input values - The artifactData contains the literal values that serve as the task outputs
func (*CatalogClient) GetArtifactByTag ¶
func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
GetArtifactByTag retrieves an artifact using the provided tag and dataset.
func (*CatalogClient) GetDataset ¶
func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
GetDataset retrieves a dataset that is associated with the task represented by the provided catalog.Key.
func (*CatalogClient) GetOrExtendReservation ¶
func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error)
GetOrExtendReservation attempts to get a reservation for the cacheable task. If you have previously acquired a reservation it will be extended. If another entity holds the reservation that is returned.
func (*CatalogClient) Put ¶
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)
Put stores the result of a task execution as a cached Artifact and associates it with the data by tagging it with the hash of the input values. The CatalogClient will ensure a dataset exists for the Artifact to be created. A Dataset represents the project/domain/name/version of the task executed. Lastly, CatalogClient will create an Artifact tagged with the input value hash and store the provided execution data.
func (*CatalogClient) ReleaseReservation ¶
func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error
ReleaseReservation attempts to release a reservation for a cacheable task. If the reservation does not exist (e.x. it never existed or has been acquired by another owner) then this call still succeeds.
func (*CatalogClient) Update ¶
func (m *CatalogClient) Update(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)
Update stores the result of a task execution as a cached Artifact, overwriting any already stored data from a previous execution. The CatalogClient will ensure the referenced dataset exists and will silently create a new Artifact if the referenced key does not exist in datacatalog yet. After the operation succeeds, an artifact with the given key and data will be stored in catalog and a tag with the has of the input values will exist.
func (*CatalogClient) UpdateArtifact ¶
func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error)
UpdateArtifact overwrites the ArtifactData of an existing artifact with the provided data in datacatalog.