ydb

package
v0.0.0-rc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 64 Imported by: 0

Documentation

Index

Constants

View Source
const ProviderType = abstract.ProviderType("ydb")
View Source
const (
	YDBRelativePathTransformerType = "ydb-path-relativizer-transformer"
)

Variables

View Source
var JWTCredentials = func(content string, tokenServiceURL string) (TokenCredentials, error) {
	return nil, xerrors.Errorf("not implemented")
}
View Source
var NewYDBCredsFromYCCreds = func(ycCreds Credentials, tokenService string) TokenCredentials {
	return nil
}
View Source
var SchemaMismatchErr = xerrors.New("table deleted, due schema mismatch")
View Source
var TypeYdbDecimal types.Type = types.DecimalType(22, 9)

Functions

func ConvertTableMapToYDBRelPath

func ConvertTableMapToYDBRelPath(params *YdbStorageParams, tableMap abstract.TableMap) abstract.TableMap

func CreateChangeFeed

func CreateChangeFeed(cfg *YdbSource, transferID string) error

func CreateChangeFeedIfNotExists

func CreateChangeFeedIfNotExists(cfg *YdbSource, transferID string) error

func DropChangeFeed

func DropChangeFeed(cfg *YdbSource, transferID string) error

func Fqtn

func Fqtn(tid abstract.TableID) string

func FromYdbSchema

func FromYdbSchema(original []options.Column, keys []string) abstract.TableColumns

func MakeYDBRelPath

func MakeYDBRelPath(useFullPaths bool, paths []string, tableName string) string

func New

func NewSinker

func NewSinker(lgr log.Logger, cfg *YdbDestination, mtrcs metrics.Registry) (abstract.Sinker, error)

func NewYDBDriver

func NewYDBDriver(ctx context.Context, database, instance string, credentials credentials.Credentials, tlsConfig *tls.Config) (*ydb3.Driver, error)

Types

type AllowedIn

type AllowedIn string
const (
	BOTH AllowedIn = "both"
	OLTP AllowedIn = "oltp"
	OLAP AllowedIn = "olap"
)

type AlterTableTemplate

type AlterTableTemplate struct {
	Path        string
	AddColumns  []ColumnTemplate
	DropColumns []string
}

type ChangeFeedModeType

type ChangeFeedModeType string
const (
	ChangeFeedModeUpdates         ChangeFeedModeType = "UPDATES"
	ChangeFeedModeNewImage        ChangeFeedModeType = "NEW_IMAGE"
	ChangeFeedModeNewAndOldImages ChangeFeedModeType = "NEW_AND_OLD_IMAGES"
)

type ColumnTemplate

type ColumnTemplate struct {
	Name string
	Type string
	// For now is supported only for primary keys in OLAP tables
	NotNull bool
}

type CreateTableTemplate

type CreateTableTemplate struct {
	Path                  string
	Columns               []ColumnTemplate
	Keys                  []string
	ShardCount            int64
	IsTableColumnOriented bool
	DefaultCompression    string
}

type Credentials

type Credentials interface {
	// YandexCloudAPICredentials is a marker method. All compatible Credentials implementations have it
	YandexCloudAPICredentials()
}

Credentials is an abstraction of API authorization credentials. See https://cloud.yandex.ru/docs/iam/concepts/authorization/authorization for details. Note that functions that return Credentials may return different Credentials implementation in next SDK version, and this is not considered breaking change.

type DropTableTemplate

type DropTableTemplate struct {
	Path string
}

type JWTAuthParams

type JWTAuthParams struct {
	KeyContent      string
	TokenServiceURL string
}

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error

func (*Provider) Cleanup

func (p *Provider) Cleanup(ctx context.Context, task *model.TransferOperation) error

func (*Provider) Deactivate

func (p *Provider) Deactivate(ctx context.Context, task *model.TransferOperation) error

func (*Provider) Sink

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type Source

type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(transferID string, cfg *YdbSource, logger log.Logger, _ metrics.Registry) (*Source, error)

func (*Source) Run

func (s *Source) Run(sink abstract.AsyncSink) error

func (*Source) Stop

func (s *Source) Stop()

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(cfg *YdbStorageParams) (*Storage, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(tid abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(tid abstract.TableID) (uint64, error)

func (*Storage) GetIncrementalState

func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)

func (*Storage) LoadRandomSample

func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadSampleBySet

func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, pusher abstract.Pusher) error

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTopBottomSample

func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) SetInitialState

func (s *Storage) SetInitialState(tables []abstract.TableDescription, incremental []abstract.IncrementalTable)

func (*Storage) TableAccessible

func (s *Storage) TableAccessible(table abstract.TableDescription) bool

func (*Storage) TableExists

func (s *Storage) TableExists(tid abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, tableID abstract.TableID) (*abstract.TableSchema, error)

func (*Storage) TableSizeInBytes

func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)

type TemplateCol

type TemplateCol struct{ Name, Typ, Comma string }

type TemplateModel

type TemplateModel struct {
	Cols []TemplateCol
	Path string
}

type TokenCredentials

type TokenCredentials interface {
	Token(context.Context) (string, error)
}

TokenCredentials is an interface that contains options used to authorize a client.

func ResolveCredentials

func ResolveCredentials(
	userDataAuth bool,
	oauthToken string,
	jwt JWTAuthParams,
	serviceAccountID string,
	logger log.Logger,
) (TokenCredentials, error)

type YDBPathRelativizerTransformer

type YDBPathRelativizerTransformer struct {
	Paths []string
}

func NewYDBRelativePathTransformer

func NewYDBRelativePathTransformer(paths []string) *YDBPathRelativizerTransformer

func (*YDBPathRelativizerTransformer) Apply

func (*YDBPathRelativizerTransformer) Description

func (r *YDBPathRelativizerTransformer) Description() string

func (*YDBPathRelativizerTransformer) ResultSchema

func (*YDBPathRelativizerTransformer) Suitable

func (*YDBPathRelativizerTransformer) Type

type YdbColumnsFilter

type YdbColumnsFilter struct {
	TableNamesRegexp  string
	ColumnNamesRegexp string
	Type              YdbColumnsFilterType
}

type YdbColumnsFilterType

type YdbColumnsFilterType string
const (
	YdbColumnsBlackList YdbColumnsFilterType = "blacklist"
	YdbColumnsWhiteList YdbColumnsFilterType = "whitelist"
)

type YdbDestination

type YdbDestination struct {
	Token                   model.SecretString
	Database                string
	Path                    string
	Instance                string
	LegacyWriter            bool
	ShardCount              int64
	Rotation                *model.RotatorConfig
	TransformerConfig       map[string]string
	AltNames                map[string]string
	StoragePolicy           string
	CompactionPolicy        string
	SubNetworkID            string
	SecurityGroupIDs        []string
	Cleanup                 model.CleanupType
	DropUnknownColumns      bool
	Underlay                bool
	ServiceAccountID        string
	IgnoreRowTooLargeErrors bool
	FitDatetime             bool // will crop date-time to allowed time range (with data-loss)
	SAKeyContent            string
	TriggingInterval        time.Duration
	TriggingSize            uint64
	IsTableColumnOriented   bool
	DefaultCompression      string

	Primary bool // if worker is first, i.e. primary, will run background jobs

	TLSEnabled      bool
	RootCAFiles     []string
	TokenServiceURL string
	UserdataAuth    bool // allow fallback to Instance metadata Auth
}

func (*YdbDestination) BuffererConfig

func (d *YdbDestination) BuffererConfig() bufferer.BuffererConfig

func (*YdbDestination) CleanupMode

func (d *YdbDestination) CleanupMode() model.CleanupType

func (*YdbDestination) GetProviderType

func (d *YdbDestination) GetProviderType() abstract.ProviderType

func (YdbDestination) IsDestination

func (YdbDestination) IsDestination()

func (*YdbDestination) MDBClusterID

func (d *YdbDestination) MDBClusterID() string

func (*YdbDestination) ToStorageParams

func (d *YdbDestination) ToStorageParams() *YdbStorageParams

func (*YdbDestination) Transformer

func (d *YdbDestination) Transformer() map[string]string

func (*YdbDestination) Validate

func (d *YdbDestination) Validate() error

func (*YdbDestination) WithDefaults

func (d *YdbDestination) WithDefaults()

type YdbSource

type YdbSource struct {
	Database           string
	Instance           string
	Tables             []string // actually it's 'paths', but migrating...
	TableColumnsFilter []YdbColumnsFilter
	SubNetworkID       string
	SecurityGroupIDs   []string
	Underlay           bool
	UseFullPaths       bool // can be useful to deal with names collision

	TLSEnabled  bool
	RootCAFiles []string

	// replication stuff:
	ChangeFeedMode       ChangeFeedModeType
	ChangeFeedCustomName string          // user can specify pre-created feed's name, otherwise it will created with name == transferID
	BufferSize           model.BytesSize // it's not some real buffer size - see comments to waitLimits() method in kafka-source
	VerboseSDKLogs       bool

	// auth stuff:
	Token            model.SecretString
	UserdataAuth     bool
	ServiceAccountID string
	TokenServiceURL  string
	SAKeyContent     string
}

func (*YdbSource) AllIncludes

func (s *YdbSource) AllIncludes() []string

func (*YdbSource) ExtraTransformers

func (s *YdbSource) ExtraTransformers(_ context.Context, _ *model.Transfer, _ metrics.Registry) ([]abstract.Transformer, error)

func (*YdbSource) FulfilledIncludes

func (s *YdbSource) FulfilledIncludes(tableID abstract.TableID) []string

func (*YdbSource) GetProviderType

func (s *YdbSource) GetProviderType() abstract.ProviderType

func (*YdbSource) Include

func (s *YdbSource) Include(tID abstract.TableID) bool

func (*YdbSource) IsIncremental

func (*YdbSource) IsIncremental()

func (*YdbSource) IsSource

func (s *YdbSource) IsSource()

func (*YdbSource) MDBClusterID

func (s *YdbSource) MDBClusterID() string

func (*YdbSource) SupportsStartCursorValue

func (*YdbSource) SupportsStartCursorValue() bool

func (*YdbSource) ToStorageParams

func (s *YdbSource) ToStorageParams() *YdbStorageParams

func (*YdbSource) Validate

func (s *YdbSource) Validate() error

func (*YdbSource) WithDefaults

func (s *YdbSource) WithDefaults()

type YdbStorageParams

type YdbStorageParams struct {
	Database           string
	Instance           string
	Tables             []string
	TableColumnsFilter []YdbColumnsFilter
	UseFullPaths       bool

	// auth props
	Token            model.SecretString
	ServiceAccountID string
	UserdataAuth     bool
	SAKeyContent     string
	TokenServiceURL  string

	RootCAFiles []string
	TLSEnabled  bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL