elastic

package
v0.0.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Undefined                = 0
	OpenSearch               = 1
	ElasticSearch ServerType = 2
)
View Source
const ProviderType = abstract.ProviderType("elasticsearch")

Variables

This section is empty.

Functions

func ConfigFromDestination

func ConfigFromDestination(logger log.Logger, cfg *ElasticSearchDestination, serverType ServerType) (*elasticsearch.Config, error)

func DeleteSystemFieldsFromIndexParams

func DeleteSystemFieldsFromIndexParams(params map[string]interface{})

func DumpIndexInfo

func DumpIndexInfo(transfer *server.Transfer, logger log.Logger, mRegistry metrics.Registry) error

func New

func NewSink

func NewSink(cfg *ElasticSearchDestination, logger log.Logger, registry metrics.Registry) (abstract.Sinker, error)

func NewSinkImpl

func NewSinkImpl(cfg *ElasticSearchDestination, logger log.Logger, registry metrics.Registry, client *elasticsearch.Client) (abstract.Sinker, error)

func WaitForIndexToExist

func WaitForIndexToExist(client *elasticsearch.Client, indexName string, timeout time.Duration) error

func WithLogger

func WithLogger(config elasticsearch.Config, logger log.Logger, serverType ServerType) (*elasticsearch.Client, error)

Types

type ElasticSearchDestination

type ElasticSearchDestination struct {
	ClusterID        string // Deprecated: new endpoints should be on premise only
	DataNodes        []ElasticSearchHostPort
	User             string
	Password         server.SecretString
	SSLEnabled       bool
	TLSFile          string
	SubNetworkID     string
	SecurityGroupIDs []string
	Cleanup          server.CleanupType

	SanitizeDocKeys bool
}

func (*ElasticSearchDestination) CleanupMode

func (d *ElasticSearchDestination) CleanupMode() server.CleanupType

func (*ElasticSearchDestination) Compatible

func (d *ElasticSearchDestination) Compatible(src server.Source, transferType abstract.TransferType) error

func (*ElasticSearchDestination) GetProviderType

func (d *ElasticSearchDestination) GetProviderType() abstract.ProviderType

func (*ElasticSearchDestination) Hosts

func (d *ElasticSearchDestination) Hosts() []string

func (*ElasticSearchDestination) IsDestination

func (d *ElasticSearchDestination) IsDestination()

func (*ElasticSearchDestination) MDBClusterID

func (d *ElasticSearchDestination) MDBClusterID() string

func (*ElasticSearchDestination) ToElasticSearchDestination

func (d *ElasticSearchDestination) ToElasticSearchDestination() (*ElasticSearchDestination, ServerType)

func (*ElasticSearchDestination) Transformer

func (d *ElasticSearchDestination) Transformer() map[string]string

func (*ElasticSearchDestination) VPCSecurityGroups

func (d *ElasticSearchDestination) VPCSecurityGroups() []string

func (*ElasticSearchDestination) VPCSubnets

func (d *ElasticSearchDestination) VPCSubnets() []string

func (*ElasticSearchDestination) Validate

func (d *ElasticSearchDestination) Validate() error

func (*ElasticSearchDestination) WithDefaults

func (d *ElasticSearchDestination) WithDefaults()

type ElasticSearchHostPort

type ElasticSearchHostPort struct {
	Host string
	Port int
}

type ElasticSearchSource

type ElasticSearchSource struct {
	ClusterID            string // Deprecated: new endpoints should be on premise only
	DataNodes            []ElasticSearchHostPort
	User                 string
	Password             server.SecretString
	SSLEnabled           bool
	TLSFile              string
	SubNetworkID         string
	SecurityGroupIDs     []string
	DumpIndexWithMapping bool
}

func (*ElasticSearchSource) GetProviderType

func (s *ElasticSearchSource) GetProviderType() abstract.ProviderType

func (*ElasticSearchSource) IsSource

func (s *ElasticSearchSource) IsSource()

func (*ElasticSearchSource) MDBClusterID

func (s *ElasticSearchSource) MDBClusterID() string

func (*ElasticSearchSource) SourceToElasticSearchDestination

func (s *ElasticSearchSource) SourceToElasticSearchDestination() *ElasticSearchDestination

func (*ElasticSearchSource) ToElasticSearchSource

func (s *ElasticSearchSource) ToElasticSearchSource() (*ElasticSearchSource, ServerType)

func (*ElasticSearchSource) VPCSecurityGroups

func (s *ElasticSearchSource) VPCSecurityGroups() []string

func (*ElasticSearchSource) VPCSubnets

func (s *ElasticSearchSource) VPCSubnets() []string

func (*ElasticSearchSource) Validate

func (s *ElasticSearchSource) Validate() error

func (*ElasticSearchSource) WithDefaults

func (s *ElasticSearchSource) WithDefaults()

type IsElasticLikeDestination

type IsElasticLikeDestination interface {
	ToElasticSearchDestination() (*ElasticSearchDestination, ServerType)
}

type IsElasticLikeSource

type IsElasticLikeSource interface {
	ToElasticSearchSource() (*ElasticSearchSource, ServerType)
}

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (*Provider) Sink

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type SchemaDescription

type SchemaDescription struct {
	Columns      []abstract.ColSchema
	ColumnsNames []string
}

type ServerType

type ServerType int64

type ShardingFilter

type ShardingFilter struct {
	ID  int `json:"id"`
	Max int `json:"max"`
}

func UnmarshalFilter

func UnmarshalFilter(marshalledFilter string) (ShardingFilter, error)

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func (*Sink) Close

func (s *Sink) Close() error

func (*Sink) Push

func (s *Sink) Push(input []abstract.ChangeItem) error

type Storage

type Storage struct {
	Cfg     *elasticsearch.Config
	Client  *elasticsearch.Client
	Metrics *stats.SourceStats
	IsHomo  bool
}

func NewStorage

func NewStorage(src *ElasticSearchSource, logger log.Logger, mRegistry metrics.Registry, serverType ServerType, opts ...StorageOpt) (*Storage, error)

func WithOpts

func WithOpts(storage *Storage, opts ...StorageOpt) *Storage

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) ShardTable

Fetch amount of active shards for index in order to calculate ideal slicing for parallelized execution https://www.elastic.co/guide/en/elasticsearch/reference/master/paginate-search-results.html#slice-scroll sliceNr <= shardsNr

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(_ context.Context, table abstract.TableID) (*abstract.TableSchema, error)

type StorageOpt

type StorageOpt func(storage *Storage) *Storage

func WithHomo

func WithHomo() StorageOpt

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL