greenplum

package
v0.0.0-rc1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

README

Greenplum: snapshot provider

Термины

  • Snapshot consistency — гарантия, что трансфер направляет в целевую базу каждую строку из исходной базы ровно один раз.
    • Однако при сбое в целевой базе консистентность уже переданных данных не гарантируется.

Модель сбоя трансфера из Greenplum (snapshot)

Поведение трансфера при сбое определяется настройкой EnforceConsistency (GpSourceAdvancedProps.EnforceConsistency):

  • true: трансфер не переживает сбой даже одного сегмента в кластере Greenplum, но при успешном завершении гарантирует snapshot consistency.
  • false: трансфер переживает отказ любого количества сегментов, но при успешном завершении гарантирует snapshot consistency только при условии, что во время исполнения трансфера над исходной таблицей не выполнялись операции UPDATE и DELETE (а также любые операции с эквивалентным эффектом — например, TRUNCATE).

Documentation

Index

Constants

View Source
const EtaRowPartialProgress = 1 << 20
View Source
const PingTimeout = 5 * time.Minute
View Source
const (
	ProviderType = abstract.ProviderType("gp")
)
View Source
const SQLStateInRecovery string = "57M02"

Variables

View Source
var WorkersGpConfigContextKey = &WorkersGpConfigContextKeyStruct{}

Functions

func ComposePartialProgressFn

func ComposePartialProgressFn(base abstract.LoadProgress, completedParts uint, totalParts uint, totalEta uint64) abstract.LoadProgress

ComposePartialProgressFn transforms the progress of a single part into the total progress across multiple parts

func CreateRandDistTableQuery

func CreateRandDistTableQuery(fullTableName string, schema []abstract.ColSchema) (string, error)

func DropTableQuery

func DropTableQuery(tableFQTN string) string

DropTableQuery returns a `DROP TABLE IF EXISTS` SQL query, so the resulting query is "ensuring" rather than "imperative": it does not fail when the table does not exist

func InsertFromSelectQuery

func InsertFromSelectQuery(tableDst string, tableSrc string, columnNames []string) string

InsertFromSelectQuery returns an `INSERT INTO ... SELECT ... FROM` SQL query

func InsertQueryColumns

func InsertQueryColumns(ci *abstract.ChangeItem) []string

InsertQueryColumns returns a set of columns (fields, not values) for an INSERT query. Auto-generated columns are removed from the result

func New

func NewSink

func NewSink(transfer *server.Transfer, registry metrics.Registry, lgr log.Logger, config middlewares.Config) (abstract.Sinker, error)

Types

type GPRole

type GPRole string

type GPSegPointer

type GPSegPointer struct {
	// contains filtered or unexported fields
}

func Coordinator

func Coordinator() GPSegPointer

func Segment

func Segment(index int) GPSegPointer

func (GPSegPointer) String

func (s GPSegPointer) String() string

type GpCluster

type GpCluster struct {
	Coordinator *GpHAP
	Segments    []*GpHAP
}

func GpClusterFromGreenplumCluster

func GpClusterFromGreenplumCluster(c *GreenplumCluster) *GpCluster

func (*GpCluster) SegByID

func (s *GpCluster) SegByID(id int) *GpHAP

type GpConnection

type GpConnection struct {
	MDBCluster *MDBClusterCreds
	OnPremises *GpCluster
	Database   string
	User       string
	AuthProps  PgAuthProps
}

func (*GpConnection) Validate

func (c *GpConnection) Validate() error

func (*GpConnection) WithDefaults

func (c *GpConnection) WithDefaults()

type GpDestination

type GpDestination struct {
	Connection GpConnection

	CleanupPolicy server.CleanupType

	SubnetID         string
	SecurityGroupIDs []string

	BufferTriggingSize     uint64
	BufferTriggingInterval time.Duration

	QueryTimeout time.Duration
}

func (*GpDestination) BuffererConfig

func (d *GpDestination) BuffererConfig() bufferer.BuffererConfig

func (*GpDestination) CleanupMode

func (d *GpDestination) CleanupMode() server.CleanupType

func (*GpDestination) GetProviderType

func (d *GpDestination) GetProviderType() abstract.ProviderType

func (*GpDestination) IsDestination

func (d *GpDestination) IsDestination()

func (*GpDestination) MDBClusterID

func (d *GpDestination) MDBClusterID() string

func (*GpDestination) ToGpSource

func (d *GpDestination) ToGpSource() *GpSource

func (*GpDestination) Transformer

func (d *GpDestination) Transformer() map[string]string

func (*GpDestination) Validate

func (d *GpDestination) Validate() error

func (*GpDestination) WithDefaults

func (d *GpDestination) WithDefaults()

type GpHAP

type GpHAP struct {
	Primary *GpHP
	Mirror  *GpHP
}

GpHAP stands for "Greenplum Highly Available host Pair"

func GpHAPFromGreenplumAPIHAPair

func GpHAPFromGreenplumAPIHAPair(hap *GreenplumHAPair) *GpHAP

func GpHAPFromGreenplumUIHAPair

func GpHAPFromGreenplumUIHAPair(hap greenplumHAPair) *GpHAP

func (*GpHAP) String

func (s *GpHAP) String() string

func (*GpHAP) Validate

func (s *GpHAP) Validate() error

type GpHP

type GpHP struct {
	Host string
	Port int
}

GpHP stands for "Greenplum Host/Port"

func NewGpHP

func NewGpHP(host string, port int) *GpHP

func NewGpHpWithMDBReplacement

func NewGpHpWithMDBReplacement(host string, port int) *GpHP

NewGpHpWithMDBReplacement replaces domain names for Cloud Preprod & Prod and returns a new host-port pair

func (*GpHP) String

func (s *GpHP) String() string

func (*GpHP) Valid

func (s *GpHP) Valid() bool

func (*GpHP) Validate

func (s *GpHP) Validate() error

type GpSegAndXID

type GpSegAndXID struct {
	SegmentID int32 `json:"segmentID,omitempty"`
	Xid       int64 `json:"xid,omitempty"`
}

func (*GpSegAndXID) GetSegmentID

func (x *GpSegAndXID) GetSegmentID() int32

func (*GpSegAndXID) GetXid

func (x *GpSegAndXID) GetXid() int64

type GpSource

type GpSource struct {
	Connection       GpConnection
	IncludeTables    []string
	ExcludeTables    []string
	AdvancedProps    GpSourceAdvancedProps
	SubnetID         string
	SecurityGroupIDs []string
}

func (*GpSource) AllIncludes

func (s *GpSource) AllIncludes() []string

func (*GpSource) FulfilledIncludes

func (s *GpSource) FulfilledIncludes(tID abstract.TableID) (result []string)

func (*GpSource) GetProviderType

func (s *GpSource) GetProviderType() abstract.ProviderType

func (*GpSource) Include

func (s *GpSource) Include(tID abstract.TableID) bool

func (*GpSource) IsSource

func (s *GpSource) IsSource()

func (*GpSource) IsStrictSource

func (s *GpSource) IsStrictSource()

func (*GpSource) MDBClusterID

func (s *GpSource) MDBClusterID() string

func (*GpSource) Validate

func (s *GpSource) Validate() error

func (*GpSource) WithDefaults

func (s *GpSource) WithDefaults()

type GpSourceAdvancedProps

type GpSourceAdvancedProps struct {
	// EnforceConsistency enables *enforcement* of consistent snapshot. When it is not set, the user is responsible for snapshot consistency
	EnforceConsistency bool

	ServiceSchema string

	// AllowCoordinatorTxFailure disables coordinator TX monitoring (the liveness monitor) and allows the transfer to finish the snapshot successfully even if the coordinator TX fails
	AllowCoordinatorTxFailure    bool
	LivenessMonitorCheckInterval time.Duration
	// contains filtered or unexported fields
}

func (*GpSourceAdvancedProps) Validate

func (p *GpSourceAdvancedProps) Validate() error

func (*GpSourceAdvancedProps) WithDefaults

func (p *GpSourceAdvancedProps) WithDefaults()

type GreenplumCluster

type GreenplumCluster struct {
	Coordinator *GreenplumHAPair   `json:"coordintor,omitempty"`
	Segments    []*GreenplumHAPair `json:"segments,omitempty"`
}

func GreenplumClusterFromGpCluster

func GreenplumClusterFromGpCluster(c *GpCluster) *GreenplumCluster

func (*GreenplumCluster) GetCoordinator

func (x *GreenplumCluster) GetCoordinator() *GreenplumHAPair

func (*GreenplumCluster) GetSegments

func (x *GreenplumCluster) GetSegments() []*GreenplumHAPair

type GreenplumFlavour

type GreenplumFlavour struct {
	// contains filtered or unexported fields
}

func NewGreenplumFlavour

func NewGreenplumFlavour(coordinatorOnlyMode bool) *GreenplumFlavour

NewGreenplumFlavour constructs a flavour for the PostgreSQL schema extractor

func NewGreenplumFlavourImpl

