Documentation ¶
Index ¶
- Constants
- func Abort(ctx context.Context, tCtx core.TaskExecutionContext, ...) error
- func BuildResourceConfig(cfg []config.ClusterConfig) map[core.ResourceNamespace]int
- func ConstructTaskInfo(e ExecutionState) *core.TaskInfo
- func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog
- func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, ...) error
- func GetMockTaskExecutionContext() core.TaskExecutionContext
- func GetMockTaskExecutionMetadata() core.TaskExecutionMetadata
- func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (formattedQuery string, cluster string, tags []string, timeoutSec uint32, ...)
- func GetSingleHiveQueryTaskTemplate() idlCore.TaskTemplate
- func InTerminalState(e ExecutionState) bool
- func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, ...) (core.Plugin, error)
- func IsNotYetSubmitted(e ExecutionState) bool
- func MapExecutionStateToPhaseInfo(state ExecutionState, _ client.QuboleClient) core.PhaseInfo
- func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)
- type ExecutionPhase
- type ExecutionState
- func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- type ExecutionStateCacheItem
- type QuboleHiveExecutionsCache
- type QuboleHiveExecutor
- func (q QuboleHiveExecutor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (q QuboleHiveExecutor) GetID() string
- func (q QuboleHiveExecutor) GetProperties() core.PluginProperties
- func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
- type QuboleHiveExecutorMetrics
Constants ¶
View Source
const (
BadQuboleReturnCodeError stdErrors.ErrorCode = "QUBOLE_RETURNED_UNKNOWN"
)
View Source
const DefaultClusterPrimaryLabel = "default"
View Source
const ResyncDuration = 30 * time.Second
Variables ¶
This section is empty.
Functions ¶
func Abort ¶
func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, qubole client.QuboleClient, apiKey string) error
func BuildResourceConfig ¶
func BuildResourceConfig(cfg []config.ClusterConfig) map[core.ResourceNamespace]int
func ConstructTaskInfo ¶
func ConstructTaskInfo(e ExecutionState) *core.TaskInfo
func ConstructTaskLog ¶
func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog
func Finalize ¶
func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics QuboleHiveExecutorMetrics) error
func GetMockTaskExecutionContext ¶
func GetMockTaskExecutionContext() core.TaskExecutionContext
func GetMockTaskExecutionMetadata ¶
func GetMockTaskExecutionMetadata() core.TaskExecutionMetadata
func GetQueryInfo ¶
func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( formattedQuery string, cluster string, tags []string, timeoutSec uint32, taskName string, err error)
This function is the link between the output written by the SDK and the execution side. It extracts the query out of the task template.
func GetSingleHiveQueryTaskTemplate ¶
func GetSingleHiveQueryTaskTemplate() idlCore.TaskTemplate
func InTerminalState ¶
func InTerminalState(e ExecutionState) bool
func InitializeHiveExecutor ¶
func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceConfig map[core.ResourceNamespace]int, quboleClient client.QuboleClient) (core.Plugin, error)
func IsNotYetSubmitted ¶
func IsNotYetSubmitted(e ExecutionState) bool
func MapExecutionStateToPhaseInfo ¶
func MapExecutionStateToPhaseInfo(state ExecutionState, _ client.QuboleClient) core.PhaseInfo
Types ¶
type ExecutionPhase ¶
type ExecutionPhase int
const ( PhaseNotStarted ExecutionPhase = iota PhaseQueued // resource manager token gotten PhaseSubmitted // Sent off to Qubole PhaseWriteOutputFile PhaseQuerySucceeded PhaseQueryFailed )
func QuboleStatusToExecutionPhase ¶
func QuboleStatusToExecutionPhase(s client.QuboleStatus) (ExecutionPhase, error)
We need some way to translate the results we get from Qubole into a plugin phase. NB: This function should only return plugin phases that are greater than (">") the phases that represent states before
the query was kicked off. That is, it never makes sense to go back to PhaseNotStarted after we've submitted the query to Qubole.
func (ExecutionPhase) IsTerminal ¶
func (p ExecutionPhase) IsTerminal() bool
func (ExecutionPhase) String ¶
func (p ExecutionPhase) String() string
type ExecutionState ¶
type ExecutionState struct { Phase ExecutionPhase // This will store the command ID from Qubole CommandID string `json:"command_id,omitempty"` URI string `json:"uri,omitempty"` // This number keeps track of the number of failures within the sync function. Without this, what happens in // the sync function is entirely opaque. Note that this field is completely orthogonal to Kozmo system/node/task // level retries, just errors from hitting the Qubole API, inside the sync loop SyncFailureCount int `json:"sync_failure_count,omitempty"` // In kicking off the Qubole command, this is the number of failures CreationFailureCount int `json:"creation_failure_count,omitempty"` // The time the execution first requests for an allocation token AllocationTokenRequestStartTime time.Time `json:"allocation_token_request_start_time,omitempty"` }
func GetAllocationToken ¶
func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error)
func HandleExecutionState ¶
func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, executionsCache cache.AutoRefresh, cfg *config.Config, metrics QuboleHiveExecutorMetrics) (ExecutionState, error)
This is the main state iteration
func KickOffQuery ¶
func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, cache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error)
func MonitorQuery ¶
func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, cache cache.AutoRefresh) ( ExecutionState, error)
func WriteOutputs ¶
func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState) ( ExecutionState, error)
type ExecutionStateCacheItem ¶
type ExecutionStateCacheItem struct { ExecutionState // This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be // unique across all of Kozmo) and needs to be deterministic. // This will also be used as the allocation token for now. Identifier string `json:"id"` }
func (ExecutionStateCacheItem) ID ¶
func (e ExecutionStateCacheItem) ID() string
func (ExecutionStateCacheItem) IsTerminal ¶
func (e ExecutionStateCacheItem) IsTerminal() bool
type QuboleHiveExecutionsCache ¶
type QuboleHiveExecutionsCache struct { cache.AutoRefresh // contains filtered or unexported fields }
func NewQuboleHiveExecutionsCache ¶
func NewQuboleHiveExecutionsCache(ctx context.Context, quboleClient client.QuboleClient, secretManager core.SecretManager, cfg *config.Config, scope promutils.Scope) (QuboleHiveExecutionsCache, error)
func (*QuboleHiveExecutionsCache) SyncQuboleQuery ¶
func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch cache.Batch) ( updatedBatch []cache.ItemSyncResponse, err error)
This basically grabs an updated status from the Qubole API and stores it in the cache. All other handling should be in the synchronous loop.
type QuboleHiveExecutor ¶
type QuboleHiveExecutor struct {
// contains filtered or unexported fields
}
func NewQuboleHiveExecutor ¶
func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error)
type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error)
func (QuboleHiveExecutor) Abort ¶
func (q QuboleHiveExecutor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
func (QuboleHiveExecutor) Finalize ¶
func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
func (QuboleHiveExecutor) GetID ¶
func (q QuboleHiveExecutor) GetID() string
func (QuboleHiveExecutor) GetProperties ¶
func (q QuboleHiveExecutor) GetProperties() core.PluginProperties
func (QuboleHiveExecutor) Handle ¶
func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.