yt

package
v0.0.0-rc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Table-name constants and provider type identifiers exported by this package.
const (
	// TableProgressRelativePath is an alias of abstract.TableLSN.
	TableProgressRelativePath = abstract.TableLSN // "__data_transfer_lsn"
	// TableWAL is the literal "__wal".
	// NOTE(review): presumably the name of the write-ahead-log table — confirm against usage.
	TableWAL                  = "__wal"

	// Provider type identifiers, each built from a string literal via abstract.ProviderType.
	ProviderType = abstract.ProviderType("yt")
	StagingType  = abstract.ProviderType("lfstaging")
	CopyType     = abstract.ProviderType("ytcopy")
)

Variables

View Source
var (
	// ExePath is a package-level YT path.
	// NOTE(review): presumably the location of the executable uploaded by InitExe — confirm.
	ExePath ypath.Path
)

Functions

func DataplaneExecutablePath

func DataplaneExecutablePath(cluster, revision string) ypath.Path

func DataplaneVersion

func DataplaneVersion() (string, bool)

func FallbackBytesAsStringGoType

func FallbackBytesAsStringGoType(ci *abstract.ChangeItem, cache map[string]*abstract.TableSchema) (*abstract.ChangeItem, error)

func InitExe

func InitExe()

InitExe uploads the executable and initializes the related variables.

func Merge

func Merge(
	ctx context.Context,
	cypressClient yt.CypressClient,
	mrClient mapreduce.Client,
	logger log.Logger,
	mergeNodes map[ypath.Path][]*NodeInfo,
	pathToBinary ypath.Path,
	tableWriterSpec interface{},
) error

func MountAndWaitRecursive

func MountAndWaitRecursive(ctx context.Context, logger log.Logger, client yt.Client, path ypath.Path, params *HandleParams) error

func MoveAndMount

func MoveAndMount(
	ctx context.Context,
	logger log.Logger,
	client yt.Client,
	srcDstMap map[ypath.Path]ypath.Path,
	buildAttrs func(schema schema.Schema) map[string]interface{},
) error

func NewTmpCleaner

func NewTmpCleaner(dst YtDestinationModel, logger log.Logger) (providers.Cleaner, error)

func ParseYtSpec

func ParseYtSpec(jsonStr string, spec *map[string]interface{}) error

func SafeChild

func SafeChild(path ypath.Path, children ...string) ypath.Path

SafeChild appends children to path. It works like path.Child(child), with some exceptions. This method assumes:

  1. ypath object is correct, i.e. no trailing path delimiter symbol exists

This method guarantees:

  1. The YPath with appended children has deduplicated path delimiters in the appended string and no trailing path delimiter.
  2. TODO(@kry127) TM-6290 not yet guaranteed, but nice to have: special symbols should be replaced

func TXOptions

func TXOptions(txID yt.TxID) *yt.TransactionOptions

func UnmountAndWaitRecursive

func UnmountAndWaitRecursive(ctx context.Context, logger log.Logger, client yt.Client, path ypath.Path, params *HandleParams) error

func WaitMountingPreloadState

func WaitMountingPreloadState(yc yt.Client, path ypath.Path) error

func WithTx

func WithTx(ctx context.Context, client yt.Client, f func(ctx context.Context, tx yt.Tx) error) error

func YTColumnToColSchema

func YTColumnToColSchema(columns []schema.Column) *abstract.TableSchema

Types

type ConnectionData

// ConnectionData groups connection-related settings. It is embedded as the
// Connection field of YtDestination and YtSource and returned by
// YtDestinationModel.GetConnectionData.
type ConnectionData struct {
	// Hosts is a list of host strings.
	// NOTE(review): presumably explicit YT proxy hosts — confirm.
	Hosts                 []string
	Subnet                string
	SecurityGroups        []string
	// DisableProxyDiscovery is a boolean switch; its effect is defined by the code consuming it.
	DisableProxyDiscovery bool
}

type HandleParams

// HandleParams carries options accepted by MountAndWaitRecursive and
// UnmountAndWaitRecursive. Instances can be built with NewHandleParams.
type HandleParams struct {
	// ConcurrencyLimit is the value passed to NewHandleParams.
	// NOTE(review): presumably bounds the number of concurrent mount/unmount operations — confirm.
	ConcurrencyLimit int64
}

func NewHandleParams

func NewHandleParams(concurrencyLimit int64) *HandleParams

type LfStagingDestination

// LfStagingDestination is a destination configuration model. It exposes the
// CleanupMode, GetProviderType, IsDestination, Transformer, Validate and
// WithDefaults methods.
// NOTE(review): presumably reported under StagingType ("lfstaging") — confirm in GetProviderType.
type LfStagingDestination struct {
	Cluster           string
	Topic             string
	YtAccount         string
	LogfellerHomePath string
	TmpBasePath       string

	AggregationPeriod time.Duration

	// Thresholds per temporary table.
	// NOTE(review): presumably a tmp table is rotated once either limit is reached — confirm.
	SecondsPerTmpTable int64
	BytesPerTmpTable   int64

	YtToken string

	UsePersistentIntermediateTables bool
	UseNewMetadataFlow              bool
	MergeYtPool                     string
}

func (*LfStagingDestination) CleanupMode

func (d *LfStagingDestination) CleanupMode() model.CleanupType

func (*LfStagingDestination) GetProviderType

func (d *LfStagingDestination) GetProviderType() abstract.ProviderType

func (LfStagingDestination) IsDestination

func (LfStagingDestination) IsDestination()

func (*LfStagingDestination) Transformer

func (d *LfStagingDestination) Transformer() map[string]string

func (*LfStagingDestination) Validate

func (d *LfStagingDestination) Validate() error

func (*LfStagingDestination) WithDefaults

func (d *LfStagingDestination) WithDefaults()

type NodeAttrs

// NodeAttrs is the set of node attributes mapped from YSON via the struct
// tags below: type, dynamic, tablet_state, schema, optimize_for and atomicity.
// It is referenced from NodeInfo.Attrs.
type NodeAttrs struct {
	Type        yt.NodeType   `yson:"type"`
	Dynamic     bool          `yson:"dynamic"`
	TabletState string        `yson:"tablet_state"`
	Schema      schema.Schema `yson:"schema"`
	OptimizeFor string        `yson:"optimize_for"`
	Atomicity   string        `yson:"atomicity"`
}

type NodeInfo

// NodeInfo describes a single node: its name, its path and a pointer to its
// attributes. Values are produced by GetNodeInfo, ListNodesWithAttrs and
// NewNodeInfo, and consumed by Merge (as map values).
type NodeInfo struct {
	Name  string
	Path  ypath.Path
	// Attrs is a pointer.
	// NOTE(review): whether it may be nil is not visible here — check before dereferencing.
	Attrs *NodeAttrs
}

func GetNodeInfo

func GetNodeInfo(ctx context.Context, client yt.CypressClient, path ypath.Path) (*NodeInfo, error)

func ListNodesWithAttrs

func ListNodesWithAttrs(ctx context.Context, client yt.CypressClient, path ypath.Path, prefix string, recursive bool) ([]*NodeInfo, error)

ListNodesWithAttrs returns a name-sorted list of nodes with attributes, based on the specified arguments.

func NewNodeInfo

func NewNodeInfo(name string, path ypath.Path, attrs *NodeAttrs) *NodeInfo

type YTSpec

// YTSpec is an opaque wrapper with no exported fields. It is constructed from
// a map[string]interface{} by NewYTSpec, exposes that kind of map through
// GetConfig, and has JSON and binary (un)marshalling methods.
type YTSpec struct {
	// contains filtered or unexported fields
}

func NewYTSpec

func NewYTSpec(config map[string]interface{}) *YTSpec

func (*YTSpec) GetConfig

func (s *YTSpec) GetConfig() map[string]interface{}

func (*YTSpec) IsEmpty

func (s *YTSpec) IsEmpty() bool

func (YTSpec) MarshalBinary

func (s YTSpec) MarshalBinary() (data []byte, err error)

func (YTSpec) MarshalJSON

func (s YTSpec) MarshalJSON() ([]byte, error)

func (*YTSpec) UnmarshalBinary

func (s *YTSpec) UnmarshalBinary(data []byte) error

func (*YTSpec) UnmarshalJSON

func (s *YTSpec) UnmarshalJSON(data []byte) error

type YtCopyDestination

// YtCopyDestination is a destination configuration model with Validate,
// WithDefaults, Proxy, Token and related accessor methods.
// NOTE(review): presumably reported under CopyType ("ytcopy") — confirm in GetProviderType.
type YtCopyDestination struct {
	Cluster            string
	YtToken            string
	Prefix             string
	Parallelism        uint64
	Pool               string
	UsePushTransaction bool
	// ResourceLimits is a pointer.
	// NOTE(review): presumably nil means "no explicit limits" — confirm.
	ResourceLimits     *spec.ResourceLimits
}

func (*YtCopyDestination) CleanupMode

func (y *YtCopyDestination) CleanupMode() model.CleanupType

func (*YtCopyDestination) CompressionCodec

func (y *YtCopyDestination) CompressionCodec() yt.ClientCompressionCodec

func (*YtCopyDestination) DisableProxyDiscovery

func (y *YtCopyDestination) DisableProxyDiscovery() bool

func (*YtCopyDestination) GetProviderType

func (y *YtCopyDestination) GetProviderType() abstract.ProviderType

func (*YtCopyDestination) IsDestination

func (y *YtCopyDestination) IsDestination()

func (*YtCopyDestination) Proxy

func (y *YtCopyDestination) Proxy() string

func (*YtCopyDestination) SupportMultiThreads

func (y *YtCopyDestination) SupportMultiThreads() bool

func (*YtCopyDestination) SupportMultiWorkers

func (y *YtCopyDestination) SupportMultiWorkers() bool

func (*YtCopyDestination) Token

func (y *YtCopyDestination) Token() string

func (*YtCopyDestination) Transformer

func (y *YtCopyDestination) Transformer() map[string]string

func (*YtCopyDestination) Validate

func (y *YtCopyDestination) Validate() error

func (*YtCopyDestination) WithDefaults

func (y *YtCopyDestination) WithDefaults()

type YtDestination

// YtDestination is the plain-data configuration of a YT destination. It is
// passed by value to NewYtDestinationV1 and referenced by
// YtDestinationWrapper.Model; most fields have a same-named accessor in the
// YtDestinationModel interface.
type YtDestination struct {
	Path           string
	Cluster        string
	Token          string
	PushWal        bool
	NeedArchive    bool
	CellBundle     string
	TTL            int64 // it's in milliseconds
	OptimizeFor    string
	CanAlter       bool
	TimeShardCount int
	Index          []string
	HashColumn     string
	PrimaryMedium  string
	Pool           string       // pool for running merge and sort operations for static tables
	Strict         bool         // DEPRECATED, UNUSED IN NEW DATA PLANE - use LoseDataOnError and Atomicity
	Atomicity      yt.Atomicity // Atomicity for the dynamic tables being created in YT. See https://yt.yandex-team.ru/docs/description/dynamic_tables/sorted_dynamic_tables#atomarnost

	// If true, some errors on data insertion to YT will be skipped, and a warning will be written to the log.
	// Among such errors are:
	// * we were unable to find table schema in cache for some reason: https://github.com/doublecloud/transfer/arcadia/transfer_manager/go/pkg/providers/yt/sink/sink.go?rev=11063561#L482-484
	// * a row (or a value inside a row) being inserted into the YT table has exceeded YT limits (16 MB by default).
	LoseDataOnError bool

	DiscardBigValues         bool
	TabletCount              int // DEPRECATED - remove in March
	Rotation                 *dp_model.RotatorConfig
	VersionColumn            string
	AutoFlushPeriod          int
	Ordered                  bool
	TransformerConfig        map[string]string
	UseStaticTableOnSnapshot bool // optional.Optional[bool] breaks compatibility
	AltNames                 map[string]string
	Cleanup                  dp_model.CleanupType
	Spec                     YTSpec
	TolerateKeyChanges       bool
	InitialTabletCount       uint32
	WriteTimeoutSec          uint32
	ChunkSize                uint32 // ChunkSize defines the number of items in a single request to YT for the dynamic sink, and the chunk size in bytes for the static sink
	BufferTriggingSize       uint64
	BufferTriggingInterval   time.Duration
	CompressionCodec         yt.ClientCompressionCodec
	DisableDatetimeHack      bool // This disables the old hack for inverting time.Time columns as int64 timestamp for LF>YT
	Connection               ConnectionData
	CustomAttributes         map[string]string

	Static          bool
	SortedStatic    bool // true, if we need to sort static tables
	StaticChunkSize int  // desired size of static table chunk in bytes
}

func (*YtDestination) GetUseStaticTableOnSnapshot

func (d *YtDestination) GetUseStaticTableOnSnapshot() bool

func (*YtDestination) ToStorageParams

func (d *YtDestination) ToStorageParams() *YtStorageParams

type YtDestinationModel

// YtDestinationModel is the interface through which a YT destination
// configuration is accessed. It embeds dp_model.TmpPolicyProvider and
// ytclient.ConnParams, adds read accessors that mirror the YtDestination
// fields, and a few mutators (SetSnapshotLoad, AllowAlter, SetStaticTable,
// SetIndex, SetOrdered). NewYtDestinationV1 returns a value of this type, and
// NewTmpCleaner accepts one.
type YtDestinationModel interface {
	dp_model.TmpPolicyProvider
	ytclient.ConnParams

	ToStorageParams() *YtStorageParams

	Path() string
	Cluster() string
	Token() string
	PushWal() bool
	NeedArchive() bool
	CellBundle() string
	TTL() int64
	OptimizeFor() string
	CanAlter() bool
	TimeShardCount() int
	Index() []string
	HashColumn() string
	PrimaryMedium() string
	Pool() string
	Atomicity() yt.Atomicity
	LoseDataOnError() bool
	DiscardBigValues() bool
	TabletCount() int
	Rotation() *dp_model.RotatorConfig
	VersionColumn() string
	AutoFlushPeriod() int
	Ordered() bool
	UseStaticTableOnSnapshot() bool
	AltNames() map[string]string
	Spec() *YTSpec
	TolerateKeyChanges() bool
	InitialTabletCount() uint32
	WriteTimeoutSec() uint32
	ChunkSize() uint32
	BufferTriggingSize() uint64
	BufferTriggingInterval() time.Duration
	Transformer() map[string]string
	CleanupMode() dp_model.CleanupType
	WithDefaults()
	IsDestination()
	GetProviderType() abstract.ProviderType
	GetTableAltName(table string) string
	Validate() error
	SetSnapshotLoad()
	LegacyModel() interface{}
	AllowAlter()
	SetStaticTable()
	SetIndex(index []string)
	SetOrdered()
	CompressionCodec() yt.ClientCompressionCodec

	Static() bool
	SortedStatic() bool
	StaticChunkSize() int

	DisableDatetimeHack() bool // TODO(@kry127) decide when this hack can be removed

	GetConnectionData() ConnectionData
	DisableProxyDiscovery() bool

	BuffererConfig() bufferer.BuffererConfig

	SupportSharding() bool

	// CustomAttributes returns the attributes as map[string]any, whereas the
	// YtDestination.CustomAttributes field is declared as map[string]string.
	CustomAttributes() map[string]any
	// MergeAttributes should be used to merge user-defined custom table attributes
	// with an arbitrary attribute set (usually table settings like medium, ttl, ...),
	// giving priority to the latter.
	// It guarantees that both the argument and the custom attributes map in the model stay unchanged.
	MergeAttributes(tableSettings map[string]any) map[string]any
}

func NewYtDestinationV1

func NewYtDestinationV1(model YtDestination) YtDestinationModel

type YtDestinationWrapper

// YtDestinationWrapper holds a pointer to a YtDestination in Model, plus
// unexported state. Its method set includes the accessors and mutators named
// in YtDestinationModel.
// NOTE(review): presumably the concrete type returned by NewYtDestinationV1 — confirm.
type YtDestinationWrapper struct {
	Model *YtDestination
	// contains filtered or unexported fields
}

func (*YtDestinationWrapper) AllowAlter

func (d *YtDestinationWrapper) AllowAlter()

func (*YtDestinationWrapper) AltNames

func (d *YtDestinationWrapper) AltNames() map[string]string

func (*YtDestinationWrapper) Atomicity

func (d *YtDestinationWrapper) Atomicity() yt.Atomicity

func (*YtDestinationWrapper) AutoFlushPeriod

func (d *YtDestinationWrapper) AutoFlushPeriod() int

func (*YtDestinationWrapper) BufferTriggingInterval

func (d *YtDestinationWrapper) BufferTriggingInterval() time.Duration

func (*YtDestinationWrapper) BufferTriggingSize

func (d *YtDestinationWrapper) BufferTriggingSize() uint64

func (*YtDestinationWrapper) BuffererConfig

func (d *YtDestinationWrapper) BuffererConfig() bufferer.BuffererConfig

func (*YtDestinationWrapper) CanAlter

func (d *YtDestinationWrapper) CanAlter() bool

func (*YtDestinationWrapper) CellBundle

func (d *YtDestinationWrapper) CellBundle() string

func (*YtDestinationWrapper) ChunkSize

func (d *YtDestinationWrapper) ChunkSize() uint32

func (*YtDestinationWrapper) CleanupMode

func (d *YtDestinationWrapper) CleanupMode() dp_model.CleanupType

func (*YtDestinationWrapper) Cluster

func (d *YtDestinationWrapper) Cluster() string

func (*YtDestinationWrapper) CompressionCodec

func (d *YtDestinationWrapper) CompressionCodec() yt.ClientCompressionCodec

func (*YtDestinationWrapper) CustomAttributes

func (d *YtDestinationWrapper) CustomAttributes() map[string]any

func (*YtDestinationWrapper) DisableDatetimeHack

func (d *YtDestinationWrapper) DisableDatetimeHack() bool

TODO: Remove in March

func (*YtDestinationWrapper) DisableProxyDiscovery

func (d *YtDestinationWrapper) DisableProxyDiscovery() bool

func (*YtDestinationWrapper) DiscardBigValues

func (d *YtDestinationWrapper) DiscardBigValues() bool

func (*YtDestinationWrapper) EnsureCustomTmpPolicySupported

func (d *YtDestinationWrapper) EnsureCustomTmpPolicySupported() error

func (*YtDestinationWrapper) EnsureTmpPolicySupported

func (d *YtDestinationWrapper) EnsureTmpPolicySupported() error

func (*YtDestinationWrapper) GetConnectionData

func (d *YtDestinationWrapper) GetConnectionData() ConnectionData

func (*YtDestinationWrapper) GetProviderType

func (d *YtDestinationWrapper) GetProviderType() abstract.ProviderType

func (*YtDestinationWrapper) GetTableAltName

func (d *YtDestinationWrapper) GetTableAltName(table string) string

func (*YtDestinationWrapper) HashColumn

func (d *YtDestinationWrapper) HashColumn() string

func (*YtDestinationWrapper) Index

func (d *YtDestinationWrapper) Index() []string

func (*YtDestinationWrapper) InitialTabletCount

func (d *YtDestinationWrapper) InitialTabletCount() uint32

func (YtDestinationWrapper) IsDestination

func (YtDestinationWrapper) IsDestination()

func (*YtDestinationWrapper) LegacyModel

func (d *YtDestinationWrapper) LegacyModel() interface{}

This is an ugly hack; it is here on purpose — for backward compatibility only — and must not be reused for anything other than backward compatibility.

func (*YtDestinationWrapper) LoseDataOnError

func (d *YtDestinationWrapper) LoseDataOnError() bool

func (*YtDestinationWrapper) MarshalJSON

func (d *YtDestinationWrapper) MarshalJSON() ([]byte, error)

func (*YtDestinationWrapper) MergeAttributes

func (d *YtDestinationWrapper) MergeAttributes(tableSettings map[string]any) map[string]any

func (*YtDestinationWrapper) NeedArchive

func (d *YtDestinationWrapper) NeedArchive() bool

func (*YtDestinationWrapper) OptimizeFor

func (d *YtDestinationWrapper) OptimizeFor() string

func (*YtDestinationWrapper) Ordered

func (d *YtDestinationWrapper) Ordered() bool

func (*YtDestinationWrapper) Params

func (d *YtDestinationWrapper) Params() string

func (*YtDestinationWrapper) Path

func (d *YtDestinationWrapper) Path() string

func (*YtDestinationWrapper) Pool

func (d *YtDestinationWrapper) Pool() string

func (*YtDestinationWrapper) PostSnapshotHacks

func (d *YtDestinationWrapper) PostSnapshotHacks()

func (*YtDestinationWrapper) PreSnapshotHacks

func (d *YtDestinationWrapper) PreSnapshotHacks()

func (*YtDestinationWrapper) PrimaryMedium

func (d *YtDestinationWrapper) PrimaryMedium() string

func (*YtDestinationWrapper) Proxy

func (d *YtDestinationWrapper) Proxy() string

func (*YtDestinationWrapper) PushWal

func (d *YtDestinationWrapper) PushWal() bool

func (*YtDestinationWrapper) Rotation

func (d *YtDestinationWrapper) Rotation() *dp_model.RotatorConfig

func (*YtDestinationWrapper) SetIndex

func (d *YtDestinationWrapper) SetIndex(index []string)

func (*YtDestinationWrapper) SetOrdered

func (d *YtDestinationWrapper) SetOrdered()

func (*YtDestinationWrapper) SetParams

func (d *YtDestinationWrapper) SetParams(jsonStr string) error

func (*YtDestinationWrapper) SetSnapshotLoad

func (d *YtDestinationWrapper) SetSnapshotLoad()

func (*YtDestinationWrapper) SetStaticTable

func (d *YtDestinationWrapper) SetStaticTable()

func (*YtDestinationWrapper) SortedStatic

func (d *YtDestinationWrapper) SortedStatic() bool

func (*YtDestinationWrapper) Spec

func (d *YtDestinationWrapper) Spec() *YTSpec

func (*YtDestinationWrapper) Static

func (d *YtDestinationWrapper) Static() bool

func (*YtDestinationWrapper) StaticChunkSize

func (d *YtDestinationWrapper) StaticChunkSize() int

func (*YtDestinationWrapper) SupportSharding

func (d *YtDestinationWrapper) SupportSharding() bool

func (*YtDestinationWrapper) TTL

func (d *YtDestinationWrapper) TTL() int64

func (*YtDestinationWrapper) TabletCount

func (d *YtDestinationWrapper) TabletCount() int

func (*YtDestinationWrapper) TimeShardCount

func (d *YtDestinationWrapper) TimeShardCount() int

func (*YtDestinationWrapper) ToStorageParams

func (d *YtDestinationWrapper) ToStorageParams() *YtStorageParams

func (*YtDestinationWrapper) Token

func (d *YtDestinationWrapper) Token() string

func (*YtDestinationWrapper) TolerateKeyChanges

func (d *YtDestinationWrapper) TolerateKeyChanges() bool

func (*YtDestinationWrapper) Transformer

func (d *YtDestinationWrapper) Transformer() map[string]string

func (*YtDestinationWrapper) UnmarshalJSON

func (d *YtDestinationWrapper) UnmarshalJSON(data []byte) error

func (*YtDestinationWrapper) UseStaticTableOnSnapshot

func (d *YtDestinationWrapper) UseStaticTableOnSnapshot() bool

func (*YtDestinationWrapper) Validate

func (d *YtDestinationWrapper) Validate() error

func (*YtDestinationWrapper) VersionColumn

func (d *YtDestinationWrapper) VersionColumn() string

func (*YtDestinationWrapper) WithDefaults

func (d *YtDestinationWrapper) WithDefaults()

func (*YtDestinationWrapper) WriteTimeoutSec

func (d *YtDestinationWrapper) WriteTimeoutSec() uint32

type YtSource

// YtSource is the configuration of a YT source. It exposes ConnParams,
// GetProviderType, Validate, WithDefaults, RowIdxEnabled and several marker
// methods (IsSource, IsStrictSource, IsAsyncShardPartsSource).
type YtSource struct {
	Cluster          string
	Proxy            string
	Paths            []string
	YtToken          string
	// NOTE(review): presumably RowIdxEnabled reports whether this name is set — confirm.
	RowIdxColumnName string

	DesiredPartSizeBytes int64
	Connection           ConnectionData
}

func (*YtSource) ConnParams

func (s *YtSource) ConnParams() ytclient.ConnParams

func (*YtSource) GetProviderType

func (s *YtSource) GetProviderType() abstract.ProviderType

func (*YtSource) IsAbstract2

func (s *YtSource) IsAbstract2(model.Destination) bool

func (*YtSource) IsAsyncShardPartsSource

func (s *YtSource) IsAsyncShardPartsSource()

func (*YtSource) IsSource

func (s *YtSource) IsSource()

func (*YtSource) IsStrictSource

func (s *YtSource) IsStrictSource()

func (*YtSource) RowIdxEnabled

func (s *YtSource) RowIdxEnabled() bool

func (*YtSource) Validate

func (s *YtSource) Validate() error

func (*YtSource) WithDefaults

func (s *YtSource) WithDefaults()

type YtStorageParams

// YtStorageParams is the parameter set returned by the ToStorageParams methods
// of YtDestination, YtDestinationWrapper and YtDestinationModel.
type YtStorageParams struct {
	Token                 string
	Cluster               string
	Path                  string
	// Spec is a free-form map.
	// NOTE(review): presumably the same kind of map YTSpec.GetConfig returns — confirm.
	Spec                  map[string]interface{}
	DisableProxyDiscovery bool
}

Directories

Path Synopsis
copy
Used only in sorted_table
Used only in sorted_table
v2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL