sink

package
v0.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation

Overview

Used only in sorted_table

Description of ordered_table.go: https://st.yandex-team.ru/TM-887#5fbcddfd5372c4026b073bab

Used only in sorted_table

Index

Constants

View Source
const (
	Snapshot = TableStatus("snapshot")
	SyncWait = TableStatus("sync_wait")
	Synced   = TableStatus("synced")
)

Variables

View Source
var MaxRetriesCount uint64 = 10 // For tests only
View Source
var NoKeyColumnsFound = xerrors.New("No key columns found")
View Source
var (
	TableProgressSchema = schema.MustInfer(new(TableProgress))
)
View Source
var WalTableSchema = []abstract.ColSchema{
	{ColumnName: "id", DataType: "int64", PrimaryKey: true},
	{ColumnName: "nextlsn", DataType: "int64", PrimaryKey: true},
	{ColumnName: "txPosition", DataType: "int64", PrimaryKey: true},
	{ColumnName: "commitTime", DataType: "int64"},
	{ColumnName: "tx_id", DataType: "string"},
	{ColumnName: "kind", DataType: "string"},
	{ColumnName: "schema", DataType: "string"},
	{ColumnName: "table", DataType: "string"},
	{ColumnName: "columnnames", DataType: "any"},
	{ColumnName: "columnvalues", DataType: "any"},
	{ColumnName: "table_schema", DataType: "any"},
	{ColumnName: "oldkeys", DataType: "any"},
}

Functions

func CleanupSingleStaticTable

func CleanupSingleStaticTable(ctx context.Context, client yt.Client, logger log.Logger, dirPath ypath.Path, transferID string) error

func IsIncompatibleSchemaErr

func IsIncompatibleSchemaErr(err error) bool

func Merge

func Merge(
	ctx context.Context,
	client yt.Client,
	logger log.Logger,
	dirPath ypath.Path,
	cleanupPath ypath.Path,
	prefix string,
	infix string,
	tmpSuffix string,
	pathToBinary ypath.Path,
	tableWriterSpec interface{},
	buildAttrs func(schema schema.Schema) map[string]interface{},
	tableRotationEnabled bool,
) error

func NewRotatedStaticSink

func NewRotatedStaticSink(cfg yt2.YtDestinationModel, registry metrics.Registry, logger log.Logger, cp coordinator.Coordinator, transferID string) (abstract.Sinker, error)

func NewSinker

func NewSinker(
	cfg yt2.YtDestinationModel,
	transferID string,
	jobIndex int,
	logger log.Logger,
	registry metrics.Registry,
	cp coordinator.Coordinator,
	tmpPolicyConfig *server.TmpPolicyConfig,
) (abstract.Sinker, error)

func Restore

func Restore(colSchema abstract.ColSchema, val interface{}) (interface{}, error)

Types

type BanRow

type BanRow struct {
	Cluster  string    `yson:"cluster,key"`
	Path     string    `yson:"part,key"`
	Table    string    `yson:"table,key"`
	LastFail time.Time `yson:"last_fail"`
}

type GenericTable

type GenericTable interface {
	Write(input []abstract.ChangeItem) error
}

func NewGenericTable

func NewGenericTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)

func NewOrderedTable

func NewOrderedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)

func NewSortedTable

func NewSortedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)

func NewVersionedTable

func NewVersionedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)

type IncompatibleSchemaErr

type IncompatibleSchemaErr struct {
	// contains filtered or unexported fields
}

func NewIncompatibleSchemaErr

func NewIncompatibleSchemaErr(err error) *IncompatibleSchemaErr

func (IncompatibleSchemaErr) Is

func (u IncompatibleSchemaErr) Is(err error) bool

func (IncompatibleSchemaErr) Unwrap

func (u IncompatibleSchemaErr) Unwrap() error

type InsertChangeItem

type InsertChangeItem struct {
	TabletIndex uint32
	ChangeItem  abstract.ChangeItem
}

func (*InsertChangeItem) MarshalYSON

func (i *InsertChangeItem) MarshalYSON(w *yson.Writer) error

type LbOffsetToRowIndexKey

type LbOffsetToRowIndexKey struct {
	TabletIndex uint32 `yson:"tablet_index,key"`
	LbOffset    uint64 `yson:"lb_offset,key"`
}

type LbOffsetToRowIndexVal

type LbOffsetToRowIndexVal struct {
	MinRowIndex uint64 `yson:"min_row_index"`
	MaxRowIndex uint64 `yson:"max_row_index"`
}

type OrderedTable

type OrderedTable struct {
	// contains filtered or unexported fields
}

func (*OrderedTable) Init

func (t *OrderedTable) Init() error

func (*OrderedTable) Write

func (t *OrderedTable) Write(input []abstract.ChangeItem) error

type PartitionToTabletIndexKey

type PartitionToTabletIndexKey struct {
	Partition string `yson:"partition,key"`
}

type PartitionToTabletIndexVal

type PartitionToTabletIndexVal struct {
	TabletIndex  uint32 `yson:"tablet_index"`
	LastLbOffset uint64 `yson:"last_lb_offset"`
}

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

func NewSchema

func NewSchema(cols []abstract.ColSchema, config yt.YtDestinationModel, path ypath.Path) *Schema

func (*Schema) Attrs

func (s *Schema) Attrs() map[string]interface{}

func (*Schema) BuildSchema

func (s *Schema) BuildSchema(schemas []abstract.ColSchema) (*schema.Schema, error)

func (*Schema) Cols

func (s *Schema) Cols() []abstract.ColSchema

func (*Schema) DataKeys

func (s *Schema) DataKeys() []abstract.ColSchema

func (*Schema) IndexTables

func (s *Schema) IndexTables() map[ypath.Path]migrate.Table

func (*Schema) PivotKeys

func (s *Schema) PivotKeys() (pivots []interface{})

func (*Schema) PrimaryKeys

func (s *Schema) PrimaryKeys() []abstract.ColSchema

func (*Schema) ShardCol

func (s *Schema) ShardCol() (abstract.ColSchema, string)

func (*Schema) Table

func (s *Schema) Table() (migrate.Table, error)

type SingleStaticTable

type SingleStaticTable struct {
	// contains filtered or unexported fields
}

SingleStaticTable is a specialization of StaticTable but for single table which behaves like GenericTable for dynamic sinker. Assumed that only one goroutine works with SingleStaticTable at moment, otherwise all methods should be protected with primitives in future

func NewSingleStaticTable

func NewSingleStaticTable(
	ytClient yt.Client,
	dirPath ypath.Path,
	tableName string,
	schema []abstract.ColSchema,
	cfg yt2.YtDestinationModel,
	jobIndex int,
	transferID string,
	cleanupType server.CleanupType,
	metrics *stats.SinkerStats,
	logger log.Logger,
	pathToBinary ypath.Path,
) (*SingleStaticTable, error)

func (*SingleStaticTable) Abort

func (t *SingleStaticTable) Abort() error

func (*SingleStaticTable) Commit

func (t *SingleStaticTable) Commit(ctx context.Context) error

Commit tries to commit the transaction and performs some sorting and moving operations After commit object is not usable

func (*SingleStaticTable) UpdateSchema

func (t *SingleStaticTable) UpdateSchema(schema []abstract.ColSchema)

func (*SingleStaticTable) Write

func (t *SingleStaticTable) Write(input []abstract.ChangeItem) error

Write writes change items to table Note: after calling this method object is still usable, but if error returned: object is NOT usable

type SortedTable

type SortedTable struct {
	// contains filtered or unexported fields
}

func (*SortedTable) Init

func (t *SortedTable) Init() error

func (*SortedTable) Insert

func (t *SortedTable) Insert(insertRows []interface{}) error

func (*SortedTable) Write

func (t *SortedTable) Write(input []abstract.ChangeItem) error

Write accept input which will be collapsed as very first step

type StaticTable

type StaticTable struct {
	// contains filtered or unexported fields
}

func NewStaticTable

func NewStaticTable(ytClient yt.Client, path ypath.Path, ytSpec map[string]interface{}, registry metrics.Registry) *StaticTable

func NewStaticTableFromConfig

func NewStaticTableFromConfig(ytClient yt.Client, cfg yt2.YtDestinationModel, registry metrics.Registry, lgr log.Logger, cp coordinator.Coordinator, transferID string) *StaticTable

func (*StaticTable) Close

func (t *StaticTable) Close() error

func (*StaticTable) Push

func (t *StaticTable) Push(items []abstract.ChangeItem) error

type StaticTableSnapshotState

type StaticTableSnapshotState string // it's like in-place finite-state machine
const (
	StaticTableSnapshotUninitialized StaticTableSnapshotState = ""
	StaticTableSnapshotInitialized   StaticTableSnapshotState = "StaticTableSnapshotInitialized" // after 'init_table_load'
	StaticTableSnapshotActivated     StaticTableSnapshotState = "StaticTableSnapshotActivated"   // 'activation' - it's successfully called checkTable
	StaticTableSnapshotDone          StaticTableSnapshotState = "StaticTableSnapshotDone"        // set in 'commitSnapshot' (on 'done_load_table'), if state is activated
)

type TableProgress

type TableProgress struct {
	TransferID string      `yson:"transfer_id,key"`
	Table      string      `yson:"table,key"`
	LSN        uint64      `yson:"lsn"`
	Status     TableStatus `yson:"status"`
}

type TableStatus

type TableStatus string

type VersionedTable

type VersionedTable struct {
	// contains filtered or unexported fields
}

func (*VersionedTable) Init

func (t *VersionedTable) Init() error

func (*VersionedTable) Write

func (t *VersionedTable) Write(input []abstract.ChangeItem) error

type YtRotationNode

type YtRotationNode struct {
	Name string `yson:",value"`
	Type string `yson:"type,attr"`
	Path string `yson:"path,attr"`
}

Directories

Path Synopsis
v2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL