Documentation ¶
Overview ¶
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- func CheckSecret(secret string) error
- func InitCode(msg *InitCodeMsg, xid string) error
- func InitSpec(msg *InitSpecMsg, etlName string, opts StartOpts) error
- func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error)
- func PodHealth(etlName string) (string, error)
- func Stop(id string, errCause error) error
- func StopAll()
- type Aborter
- type CPUMemByTarget
- type CPUMemUsed
- type CommStats
- type Communicator
- type ETLs
- type HealthByTarget
- type HealthStatus
- type Info
- type InfoList
- type InitCodeMsg
- type InitMsg
- type InitMsgBase
- type InitSpecMsg
- type Logs
- type LogsByTarget
- type MD
- func (e *MD) Add(msg InitMsg)
- func (e *MD) Del(id string) (deleted bool)
- func (e *MD) Get(id string) (msg InitMsg, present bool)
- func (e *MD) Init(l int)
- func (*MD) JspOpts() jsp.Options
- func (e *MD) MarshalJSON() ([]byte, error)
- func (e *MD) String() string
- func (e *MD) UnmarshalJSON(data []byte) (err error)
- type OfflineDP
- type StartOpts
Constants ¶
const ( Spec = "spec" Code = "code" )
const ( // ETL container receives POST request from target with the data. It // must read the data and return response to the target which then will be // transferred to the client. Hpush = "hpush://" // Target redirects the GET request to the ETL container. Then ETL container // contacts the target via `AIS_TARGET_URL` env variable to get the data. // The data is then transformed and returned to the client. Hpull = "hpull://" // Similar to redirection strategy but with usage of reverse proxy. Hrev = "hrev://" // Stdin/stdout communication. HpushStdin = "io://" )
enum communication types (`commTypes`)
const ( ArgTypeDefault = "" ArgTypeURL = "url" ArgTypeFQN = "fqn" )
enum arg types (`argTypes`)
const CommTypeSeparator = "://"
consistent with rfc2396.txt "Uniform Resource Identifiers (URI): Generic Syntax"
const DefaultTimeout = 45 * time.Second
const PrefixXactID = "etl-"
Variables ¶
This section is empty.
Functions ¶
func CheckSecret ¶
func InitCode ¶
func InitCode(msg *InitCodeMsg, xid string) error
Given user message `InitCodeMsg`: - make the corresponding assorted substitutions in the etl/runtime/podspec.yaml spec, and - execute `InitSpec` with the modified podspec See also: etl/runtime/podspec.yaml
func InitSpec ¶
func InitSpec(msg *InitSpecMsg, etlName string, opts StartOpts) error
(common for both `InitCode` and `InitSpec` flows)
Types ¶
type Aborter ¶
type Aborter struct {
// contains filtered or unexported fields
}
Aborter listens to smap changes and aborts the ETL on the target when there is any change in targets membership. Aborter should be registered on ETL init. It is unregistered by Stop function. The is no synchronization between aborters on different targets. It is assumed that if one target received smap with changed targets membership, eventually each of the targets will receive it as well. Hence, all ETL containers will be stopped.
func (*Aborter) ListenSmapChanged ¶
func (e *Aborter) ListenSmapChanged()
type CPUMemByTarget ¶
type CPUMemByTarget []*CPUMemUsed
type CPUMemUsed ¶
type CPUMemUsed struct { TargetID string `json:"target_id"` CPU float64 `json:"cpu"` Mem int64 `json:"mem"` }
func PodMetrics ¶
func PodMetrics(etlName string) (*CPUMemUsed, error)
type Communicator ¶
type Communicator interface { meta.Slistener Name() string Xact() core.Xact PodName() string SvcName() string String() string // InlineTransform uses one of the two ETL container endpoints: // - Method "PUT", Path "/" // - Method "GET", Path "/bucket/object" InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error // OfflineTransform is driven by `OfflineDP` to provide offline transformation, as it were // Implementations include: // - pushComm // - redirectComm // - revProxyComm // See also, and separately: on-the-fly transformation as part of a user (e.g. training model) GET request handling OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error) Stop() CommStats }
Communicator is responsible for managing communications with local ETL container. It listens to cluster membership changes and terminates ETL container, if need be.
func GetCommunicator ¶
func GetCommunicator(etlName string) (Communicator, error)
type HealthByTarget ¶
type HealthByTarget []*HealthStatus
type HealthStatus ¶
type Info ¶
type InitCodeMsg ¶
type InitCodeMsg struct { InitMsgBase Code []byte `json:"code"` Deps []byte `json:"dependencies"` Runtime string `json:"runtime"` // ======================================================================================== // InitCodeMsg carries the name of the transforming function; // the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called // by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes); // ========================================================================================= Funcs struct { Transform string `json:"transform"` // cannot be omitted } // 0 (zero) - read the entire payload in memory and then transform it in one shot; // > 0 - use chunk-size buffering and transform incrementally, one chunk at a time ChunkSize int64 `json:"chunk_size"` // bitwise flags: (streaming | debug | strict | ...) future enhancements Flags int64 `json:"flags"` }
func (*InitCodeMsg) MsgType ¶ added in v1.3.19
func (*InitCodeMsg) MsgType() string
func (*InitCodeMsg) String ¶
func (m *InitCodeMsg) String() string
func (*InitCodeMsg) Validate ¶
func (m *InitCodeMsg) Validate() error
type InitMsg ¶
type InitMsg interface { Name() string MsgType() string // Code or Spec CommType() string ArgType() string Validate() error String() string }
func UnmarshalInitMsg ¶
TODO: double-take, unmarshaling-wise. To avoid, include (`Spec`, `Code`) in API calls
type InitMsgBase ¶
type InitMsgBase struct { IDX string `json:"id"` // etlName (not to be confused) CommTypeX string `json:"communication"` // enum commTypes ArgTypeX string `json:"argument"` // enum argTypes Timeout cos.Duration `json:"timeout"` }
and implementations
func (InitMsgBase) ArgType ¶ added in v1.3.19
func (m InitMsgBase) ArgType() string
func (InitMsgBase) CommType ¶
func (m InitMsgBase) CommType() string
func (InitMsgBase) Name ¶
func (m InitMsgBase) Name() string
type InitSpecMsg ¶
type InitSpecMsg struct { InitMsgBase Spec []byte `json:"spec"` }
func (*InitSpecMsg) MsgType ¶ added in v1.3.19
func (*InitSpecMsg) MsgType() string
func (*InitSpecMsg) String ¶
func (m *InitSpecMsg) String() string
func (*InitSpecMsg) Validate ¶
func (m *InitSpecMsg) Validate() (err error)
type LogsByTarget ¶
type LogsByTarget []Logs