postgres

package
v0.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: Apache-2.0 Imports: 81 Imported by: 0

Documentation

Index

Constants

View Source
// Values of PgSerializationFormat: "text", "binary", or the empty string.
// NOTE(review): the empty string is named "Auto", presumably meaning the
// format is chosen automatically — how it is resolved is not visible here.
const (
	PgSerializationFormatAuto   = PgSerializationFormat("")
	PgSerializationFormatText   = PgSerializationFormat("text")
	PgSerializationFormatBinary = PgSerializationFormat("binary")
)
View Source
const (
	TableConsumerKeeper = abstract.TableConsumerKeeper // "__consumer_keeper"
	TableMoleFinder     = "__data_transfer_mole_finder"
	TableLSN            = abstract.TableLSN // "__data_transfer_lsn"
)
View Source
// Layout strings written against Go's reference time
// (Mon Jan 2 15:04:05 MST 2006), suitable for the time package.
//
//   - PgDateFormat is a plain calendar date: year-month-day.
//   - PgDatetimeFormat is date and time with whole seconds, followed by a zone
//     offset. "Z07:00:00" is the time package's offset element with hours,
//     minutes and seconds; it renders "Z" for UTC.
//   - PgTimestampFormat is the same with up to six fractional-second digits;
//     ".999999" omits trailing zeros when formatting.
const (
	PgDateFormat      = "2006-01-02"
	PgDatetimeFormat  = "2006-01-02 15:04:05Z07:00:00"
	PgTimestampFormat = "2006-01-02 15:04:05.999999Z07:00:00"
)
View Source
const (
	SortAsc  SortOrder = "asc"
	SortDesc SortOrder = "desc"

	All DuplicatesPolicy = "all"

	PartitionsFilterPrefix = "__dt_resolved_hard:"
)
View Source
const BufferLimit = 16 * humanize.MiByte
View Source
const DatabaseTimeZone = abstract.PropertyKey("pg:database_timezone")
View Source
const EnumAllValues = abstract.PropertyKey("pg:enum_all_values")
View Source
const FakeParentPKeyStatusMessageCategory string = "fake_primary_key_parent"
View Source
const PGDefaultQueryTimeout time.Duration = 30 * time.Minute
View Source
const ProviderType = abstract.ProviderType("pg")
View Source
const SelectCurrentLsnDelay = `select pg_wal_lsn_diff(pg_last_wal_replay_lsn(), $1);`
View Source
const TimeZoneParameterStatusKey string = "TimeZone"

TimeZoneParameterStatusKey is the identifier of the PostgreSQL connection property containing time zone.

Variables

View Source
// SQL templates for the bookkeeping table named by TableMoleFinder.
//
// Each fmt.Sprintf call below substitutes only the table name. The escaped
// "%%s" survives as a literal "%s" placeholder for the schema, which must be
// filled in by a second formatting step (presumably the Get*(schema string)
// helpers of this package — confirm). "$1"/"$2" are query parameters.
var (
	// InitLSNSlotDDL creates the table if it does not exist; slot_id is the primary key.
	InitLSNSlotDDL = fmt.Sprintf(`
create table if not exists %%s.%s (
	slot_id   text primary key,
	iterator int,
	replicated_iterator int,
	last_iterator_update timestamp,
	last_replicated_update timestamp,
	committed_lsn text
);`, TableMoleFinder)

	// MoveIteratorQuery increments iterator for slot $1 and stamps last_iterator_update.
	MoveIteratorQuery = fmt.Sprintf(`
update %%s.%s set iterator = iterator + 1, last_iterator_update = now()
where slot_id = $1;`, TableMoleFinder)

	// UpdateReplicatedQuery sets replicated_iterator to $2 for slot $1, but only
	// when the stored value is exactly $2 - 1; it also stamps last_replicated_update.
	UpdateReplicatedQuery = fmt.Sprintf(`
update %%s.%s set replicated_iterator = $2, last_replicated_update = now()
where slot_id = $1 and replicated_iterator = $2 - 1;`, TableMoleFinder)

	// SelectCommittedLSN reads committed_lsn for slot $1.
	SelectCommittedLSN = fmt.Sprintf(`
select committed_lsn from %%s.%s where slot_id = $1;`, TableMoleFinder)

	// InsertIntoMoleFinder upserts the row for slot $1: both iterators become
	// null, both timestamps become now(), and committed_lsn becomes $2.
	InsertIntoMoleFinder = fmt.Sprintf(`
insert into %%s.%s
	(slot_id, iterator, replicated_iterator, last_iterator_update, last_replicated_update, committed_lsn)
values
	($1, null, null, now(), now(), $2)
on conflict (slot_id) do update
	set
		replicated_iterator = null,
		iterator = null,
		last_iterator_update = now(),
		last_replicated_update = now(),
		committed_lsn = excluded.committed_lsn;`, TableMoleFinder)

	// DeleteFromMoleFinder removes the row for slot $1.
	DeleteFromMoleFinder = fmt.Sprintf(`delete from %%s.%s where slot_id = $1`, TableMoleFinder)

	// InsertIntoMoleFinder2 upserts only committed_lsn ($2) for slot $1; on
	// conflict the other columns of the existing row are left untouched.
	InsertIntoMoleFinder2 = fmt.Sprintf(`
insert into %%s.%s (slot_id, committed_lsn)
values ($1, $2)
on conflict (slot_id) do update set committed_lsn = excluded.committed_lsn;`, TableMoleFinder)

	// SelectCommittedLSN2 reads committed_lsn and replicated_iterator for slot $1.
	SelectCommittedLSN2 = fmt.Sprintf(`
select committed_lsn, replicated_iterator from %%s.%s where slot_id = $1`, TableMoleFinder)

	// SelectLsnForSlot reads restart_lsn of the replication slot named $1 from
	// pg_replication_slots. Unlike the templates above, it is a plain string
	// with no schema placeholder.
	SelectLsnForSlot = `select restart_lsn from pg_replication_slots where slot_name = $1;`
)
View Source
var ErrBadPostgresHealth = errors.New("postgres health problem")
View Source
var ErrConsumerLocked = errors.New("keeper: Consumer already locked")
View Source
var (
	LSNTrackTableDDL = fmt.Sprintf(`
		create table if not exists %s(
			transfer_id text,
			schema_name text,
			table_name text,
			lsn bigint,
			primary key (transfer_id, schema_name, table_name)
		);`,
		TableLSN,
	)
)
View Source
var (
	NoPrimaryKeyCode = coded.Register("postgres", "no_primary_key")
)
View Source
var PGGlobalExclude = []abstract.TableID{
	{Namespace: "public", Name: "repl_mon"},
}

Functions

func ApplyCommands

func ApplyCommands(commands []*pgDumpItem, transfer server.Transfer, registry metrics.Registry, types ...string) error

func ApplyPgDumpPostSteps

func ApplyPgDumpPostSteps(pgdump []*pgDumpItem, transfer *server.Transfer, registry metrics.Registry) error

ApplyPgDumpPostSteps takes the given dump and applies post-steps defined in transfer source ONLY for homogeneous PG-PG transfers. It also logs its actions.

func ApplyPgDumpPreSteps

func ApplyPgDumpPreSteps(pgdump []*pgDumpItem, transfer *server.Transfer, registry metrics.Registry) error

ApplyPgDumpPreSteps takes the given dump and applies pre-steps defined in transfer source ONLY for homogeneous PG-PG transfers. It also logs its actions.

func BeginTx

func BeginTx(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, lgr log.Logger) (pgx.Tx, *util.Rollbacks, error)

BeginTx starts a transaction at the given connection with the given options and automatically composes a sufficient rollback object.

func BeginTxWithSnapshot

func BeginTxWithSnapshot(ctx context.Context, conn *pgx.Conn, options pgx.TxOptions, snapshot string, lgr log.Logger) (pgx.Tx, *util.Rollbacks, error)

BeginTxWithSnapshot starts a transaction at the given connection with the given snapshot and automatically composes a sufficient rollback object.

If the source database does not support SET TRANSACTION SNAPSHOT or the given snapshot identifier is an empty string, a transaction without snapshot is started automatically.

func BuildColSchemaArrayElement

func BuildColSchemaArrayElement(colSchema abstract.ColSchema) abstract.ColSchema

func CalculatePartCount

func CalculatePartCount(totalSize, desiredPartSize, partCountLimit uint64) uint64

func ClearOriginalType

func ClearOriginalType(pgType string) string

func CreateReplicationSlot

func CreateReplicationSlot(src *PgSource) error

func CreateSchemaQueryOptional

func CreateSchemaQueryOptional(fullTableName string) string

CreateSchemaQueryOptional returns a query to create schema if it is absent. May return empty string, in which case schema must not be created. Full table name may contain quotes in schema name, which are removed.

func CreateTableQuery

func CreateTableQuery(fullTableName string, schema []abstract.ColSchema) (string, error)

func CurrentTxStartTime

func CurrentTxStartTime(ctx context.Context, conn *pgx.Conn) (time.Time, error)

CurrentTxStartTime returns the start time of the current transaction at the given connection. If there is no active transaction, current time of the database is returned.

func DaterangeToString

func DaterangeToString(t []time.Time) string

func DropReplicationSlot

func DropReplicationSlot(src *PgSource) error

func ExtractPgDumpSchema

func ExtractPgDumpSchema(transfer *server.Transfer) ([]*pgDumpItem, error)

ExtractPgDumpSchema returns the dump ONLY for homogeneous PG-PG transfers. It also logs its actions.

func FallbackBitAsBytes

func FallbackBitAsBytes(item *abstract.ChangeItem) (*abstract.ChangeItem, error)

FallbackBitAsBytes implements backward compatibility for https://st.yandex-team.ru/TM-5445#640b22b5af0c626bd522d179

WARNING: this fallback violates type strictness!

func FallbackNotNullAsNull

func FallbackNotNullAsNull(ci *abstract.ChangeItem) (*abstract.ChangeItem, error)

func FallbackTimestampToUTC

func FallbackTimestampToUTC(item *abstract.ChangeItem) (*abstract.ChangeItem, error)

FallbackTimestampToUTC implements https://st.yandex-team.ru/TM-5092

func GetArrElemTypeDescr

func GetArrElemTypeDescr(originalType string) string

func GetCurrentStateOfSequence

func GetCurrentStateOfSequence(ctx context.Context, conn *pgx.Conn, sequenceID abstract.TableID) (lastValue int64, isCalled bool, err error)

func GetDeleteFromMoleFinder

func GetDeleteFromMoleFinder(schema string) string

func GetInitLSNSlotDDL

func GetInitLSNSlotDDL(schema string) string

func GetInsertIntoMoleFinder

func GetInsertIntoMoleFinder(schema string) string

func GetInsertIntoMoleFinder2

func GetInsertIntoMoleFinder2(schema string) string

func GetMoveIteratorQuery

func GetMoveIteratorQuery(schema string) string

func GetPropertyEnumAllValues

func GetPropertyEnumAllValues(in *abstract.ColSchema) []string

func GetSelectCommittedLSN

func GetSelectCommittedLSN(schema string) string

func GetSelectCommittedLSN2

func GetSelectCommittedLSN2(schema string) string

func GetUpdateReplicatedQuery

func GetUpdateReplicatedQuery(schema string) string

func HstoreToJSON

func HstoreToJSON(colVal string) (string, error)

func HstoreToMap

func HstoreToMap(colVal string) (map[string]interface{}, error)

func IsKeyword

func IsKeyword(name string) bool

IsKeyword returns whether name is a keyword according to a well-known keywords list. See https://www.postgresql.org/docs/current/sql-keywords-appendix.html

func IsPKeyCheckError

func IsPKeyCheckError(err error) bool

func IsPgError

func IsPgError(err error, code PgErrorCode) bool

func IsPgTypeTimeWithTimeZone

func IsPgTypeTimeWithTimeZone(originalType string) bool

func IsPgTypeTimeWithTimeZoneUnprefixed

func IsPgTypeTimeWithTimeZoneUnprefixed(in string) bool

func IsPgTypeTimeWithoutTimeZone

func IsPgTypeTimeWithoutTimeZone(originalType string) bool

func IsPgTypeTimeWithoutTimeZoneUnprefixed

func IsPgTypeTimeWithoutTimeZoneUnprefixed(in string) bool

func IsPgTypeTimestampWithTimeZone

func IsPgTypeTimestampWithTimeZone(originalType string) bool

func IsPgTypeTimestampWithTimeZoneUnprefixed

func IsPgTypeTimestampWithTimeZoneUnprefixed(in string) bool

func IsPgTypeTimestampWithoutTimeZone

func IsPgTypeTimestampWithoutTimeZone(originalType string) bool

func IsPgTypeTimestampWithoutTimeZoneUnprefixed

func IsPgTypeTimestampWithoutTimeZoneUnprefixed(in string) bool

func IsUserDefinedType

func IsUserDefinedType(col *abstract.ColSchema) bool

func JSONToHstore

func JSONToHstore(in string) (string, error)

func KeeperDDL

func KeeperDDL(schema string) string

func ListWithCommaSingleQuoted

func ListWithCommaSingleQuoted(values []string) string

func LockQuery

func LockQuery(t abstract.TableID, mode PGTableLockMode) string

func MakeConnConfigFromSink

func MakeConnConfigFromSink(lgr log.Logger, params PgSinkParams) (*pgx.ConnConfig, error)

func MakeConnConfigFromSrc

func MakeConnConfigFromSrc(lgr log.Logger, pgSrc *PgSource) (*pgx.ConnConfig, error)

func MakeConnConfigFromStorage

func MakeConnConfigFromStorage(lgr log.Logger, storage *PgStorageParams) (*pgx.ConnConfig, error)

func MakeConnPoolFromDst

func MakeConnPoolFromDst(pgDst *PgDestination, lgr log.Logger) (*pgxpool.Pool, error)

func MakeConnPoolFromSrc

func MakeConnPoolFromSrc(pgSrc *PgSource, lgr log.Logger) (*pgxpool.Pool, error)

func MakeInitDataTypes

func MakeInitDataTypes(options ...DataTypesOption) func(ctx context.Context, conn *pgx.Conn) error

func MakeSetSQL

func MakeSetSQL(key string, value string) string

func MinusToBC

func MinusToBC(v string) string

MinusToBC checks if the given string starts with a minus and if so, trims it and adds a "BC" suffix

func New

func NewPgConnPool

func NewPgConnPool(connConfig *pgx.ConnConfig, lgr log.Logger, dataTypesOptions ...DataTypesOption) (*pgxpool.Pool, error)

func NewPgConnPoolConfig

func NewPgConnPoolConfig(ctx context.Context, poolConfig *pgxpool.Config) (*pgxpool.Pool, error)

NewPgConnPoolConfig creates a connection pool. It provides built-in timeouts to limit the duration of connection attempts

func NewPollingPublisher

func NewPollingPublisher(
	version PgVersion,
	conn *pgxpool.Pool,
	slot AbstractSlot,
	registry *stats.SourceStats,
	cfg *PgSource,
	objects *server.DataObjects,
	transferID string,
	lgr log.Logger,
	cp coordinator.Coordinator,
) (abstract.Source, error)

func NewReplicationPublisher

func NewReplicationPublisher(version PgVersion, replConn *mutexedPgConn, connPool *pgxpool.Pool, slot AbstractSlot, stats *stats.SourceStats, source *PgSource, transferID string, lgr log.Logger, cp coordinator.Coordinator, objects *server.DataObjects) (abstract.Source, error)

func NewSink

func NewSink(lgr log.Logger, transferID string, config PgSinkParams, mtrcs metrics.Registry) (abstract.Sinker, error)

func NewSinkWithPool

func NewSinkWithPool(ctx context.Context, lgr log.Logger, transferID string, config PgSinkParams, mtrcs metrics.Registry, pool *pgxpool.Pool) (abstract.Sinker, error)

func NewSourceWrapper

func NewSourceWrapper(src *PgSource, transferID string, objects *server.DataObjects, lgr log.Logger, registry *stats.SourceStats, cp coordinator.Coordinator) (abstract.Source, error)

func ParseLsn

func ParseLsn(lsn string) uint64

func PgTypeToYTType

func PgTypeToYTType(pgType string) schema.Type

func PostgresDumpConnString

func PostgresDumpConnString(src *PgSource) (string, server.SecretString, error)

func ReadQuery

func ReadQuery(t *abstract.TableDescription, columns string) string

func Represent

func Represent(val interface{}, colSchema abstract.ColSchema) (string, error)

func RepresentWithCast

func RepresentWithCast(v interface{}, colSchema abstract.ColSchema) (string, error)

func RollbackFuncForPgxTx

func RollbackFuncForPgxTx(ctx context.Context, tx pgx.Tx, lgr log.Logger) func()

RollbackFuncForPgxTx returns a function which ROLLBACKs the given `pgx.Tx` and if an error happens, logs it with the given logger at WARNING level

func RowCount

func RowCount(ctx context.Context, conn *pgx.Conn, table *abstract.TableDescription) (uint64, error)

RowCount queries the storage and returns the exact number of rows in the given table

func RunSlotMonitor

func RunSlotMonitor(ctx context.Context, pgSrc *PgSource, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)

func Sanitize

func Sanitize(identifier string) string

Sanitize sanitizes the given identifier so that it can be used safely in queries to PostgreSQL. Note that the input cannot contain multiple identifiers (e.g., `schema.table` is not a valid input).

func SetInitialState

func SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)

func TableName

func TableName(t *abstract.TableDescription) string

func TimeWithTimeZoneToTime

func TimeWithTimeZoneToTime(val string) (time.Time, error)

TimeWithTimeZoneToTime converts the given (valid) PostgreSQL TIME WITH TIME ZONE into a time.Time whose time zone and time are set to valid values. All resulting time.Times are guaranteed to be properly comparable among themselves. This implies they are all based on the same date.

func UnwrapArrayElement

func UnwrapArrayElement(in string) string

func VerifyPostgresTables

func VerifyPostgresTables(src *PgSource, transfer *server.Transfer, lgr log.Logger) error

func VerifyPostgresTablesNames

func VerifyPostgresTablesNames(tables []string) error

func WhereClause

func WhereClause(filter abstract.WhereStatement) string

func WithLogger

func WithLogger(connConfig *pgx.ConnConfig, lgr log.Logger) *pgx.ConnConfig

Types

type AbstractSlot

type AbstractSlot interface {
	Init(sinker abstract.AsyncSink) error
	Exist() (bool, error)
	Close()
	Create() error
	Suicide() error
}

func NewLsnTrackedSlot

func NewLsnTrackedSlot(pool *pgxpool.Pool, logger log.Logger, src *PgSource) AbstractSlot

func NewNotTrackedSlot

func NewNotTrackedSlot(pool *pgxpool.Pool, logger log.Logger, src *PgSource) AbstractSlot

func NewSlot

func NewSlot(pool *pgxpool.Pool, logger log.Logger, src *PgSource) (AbstractSlot, error)

type ChangeItemsFetcher

type ChangeItemsFetcher struct {
	// contains filtered or unexported fields
}

ChangeItemsFetcher consolidates multiple objects (of which the most important is pgx.Rows) into a single one, providing buffered fetching and parsing of rows from a postgres source.

func NewChangeItemsFetcher

func NewChangeItemsFetcher(rows pgx.Rows, conn *pgx.Conn, template abstract.ChangeItem, sourceStats *stats.SourceStats) *ChangeItemsFetcher

NewChangeItemsFetcher constructs minimal working fetcher. ChangeItems fetched are shallow copies of the template except for ColumnValues - these are filled by the fetcher. The sequence of ColumnValues, ColSchemas (in TableSchema) and rows.FieldDescriptions() MUST BE THE SAME.

func (*ChangeItemsFetcher) Fetch

func (f *ChangeItemsFetcher) Fetch() (items []abstract.ChangeItem, err error)

func (*ChangeItemsFetcher) MaybeHasMore

func (f *ChangeItemsFetcher) MaybeHasMore() bool

func (*ChangeItemsFetcher) Sniffer

func (f *ChangeItemsFetcher) Sniffer() (items []abstract.ChangeItem, err error)

Sniffer may be used in place of Fetch to get raw data.

func (*ChangeItemsFetcher) WithLimitBytes

func (f *ChangeItemsFetcher) WithLimitBytes(limitBytes server.BytesSize) *ChangeItemsFetcher

WithLimitBytes accepts 0 to disable limit

func (*ChangeItemsFetcher) WithLimitCount

func (f *ChangeItemsFetcher) WithLimitCount(limitCount int) *ChangeItemsFetcher

func (*ChangeItemsFetcher) WithLogger

func (f *ChangeItemsFetcher) WithLogger(logger log.Logger) *ChangeItemsFetcher

func (*ChangeItemsFetcher) WithUnmarshallerData

func (f *ChangeItemsFetcher) WithUnmarshallerData(data UnmarshallerData) *ChangeItemsFetcher

type Config

type Config struct {
	config.DBConfig
}

type ConnectionParams

type ConnectionParams struct {
	Host           string
	Port           uint16
	Database       string
	User           string
	Password       model.SecretString
	HasTLS         bool
	CACertificates string
	ClusterID      string
}

func GetConnParamsFromSrc

func GetConnParamsFromSrc(lgr log.Logger, src *PgSource) (*ConnectionParams, error)

type DBFlavour

type DBFlavour interface {
	PgClassFilter() string
	PgClassRelsOnlyFilter() string
	// ListSchemaQuery returns a query with TWO placeholders if withSpecificTable is true; with ZERO placeholders otherwise
	ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, forbiddenTables []string) string
	// ListTablesQuery returns a query with zero placeholders
	ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
}

type DataTypesOption

type DataTypesOption interface {
	// contains filtered or unexported methods
}

go-sumtype:decl DataTypesOption

type Date

type Date struct {
	pgtype.Date
}

func NewDate

func NewDate() *Date

NewDate constructs a DATE representation which supports BC years

TODO: remove this when https://st.yandex-team.ru/TM-5127 is done

func (*Date) DecodeText

func (t *Date) DecodeText(ci *pgtype.ConnInfo, src []byte) error

func (*Date) HomoValue

func (t *Date) HomoValue() any

func (*Date) Value

func (t *Date) Value() (driver.Value, error)

type DefaultDataType

type DefaultDataType struct{ pgtype.Value }

DefaultDataType is the Go type for PostgreSQL types of which the pgx driver is unaware. Default is *pgx.GenericText. The implementation type must be a pointer (this is required by pgx). May be a nil pointer.

type DuplicatesPolicy

type DuplicatesPolicy string

type FieldDescription

type FieldDescription struct {
	Name                 string
	TableOID             uint32
	TableAttributeNumber uint16
	DataTypeOID          uint32
	DataTypeSize         int16
	TypeModifier         int32
	Format               int16
}

type GenericArray

type GenericArray struct {
	// contains filtered or unexported fields
}

GenericArray is based on ArrayType https://github.com/doublecloud/transfer/arc_vcs/vendor/github.com/jackc/pgtype/array_type.go?rev=r8263453#L15

func NewGenericArray

func NewGenericArray(logger log.Logger, typeName string, elementOID pgtype.OID, newElement func() (pgtype.Value, error)) (*GenericArray, error)

func (*GenericArray) AssignTo

func (ga *GenericArray) AssignTo(_ interface{}) error

func (*GenericArray) DecodeBinary

func (ga *GenericArray) DecodeBinary(ci *pgtype.ConnInfo, src []byte) error

func (*GenericArray) DecodeText

func (ga *GenericArray) DecodeText(ci *pgtype.ConnInfo, src []byte) error

func (*GenericArray) EncodeBinary

func (ga *GenericArray) EncodeBinary(ci *pgtype.ConnInfo, buf []byte) ([]byte, error)

func (*GenericArray) EncodeText

func (ga *GenericArray) EncodeText(ci *pgtype.ConnInfo, buf []byte) ([]byte, error)

func (*GenericArray) ExtractValue

func (ga *GenericArray) ExtractValue(connInfo *pgtype.ConnInfo) (interface{}, error)

func (*GenericArray) Get

func (ga *GenericArray) Get() interface{}

func (*GenericArray) GetValue

func (ga *GenericArray) GetValue(connInfo *pgtype.ConnInfo) (interface{}, error)

func (*GenericArray) NewTypeValue

func (ga *GenericArray) NewTypeValue() pgtype.Value

func (*GenericArray) NewTypeValueImpl

func (ga *GenericArray) NewTypeValueImpl() *GenericArray

func (*GenericArray) Scan

func (ga *GenericArray) Scan(_ interface{}) error

func (*GenericArray) Set

func (ga *GenericArray) Set(interface{}) error

func (*GenericArray) TypeName

func (ga *GenericArray) TypeName() string

func (*GenericArray) Unpack

func (ga *GenericArray) Unpack(connInfo *pgtype.ConnInfo) (*util.XDArray, error)

func (*GenericArray) Value

func (ga *GenericArray) Value() (driver.Value, error)

type Keeper

type Keeper struct {
	CloseSign chan bool
	// contains filtered or unexported fields
}

func NewKeeper

func NewKeeper(conn *pgxpool.Pool, logger log.Logger, schema string) (*Keeper, error)

func (*Keeper) Init

func (k *Keeper) Init(sink abstract.AsyncSink) error

func (*Keeper) Lock

func (k *Keeper) Lock(consumer string) (chan bool, error)

func (*Keeper) Stop

func (k *Keeper) Stop()

type LsnTrackedSlot

type LsnTrackedSlot struct {
	Conn *pgxpool.Pool
	// contains filtered or unexported fields
}

func (*LsnTrackedSlot) CheckMonotonic

func (l *LsnTrackedSlot) CheckMonotonic(ci abstract.ChangeItem) (bool, error)

func (*LsnTrackedSlot) Close

func (l *LsnTrackedSlot) Close()

func (*LsnTrackedSlot) Create

func (l *LsnTrackedSlot) Create() error

func (*LsnTrackedSlot) Exist

func (l *LsnTrackedSlot) Exist() (bool, error)

func (*LsnTrackedSlot) Init

func (l *LsnTrackedSlot) Init(sink abstract.AsyncSink) error

func (*LsnTrackedSlot) Move

func (l *LsnTrackedSlot) Move(lsn string) error

func (*LsnTrackedSlot) Run

func (l *LsnTrackedSlot) Run()

func (*LsnTrackedSlot) Suicide

func (l *LsnTrackedSlot) Suicide() error

func (*LsnTrackedSlot) UpdateReplicated

func (l *LsnTrackedSlot) UpdateReplicated(iter int, lsn uint64) error

type Metrics

type Metrics struct{ metrics.Registry }

type OldKeysType

type OldKeysType struct {
	abstract.OldKeysType
	KeyTypeOids []pgtype.OID `json:"keytypeoids"`
}

type PGTableLockMode

type PGTableLockMode string
const (
	AccessShareLockMode          PGTableLockMode = "ACCESS SHARE"
	RowShareLockMode             PGTableLockMode = "ROW SHARE"
	RowExclusiveLockMode         PGTableLockMode = "ROW EXCLUSIVE"
	ShareUpdateExclusiveLockMode PGTableLockMode = "SHARE UPDATE EXCLUSIVE"
	ShareLockMode                PGTableLockMode = "SHARE"
	ShareRowExclusiveLockMode    PGTableLockMode = "SHARE ROW EXCLUSIVE"
	ExclusiveLockMode            PGTableLockMode = "EXCLUSIVE"
	AccessExclusiveLockMode      PGTableLockMode = "ACCESS EXCLUSIVE"
)

type PgDestination

// PgDestination holds the connection settings and write-behaviour options of a
// PostgreSQL destination.
type PgDestination struct {
	// oneof
	ClusterID string `json:"Cluster"`
	Host      string // legacy field kept for backward compatibility; only the 'Hosts' field is used now
	Hosts     []string

	Database               string `json:"Name"`
	User                   string
	Password               server.SecretString
	Port                   int
	TLSFile                string
	MaintainTables         bool
	AllowDuplicates        bool
	LoozeMode              bool
	IgnoreUniqueConstraint bool
	Tables                 map[string]string
	TransformerConfig      map[string]string
	SubNetworkID           string
	SecurityGroupIDs       []string
	CopyUpload             bool // NOT a user parameter: setting it on the endpoint has no effect. It is a workaround flag set automatically by common code (Activate/UploadTable); there is no option to turn off the CopyUpload behaviour.
	PerTransactionPush     bool
	Cleanup                server.CleanupType
	BatchSize              int // Deprecated: use BufferTriggingSize instead
	BufferTriggingSize     uint64
	BufferTriggingInterval time.Duration
	QueryTimeout           time.Duration
	DisableSQLFallback     bool
	ConnectionID           string
}

func (*PgDestination) AllHosts

func (d *PgDestination) AllHosts() []string

AllHosts is a helper for moving from the legacy 'Host' field to the modern 'Hosts' field.

func (*PgDestination) BuffererConfig

func (d *PgDestination) BuffererConfig() bufferer.BuffererConfig

func (*PgDestination) CleanupMode

func (d *PgDestination) CleanupMode() server.CleanupType

func (*PgDestination) FillDependentFields

func (d *PgDestination) FillDependentFields(transfer *server.Transfer)

func (*PgDestination) GetConnectionID

func (d *PgDestination) GetConnectionID() string

func (*PgDestination) GetProviderType

func (d *PgDestination) GetProviderType() abstract.ProviderType

func (*PgDestination) HasTLS

func (d *PgDestination) HasTLS() bool

func (PgDestination) IsDestination

func (PgDestination) IsDestination()

func (*PgDestination) MDBClusterID

func (d *PgDestination) MDBClusterID() string

func (*PgDestination) ReliesOnSystemTablesTransferring

func (d *PgDestination) ReliesOnSystemTablesTransferring() bool

func (*PgDestination) ToSinkParams

func (d *PgDestination) ToSinkParams() PgDestinationWrapper

func (*PgDestination) ToStorageParams

func (d *PgDestination) ToStorageParams() *PgStorageParams

func (*PgDestination) Transformer

func (d *PgDestination) Transformer() map[string]string

func (*PgDestination) Validate

func (d *PgDestination) Validate() error

func (*PgDestination) WithDefaults

func (d *PgDestination) WithDefaults()

type PgDestinationWrapper

type PgDestinationWrapper struct {
	Model *PgDestination
}

func (PgDestinationWrapper) AllHosts

func (d PgDestinationWrapper) AllHosts() []string

func (PgDestinationWrapper) CleanupMode

func (d PgDestinationWrapper) CleanupMode() server.CleanupType

func (PgDestinationWrapper) ClusterID

func (d PgDestinationWrapper) ClusterID() string

func (PgDestinationWrapper) ConnectionID

func (d PgDestinationWrapper) ConnectionID() string

func (PgDestinationWrapper) CopyUpload

func (d PgDestinationWrapper) CopyUpload() bool

func (PgDestinationWrapper) Database

func (d PgDestinationWrapper) Database() string

func (PgDestinationWrapper) DisableSQLFallback

func (d PgDestinationWrapper) DisableSQLFallback() bool

func (PgDestinationWrapper) HasTLS

func (d PgDestinationWrapper) HasTLS() bool

func (PgDestinationWrapper) IgnoreUniqueConstraint

func (d PgDestinationWrapper) IgnoreUniqueConstraint() bool

func (PgDestinationWrapper) LoozeMode

func (d PgDestinationWrapper) LoozeMode() bool

func (PgDestinationWrapper) MaintainTables

func (d PgDestinationWrapper) MaintainTables() bool

func (PgDestinationWrapper) Password

func (d PgDestinationWrapper) Password() string

func (PgDestinationWrapper) PerTransactionPush

func (d PgDestinationWrapper) PerTransactionPush() bool

func (PgDestinationWrapper) Port

func (d PgDestinationWrapper) Port() int

func (PgDestinationWrapper) QueryTimeout

func (d PgDestinationWrapper) QueryTimeout() time.Duration

func (PgDestinationWrapper) TLSFile

func (d PgDestinationWrapper) TLSFile() string

func (PgDestinationWrapper) Tables

func (d PgDestinationWrapper) Tables() map[string]string

func (PgDestinationWrapper) User

func (d PgDestinationWrapper) User() string

type PgDumpSteps

type PgDumpSteps struct {
	Table, PrimaryKey, View, Sequence         bool
	SequenceOwnedBy, Rule, Type               bool
	Constraint, FkConstraint, Index, Function bool
	Collation, Trigger, Policy, Cast          bool
	Default                                   bool
	MaterializedView                          bool
	SequenceSet                               *bool
	TableAttach                               bool
	IndexAttach                               bool
}

func DefaultPgDumpPostSteps

func DefaultPgDumpPostSteps() *PgDumpSteps

func DefaultPgDumpPreSteps

func DefaultPgDumpPreSteps() *PgDumpSteps

func (*PgDumpSteps) AnyStepIsTrue

func (p *PgDumpSteps) AnyStepIsTrue() bool

func (*PgDumpSteps) List

func (p *PgDumpSteps) List() []string

type PgErrorCode

type PgErrorCode string
// SQLSTATE values; the trailing comments give the condition names used in the
// PostgreSQL error codes appendix.
const (
	ErrcUniqueViolation              PgErrorCode = "23505" // unique_violation
	ErrcWrongObjectType              PgErrorCode = "42809" // wrong_object_type
	ErrcRelationDoesNotExists        PgErrorCode = "42P01" // undefined_table
	ErrcSchemaDoesNotExists          PgErrorCode = "3F000" // invalid_schema_name
	ErrcObjectNotInPrerequisiteState PgErrorCode = "55000" // object_not_in_prerequisite_state
	ErrcInvalidPassword              PgErrorCode = "28P01" // invalid_password
	ErrcInvalidAuthSpec              PgErrorCode = "28000" // invalid_authorization_specification
)

PostgreSQL error codes from https://www.postgresql.org/docs/12/errcodes-appendix.html

type PgObjectType

type PgObjectType string
const (
	Table            PgObjectType = "TABLE"
	TableAttach      PgObjectType = "TABLE_ATTACH"
	PrimaryKey       PgObjectType = "PRIMARY_KEY"
	View             PgObjectType = "VIEW"
	Sequence         PgObjectType = "SEQUENCE"
	SequenceSet      PgObjectType = "SEQUENCE_SET"
	SequenceOwnedBy  PgObjectType = "SEQUENCE_OWNED_BY"
	Rule             PgObjectType = "RULE"
	Type             PgObjectType = "TYPE"
	Constraint       PgObjectType = "CONSTRAINT"
	FkConstraint     PgObjectType = "FK_CONSTRAINT"
	Index            PgObjectType = "INDEX"
	IndexAttach      PgObjectType = "INDEX_ATTACH"
	Function         PgObjectType = "FUNCTION"
	Collation        PgObjectType = "COLLATION"
	Trigger          PgObjectType = "TRIGGER"
	Policy           PgObjectType = "POLICY"
	Cast             PgObjectType = "CAST"
	MaterializedView PgObjectType = "MATERIALIZED_VIEW"
)

func (PgObjectType) IsValid

func (pot PgObjectType) IsValid() error

type PgSerializationFormat

type PgSerializationFormat string

type PgSinkParams

// PgSinkParams is the set of getters through which a PostgreSQL sink reads its
// connection settings and behaviour options.
type PgSinkParams interface {
	ClusterID() string
	AllHosts() []string
	Port() int
	Database() string
	User() string
	Password() string
	HasTLS() bool
	TLSFile() string

	// MaintainTables: if true, every batchInsert call issues
	// 'create schema...' + 'create table ...' first.
	// For now, it is not true for every running transfer.
	// It exists for lb->pg delivery and can be auto-derived.
	// Legacy-like behaviour.
	// Private option.
	MaintainTables() bool
	// PerTransactionPush corresponds to 'SaveTxBoundaries' in the proto spec.
	PerTransactionPush() bool
	// LoozeMode: if true, an error is logged and nil is returned instead,
	// i.e. data is lost when an error occurs.
	// Private option.
	LoozeMode() bool
	CleanupMode() server.CleanupType
	// Tables maps source table names to their alternative names in the destination.
	// Private option.
	Tables() map[string]string
	// CopyUpload reports whether the 'CopyUpload' mechanism is used for inserts.
	CopyUpload() bool
	// IgnoreUniqueConstraint: if true, a 'uniqueViolation' error is ignored
	// and processing continues.
	// Private option.
	IgnoreUniqueConstraint() bool
	// QueryTimeout returns the timeout for query execution by this sink
	QueryTimeout() time.Duration
	// DisableSQLFallback returns true if the sink should never use SQL when copying snapshot and should always use "COPY FROM"
	DisableSQLFallback() bool
	ConnectionID() string
}

type PgSource

