Documentation ¶
Index ¶
- Constants
- Variables
- func ConvertTableMapToYDBRelPath(params *YdbStorageParams, tableMap abstract.TableMap) abstract.TableMap
- func CreateChangeFeed(cfg *YdbSource, transferID string) error
- func CreateChangeFeedIfNotExists(cfg *YdbSource, transferID string) error
- func DropChangeFeed(cfg *YdbSource, transferID string) error
- func Fqtn(tid abstract.TableID) string
- func FromYdbSchema(original []options.Column, keys []string) abstract.TableColumns
- func MakeYDBRelPath(useFullPaths bool, paths []string, tableName string) string
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewSinker(lgr log.Logger, cfg *YdbDestination, mtrcs metrics.Registry) (abstract.Sinker, error)
- func NewYDBDriver(ctx context.Context, database, instance string, ...) (*ydb3.Driver, error)
- type AllowedIn
- type AlterTableTemplate
- type ChangeFeedModeType
- type ColumnTemplate
- type CreateTableTemplate
- type Credentials
- type DropTableTemplate
- type JWTAuthParams
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, ...) error
- func (p *Provider) Cleanup(ctx context.Context, task *model.TransferOperation) error
- func (p *Provider) Deactivate(ctx context.Context, task *model.TransferOperation) error
- func (p *Provider) Sink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) Storage() (abstract.Storage, error)
- func (p *Provider) Type() abstract.ProviderType
- type Source
- type Storage
- func (s *Storage) Close()
- func (s *Storage) EstimateTableRowsCount(tid abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableRowsCount(tid abstract.TableID) (uint64, error)
- func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
- func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, ...) error
- func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescription, ...) error
- func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) Ping() error
- func (s *Storage) SetInitialState(tables []abstract.TableDescription, incremental []abstract.IncrementalTable)
- func (s *Storage) TableAccessible(table abstract.TableDescription) bool
- func (s *Storage) TableExists(tid abstract.TableID) (bool, error)
- func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(ctx context.Context, tableID abstract.TableID) (*abstract.TableSchema, error)
- func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)
- type TemplateCol
- type TemplateModel
- type TokenCredentials
- type YDBPathRelativizerTransformer
- func (r *YDBPathRelativizerTransformer) Apply(input []abstract.ChangeItem) abstract.TransformerResult
- func (r *YDBPathRelativizerTransformer) Description() string
- func (r *YDBPathRelativizerTransformer) ResultSchema(original *abstract.TableSchema) (*abstract.TableSchema, error)
- func (r *YDBPathRelativizerTransformer) Suitable(table abstract.TableID, schema *abstract.TableSchema) bool
- func (r *YDBPathRelativizerTransformer) Type() abstract.TransformerType
- type YdbColumnsFilter
- type YdbColumnsFilterType
- type YdbDestination
- func (d *YdbDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *YdbDestination) CleanupMode() model.CleanupType
- func (d *YdbDestination) GetProviderType() abstract.ProviderType
- func (YdbDestination) IsDestination()
- func (d *YdbDestination) MDBClusterID() string
- func (d *YdbDestination) ToStorageParams() *YdbStorageParams
- func (d *YdbDestination) Transformer() map[string]string
- func (d *YdbDestination) Validate() error
- func (d *YdbDestination) WithDefaults()
- type YdbSource
- func (s *YdbSource) AllIncludes() []string
- func (s *YdbSource) ExtraTransformers(_ context.Context, _ *model.Transfer, _ metrics.Registry) ([]abstract.Transformer, error)
- func (s *YdbSource) FulfilledIncludes(tableID abstract.TableID) []string
- func (s *YdbSource) GetProviderType() abstract.ProviderType
- func (s *YdbSource) Include(tID abstract.TableID) bool
- func (*YdbSource) IsIncremental()
- func (s *YdbSource) IsSource()
- func (s *YdbSource) MDBClusterID() string
- func (*YdbSource) SupportsStartCursorValue() bool
- func (s *YdbSource) ToStorageParams() *YdbStorageParams
- func (s *YdbSource) Validate() error
- func (s *YdbSource) WithDefaults()
- type YdbStorageParams
Constants ¶
View Source
const ProviderType = abstract.ProviderType("ydb")
View Source
const (
YDBRelativePathTransformerType = "ydb-path-relativizer-transformer"
)
Variables ¶
View Source
var JWTCredentials = func(content string, tokenServiceURL string) (TokenCredentials, error) { return nil, xerrors.Errorf("not implemented") }
View Source
var NewYDBCredsFromYCCreds = func(ycCreds Credentials, tokenService string) TokenCredentials { return nil }
View Source
var SchemaMismatchErr = xerrors.New("table deleted, due schema mismatch")
View Source
var TypeYdbDecimal types.Type = types.DecimalType(22, 9)
Functions ¶
func ConvertTableMapToYDBRelPath ¶
func ConvertTableMapToYDBRelPath(params *YdbStorageParams, tableMap abstract.TableMap) abstract.TableMap
func CreateChangeFeed ¶
func DropChangeFeed ¶
func FromYdbSchema ¶
func FromYdbSchema(original []options.Column, keys []string) abstract.TableColumns
func MakeYDBRelPath ¶
func New ¶
func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *model.Transfer) providers.Provider
func NewYDBDriver ¶
func NewYDBDriver(ctx context.Context, database, instance string, credentials credentials.Credentials, tlsConfig *tls.Config) (*ydb3.Driver, error)
Types ¶
type AlterTableTemplate ¶
type AlterTableTemplate struct { Path string AddColumns []ColumnTemplate DropColumns []string }
type ChangeFeedModeType ¶
type ChangeFeedModeType string
const ( ChangeFeedModeUpdates ChangeFeedModeType = "UPDATES" ChangeFeedModeNewImage ChangeFeedModeType = "NEW_IMAGE" ChangeFeedModeNewAndOldImages ChangeFeedModeType = "NEW_AND_OLD_IMAGES" )
type ColumnTemplate ¶
type CreateTableTemplate ¶
type Credentials ¶
type Credentials interface {
// YandexCloudAPICredentials is a marker method. All compatible Credentials implementations have it
YandexCloudAPICredentials()
}
Credentials is an abstraction of API authorization credentials. See https://cloud.yandex.ru/docs/iam/concepts/authorization/authorization for details. Note that functions that return Credentials may return a different Credentials implementation in the next SDK version, and this is not considered a breaking change.
type DropTableTemplate ¶
type DropTableTemplate struct {
Path string
}
type JWTAuthParams ¶
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) Deactivate ¶
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func NewStorage ¶
func NewStorage(cfg *YdbStorageParams) (*Storage, error)
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) GetIncrementalState ¶
func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
func (*Storage) LoadRandomSample ¶
func (*Storage) LoadSampleBySet ¶
func (*Storage) LoadTopBottomSample ¶
func (*Storage) SetInitialState ¶
func (s *Storage) SetInitialState(tables []abstract.TableDescription, incremental []abstract.IncrementalTable)
func (*Storage) TableAccessible ¶
func (s *Storage) TableAccessible(table abstract.TableDescription) bool
func (*Storage) TableSchema ¶
type TemplateCol ¶
type TemplateCol struct{ Name, Typ, Comma string }
type TemplateModel ¶
type TemplateModel struct { Cols []TemplateCol Path string }
type TokenCredentials ¶
TokenCredentials is an interface that contains options used to authorize a client.
func ResolveCredentials ¶
func ResolveCredentials( userDataAuth bool, oauthToken string, jwt JWTAuthParams, serviceAccountID string, logger log.Logger, ) (TokenCredentials, error)
type YDBPathRelativizerTransformer ¶
type YDBPathRelativizerTransformer struct {
Paths []string
}
func NewYDBRelativePathTransformer ¶
func NewYDBRelativePathTransformer(paths []string) *YDBPathRelativizerTransformer
func (*YDBPathRelativizerTransformer) Apply ¶
func (r *YDBPathRelativizerTransformer) Apply(input []abstract.ChangeItem) abstract.TransformerResult
func (*YDBPathRelativizerTransformer) Description ¶
func (r *YDBPathRelativizerTransformer) Description() string
func (*YDBPathRelativizerTransformer) ResultSchema ¶
func (r *YDBPathRelativizerTransformer) ResultSchema(original *abstract.TableSchema) (*abstract.TableSchema, error)
func (*YDBPathRelativizerTransformer) Suitable ¶
func (r *YDBPathRelativizerTransformer) Suitable(table abstract.TableID, schema *abstract.TableSchema) bool
func (*YDBPathRelativizerTransformer) Type ¶
func (r *YDBPathRelativizerTransformer) Type() abstract.TransformerType
type YdbColumnsFilter ¶
type YdbColumnsFilter struct { TableNamesRegexp string ColumnNamesRegexp string Type YdbColumnsFilterType }
type YdbColumnsFilterType ¶
type YdbColumnsFilterType string
const ( YdbColumnsBlackList YdbColumnsFilterType = "blacklist" YdbColumnsWhiteList YdbColumnsFilterType = "whitelist" )
type YdbDestination ¶
type YdbDestination struct { Token model.SecretString Database string Path string Instance string LegacyWriter bool ShardCount int64 Rotation *model.RotatorConfig TransformerConfig map[string]string AltNames map[string]string StoragePolicy string CompactionPolicy string SubNetworkID string SecurityGroupIDs []string Cleanup model.CleanupType DropUnknownColumns bool Underlay bool ServiceAccountID string IgnoreRowTooLargeErrors bool FitDatetime bool // will crop date-time to allowed time range (with data-loss) SAKeyContent string TriggingInterval time.Duration TriggingSize uint64 IsTableColumnOriented bool DefaultCompression string Primary bool // if worker is first, i.e. primary, will run background jobs TLSEnabled bool RootCAFiles []string TokenServiceURL string UserdataAuth bool // allow fallback to Instance metadata Auth }
func (*YdbDestination) BuffererConfig ¶
func (d *YdbDestination) BuffererConfig() bufferer.BuffererConfig
func (*YdbDestination) CleanupMode ¶
func (d *YdbDestination) CleanupMode() model.CleanupType
func (*YdbDestination) GetProviderType ¶
func (d *YdbDestination) GetProviderType() abstract.ProviderType
func (YdbDestination) IsDestination ¶
func (YdbDestination) IsDestination()
func (*YdbDestination) MDBClusterID ¶
func (d *YdbDestination) MDBClusterID() string
func (*YdbDestination) ToStorageParams ¶
func (d *YdbDestination) ToStorageParams() *YdbStorageParams
func (*YdbDestination) Transformer ¶
func (d *YdbDestination) Transformer() map[string]string
func (*YdbDestination) Validate ¶
func (d *YdbDestination) Validate() error
func (*YdbDestination) WithDefaults ¶
func (d *YdbDestination) WithDefaults()
type YdbSource ¶
type YdbSource struct { Database string Instance string Tables []string // actually it's 'paths', but migrating... TableColumnsFilter []YdbColumnsFilter SubNetworkID string SecurityGroupIDs []string Underlay bool UseFullPaths bool // can be useful to deal with name collisions TLSEnabled bool RootCAFiles []string // replication stuff: ChangeFeedMode ChangeFeedModeType ChangeFeedCustomName string // user can specify pre-created feed's name, otherwise it will be created with name == transferID BufferSize model.BytesSize // it's not some real buffer size - see comments to waitLimits() method in kafka-source VerboseSDKLogs bool // auth stuff: Token model.SecretString UserdataAuth bool ServiceAccountID string TokenServiceURL string SAKeyContent string }
func (*YdbSource) AllIncludes ¶
func (*YdbSource) ExtraTransformers ¶
func (*YdbSource) FulfilledIncludes ¶
func (*YdbSource) GetProviderType ¶
func (s *YdbSource) GetProviderType() abstract.ProviderType
func (*YdbSource) IsIncremental ¶
func (*YdbSource) IsIncremental()
func (*YdbSource) MDBClusterID ¶
func (*YdbSource) SupportsStartCursorValue ¶
func (*YdbSource) ToStorageParams ¶
func (s *YdbSource) ToStorageParams() *YdbStorageParams
func (*YdbSource) WithDefaults ¶
func (s *YdbSource) WithDefaults()
type YdbStorageParams ¶
Source Files ¶
- auth.go
- cdc_converter.go
- cdc_event.go
- client.go
- client2.go
- fallback_date_and_datetime_as_timestamp.go
- messages_batch.go
- model_destination.go
- model_source.go
- model_storage_params.go
- provider.go
- reader_threadsafe.go
- schema.go
- schema_wrapper.go
- sink.go
- source.go
- source_tasks.go
- storage.go
- storage_incremental.go
- storage_sampleable.go
- typesystem.go
- utils.go
- ydb_path_relativizer.go
Click to show internal directories.
Click to hide internal directories.