hive

package
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2024 License: Apache-2.0 Imports: 31 Imported by: 1

Documentation

Index

Constants

View Source
const (
	BadQuboleReturnCodeError stdErrors.ErrorCode = "QUBOLE_RETURNED_UNKNOWN"
)
View Source
const DefaultClusterPrimaryLabel = "default"
View Source
const ResyncDuration = 30 * time.Second

Variables

This section is empty.

Functions

func Abort

func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, qubole client.QuboleClient, apiKey string) error

func BuildResourceConfig

func BuildResourceConfig(cfg []config.ClusterConfig) map[core.ResourceNamespace]int

func ConstructTaskInfo

func ConstructTaskInfo(e ExecutionState) *core.TaskInfo

func ConstructTaskLog

func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog

func GetMockTaskExecutionContext

func GetMockTaskExecutionContext() core.TaskExecutionContext

func GetMockTaskExecutionMetadata

func GetMockTaskExecutionMetadata() core.TaskExecutionMetadata

func GetQueryInfo

func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (
	formattedQuery string, cluster string, tags []string, timeoutSec uint32, taskName string, err error)

This function is the link between the output written by the SDK and the execution side. It extracts the query out of the task template.

func GetSingleHiveQueryTaskTemplate

func GetSingleHiveQueryTaskTemplate() idlCore.TaskTemplate

func InTerminalState

func InTerminalState(e ExecutionState) bool

func InitializeHiveExecutor

func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceConfig map[core.ResourceNamespace]int,
	quboleClient client.QuboleClient) (core.Plugin, error)

func IsNotYetSubmitted

func IsNotYetSubmitted(e ExecutionState) bool

func MapExecutionStateToPhaseInfo

func MapExecutionStateToPhaseInfo(state ExecutionState, _ client.QuboleClient) core.PhaseInfo

func QuboleHiveExecutorLoader

func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)

Types

type ExecutionPhase

type ExecutionPhase int
const (
	PhaseNotStarted ExecutionPhase = iota
	PhaseQueued                    // resource manager token gotten
	PhaseSubmitted                 // Sent off to Qubole
	PhaseWriteOutputFile
	PhaseQuerySucceeded
	PhaseQueryFailed
)

func QuboleStatusToExecutionPhase

func QuboleStatusToExecutionPhase(s client.QuboleStatus) (ExecutionPhase, error)

We need some way to translate the results we get from Qubole into a plugin phase. NB: This function should only return plugin phases that are greater than (">") the phases that represent states before the query was kicked off. That is, it will never make sense to go back to PhaseNotStarted after we've submitted the query to Qubole.

func (ExecutionPhase) IsTerminal

func (p ExecutionPhase) IsTerminal() bool

func (ExecutionPhase) String

func (p ExecutionPhase) String() string

type ExecutionState

type ExecutionState struct {
	Phase ExecutionPhase

	// This will store the command ID from Qubole
	CommandID string `json:"command_id,omitempty"`
	URI       string `json:"uri,omitempty"`

	// This number keeps track of the number of failures within the sync function. Without this, what happens in
	// the sync function is entirely opaque. Note that this field is completely orthogonal to Flyte system/node/task
	// level retries; it only counts errors from hitting the Qubole API inside the sync loop.
	SyncFailureCount int `json:"sync_failure_count,omitempty"`

	// The number of failures encountered while kicking off the Qubole command
	CreationFailureCount int `json:"creation_failure_count,omitempty"`

	// The time at which the execution first requests an allocation token
	AllocationTokenRequestStartTime time.Time `json:"allocation_token_request_start_time,omitempty"`
}

func HandleExecutionState

func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient,
	executionsCache cache.AutoRefresh, cfg *config.Config, metrics QuboleHiveExecutorMetrics) (ExecutionState, error)

This is the main state iteration

func KickOffQuery

func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient,
	cache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error)

func MonitorQuery

func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, cache cache.AutoRefresh) (
	ExecutionState, error)

func WriteOutputs

func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState) (
	ExecutionState, error)

type ExecutionStateCacheItem

type ExecutionStateCacheItem struct {
	ExecutionState

	// This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be
	// unique across all of Flyte) and needs to be deterministic.
	// This will also be used as the allocation token for now.
	Identifier string `json:"id"`
}

func (ExecutionStateCacheItem) ID

func (ExecutionStateCacheItem) IsTerminal

func (e ExecutionStateCacheItem) IsTerminal() bool

type QuboleHiveExecutionsCache

type QuboleHiveExecutionsCache struct {
	cache.AutoRefresh
	// contains filtered or unexported fields
}

func NewQuboleHiveExecutionsCache

func NewQuboleHiveExecutionsCache(ctx context.Context, quboleClient client.QuboleClient,
	secretManager core.SecretManager, cfg *config.Config, scope promutils.Scope) (QuboleHiveExecutionsCache, error)

func (*QuboleHiveExecutionsCache) SyncQuboleQuery

func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch cache.Batch) (
	updatedBatch []cache.ItemSyncResponse, err error)

This basically grabs an updated status from the Qubole API and stores it in the cache. All other handling should be in the synchronous loop.

type QuboleHiveExecutor

type QuboleHiveExecutor struct {
	// contains filtered or unexported fields
}

func NewQuboleHiveExecutor

func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error)

type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error)

func (QuboleHiveExecutor) Abort

func (QuboleHiveExecutor) Finalize

func (QuboleHiveExecutor) GetID

func (q QuboleHiveExecutor) GetID() string

func (QuboleHiveExecutor) GetProperties

func (q QuboleHiveExecutor) GetProperties() core.PluginProperties

func (QuboleHiveExecutor) Handle

type QuboleHiveExecutorMetrics

type QuboleHiveExecutorMetrics struct {
	Scope                 promutils.Scope
	ResourceReleased      labeled.Counter
	ResourceReleaseFailed labeled.Counter
	AllocationGranted     labeled.Counter
	AllocationNotGranted  labeled.Counter
	ResourceWaitTime      prometheus.Summary
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL