Documentation ¶
Index ¶
- Constants
- func Abort(ctx context.Context, currentState ExecutionState, client client.PrestoClient) error
- func ConstructTaskInfo(e ExecutionState) *core.TaskInfo
- func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog
- func ExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error)
- func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, ...) error
- func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (string, string, string, string, error)
- func InTerminalState(e ExecutionState) bool
- func InitializePrestoExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, ...) (core.Plugin, error)
- func IsNotYetSubmitted(e ExecutionState) bool
- func MapExecutionStateToPhaseInfo(state ExecutionState) core.PhaseInfo
- type ExecutionPhase
- type ExecutionState
- func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, ...) (ExecutionState, error)
- type ExecutionStateCacheItem
- type ExecutionsCache
- type Executor
- func (p Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (p Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error
- func (p Executor) GetID() string
- func (p Executor) GetProperties() core.PluginProperties
- func (p Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
- type ExecutorMetrics
- type Query
Constants ¶
View Source
const (
BadPrestoReturnCodeError stdErrors.ErrorCode = "PRESTO_RETURNED_UNKNOWN"
)
View Source
const PrestoSource = "flyte"
Variables ¶
This section is empty.
Functions ¶
func Abort ¶
func Abort(ctx context.Context, currentState ExecutionState, client client.PrestoClient) error
func ConstructTaskInfo ¶
func ConstructTaskInfo(e ExecutionState) *core.TaskInfo
func ConstructTaskLog ¶
func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog
func ExecutorLoader ¶
func Finalize ¶
func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics ExecutorMetrics) error
func GetQueryInfo ¶
func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (string, string, string, string, error)
This function is the link between the output written by the SDK and the execution side. It extracts the query out of the task template.
func InTerminalState ¶
func InTerminalState(e ExecutionState) bool
func IsNotYetSubmitted ¶
func IsNotYetSubmitted(e ExecutionState) bool
func MapExecutionStateToPhaseInfo ¶
func MapExecutionStateToPhaseInfo(state ExecutionState) core.PhaseInfo
'PhaseInfoRunning' occurs 15 times (3 for each of the 5 Presto queries that get run for every Presto task); the occurrences are differentiated by the version (1-15).
Types ¶
type ExecutionPhase ¶
type ExecutionPhase int
const ( PhaseNotStarted ExecutionPhase = iota PhaseQueued // resource manager token gotten PhaseSubmitted // Sent off to Presto PhaseQuerySucceeded PhaseQueryFailed )
func StatusToExecutionPhase ¶
func StatusToExecutionPhase(s client.PrestoStatus) (ExecutionPhase, error)
We need some way to translate the results we get from Presto into a plugin phase.
func (ExecutionPhase) IsTerminal ¶
func (p ExecutionPhase) IsTerminal() bool
func (ExecutionPhase) String ¶
func (p ExecutionPhase) String() string
type ExecutionState ¶
type ExecutionState struct { CurrentPhase ExecutionPhase PreviousPhase ExecutionPhase // This will store the command ID from Presto CommandID string `json:"commandId,omitempty"` // This will have the nextUri from Presto which is used to advance the query forward URI string `json:"uri,omitempty"` // This is the current Presto query (out of 5) needed to complete a Presto task CurrentPrestoQuery Query `json:"currentPrestoQuery,omitempty"` // This is an id to keep track of the current query. Every query's id should be unique for caching purposes CurrentPrestoQueryUUID string `json:"currentPrestoQueryUUID,omitempty"` // Keeps track of which Presto query we are on. Its values range from 0-4 for the 5 queries that are needed QueryCount int `json:"queryCount,omitempty"` // This number keeps track of the number of failures within the sync function. Without this, what happens in // the sync function is entirely opaque. Note that this field is completely orthogonal to Flyte system/node/task // level retries, just errors from hitting the Presto API, inside the sync loop SyncFailureCount int `json:"syncFailureCount,omitempty"` // In kicking off the Presto command, this is the number of failures CreationFailureCount int `json:"creationFailureCount,omitempty"` // The time the execution first requests for an allocation token AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"` }
func GetAllocationToken ¶
func GetAllocationToken( ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric ExecutorMetrics) (ExecutionState, error)
func HandleExecutionState ¶
func HandleExecutionState( ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, prestoClient client.PrestoClient, executionsCache cache.AutoRefresh, metrics ExecutorMetrics) (ExecutionState, error)
This is the main state iteration.
func KickOffQuery ¶
func KickOffQuery( ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, prestoClient client.PrestoClient, cache cache.AutoRefresh) (ExecutionState, error)
func MonitorQuery ¶
func MonitorQuery( ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, cache cache.AutoRefresh) (ExecutionState, error)
type ExecutionStateCacheItem ¶
type ExecutionStateCacheItem struct { ExecutionState // This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be // unique across all of Flyte) and needs to be deterministic. // This will also be used as the allocation token for now. Identifier string `json:"id"` }
func (ExecutionStateCacheItem) ID ¶
func (e ExecutionStateCacheItem) ID() string
func (ExecutionStateCacheItem) IsTerminal ¶
func (e ExecutionStateCacheItem) IsTerminal() bool
type ExecutionsCache ¶
type ExecutionsCache struct { cache.AutoRefresh // contains filtered or unexported fields }
func NewPrestoExecutionsCache ¶
func NewPrestoExecutionsCache( ctx context.Context, prestoClient client.PrestoClient, cfg *config.Config, scope promutils.Scope) (ExecutionsCache, error)
func (*ExecutionsCache) SyncPrestoQuery ¶
func (p *ExecutionsCache) SyncPrestoQuery(ctx context.Context, batch cache.Batch) ( updatedBatch []cache.ItemSyncResponse, err error)
This basically grabs an updated status from the Presto API and stores it in the cache. All other handling should be in the synchronous loop.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewPrestoExecutor ¶
func (Executor) GetProperties ¶
func (p Executor) GetProperties() core.PluginProperties
func (Executor) Handle ¶
func (p Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error)
type ExecutorMetrics ¶
type Query ¶
type Query struct { Statement string `json:"statement,omitempty"` ExecuteArgs client.PrestoExecuteArgs `json:"executeArgs,omitempty"` TempTableName string `json:"tempTableName,omitempty"` ExternalTableName string `json:"externalTableName,omitempty"` ExternalLocation string `json:"externalLocation"` }
func GetNextQuery ¶
func GetNextQuery( ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState) (Query, error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.