type PgSource struct {
	// oneof
	ClusterID string `json:"Cluster"`
	Host      string // legacy field for back compatibility; for now, we are using only 'Hosts' field
	Hosts     []string

	Database                    string
	User                        string
	Password                    server.SecretString
	Port                        int
	DBTables                    []string
	BatchSize                   uint32 // BatchSize is a limit on the number of rows in the replication (not snapshot) source internal buffer
	SlotID                      string
	SlotByteLagLimit            int64
	TLSFile                     string
	KeeperSchema                string
	SubNetworkID                string
	SecurityGroupIDs            []string
	CollapseInheritTables       bool
	UsePolling                  bool
	ExcludedTables              []string
	IsHomo                      bool
	NoHomo                      bool // force hetero relations instead of homo-haram-stuff
	AutoActivate                bool
	PreSteps                    *PgDumpSteps
	PostSteps                   *PgDumpSteps
	UseFakePrimaryKey           bool
	IgnoreUserTypes             bool
	IgnoreUnknownTables         bool             // see: if table schema unknown - ignore it TM-3104 and TM-2934
	MaxBufferSize               server.BytesSize // Deprecated: is not used anymore
	ExcludeDescendants          bool             // Deprecated: is not used more, use CollapseInheritTables instead
	DesiredTableSize            uint64           // desired table part size for snapshot sharding
	SnapshotDegreeOfParallelism int              // desired table parts count for snapshot sharding
	EmitTimeTypes               bool             // Deprecated: is not used anymore

	DBLogEnabled bool   // force DBLog snapshot instead of common
	ChunkSize    uint64 // number of rows in chunk, this field needed for DBLog snapshot, if it is 0, it will be calculated automatically

	// Whether text or binary serialization format should be used when reading
	// a snapshot from PostgreSQL storage (see
	// https://www.postgresql.org/docs/current/protocol-overview.html#PROTOCOL-FORMAT-CODES).
	// If not specified (i.e. equal to PgSerializationFormatAuto), defaults to
	// "binary" for homogeneous pg->pg transfer, and to "text" in all other
	// cases. Binary is preferred for pg->pg transfers since we use CopyFrom
	// function from pgx driver and it requires all values to be binary
	// serializable.
	SnapshotSerializationFormat PgSerializationFormat
	ShardingKeyFields           map[string][]string
	PgDumpCommand               []string
	ConnectionID                string
}

func (*PgSource) AllHosts

func (s *PgSource) AllHosts() []string

AllHosts is a helper for moving from the legacy 'Host' field to the modern 'Hosts' field.

func (*PgSource) AllIncludes

func (s *PgSource) AllIncludes() []string

func (*PgSource) AuxTables

func (s *PgSource) AuxTables() []string

func (*PgSource) ExcludeWithGlobals

func (s *PgSource) ExcludeWithGlobals() []string

func (*PgSource) ExtraTransformers

func (s *PgSource) ExtraTransformers(ctx context.Context, transfer *server.Transfer, registry metrics.Registry) ([]abstract.Transformer, error)

func (*PgSource) FillDependentFields

func (s *PgSource) FillDependentFields(transfer *server.Transfer)

func (*PgSource) FulfilledIncludes

func (s *PgSource) FulfilledIncludes(tID abstract.TableID) (result []string)

func (*PgSource) GetConnectionID

func (s *PgSource) GetConnectionID() string

func (*PgSource) GetProviderType

func (s *PgSource) GetProviderType() abstract.ProviderType

func (*PgSource) HasTLS

func (s *PgSource) HasTLS() bool

func (*PgSource) Include

func (s *PgSource) Include(tID abstract.TableID) bool

func (*PgSource) IsIncremental

func (*PgSource) IsIncremental()

func (*PgSource) IsSource

func (*PgSource) IsSource()

func (*PgSource) IsStrictSource

func (*PgSource) IsStrictSource()

func (*PgSource) MDBClusterID

func (s *PgSource) MDBClusterID() string

func (*PgSource) SupportsStartCursorValue

func (*PgSource) SupportsStartCursorValue() bool

func (*PgSource) ToSinkParams

func (s *PgSource) ToSinkParams() PgSourceWrapper

func (*PgSource) ToStorageParams

func (s *PgSource) ToStorageParams(transfer *server.Transfer) *PgStorageParams

func (*PgSource) Validate

func (s *PgSource) Validate() error

func (*PgSource) WithDefaults

func (s *PgSource) WithDefaults()

type PgSourceWrapper

type PgSourceWrapper struct {
	Model *PgSource
	// contains filtered or unexported fields
}

func (PgSourceWrapper) AllHosts

func (d PgSourceWrapper) AllHosts() []string

func (PgSourceWrapper) CleanupMode

func (d PgSourceWrapper) CleanupMode() server.CleanupType

func (PgSourceWrapper) ClusterID

func (d PgSourceWrapper) ClusterID() string

func (PgSourceWrapper) ConnectionID

func (d PgSourceWrapper) ConnectionID() string

func (PgSourceWrapper) CopyUpload

func (d PgSourceWrapper) CopyUpload() bool

func (PgSourceWrapper) Database

func (d PgSourceWrapper) Database() string

func (PgSourceWrapper) DisableSQLFallback

func (d PgSourceWrapper) DisableSQLFallback() bool

func (PgSourceWrapper) HasTLS

func (d PgSourceWrapper) HasTLS() bool

func (PgSourceWrapper) IgnoreUniqueConstraint

func (d PgSourceWrapper) IgnoreUniqueConstraint() bool

func (PgSourceWrapper) LoozeMode

func (d PgSourceWrapper) LoozeMode() bool

func (PgSourceWrapper) MaintainTables

func (d PgSourceWrapper) MaintainTables() bool

func (PgSourceWrapper) Password

func (d PgSourceWrapper) Password() string

func (PgSourceWrapper) PerTransactionPush

func (d PgSourceWrapper) PerTransactionPush() bool

func (PgSourceWrapper) Port

func (d PgSourceWrapper) Port() int

func (PgSourceWrapper) QueryTimeout

func (d PgSourceWrapper) QueryTimeout() time.Duration

func (*PgSourceWrapper) SetMaintainTables

func (d *PgSourceWrapper) SetMaintainTables(maintainTables bool)

func (PgSourceWrapper) TLSFile

func (d PgSourceWrapper) TLSFile() string

func (PgSourceWrapper) Tables

func (d PgSourceWrapper) Tables() map[string]string

func (PgSourceWrapper) User

func (d PgSourceWrapper) User() string

type PgStorageParams

type PgStorageParams struct {
	AllHosts                    []string // should be non-empty only one field: Hosts/ClusterID
	Port                        int
	User                        string
	Password                    string
	Database                    string
	ClusterID                   string // should be non-empty only one field: Hosts/ClusterID
	TLSFile                     string
	UseFakePrimaryKey           bool
	DBFilter                    []string
	IgnoreUserTypes             bool
	PreferReplica               bool // has the meaning only if ClusterID not empty. Choosing replica is implemented via mdb api. If set as 'true' - it expects you initialized dbaas.Initialize*Cloud
	ExcludeDescendants          bool
	DesiredTableSize            uint64
	SnapshotDegreeOfParallelism int
	ConnString                  string // used in greenplum
	TableFilter                 abstract.Includeable
	TryHostCACertificates       bool // will force SSL connection with host-provided SSL certificates
	UseBinarySerialization      bool // Whether binary serialization format should be used. Defaults to true in homogeneous pg->pg transfers.
	SlotID                      string
	ShardingKeyFields           map[string][]string
	ConnectionID                string
}

func (*PgStorageParams) HasTLS

func (p *PgStorageParams) HasTLS() bool

func (*PgStorageParams) String

func (p *PgStorageParams) String() string

func (*PgStorageParams) TLSConfigTemplate

func (p *PgStorageParams) TLSConfigTemplate() (*tls.Config, error)

TLSConfigTemplate returns a TLS configuration template without ServerName set. It returns nil when a no-TLS connection is requested by the configuration.

type PgVersion

type PgVersion struct {
	Is9x, Is10x, Is11x, Is12x, Is13x, Is14x bool
	Version                                 string
}

func NewPgVersion

func NewPgVersion(version string) PgVersion

func ResolveVersion

func ResolveVersion(pool *pgxpool.Pool) PgVersion

type Plugin

type Plugin string

type PostgreSQLFlavour

type PostgreSQLFlavour struct{}

func NewPostgreSQLFlavour

func NewPostgreSQLFlavour() *PostgreSQLFlavour

func (*PostgreSQLFlavour) ListSchemaQuery

func (f *PostgreSQLFlavour) ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, forbiddenTables []string) string

func (*PostgreSQLFlavour) ListTablesQuery

func (f *PostgreSQLFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string

func (*PostgreSQLFlavour) PgClassFilter

func (f *PostgreSQLFlavour) PgClassFilter() string

func (*PostgreSQLFlavour) PgClassRelsOnlyFilter

func (f *PostgreSQLFlavour) PgClassRelsOnlyFilter() string

type PostgresSlotKiller

type PostgresSlotKiller struct {
	Slot AbstractSlot
}

func (*PostgresSlotKiller) KillSlot

func (k *PostgresSlotKiller) KillSlot() error

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (*Provider) Cleanup

func (p *Provider) Cleanup(ctx context.Context, task *server.TransferOperation) error

func (*Provider) DBLogUpload

func (p *Provider) DBLogUpload(src *PgSource, tables abstract.TableMap) error

func (*Provider) Deactivate

func (p *Provider) Deactivate(ctx context.Context, task *server.TransferOperation) error

func (*Provider) DestinationSampleableStorage

func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)

func (*Provider) Sink

func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) SourceSampleableStorage

func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

func (*Provider) Verify

func (p *Provider) Verify(ctx context.Context) error

type SchemaExtractor

type SchemaExtractor struct {
	// contains filtered or unexported fields
}

func NewSchemaExtractor

func NewSchemaExtractor() *SchemaExtractor

func (*SchemaExtractor) FindDependentViews

func (e *SchemaExtractor) FindDependentViews(ctx context.Context, conn *pgx.Conn, tables abstract.TableMap) (map[abstract.TableID][]abstract.TableID, error)

func (*SchemaExtractor) LoadSchema

func (e *SchemaExtractor) LoadSchema(ctx context.Context, conn *pgx.Conn, specificTable *abstract.TableID) (abstract.DBSchema, error)

LoadSchema returns a settings-customized schema(s) of table(s) in PostgreSQL

func (*SchemaExtractor) TablesList

func (e *SchemaExtractor) TablesList(ctx context.Context, conn *pgx.Conn) ([]tableIDWithInfo, time.Time, error)

TablesList returns a list of basic information pieces about all tables in the given schema

func (*SchemaExtractor) WithExcludeViews

func (e *SchemaExtractor) WithExcludeViews(excludeViews bool) *SchemaExtractor

func (*SchemaExtractor) WithFlavour

func (e *SchemaExtractor) WithFlavour(flavour DBFlavour) *SchemaExtractor

func (*SchemaExtractor) WithForbiddenSchemas

func (e *SchemaExtractor) WithForbiddenSchemas(forbiddenSchemas []string) *SchemaExtractor

func (*SchemaExtractor) WithForbiddenTables

func (e *SchemaExtractor) WithForbiddenTables(forbiddenTables []string) *SchemaExtractor

func (*SchemaExtractor) WithLogger

func (e *SchemaExtractor) WithLogger(logger log.Logger) *SchemaExtractor

func (*SchemaExtractor) WithUseFakePrimaryKey

func (e *SchemaExtractor) WithUseFakePrimaryKey(useFakePrimaryKey bool) *SchemaExtractor

type SequenceInfo

type SequenceInfo struct {
	SequenceID      abstract.TableID
	DependentTables []abstract.TableID
}

SequenceInfo is a description of a PostgreSQL sequence

type SequenceMap

type SequenceMap map[abstract.TableID]*SequenceInfo

SequenceMap is a mapping of sequence identifiers to their descriptions

func ListSequencesWithDependants

func ListSequencesWithDependants(ctx context.Context, conn *pgx.Conn, serviceSchema string) (SequenceMap, error)

ListSequencesWithDependants returns a mapping with sequence information.

This method executes CREATE FUNCTION and DROP FUNCTION statements.

type Slot

type Slot struct {
	// contains filtered or unexported fields
}

func (*Slot) Close

func (slot *Slot) Close()

func (*Slot) Create

func (slot *Slot) Create() error

func (*Slot) Exist

func (slot *Slot) Exist() (bool, error)

func (*Slot) Init

func (slot *Slot) Init(sink abstract.AsyncSink) error

func (*Slot) Suicide

func (slot *Slot) Suicide() error

func (*Slot) SuicideImpl

func (slot *Slot) SuicideImpl() error

type SlotMonitor

type SlotMonitor struct {
	// contains filtered or unexported fields
}

func NewSlotMonitor

func NewSlotMonitor(conn *pgxpool.Pool, slotName string, slotDatabaseName string, metrics *stats.SourceStats, logger log.Logger) *SlotMonitor

NewSlotMonitor constructs a slot monitor, but does NOT start it.

The provided pool is NOT closed by the monitor. It is the caller's responsibility to manage the pool.

func (*SlotMonitor) Close

func (m *SlotMonitor) Close()

func (*SlotMonitor) StartSlotMonitoring

func (m *SlotMonitor) StartSlotMonitoring(maxSlotByteLag int64) <-chan error

type SnapshotKey

type SnapshotKey string

func (SnapshotKey) String

func (c SnapshotKey) String() string

type SnapshotState

type SnapshotState struct {
	SnapshotLSN string
	SnapshotTS  int64
}

type SortOrder

type SortOrder string

type Storage

type Storage struct {
	Config *PgStorageParams
	Conn   *pgxpool.Pool

	IsHomo bool

	ForbiddenSchemas []string
	ForbiddenTables  []string
	Flavour          DBFlavour

	ShardedStateLSN string
	ShardedStateTS  time.Time
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(config *PgStorageParams, opts ...StorageOpt) (*Storage, error)

func (*Storage) BeginPGSnapshot

func (s *Storage) BeginPGSnapshot(ctx context.Context) error

Named BeginPGSnapshot to NOT match abstract.SnapshotableStorage

func (*Storage) BuildTypeMapping

func (s *Storage) BuildTypeMapping(ctx context.Context) (TypeNameToOIDMap, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EndPGSnapshot

func (s *Storage) EndPGSnapshot(ctx context.Context) error

Named EndPGSnapshot to NOT match abstract.SnapshotableStorage

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableDescriptionRowsCount

func (s *Storage) ExactTableDescriptionRowsCount(ctx context.Context, table abstract.TableDescription, timeout time.Duration) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) GetIncrementalState

func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)

func (*Storage) GetInheritedTables

func (s *Storage) GetInheritedTables(ctx context.Context) (map[abstract.TableID]abstract.TableID, error)

func (*Storage) LoadQueryTable

func (s *Storage) LoadQueryTable(ctx context.Context, tableQuery tablequery.TableQuery, pusher abstract.Pusher) error

func (*Storage) LoadRandomSample

func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadSampleBySet

func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, pusher abstract.Pusher) error

func (*Storage) LoadSchema

func (s *Storage) LoadSchema() (abstract.DBSchema, error)

func (*Storage) LoadSchemaForTable

func (s *Storage) LoadSchemaForTable(ctx context.Context, conn *pgx.Conn, table abstract.TableDescription) (*abstract.TableSchema, error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTopBottomSample

func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) OrderedRead

func (s *Storage) OrderedRead(
	t *abstract.TableDescription,
	schemas []abstract.ColSchema,
	sortOrder SortOrder,
	filter abstract.WhereStatement,
	duplicatesPolicy DuplicatesPolicy,
	excludeDescendants bool,
) string

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) RunSlotMonitor

func (s *Storage) RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)

func (*Storage) SetInitialState

func (s *Storage) SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)

func (*Storage) SetLoadDescending

func (s *Storage) SetLoadDescending(loadDescending bool)

func (*Storage) SetShardingContext

func (s *Storage) SetShardingContext(shardedState []byte) error

func (*Storage) ShardTable

ShardTable tries to shard one table/view into parts (i.e. to split the task of snapshotting one table/view into parts):

TableDescription -> []TableDescription

What can and what cannot be sharded:

  • tables with non-empty 'Offset' - not sharded
  • tables with non-empty 'Filter' (dolivochki or ad-hoc upload_table) - sharded
  • views with explicitKeys filled in - sharded

func (*Storage) ShardingContext

func (s *Storage) ShardingContext() ([]byte, error)

func (*Storage) SnapshotLSN

func (s *Storage) SnapshotLSN() string

func (*Storage) TableAccessible

func (s *Storage) TableAccessible(table abstract.TableDescription) bool

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)

TableList in PostgreSQL returns a table map with schema

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

func (*Storage) TableSizeInBytes

func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)

func (*Storage) WorkersSnapshotState

func (s *Storage) WorkersSnapshotState() *SnapshotState

type StorageOpt

type StorageOpt interface {
	// contains filtered or unexported methods
}

go-sumtype:decl StorageOpt

func WithMetrics

func WithMetrics(registry metrics.Registry) StorageOpt

func WithTypeMapping

func WithTypeMapping(typeMapping TypeNameToOIDMap) StorageOpt

type TextDecoderAndValuerWithHomo

type TextDecoderAndValuerWithHomo interface {
	pgtype.TextDecoder
	driver.Valuer
	abstract.HomoValuer
}

type Timestamp

type Timestamp struct {
	pgtype.Timestamp
	// contains filtered or unexported fields
}

func NewTimestamp

func NewTimestamp(tz *time.Location) *Timestamp

NewTimestamp constructs a TIMESTAMP WITHOUT TIME ZONE representation which supports BC years

TODO: this type must become significantly simpler after https://st.yandex-team.ru/TM-5127 is done

func (*Timestamp) DecodeText

func (t *Timestamp) DecodeText(ci *pgtype.ConnInfo, src []byte) error

func (*Timestamp) HomoValue

func (t *Timestamp) HomoValue() any

func (*Timestamp) Value

func (t *Timestamp) Value() (driver.Value, error)

type Timestamptz

type Timestamptz struct {
	pgtype.Timestamptz
}

func NewTimestamptz

func NewTimestamptz() *Timestamptz

NewTimestamptz constructs a TIMESTAMP WITH TIME ZONE representation which supports BC years

TODO: remove this when https://st.yandex-team.ru/TM-5127 is done

func (*Timestamptz) DecodeText

func (t *Timestamptz) DecodeText(ci *pgtype.ConnInfo, src []byte) error

func (*Timestamptz) HomoValue

func (t *Timestamptz) HomoValue() any

func (*Timestamptz) Value

func (t *Timestamptz) Value() (driver.Value, error)

type TypeFullName

type TypeFullName struct {
	Namespace string // typnamespace from pg_type
	Name      string // typname from pg_type
}

Text representation of a PostgreSQL type from the pg_type catalog, as opposed to its OID: https://www.postgresql.org/docs/current/catalog-pg-type.html

type TypeMapping

type TypeMapping struct{ TypeNameToOIDMap }

type TypeNameToOIDMap

type TypeNameToOIDMap map[TypeFullName]pgtype.OID

type Unmarshaller

type Unmarshaller struct {
	// contains filtered or unexported fields
}

Unmarshaller converts data from the PostgreSQL format to the Data Transfer format. The format differs for homogeneous and heterogeneous transfers, but the same unmarshaller is used in both cases.

func NewUnmarshaller

func NewUnmarshaller(castData *UnmarshallerData, connInfo *pgtype.ConnInfo, schema *abstract.ColSchema, fieldDesc *pgproto3.FieldDescription) (*Unmarshaller, error)

func (*Unmarshaller) Cast

func (c *Unmarshaller) Cast(input []byte) (any, error)

Cast consumes raw SELECT output and produces a valid ChangeItem.Value

type UnmarshallerData

type UnmarshallerData struct {
	// contains filtered or unexported fields
}

func MakeUnmarshallerData

func MakeUnmarshallerData(isHomo bool, conn *pgx.Conn) UnmarshallerData

type Wal2JSONItem

type Wal2JSONItem struct {
	ID             uint32               `json:"id" yson:"id"`
	LSN            uint64               `json:"nextlsn" yson:"nextlsn"`
	CommitTime     uint64               `json:"commitTime" yson:"commitTime"`
	Counter        int                  `json:"txPosition" yson:"txPosition"`
	Kind           abstract.Kind        `json:"kind" yson:"kind"`
	Schema         string               `json:"schema" yson:"schema"`
	Table          string               `json:"table" yson:"table"`
	PartID         string               `json:"part" yson:"-"`
	ColumnNames    []string             `json:"columnnames" yson:"columnnames"`
	ColumnValues   []interface{}        `json:"columnvalues,omitempty" yson:"columnvalues"`
	TableSchema    []abstract.ColSchema `json:"table_schema,omitempty" yson:"table_schema"`
	OldKeys        OldKeysType          `json:"oldkeys" yson:"oldkeys"`
	TxID           string               `json:"tx_id" yson:"-"`
	Query          string               `json:"query" yson:"-"`
	Size           abstract.EventSize   `json:"-" yson:"-"`
	ColumnTypeOIDs []pgtype.OID         `json:"columntypeoids"`
}

This is basically an abstract.ChangeItem, but with two additional fields:

  • columntypeoids
  • oldkeys.keytypeoids

Note that historically abstract.ChangeItem's structure was exactly the same as the structure of objects we received from wal2json, since PostgreSQL was one of the first endpoints implemented in DataTransfer. Later ChangeItem had to encompass other databases and become "abstract". There is no point in adding any more PostgreSQL specifics into it, so we use Wal2JSONItem for parsing PostgreSQL changes from wal2json now.

type Wal2JsonParser

type Wal2JsonParser struct {
	// contains filtered or unexported fields
}

func NewWal2JsonParser

func NewWal2JsonParser() *Wal2JsonParser

func (*Wal2JsonParser) Close

func (p *Wal2JsonParser) Close()

func (*Wal2JsonParser) Parse

func (p *Wal2JsonParser) Parse(data []byte) (result []*Wal2JSONItem, err error)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run

func (w *Worker) Run(sink abstract.AsyncSink) error

func (*Worker) Stop

func (w *Worker) Stop()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL