Documentation ¶
Index ¶
- Constants
- func CreateMockTaskContextWithRealTaskExecId() *mocks.TaskContext
- func QuboleWorkIsTerminalState(status QuboleWorkItemStatus) bool
- type HiveExecutor
- func (h HiveExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskContext, _ *core.TaskTemplate) (types.TaskStatus, error)
- func (h HiveExecutor) GetID() types.TaskExecutorName
- func (h HiveExecutor) GetProperties() types.ExecutorProperties
- func (h *HiveExecutor) Initialize(ctx context.Context, param types.ExecutorInitializationParameters) error
- func (h HiveExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext, reason string) error
- func (h HiveExecutor) StartTask(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, ...) (types.TaskStatus, error)
- func (h *HiveExecutor) SyncQuboleQuery(ctx context.Context, obj utils2.CacheItem) (utils2.CacheItem, utils2.CacheSyncAction, error)
- func (h HiveExecutor) TranslateCurrentState(state map[string]interface{}) types.TaskStatus
- type HiveExecutorMetrics
- type MockAutoRefreshCache
- type MockSecretsManager
- type QuboleWorkItem
- type QuboleWorkItemStatus
- type SecretsManager
Constants ¶
const ResyncDuration = 30 * time.Second
Variables ¶
This section is empty.
Functions ¶
func CreateMockTaskContextWithRealTaskExecId ¶
func CreateMockTaskContextWithRealTaskExecId() *mocks.TaskContext
func QuboleWorkIsTerminalState ¶
func QuboleWorkIsTerminalState(status QuboleWorkItemStatus) bool
Types ¶
type HiveExecutor ¶
type HiveExecutor struct { types.OutputsResolver // contains filtered or unexported fields }
func NewHiveTaskExecutorWithCache ¶
func NewHiveTaskExecutorWithCache(ctx context.Context) (*HiveExecutor, error)
func (HiveExecutor) CheckTaskStatus ¶
func (h HiveExecutor) CheckTaskStatus(ctx context.Context, taskCtx types.TaskContext, _ *core.TaskTemplate) ( types.TaskStatus, error)
func (HiveExecutor) GetID ¶
func (h HiveExecutor) GetID() types.TaskExecutorName
func (HiveExecutor) GetProperties ¶
func (h HiveExecutor) GetProperties() types.ExecutorProperties
func (*HiveExecutor) Initialize ¶
func (h *HiveExecutor) Initialize(ctx context.Context, param types.ExecutorInitializationParameters) error
This runs once, after the constructor (the constructor itself is called in the package init).
func (HiveExecutor) KillTask ¶
func (h HiveExecutor) KillTask(ctx context.Context, taskCtx types.TaskContext, reason string) error
Loop through all the queries in the task; if any are in a non-terminal state, submit a request to terminate the corresponding Qubole query. If anything goes wrong, return an error.
func (HiveExecutor) StartTask ¶
func (h HiveExecutor) StartTask(ctx context.Context, taskCtx types.TaskContext, task *core.TaskTemplate, inputs *core.LiteralMap) (types.TaskStatus, error)
This function is only ever called once, assuming it doesn't return an error. Essentially, it translates the task's custom field into the TaskContext's CustomState, which is stored back into etcd.
func (*HiveExecutor) SyncQuboleQuery ¶
func (h *HiveExecutor) SyncQuboleQuery(ctx context.Context, obj utils2.CacheItem) ( utils2.CacheItem, utils2.CacheSyncAction, error)
This should do minimal work: basically grab an updated status from the Qubole API and store it in the cache. All other handling should be in the synchronous loop.
func (HiveExecutor) TranslateCurrentState ¶
func (h HiveExecutor) TranslateCurrentState(state map[string]interface{}) types.TaskStatus
This translates a series of QuboleWorkItem statuses into what they mean for the task as a whole.
type HiveExecutorMetrics ¶
type MockAutoRefreshCache ¶
type MockAutoRefreshCache struct { *quboleMocks.AutoRefreshCache // contains filtered or unexported fields }
func NewMockAutoRefreshCache ¶
func NewMockAutoRefreshCache() MockAutoRefreshCache
func (MockAutoRefreshCache) GetOrCreate ¶
type MockSecretsManager ¶
type MockSecretsManager struct { }
func (MockSecretsManager) GetToken ¶
func (m MockSecretsManager) GetToken() (string, error)
type QuboleWorkItem ¶
type QuboleWorkItem struct { // This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be // unique across all of Flyte) and needs to be deterministic. // This will also be used as the allocation token for now. UniqueWorkCacheKey string `json:"unique_work_cache_key"` // This will store the command ID from Qubole CommandId string `json:"command_id,omitempty"` // Our representation of the status of this work item Status QuboleWorkItemStatus `json:"status,omitempty"` // The Qubole cluster to do this work ClusterLabel string `json:"cluster_label,omitempty"` // These are Qubole Tags that show up on their UI Tags []string `json:"tags,omitempty"` // This number keeps track of the number of retries within the sync function. Without this, what happens in // the sync function is entirely opaque. Note that this field is not meant to represent the number of retries // of the work itself, just errors with the Qubole API when attempting to sync Retries int `json:"retries,omitempty"` // For Hive jobs, this is the query that will be run // Not necessary for other Qubole task types necessarily Query string `json:"query,omitempty"` TimeoutSec uint32 `json:"timeout,omitempty"` }
This struct is supposed to represent all the details of one query/unit of work on Qubole. For instance, a user's @qubole_hive_task will get unpacked to one of these for each query contained in the task. It is intentionally vaguely named, in an effort to potentially support extensibility to other things Qubole is capable of executing in the future. Retries and Status are the only two fields that should get changed.
func InterfaceConverter ¶
func InterfaceConverter(cachedInterface interface{}) (QuboleWorkItem, error)
func NewQuboleWorkItem ¶
func NewQuboleWorkItem(uniqueWorkCacheKey string, quboleCommandId string, status QuboleWorkItemStatus, clusterLabel string, tags []string, retries int) QuboleWorkItem
func (QuboleWorkItem) EqualTo ¶
func (q QuboleWorkItem) EqualTo(other QuboleWorkItem) bool
func (QuboleWorkItem) ID ¶
func (q QuboleWorkItem) ID() string
This ID will be used in a process-wide cache, so it needs to be unique across all concurrent work being done by that process, but it does not necessarily need to be universally unique.
type QuboleWorkItemStatus ¶
type QuboleWorkItemStatus int
This status encapsulates all possible states for our custom object. It is different from the QuboleStatus type in that this is our Flyte type. It represents the same thing as QuboleStatus, but will actually persist in etcd. It is also different from the TaskStatuses in that this is on the Qubole job level, not the task level. A task can contain many queries, Spark jobs, etc.
const ( QuboleWorkNotStarted QuboleWorkItemStatus = iota QuboleWorkUnknown QuboleWorkRunnable QuboleWorkRunning QuboleWorkExecutionFailed QuboleWorkExecutionSucceeded QuboleWorkFailed QuboleWorkSucceeded )
func QuboleStatusToWorkItemStatus ¶
func QuboleStatusToWorkItemStatus(s client.QuboleStatus) QuboleWorkItemStatus
func (QuboleWorkItemStatus) String ¶
func (q QuboleWorkItemStatus) String() string
type SecretsManager ¶
func NewSecretsManager ¶
func NewSecretsManager() SecretsManager