Documentation ¶
Index ¶
- Constants
- func ConfigFromDestination(logger log.Logger, cfg *ElasticSearchDestination, serverType ServerType) (*elasticsearch.Config, error)
- func DeleteSystemFieldsFromIndexParams(params map[string]interface{})
- func DumpIndexInfo(transfer *server.Transfer, logger log.Logger, mRegistry metrics.Registry) error
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewSink(cfg *ElasticSearchDestination, logger log.Logger, registry metrics.Registry) (abstract.Sinker, error)
- func NewSinkImpl(cfg *ElasticSearchDestination, logger log.Logger, registry metrics.Registry, ...) (abstract.Sinker, error)
- func WaitForIndexToExist(client *elasticsearch.Client, indexName string, timeout time.Duration) error
- func WithLogger(config elasticsearch.Config, logger log.Logger, serverType ServerType) (*elasticsearch.Client, error)
- type ElasticSearchDestination
- func (d *ElasticSearchDestination) CleanupMode() server.CleanupType
- func (d *ElasticSearchDestination) Compatible(src server.Source, transferType abstract.TransferType) error
- func (d *ElasticSearchDestination) GetProviderType() abstract.ProviderType
- func (d *ElasticSearchDestination) Hosts() []string
- func (d *ElasticSearchDestination) IsDestination()
- func (d *ElasticSearchDestination) MDBClusterID() string
- func (d *ElasticSearchDestination) ToElasticSearchDestination() (*ElasticSearchDestination, ServerType)
- func (d *ElasticSearchDestination) Transformer() map[string]string
- func (d *ElasticSearchDestination) VPCSecurityGroups() []string
- func (d *ElasticSearchDestination) VPCSubnets() []string
- func (d *ElasticSearchDestination) Validate() error
- func (d *ElasticSearchDestination) WithDefaults()
- type ElasticSearchHostPort
- type ElasticSearchSource
- func (s *ElasticSearchSource) GetProviderType() abstract.ProviderType
- func (s *ElasticSearchSource) IsSource()
- func (s *ElasticSearchSource) MDBClusterID() string
- func (s *ElasticSearchSource) SourceToElasticSearchDestination() *ElasticSearchDestination
- func (s *ElasticSearchSource) ToElasticSearchSource() (*ElasticSearchSource, ServerType)
- func (s *ElasticSearchSource) VPCSecurityGroups() []string
- func (s *ElasticSearchSource) VPCSubnets() []string
- func (s *ElasticSearchSource) Validate() error
- func (s *ElasticSearchSource) WithDefaults()
- type IsElasticLikeDestination
- type IsElasticLikeSource
- type Provider
- type SchemaDescription
- type ServerType
- type ShardingFilter
- type Sink
- type Storage
- func (s *Storage) Close()
- func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) Ping() error
- func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
- func (s *Storage) TableExists(table abstract.TableID) (bool, error)
- func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(_ context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- type StorageOpt
Constants ¶
View Source
const ( Undefined = 0 OpenSearch = 1 ElasticSearch ServerType = 2 )
View Source
const ProviderType = abstract.ProviderType("elasticsearch")
Variables ¶
This section is empty.
Functions ¶
func ConfigFromDestination ¶
func ConfigFromDestination(logger log.Logger, cfg *ElasticSearchDestination, serverType ServerType) (*elasticsearch.Config, error)
func DeleteSystemFieldsFromIndexParams ¶
func DeleteSystemFieldsFromIndexParams(params map[string]interface{})
func DumpIndexInfo ¶
func New ¶
func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *server.Transfer) providers.Provider
func NewSinkImpl ¶
func WaitForIndexToExist ¶
func WithLogger ¶
func WithLogger(config elasticsearch.Config, logger log.Logger, serverType ServerType) (*elasticsearch.Client, error)
Types ¶
type ElasticSearchDestination ¶
type ElasticSearchDestination struct { ClusterID string // Deprecated: new endpoints should be on premise only DataNodes []ElasticSearchHostPort User string Password server.SecretString SSLEnabled bool TLSFile string SubNetworkID string SecurityGroupIDs []string Cleanup server.CleanupType SanitizeDocKeys bool }
func (*ElasticSearchDestination) CleanupMode ¶
func (d *ElasticSearchDestination) CleanupMode() server.CleanupType
func (*ElasticSearchDestination) Compatible ¶
func (d *ElasticSearchDestination) Compatible(src server.Source, transferType abstract.TransferType) error
func (*ElasticSearchDestination) GetProviderType ¶
func (d *ElasticSearchDestination) GetProviderType() abstract.ProviderType
func (*ElasticSearchDestination) Hosts ¶
func (d *ElasticSearchDestination) Hosts() []string
func (*ElasticSearchDestination) IsDestination ¶
func (d *ElasticSearchDestination) IsDestination()
func (*ElasticSearchDestination) MDBClusterID ¶
func (d *ElasticSearchDestination) MDBClusterID() string
func (*ElasticSearchDestination) ToElasticSearchDestination ¶
func (d *ElasticSearchDestination) ToElasticSearchDestination() (*ElasticSearchDestination, ServerType)
func (*ElasticSearchDestination) Transformer ¶
func (d *ElasticSearchDestination) Transformer() map[string]string
func (*ElasticSearchDestination) VPCSecurityGroups ¶
func (d *ElasticSearchDestination) VPCSecurityGroups() []string
func (*ElasticSearchDestination) VPCSubnets ¶
func (d *ElasticSearchDestination) VPCSubnets() []string
func (*ElasticSearchDestination) Validate ¶
func (d *ElasticSearchDestination) Validate() error
func (*ElasticSearchDestination) WithDefaults ¶
func (d *ElasticSearchDestination) WithDefaults()
type ElasticSearchHostPort ¶
type ElasticSearchSource ¶
type ElasticSearchSource struct { ClusterID string // Deprecated: new endpoints should be on premise only DataNodes []ElasticSearchHostPort User string Password server.SecretString SSLEnabled bool TLSFile string SubNetworkID string SecurityGroupIDs []string DumpIndexWithMapping bool }
func (*ElasticSearchSource) GetProviderType ¶
func (s *ElasticSearchSource) GetProviderType() abstract.ProviderType
func (*ElasticSearchSource) IsSource ¶
func (s *ElasticSearchSource) IsSource()
func (*ElasticSearchSource) MDBClusterID ¶
func (s *ElasticSearchSource) MDBClusterID() string
func (*ElasticSearchSource) SourceToElasticSearchDestination ¶
func (s *ElasticSearchSource) SourceToElasticSearchDestination() *ElasticSearchDestination
func (*ElasticSearchSource) ToElasticSearchSource ¶
func (s *ElasticSearchSource) ToElasticSearchSource() (*ElasticSearchSource, ServerType)
func (*ElasticSearchSource) VPCSecurityGroups ¶
func (s *ElasticSearchSource) VPCSecurityGroups() []string
func (*ElasticSearchSource) VPCSubnets ¶
func (s *ElasticSearchSource) VPCSubnets() []string
func (*ElasticSearchSource) Validate ¶
func (s *ElasticSearchSource) Validate() error
func (*ElasticSearchSource) WithDefaults ¶
func (s *ElasticSearchSource) WithDefaults()
type IsElasticLikeDestination ¶
type IsElasticLikeDestination interface {
ToElasticSearchDestination() (*ElasticSearchDestination, ServerType)
}
type IsElasticLikeSource ¶
type IsElasticLikeSource interface {
ToElasticSearchSource() (*ElasticSearchSource, ServerType)
}
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *server.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type SchemaDescription ¶
type ServerType ¶
type ServerType int64
type ShardingFilter ¶
func UnmarshalFilter ¶
func UnmarshalFilter(marshalledFilter string) (ShardingFilter, error)
type Storage ¶
type Storage struct { Cfg *elasticsearch.Config Client *elasticsearch.Client Metrics *stats.SourceStats IsHomo bool }
func NewStorage ¶
func NewStorage(src *ElasticSearchSource, logger log.Logger, mRegistry metrics.Registry, serverType ServerType, opts ...StorageOpt) (*Storage, error)
func WithOpts ¶
func WithOpts(storage *Storage, opts ...StorageOpt) *Storage
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) ShardTable ¶
func (s *Storage) ShardTable(ctx context.Context, table abstract.TableDescription) ([]abstract.TableDescription, error)
Fetch amount of active shards for index in order to calculate ideal slicing for parallelized execution https://www.elastic.co/guide/en/elasticsearch/reference/master/paginate-search-results.html#slice-scroll sliceNr <= shardsNr
func (*Storage) TableSchema ¶
type StorageOpt ¶
func WithHomo ¶
func WithHomo() StorageOpt
Click to show internal directories.
Click to hide internal directories.