Documentation ¶
Index ¶
- Constants
- func CalculateOriginalIndex(childIdx int, toCache *bitarray.BitSet) int
- func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State, ...) ([]*core.ExternalResource, error)
- func InvertBitSet(input *bitarray.BitSet, limit uint) *bitarray.BitSet
- func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, ...) (core.PhaseInfo, error)
- func NewPhasesCompactArray(count uint) bitarray.CompactArray
- func ToArrayJob(structObj *structpb.Struct, taskTypeVersion int32) (*idlPlugins.ArrayJob, error)
- type Phase
- type State
- func (s State) GetArrayStatus() arraystatus.ArrayStatus
- func (s State) GetExecutionArraySize() int
- func (s *State) GetExecutionErr() *idlCore.ExecutionError
- func (s *State) GetIndexesToCache() *bitarray.BitSet
- func (s *State) GetOriginalArraySize() int64
- func (s *State) GetOriginalMinSuccesses() int64
- func (s State) GetPhase() (phase Phase, version uint32)
- func (s State) GetReason() string
- func (s *State) SetArrayStatus(state arraystatus.ArrayStatus) *State
- func (s *State) SetExecutionArraySize(size int) *State
- func (s *State) SetExecutionErr(err *idlCore.ExecutionError) *State
- func (s *State) SetIndexesToCache(set *bitarray.BitSet) *State
- func (s *State) SetOriginalArraySize(size int64) *State
- func (s *State) SetOriginalMinSuccesses(size int64) *State
- func (s *State) SetPhase(phase Phase, phaseVersion uint32) *State
- func (s *State) SetReason(reason string) *State
- func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State
Constants ¶
Variables ¶
This section is empty.
Functions ¶
func CalculateOriginalIndex ¶
func CalculateOriginalIndex(childIdx int, toCache *bitarray.BitSet) int
CalculateOriginalIndex computes the original index of a sub-task.
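As a rough illustration of this mapping, the sketch below assumes that toCache marks, by original index, the sub-tasks that still have to run, and that childIdx counts only those sub-tasks. A plain []bool stands in for *bitarray.BitSet and originalIndex is a hypothetical name, so this is not the package's implementation.

```go
package main

import "fmt"

// originalIndex returns the position of the (childIdx+1)-th set entry in
// toCache, or -1 if there are fewer set entries than that.
// Assumption: a set entry means "this original sub-task is being executed".
func originalIndex(childIdx int, toCache []bool) int {
	if childIdx < 0 {
		return -1
	}
	seen := 0
	for i, set := range toCache {
		if !set {
			continue
		}
		if seen == childIdx {
			return i
		}
		seen++
	}
	return -1
}

func main() {
	// Original sub-tasks 1, 3 and 4 still need to run; 0 and 2 were cached.
	toCache := []bool{false, true, false, true, true}
	for child := 0; child < 3; child++ {
		fmt.Printf("execution index %d -> original index %d\n", child, originalIndex(child, toCache))
	}
}
```

Returning -1 for an out-of-range child index is a choice made for this sketch only.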
func InitializeExternalResources ¶
func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State, generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error)
InitializeExternalResources constructs an ExternalResource array in which each element describes the initial state of one subtask. Cached subtasks are labeled as successful with a cache hit; all other subtasks are initialized to an undefined state.
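The initialization described above can be sketched roughly as follows. The resource type, the phase strings, and initResources are hypothetical stand-ins for core.ExternalResource and the real function; the sketch also assumes that a set entry in the cache bitmap means the subtask was not found in the cache, and it simplifies ID generation by passing the original index to the callback.

```go
package main

import "fmt"

// resource is a hypothetical, simplified stand-in for core.ExternalResource.
type resource struct {
	ID       string
	Index    int
	Phase    string
	CacheHit bool
}

// initResources builds one resource per original subtask index.
// Assumption: toCache[i] == true means subtask i still has to run.
func initResources(toCache []bool, subTaskID func(int) string) []resource {
	resources := make([]resource, 0, len(toCache))
	for i, needsRun := range toCache {
		r := resource{ID: subTaskID(i), Index: i, Phase: "undefined"}
		if !needsRun {
			// Cached subtasks start out as already successful.
			r.Phase = "success"
			r.CacheHit = true
		}
		resources = append(resources, r)
	}
	return resources
}

func main() {
	toCache := []bool{true, false, true}
	id := func(i int) string { return fmt.Sprintf("job-%d", i) }
	for _, r := range initResources(toCache, id) {
		fmt.Printf("%+v\n", r)
	}
}
```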
func MapArrayStateToPluginPhase ¶
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, externalResources []*core.ExternalResource) (core.PhaseInfo, error)
Every state of the plugin needs to map to a core.PhaseInfo (which in turn maps to Admin events) so that the rest of the Nebula platform can understand what is happening. That is, each state that our plugin state machine can return should map to a unique (core.Phase, core.PhaseInfo.version) pair. Info fields are always nil because we send log links individually. This simplifies our state handling, since we do not have to keep an ever-growing list of log links (a batch job can have 5000 sub-tasks, and keeping all of their log links takes up a lot of space).
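One way to keep the (phase, version) pair unique when several plugin states share the same core phase is to offset the version by the plugin state. The sketch below only illustrates that idea; the phase names, the core phase strings, versionsPerPhase, and toPhaseInfo are all hypothetical and are not taken from this package.

```go
package main

import "fmt"

// pluginPhase is a hypothetical subset of the plugin's internal states.
type pluginPhase uint32

const (
	phaseStart pluginPhase = iota
	phaseLaunch
	phaseCheckingSubTasks
	phaseSuccess
)

// versionsPerPhase reserves a block of version numbers for each plugin
// state. Pairs stay unique only while version < versionsPerPhase.
const versionsPerPhase = 1000

// toPhaseInfo maps a plugin state and its version to a core phase name and
// a version that differs between plugin states sharing that core phase.
func toPhaseInfo(p pluginPhase, version uint32) (string, uint32) {
	core := "undefined"
	switch p {
	case phaseStart:
		core = "initializing"
	case phaseLaunch, phaseCheckingSubTasks:
		core = "running"
	case phaseSuccess:
		core = "success"
	}
	return core, uint32(p)*versionsPerPhase + version
}

func main() {
	for _, p := range []pluginPhase{phaseLaunch, phaseCheckingSubTasks} {
		core, v := toPhaseInfo(p, 3)
		fmt.Println(core, v)
	}
}
```

Both calls in main report the same core phase, but their versions differ, so a consumer can still tell the two plugin states apart.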
func NewPhasesCompactArray ¶
func NewPhasesCompactArray(count uint) bitarray.CompactArray
func ToArrayJob ¶
func ToArrayJob(structObj *structpb.Struct, taskTypeVersion int32) (*idlPlugins.ArrayJob, error)
Types ¶
type Phase ¶
type Phase uint8
func PhaseString ¶
PhaseString retrieves an enum value from the enum constant's string name. It returns an error if the parameter is not part of the enum.
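A name-to-value lookup of this kind might look like the sketch below. The phase constants, the phaseByName table, and phaseFromString are hypothetical; the actual constant names and the real signature of PhaseString are not shown on this page.

```go
package main

import "fmt"

// phase mirrors the uint8-backed enum; the constants below are hypothetical.
type phase uint8

const (
	phaseStart phase = iota
	phaseLaunch
	phaseSuccess
)

// phaseByName maps each constant's string name to its value.
var phaseByName = map[string]phase{
	"PhaseStart":   phaseStart,
	"PhaseLaunch":  phaseLaunch,
	"PhaseSuccess": phaseSuccess,
}

// phaseFromString returns the value for name, or an error when name is not
// one of the known constants.
func phaseFromString(name string) (phase, error) {
	if p, ok := phaseByName[name]; ok {
		return p, nil
	}
	return 0, fmt.Errorf("%q is not a valid phase name", name)
}

func main() {
	if p, err := phaseFromString("PhaseLaunch"); err == nil {
		fmt.Println("parsed:", p)
	}
	if _, err := phaseFromString("Bogus"); err != nil {
		fmt.Println("error:", err)
	}
}
```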
func SummaryToPhase ¶
func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus.ArraySummary) Phase
type State ¶
type State struct {
	CurrentPhase         Phase                   `json:"phase"`
	PhaseVersion         uint32                  `json:"phaseVersion"`
	Reason               string                  `json:"reason"`
	ExecutionErr         *idlCore.ExecutionError `json:"err"`
	ExecutionArraySize   int                     `json:"arraySize"`
	OriginalArraySize    int64                   `json:"originalArraySize"`
	ArrayStatus          arraystatus.ArrayStatus `json:"arrayStatus"`
	OriginalMinSuccesses int64                   `json:"minSuccess"`

	// Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size)
	IndexesToCache *bitarray.BitSet `json:"indexesToCache"`

	// Tracks the number of subtask retries using the execution index
	RetryAttempts bitarray.CompactArray `json:"retryAttempts"`

	// Tracks the number of system failures for each subtask using the execution index
	SystemFailures bitarray.CompactArray `json:"systemFailures"`
}
func (State) GetArrayStatus ¶
func (s State) GetArrayStatus() arraystatus.ArrayStatus
func (State) GetExecutionArraySize ¶
func (s State) GetExecutionArraySize() int
func (*State) GetExecutionErr ¶
func (s *State) GetExecutionErr() *idlCore.ExecutionError
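func (*State) GetIndexesToCache ¶
func (s *State) GetIndexesToCache() *bitarray.BitSet
func (*State) GetOriginalArraySize ¶
func (s *State) GetOriginalArraySize() int64
func (*State) GetOriginalMinSuccesses ¶
func (s *State) GetOriginalMinSuccesses() int64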
func (*State) SetArrayStatus ¶
func (s *State) SetArrayStatus(state arraystatus.ArrayStatus) *State
func (*State) SetExecutionArraySize ¶
func (s *State) SetExecutionArraySize(size int) *State
func (*State) SetExecutionErr ¶
func (s *State) SetExecutionErr(err *idlCore.ExecutionError) *State
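func (*State) SetOriginalArraySize ¶
func (s *State) SetOriginalArraySize(size int64) *State
func (*State) SetOriginalMinSuccesses ¶
func (s *State) SetOriginalMinSuccesses(size int64) *State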
func (*State) SetRetryAttempts ¶
func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State
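Because every setter returns *State, calls can be chained. The sketch below shows that pattern on a hypothetical, trimmed-down state type that reuses three of the JSON tags from State. The setter bodies are assumptions made for illustration (the real methods may do more than assign a field), and encode is a helper that exists only in this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is a hypothetical, reduced version of State with three fields.
type state struct {
	CurrentPhase uint8  `json:"phase"`
	PhaseVersion uint32 `json:"phaseVersion"`
	Reason       string `json:"reason"`
}

// setPhase assigns the phase and its version, returning s for chaining.
func (s *state) setPhase(phase uint8, version uint32) *state {
	s.CurrentPhase = phase
	s.PhaseVersion = version
	return s
}

// setReason assigns the reason, returning s for chaining.
func (s *state) setReason(reason string) *state {
	s.Reason = reason
	return s
}

// encode serializes the state using the struct's JSON tags.
func encode(s *state) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	s := (&state{}).setPhase(1, 2).setReason("launching sub-tasks")
	out, err := encode(s)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```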