opensearch

package
v0.0.0-rc14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Using OpenSearch via ElasticSearch official client

The official ElasticSearch client checks whether the server is truly 'ElasticSearch' in two places:

  • Before every request - this check can be turned off with the 'useResponseCheckOnly' parameter in the config
    • so, we set UseResponseCheckOnly:true
  • After the first request - on success, the client sets its private field 'productCheckSuccess' to 'true'
    • so, we call the 'setProductCheckSuccess' function to set it to true

As a result, we can work with OpenSearch via the official ElasticSearch client.

If a new version of the ElasticSearch client no longer contains the 'productCheckSuccess' field, the test 'TestSetProductCheckSuccess' will fail and reveal it.

Documentation

Index

Constants

View Source
const ProviderType = abstract.ProviderType("opensearch")

Variables

This section is empty.

Functions

func New

func NewSink

func NewSink(cfg *OpenSearchDestination, logger log.Logger, registry metrics.Registry) (abstract.Sinker, error)

func NewSinkImpl

func NewSinkImpl(cfg *OpenSearchDestination, logger log.Logger, registry metrics.Registry, client *elasticsearch.Client) (abstract.Sinker, error)

Types

type OpenSearchDestination

type OpenSearchDestination struct {
	ClusterID        string
	DataNodes        []OpenSearchHostPort
	User             string
	Password         model.SecretString
	SSLEnabled       bool
	TLSFile          string
	SubNetworkID     string
	SecurityGroupIDs []string
	Cleanup          model.CleanupType

	SanitizeDocKeys bool
}

func (*OpenSearchDestination) CleanupMode

func (d *OpenSearchDestination) CleanupMode() model.CleanupType

func (*OpenSearchDestination) Compatible

func (d *OpenSearchDestination) Compatible(src model.Source, transferType abstract.TransferType) error

func (*OpenSearchDestination) GetProviderType

func (d *OpenSearchDestination) GetProviderType() abstract.ProviderType

func (*OpenSearchDestination) Hosts

func (d *OpenSearchDestination) Hosts() []string

func (*OpenSearchDestination) IsDestination

func (d *OpenSearchDestination) IsDestination()

func (*OpenSearchDestination) MDBClusterID

func (d *OpenSearchDestination) MDBClusterID() string

func (*OpenSearchDestination) ToElasticSearchDestination

func (d *OpenSearchDestination) ToElasticSearchDestination() (*elastic.ElasticSearchDestination, elastic.ServerType)

func (*OpenSearchDestination) Transformer

func (d *OpenSearchDestination) Transformer() map[string]string

func (*OpenSearchDestination) Validate

func (d *OpenSearchDestination) Validate() error

func (*OpenSearchDestination) WithDefaults

func (d *OpenSearchDestination) WithDefaults()

type OpenSearchHostPort

type OpenSearchHostPort struct {
	Host string
	Port int
}

type OpenSearchSource

type OpenSearchSource struct {
	ClusterID            string
	DataNodes            []OpenSearchHostPort
	User                 string
	Password             model.SecretString
	SSLEnabled           bool
	TLSFile              string
	SubNetworkID         string
	SecurityGroupIDs     []string
	DumpIndexWithMapping bool
}

func (*OpenSearchSource) GetProviderType

func (s *OpenSearchSource) GetProviderType() abstract.ProviderType

func (*OpenSearchSource) Hosts

func (s *OpenSearchSource) Hosts() []string

func (*OpenSearchSource) IsSource

func (s *OpenSearchSource) IsSource()

func (*OpenSearchSource) MDBClusterID

func (s *OpenSearchSource) MDBClusterID() string

func (*OpenSearchSource) ToElasticSearchSource

func (s *OpenSearchSource) ToElasticSearchSource() (*elastic.ElasticSearchSource, elastic.ServerType)

func (*OpenSearchSource) Validate

func (s *OpenSearchSource) Validate() error

func (*OpenSearchSource) WithDefaults

func (s *OpenSearchSource) WithDefaults()

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error

func (*Provider) Sink

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func (*Sink) Close

func (s *Sink) Close() error

func (*Sink) Push

func (s *Sink) Push(input []abstract.ChangeItem) error

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(src *OpenSearchSource, logger log.Logger, mRegistry metrics.Registry, opts ...elastic.StorageOpt) (*Storage, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) ShardTable

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL