yt

package
v0.0.0-rc14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
// Table-related constants and provider type identifiers declared by this package.
const (
	// TableProgressRelativePath aliases abstract.TableLSN.
	TableProgressRelativePath = abstract.TableLSN // "__data_transfer_lsn"
	// TableWAL is the literal "__wal"; presumably the name of the write-ahead-log table — TODO confirm.
	TableWAL                  = "__wal"

	// ProviderType is the abstract.ProviderType value "yt".
	ProviderType = abstract.ProviderType("yt")
	// StagingType is the abstract.ProviderType value "lfstaging".
	StagingType  = abstract.ProviderType("lfstaging")
	// CopyType is the abstract.ProviderType value "ytcopy".
	CopyType     = abstract.ProviderType("ytcopy")
)

Variables

View Source
var (
	// ExePath is a package-level ypath.Path.
	// NOTE(review): presumably the location of the executable uploaded by InitExe — confirm.
	ExePath ypath.Path
)

Functions

func DataplaneExecutablePath

func DataplaneExecutablePath(cluster, revision string) ypath.Path

func DataplaneVersion

func DataplaneVersion() (string, bool)

func InitExe

func InitExe()

InitExe uploads the executable and initializes the related variables.

func Merge

func Merge(
	ctx context.Context,
	cypressClient yt.CypressClient,
	mrClient mapreduce.Client,
	logger log.Logger,
	mergeNodes map[ypath.Path][]*NodeInfo,
	pathToBinary ypath.Path,
	tableWriterSpec interface{},
) error

func MountAndWaitRecursive

func MountAndWaitRecursive(ctx context.Context, logger log.Logger, client yt.Client, path ypath.Path, params *HandleParams) error

func MoveAndMount

func MoveAndMount(
	ctx context.Context,
	logger log.Logger,
	client yt.Client,
	srcDstMap map[ypath.Path]ypath.Path,
	buildAttrs func(schema schema.Schema) map[string]interface{},
) error

func NewTmpCleaner

func NewTmpCleaner(dst YtDestinationModel, logger log.Logger) (providers.Cleaner, error)

func ParseYtSpec

func ParseYtSpec(jsonStr string, spec *map[string]interface{}) error

func SafeChild

func SafeChild(path ypath.Path, children ...string) ypath.Path

SafeChild appends children to path. It works like path.Child(child), with some exceptions. This method assumes:

  1. ypath object is correct, i.e. no trailing path delimiter symbol exists

This method guarantees:

  1. The YPath with appended children has deduplicated path delimiters in the appended string and no trailing path delimiter.
  2. TODO(@kry127) TM-6290 not yet guaranteed, but nice to have: special symbols should be replaced

func TXOptions

func TXOptions(txID yt.TxID) *yt.TransactionOptions

func UnmountAndWaitRecursive

func UnmountAndWaitRecursive(ctx context.Context, logger log.Logger, client yt.Client, path ypath.Path, params *HandleParams) error

func WaitMountingPreloadState

func WaitMountingPreloadState(yc yt.Client, path ypath.Path) error

func WithTx

func WithTx(ctx context.Context, client yt.Client, f func(ctx context.Context, tx yt.Tx) error) error

func YTColumnToColSchema

func YTColumnToColSchema(columns []schema.Column) *abstract.TableSchema

Types

type ConnectionData

// ConnectionData groups connection-related settings. It is used as the
// Connection field of both YtDestination and YtSource.
type ConnectionData struct {
	Hosts                 []string
	Subnet                string
	SecurityGroups        []string
	// NOTE(review): presumably turns off YT proxy discovery for the client — confirm.
	DisableProxyDiscovery bool
}

type HandleParams

// HandleParams carries options for MountAndWaitRecursive and
// UnmountAndWaitRecursive, both of which accept a *HandleParams.
type HandleParams struct {
	// NOTE(review): presumably caps how many nodes are handled concurrently — confirm.
	ConcurrencyLimit int64
}

func NewHandleParams

func NewHandleParams(concurrencyLimit int64) *HandleParams

type LfStagingDestination

// LfStagingDestination holds the settings of a destination endpoint (it has an
// IsDestination marker method).
// NOTE(review): presumably corresponds to the StagingType ("lfstaging") provider — confirm.
type LfStagingDestination struct {
	Cluster           string
	Topic             string
	YtAccount         string
	LogfellerHomePath string
	TmpBasePath       string

	AggregationPeriod time.Duration

	// NOTE(review): the names suggest per-temporary-table limits in seconds and bytes — confirm.
	SecondsPerTmpTable int64
	BytesPerTmpTable   int64

	YtToken string

	UsePersistentIntermediateTables bool
	UseNewMetadataFlow              bool
	MergeYtPool                     string
}

func (*LfStagingDestination) CleanupMode

func (d *LfStagingDestination) CleanupMode() model.CleanupType

func (*LfStagingDestination) GetProviderType

func (d *LfStagingDestination) GetProviderType() abstract.ProviderType

func (LfStagingDestination) IsDestination

func (LfStagingDestination) IsDestination()

func (*LfStagingDestination) Transformer

func (d *LfStagingDestination) Transformer() map[string]string

func (*LfStagingDestination) Validate

func (d *LfStagingDestination) Validate() error

func (*LfStagingDestination) WithDefaults

func (d *LfStagingDestination) WithDefaults()

type NodeAttrs

// NodeAttrs describes node attributes. Every field carries a yson struct tag
// naming the attribute key it maps to. NodeInfo references it through its
// Attrs field.
type NodeAttrs struct {
	Type        yt.NodeType   `yson:"type"`
	Dynamic     bool          `yson:"dynamic"`
	TabletState string        `yson:"tablet_state"`
	Schema      schema.Schema `yson:"schema"`
	OptimizeFor string        `yson:"optimize_for"`
	Atomicity   string        `yson:"atomicity"`
}

type NodeInfo

// NodeInfo bundles a node's name, its ypath.Path and a pointer to its
// NodeAttrs. It is the result type of GetNodeInfo and ListNodesWithAttrs and
// can be built with NewNodeInfo.
type NodeInfo struct {
	Name  string
	Path  ypath.Path
	Attrs *NodeAttrs
}

func GetNodeInfo

func GetNodeInfo(ctx context.Context, client yt.CypressClient, path ypath.Path) (*NodeInfo, error)

func ListNodesWithAttrs

func ListNodesWithAttrs(ctx context.Context, client yt.CypressClient, path ypath.Path, prefix string, recursive bool) ([]*NodeInfo, error)

ListNodesWithAttrs returns a name-sorted list of nodes with their attributes, based on the specified arguments.

func NewNodeInfo

func NewNodeInfo(name string, path ypath.Path, attrs *NodeAttrs) *NodeInfo

type YTSpec

// YTSpec is an opaque type with no exported fields. NewYTSpec takes a
// map[string]interface{} and GetConfig returns one.
// NOTE(review): presumably GetConfig returns the configuration passed to NewYTSpec — confirm.
type YTSpec struct {
	// contains filtered or unexported fields
}

func NewYTSpec

func NewYTSpec(config map[string]interface{}) *YTSpec

func (*YTSpec) GetConfig

func (s *YTSpec) GetConfig() map[string]interface{}

func (*YTSpec) IsEmpty

func (s *YTSpec) IsEmpty() bool

func (YTSpec) MarshalBinary

func (s YTSpec) MarshalBinary() (data []byte, err error)

func (YTSpec) MarshalJSON

func (s YTSpec) MarshalJSON() ([]byte, error)

func (*YTSpec) UnmarshalBinary

func (s *YTSpec) UnmarshalBinary(data []byte) error

func (*YTSpec) UnmarshalJSON

func (s *YTSpec) UnmarshalJSON(data []byte) error

type YtCopyDestination

// YtCopyDestination holds the settings of a destination endpoint (it has an
// IsDestination marker method).
// NOTE(review): presumably corresponds to the CopyType ("ytcopy") provider — confirm.
type YtCopyDestination struct {
	Cluster            string
	YtToken            string
	Prefix             string
	Parallelism        uint64
	Pool               string
	UsePushTransaction bool
	ResourceLimits     *spec.ResourceLimits
}

func (*YtCopyDestination) CleanupMode

func (y *YtCopyDestination) CleanupMode() model.CleanupType

func (*YtCopyDestination) CompressionCodec

func (y *YtCopyDestination) CompressionCodec() yt.ClientCompressionCodec

func (*YtCopyDestination) DisableProxyDiscovery

func (y *YtCopyDestination) DisableProxyDiscovery() bool

func (*YtCopyDestination) GetProviderType

func (y *YtCopyDestination) GetProviderType() abstract.ProviderType

func (*YtCopyDestination) IsDestination

func (y *YtCopyDestination) IsDestination()

func (*YtCopyDestination) Proxy

func (y *YtCopyDestination) Proxy() string

func (*YtCopyDestination) SupportMultiThreads

func (y *YtCopyDestination) SupportMultiThreads() bool

func (*YtCopyDestination) SupportMultiWorkers

func (y *YtCopyDestination) SupportMultiWorkers() bool

func (*YtCopyDestination) Token

func (y *YtCopyDestination) Token() string

func (*YtCopyDestination) Transformer

func (y *YtCopyDestination) Transformer() map[string]string

func (*YtCopyDestination) Validate

func (y *YtCopyDestination) Validate() error

func (*YtCopyDestination) WithDefaults

func (y *YtCopyDestination) WithDefaults()

type YtDestination

// YtDestination is the plain-field settings struct for the YT destination.
// YtDestinationWrapper points to it through its Model field, and
// NewYtDestinationV1 accepts it by value.
type YtDestination struct {
	Path           string
	Cluster        string
	Token          string
	PushWal        bool
	NeedArchive    bool
	CellBundle     string
	TTL            int64 // in milliseconds
	OptimizeFor    string
	CanAlter       bool
	TimeShardCount int
	Index          []string
	HashColumn     string
	PrimaryMedium  string
	Pool           string       // pool for running merge and sort operations for static tables
	Strict         bool         // Deprecated: unused in the new data plane; use LoseDataOnError and Atomicity instead
	Atomicity      yt.Atomicity // Atomicity for the dynamic tables being created in YT. See https://yt.yandex-team.ru/docs/description/dynamic_tables/sorted_dynamic_tables#atomarnost

	// If true, some errors on data insertion to YT are skipped and a warning is written to the log.
	// Such errors include:
	// * the table schema could not be found in the cache for some reason: https://github.com/doublecloud/transfer/arcadia/transfer_manager/go/pkg/providers/yt/sink/sink.go?rev=11063561#L482-484
	// * a row (or a value inside a row) being inserted into the YT table exceeds YT limits (16 MB by default).
	LoseDataOnError bool

	DiscardBigValues         bool
	TabletCount              int // Deprecated: scheduled for removal in March
	Rotation                 *dp_model.RotatorConfig
	VersionColumn            string
	AutoFlushPeriod          int
	Ordered                  bool
	TransformerConfig        map[string]string
	UseStaticTableOnSnapshot bool // kept as a plain bool because optional.Optional[bool] breaks compatibility
	AltNames                 map[string]string
	Cleanup                  dp_model.CleanupType
	Spec                     YTSpec
	TolerateKeyChanges       bool
	InitialTabletCount       uint32
	WriteTimeoutSec          uint32
	ChunkSize                uint32 // ChunkSize defines the number of items in a single request to YT for the dynamic sink, and the chunk size in bytes for the static sink
	BufferTriggingSize       uint64
	BufferTriggingInterval   time.Duration
	CompressionCodec         yt.ClientCompressionCodec
	DisableDatetimeHack      bool // Disables the old hack of representing time.Time columns as int64 timestamps for LF>YT
	Connection               ConnectionData
	CustomAttributes         map[string]string

	Static          bool
	SortedStatic    bool // true if static tables need to be sorted
	StaticChunkSize int  // desired size of a static table chunk, in bytes
}

func (*YtDestination) GetUseStaticTableOnSnapshot

func (d *YtDestination) GetUseStaticTableOnSnapshot() bool

func (*YtDestination) ToStorageParams

func (d *YtDestination) ToStorageParams() *YtStorageParams

type YtDestinationModel