func NewGreenplumFlavourImpl(coordinatorOnlyMode bool, pgClassFilter func(bool, func() string) string, pgClassRelsOnlyFilter func() string) *GreenplumFlavour

func (*GreenplumFlavour) ListSchemaQuery

func (f *GreenplumFlavour) ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, forbiddenTables []string) string

func (*GreenplumFlavour) ListTablesQuery

func (f *GreenplumFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string

func (*GreenplumFlavour) PgClassFilter

func (f *GreenplumFlavour) PgClassFilter() string

func (*GreenplumFlavour) PgClassRelsOnlyFilter

func (f *GreenplumFlavour) PgClassRelsOnlyFilter() string

type GreenplumHAPair

type GreenplumHAPair struct {
	Mirror  *GreenplumHostPort `json:"mirror,omitempty"`
	Primary *GreenplumHostPort `json:"primary,omitempty"`
}

func GreenplumAPIHAPairFromGpHAP

func GreenplumAPIHAPairFromGpHAP(hap *GpHAP) *GreenplumHAPair

func (*GreenplumHAPair) GetMirror

func (p *GreenplumHAPair) GetMirror() *GreenplumHostPort

func (*GreenplumHAPair) GetPrimary

func (p *GreenplumHAPair) GetPrimary() *GreenplumHostPort

type GreenplumHostPort

type GreenplumHostPort struct {
	Host string `json:"host"`
	Port int64  `json:"port"`
}

func (*GreenplumHostPort) GetHost

func (p *GreenplumHostPort) GetHost() string

func (*GreenplumHostPort) GetPort

func (p *GreenplumHostPort) GetPort() int64

type MDBClusterCreds

type MDBClusterCreds struct {
	ClusterID string
}

type MasterHostResolver

type MasterHostResolver interface {
	MasterHosts() (master string, replica string, err error)
}

type PgAuthProps

type PgAuthProps struct {
	Password      server.SecretString
	CACertificate string
}

type PgSinkParamsRegulated

type PgSinkParamsRegulated struct {
	FClusterID              string
	FAllHosts               []string
	FPort                   int
	FDatabase               string
	FUser                   string
	FPassword               string
	FTLSFile                string
	FMaintainTables         bool
	FPerTransactionPush     bool
	FLoozeMode              bool
	FCleanupMode            server.CleanupType
	FTables                 map[string]string
	FCopyUpload             bool
	FIgnoreUniqueConstraint bool
	FDisableSQLFallback     bool
	FQueryTimeout           time.Duration
}

func GpDestinationToPgSinkParamsRegulated

func GpDestinationToPgSinkParamsRegulated(d *GpDestination) *PgSinkParamsRegulated

func (PgSinkParamsRegulated) AllHosts

func (p PgSinkParamsRegulated) AllHosts() []string

func (PgSinkParamsRegulated) CleanupMode

func (p PgSinkParamsRegulated) CleanupMode() server.CleanupType

func (PgSinkParamsRegulated) ClusterID

func (p PgSinkParamsRegulated) ClusterID() string

func (PgSinkParamsRegulated) ConnectionID

func (p PgSinkParamsRegulated) ConnectionID() string

func (PgSinkParamsRegulated) CopyUpload

func (p PgSinkParamsRegulated) CopyUpload() bool

func (PgSinkParamsRegulated) Database

func (p PgSinkParamsRegulated) Database() string

func (PgSinkParamsRegulated) DisableSQLFallback

func (p PgSinkParamsRegulated) DisableSQLFallback() bool

func (PgSinkParamsRegulated) HasTLS

func (p PgSinkParamsRegulated) HasTLS() bool

func (PgSinkParamsRegulated) IgnoreUniqueConstraint

func (p PgSinkParamsRegulated) IgnoreUniqueConstraint() bool

func (PgSinkParamsRegulated) LoozeMode

func (p PgSinkParamsRegulated) LoozeMode() bool

func (PgSinkParamsRegulated) MaintainTables

func (p PgSinkParamsRegulated) MaintainTables() bool

func (PgSinkParamsRegulated) Password

func (p PgSinkParamsRegulated) Password() string

func (PgSinkParamsRegulated) PerTransactionPush

func (p PgSinkParamsRegulated) PerTransactionPush() bool

func (PgSinkParamsRegulated) Port

func (p PgSinkParamsRegulated) Port() int

func (PgSinkParamsRegulated) QueryTimeout

func (p PgSinkParamsRegulated) QueryTimeout() time.Duration

func (PgSinkParamsRegulated) TLSFile

func (p PgSinkParamsRegulated) TLSFile() string

func (PgSinkParamsRegulated) Tables

func (p PgSinkParamsRegulated) Tables() map[string]string

func (PgSinkParamsRegulated) User

func (p PgSinkParamsRegulated) User() string

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (*Provider) Sink

func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type SegPointerPool

type SegPointerPool struct {
	// contains filtered or unexported fields
}

SegPointerPool is a set of Greenplum storage segment pointers with additional functions

func NewRandomSegPointerPool

func NewRandomSegPointerPool(totalSegments int, size int) *SegPointerPool

NewRandomSegPointerPool constructs a pool of the given size, the first element of which is chosen randomly from a ring consisting of the given total number of segments

func (*SegPointerPool) NextRoundRobin

func (p *SegPointerPool) NextRoundRobin() GPSegPointer

type Sink

type Sink struct {

	// SegPoolShare is the share of segments (from their total count) used by this sink
	SegPoolShare float64
	// contains filtered or unexported fields
}

func (*Sink) Close

func (s *Sink) Close() error

func (*Sink) Push

func (s *Sink) Push(input []abstract.ChangeItem) error

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(config *GpSource, mRegistry metrics.Registry) *Storage

func NewStorageImpl

func NewStorageImpl(config *GpSource, mRegistry metrics.Registry, checkConnection checkConnectionFunc, newFlavor newFlavorFunc) *Storage

func (*Storage) BeginGPSnapshot

func (s *Storage) BeginGPSnapshot(ctx context.Context, tables []abstract.TableDescription) error

BeginGPSnapshot starts a Greenplum cluster-global transaction. It is deliberately named BeginGPSnapshot so that it does NOT match abstract.SnapshotableStorage.

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EndGPSnapshot

func (s *Storage) EndGPSnapshot(ctx context.Context) error

EndGPSnapshot ends a Greenplum cluster-global transaction. It is deliberately named EndGPSnapshot so that it does NOT match abstract.SnapshotableStorage.

func (*Storage) EnsureAvailability

func (s *Storage) EnsureAvailability(ctx context.Context, sp GPSegPointer) error

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTableImplDistributed

func (s *Storage) LoadTableImplDistributed(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTableImplNonDistributed

func (s *Storage) LoadTableImplNonDistributed(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) PGStorage

func (s *Storage) PGStorage(ctx context.Context, sp GPSegPointer) (*postgres.Storage, error)

PGStorage returns a live PG storage or an error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) RunSlotMonitor

func (s *Storage) RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)

RunSlotMonitor in Greenplum returns the liveness monitor. There are no replication slots in Greenplum. The liveness monitor ensures the transaction is still open and simple queries can be run on it. The liveness monitor is only run in sharded transfers. It starts automatically at BeginSnapshot.

func (*Storage) SetShardingContext

func (s *Storage) SetShardingContext(shardedState []byte) error

func (*Storage) SetWorkersCount

func (s *Storage) SetWorkersCount(count int)

func (*Storage) ShardTable

ShardTable implements ShardingStorage by replicating the table, producing as many tables as there are jobs. This approach is taken because Greenplum shards the load by segments (stored in the context), not by tables.

func (*Storage) ShardingContext

func (s *Storage) ShardingContext() ([]byte, error)

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

func (*Storage) TotalSegments

func (s *Storage) TotalSegments(ctx context.Context) (int, error)

TotalSegments returns the actual total number of segments in the Greenplum cluster. It never returns `0`.

func (*Storage) WorkersCount

func (s *Storage) WorkersCount() int

func (*Storage) WorkersGpConfig

func (s *Storage) WorkersGpConfig() *WorkersGpConfig

type WorkerIDToGpSegs

type WorkerIDToGpSegs struct {
	WorkerID int32          `json:"workerID,omitempty"`
	Segments []*GpSegAndXID `json:"segments,omitempty"`
}

func (*WorkerIDToGpSegs) GetSegments

func (x *WorkerIDToGpSegs) GetSegments() []*GpSegAndXID

func (*WorkerIDToGpSegs) GetWorkerID

func (x *WorkerIDToGpSegs) GetWorkerID() int32

type WorkersGpConfig

type WorkersGpConfig struct {
	WtsList []*WorkerIDToGpSegs `json:"wtsList"`
	Cluster *GreenplumCluster
}

func (*WorkersGpConfig) GetCluster

func (x *WorkersGpConfig) GetCluster() *GreenplumCluster

func (*WorkersGpConfig) GetWtsList

func (x *WorkersGpConfig) GetWtsList() []*WorkerIDToGpSegs

type WorkersGpConfigContextKeyStruct

type WorkersGpConfigContextKeyStruct struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL