presto

package
v1.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// BadPrestoReturnCodeError is the error code "PRESTO_RETURNED_UNKNOWN".
// NOTE(review): presumably reported when Presto returns a status the plugin
// cannot map to an execution phase — confirm at the usage site.
const BadPrestoReturnCodeError stdErrors.ErrorCode = "PRESTO_RETURNED_UNKNOWN"
View Source
// PrestoSource is the untyped string constant "flyte".
// NOTE(review): presumably sent to Presto to identify the caller — confirm
// at the call site.
const (
	PrestoSource = "flyte"
)

Variables

This section is empty.

Functions

func Abort

func Abort(ctx context.Context, currentState ExecutionState, client client.PrestoClient) error

func ConstructTaskInfo

func ConstructTaskInfo(e ExecutionState) *core.TaskInfo

func ConstructTaskLog

func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog

func ExecutorLoader

func ExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)

func GetQueryInfo

func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (string, string, string, string, error)

This function is the link between the output written by the SDK, and the execution side. It extracts the query out of the task template.

func InTerminalState

func InTerminalState(e ExecutionState) bool

func InitializePrestoExecutor

func InitializePrestoExecutor(
	ctx context.Context,
	iCtx core.SetupContext,
	cfg *config.Config,
	prestoClient client.PrestoClient) (core.Plugin, error)

func IsNotYetSubmitted

func IsNotYetSubmitted(e ExecutionState) bool

func MapExecutionStateToPhaseInfo

func MapExecutionStateToPhaseInfo(state ExecutionState) core.PhaseInfo

The 'PhaseInfoRunning' phase occurs 15 times (3 for each of the 5 Presto queries that get run for every Presto task); the occurrences are differentiated by their version (1-15).

Types

type ExecutionPhase

// ExecutionPhase identifies the stage a Presto execution is in.
type ExecutionPhase int

// The phases are numbered consecutively from zero in declaration order.
const (
	// PhaseNotStarted is the zero value of ExecutionPhase.
	PhaseNotStarted = ExecutionPhase(iota)
	// PhaseQueued means the resource manager token has been obtained.
	PhaseQueued
	// PhaseSubmitted means the query has been sent off to Presto.
	PhaseSubmitted
	// PhaseQuerySucceeded marks a query that succeeded.
	PhaseQuerySucceeded
	// PhaseQueryFailed marks a query that failed.
	PhaseQueryFailed
)

func StatusToExecutionPhase

func StatusToExecutionPhase(s client.PrestoStatus) (ExecutionPhase, error)

We need some way to translate the results we get from Presto into a plugin phase.

func (ExecutionPhase) String

func (p ExecutionPhase) String() string

type ExecutionState

// ExecutionState is the state of a Presto task execution. It is passed into,
// and returned (updated) from, the package's state-handling functions
// (HandleExecutionState, GetAllocationToken, KickOffQuery, MonitorQuery).
//
// CurrentPhase and PreviousPhase hold the execution's current and previous
// ExecutionPhase.
// NOTE(review): these two fields carry no json tag, so encoding/json uses the
// Go field names ("CurrentPhase", "PreviousPhase") as keys, unlike the
// camelCase keys of the remaining fields — confirm this is intended.
type ExecutionState struct {
	CurrentPhase  ExecutionPhase
	PreviousPhase ExecutionPhase

	// This will store the command ID from Presto
	CommandID string `json:"commandId,omitempty"`

	// This will have the nextUri from Presto which is used to advance the query forward
	URI string `json:"uri,omitempty"`

	// This is the current Presto query (out of 5) needed to complete a Presto task
	// NOTE(review): Query is a struct, and encoding/json's omitempty never
	// treats a struct value as empty, so this key is always emitted.
	CurrentPrestoQuery Query `json:"currentPrestoQuery,omitempty"`

	// This is an id to keep track of the current query. Every query's id should be unique for caching purposes
	CurrentPrestoQueryUUID string `json:"currentPrestoQueryUUID,omitempty"`

	// Keeps track of which Presto query we are on. Its values range from 0-4 for the 5 queries that are needed
	QueryCount int `json:"queryCount,omitempty"`

	// This number keeps track of the number of failures within the sync function. Without this, what happens in
	// the sync function is entirely opaque. Note that this field is completely orthogonal to Flyte system/node/task
	// level retries, just errors from hitting the Presto API, inside the sync loop
	SyncFailureCount int `json:"syncFailureCount,omitempty"`

	// In kicking off the Presto command, this is the number of failures
	CreationFailureCount int `json:"creationFailureCount,omitempty"`

	// The time the execution first requested an allocation token
	// NOTE(review): time.Time is a struct, so omitempty has no effect here and
	// the zero time is still serialized.
	AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"`
}

func GetAllocationToken

func GetAllocationToken(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	currentState ExecutionState,
	metric ExecutorMetrics) (ExecutionState, error)

func HandleExecutionState

func HandleExecutionState(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	currentState ExecutionState,
	prestoClient client.PrestoClient,
	executionsCache cache.AutoRefresh,
	metrics ExecutorMetrics) (ExecutionState, error)

This is the main state iteration

func KickOffQuery

func KickOffQuery(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	currentState ExecutionState,
	prestoClient client.PrestoClient,
	cache cache.AutoRefresh) (ExecutionState, error)

func MonitorQuery

func MonitorQuery(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	currentState ExecutionState,
	cache cache.AutoRefresh) (ExecutionState, error)

type ExecutionStateCacheItem

// ExecutionStateCacheItem pairs an ExecutionState (embedded, so its fields
// are promoted) with the identifier under which the item is kept in the cache.
// NOTE(review): the ID method presumably returns Identifier — confirm.
type ExecutionStateCacheItem struct {
	ExecutionState

	// This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be
	// unique across all of Flyte) and needs to be deterministic.
	// This will also be used as the allocation token for now.
	Identifier string `json:"id"`
}

func (ExecutionStateCacheItem) ID

type ExecutionsCache

// ExecutionsCache embeds cache.AutoRefresh, so the embedded value's methods
// are promoted onto it. Instances are built with NewPrestoExecutionsCache;
// the SyncPrestoQuery method fetches updated status from the Presto API and
// stores it in the cache.
type ExecutionsCache struct {
	cache.AutoRefresh
	// contains filtered or unexported fields
}

func NewPrestoExecutionsCache

func NewPrestoExecutionsCache(
	ctx context.Context,
	prestoClient client.PrestoClient,
	cfg *config.Config,
	scope promutils.Scope) (ExecutionsCache, error)

func (*ExecutionsCache) SyncPrestoQuery

func (p *ExecutionsCache) SyncPrestoQuery(ctx context.Context, batch cache.Batch) (
	updatedBatch []cache.ItemSyncResponse, err error)

This basically grabs an updated status from the Presto API and stores it in the cache. All other handling should be in the synchronous loop.

type Executor

// Executor is the Presto plugin executor returned (by value) from
// NewPrestoExecutor. All of its fields are unexported; it exposes the Abort,
// Finalize, GetID, GetProperties and Handle methods.
// NOTE(review): presumably the core.Plugin implementation that
// InitializePrestoExecutor returns — confirm.
type Executor struct {
	// contains filtered or unexported fields
}

func NewPrestoExecutor

func NewPrestoExecutor(
	ctx context.Context,
	cfg *config.Config,
	prestoClient client.PrestoClient,
	scope promutils.Scope) (Executor, error)

func (Executor) Abort

func (Executor) Finalize

func (p Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error

func (Executor) GetID

func (p Executor) GetID() string

func (Executor) GetProperties

func (p Executor) GetProperties() core.PluginProperties

func (Executor) Handle

type ExecutorMetrics

// ExecutorMetrics groups a promutils.Scope with four labeled counters.
// NOTE(review): judging by the field names, the counters track successful and
// failed resource releases and granted and denied allocation requests —
// confirm where each counter is incremented.
type ExecutorMetrics struct {
	Scope                 promutils.Scope
	ResourceReleased      labeled.Counter
	ResourceReleaseFailed labeled.Counter
	AllocationGranted     labeled.Counter
	AllocationNotGranted  labeled.Counter
}

type Query

// Query describes one Presto query: the statement text, the arguments to
// execute it with, and the names/location of the temp and external tables.
// It is the type of ExecutionState.CurrentPrestoQuery and the result of
// GetNextQuery.
//
// NOTE(review): ExternalLocation is the only field whose json tag lacks
// omitempty, so an empty location is still serialized as "" — confirm whether
// that difference is deliberate.
// NOTE(review): if client.PrestoExecuteArgs is a struct, omitempty has no
// effect on ExecuteArgs under encoding/json — confirm.
type Query struct {
	Statement         string                   `json:"statement,omitempty"`
	ExecuteArgs       client.PrestoExecuteArgs `json:"executeArgs,omitempty"`
	TempTableName     string                   `json:"tempTableName,omitempty"`
	ExternalTableName string                   `json:"externalTableName,omitempty"`
	ExternalLocation  string                   `json:"externalLocation"`
}

func GetNextQuery

func GetNextQuery(
	ctx context.Context,
	tCtx core.TaskExecutionContext,
	currentState ExecutionState) (Query, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL