Documentation ¶
Index ¶
- Constants
- Variables
- func ComposePartialProgressFn(base abstract.LoadProgress, completedParts uint, totalParts uint, ...) abstract.LoadProgress
- func CreateRandDistTableQuery(fullTableName string, schema []abstract.ColSchema) (string, error)
- func DropTableQuery(tableFQTN string) string
- func InsertFromSelectQuery(tableDst string, tableSrc string, columnNames []string) string
- func InsertQueryColumns(ci *abstract.ChangeItem) []string
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewSink(transfer *model.Transfer, registry metrics.Registry, lgr log.Logger, ...) (abstract.Sinker, error)
- type GPRole
- type GPSegPointer
- type GpCluster
- type GpConnection
- type GpDestination
- func (d *GpDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *GpDestination) CleanupMode() dp_model.CleanupType
- func (d *GpDestination) GetProviderType() abstract.ProviderType
- func (d *GpDestination) IsDestination()
- func (d *GpDestination) MDBClusterID() string
- func (d *GpDestination) ToGpSource() *GpSource
- func (d *GpDestination) Transformer() map[string]string
- func (d *GpDestination) Validate() error
- func (d *GpDestination) WithDefaults()
- type GpHAP
- type GpHP
- type GpSegAndXID
- type GpSource
- func (s *GpSource) AllIncludes() []string
- func (s *GpSource) FulfilledIncludes(tID abstract.TableID) (result []string)
- func (s *GpSource) GetProviderType() abstract.ProviderType
- func (s *GpSource) Include(tID abstract.TableID) bool
- func (s *GpSource) IsSource()
- func (s *GpSource) IsStrictSource()
- func (s *GpSource) MDBClusterID() string
- func (s *GpSource) Validate() error
- func (s *GpSource) WithDefaults()
- type GpSourceAdvancedProps
- type GreenplumCluster
- type GreenplumFlavour
- func (f *GreenplumFlavour) ListSchemaQuery(excludeViews bool, withSpecificTable bool, forbiddenSchemas []string, ...) string
- func (f *GreenplumFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
- func (f *GreenplumFlavour) PgClassFilter() string
- func (f *GreenplumFlavour) PgClassRelsOnlyFilter() string
- type GreenplumHAPair
- type GreenplumHostPort
- type MDBClusterCreds
- type MasterHostResolver
- type PgAuthProps
- type PgSinkParamsRegulated
- func (p PgSinkParamsRegulated) AllHosts() []string
- func (p PgSinkParamsRegulated) CleanupMode() model.CleanupType
- func (p PgSinkParamsRegulated) ClusterID() string
- func (p PgSinkParamsRegulated) ConnectionID() string
- func (p PgSinkParamsRegulated) CopyUpload() bool
- func (p PgSinkParamsRegulated) Database() string
- func (p PgSinkParamsRegulated) DisableSQLFallback() bool
- func (p PgSinkParamsRegulated) HasTLS() bool
- func (p PgSinkParamsRegulated) IgnoreUniqueConstraint() bool
- func (p PgSinkParamsRegulated) LoozeMode() bool
- func (p PgSinkParamsRegulated) MaintainTables() bool
- func (p PgSinkParamsRegulated) Password() string
- func (p PgSinkParamsRegulated) PerTransactionPush() bool
- func (p PgSinkParamsRegulated) Port() int
- func (p PgSinkParamsRegulated) QueryTimeout() time.Duration
- func (p PgSinkParamsRegulated) TLSFile() string
- func (p PgSinkParamsRegulated) Tables() map[string]string
- func (p PgSinkParamsRegulated) User() string
- type Provider
- type SegPointerPool
- type Sink
- type Storage
- func (s *Storage) BeginGPSnapshot(ctx context.Context, tables []abstract.TableDescription) error
- func (s *Storage) Close()
- func (s *Storage) EndGPSnapshot(ctx context.Context) error
- func (s *Storage) EnsureAvailability(ctx context.Context, sp GPSegPointer) error
- func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadTableImplDistributed(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadTableImplNonDistributed(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) PGStorage(ctx context.Context, sp GPSegPointer) (*postgres.Storage, error)
- func (s *Storage) Ping() error
- func (s *Storage) RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)
- func (s *Storage) SetShardingContext(shardedState []byte) error
- func (s *Storage) SetWorkersCount(count int)
- func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
- func (s *Storage) ShardingContext() ([]byte, error)
- func (s *Storage) TableExists(table abstract.TableID) (bool, error)
- func (s *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- func (s *Storage) TotalSegments(ctx context.Context) (int, error)
- func (s *Storage) WorkersCount() int
- func (s *Storage) WorkersGpConfig() *WorkersGpConfig
- type WorkerIDToGpSegs
- type WorkersGpConfig
- type WorkersGpConfigContextKeyStruct
Constants ¶
const EtaRowPartialProgress = 1 << 20
const PingTimeout = 5 * time.Minute
const (
ProviderType = abstract.ProviderType("gp")
)
const SQLStateInRecovery string = "57M02"
Variables ¶
var WorkersGpConfigContextKey = &WorkersGpConfigContextKeyStruct{}
Functions ¶
func ComposePartialProgressFn ¶
func ComposePartialProgressFn(base abstract.LoadProgress, completedParts uint, totalParts uint, totalEta uint64) abstract.LoadProgress
ComposePartialProgressFn allows to transform progress by part into total progress by multiple parts
func DropTableQuery ¶
DropTableQuery returns a `DROP TABLE IF EXISTS` SQL query. So the resulting query is "ensuring", not "imperative"
func InsertFromSelectQuery ¶
InsertFromSelectQuery returns a `INSERT INTO ... SELECT FROM` SQL query
func InsertQueryColumns ¶
func InsertQueryColumns(ci *abstract.ChangeItem) []string
InsertQueryColumns returns a set of columns (fields, not values) for an INSERT query. Auto-generated columns are removed from the result
Types ¶
type GPSegPointer ¶
type GPSegPointer struct {
// contains filtered or unexported fields
}
func Coordinator ¶
func Coordinator() GPSegPointer
func Segment ¶
func Segment(index int) GPSegPointer
func (GPSegPointer) String ¶
func (s GPSegPointer) String() string
type GpCluster ¶
func GpClusterFromGreenplumCluster ¶
func GpClusterFromGreenplumCluster(c *GreenplumCluster) *GpCluster
type GpConnection ¶
type GpConnection struct { MDBCluster *MDBClusterCreds OnPremises *GpCluster Database string User string AuthProps PgAuthProps }
func (*GpConnection) Validate ¶
func (c *GpConnection) Validate() error
func (*GpConnection) WithDefaults ¶
func (c *GpConnection) WithDefaults()
type GpDestination ¶
type GpDestination struct { Connection GpConnection CleanupPolicy dp_model.CleanupType SubnetID string SecurityGroupIDs []string BufferTriggingSize uint64 BufferTriggingInterval time.Duration QueryTimeout time.Duration }
func (*GpDestination) BuffererConfig ¶
func (d *GpDestination) BuffererConfig() bufferer.BuffererConfig
func (*GpDestination) CleanupMode ¶
func (d *GpDestination) CleanupMode() dp_model.CleanupType
func (*GpDestination) GetProviderType ¶
func (d *GpDestination) GetProviderType() abstract.ProviderType
func (*GpDestination) IsDestination ¶
func (d *GpDestination) IsDestination()
func (*GpDestination) MDBClusterID ¶
func (d *GpDestination) MDBClusterID() string
func (*GpDestination) ToGpSource ¶
func (d *GpDestination) ToGpSource() *GpSource
func (*GpDestination) Transformer ¶
func (d *GpDestination) Transformer() map[string]string
func (*GpDestination) Validate ¶
func (d *GpDestination) Validate() error
func (*GpDestination) WithDefaults ¶
func (d *GpDestination) WithDefaults()
type GpHAP ¶
GpHAP stands for "Greenplum Highly Available host Pair"
func GpHAPFromGreenplumAPIHAPair ¶
func GpHAPFromGreenplumAPIHAPair(hap *GreenplumHAPair) *GpHAP
func GpHAPFromGreenplumUIHAPair ¶
func GpHAPFromGreenplumUIHAPair(hap greenplumHAPair) *GpHAP
type GpHP ¶
GpHP stands for "Greenplum Host/Port"
func NewGpHpWithMDBReplacement ¶
NewGpHpWithMDBReplacement replaces domain names for Cloud Preprod & Prod and returns a new host-port pair
type GpSegAndXID ¶
type GpSegAndXID struct { SegmentID int32 `json:"segmentID,omitempty"` Xid int64 `json:"xid,omitempty"` }
func (*GpSegAndXID) GetSegmentID ¶
func (x *GpSegAndXID) GetSegmentID() int32
func (*GpSegAndXID) GetXid ¶
func (x *GpSegAndXID) GetXid() int64
type GpSource ¶
type GpSource struct { Connection GpConnection IncludeTables []string ExcludeTables []string AdvancedProps GpSourceAdvancedProps SubnetID string SecurityGroupIDs []string }
func (*GpSource) AllIncludes ¶
func (*GpSource) FulfilledIncludes ¶
func (*GpSource) GetProviderType ¶
func (s *GpSource) GetProviderType() abstract.ProviderType
func (*GpSource) IsStrictSource ¶
func (s *GpSource) IsStrictSource()
func (*GpSource) MDBClusterID ¶
func (*GpSource) WithDefaults ¶
func (s *GpSource) WithDefaults()
type GpSourceAdvancedProps ¶
type GpSourceAdvancedProps struct { // EnforceConsistency enables *enforcement* of consistent snapshot. When it is not set, the user is responsible for snapshot consistency EnforceConsistency bool ServiceSchema string // AllowCoordinatorTxFailure disables coordinator TX monitoring (liveness monitor) and enables the transfer to finish snapshot successfully even if the coordinator TX fails AllowCoordinatorTxFailure bool LivenessMonitorCheckInterval time.Duration // contains filtered or unexported fields }
func (*GpSourceAdvancedProps) Validate ¶
func (p *GpSourceAdvancedProps) Validate() error
func (*GpSourceAdvancedProps) WithDefaults ¶
func (p *GpSourceAdvancedProps) WithDefaults()
type GreenplumCluster ¶
type GreenplumCluster struct { Coordinator *GreenplumHAPair `json:"coordintor,omitempty"` Segments []*GreenplumHAPair `json:"segments,omitempty"` }
func GreenplumClusterFromGpCluster ¶
func GreenplumClusterFromGpCluster(c *GpCluster) *GreenplumCluster
func (*GreenplumCluster) GetCoordinator ¶
func (x *GreenplumCluster) GetCoordinator() *GreenplumHAPair
func (*GreenplumCluster) GetSegments ¶
func (x *GreenplumCluster) GetSegments() []*GreenplumHAPair
type GreenplumFlavour ¶
type GreenplumFlavour struct {
// contains filtered or unexported fields
}
func NewGreenplumFlavour ¶
func NewGreenplumFlavour(coordinatorOnlyMode bool) *GreenplumFlavour
NewGreenplumFlavour constructs a flavour for PostgreSQL schema extractor
func NewGreenplumFlavourImpl ¶
func (*GreenplumFlavour) ListSchemaQuery ¶
func (*GreenplumFlavour) ListTablesQuery ¶
func (f *GreenplumFlavour) ListTablesQuery(excludeViews bool, forbiddenSchemas []string, forbiddenTables []string) string
func (*GreenplumFlavour) PgClassFilter ¶
func (f *GreenplumFlavour) PgClassFilter() string
func (*GreenplumFlavour) PgClassRelsOnlyFilter ¶
func (f *GreenplumFlavour) PgClassRelsOnlyFilter() string
type GreenplumHAPair ¶
type GreenplumHAPair struct { Mirror *GreenplumHostPort `json:"mirror,omitempty"` Primary *GreenplumHostPort `json:"primary,omitempty"` }
func GreenplumAPIHAPairFromGpHAP ¶
func GreenplumAPIHAPairFromGpHAP(hap *GpHAP) *GreenplumHAPair
func (*GreenplumHAPair) GetMirror ¶
func (p *GreenplumHAPair) GetMirror() *GreenplumHostPort
func (*GreenplumHAPair) GetPrimary ¶
func (p *GreenplumHAPair) GetPrimary() *GreenplumHostPort
type GreenplumHostPort ¶
func (*GreenplumHostPort) GetHost ¶
func (p *GreenplumHostPort) GetHost() string
func (*GreenplumHostPort) GetPort ¶
func (p *GreenplumHostPort) GetPort() int64
type MDBClusterCreds ¶
type MDBClusterCreds struct {
ClusterID string
}
type MasterHostResolver ¶
type PgAuthProps ¶
type PgAuthProps struct { Password model.SecretString CACertificate string }
type PgSinkParamsRegulated ¶
type PgSinkParamsRegulated struct { FClusterID string FAllHosts []string FPort int FDatabase string FUser string FPassword string FTLSFile string FMaintainTables bool FPerTransactionPush bool FLoozeMode bool FCleanupMode model.CleanupType FTables map[string]string FCopyUpload bool FIgnoreUniqueConstraint bool FDisableSQLFallback bool FQueryTimeout time.Duration }
func GpDestinationToPgSinkParamsRegulated ¶
func GpDestinationToPgSinkParamsRegulated(d *GpDestination) *PgSinkParamsRegulated
func (PgSinkParamsRegulated) AllHosts ¶
func (p PgSinkParamsRegulated) AllHosts() []string
func (PgSinkParamsRegulated) CleanupMode ¶
func (p PgSinkParamsRegulated) CleanupMode() model.CleanupType
func (PgSinkParamsRegulated) ClusterID ¶
func (p PgSinkParamsRegulated) ClusterID() string
func (PgSinkParamsRegulated) ConnectionID ¶
func (p PgSinkParamsRegulated) ConnectionID() string
func (PgSinkParamsRegulated) CopyUpload ¶
func (p PgSinkParamsRegulated) CopyUpload() bool
func (PgSinkParamsRegulated) Database ¶
func (p PgSinkParamsRegulated) Database() string
func (PgSinkParamsRegulated) DisableSQLFallback ¶
func (p PgSinkParamsRegulated) DisableSQLFallback() bool
func (PgSinkParamsRegulated) HasTLS ¶
func (p PgSinkParamsRegulated) HasTLS() bool
func (PgSinkParamsRegulated) IgnoreUniqueConstraint ¶
func (p PgSinkParamsRegulated) IgnoreUniqueConstraint() bool
func (PgSinkParamsRegulated) LoozeMode ¶
func (p PgSinkParamsRegulated) LoozeMode() bool
func (PgSinkParamsRegulated) MaintainTables ¶
func (p PgSinkParamsRegulated) MaintainTables() bool
func (PgSinkParamsRegulated) Password ¶
func (p PgSinkParamsRegulated) Password() string
func (PgSinkParamsRegulated) PerTransactionPush ¶
func (p PgSinkParamsRegulated) PerTransactionPush() bool
func (PgSinkParamsRegulated) Port ¶
func (p PgSinkParamsRegulated) Port() int
func (PgSinkParamsRegulated) QueryTimeout ¶
func (p PgSinkParamsRegulated) QueryTimeout() time.Duration
func (PgSinkParamsRegulated) TLSFile ¶
func (p PgSinkParamsRegulated) TLSFile() string
func (PgSinkParamsRegulated) Tables ¶
func (p PgSinkParamsRegulated) Tables() map[string]string
func (PgSinkParamsRegulated) User ¶
func (p PgSinkParamsRegulated) User() string
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type SegPointerPool ¶
type SegPointerPool struct {
// contains filtered or unexported fields
}
SegPointerPool is a set of Greenplum storage segment pointers with additional functions
func NewRandomSegPointerPool ¶
func NewRandomSegPointerPool(totalSegments int, size int) *SegPointerPool
NewRandomSegPointerPool constructs a pool of the given size, the first element of which is chosen randomly from a ring consisting of the given total number of segments
func (*SegPointerPool) NextRoundRobin ¶
func (p *SegPointerPool) NextRoundRobin() GPSegPointer
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func NewStorageImpl ¶
func (*Storage) BeginGPSnapshot ¶
Named BeginGPSnapshot to NOT match abstract.SnapshotableStorage; BeginGPSnapshot starts a Greenplum cluster-global transaction;
func (*Storage) EndGPSnapshot ¶
Named EndGPSnapshot to NOT match abstract.SnapshotableStorage; EndGPSnapshot ceases a Greenplum cluster-global transaction;
func (*Storage) EnsureAvailability ¶
func (s *Storage) EnsureAvailability(ctx context.Context, sp GPSegPointer) error
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) LoadTableImplDistributed ¶
func (*Storage) LoadTableImplNonDistributed ¶
func (*Storage) RunSlotMonitor ¶
func (s *Storage) RunSlotMonitor(ctx context.Context, serverSource interface{}, registry metrics.Registry) (abstract.SlotKiller, <-chan error, error)
RunSlotMonitor in Greenplum returns the liveness monitor. There are no replication slots in Greenplum. The liveness monitor ensures the transaction is still open and simple queries can be run on it. The liveness monitor is only run in sharded transfers. It starts automatically at BeginSnapshot.
func (*Storage) SetShardingContext ¶
func (*Storage) SetWorkersCount ¶
func (*Storage) ShardTable ¶
func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
ShardTable implements ShardingStorage by replicating the table, producing the number of tables equal to the number of jobs. This approach is taken because Greenplum shards load by segments, stored in context; not by tables.
func (*Storage) ShardingContext ¶
func (*Storage) TableSchema ¶
func (*Storage) TotalSegments ¶
TotalSegments returns the actual total number of segments in Greenplum cluster. Never returns `0`
func (*Storage) WorkersCount ¶
func (*Storage) WorkersGpConfig ¶
func (s *Storage) WorkersGpConfig() *WorkersGpConfig
type WorkerIDToGpSegs ¶
type WorkerIDToGpSegs struct { WorkerID int32 `json:"workerID,omitempty"` Segments []*GpSegAndXID `json:"segments,omitempty"` }
func (*WorkerIDToGpSegs) GetSegments ¶
func (x *WorkerIDToGpSegs) GetSegments() []*GpSegAndXID
func (*WorkerIDToGpSegs) GetWorkerID ¶
func (x *WorkerIDToGpSegs) GetWorkerID() int32
type WorkersGpConfig ¶
type WorkersGpConfig struct { WtsList []*WorkerIDToGpSegs `json:"wtsList"` Cluster *GreenplumCluster }
func (*WorkersGpConfig) GetCluster ¶
func (x *WorkersGpConfig) GetCluster() *GreenplumCluster
func (*WorkersGpConfig) GetWtsList ¶
func (x *WorkersGpConfig) GetWtsList() []*WorkerIDToGpSegs
type WorkersGpConfigContextKeyStruct ¶
type WorkersGpConfigContextKeyStruct struct{}