Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyCommands(commands []*pgDumpItem, transfer model.Transfer, registry metrics.Registry, ...) error
- func ApplyPgDumpPostSteps(pgdump []*pgDumpItem, transfer *model.Transfer, registry metrics.Registry) error
- func ApplyPgDumpPreSteps(pgdump []*pgDumpItem, transfer *model.Transfer, registry metrics.Registry) error
- func BeginTx(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, lgr log.Logger) (pgx.Tx, *util.Rollbacks, error)
- func BeginTxWithSnapshot(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, snapshot string, ...) (pgx.Tx, *util.Rollbacks, error)
- func BuildColSchemaArrayElement(colSchema abstract.ColSchema) abstract.ColSchema
- func CalculatePartCount(totalSize, desiredPartSize, partCountLimit uint64) uint64
- func ClearOriginalType(pgType string) string
- func CreateReplicationSlot(src *PgSource) error
- func CreateSchemaQueryOptional(fullTableName string) string
- func CreateTableQuery(fullTableName string, schema []abstract.ColSchema) (string, error)
- func CurrentTxStartTime(ctx context.Context, conn *pgx.Conn) (time.Time, error)
- func DaterangeToString(t []time.Time) string
- func DropReplicationSlot(src *PgSource) error
- func ExtractPgDumpSchema(transfer *model.Transfer) ([]*pgDumpItem, error)
- func FallbackBitAsBytes(item *abstract.ChangeItem) (*abstract.ChangeItem, error)
- func FallbackNotNullAsNull(ci *abstract.ChangeItem) (*abstract.ChangeItem, error)
- func FallbackTimestampToUTC(item *abstract.ChangeItem) (*abstract.ChangeItem, error)
- func GetArrElemTypeDescr(originalType string) string
- func GetCurrentStateOfSequence(ctx context.Context, conn *pgx.Conn, sequenceID abstract.TableID) (lastValue int64, isCalled bool, err error)
- func GetDeleteFromMoleFinder(schema string) string
- func GetInitLSNSlotDDL(schema string) string
- func GetInsertIntoMoleFinder(schema string) string
- func GetInsertIntoMoleFinder2(schema string) string
- func GetMoveIteratorQuery(schema string) string
- func GetPropertyEnumAllValues(in *abstract.ColSchema) []string
- func GetSelectCommittedLSN(schema string) string
- func GetSelectCommittedLSN2(schema string) string
- func GetUpdateReplicatedQuery(schema string) string
- func HstoreToJSON(colVal string) (string, error)
- func HstoreToMap(colVal string) (map[string]interface{}, error)
- func IsKeyword(name string) bool
- func IsPKeyCheckError(err error) bool
- func IsPgError(err error, code PgErrorCode) bool
- func IsPgTypeTimeWithTimeZone(originalType string) bool
- func IsPgTypeTimeWithTimeZoneUnprefixed(in string) bool
- func IsPgTypeTimeWithoutTimeZone(originalType string) bool
- func IsPgTypeTimeWithoutTimeZoneUnprefixed(in string) bool
- func IsPgTypeTimestampWithTimeZone(originalType string) bool
- func IsPgTypeTimestampWithTimeZoneUnprefixed(in string) bool
- func IsPgTypeTimestampWithoutTimeZone(originalType string) bool
- func IsPgTypeTimestampWithoutTimeZoneUnprefixed(in string) bool
- func IsUserDefinedType(col *abstract.ColSchema) bool
- func JSONToHstore(in string) (string, error)
- func KeeperDDL(schema string) string
- func ListWithCommaSingleQuoted(values []string) string
- func LockQuery(t abstract.TableID, mode PGTableLockMode) string
- func MakeChildParentMap(ctx context.Context, conn pgxtype.Querier) (map[abstract.TableID]abstract.TableID, error)
- func MakeConnConfigFromSink(lgr log.Logger, params PgSinkParams) (*pgx.ConnConfig, error)
- func MakeConnConfigFromSrc(lgr log.Logger, pgSrc *PgSource) (*pgx.ConnConfig, error)
- func MakeConnConfigFromStorage(lgr log.Logger, storage *PgStorageParams) (*pgx.ConnConfig, error)
- func MakeConnPoolFromDst(pgDst *PgDestination, lgr log.Logger) (*pgxpool.Pool, error)
- func MakeConnPoolFromSrc(pgSrc *PgSource, lgr log.Logger) (*pgxpool.Pool, error)
- func MakeInitDataTypes(options ...DataTypesOption) func(ctx context.Context, conn *pgx.Conn) error
- func MakeSetSQL(key string, value string) string
- func MinusToBC(v string) string
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewPgConnPool(connConfig *pgx.ConnConfig, lgr log.Logger, ...) (*pgxpool.Pool, error)
- func NewPgConnPoolConfig(ctx context.Context, poolConfig *pgxpool.Config) (*pgxpool.Pool, error)
- func NewPollingPublisher(version PgVersion, conn *pgxpool.Pool, slot AbstractSlot, ...) (abstract.Source, error)
- func NewReplicationPublisher(version PgVersion, replConn *mutexedPgConn, connPool *pgxpool.Pool, ...) (abstract.Source, error)
- func NewSink(lgr log.Logger, transferID string, config PgSinkParams, mtrcs metrics.Registry) (abstract.Sinker, error)
- func NewSinkWithPool(ctx context.Context, lgr log.Logger, transferID string, config PgSinkParams, ...) (abstract.Sinker, error)
- func NewSourceWrapper(src *PgSource, transferID string, objects *model.DataObjects, lgr log.Logger, ...) (abstract.Source, error)
- func ParseLsn(lsn string) uint64
- func PgTypeToYTType(pgType string) schema.Type
- func PostgresDumpConnString(src *PgSource) (string, model.SecretString, error)
- func ReadQuery(t *abstract.TableDescription, columns string) string
- func Represent(val interface{}, colSchema abstract.ColSchema) (string, error)
- func RepresentWithCast(v interface{}, colSchema abstract.ColSchema) (string, error)
- func RollbackFuncForPgxTx(ctx context.Context, tx pgx.Tx, lgr log.Logger) func()
- func RowCount(ctx context.Context, conn *pgx.Conn, table *abstract.TableDescription) (uint64, error)
- func RunSlotMonitor(ctx context.Context, pgSrc *PgSource, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)
- func Sanitize(identifier string) string
- func SetInitialState(tables []abstract.TableDescription, ...)
- func TableName(t *abstract.TableDescription) string
- func TimeWithTimeZoneToTime(val string) (time.Time, error)
- func UnwrapArrayElement(in string) string
- func VerifyPostgresTables(src *PgSource, transfer *model.Transfer, lgr log.Logger) error
- func VerifyPostgresTablesNames(tables []string) error
- func WhereClause(filter abstract.WhereStatement) string
- func WithLogger(connConfig *pgx.ConnConfig, lgr log.Logger) *pgx.ConnConfig
- type AbstractSlot
- type ChangeItemsFetcher
- func (f *ChangeItemsFetcher) Fetch() (items []abstract.ChangeItem, err error)
- func (f *ChangeItemsFetcher) MaybeHasMore() bool
- func (f *ChangeItemsFetcher) Sniffer() (items []abstract.ChangeItem, err error)
- func (f *ChangeItemsFetcher) WithLimitBytes(limitBytes model.BytesSize) *ChangeItemsFetcher
- func (f *ChangeItemsFetcher) WithLimitCount(limitCount int) *ChangeItemsFetcher
- func (f *ChangeItemsFetcher) WithLogger(logger log.Logger) *ChangeItemsFetcher
- func (f *ChangeItemsFetcher) WithUnmarshallerData(data UnmarshallerData) *ChangeItemsFetcher
- type Config
- type ConnectionParams
- type DBFlavour
- type DataTypesOption
- type Date
- type DefaultDataType
- type DuplicatesPolicy
- type FieldDescription
- type GenericArray
- func (ga *GenericArray) AssignTo(_ interface{}) error
- func (ga *GenericArray) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error
- func (ga *GenericArray) DecodeText(ci *pgtype.ConnInfo, src []byte) error
- func (ga *GenericArray) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) ([]byte, error)
- func (ga *GenericArray) EncodeText(ci *pgtype.ConnInfo, buf []byte) ([]byte, error)
- func (ga *GenericArray) ExtractValue(connInfo *pgtype.ConnInfo) (interface{}, error)
- func (ga *GenericArray) Get() interface{}
- func (ga *GenericArray) GetValue(connInfo *pgtype.ConnInfo) (interface{}, error)
- func (ga *GenericArray) NewTypeValue() pgtype.Value
- func (ga *GenericArray) NewTypeValueImpl() *GenericArray
- func (ga *GenericArray) Scan(_ interface{}) error
- func (ga *GenericArray) Set(interface{}) error
- func (ga *GenericArray) TypeName() string
- func (ga *GenericArray) Unpack(connInfo *pgtype.ConnInfo) (*util.XDArray, error)
- func (ga *GenericArray) Value() (driver.Value, error)
- type Keeper
- type LsnTrackedSlot
- func (l *LsnTrackedSlot) CheckMonotonic(ci abstract.ChangeItem) (bool, error)
- func (l *LsnTrackedSlot) Close()
- func (l *LsnTrackedSlot) Create() error
- func (l *LsnTrackedSlot) Exist() (bool, error)
- func (l *LsnTrackedSlot) Init(sink abstract.AsyncSink) error
- func (l *LsnTrackedSlot) Move(lsn string) error
- func (l *LsnTrackedSlot) Run()
- func (l *LsnTrackedSlot) Suicide() error
- func (l *LsnTrackedSlot) UpdateReplicated(iter int, lsn uint64) error
- type Metrics
- type OldKeysType
- type PGTableLockMode
- type PgDestination
- func (d *PgDestination) AllHosts() []string
- func (d *PgDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *PgDestination) CleanupMode() dp_model.CleanupType
- func (d *PgDestination) FillDependentFields(transfer *dp_model.Transfer)
- func (d *PgDestination) GetConnectionID() string
- func (d *PgDestination) GetProviderType() abstract.ProviderType
- func (d *PgDestination) HasTLS() bool
- func (PgDestination) IsDestination()
- func (d *PgDestination) MDBClusterID() string
- func (d *PgDestination) ReliesOnSystemTablesTransferring() bool
- func (d *PgDestination) ToSinkParams() PgDestinationWrapper
- func (d *PgDestination) ToStorageParams() *PgStorageParams
- func (d *PgDestination) Transformer() map[string]string
- func (d *PgDestination) Validate() error
- func (d *PgDestination) WithDefaults()
- type PgDestinationWrapper
- func (d PgDestinationWrapper) AllHosts() []string
- func (d PgDestinationWrapper) CleanupMode() dp_model.CleanupType
- func (d PgDestinationWrapper) ClusterID() string
- func (d PgDestinationWrapper) ConnectionID() string
- func (d PgDestinationWrapper) CopyUpload() bool
- func (d PgDestinationWrapper) Database() string
- func (d PgDestinationWrapper) DisableSQLFallback() bool
- func (d PgDestinationWrapper) HasTLS() bool
- func (d PgDestinationWrapper) IgnoreUniqueConstraint() bool
- func (d PgDestinationWrapper) LoozeMode() bool
- func (d PgDestinationWrapper) MaintainTables() bool
- func (d PgDestinationWrapper) Password() string
- func (d PgDestinationWrapper) PerTransactionPush() bool
- func (d PgDestinationWrapper) Port() int
- func (d PgDestinationWrapper) QueryTimeout() time.Duration
- func (d PgDestinationWrapper) TLSFile() string
- func (d PgDestinationWrapper) Tables() map[string]string
- func (d PgDestinationWrapper) User() string
- type PgDumpSteps
- type PgErrorCode
- type PgObjectType
- type PgSerializationFormat
- type PgSinkParams
- type PgSource
- func (s *PgSource) AllHosts() []string
- func (s *PgSource) AllIncludes() []string
- func (s *PgSource) AuxTables() []string
- func (s *PgSource) ExcludeWithGlobals() []string
- func (s *PgSource) ExtraTransformers(ctx context.Context, transfer *model.Transfer, registry metrics.Registry) ([]abstract.Transformer, error)
- func (s *PgSource) FillDependentFields(transfer *model.Transfer)
- func (s *PgSource) FulfilledIncludes(tID abstract.TableID) (result []string)
- func (s *PgSource) GetConnectionID() string
- func (s *PgSource) GetProviderType() abstract.ProviderType
- func (s *PgSource) HasTLS() bool
- func (s *PgSource) Include(tID abstract.TableID) bool
- func (*PgSource) IsIncremental()
- func (*PgSource) IsSource()
- func (*PgSource) IsStrictSource()
- func (s *PgSource) MDBClusterID() string
- func (*PgSource) SupportsStartCursorValue() bool
- func (s *PgSource) ToSinkParams() PgSourceWrapper
- func (s *PgSource) ToStorageParams(transfer *model.Transfer) *PgStorageParams
- func (s *PgSource) Validate() error
- func (s *PgSource) WithDefaults()
- type PgSourceWrapper
- func (d PgSourceWrapper) AllHosts() []string
- func (d PgSourceWrapper) CleanupMode() model.CleanupType
- func (d PgSourceWrapper) ClusterID() string
- func (d PgSourceWrapper) ConnectionID() string
- func (d PgSourceWrapper) CopyUpload() bool
- func (d PgSourceWrapper) Database() string
- func (d PgSourceWrapper) DisableSQLFallback() bool
- func (d PgSourceWrapper) HasTLS() bool
- func (d PgSourceWrapper) IgnoreUniqueConstraint() bool
- func (d PgSourceWrapper) LoozeMode() bool
- func (d PgSourceWrapper) MaintainTables() bool
- func (d PgSourceWrapper) Password() string
- func (d PgSourceWrapper) PerTransactionPush() bool
- func (d PgSourceWrapper) Port() int
- func (d PgSourceWrapper) QueryTimeout() time.Duration
- func (d *PgSourceWrapper) SetMaintainTables(maintainTables bool)
- func (d PgSourceWrapper) TLSFile() string
- func (d PgSourceWrapper) Tables() map[string]string
- func (d PgSourceWrapper) User() string
- type PgStorageParams
- type PgVersion
- type Plugin
- type PostgreSQLFlavour
- func (f *PostgreSQLFlavour) ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, ...) string
- func (f *PostgreSQLFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
- func (f *PostgreSQLFlavour) PgClassFilter() string
- func (f *PostgreSQLFlavour) PgClassRelsOnlyFilter() string
- type PostgresSlotKiller
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, ...) error
- func (p *Provider) Cleanup(ctx context.Context, task *model.TransferOperation) error
- func (p *Provider) DBLogUpload(src *PgSource, tables abstract.TableMap) error
- func (p *Provider) Deactivate(ctx context.Context, task *model.TransferOperation) error
- func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
- func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
- func (p *Provider) Storage() (abstract.Storage, error)
- func (p *Provider) Type() abstract.ProviderType
- func (p *Provider) Verify(ctx context.Context) error
- type SchemaExtractor
- func (e *SchemaExtractor) FindDependentViews(ctx context.Context, conn *pgx.Conn, tables abstract.TableMap) (map[abstract.TableID][]abstract.TableID, error)
- func (e *SchemaExtractor) LoadSchema(ctx context.Context, conn *pgx.Conn, specificTable *abstract.TableID) (abstract.DBSchema, error)
- func (e *SchemaExtractor) TablesList(ctx context.Context, conn *pgx.Conn) ([]tableIDWithInfo, time.Time, error)
- func (e *SchemaExtractor) WithExcludeViews(excludeViews bool) *SchemaExtractor
- func (e *SchemaExtractor) WithFlavour(flavour DBFlavour) *SchemaExtractor
- func (e *SchemaExtractor) WithForbiddenSchemas(forbiddenSchemas []string) *SchemaExtractor
- func (e *SchemaExtractor) WithForbiddenTables(forbiddenTables []string) *SchemaExtractor
- func (e *SchemaExtractor) WithLogger(logger log.Logger) *SchemaExtractor
- func (e *SchemaExtractor) WithUseFakePrimaryKey(useFakePrimaryKey bool) *SchemaExtractor
- type SequenceInfo
- type SequenceMap
- type Slot
- type SlotMonitor
- type SnapshotKey
- type SnapshotState
- type SortOrder
- type Storage
- func (s *Storage) BeginPGSnapshot(ctx context.Context) error
- func (s *Storage) BuildTypeMapping(ctx context.Context) (TypeNameToOIDMap, error)
- func (s *Storage) Close()
- func (s *Storage) EndPGSnapshot(ctx context.Context) error
- func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableDescriptionRowsCount(ctx context.Context, table abstract.TableDescription, timeout time.Duration) (uint64, error)
- func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
- func (s *Storage) GetInheritedTables(ctx context.Context) (map[abstract.TableID]abstract.TableID, error)
- func (s *Storage) LoadQueryTable(ctx context.Context, tableQuery tablequery.TableQuery, pusher abstract.Pusher) error
- func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, ...) error
- func (s *Storage) LoadSchema() (abstract.DBSchema, error)
- func (s *Storage) LoadSchemaForTable(ctx context.Context, conn *pgx.Conn, table abstract.TableDescription) (*abstract.TableSchema, error)
- func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) OrderedRead(t *abstract.TableDescription, schemas []abstract.ColSchema, ...) string
- func (s *Storage) Ping() error
- func (s *Storage) RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)
- func (s *Storage) SetInitialState(tables []abstract.TableDescription, ...)
- func (s *Storage) SetLoadDescending(loadDescending bool)
- func (s *Storage) SetShardingContext(shardedState []byte) error
- func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
- func (s *Storage) ShardingContext() ([]byte, error)
- func (s *Storage) SnapshotLSN() string
- func (s *Storage) TableAccessible(table abstract.TableDescription) bool
- func (s *Storage) TableExists(table abstract.TableID) (bool, error)
- func (s *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)
- func (s *Storage) WorkersSnapshotState() *SnapshotState
- type StorageOpt
- type TextDecoderAndValuerWithHomo
- type Timestamp
- type Timestamptz
- type TypeFullName
- type TypeMapping
- type TypeNameToOIDMap
- type Unmarshaller
- type UnmarshallerData
- type Wal2JSONItem
- type Wal2JsonParser
- type Worker
Constants ¶
const (
    PgSerializationFormatAuto   = PgSerializationFormat("")
    PgSerializationFormatText   = PgSerializationFormat("text")
    PgSerializationFormatBinary = PgSerializationFormat("binary")
)
const (
    TableConsumerKeeper = abstract.TableConsumerKeeper // "__consumer_keeper"
    TableMoleFinder     = "__data_transfer_mole_finder"
    TableLSN            = abstract.TableLSN // "__data_transfer_lsn"
)
const (
    PgDateFormat      = "2006-01-02"
    PgDatetimeFormat  = "2006-01-02 15:04:05Z07:00:00"
    PgTimestampFormat = "2006-01-02 15:04:05.999999Z07:00:00"
)
const (
    SortAsc  SortOrder = "asc"
    SortDesc SortOrder = "desc"

    All DuplicatesPolicy = "all"

    PartitionsFilterPrefix = "__dt_resolved_hard:"
)
const BufferLimit = 16 * humanize.MiByte
const DatabaseTimeZone = abstract.PropertyKey("pg:database_timezone")
const EnumAllValues = abstract.PropertyKey("pg:enum_all_values")
const FakeParentPKeyStatusMessageCategory string = "fake_primary_key_parent"
const PGDefaultQueryTimeout time.Duration = 30 * time.Minute
const ProviderType = abstract.ProviderType("pg")
const SelectCurrentLsnDelay = `select pg_wal_lsn_diff(pg_last_wal_replay_lsn(), $1);`
const TimeZoneParameterStatusKey string = "TimeZone"
TimeZoneParameterStatusKey is the identifier of the PostgreSQL connection property containing the time zone.
Variables ¶
var (
    InitLSNSlotDDL = fmt.Sprintf(`
create table if not exists "%%s"."%s" (
    slot_id text primary key,
    iterator int,
    replicated_iterator int,
    last_iterator_update timestamp,
    last_replicated_update timestamp,
    committed_lsn text
);`, TableMoleFinder)

    MoveIteratorQuery = fmt.Sprintf(`
update "%%s"."%s"
set iterator = iterator + 1, last_iterator_update = now()
where slot_id = $1;`, TableMoleFinder)

    UpdateReplicatedQuery = fmt.Sprintf(`
update "%%s"."%s"
set replicated_iterator = $2, last_replicated_update = now()
where slot_id = $1 and replicated_iterator = $2 - 1;`, TableMoleFinder)

    SelectCommittedLSN = fmt.Sprintf(`
select committed_lsn from "%%s"."%s" where slot_id = $1;`, TableMoleFinder)

    InsertIntoMoleFinder = fmt.Sprintf(`
insert into "%%s"."%s" (slot_id, iterator, replicated_iterator, last_iterator_update, last_replicated_update, committed_lsn)
values ($1, null, null, now(), now(), $2)
on conflict (slot_id) do update
set replicated_iterator = null, iterator = null, last_iterator_update = now(), last_replicated_update = now(), committed_lsn = excluded.committed_lsn;`, TableMoleFinder)

    DeleteFromMoleFinder = fmt.Sprintf(`delete from "%%s"."%s" where slot_id = $1`, TableMoleFinder)

    InsertIntoMoleFinder2 = fmt.Sprintf(`
insert into "%%s"."%s" (slot_id, committed_lsn)
values ($1, $2)
on conflict (slot_id) do update set committed_lsn = excluded.committed_lsn;`, TableMoleFinder)

    SelectCommittedLSN2 = fmt.Sprintf(`
select committed_lsn, replicated_iterator from "%%s"."%s" where slot_id = $1`, TableMoleFinder)

    SelectLsnForSlot = `select restart_lsn from pg_replication_slots where slot_name = $1;`
)
var ErrBadPostgresHealth = errors.New("postgres health problem")
var ErrConsumerLocked = errors.New("keeper: Consumer already locked")
var (
    LSNTrackTableDDL = fmt.Sprintf(`
create table if not exists %s (
    transfer_id text,
    schema_name text,
    table_name text,
    lsn bigint,
    primary key (transfer_id, schema_name, table_name)
);`, TableLSN)
)
var (
NoPrimaryKeyCode = coded.Register("postgres", "no_primary_key")
)
var PGGlobalExclude = []abstract.TableID{
{Namespace: "public", Name: "repl_mon"},
}
Functions ¶
func ApplyCommands ¶
func ApplyPgDumpPostSteps ¶
func ApplyPgDumpPostSteps(pgdump []*pgDumpItem, transfer *model.Transfer, registry metrics.Registry) error
ApplyPgDumpPostSteps takes the given dump and applies the post-steps defined in the transfer source, ONLY for homogeneous PG-PG transfers. It also logs its actions.
func ApplyPgDumpPreSteps ¶
func ApplyPgDumpPreSteps(pgdump []*pgDumpItem, transfer *model.Transfer, registry metrics.Registry) error
ApplyPgDumpPreSteps takes the given dump and applies the pre-steps defined in the transfer source, ONLY for homogeneous PG-PG transfers. It also logs its actions.
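A minimal sketch of how these helpers combine for a homogeneous PG-PG transfer (written as if inside this package; imports omitted, and the transfer and registry are assumed to come from the caller):

func applyPreSteps(transfer *model.Transfer, registry metrics.Registry) error {
    // ExtractPgDumpSchema produces pg_dump items only for homogeneous PG-PG transfers.
    items, err := ExtractPgDumpSchema(transfer)
    if err != nil {
        return err
    }
    // Apply the pre-steps configured on the transfer source before the snapshot is loaded.
    return ApplyPgDumpPreSteps(items, transfer, registry)
}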
func BeginTx ¶
func BeginTx(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, lgr log.Logger) (pgx.Tx, *util.Rollbacks, error)
BeginTx starts a transaction on the given connection with the given options and automatically composes a sufficient rollback object.
func BeginTxWithSnapshot ¶
func BeginTxWithSnapshot(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, snapshot string, lgr log.Logger) (pgx.Tx, *util.Rollbacks, error)
BeginTxWithSnapshot starts a transaction at the given connection with the given snapshot and automatically composes a sufficient rollback object.
If the source database does not support SET TRANSACTION SNAPSHOT or the given snapshot identifier is an empty string, a transaction without snapshot is started automatically.
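A sketch of reading inside an exported snapshot (assumptions: a connected *pgx.Conn, a logger, a snapshot identifier obtained elsewhere, and a placeholder table name; the methods of the returned *util.Rollbacks are not shown in this reference, so error-path cleanup is only hinted at):

func countInSnapshot(ctx context.Context, conn *pgx.Conn, snapshot string, lgr log.Logger) (int64, error) {
    opts := pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly}
    tx, rollbacks, err := BeginTxWithSnapshot(ctx, conn, opts, snapshot, lgr)
    if err != nil {
        return 0, err
    }
    _ = rollbacks // cleanup object for the error path; see BeginTx / BeginTxWithSnapshot
    var n int64
    // "some_table" is a placeholder table name
    if err := tx.QueryRow(ctx, `select count(*) from some_table`).Scan(&n); err != nil {
        return 0, err
    }
    return n, tx.Commit(ctx)
}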
func CalculatePartCount ¶
func ClearOriginalType ¶
func CreateReplicationSlot ¶
func CreateSchemaQueryOptional ¶
CreateSchemaQueryOptional returns a query that creates the schema if it is absent. It may return an empty string, in which case the schema must not be created. The full table name may contain quotes in the schema name; these are removed.
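For example, a minimal sketch (the connection is assumed; fullTableName is a fully qualified table name):

func ensureSchema(ctx context.Context, conn *pgx.Conn, fullTableName string) error {
    // CreateSchemaQueryOptional may return "" when no schema needs to be created.
    query := CreateSchemaQueryOptional(fullTableName)
    if query == "" {
        return nil
    }
    _, err := conn.Exec(ctx, query)
    return err
}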
func CreateTableQuery ¶
func CurrentTxStartTime ¶
CurrentTxStartTime returns the start time of the current transaction on the given connection. If there is no active transaction, the current time of the database is returned.
func DaterangeToString ¶
func DropReplicationSlot ¶
func ExtractPgDumpSchema ¶
ExtractPgDumpSchema returns the dump ONLY for homogeneous PG-PG transfers. It also logs its actions.
func FallbackBitAsBytes ¶
func FallbackBitAsBytes(item *abstract.ChangeItem) (*abstract.ChangeItem, error)
FallbackBitAsBytes implements backward compatibility for https://st.yandex-team.ru/TM-5445#640b22b5af0c626bd522d179
WARNING: this fallback violates type strictness!
func FallbackNotNullAsNull ¶
func FallbackNotNullAsNull(ci *abstract.ChangeItem) (*abstract.ChangeItem, error)
func FallbackTimestampToUTC ¶
func FallbackTimestampToUTC(item *abstract.ChangeItem) (*abstract.ChangeItem, error)
FallbackTimestampToUTC implements https://st.yandex-team.ru/TM-5092
func GetArrElemTypeDescr ¶
func GetDeleteFromMoleFinder ¶
func GetInitLSNSlotDDL ¶
func GetInsertIntoMoleFinder ¶
func GetMoveIteratorQuery ¶
func GetSelectCommittedLSN ¶
func GetSelectCommittedLSN2 ¶
func HstoreToJSON ¶
func HstoreToMap ¶
func IsKeyword ¶
IsKeyword reports whether the given name is a PostgreSQL keyword, according to the well-known keyword list. See https://www.postgresql.org/docs/current/sql-keywords-appendix.html
func IsPKeyCheckError ¶
func IsPgError ¶
func IsPgError(err error, code PgErrorCode) bool
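Typical usage is to branch on a specific error code, for example (a sketch; the query and arguments are assumed to come from the caller):

func insertIgnoringDuplicates(ctx context.Context, conn *pgx.Conn, query string, args ...interface{}) error {
    _, err := conn.Exec(ctx, query, args...)
    if err != nil && IsPgError(err, ErrcUniqueViolation) {
        // a duplicate key is expected here; treat it as success
        return nil
    }
    return err
}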
func IsUserDefinedType ¶
func JSONToHstore ¶
func MakeChildParentMap ¶
func MakeConnConfigFromSink ¶
func MakeConnConfigFromSink(lgr log.Logger, params PgSinkParams) (*pgx.ConnConfig, error)
func MakeConnConfigFromSrc ¶
func MakeConnConfigFromStorage ¶
func MakeConnConfigFromStorage(lgr log.Logger, storage *PgStorageParams) (*pgx.ConnConfig, error)
func MakeConnPoolFromDst ¶
func MakeConnPoolFromSrc ¶
func MakeInitDataTypes ¶
func MakeInitDataTypes(options ...DataTypesOption) func(ctx context.Context, conn *pgx.Conn) error
func MakeSetSQL ¶
func MinusToBC ¶
MinusToBC checks whether the given string starts with a minus sign; if so, it trims the minus and appends a "BC" suffix.
func New ¶
func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *model.Transfer) providers.Provider
func NewPgConnPool ¶
func NewPgConnPoolConfig ¶
NewPgConnPoolConfig creates a connection pool. It provides built-in timeouts that limit the duration of connection attempts.
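A sketch (the DSN is a placeholder):

func openPool(ctx context.Context) (*pgxpool.Pool, error) {
    cfg, err := pgxpool.ParseConfig("postgres://user:password@localhost:5432/postgres")
    if err != nil {
        return nil, err
    }
    // NewPgConnPoolConfig adds built-in timeouts around connection attempts.
    return NewPgConnPoolConfig(ctx, cfg)
}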
func NewPollingPublisher ¶
func NewPollingPublisher(
    version PgVersion,
    conn *pgxpool.Pool,
    slot AbstractSlot,
    registry *stats.SourceStats,
    cfg *PgSource,
    objects *model.DataObjects,
    transferID string,
    lgr log.Logger,
    cp coordinator.Coordinator,
) (abstract.Source, error)
func NewReplicationPublisher ¶
func NewReplicationPublisher(version PgVersion, replConn *mutexedPgConn, connPool *pgxpool.Pool, slot AbstractSlot, stats *stats.SourceStats, source *PgSource, transferID string, lgr log.Logger, cp coordinator.Coordinator, objects *model.DataObjects) (abstract.Source, error)
func NewSinkWithPool ¶
func NewSourceWrapper ¶
func NewSourceWrapper(src *PgSource, transferID string, objects *model.DataObjects, lgr log.Logger, registry *stats.SourceStats, cp coordinator.Coordinator) (abstract.Source, error)
func PgTypeToYTType ¶
func PostgresDumpConnString ¶
func PostgresDumpConnString(src *PgSource) (string, model.SecretString, error)
func RepresentWithCast ¶
func RollbackFuncForPgxTx ¶
RollbackFuncForPgxTx returns a function which ROLLBACKs the given pgx.Tx and, if an error occurs, logs it with the given logger at WARNING level.
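A sketch of the intended defer pattern (connection and logger are assumed; if the transaction has already been committed, the deferred rollback is expected to be a no-op or to log at most a warning):

func withTx(ctx context.Context, conn *pgx.Conn, lgr log.Logger) error {
    tx, err := conn.Begin(ctx)
    if err != nil {
        return err
    }
    defer RollbackFuncForPgxTx(ctx, tx, lgr)()
    if _, err := tx.Exec(ctx, `select 1`); err != nil {
        return err
    }
    return tx.Commit(ctx)
}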
func RowCount ¶
func RowCount(ctx context.Context, conn *pgx.Conn, table *abstract.TableDescription) (uint64, error)
RowCount queries the storage and returns the exact number of rows in the given table
func RunSlotMonitor ¶
func Sanitize ¶
Sanitize sanitizes the given identifier so that it can be used safely in queries to PostgreSQL. Note that the input cannot contain multiple identifiers (e.g., `schema.table` is not a valid input).
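For example, building a query from identifiers supplied at runtime (a sketch; imports omitted, schema and table are assumed inputs):

func selectAllQuery(schema, table string) string {
    // Sanitize each identifier separately; passing "schema.table" as one string is not valid input.
    return fmt.Sprintf("SELECT * FROM %s.%s", Sanitize(schema), Sanitize(table))
}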
func SetInitialState ¶
func SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)
func TableName ¶
func TableName(t *abstract.TableDescription) string
func TimeWithTimeZoneToTime ¶
TimeWithTimeZoneToTime converts the given (valid) PostgreSQL TIME WITH TIME ZONE into a time.Time whose time zone and time are set to valid values. All resulting time.Time values are guaranteed to be properly comparable among themselves; this implies they are all based on the same date.
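For example (a sketch; the input strings are placeholder TIME WITH TIME ZONE text values):

func timetzBefore(a, b string) (bool, error) {
    ta, err := TimeWithTimeZoneToTime(a) // e.g. "09:30:00+03"
    if err != nil {
        return false, err
    }
    tb, err := TimeWithTimeZoneToTime(b)
    if err != nil {
        return false, err
    }
    // Both values are based on the same date, so they are directly comparable.
    return ta.Before(tb), nil
}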
func UnwrapArrayElement ¶
func VerifyPostgresTables ¶
func WhereClause ¶
func WhereClause(filter abstract.WhereStatement) string
func WithLogger ¶
Types ¶
type AbstractSlot ¶
type AbstractSlot interface {
    Init(sinker abstract.AsyncSink) error
    Exist() (bool, error)
    Close()
    Create() error
    Suicide() error
}
func NewLsnTrackedSlot ¶
func NewNotTrackedSlot ¶
type ChangeItemsFetcher ¶
type ChangeItemsFetcher struct {
// contains filtered or unexported fields
}
ChangeItemsFetcher consolidates multiple objects (the most important of which is pgx.Rows) into a single one, providing buffered fetching and parsing of rows from a PostgreSQL source.
func NewChangeItemsFetcher ¶
func NewChangeItemsFetcher(rows pgx.Rows, conn *pgx.Conn, template abstract.ChangeItem, sourceStats *stats.SourceStats) *ChangeItemsFetcher
NewChangeItemsFetcher constructs a minimal working fetcher. The ChangeItems fetched are shallow copies of the template, except for ColumnValues, which are filled in by the fetcher. The order of ColumnValues, ColSchemas (in TableSchema) and rows.FieldDescriptions() MUST BE THE SAME.
func (*ChangeItemsFetcher) Fetch ¶
func (f *ChangeItemsFetcher) Fetch() (items []abstract.ChangeItem, err error)
func (*ChangeItemsFetcher) MaybeHasMore ¶
func (f *ChangeItemsFetcher) MaybeHasMore() bool
func (*ChangeItemsFetcher) Sniffer ¶
func (f *ChangeItemsFetcher) Sniffer() (items []abstract.ChangeItem, err error)
Sniffer can be used in place of Fetch to obtain raw data.
func (*ChangeItemsFetcher) WithLimitBytes ¶
func (f *ChangeItemsFetcher) WithLimitBytes(limitBytes model.BytesSize) *ChangeItemsFetcher
WithLimitBytes accepts 0 to disable the limit.
func (*ChangeItemsFetcher) WithLimitCount ¶
func (f *ChangeItemsFetcher) WithLimitCount(limitCount int) *ChangeItemsFetcher
func (*ChangeItemsFetcher) WithLogger ¶
func (f *ChangeItemsFetcher) WithLogger(logger log.Logger) *ChangeItemsFetcher
func (*ChangeItemsFetcher) WithUnmarshallerData ¶
func (f *ChangeItemsFetcher) WithUnmarshallerData(data UnmarshallerData) *ChangeItemsFetcher
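A minimal sketch of a fetch loop (the rows, connection, template ChangeItem, stats, and push callback are assumed to come from the caller; the batch size is an illustrative value):

func fetchAll(rows pgx.Rows, conn *pgx.Conn, template abstract.ChangeItem, st *stats.SourceStats, push func([]abstract.ChangeItem) error) error {
    fetcher := NewChangeItemsFetcher(rows, conn, template, st).
        WithLimitCount(1000) // fetch in batches of at most 1000 items
    for fetcher.MaybeHasMore() {
        items, err := fetcher.Fetch()
        if err != nil {
            return err
        }
        if len(items) == 0 {
            continue
        }
        if err := push(items); err != nil {
            return err
        }
    }
    return nil
}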
type ConnectionParams ¶
type ConnectionParams struct {
    Host           string
    Port           uint16
    Database       string
    User           string
    Password       model.SecretString
    HasTLS         bool
    CACertificates string
    ClusterID      string
}
func GetConnParamsFromSrc ¶
func GetConnParamsFromSrc(lgr log.Logger, src *PgSource) (*ConnectionParams, error)
type DBFlavour ¶
type DBFlavour interface {
    PgClassFilter() string
    PgClassRelsOnlyFilter() string
    // ListSchemaQuery returns a query with TWO placeholders if withSpecificTable is true; with ZERO placeholders otherwise
    ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, forbiddenTables []string) string
    // ListTablesQuery returns a query with zero placeholders
    ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
}
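A sketch of how a flavour is used to list tables (the connection is assumed; the forbidden-schema values are placeholders, and the query returned by ListTablesQuery takes no placeholders):

func listTables(ctx context.Context, conn *pgx.Conn, flavour DBFlavour) error {
    query := flavour.ListTablesQuery(true /* excludeViews */, []string{"pg_catalog", "information_schema"}, nil)
    rows, err := conn.Query(ctx, query)
    if err != nil {
        return err
    }
    defer rows.Close()
    for rows.Next() {
        // scan whatever columns the flavour's query returns
    }
    return rows.Err()
}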
type DataTypesOption ¶
type DataTypesOption interface {
// contains filtered or unexported methods
}
go-sumtype:decl DataTypesOption
type Date ¶
func NewDate ¶
func NewDate() *Date
NewDate constructs a DATE representation which supports BC years
TODO: remove this when https://st.yandex-team.ru/TM-5127 is done
type DefaultDataType ¶
DefaultDataType is the Go type used for PostgreSQL types of which the pgx driver is unaware. The default is *pgx.GenericText. The implementation type must be a pointer (this is required by pgx) and may be a nil pointer.
type DuplicatesPolicy ¶
type DuplicatesPolicy string
type FieldDescription ¶
type GenericArray ¶
type GenericArray struct {
// contains filtered or unexported fields
}
GenericArray is based on ArrayType https://github.com/doublecloud/transfer/arc_vcs/vendor/github.com/jackc/pgtype/array_type.go?rev=r8263453#L15
func NewGenericArray ¶
func (*GenericArray) AssignTo ¶
func (ga *GenericArray) AssignTo(_ interface{}) error
func (*GenericArray) DecodeBinary ¶
func (ga *GenericArray) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error
func (*GenericArray) DecodeText ¶
func (ga *GenericArray) DecodeText(ci *pgtype.ConnInfo, src []byte) error
func (*GenericArray) EncodeBinary ¶
func (*GenericArray) EncodeText ¶
func (*GenericArray) ExtractValue ¶
func (ga *GenericArray) ExtractValue(connInfo *pgtype.ConnInfo) (interface{}, error)
func (*GenericArray) Get ¶
func (ga *GenericArray) Get() interface{}
func (*GenericArray) GetValue ¶
func (ga *GenericArray) GetValue(connInfo *pgtype.ConnInfo) (interface{}, error)
func (*GenericArray) NewTypeValue ¶
func (ga *GenericArray) NewTypeValue() pgtype.Value
func (*GenericArray) NewTypeValueImpl ¶
func (ga *GenericArray) NewTypeValueImpl() *GenericArray
func (*GenericArray) Scan ¶
func (ga *GenericArray) Scan(_ interface{}) error
func (*GenericArray) Set ¶
func (ga *GenericArray) Set(interface{}) error
func (*GenericArray) TypeName ¶
func (ga *GenericArray) TypeName() string
type Keeper ¶
type Keeper struct {
    CloseSign chan bool
    // contains filtered or unexported fields
}
type LsnTrackedSlot ¶
func (*LsnTrackedSlot) CheckMonotonic ¶
func (l *LsnTrackedSlot) CheckMonotonic(ci abstract.ChangeItem) (bool, error)
func (*LsnTrackedSlot) Close ¶
func (l *LsnTrackedSlot) Close()
func (*LsnTrackedSlot) Create ¶
func (l *LsnTrackedSlot) Create() error
func (*LsnTrackedSlot) Exist ¶
func (l *LsnTrackedSlot) Exist() (bool, error)
func (*LsnTrackedSlot) Move ¶
func (l *LsnTrackedSlot) Move(lsn string) error
func (*LsnTrackedSlot) Run ¶
func (l *LsnTrackedSlot) Run()
func (*LsnTrackedSlot) Suicide ¶
func (l *LsnTrackedSlot) Suicide() error
func (*LsnTrackedSlot) UpdateReplicated ¶
func (l *LsnTrackedSlot) UpdateReplicated(iter int, lsn uint64) error
type OldKeysType ¶
type OldKeysType struct {
    abstract.OldKeysType
    KeyTypeOids []pgtype.OID `json:"keytypeoids"`
}
type PGTableLockMode ¶
type PGTableLockMode string
const (
    RowExclusiveLockMode    PGTableLockMode = "ROW EXCLUSIVE"
    ExclusiveLockMode       PGTableLockMode = "EXCLUSIVE"
    AccessExclusiveLockMode PGTableLockMode = "ACCESS EXCLUSIVE"
)
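A sketch of generating and executing a lock statement inside a transaction (the table identity is a placeholder):

func lockTable(ctx context.Context, tx pgx.Tx) error {
    tableID := abstract.TableID{Namespace: "public", Name: "my_table"} // placeholder table
    _, err := tx.Exec(ctx, LockQuery(tableID, AccessExclusiveLockMode))
    return err
}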
type PgDestination ¶
type PgDestination struct {
    // oneof
    ClusterID string `json:"Cluster"`
    Host      string // legacy field kept for backward compatibility; only the 'Hosts' field is used now
    Hosts     []string

    Database string `json:"Name"`
    User     string
    Password dp_model.SecretString
    Port     int
    TLSFile  string
    EnableTLS bool

    MaintainTables         bool
    AllowDuplicates        bool
    LoozeMode              bool
    IgnoreUniqueConstraint bool
    Tables                 map[string]string
    TransformerConfig      map[string]string
    SubNetworkID           string
    SecurityGroupIDs       []string

    // CopyUpload is NOT a parameter: setting it on an endpoint has no effect.
    // It is a workaround flag set automatically by common code (Activate/UploadTable);
    // there is no option to turn the CopyUpload behaviour off.
    CopyUpload         bool
    PerTransactionPush bool
    Cleanup            dp_model.CleanupType

    BatchSize              int // deprecated: use BufferTriggingSize instead
    BufferTriggingSize     uint64
    BufferTriggingInterval time.Duration

    QueryTimeout       time.Duration
    DisableSQLFallback bool
    ConnectionID       string
}
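A sketch of wiring a destination endpoint into a sink (written as if inside this package; host, database, credentials, and the transfer ID are placeholders, and the Password field is left out because dp_model.SecretString construction is not shown in this reference):

func newExampleSink(lgr log.Logger, registry metrics.Registry) (abstract.Sinker, error) {
    dst := &PgDestination{
        Hosts:    []string{"pg.example.internal"},
        Port:     5432,
        Database: "db1",
        User:     "transfer_user",
        // Password: ... (dp_model.SecretString)
    }
    dst.WithDefaults() // fill unset fields with defaults
    return NewSink(lgr, "transfer-id-placeholder", dst.ToSinkParams(), registry)
}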
func (*PgDestination) AllHosts ¶
func (d *PgDestination) AllHosts() []string
AllHosts returns all hosts, moving the legacy 'Host' field into the modern 'Hosts' list.
func (*PgDestination) BuffererConfig ¶
func (d *PgDestination) BuffererConfig() bufferer.BuffererConfig
func (*PgDestination) CleanupMode ¶
func (d *PgDestination) CleanupMode() dp_model.CleanupType
func (*PgDestination) FillDependentFields ¶
func (d *PgDestination) FillDependentFields(transfer *dp_model.Transfer)
func (*PgDestination) GetConnectionID ¶
func (d *PgDestination) GetConnectionID() string
func (*PgDestination) GetProviderType ¶
func (d *PgDestination) GetProviderType() abstract.ProviderType
func (*PgDestination) HasTLS ¶
func (d *PgDestination) HasTLS() bool
func (PgDestination) IsDestination ¶
func (PgDestination) IsDestination()
func (*PgDestination) MDBClusterID ¶
func (d *PgDestination) MDBClusterID() string
func (*PgDestination) ReliesOnSystemTablesTransferring ¶
func (d *PgDestination) ReliesOnSystemTablesTransferring() bool
func (*PgDestination) ToSinkParams ¶
func (d *PgDestination) ToSinkParams() PgDestinationWrapper
func (*PgDestination) ToStorageParams ¶
func (d *PgDestination) ToStorageParams() *PgStorageParams
func (*PgDestination) Transformer ¶
func (d *PgDestination) Transformer() map[string]string
func (*PgDestination) Validate ¶
func (d *PgDestination) Validate() error
func (*PgDestination) WithDefaults ¶
func (d *PgDestination) WithDefaults()
type PgDestinationWrapper ¶
type PgDestinationWrapper struct {
Model *PgDestination
}
func (PgDestinationWrapper) AllHosts ¶
func (d PgDestinationWrapper) AllHosts() []string
func (PgDestinationWrapper) CleanupMode ¶
func (d PgDestinationWrapper) CleanupMode() dp_model.CleanupType
func (PgDestinationWrapper) ClusterID ¶
func (d PgDestinationWrapper) ClusterID() string
func (PgDestinationWrapper) ConnectionID ¶
func (d PgDestinationWrapper) ConnectionID() string
func (PgDestinationWrapper) CopyUpload ¶
func (d PgDestinationWrapper) CopyUpload() bool
func (PgDestinationWrapper) Database ¶
func (d PgDestinationWrapper) Database() string
func (PgDestinationWrapper) DisableSQLFallback ¶
func (d PgDestinationWrapper) DisableSQLFallback() bool
func (PgDestinationWrapper) HasTLS ¶
func (d PgDestinationWrapper) HasTLS() bool
func (PgDestinationWrapper) IgnoreUniqueConstraint ¶
func (d PgDestinationWrapper) IgnoreUniqueConstraint() bool
func (PgDestinationWrapper) LoozeMode ¶
func (d PgDestinationWrapper) LoozeMode() bool
func (PgDestinationWrapper) MaintainTables ¶
func (d PgDestinationWrapper) MaintainTables() bool
func (PgDestinationWrapper) Password ¶
func (d PgDestinationWrapper) Password() string
func (PgDestinationWrapper) PerTransactionPush ¶
func (d PgDestinationWrapper) PerTransactionPush() bool
func (PgDestinationWrapper) Port ¶
func (d PgDestinationWrapper) Port() int
func (PgDestinationWrapper) QueryTimeout ¶
func (d PgDestinationWrapper) QueryTimeout() time.Duration
func (PgDestinationWrapper) TLSFile ¶
func (d PgDestinationWrapper) TLSFile() string
func (PgDestinationWrapper) Tables ¶
func (d PgDestinationWrapper) Tables() map[string]string
func (PgDestinationWrapper) User ¶
func (d PgDestinationWrapper) User() string
type PgDumpSteps ¶
type PgDumpSteps struct {
Table, PrimaryKey, View, Sequence bool
SequenceOwnedBy, Rule, Type bool
Constraint, FkConstraint, Index, Function bool
Collation, Trigger, Policy, Cast bool
Default bool
MaterializedView bool
SequenceSet *bool
TableAttach bool
IndexAttach bool
}
func DefaultPgDumpPostSteps ¶
func DefaultPgDumpPostSteps() *PgDumpSteps
func DefaultPgDumpPreSteps ¶
func DefaultPgDumpPreSteps() *PgDumpSteps
func (*PgDumpSteps) AnyStepIsTrue ¶
func (p *PgDumpSteps) AnyStepIsTrue() bool
func (*PgDumpSteps) List ¶
func (p *PgDumpSteps) List() []string
type PgErrorCode ¶
type PgErrorCode string
const (
    ErrcUniqueViolation              PgErrorCode = "23505"
    ErrcWrongObjectType              PgErrorCode = "42809"
    ErrcRelationDoesNotExists        PgErrorCode = "42P01"
    ErrcSchemaDoesNotExists          PgErrorCode = "3F000"
    ErrcObjectNotInPrerequisiteState PgErrorCode = "55000"
    ErrcInvalidPassword              PgErrorCode = "28P01"
    ErrcInvalidAuthSpec              PgErrorCode = "28000"
)
PostgreSQL error codes from https://www.postgresql.org/docs/12/errcodes-appendix.html
type PgObjectType ¶
type PgObjectType string
const (
    Table            PgObjectType = "TABLE"
    TableAttach      PgObjectType = "TABLE_ATTACH"
    PrimaryKey       PgObjectType = "PRIMARY_KEY"
    View             PgObjectType = "VIEW"
    Sequence         PgObjectType = "SEQUENCE"
    SequenceSet      PgObjectType = "SEQUENCE_SET"
    SequenceOwnedBy  PgObjectType = "SEQUENCE_OWNED_BY"
    Rule             PgObjectType = "RULE"
    Type             PgObjectType = "TYPE"
    Constraint       PgObjectType = "CONSTRAINT"
    FkConstraint     PgObjectType = "FK_CONSTRAINT"
    Index            PgObjectType = "INDEX"
    IndexAttach      PgObjectType = "INDEX_ATTACH"
    Function         PgObjectType = "FUNCTION"
    Collation        PgObjectType = "COLLATION"
    Trigger          PgObjectType = "TRIGGER"
    Policy           PgObjectType = "POLICY"
    Cast             PgObjectType = "CAST"
    MaterializedView PgObjectType = "MATERIALIZED_VIEW"
)
func (PgObjectType) IsValid ¶
func (pot PgObjectType) IsValid() error
type PgSerializationFormat ¶
type PgSerializationFormat string
type PgSinkParams ¶
type PgSinkParams interface {
    ClusterID() string
    AllHosts() []string
    Port() int
    Database() string
    User() string
    Password() string
    HasTLS() bool
    TLSFile() string

    // MaintainTables
    // If true, every batch insert is preceded by 'create schema ...' + 'create table ...'.
    // For now it is not enabled on every running transfer: it is meant for lb->pg delivery
    // and can be auto-derived. It is a legacy-like, private option.
    MaintainTables() bool

    // PerTransactionPush
    // Corresponds to 'SaveTxBoundaries' from the proto spec.
    PerTransactionPush() bool

    // LoozeMode
    // If true, errors are logged and nil is returned, i.e. data is lost on error.
    // Private option.
    LoozeMode() bool

    CleanupMode() model.CleanupType

    // Tables
    // Altnames mapping source->destination. Private option.
    Tables() map[string]string

    // CopyUpload
    // Use the 'CopyUpload' mechanism for inserts.
    CopyUpload() bool

    // IgnoreUniqueConstraint
    // Ignore 'uniqueViolation' errors and keep going. Private option.
    IgnoreUniqueConstraint() bool

    // QueryTimeout returns the timeout for query execution by this sink.
    QueryTimeout() time.Duration

    // DisableSQLFallback returns true if the sink should never use SQL when copying a snapshot
    // and should always use "COPY FROM".
    DisableSQLFallback() bool

    ConnectionID() string
}
type PgSource ¶
type PgSource struct {
    // oneof
    ClusterID string `json:"Cluster"`
    Host      string // legacy field kept for backward compatibility; only the 'Hosts' field is used now
    Hosts     []string

    Database string
    User     string
    Password model.SecretString
    Port     int
    DBTables []string

    BatchSize uint32 // limit on the number of rows in the internal buffer of the replication (not snapshot) source

    SlotID           string
    SlotByteLagLimit int64
    TLSFile          string
    EnableTLS        bool
    KeeperSchema     string
    SubNetworkID     string
    SecurityGroupIDs []string

    CollapseInheritTables bool
    UsePolling            bool
    ExcludedTables        []string
    IsHomo                bool
    NoHomo                bool // force hetero relations even when the transfer could be homo
    AutoActivate          bool
    PreSteps              *PgDumpSteps
    PostSteps             *PgDumpSteps
    UseFakePrimaryKey     bool
    IgnoreUserTypes       bool
    IgnoreUnknownTables   bool // if a table schema is unknown, ignore that table (TM-3104, TM-2934)

    MaxBufferSize model.BytesSize // Deprecated: is not used anymore

    ExcludeDescendants          bool   // Deprecated: is not used anymore, use CollapseInheritTables instead
    DesiredTableSize            uint64 // desired table part size for snapshot sharding
    SnapshotDegreeOfParallelism int    // desired number of table parts for snapshot sharding
    EmitTimeTypes               bool   // Deprecated: is not used anymore

    DBLogEnabled bool   // force a DBLog snapshot instead of the common one
    ChunkSize    uint64 // number of rows in a chunk; needed for DBLog snapshots; if 0, it is calculated automatically

    // SnapshotSerializationFormat selects whether the text or binary serialization format is used
    // when reading a snapshot from the PostgreSQL storage (see
    // https://www.postgresql.org/docs/current/protocol-overview.html#PROTOCOL-FORMAT-CODES).
    // If not specified (i.e. equal to PgSerializationFormatAuto), it defaults to "binary" for
    // homogeneous pg->pg transfers and to "text" in all other cases. Binary is preferred for
    // pg->pg transfers since the CopyFrom function of the pgx driver requires all values to be
    // binary-serializable.
    SnapshotSerializationFormat PgSerializationFormat

    ShardingKeyFields map[string][]string
    PgDumpCommand     []string
    ConnectionID      string
}
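A sketch of turning a source endpoint into a storage and checking connectivity (written as if inside this package; the source and transfer are assumed to come from the caller):

func pingSource(src *PgSource, transfer *model.Transfer) error {
    src.WithDefaults()
    storage, err := NewStorage(src.ToStorageParams(transfer))
    if err != nil {
        return err
    }
    defer storage.Close()
    return storage.Ping()
}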
func (*PgSource) AllIncludes ¶
func (*PgSource) ExcludeWithGlobals ¶
func (*PgSource) ExtraTransformers ¶
func (*PgSource) FillDependentFields ¶
func (*PgSource) FulfilledIncludes ¶
func (*PgSource) GetConnectionID ¶
func (*PgSource) GetProviderType ¶
func (s *PgSource) GetProviderType() abstract.ProviderType
func (*PgSource) IsIncremental ¶
func (*PgSource) IsIncremental()
func (*PgSource) IsStrictSource ¶
func (*PgSource) IsStrictSource()
func (*PgSource) MDBClusterID ¶
func (*PgSource) SupportsStartCursorValue ¶
func (*PgSource) ToSinkParams ¶
func (s *PgSource) ToSinkParams() PgSourceWrapper
func (*PgSource) ToStorageParams ¶
func (s *PgSource) ToStorageParams(transfer *model.Transfer) *PgStorageParams
func (*PgSource) WithDefaults ¶
func (s *PgSource) WithDefaults()
type PgSourceWrapper ¶
type PgSourceWrapper struct {
    Model *PgSource
    // contains filtered or unexported fields
}
func (PgSourceWrapper) AllHosts ¶
func (d PgSourceWrapper) AllHosts() []string
func (PgSourceWrapper) CleanupMode ¶
func (d PgSourceWrapper) CleanupMode() model.CleanupType
func (PgSourceWrapper) ClusterID ¶
func (d PgSourceWrapper) ClusterID() string
func (PgSourceWrapper) ConnectionID ¶
func (d PgSourceWrapper) ConnectionID() string
func (PgSourceWrapper) CopyUpload ¶
func (d PgSourceWrapper) CopyUpload() bool
func (PgSourceWrapper) Database ¶
func (d PgSourceWrapper) Database() string
func (PgSourceWrapper) DisableSQLFallback ¶
func (d PgSourceWrapper) DisableSQLFallback() bool
func (PgSourceWrapper) HasTLS ¶
func (d PgSourceWrapper) HasTLS() bool
func (PgSourceWrapper) IgnoreUniqueConstraint ¶
func (d PgSourceWrapper) IgnoreUniqueConstraint() bool
func (PgSourceWrapper) LoozeMode ¶
func (d PgSourceWrapper) LoozeMode() bool
func (PgSourceWrapper) MaintainTables ¶
func (d PgSourceWrapper) MaintainTables() bool
func (PgSourceWrapper) Password ¶
func (d PgSourceWrapper) Password() string
func (PgSourceWrapper) PerTransactionPush ¶
func (d PgSourceWrapper) PerTransactionPush() bool
func (PgSourceWrapper) Port ¶
func (d PgSourceWrapper) Port() int
func (PgSourceWrapper) QueryTimeout ¶
func (d PgSourceWrapper) QueryTimeout() time.Duration
func (*PgSourceWrapper) SetMaintainTables ¶
func (d *PgSourceWrapper) SetMaintainTables(maintainTables bool)
func (PgSourceWrapper) TLSFile ¶
func (d PgSourceWrapper) TLSFile() string
func (PgSourceWrapper) Tables ¶
func (d PgSourceWrapper) Tables() map[string]string
func (PgSourceWrapper) User ¶
func (d PgSourceWrapper) User() string
type PgStorageParams ¶
type PgStorageParams struct {
    AllHosts  []string // only one of AllHosts/ClusterID should be non-empty
    Port      int
    User      string
    Password  string
    Database  string
    ClusterID string // only one of AllHosts/ClusterID should be non-empty

    TLSFile   string
    EnableTLS bool

    UseFakePrimaryKey bool
    DBFilter          []string
    IgnoreUserTypes   bool

    // PreferReplica is meaningful only if ClusterID is not empty. Replica selection is
    // implemented via the MDB API; if set to true, dbaas.Initialize*Cloud must have been called.
    PreferReplica bool

    ExcludeDescendants          bool
    DesiredTableSize            uint64
    SnapshotDegreeOfParallelism int

    ConnString  string // used in Greenplum
    TableFilter abstract.Includeable

    TryHostCACertificates  bool // forces an SSL connection with host-provided SSL certificates
    UseBinarySerialization bool // whether the binary serialization format should be used; defaults to true in homogeneous pg->pg transfers

    SlotID            string
    ShardingKeyFields map[string][]string
    ConnectionID      string
}
func (*PgStorageParams) HasTLS ¶
func (p *PgStorageParams) HasTLS() bool
func (*PgStorageParams) String ¶
func (p *PgStorageParams) String() string
func (*PgStorageParams) TLSConfigTemplate ¶
func (p *PgStorageParams) TLSConfigTemplate() (*tls.Config, error)
TLSConfigTemplate returns a TLS configuration template without ServerName set. It returns nil when a no-TLS connection is requested by the configuration.
type PostgreSQLFlavour ¶
type PostgreSQLFlavour struct{}
func NewPostgreSQLFlavour ¶
func NewPostgreSQLFlavour() *PostgreSQLFlavour
func (*PostgreSQLFlavour) ListSchemaQuery ¶
func (*PostgreSQLFlavour) ListTablesQuery ¶
func (f *PostgreSQLFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
func (*PostgreSQLFlavour) PgClassFilter ¶
func (f *PostgreSQLFlavour) PgClassFilter() string
func (*PostgreSQLFlavour) PgClassRelsOnlyFilter ¶
func (f *PostgreSQLFlavour) PgClassRelsOnlyFilter() string
type PostgresSlotKiller ¶
type PostgresSlotKiller struct {
Slot AbstractSlot
}
func (*PostgresSlotKiller) KillSlot ¶
func (k *PostgresSlotKiller) KillSlot() error
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) DBLogUpload ¶
func (*Provider) Deactivate ¶
func (*Provider) DestinationSampleableStorage ¶
func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
func (*Provider) SourceSampleableStorage ¶
func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type SchemaExtractor ¶
type SchemaExtractor struct {
// contains filtered or unexported fields
}
func NewSchemaExtractor ¶
func NewSchemaExtractor() *SchemaExtractor
func (*SchemaExtractor) FindDependentViews ¶
func (*SchemaExtractor) LoadSchema ¶
func (e *SchemaExtractor) LoadSchema(ctx context.Context, conn *pgx.Conn, specificTable *abstract.TableID) (abstract.DBSchema, error)
LoadSchema returns the schemas of tables in PostgreSQL, customized according to the extractor's settings.
func (*SchemaExtractor) TablesList ¶
func (e *SchemaExtractor) TablesList(ctx context.Context, conn *pgx.Conn) ([]tableIDWithInfo, time.Time, error)
TablesList returns a list of basic information about all tables in the given schema.
func (*SchemaExtractor) WithExcludeViews ¶
func (e *SchemaExtractor) WithExcludeViews(excludeViews bool) *SchemaExtractor
func (*SchemaExtractor) WithFlavour ¶
func (e *SchemaExtractor) WithFlavour(flavour DBFlavour) *SchemaExtractor
func (*SchemaExtractor) WithForbiddenSchemas ¶
func (e *SchemaExtractor) WithForbiddenSchemas(forbiddenSchemas []string) *SchemaExtractor
func (*SchemaExtractor) WithForbiddenTables ¶
func (e *SchemaExtractor) WithForbiddenTables(forbiddenTables []string) *SchemaExtractor
func (*SchemaExtractor) WithLogger ¶
func (e *SchemaExtractor) WithLogger(logger log.Logger) *SchemaExtractor
func (*SchemaExtractor) WithUseFakePrimaryKey ¶
func (e *SchemaExtractor) WithUseFakePrimaryKey(useFakePrimaryKey bool) *SchemaExtractor
type SequenceInfo ¶
SequenceInfo is a description of a PostgreSQL sequence
type SequenceMap ¶
type SequenceMap map[abstract.TableID]*SequenceInfo
SequenceMap is a mapping of sequence identifiers to their descriptions
func ListSequencesWithDependants ¶
func ListSequencesWithDependants(ctx context.Context, conn *pgx.Conn, serviceSchema string) (SequenceMap, error)
ListSequencesWithDependants returns a mapping with sequence information.
This method executes CREATE FUNCTION and DROP FUNCTION statements.
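A sketch of calling it (the connection and the service schema name are assumed to come from the caller):

func countSequences(ctx context.Context, conn *pgx.Conn, serviceSchema string) (int, error) {
    // Note: this helper executes CREATE FUNCTION and DROP FUNCTION on the server.
    seqs, err := ListSequencesWithDependants(ctx, conn, serviceSchema)
    if err != nil {
        return 0, err
    }
    return len(seqs), nil
}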
type Slot ¶
type Slot struct {
// contains filtered or unexported fields
}
func (*Slot) SuicideImpl ¶
type SlotMonitor ¶
type SlotMonitor struct {
// contains filtered or unexported fields
}
func NewSlotMonitor ¶
func NewSlotMonitor(conn *pgxpool.Pool, slotName string, slotDatabaseName string, metrics *stats.SourceStats, logger log.Logger) *SlotMonitor
NewSlotMonitor constructs a slot monitor, but does NOT start it.
The provided pool is NOT closed by the monitor. It is the caller's responsibility to manage the pool.
func (*SlotMonitor) Close ¶
func (m *SlotMonitor) Close()
func (*SlotMonitor) StartSlotMonitoring ¶
func (m *SlotMonitor) StartSlotMonitoring(maxSlotByteLag int64) <-chan error
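A sketch of running a monitor (pool, slot name, database name, stats, and logger are assumed; the byte-lag limit is an illustrative value, and the pool must be closed by the caller):

func watchSlot(pool *pgxpool.Pool, slotName, database string, st *stats.SourceStats, lgr log.Logger) func() {
    monitor := NewSlotMonitor(pool, slotName, database, st, lgr)
    errCh := monitor.StartSlotMonitoring(50 * 1024 * 1024) // report when the slot lags by more than ~50 MiB
    go func() {
        if err, ok := <-errCh; ok && err != nil {
            // react to slot failure here, e.g. stop replication
            _ = err
        }
    }()
    return monitor.Close // the caller stops monitoring by invoking the returned function
}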
type SnapshotKey ¶
type SnapshotKey string
func (SnapshotKey) String ¶
func (c SnapshotKey) String() string
type SnapshotState ¶
type Storage ¶
type Storage struct {
    Config           *PgStorageParams
    Conn             *pgxpool.Pool
    IsHomo           bool
    ForbiddenSchemas []string
    ForbiddenTables  []string
    Flavour          DBFlavour
    ShardedStateLSN  string
    ShardedStateTS   time.Time
    // contains filtered or unexported fields
}
func NewStorage ¶
func NewStorage(config *PgStorageParams, opts ...StorageOpt) (*Storage, error)
func (*Storage) BeginPGSnapshot ¶
Named BeginPGSnapshot to NOT match abstract.SnapshotableStorage
func (*Storage) BuildTypeMapping ¶
func (s *Storage) BuildTypeMapping(ctx context.Context) (TypeNameToOIDMap, error)
func (*Storage) EndPGSnapshot ¶
Named EndPGSnapshot to NOT match abstract.SnapshotableStorage
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableDescriptionRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) GetIncrementalState ¶
func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
func (*Storage) GetInheritedTables ¶
func (*Storage) LoadQueryTable ¶
func (s *Storage) LoadQueryTable(ctx context.Context, tableQuery tablequery.TableQuery, pusher abstract.Pusher) error
func (*Storage) LoadRandomSample ¶
func (*Storage) LoadSampleBySet ¶
func (*Storage) LoadSchemaForTable ¶
func (s *Storage) LoadSchemaForTable(ctx context.Context, conn *pgx.Conn, table abstract.TableDescription) (*abstract.TableSchema, error)
func (*Storage) LoadTopBottomSample ¶
func (*Storage) OrderedRead ¶
func (s *Storage) OrderedRead(
    t *abstract.TableDescription,
    schemas []abstract.ColSchema,
    sortOrder SortOrder,
    filter abstract.WhereStatement,
    duplicatesPolicy DuplicatesPolicy,
    excludeDescendants bool,
) string
func (*Storage) RunSlotMonitor ¶
func (*Storage) SetInitialState ¶
func (s *Storage) SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)
func (*Storage) SetLoadDescending ¶
func (*Storage) SetShardingContext ¶
func (*Storage) ShardTable ¶
func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
ShardTable tries to shard one table/view into parts, i.e. to split the task of snapshotting a single table/view into several parts:
TableDescription -> []TableDescription
What can and cannot be sharded:
- tables with a non-empty 'Offset' - not sharded
- tables with a non-empty 'Filter' (incremental uploads or ad-hoc upload_table) - sharded
- views with explicitKeys filled - sharded
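A sketch of sharded loading (storage, table, and pusher are assumed to come from the caller; parts are loaded sequentially here for simplicity):

func loadSharded(ctx context.Context, s *Storage, table abstract.TableDescription, pusher abstract.Pusher) error {
    parts, err := s.ShardTable(ctx, table)
    if err != nil {
        return err
    }
    for _, part := range parts {
        if err := s.LoadTable(ctx, part, pusher); err != nil {
            return err
        }
    }
    return nil
}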
func (*Storage) ShardingContext ¶
func (*Storage) SnapshotLSN ¶
func (*Storage) TableAccessible ¶
func (s *Storage) TableAccessible(table abstract.TableDescription) bool
func (*Storage) TableSchema ¶
func (*Storage) TableSizeInBytes ¶
func (*Storage) WorkersSnapshotState ¶
func (s *Storage) WorkersSnapshotState() *SnapshotState
type StorageOpt ¶
type StorageOpt interface {
// contains filtered or unexported methods
}
go-sumtype:decl StorageOpt
func WithMetrics ¶
func WithMetrics(registry metrics.Registry) StorageOpt
func WithTypeMapping ¶
func WithTypeMapping(typeMapping TypeNameToOIDMap) StorageOpt
type TextDecoderAndValuerWithHomo ¶
type TextDecoderAndValuerWithHomo interface {
    pgtype.TextDecoder
    driver.Valuer
    abstract.HomoValuer
}
type Timestamp ¶
func NewTimestamp ¶
NewTimestamp constructs a TIMESTAMP WITHOUT TIME ZONE representation which supports BC years
TODO: this type must become significantly simpler after https://st.yandex-team.ru/TM-5127 is done
func (*Timestamp) DecodeText ¶
type Timestamptz ¶
type Timestamptz struct {
pgtype.Timestamptz
}
func NewTimestamptz ¶
func NewTimestamptz() *Timestamptz
NewTimestamptz constructs a TIMESTAMP WITH TIME ZONE representation which supports BC years
TODO: remove this when https://st.yandex-team.ru/TM-5127 is done
func (*Timestamptz) DecodeText ¶
func (t *Timestamptz) DecodeText(ci *pgtype.ConnInfo, src []byte) error
func (*Timestamptz) HomoValue ¶
func (t *Timestamptz) HomoValue() any
type TypeFullName ¶
type TypeFullName struct {
    Namespace string // typnamespace from pg_type
    Name      string // typname from pg_type
}
TypeFullName is the text representation of a PostgreSQL type from the pg_type catalog, as opposed to its OID: https://www.postgresql.org/docs/current/catalog-pg-type.html
type TypeMapping ¶
type TypeMapping struct{ TypeNameToOIDMap }
type TypeNameToOIDMap ¶
type TypeNameToOIDMap map[TypeFullName]pgtype.OID
type Unmarshaller ¶
type Unmarshaller struct {
// contains filtered or unexported fields
}
Unmarshaller converts data from the PostgreSQL format to the Data Transfer format. The format differs for homogeneous and heterogeneous transfers, but the same unmarshaller is used in both cases.
func NewUnmarshaller ¶
func NewUnmarshaller(castData *UnmarshallerData, connInfo *pgtype.ConnInfo, schema *abstract.ColSchema, fieldDesc *pgproto3.FieldDescription) (*Unmarshaller, error)
type UnmarshallerData ¶
type UnmarshallerData struct {
// contains filtered or unexported fields
}
func MakeUnmarshallerData ¶
func MakeUnmarshallerData(isHomo bool, conn *pgx.Conn) UnmarshallerData
type Wal2JSONItem ¶
type Wal2JSONItem struct {
    ID             uint32               `json:"id" yson:"id"`
    LSN            uint64               `json:"nextlsn" yson:"nextlsn"`
    CommitTime     uint64               `json:"commitTime" yson:"commitTime"`
    Counter        int                  `json:"txPosition" yson:"txPosition"`
    Kind           abstract.Kind        `json:"kind" yson:"kind"`
    Schema         string               `json:"schema" yson:"schema"`
    Table          string               `json:"table" yson:"table"`
    PartID         string               `json:"part" yson:"-"`
    ColumnNames    []string             `json:"columnnames" yson:"columnnames"`
    ColumnValues   []interface{}        `json:"columnvalues,omitempty" yson:"columnvalues"`
    TableSchema    []abstract.ColSchema `json:"table_schema,omitempty" yson:"table_schema"`
    OldKeys        OldKeysType          `json:"oldkeys" yson:"oldkeys"`
    TxID           string               `json:"tx_id" yson:"-"`
    Query          string               `json:"query" yson:"-"`
    Size           abstract.EventSize   `json:"-" yson:"-"`
    ColumnTypeOIDs []pgtype.OID         `json:"columntypeoids"`
}
This is basically an abstract.ChangeItem, but with two additional fields:
- columntypeoids
- oldkeys.keytypeoids
Note that historically abstract.ChangeItem's structure was exactly the same as the structure of the objects received from wal2json, since PostgreSQL was one of the first endpoints implemented in DataTransfer. Later, ChangeItem had to encompass other databases and became "abstract". There is no point in adding any more PostgreSQL specifics to it, so Wal2JSONItem is now used for parsing PostgreSQL changes from wal2json.
type Wal2JsonParser ¶
type Wal2JsonParser struct {
// contains filtered or unexported fields
}
func NewWal2JsonParser ¶
func NewWal2JsonParser() *Wal2JsonParser
func (*Wal2JsonParser) Close ¶
func (p *Wal2JsonParser) Close()
func (*Wal2JsonParser) Parse ¶
func (p *Wal2JsonParser) Parse(data []byte) (result []*Wal2JSONItem, err error)
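A sketch of parsing a wal2json payload (the input bytes are assumed to come from the replication stream):

func parseWalChunk(data []byte) ([]*Wal2JSONItem, error) {
    parser := NewWal2JsonParser()
    defer parser.Close()
    items, err := parser.Parse(data)
    if err != nil {
        return nil, err
    }
    for _, item := range items {
        // item.Kind, item.Schema and item.Table describe the change;
        // item.ColumnTypeOIDs carries the PostgreSQL type OIDs of the columns.
        _ = item
    }
    return items, nil
}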
Source Files ¶
- array.go
- change_processor.go
- changeitems_fetcher.go
- changeitems_rows_stub.go
- client.go
- complex_type.go
- composite.go
- conn.go
- create_replication_slot.go
- date.go
- drop_replication_slot.go
- error.go
- fallback_bit_as_bytes.go
- fallback_date_as_string.go
- fallback_not_null_as_null.go
- fallback_timestamp_utc.go
- flavour.go
- generic_array.go
- hstore.go
- incremental_storage.go
- keeper.go
- keywords.go
- list_names.go
- logger.go
- lsn_slot.go
- model.go
- model_pg_destination.go
- model_pg_sink_params.go
- model_pg_source.go
- model_pg_storage_params.go
- mutexed_pgconn.go
- parent_resolver.go
- pg_dump.go
- provider.go
- publisher.go
- publisher_polling.go
- publisher_replication.go
- schema.go
- sequence.go
- sharding_partition_storage.go
- sharding_storage.go
- sink.go
- slot.go
- slot_monitor.go
- source_specific_properties.go
- source_wrapper.go
- storage.go
- table_information.go
- timestamp.go
- timestamptz.go
- timetz.go
- transcoder_adapter.go
- txutils.go
- type.go
- typesystem.go
- unmarshaller.go
- unmarshaller_hetero.go
- verify_tables.go
- version.go
- wal2json_item.go
- wal2json_parser.go