Documentation
¶
Overview ¶
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2021-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Package etl provides utilities to initialize and use transformation pods.
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Index ¶
- Constants
- func CheckSecret(secret string) error
- func InitCode(t cluster.Target, msg *InitCodeMsg) error
- func InitSpec(t cluster.Target, msg *InitSpecMsg, opts StartOpts) (err error)
- func ParsePodSpec(errCtx *cmn.ETLErrorContext, spec []byte) (*corev1.Pod, error)
- func Stop(t cluster.Target, id string, errCause error) error
- func StopAll(t cluster.Target)
- type Aborter
- type CommStats
- type Communicator
- type ETLs
- type Info
- type InfoList
- type InitCodeMsg
- type InitMsg
- type InitMsgBase
- type InitSpecMsg
- type MD
- func (e *MD) Add(spec InitMsg)
- func (e *MD) Del(id string) (deleted bool)
- func (e *MD) Get(id string) (msg InitMsg, present bool)
- func (e *MD) Init(l int)
- func (*MD) JspOpts() jsp.Options
- func (e *MD) MarshalJSON() ([]byte, error)
- func (e *MD) String() string
- func (e *MD) UnmarshalJSON(data []byte) (err error)
- type OfflineDataProvider
- type OfflineMsg
- type PodHealthMsg
- type PodLogsMsg
- type PodsHealthMsg
- type PodsLogsMsg
- type StartOpts
Constants ¶
const ( // ETL container receives a POST request with the data from the target. It // must read the data and return a response to the target, which will then be // transferred to the client. Hpush = "hpush://" // Target redirects the GET request to the ETL container. Then the ETL container // contacts the target via the `AIS_TARGET_URL` env variable to get the data. // The data is then transformed and returned to the client. Hpull = "hpull://" // Similar to the redirection strategy, but uses a reverse proxy. Hrev = "hrev://" // Stdin/stdout communication. HpushStdin = "io://" )
Variables ¶
This section is empty.
Functions ¶
func CheckSecret ¶
func InitCode ¶
func InitCode(t cluster.Target, msg *InitCodeMsg) error
Given the user message `InitCodeMsg`, InitCode makes the corresponding substitutions in the etl/runtime/podspec.yaml spec and runs the container. See also: etl/runtime/podspec.yaml
func ParsePodSpec ¶
Types ¶
type Aborter ¶
type Aborter struct {
// contains filtered or unexported fields
}
Aborter listens to smap changes and aborts the ETL on the target when there is any change in target membership. Aborter should be registered on ETL init. It is unregistered by the Stop function. There is no synchronization between aborters on different targets. It is assumed that if one target receives an smap with changed target membership, each of the other targets will eventually receive it as well. Hence, all ETL containers will be stopped.
func (*Aborter) ListenSmapChanged ¶
func (e *Aborter) ListenSmapChanged()
type Communicator ¶
type Communicator interface { cluster.Slistener Name() string PodName() string SvcName() string // OnlineTransform uses one of the two ETL container endpoints: // - Method "PUT", Path "/" // - Method "GET", Path "/bucket/object" OnlineTransform(w http.ResponseWriter, r *http.Request, bck *cluster.Bck, objName string) error // OfflineTransform interface implementations realize offline ETL. // OfflineTransform is driven by `OfflineDataProvider` - not to be confused // with GET requests from users (such as training models and apps) // to perform on-the-fly transformation. OfflineTransform(bck *cluster.Bck, objName string, timeout time.Duration) (cos.ReadCloseSizer, error) Stop() CommStats }
Communicator is responsible for managing communications with the local ETL container. It listens to cluster membership changes and terminates the ETL container if needed.
func GetCommunicator ¶
func GetCommunicator(transformID string, lsnode *cluster.Snode) (Communicator, error)
type Info ¶
type InitCodeMsg ¶
type InitCodeMsg struct { InitMsgBase Code []byte `json:"code"` Deps []byte `json:"dependencies"` Runtime string `json:"runtime"` // ======================================================================================== // `InitCodeMsg` carries the name of the transforming function; // the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called // by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes); // ========================================================================================= // TODO -- FIXME: decide if we need to remove nested struct for funcs Funcs struct { Transform string `json:"transform"` // cannot be omitted } // 0 (zero) - read the entire payload in memory and then transform it in one shot; // > 0 - use chunk-size buffering and transform incrementally, one chunk at a time ChunkSize int64 `json:"chunk_size"` // bitwise flags: (streaming | debug | strict | ...) Flags int64 `json:"flags"` }
func (*InitCodeMsg) InitType ¶
func (*InitCodeMsg) InitType() string
func (*InitCodeMsg) Validate ¶
func (m *InitCodeMsg) Validate() error
type InitMsg ¶
func UnmarshalInitMsg ¶
type InitMsgBase ¶
type InitMsgBase struct { IDX string `json:"id"` CommTypeX string `json:"communication"` Timeout cos.Duration `json:"timeout"` }
func (InitMsgBase) CommType ¶
func (m InitMsgBase) CommType() string
func (InitMsgBase) ID ¶
func (m InitMsgBase) ID() string
type InitSpecMsg ¶
type InitSpecMsg struct { InitMsgBase Spec []byte `json:"spec"` }
func (*InitSpecMsg) InitType ¶
func (*InitSpecMsg) InitType() string
func (*InitSpecMsg) Validate ¶
func (m *InitSpecMsg) Validate() (err error)
type OfflineDataProvider ¶
type OfflineDataProvider struct {
// contains filtered or unexported fields
}
func NewOfflineDataProvider ¶
func (*OfflineDataProvider) Reader ¶
func (dp *OfflineDataProvider) Reader(lom *cluster.LOM) (cos.ReadOpenCloser, cmn.ObjAttrsHolder, error)
Reader returns a reader over the result of applying the ETL transformation to the given LOM.
type OfflineMsg ¶
type OfflineMsg struct { ID string `json:"id"` // ETL ID Prefix string `json:"prefix"` // Prefix added to each resulting object. DryRun bool `json:"dry_run"` // Don't perform any PUT // New object names will have this extension. Warning: if the source // bucket contains two objects with the same base name but different // extensions, specifying this field may cause one object to overwrite // the other because of the resulting name conflict. Ext string `json:"ext"` }
type PodHealthMsg ¶
type PodLogsMsg ¶
func (*PodLogsMsg) String ¶
func (p *PodLogsMsg) String(maxLen ...int) string
type PodsHealthMsg ¶
type PodsHealthMsg []*PodHealthMsg
type PodsLogsMsg ¶
type PodsLogsMsg []PodLogsMsg
func (PodsLogsMsg) Len ¶
func (p PodsLogsMsg) Len() int
func (PodsLogsMsg) Less ¶
func (p PodsLogsMsg) Less(i, j int) bool
func (PodsLogsMsg) Swap ¶
func (p PodsLogsMsg) Swap(i, j int)