// YtDestinationModel is the accessor interface over the YT destination
// settings. It embeds dp_model.TmpPolicyProvider and ytclient.ConnParams, and
// it is the return type of NewYtDestinationV1. Most of its getters share their
// names with the fields of YtDestination.
type YtDestinationModel interface {
	dp_model.TmpPolicyProvider
	ytclient.ConnParams

	ToStorageParams() *YtStorageParams

	Path() string
	Cluster() string
	Token() string
	PushWal() bool
	NeedArchive() bool
	CellBundle() string
	TTL() int64
	OptimizeFor() string
	CanAlter() bool
	TimeShardCount() int
	Index() []string
	HashColumn() string
	PrimaryMedium() string
	Pool() string
	Atomicity() yt.Atomicity
	LoseDataOnError() bool
	DiscardBigValues() bool
	TabletCount() int
	Rotation() *dp_model.RotatorConfig
	VersionColumn() string
	AutoFlushPeriod() int
	Ordered() bool
	UseStaticTableOnSnapshot() bool
	AltNames() map[string]string
	Spec() *YTSpec
	TolerateKeyChanges() bool
	InitialTabletCount() uint32
	WriteTimeoutSec() uint32
	ChunkSize() uint32
	BufferTriggingSize() uint64
	BufferTriggingInterval() time.Duration
	Transformer() map[string]string
	CleanupMode() dp_model.CleanupType
	WithDefaults()
	IsDestination()
	GetProviderType() abstract.ProviderType
	GetTableAltName(table string) string
	Validate() error
	SetSnapshotLoad()
	LegacyModel() interface{}
	AllowAlter()
	SetStaticTable()
	SetIndex(index []string)
	SetOrdered()
	CompressionCodec() yt.ClientCompressionCodec

	Static() bool
	SortedStatic() bool
	StaticChunkSize() int

	DisableDatetimeHack() bool // TODO(@kry127): decide when this hack can be removed

	GetConnectionData() ConnectionData
	DisableProxyDiscovery() bool

	BuffererConfig() bufferer.BuffererConfig

	SupportSharding() bool

	CustomAttributes() map[string]any
	// MergeAttributes merges the user-defined custom table attributes with an
	// arbitrary attribute set (usually table settings such as medium, ttl, ...),
	// giving priority to the latter.
	// It guarantees that both the argument and the custom attributes map in the
	// model are left unchanged.
	MergeAttributes(tableSettings map[string]any) map[string]any
}

func NewYtDestinationV1

func NewYtDestinationV1(model YtDestination) YtDestinationModel

type YtDestinationWrapper

// YtDestinationWrapper wraps a *YtDestination, exposed through the Model
// field, and also has unexported fields.
// NOTE(review): presumably this is the YtDestinationModel implementation
// returned by NewYtDestinationV1 — confirm.
type YtDestinationWrapper struct {
	Model *YtDestination
	// contains filtered or unexported fields
}

func (*YtDestinationWrapper) AllowAlter

func (d *YtDestinationWrapper) AllowAlter()

func (*YtDestinationWrapper) AltNames

func (d *YtDestinationWrapper) AltNames() map[string]string

func (*YtDestinationWrapper) Atomicity

func (d *YtDestinationWrapper) Atomicity() yt.Atomicity

func (*YtDestinationWrapper) AutoFlushPeriod

func (d *YtDestinationWrapper) AutoFlushPeriod() int

func (*YtDestinationWrapper) BufferTriggingInterval

func (d *YtDestinationWrapper) BufferTriggingInterval() time.Duration

func (*YtDestinationWrapper) BufferTriggingSize

func (d *YtDestinationWrapper) BufferTriggingSize() uint64

func (*YtDestinationWrapper) BuffererConfig

func (d *YtDestinationWrapper) BuffererConfig() bufferer.BuffererConfig

func (*YtDestinationWrapper) CanAlter

func (d *YtDestinationWrapper) CanAlter() bool

func (*YtDestinationWrapper) CellBundle

func (d *YtDestinationWrapper) CellBundle() string

func (*YtDestinationWrapper) ChunkSize

func (d *YtDestinationWrapper) ChunkSize() uint32

func (*YtDestinationWrapper) CleanupMode

func (d *YtDestinationWrapper) CleanupMode() dp_model.CleanupType

func (*YtDestinationWrapper) Cluster

