Documentation ¶
Overview ¶
Used only in sorted_table
Description of ordered_table.go: https://st.yandex-team.ru/TM-887#5fbcddfd5372c4026b073bab
Used only in sorted_table
Index ¶
- Constants
- Variables
- func CleanupSingleStaticTable(ctx context.Context, client yt.Client, logger log.Logger, dirPath ypath.Path, ...) error
- func IsIncompatibleSchemaErr(err error) bool
- func Merge(ctx context.Context, client yt.Client, logger log.Logger, dirPath ypath.Path, ...) error
- func NewRotatedStaticSink(cfg yt2.YtDestinationModel, registry metrics.Registry, logger log.Logger, ...) (abstract.Sinker, error)
- func NewSinker(cfg yt2.YtDestinationModel, transferID string, jobIndex int, logger log.Logger, ...) (abstract.Sinker, error)
- func Restore(colSchema abstract.ColSchema, val interface{}) (interface{}, error)
- type BanRow
- type GenericTable
- func NewGenericTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, ...) (GenericTable, error)
- func NewOrderedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, ...) (GenericTable, error)
- func NewSortedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, ...) (GenericTable, error)
- func NewVersionedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, ...) (GenericTable, error)
- type IncompatibleSchemaErr
- type InsertChangeItem
- type LbOffsetToRowIndexKey
- type LbOffsetToRowIndexVal
- type OrderedTable
- type PartitionToTabletIndexKey
- type PartitionToTabletIndexVal
- type Schema
- func (s *Schema) Attrs() map[string]interface{}
- func (s *Schema) BuildSchema(schemas []abstract.ColSchema) (*schema.Schema, error)
- func (s *Schema) Cols() []abstract.ColSchema
- func (s *Schema) DataKeys() []abstract.ColSchema
- func (s *Schema) IndexTables() map[ypath.Path]migrate.Table
- func (s *Schema) PivotKeys() (pivots []interface{})
- func (s *Schema) PrimaryKeys() []abstract.ColSchema
- func (s *Schema) ShardCol() (abstract.ColSchema, string)
- func (s *Schema) Table() (migrate.Table, error)
- type SingleStaticTable
- type SortedTable
- type StaticTable
- type StaticTableSnapshotState
- type TableProgress
- type TableStatus
- type VersionedTable
- type YtRotationNode
Constants ¶
const ( Snapshot = TableStatus("snapshot") SyncWait = TableStatus("sync_wait") Synced = TableStatus("synced") )
Variables ¶
var MaxRetriesCount uint64 = 10 // For tests only
var NoKeyColumnsFound = xerrors.New("No key columns found")
var (
TableProgressSchema = schema.MustInfer(new(TableProgress))
)
var WalTableSchema = []abstract.ColSchema{ {ColumnName: "id", DataType: "int64", PrimaryKey: true}, {ColumnName: "nextlsn", DataType: "int64", PrimaryKey: true}, {ColumnName: "txPosition", DataType: "int64", PrimaryKey: true}, {ColumnName: "commitTime", DataType: "int64"}, {ColumnName: "tx_id", DataType: "string"}, {ColumnName: "kind", DataType: "string"}, {ColumnName: "schema", DataType: "string"}, {ColumnName: "table", DataType: "string"}, {ColumnName: "columnnames", DataType: "any"}, {ColumnName: "columnvalues", DataType: "any"}, {ColumnName: "table_schema", DataType: "any"}, {ColumnName: "oldkeys", DataType: "any"}, }
Functions ¶
func IsIncompatibleSchemaErr ¶
func Merge ¶
func Merge( ctx context.Context, client yt.Client, logger log.Logger, dirPath ypath.Path, cleanupPath ypath.Path, prefix string, infix string, tmpSuffix string, pathToBinary ypath.Path, tableWriterSpec interface{}, buildAttrs func(schema schema.Schema) map[string]interface{}, tableRotationEnabled bool, ) error
func NewRotatedStaticSink ¶
func NewRotatedStaticSink(cfg yt2.YtDestinationModel, registry metrics.Registry, logger log.Logger, cp coordinator.Coordinator, transferID string) (abstract.Sinker, error)
func NewSinker ¶
func NewSinker( cfg yt2.YtDestinationModel, transferID string, jobIndex int, logger log.Logger, registry metrics.Registry, cp coordinator.Coordinator, tmpPolicyConfig *server.TmpPolicyConfig, ) (abstract.Sinker, error)
Types ¶
type GenericTable ¶
type GenericTable interface {
Write(input []abstract.ChangeItem) error
}
func NewGenericTable ¶
func NewGenericTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)
func NewOrderedTable ¶
func NewOrderedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)
func NewSortedTable ¶
func NewSortedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)
func NewVersionedTable ¶
func NewVersionedTable(ytClient yt.Client, path ypath.Path, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, metrics *stats.SinkerStats, logger log.Logger) (GenericTable, error)
type IncompatibleSchemaErr ¶
type IncompatibleSchemaErr struct {
// contains filtered or unexported fields
}
func NewIncompatibleSchemaErr ¶
func NewIncompatibleSchemaErr(err error) *IncompatibleSchemaErr
func (IncompatibleSchemaErr) Is ¶
func (u IncompatibleSchemaErr) Is(err error) bool
func (IncompatibleSchemaErr) Unwrap ¶
func (u IncompatibleSchemaErr) Unwrap() error
type InsertChangeItem ¶
type InsertChangeItem struct { TabletIndex uint32 ChangeItem abstract.ChangeItem }
func (*InsertChangeItem) MarshalYSON ¶
func (i *InsertChangeItem) MarshalYSON(w *yson.Writer) error
type LbOffsetToRowIndexKey ¶
type LbOffsetToRowIndexVal ¶
type OrderedTable ¶
type OrderedTable struct {
// contains filtered or unexported fields
}
func (*OrderedTable) Init ¶
func (t *OrderedTable) Init() error
func (*OrderedTable) Write ¶
func (t *OrderedTable) Write(input []abstract.ChangeItem) error
type PartitionToTabletIndexKey ¶
type PartitionToTabletIndexKey struct {
Partition string `yson:"partition,key"`
}
type Schema ¶
type Schema struct {
// contains filtered or unexported fields
}
func (*Schema) BuildSchema ¶
func (*Schema) PrimaryKeys ¶
type SingleStaticTable ¶
type SingleStaticTable struct {
// contains filtered or unexported fields
}
SingleStaticTable is a specialization of StaticTable but for single table which behaves like GenericTable for dynamic sinker. Assumed that only one goroutine works with SingleStaticTable at moment, otherwise all methods should be protected with primitives in future
func NewSingleStaticTable ¶
func NewSingleStaticTable( ytClient yt.Client, dirPath ypath.Path, tableName string, schema []abstract.ColSchema, cfg yt2.YtDestinationModel, jobIndex int, transferID string, cleanupType server.CleanupType, metrics *stats.SinkerStats, logger log.Logger, pathToBinary ypath.Path, ) (*SingleStaticTable, error)
func (*SingleStaticTable) Abort ¶
func (t *SingleStaticTable) Abort() error
func (*SingleStaticTable) Commit ¶
func (t *SingleStaticTable) Commit(ctx context.Context) error
Commit tries to commit the transaction and performs some sorting and moving operations After commit object is not usable
func (*SingleStaticTable) UpdateSchema ¶
func (t *SingleStaticTable) UpdateSchema(schema []abstract.ColSchema)
func (*SingleStaticTable) Write ¶
func (t *SingleStaticTable) Write(input []abstract.ChangeItem) error
Write writes change items to table Note: after calling this method object is still usable, but if error returned: object is NOT usable
type SortedTable ¶
type SortedTable struct {
// contains filtered or unexported fields
}
func (*SortedTable) Init ¶
func (t *SortedTable) Init() error
func (*SortedTable) Insert ¶
func (t *SortedTable) Insert(insertRows []interface{}) error
func (*SortedTable) Write ¶
func (t *SortedTable) Write(input []abstract.ChangeItem) error
Write accept input which will be collapsed as very first step
type StaticTable ¶
type StaticTable struct {
// contains filtered or unexported fields
}
func NewStaticTable ¶
func NewStaticTableFromConfig ¶
func NewStaticTableFromConfig(ytClient yt.Client, cfg yt2.YtDestinationModel, registry metrics.Registry, lgr log.Logger, cp coordinator.Coordinator, transferID string) *StaticTable
func (*StaticTable) Close ¶
func (t *StaticTable) Close() error
func (*StaticTable) Push ¶
func (t *StaticTable) Push(items []abstract.ChangeItem) error
type StaticTableSnapshotState ¶
type StaticTableSnapshotState string // it's like in-place finite-state machine
const ( StaticTableSnapshotUninitialized StaticTableSnapshotState = "" StaticTableSnapshotInitialized StaticTableSnapshotState = "StaticTableSnapshotInitialized" // after 'init_table_load' StaticTableSnapshotActivated StaticTableSnapshotState = "StaticTableSnapshotActivated" // 'activation' - it's successfully called checkTable StaticTableSnapshotDone StaticTableSnapshotState = "StaticTableSnapshotDone" // set in 'commitSnapshot' (on 'done_load_table'), if state is activated )
type TableProgress ¶
type TableProgress struct { TransferID string `yson:"transfer_id,key"` Table string `yson:"table,key"` LSN uint64 `yson:"lsn"` Status TableStatus `yson:"status"` }
type TableStatus ¶
type TableStatus string
type VersionedTable ¶
type VersionedTable struct {
// contains filtered or unexported fields
}
func (*VersionedTable) Init ¶
func (t *VersionedTable) Init() error
func (*VersionedTable) Write ¶
func (t *VersionedTable) Write(input []abstract.ChangeItem) error