etl

package
v1.3.25
Warning

This package is not in the latest version of its module.

Published: Oct 6, 2024 License: MIT Imports: 33 Imported by: 1

README

ETL package

The etl package is compiled into the aisnode executable to facilitate running custom ETL containers and communicating with those containers at runtime.

AIStore supports both on-the-fly (aka online) and offline user-defined dataset transformations. All the respective I/O-intensive (and expensive) operations are confined to the storage cluster, while computing clients retain all their resources to execute computation over transformed, filtered, and sorted data.

Popular use cases include - but are not limited to - dataset augmentation (of any kind) and filtering of AI datasets.

Please refer to the ETL readme for the prerequisites, the 4 supported ais <=> container communication mechanisms, and usage examples.

The ETL readme also contains an overview of the architecture, important technical details, and further guidance.

Architecture

The AIS-ETL extension is designed to maximize the effectiveness of the transformation process. In particular, AIS-ETL optimizes out the entire networking operation that would otherwise be required to move pre-transformed data between storage and compute nodes.

Based on the specification provided by a user, each target starts its own ETL container (worker): one ETL container per storage target in the cluster. From then on, this "local" ETL container is responsible for transforming objects stored on "its" AIS target. This approach allows us to run custom transformations close to the data. It also ensures the performance and scalability of transformation workloads: because each container handles only the objects stored on its own target, the scalability can, for all intents and purposes, be considered practically unlimited.

The following figure illustrates a cluster of 3 AIS proxies (gateways) and 4 storage targets, with each target running user-defined ETL in parallel:

[Figure: ETL architecture]

Management and Benchmarking

  • AIS CLI includes commands to start, stop, and monitor ETL at runtime.
  • AIS Loader has been extended to benchmark and stress test AIS clusters by running a number of pre-defined transformations that we include with the source code.

For more information and details, please refer to the ETL readme.

Documentation

Overview

Package etl provides utilities to initialize and use transformation pods.

  • Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.

Constants

const (
	Spec = "spec"
	Code = "code"
)

const (
	// ETL container receives POST request from target with the data. It
	// must read the data and return response to the target which then will be
	// transferred to the client.
	Hpush = "hpush://"
	// Target redirects the GET request to the ETL container. Then ETL container
	// contacts the target via `AIS_TARGET_URL` env variable to get the data.
	// The data is then transformed and returned to the client.
	Hpull = "hpull://"
	// Similar to redirection strategy but with usage of reverse proxy.
	Hrev = "hrev://"
	// Stdin/stdout communication.
	HpushStdin = "io://"
)

enum communication types (`commTypes`)

const (
	ArgTypeDefault = ""
	ArgTypeURL     = "url"
	ArgTypeFQN     = "fqn"
)

enum arg types (`argTypes`)

const CommTypeSeparator = "://"

consistent with rfc2396.txt "Uniform Resource Identifiers (URI): Generic Syntax"
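As an illustration of how the four communication-type constants and `CommTypeSeparator` fit together, here is a minimal sketch of validating a user-supplied communication type. The helper `normalizeCommType` is hypothetical and not part of package etl; the constants are copied locally so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the constants listed above, so the sketch is self-contained.
const (
	Hpush             = "hpush://"
	Hpull             = "hpull://"
	Hrev              = "hrev://"
	HpushStdin        = "io://"
	CommTypeSeparator = "://"
)

// normalizeCommType is a hypothetical helper (NOT part of package etl):
// it appends the separator when it is missing and rejects unknown types.
func normalizeCommType(s string) (string, error) {
	if !strings.HasSuffix(s, CommTypeSeparator) {
		s += CommTypeSeparator
	}
	switch s {
	case Hpush, Hpull, Hrev, HpushStdin:
		return s, nil
	}
	return "", fmt.Errorf("unknown communication type %q", s)
}

func main() {
	for _, in := range []string{"hpush", "io://", "ftp"} {
		out, err := normalizeCommType(in)
		fmt.Printf("in=%q out=%q err=%v\n", in, out, err)
	}
}
```

Whether the package itself accepts the short form without the separator is an assumption of this sketch.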

const DefaultTimeout = 45 * time.Second

const PrefixXactID = "etl-"

Variables

This section is empty.

Functions

func CheckSecret

func CheckSecret(secret string) error

func InitCode

func InitCode(msg *InitCodeMsg, xid string) error

Given user message `InitCodeMsg`:

  • make the corresponding assorted substitutions in the etl/runtime/podspec.yaml spec, and
  • execute `InitSpec` with the modified podspec

See also: etl/runtime/podspec.yaml

func InitSpec

func InitSpec(msg *InitSpecMsg, etlName string, opts StartOpts) error

(common for both `InitCode` and `InitSpec` flows)

func ParsePodSpec

func ParsePodSpec(errCtx *cmn.ETLErrCtx, spec []byte) (*corev1.Pod, error)

func PodHealth

func PodHealth(etlName string) (string, error)

func Stop

func Stop(id string, errCause error) error

Stop deletes all resources occupied by the ETL, including Pods and Services. It also unregisters the ETL smap listener.

func StopAll

func StopAll()

StopAll terminates all running ETLs.

Types

type Aborter

type Aborter struct {
	// contains filtered or unexported fields
}

Aborter listens to smap changes and aborts the ETL on the target when there is any change in target membership. Aborter should be registered on ETL init; it is unregistered by the Stop function. There is no synchronization between aborters on different targets. It is assumed that if one target receives an smap with changed target membership, eventually each of the targets will receive it as well. Hence, all ETL containers will be stopped.

func (*Aborter) ListenSmapChanged

func (e *Aborter) ListenSmapChanged()

func (*Aborter) String

func (e *Aborter) String() string
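The membership check that Aborter's description implies can be sketched as a comparison of target ID sets between two cluster-map versions. The `smap` type below is a simplified, hypothetical stand-in (a version plus a set of target IDs), not the actual AIStore cluster map, and `targetsChanged` is an illustrative name.

```go
package main

import "fmt"

// smap is a simplified, hypothetical stand-in for the cluster map:
// a version number plus the set of target IDs.
type smap struct {
	version int64
	targets map[string]struct{}
}

// newSmap builds a stand-in cluster map from a list of target IDs.
func newSmap(version int64, ids ...string) *smap {
	m := &smap{version: version, targets: make(map[string]struct{}, len(ids))}
	for _, id := range ids {
		m.targets[id] = struct{}{}
	}
	return m
}

// targetsChanged reports whether target membership differs between two maps.
// Equal sizes plus "every old ID is still present" implies identical sets.
func targetsChanged(prev, next *smap) bool {
	if len(prev.targets) != len(next.targets) {
		return true
	}
	for id := range prev.targets {
		if _, ok := next.targets[id]; !ok {
			return true
		}
	}
	return false
}

func main() {
	v1 := newSmap(1, "t1", "t2")
	v2 := newSmap(2, "t1", "t2") // e.g. only proxies changed
	v3 := newSmap(3, "t1", "t3") // t2 left, t3 joined

	fmt.Println(targetsChanged(v1, v2)) // false: keep the ETL running
	fmt.Println(targetsChanged(v1, v3)) // true: abort the ETL
}
```

Since each target evaluates its own copy of the map independently, a check of this kind needs no cross-target coordination, which matches the "no synchronization between aborters" note above.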

type CPUMemByTarget

type CPUMemByTarget []*CPUMemUsed

type CPUMemUsed

type CPUMemUsed struct {
	TargetID string  `json:"target_id"`
	CPU      float64 `json:"cpu"`
	Mem      int64   `json:"mem"`
}

func PodMetrics

func PodMetrics(etlName string) (*CPUMemUsed, error)
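For reference, the JSON tags on `CPUMemUsed` determine how a per-target list is serialized. The sketch below mirrors the two types locally (so it runs without importing the package) and marshals them with the standard `encoding/json`; the target IDs and numbers are made-up sample values, and the units of `CPU` and `Mem` are not specified here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the types shown above, so the sketch is self-contained.
type CPUMemUsed struct {
	TargetID string  `json:"target_id"`
	CPU      float64 `json:"cpu"`
	Mem      int64   `json:"mem"`
}

type CPUMemByTarget []*CPUMemUsed

// encode marshals a per-target list into its JSON form.
func encode(all CPUMemByTarget) (string, error) {
	b, err := json.Marshal(all)
	return string(b), err
}

func main() {
	// Sample values only; real numbers would come from each target's pod.
	out, err := encode(CPUMemByTarget{
		{TargetID: "t1", CPU: 0.5, Mem: 1024},
		{TargetID: "t2", CPU: 1.25, Mem: 2048},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// [{"target_id":"t1","cpu":0.5,"mem":1024},{"target_id":"t2","cpu":1.25,"mem":2048}]
}
```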

type CommStats

type CommStats interface {
	ObjCount() int64
	InBytes() int64
	OutBytes() int64
}
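One possible way to satisfy `CommStats` is with atomic counters that are updated once per transformed object. This is a sketch under assumptions: the `stats` type, its `record` method, and the `recordConcurrently` helper are hypothetical names, and the package's own communicators may track these numbers differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// CommStats is copied from the interface above.
type CommStats interface {
	ObjCount() int64
	InBytes() int64
	OutBytes() int64
}

// stats is a hypothetical implementation backed by atomic counters,
// safe for concurrent use without a mutex.
type stats struct {
	objs, in, out atomic.Int64
}

func (s *stats) ObjCount() int64 { return s.objs.Load() }
func (s *stats) InBytes() int64  { return s.in.Load() }
func (s *stats) OutBytes() int64 { return s.out.Load() }

// record accounts for one transformed object.
func (s *stats) record(inBytes, outBytes int64) {
	s.objs.Add(1)
	s.in.Add(inBytes)
	s.out.Add(outBytes)
}

// Compile-time check that *stats implements CommStats.
var _ CommStats = (*stats)(nil)

// recordConcurrently simulates n transformations running in parallel.
func recordConcurrently(s *stats, n int, inBytes, outBytes int64) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.record(inBytes, outBytes)
		}()
	}
	wg.Wait()
}

func main() {
	s := &stats{}
	recordConcurrently(s, 100, 10, 5)
	fmt.Println(s.ObjCount(), s.InBytes(), s.OutBytes()) // 100 1000 500
}
```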

type Communicator

type Communicator interface {
	meta.Slistener

	Name() string
	Xact() core.Xact
	PodName() string
	SvcName() string

	String() string

	// InlineTransform uses one of the two ETL container endpoints:
	//  - Method "PUT", Path "/"
	//  - Method "GET", Path "/bucket/object"
	InlineTransform(w http.ResponseWriter, r *http.Request, lom *core.LOM) error

	// OfflineTransform is driven by `OfflineDP` to provide offline transformation, as it were
	// Implementations include:
	// - pushComm
	// - redirectComm
	// - revProxyComm
	// See also, and separately: on-the-fly transformation as part of a user (e.g. training model) GET request handling
	OfflineTransform(lom *core.LOM, timeout time.Duration) (cos.ReadCloseSizer, error)

	Stop()

	CommStats
}

Communicator is responsible for managing communications with the local ETL container. It listens to cluster membership changes and terminates the ETL container, if need be.

func GetCommunicator

func GetCommunicator(etlName string) (Communicator, error)

type ETLs

type ETLs map[string]InitMsg

type HealthByTarget

type HealthByTarget []*HealthStatus

type HealthStatus

type HealthStatus struct {
	TargetID string `json:"target_id"`
	Status   string `json:"health_status"` // enum { HealthStatusRunning, ... } above
}

type Info

type Info struct {
	Name     string `json:"id"`
	XactID   string `json:"xaction_id"`
	ObjCount int64  `json:"obj_count"`
	InBytes  int64  `json:"in_bytes"`
	OutBytes int64  `json:"out_bytes"`
}

func List

func List() []Info

type InfoList

type InfoList []Info

func (InfoList) Len

func (il InfoList) Len() int

func (InfoList) Less

func (il InfoList) Less(i, j int) bool

func (InfoList) Swap

func (il InfoList) Swap(i, j int)
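The `Len`, `Less`, and `Swap` methods make `InfoList` usable with the standard `sort` package. The sketch below mirrors the types locally and assumes that `Less` orders entries by `Name`; the listing above does not state the actual ordering, so treat that comparison as an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// Local mirrors of the types shown above, so the sketch is self-contained.
type Info struct {
	Name     string `json:"id"`
	XactID   string `json:"xaction_id"`
	ObjCount int64  `json:"obj_count"`
	InBytes  int64  `json:"in_bytes"`
	OutBytes int64  `json:"out_bytes"`
}

type InfoList []Info

func (il InfoList) Len() int { return len(il) }

// ASSUMPTION: ordering by Name; the real Less may compare something else.
func (il InfoList) Less(i, j int) bool { return il[i].Name < il[j].Name }

func (il InfoList) Swap(i, j int) { il[i], il[j] = il[j], il[i] }

// sortedNames sorts the list in place and returns the names in order.
func sortedNames(il InfoList) []string {
	sort.Sort(il)
	names := make([]string, len(il))
	for i, info := range il {
		names[i] = info.Name
	}
	return names
}

func main() {
	list := InfoList{{Name: "etl-resize"}, {Name: "etl-echo"}, {Name: "etl-md5"}}
	fmt.Println(sortedNames(list)) // [etl-echo etl-md5 etl-resize]
}
```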

type InitCodeMsg

type InitCodeMsg struct {
	InitMsgBase
	Code    []byte `json:"code"`
	Deps    []byte `json:"dependencies"`
	Runtime string `json:"runtime"`
	// ========================================================================================
	// InitCodeMsg carries the name of the transforming function;
	// the `Transform` function is mandatory and cannot be "" (empty) - it _will_ be called
	// by the `Runtime` container (see etl/runtime/all.go for all supported pre-built runtimes);
	// =========================================================================================
	Funcs struct {
		Transform string `json:"transform"` // cannot be omitted
	}
	// 0 (zero) - read the entire payload in memory and then transform it in one shot;
	// > 0 - use chunk-size buffering and transform incrementally, one chunk at a time
	ChunkSize int64 `json:"chunk_size"`
	// bitwise flags: (streaming | debug | strict | ...) future enhancements
	Flags int64 `json:"flags"`
}

func (*InitCodeMsg) MsgType added in v1.3.19

func (*InitCodeMsg) MsgType() string

func (*InitCodeMsg) String

func (m *InitCodeMsg) String() string

func (*InitCodeMsg) Validate

func (m *InitCodeMsg) Validate() error
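The `ChunkSize` comment in `InitCodeMsg` describes two modes: read the whole payload and transform it in one shot, or transform it incrementally, one chunk at a time. The following sketch illustrates that distinction with plain `io` readers. The `transform` and `upperString` helpers are hypothetical, uppercasing stands in for a user-supplied transform function, and the runtime containers may implement chunking differently.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// transform copies r to w through fn.
// chunkSize == 0 (or, in this sketch, any non-positive value): read the
// entire payload into memory and transform it in one shot.
// chunkSize > 0: transform incrementally, one chunk at a time.
func transform(r io.Reader, w io.Writer, chunkSize int64, fn func([]byte) []byte) error {
	if chunkSize <= 0 {
		data, err := io.ReadAll(r)
		if err != nil {
			return err
		}
		_, err = w.Write(fn(data))
		return err
	}
	buf := make([]byte, chunkSize)
	for {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			if _, werr := w.Write(fn(buf[:n])); werr != nil {
				return werr
			}
		}
		// io.EOF: nothing left; io.ErrUnexpectedEOF: last, shorter chunk.
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// upperString runs transform over a string, using uppercasing as the
// stand-in transform function.
func upperString(s string, chunkSize int64) (string, error) {
	var out bytes.Buffer
	err := transform(strings.NewReader(s), &out, chunkSize, bytes.ToUpper)
	return out.String(), err
}

func main() {
	for _, size := range []int64{0, 4} {
		out, err := upperString("hello etl", size)
		fmt.Println(size, out, err)
	}
}
```

Chunked mode only produces the same result as one-shot mode when the transform can be applied to each chunk independently, as uppercasing can; a transform that needs the whole object (a checksum, for example) would call for `ChunkSize` 0.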

type InitMsg

type InitMsg interface {
	Name() string
	MsgType() string // Code or Spec
	CommType() string
	ArgType() string
	Validate() error
	String() string
}

func UnmarshalInitMsg

func UnmarshalInitMsg(b []byte) (msg InitMsg, err error)

TODO: double-take, unmarshaling-wise. To avoid, include (`Spec`, `Code`) in API calls
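To illustrate the "double-take" mentioned in the TODO, here is one way a two-pass decode could look: a first pass inspects which of the `code` or `spec` fields is present, and a second pass (not shown) would unmarshal into the matching concrete message type. The `detectMsgType` helper is hypothetical and is not claimed to be how `UnmarshalInitMsg` works internally.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the message-type constants listed above; they happen to
// match the JSON field names of InitSpecMsg.Spec and InitCodeMsg.Code.
const (
	Spec = "spec"
	Code = "code"
)

// detectMsgType is a hypothetical first pass: it decodes only the top-level
// keys and reports which kind of init message the payload carries.
func detectMsgType(b []byte) (string, error) {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(b, &fields); err != nil {
		return "", err
	}
	_, hasCode := fields[Code]
	_, hasSpec := fields[Spec]
	switch {
	case hasCode && !hasSpec:
		return Code, nil
	case hasSpec && !hasCode:
		return Spec, nil
	}
	return "", errors.New(`message must carry exactly one of "code" or "spec"`)
}

func main() {
	payloads := []string{
		`{"id":"etl-a","communication":"hpush://","code":"aGk="}`,
		`{"id":"etl-b","communication":"hpull://","spec":"aGk="}`,
		`{"id":"etl-c"}`,
	}
	for _, p := range payloads {
		mt, err := detectMsgType([]byte(p))
		fmt.Printf("type=%q err=%v\n", mt, err)
	}
}
```

Passing the message type explicitly in the API call, as the TODO suggests, would make the first pass unnecessary.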

type InitMsgBase

type InitMsgBase struct {
	IDX       string       `json:"id"`            // etlName (not to be confused)
	CommTypeX string       `json:"communication"` // enum commTypes
	ArgTypeX  string       `json:"argument"`      // enum argTypes
	Timeout   cos.Duration `json:"timeout"`
}

and implementations

func (InitMsgBase) ArgType added in v1.3.19

func (m InitMsgBase) ArgType() string

func (InitMsgBase) CommType

func (m InitMsgBase) CommType() string

func (InitMsgBase) Name

func (m InitMsgBase) Name() string

type InitSpecMsg

type InitSpecMsg struct {
	InitMsgBase
	Spec []byte `json:"spec"`
}

func (*InitSpecMsg) MsgType added in v1.3.19

func (*InitSpecMsg) MsgType() string

func (*InitSpecMsg) String

func (m *InitSpecMsg) String() string

func (*InitSpecMsg) Validate

func (m *InitSpecMsg) Validate() (err error)

type Logs

type Logs struct {
	TargetID string `json:"target_id"`
	Logs     []byte `json:"logs"`
}

func PodLogs

func PodLogs(transformID string) (logs Logs, err error)

type LogsByTarget

type LogsByTarget []Logs

type MD

type MD struct {
	Version int64
	ETLs    ETLs
	Ext     any
}

ETL metadata

func (*MD) Add

func (e *MD) Add(msg InitMsg)

func (*MD) Del

func (e *MD) Del(id string) (deleted bool)

func (*MD) Get

func (e *MD) Get(id string) (msg InitMsg, present bool)

func (*MD) Init

func (e *MD) Init(l int)

func (*MD) JspOpts

func (*MD) JspOpts() jsp.Options

func (*MD) MarshalJSON

func (e *MD) MarshalJSON() ([]byte, error)

func (*MD) String

func (e *MD) String() string

func (*MD) UnmarshalJSON

func (e *MD) UnmarshalJSON(data []byte) (err error)

type OfflineDP

type OfflineDP struct {
	// contains filtered or unexported fields
}

func NewOfflineDP

func NewOfflineDP(msg *apc.TCBMsg, config *cmn.Config) (*OfflineDP, error)

func (*OfflineDP) Reader

func (dp *OfflineDP) Reader(lom *core.LOM, latestVer, sync bool) (cos.ReadOpenCloser, cos.OAH, error)

Returns reader resulting from lom ETL transformation. TODO -- FIXME: comm.OfflineTransform to support latestVer and sync

type StartOpts

type StartOpts struct {
	Env map[string]string
}

Directories

Path Synopsis
Package runtime provides skeletons and static specifications for building ETL from scratch.