func (d *YtDestinationWrapper) Cluster() string

func (*YtDestinationWrapper) CompressionCodec

func (d *YtDestinationWrapper) CompressionCodec() yt.ClientCompressionCodec

func (*YtDestinationWrapper) CustomAttributes

func (d *YtDestinationWrapper) CustomAttributes() map[string]any

func (*YtDestinationWrapper) DisableDatetimeHack

func (d *YtDestinationWrapper) DisableDatetimeHack() bool

TODO: Remove in March.

func (*YtDestinationWrapper) DisableProxyDiscovery

func (d *YtDestinationWrapper) DisableProxyDiscovery() bool

func (*YtDestinationWrapper) DiscardBigValues

func (d *YtDestinationWrapper) DiscardBigValues() bool

func (*YtDestinationWrapper) EnsureCustomTmpPolicySupported

func (d *YtDestinationWrapper) EnsureCustomTmpPolicySupported() error

func (*YtDestinationWrapper) EnsureTmpPolicySupported

func (d *YtDestinationWrapper) EnsureTmpPolicySupported() error

func (*YtDestinationWrapper) GetConnectionData

func (d *YtDestinationWrapper) GetConnectionData() ConnectionData

func (*YtDestinationWrapper) GetProviderType

func (d *YtDestinationWrapper) GetProviderType() abstract.ProviderType

func (*YtDestinationWrapper) GetTableAltName

func (d *YtDestinationWrapper) GetTableAltName(table string) string

func (*YtDestinationWrapper) HashColumn

func (d *YtDestinationWrapper) HashColumn() string

func (*YtDestinationWrapper) Index

func (d *YtDestinationWrapper) Index() []string

func (*YtDestinationWrapper) InitialTabletCount

func (d *YtDestinationWrapper) InitialTabletCount() uint32

func (YtDestinationWrapper) IsDestination

func (YtDestinationWrapper) IsDestination()

func (*YtDestinationWrapper) LegacyModel

func (d *YtDestinationWrapper) LegacyModel() interface{}

LegacyModel is an ugly hack that is here on purpose: it exists only for backward compatibility and must not be reused for anything other than backward compatibility.

func (*YtDestinationWrapper) LoseDataOnError

func (d *YtDestinationWrapper) LoseDataOnError() bool

func (*YtDestinationWrapper) MarshalJSON

func (d *YtDestinationWrapper) MarshalJSON() ([]byte, error)

func (*YtDestinationWrapper) MergeAttributes

func (d *YtDestinationWrapper) MergeAttributes(tableSettings map[string]any) map[string]any

func (*YtDestinationWrapper) NeedArchive

func (d *YtDestinationWrapper) NeedArchive() bool

func (*YtDestinationWrapper) OptimizeFor

func (d *YtDestinationWrapper) OptimizeFor() string

func (*YtDestinationWrapper) Ordered

func (d *YtDestinationWrapper) Ordered() bool

func (*YtDestinationWrapper) Params

func (d *YtDestinationWrapper) Params() string

func (*YtDestinationWrapper) Path

func (d *YtDestinationWrapper) Path() string

func (*YtDestinationWrapper) Pool

func (d *YtDestinationWrapper) Pool() string

func (*YtDestinationWrapper) PostSnapshotHacks

func (d *YtDestinationWrapper) PostSnapshotHacks()

func (*YtDestinationWrapper) PreSnapshotHacks

func (d *YtDestinationWrapper) PreSnapshotHacks()

func (*YtDestinationWrapper) PrimaryMedium

func (d *YtDestinationWrapper) PrimaryMedium() string

func (*YtDestinationWrapper) Proxy

func (d *YtDestinationWrapper) Proxy() string

func (*YtDestinationWrapper) PushWal

func (d *YtDestinationWrapper) PushWal() bool

func (*YtDestinationWrapper) Rotation

func (d *YtDestinationWrapper) Rotation() *dp_model.RotatorConfig

func (*YtDestinationWrapper) SetIndex

func (d *YtDestinationWrapper) SetIndex(index []string)

func (*YtDestinationWrapper) SetOrdered

func (d *YtDestinationWrapper) SetOrdered()

func (*YtDestinationWrapper) SetParams

func (d *YtDestinationWrapper) SetParams(jsonStr string) error

func (*YtDestinationWrapper) SetSnapshotLoad

func (d *YtDestinationWrapper) SetSnapshotLoad()

func (*YtDestinationWrapper) SetStaticTable

func (d *YtDestinationWrapper) SetStaticTable()

func (*YtDestinationWrapper) SortedStatic

func (d *YtDestinationWrapper) SortedStatic() bool

func (*YtDestinationWrapper) Spec

func (d *YtDestinationWrapper) Spec() *YTSpec

func (*YtDestinationWrapper) Static

func (d *YtDestinationWrapper) Static() bool

func (*YtDestinationWrapper) StaticChunkSize

func (d *YtDestinationWrapper) StaticChunkSize() int

func (*YtDestinationWrapper) SupportSharding

func (d *YtDestinationWrapper) SupportSharding() bool

func (*YtDestinationWrapper) TTL

func (d *YtDestinationWrapper) TTL() int64

func (*YtDestinationWrapper) TabletCount

func (d *YtDestinationWrapper) TabletCount() int

func (*YtDestinationWrapper) TimeShardCount

func (d *YtDestinationWrapper) TimeShardCount() int

func (*YtDestinationWrapper) ToStorageParams

func (d *YtDestinationWrapper) ToStorageParams() *YtStorageParams

func (*YtDestinationWrapper) Token

func (d *YtDestinationWrapper) Token() string

func (*YtDestinationWrapper) TolerateKeyChanges

func (d *YtDestinationWrapper) TolerateKeyChanges() bool

func (*YtDestinationWrapper) Transformer

func (d *YtDestinationWrapper) Transformer() map[string]string

func (*YtDestinationWrapper) UnmarshalJSON

func (d *YtDestinationWrapper) UnmarshalJSON(data []byte) error

func (*YtDestinationWrapper) UseStaticTableOnSnapshot

func (d *YtDestinationWrapper) UseStaticTableOnSnapshot() bool

func (*YtDestinationWrapper) Validate

func (d *YtDestinationWrapper) Validate() error

func (*YtDestinationWrapper) VersionColumn

func (d *YtDestinationWrapper) VersionColumn() string

func (*YtDestinationWrapper) WithDefaults

func (d *YtDestinationWrapper) WithDefaults()

func (*YtDestinationWrapper) WriteTimeoutSec

func (d *YtDestinationWrapper) WriteTimeoutSec() uint32

type YtSource

// YtSource holds the settings of a source endpoint (it has an IsSource marker
// method). It reuses ConnectionData for its Connection field.
type YtSource struct {
	Cluster          string
	Proxy            string
	Paths            []string
	YtToken          string
	// NOTE(review): presumably the name of the row-index column, related to RowIdxEnabled — confirm.
	RowIdxColumnName string

	// NOTE(review): the name suggests a target part size in bytes — confirm.
	DesiredPartSizeBytes int64
	Connection           ConnectionData
}

func (*YtSource) ConnParams

func (s *YtSource) ConnParams() ytclient.ConnParams

func (*YtSource) GetProviderType

func (s *YtSource) GetProviderType() abstract.ProviderType

func (*YtSource) IsAbstract2

func (s *YtSource) IsAbstract2(model.Destination) bool

func (*YtSource) IsAsyncShardPartsSource

func (s *YtSource) IsAsyncShardPartsSource()

func (*YtSource) IsSource

func (s *YtSource) IsSource()

func (*YtSource) IsStrictSource

func (s *YtSource) IsStrictSource()

func (*YtSource) RowIdxEnabled

func (s *YtSource) RowIdxEnabled() bool

func (*YtSource) Validate

func (s *YtSource) Validate() error

func (*YtSource) WithDefaults

func (s *YtSource) WithDefaults()

type YtStorageParams

// YtStorageParams is the result type of the ToStorageParams methods on
// YtDestination, YtDestinationWrapper and the YtDestinationModel interface.
type YtStorageParams struct {
	Token                 string
	Cluster               string
	Path                  string
	Spec                  map[string]interface{}
	DisableProxyDiscovery bool
}

Directories

Path Synopsis
copy
Used only in sorted_table
Used only in sorted_table
v2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL