Documentation ¶
Index ¶
- Constants
- func HashLiteralMap(ctx context.Context, literalMap *core.LiteralMap) (string, error)
- func IsNotFound(err error) bool
- func NewWriterProcessor(catalogClient Client) workqueue.Processor
- type AsyncClient
- type AsyncClientImpl
- type Client
- type Config
- type DownloadFuture
- type DownloadRequest
- type DownloadResponse
- type Entry
- type Future
- type Key
- type Metadata
- type ReaderProcessor
- type ReaderWorkItem
- type ReadyHandler
- type ReservationEntry
- type ResponseStatus
- type Status
- type UploadFuture
- type UploadRequest
- type WriterWorkItem
Constants ¶
const ( ErrResponseNotReady errors.ErrorCode = "RESPONSE_NOT_READY" ErrSystemError errors.ErrorCode = "SYSTEM_ERROR" )
Variables ¶
This section is empty.
Functions ¶
func HashLiteralMap ¶
func IsNotFound ¶
func NewWriterProcessor ¶
Types ¶
type AsyncClient ¶
type AsyncClient interface { // Returns if an entry exists for the given task and input. It returns the data as a LiteralMap Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error) // Adds a new entry to catalog for the given task execution context and the generated output Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error) }
An interface that helps async interaction with catalog service
type AsyncClientImpl ¶
type AsyncClientImpl struct { Reader workqueue.IndexedWorkQueue Writer workqueue.IndexedWorkQueue }
An async-client for catalog that can queue download and upload requests on workqueues.
func NewAsyncClient ¶
func (AsyncClientImpl) Download ¶
func (c AsyncClientImpl) Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)
func (AsyncClientImpl) Upload ¶
func (c AsyncClientImpl) Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)
type Client ¶
type Client interface { // Get returns the artifact associated with the given key. Get(ctx context.Context, key Key) (Entry, error) // GetOrExtendReservation tries to retrieve a (valid) reservation for the given key, creating a new one using the // specified owner ID if none was found or updating an existing one if it has expired. GetOrExtendReservation(ctx context.Context, key Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) // Put stores the given data using the specified key, creating artifact entries as required. // To update an existing artifact, use Update instead. Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) // Update updates existing data stored at the specified key, overwriting artifact entries with the new data provided. // To create a new (non-existent) artifact, use Put instead. Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) // ReleaseReservation releases an acquired reservation for the given key and owner ID. ReleaseReservation(ctx context.Context, key Key, ownerID string) error }
Client represents the default Catalog client that allows memoization and indexing of intermediate data in Nebula
type Config ¶
type DownloadFuture ¶
type DownloadFuture interface { Future // Gets the actual response from the future. This will return an error if the future isn't ready yet. GetResponse() (DownloadResponse, error) }
Catalog download future to represent async process of downloading catalog artifacts.
type DownloadRequest ¶
type DownloadRequest struct { Key Key Target io.OutputWriter }
Catalog Download Request to represent async operation download request.
type DownloadResponse ¶
type DownloadResponse interface { // Gets a bit set representing which items from the request were cached. GetCachedResults() *bitarray.BitSet // Gets the total size of the cached result. GetResultsSize() int // A convenience method to retrieve the number of cached items. GetCachedCount() int }
Catalog download response.
type Entry ¶
type Entry struct {
// contains filtered or unexported fields
}
Indicates the Entry in Catalog that was populated
func NewCatalogEntry ¶
func NewCatalogEntry(outputs io.OutputReader, status Status) Entry
func NewFailedCatalogEntry ¶
func (Entry) GetOutputs ¶
func (e Entry) GetOutputs() io.OutputReader
type Future ¶
type Future interface { // Gets the response status for the future. If the future represents multiple operations, the status will only be // ready if all of them are. GetResponseStatus() ResponseStatus // Sets a callback handler to be called when the future status changes to ready. OnReady(handler ReadyHandler) GetResponseError() error }
A generic Future interface to represent async operations results
type Key ¶
type Key struct { Identifier core.Identifier CacheVersion string TypedInterface core.TypedInterface InputReader io.InputReader }
An identifier for a catalog object.
type Metadata ¶
type Metadata struct { WorkflowExecutionIdentifier *core.WorkflowExecutionIdentifier NodeExecutionIdentifier *core.NodeExecutionIdentifier TaskExecutionIdentifier *core.TaskExecutionIdentifier }
Metadata to be associated with the catalog object
type ReaderProcessor ¶
type ReaderProcessor struct {
// contains filtered or unexported fields
}
func NewReaderProcessor ¶
func NewReaderProcessor(catalogClient Client) ReaderProcessor
func (ReaderProcessor) Process ¶
func (p ReaderProcessor) Process(ctx context.Context, workItem workqueue.WorkItem) (workqueue.WorkStatus, error)
type ReaderWorkItem ¶
type ReaderWorkItem struct {
// contains filtered or unexported fields
}
func NewReaderWorkItem ¶
func NewReaderWorkItem(key Key, outputsWriter io.OutputWriter) *ReaderWorkItem
func (ReaderWorkItem) IsCached ¶
func (item ReaderWorkItem) IsCached() bool
type ReadyHandler ¶
type ReservationEntry ¶
type ReservationEntry struct {
// contains filtered or unexported fields
}
ReservationEntry encapsulates the current state of an artifact reservation within the catalog
func NewReservationEntry ¶
func NewReservationEntry(expiresAt time.Time, heartbeatInterval time.Duration, ownerID string, status core.CatalogReservation_Status) ReservationEntry
Creates a new ReservationEntry populated with the specified parameters
func NewReservationEntryStatus ¶
func NewReservationEntryStatus(status core.CatalogReservation_Status) ReservationEntry
Creates a new ReservationEntry using the status, all other fields are set to default values
func (ReservationEntry) GetExpiresAt ¶
func (r ReservationEntry) GetExpiresAt() time.Time
Returns the expiration timestamp at which the reservation will no longer be valid
func (ReservationEntry) GetHeartbeatInterval ¶
func (r ReservationEntry) GetHeartbeatInterval() time.Duration
Returns the heartbeat interval, denoting how often the catalog expects a reservation extension request
func (ReservationEntry) GetOwnerID ¶
func (r ReservationEntry) GetOwnerID() string
Returns the ID of the current reservation owner
func (ReservationEntry) GetStatus ¶
func (r ReservationEntry) GetStatus() core.CatalogReservation_Status
Returns the status of the attempted reservation operation
type ResponseStatus ¶
type ResponseStatus uint8
const ( ResponseStatusNotReady ResponseStatus = iota ResponseStatusReady )
type Status ¶
type Status struct {
// contains filtered or unexported fields
}
Indicates that status of the query to Catalog. This can be returned for both Get and Put calls
func NewStatus ¶
func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status
func (Status) GetCacheStatus ¶
func (s Status) GetCacheStatus() core.CatalogCacheStatus
func (Status) GetMetadata ¶
func (s Status) GetMetadata() *core.CatalogMetadata
type UploadFuture ¶
type UploadFuture interface { Future }
Catalog Sidecar future to represent async process of uploading catalog artifacts.
type UploadRequest ¶
type UploadRequest struct { Key Key ArtifactData io.OutputReader ArtifactMetadata Metadata }
type WriterWorkItem ¶
type WriterWorkItem struct {
// contains filtered or unexported fields
}
func NewWriterWorkItem ¶
func NewWriterWorkItem(key Key, data io.OutputReader, metadata Metadata) *WriterWorkItem