model

package
v0.0.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ClickhouseIOFormatCSV         = ClickhouseIOFormat("CSV")
	ClickhouseIOFormatJSONCompact = ClickhouseIOFormat("JSONCompactEachRow")
	DefaultUser                   = "admin"
)
View Source
const (
	// BufferTriggingSizeDefault is a recommended default value for bufferer trigging size
	// Default value assume that we have 4 thread writer in 3gb box (default runtime box)
	// so each thread would consume at most 256 * 2 (one time for source one time for target) mb + some constant memory
	// in total it would eat 512 * 4 = 2gb, which is less than 3gb
	BufferTriggingSizeDefault uint64 = 256 * humanize.MiByte
)

Variables

This section is empty.

Functions

func ConnectionHosts

func ConnectionHosts(cfg *ChStorageParams, shard string) ([]string, error)

ConnectionHosts returns a list of hosts which can be used to connect to the ClickHouse cluster with the given shard.

Empty `shard` is supported.

func ResolvePassword

func ResolvePassword(clusterID, user, password string) (string, error)

func ShardFromCluster

func ShardFromCluster(clusterID, shardGroup string) (map[string][]string, error)

Types

type ChDestination

type ChDestination struct {
	// ChSinkServerParams
	MdbClusterID     string `json:"Cluster"`
	ChClusterName    string // CH cluster to which data will be transfered. Other clusters would be ignored.
	User             string
	Password         server.SecretString
	Database         string
	Partition        string
	SSLEnabled       bool
	HTTPPort         int
	NativePort       int
	TTL              string
	InferSchema      bool
	MigrationOptions *ChSinkMigrationOptions
	// ForceJSONMode forces JSON protocol at sink:
	// - allows upload records without 'required'-fields, clickhouse fills them via defaults.
	//         BUT IF THEY ARE 'REQUIRED' - WHAT THE POINT?
	// - allows new data types
	// - allows composite data types
	// - allows schemas with expressions & defaults
	// - allows handle some 'alias'
	// - json-protocol is slower than native
	//
	// JSON protocol implementation currently only supports InsertKind items.
	// This option used to be public.
	ForceJSONMode           bool `json:"ForceHTTP"`
	ProtocolUnspecified     bool // Denotes that the original proto configuration does not specify the protocol
	AnyAsString             bool
	SystemColumnsFirst      bool
	IsUpdateable            bool
	UpsertAbsentToastedRows bool

	// Insert settings
	InsertParams InsertParams

	// AltHosts
	Hosts []string

	// ChSinkShardParams
	RetryCount           int
	UseSchemaInTableName bool
	ShardCol             string
	Interval             time.Duration
	AltNamesList         []server.AltName

	// ChSinkParams
	ShardByTransferID          bool
	ShardByRoundRobin          bool
	Rotation                   *server.RotatorConfig
	ShardsList                 []ClickHouseShard
	ColumnValueToShardNameList []ClickHouseColumnValueToShardName

	// fields used only in wrapper-over-sink
	TransformerConfig  map[string]string
	SubNetworkID       string
	SecurityGroupIDs   []string
	Cleanup            server.CleanupType
	PemFileContent     string // timmyb32r: this field is not used in sinker! It seems we are not able to transfer into on-premise ch with cert
	InflightBuffer     int    // deprecated: use BufferTriggingSize instead. Items' count triggering a buffer flush
	BufferTriggingSize uint64
	RootCACertPaths    []string
}

ChDestination - see description of fields in sink_params.go

func (*ChDestination) BuffererConfig

func (d *ChDestination) BuffererConfig() bufferer.BuffererConfig

func (*ChDestination) CleanupMode

func (d *ChDestination) CleanupMode() server.CleanupType

func (*ChDestination) ClusterID

func (d *ChDestination) ClusterID() string

func (*ChDestination) FillDependentFields

func (d *ChDestination) FillDependentFields(transfer *server.Transfer)

func (*ChDestination) GetProviderType

func (d *ChDestination) GetProviderType() abstract.ProviderType

func (ChDestination) IsDestination

func (ChDestination) IsDestination()

func (*ChDestination) MDBClusterID

func (d *ChDestination) MDBClusterID() string

func (*ChDestination) Shards

func (d *ChDestination) Shards() map[string][]string

func (*ChDestination) ToReplicationFromPGSinkParams

func (d *ChDestination) ToReplicationFromPGSinkParams() ChDestinationWrapper

ToReplicationFromPGSinkParams converts the model into sink properties object that would be constructed for a replication from PostgreSQL

func (*ChDestination) ToSinkParams

func (d *ChDestination) ToSinkParams(transfer *server.Transfer) ChDestinationWrapper

ToSinkParams converts the model into sink properties object, which contains extra information which depends on transfer type

func (*ChDestination) ToStorageParams

func (d *ChDestination) ToStorageParams() *ChStorageParams

func (*ChDestination) Transformer

func (d *ChDestination) Transformer() map[string]string

func (*ChDestination) Validate

func (d *ChDestination) Validate() error

func (*ChDestination) WithDefaults

func (d *ChDestination) WithDefaults()

type ChDestinationWrapper

type ChDestinationWrapper struct {
	Model *ChDestination
	// contains filtered or unexported fields
}

ChDestinationWrapper implements ChSinkParams

func (ChDestinationWrapper) AltHosts

func (d ChDestinationWrapper) AltHosts() []string

func (ChDestinationWrapper) AnyAsString

func (d ChDestinationWrapper) AnyAsString() bool

func (ChDestinationWrapper) ChClusterName

func (d ChDestinationWrapper) ChClusterName() string

func (ChDestinationWrapper) Cleanup

func (ChDestinationWrapper) ColumnToShardName

func (d ChDestinationWrapper) ColumnToShardName() map[string]string

func (ChDestinationWrapper) Database

func (d ChDestinationWrapper) Database() string

func (ChDestinationWrapper) HTTPPort

func (d ChDestinationWrapper) HTTPPort() int

func (ChDestinationWrapper) Host

func (d ChDestinationWrapper) Host() *string

func (ChDestinationWrapper) InferSchema

func (d ChDestinationWrapper) InferSchema() bool

func (ChDestinationWrapper) InsertSettings

func (d ChDestinationWrapper) InsertSettings() InsertParams

func (ChDestinationWrapper) Interval

func (d ChDestinationWrapper) Interval() time.Duration

func (ChDestinationWrapper) IsUpdateable

func (d ChDestinationWrapper) IsUpdateable() bool

func (ChDestinationWrapper) MakeChildServerParams

func (d ChDestinationWrapper) MakeChildServerParams(host string) ChSinkServerParams

func (ChDestinationWrapper) MakeChildShardParams

func (d ChDestinationWrapper) MakeChildShardParams(altHosts []string) ChSinkShardParams

func (ChDestinationWrapper) MdbClusterID

func (d ChDestinationWrapper) MdbClusterID() string

func (ChDestinationWrapper) MigrationOptions

func (d ChDestinationWrapper) MigrationOptions() ChSinkMigrationOptions

func (ChDestinationWrapper) NativePort

func (d ChDestinationWrapper) NativePort() int

func (ChDestinationWrapper) Partition

func (d ChDestinationWrapper) Partition() string

func (ChDestinationWrapper) Password

func (d ChDestinationWrapper) Password() string

func (ChDestinationWrapper) PemFileContent

func (d ChDestinationWrapper) PemFileContent() string

func (ChDestinationWrapper) ResolvePassword

func (d ChDestinationWrapper) ResolvePassword() (string, error)

func (ChDestinationWrapper) RetryCount

func (d ChDestinationWrapper) RetryCount() int

func (ChDestinationWrapper) RootCertPaths

func (d ChDestinationWrapper) RootCertPaths() []string

func (ChDestinationWrapper) Rotation

func (ChDestinationWrapper) SSLEnabled

func (d ChDestinationWrapper) SSLEnabled() bool

func (ChDestinationWrapper) SetShards

func (d ChDestinationWrapper) SetShards(shards map[string][]string)

SetShards we can set model variables, bcs we make copy of ChDestination in NewChDestinationV1

func (ChDestinationWrapper) ShardByRoundRobin

func (d ChDestinationWrapper) ShardByRoundRobin() bool

func (ChDestinationWrapper) ShardByTransferID

func (d ChDestinationWrapper) ShardByTransferID() bool

func (ChDestinationWrapper) ShardCol

func (d ChDestinationWrapper) ShardCol() string

func (ChDestinationWrapper) Shards

func (d ChDestinationWrapper) Shards() map[string][]string

func (ChDestinationWrapper) SystemColumnsFirst

func (d ChDestinationWrapper) SystemColumnsFirst() bool

func (ChDestinationWrapper) TTL

func (d ChDestinationWrapper) TTL() string

func (ChDestinationWrapper) Tables

func (d ChDestinationWrapper) Tables() map[string]string

func (ChDestinationWrapper) UploadAsJSON

func (d ChDestinationWrapper) UploadAsJSON() bool

func (ChDestinationWrapper) UpsertAbsentToastedRows

func (d ChDestinationWrapper) UpsertAbsentToastedRows() bool

func (ChDestinationWrapper) UseSchemaInTableName

func (d ChDestinationWrapper) UseSchemaInTableName() bool

func (ChDestinationWrapper) User

func (d ChDestinationWrapper) User() string

type ChSinkClusterParams

type ChSinkClusterParams interface {
	ChSinkServerParams
	// AltHosts
	// In the model it calls 'Hosts'
	//
	// https://github.com/ClickHouse/clickhouse-go#dsn
	// alt_hosts - comma-separated list of single address hosts for load-balancing
	// We can get it from user, and we can fill it from mdb dbaas.
	//
	// for every AltHost, sinkCluster has special sinkServer
	//
	// it's very ad-hoc field - every sinker rewrites it as it want
	AltHosts() []string

	// ShardByTransferID
	// TODO(@timmyb32r) - is it meaningful?) highly likely something wrong with this option.
	// see: TM-2060 - it's for sharded pg
	// it's close to TM-2517, but we need to add sharding-by-src-shard on ch-sink
	// after TM-2517 we can remove this field & describe best-practice of dealing with sharded data
	ShardByTransferID() bool // another sharding option. TODO - why it's location differs form ShardCol
	ShardByRoundRobin() bool

	MakeChildServerParams(hosts string) ChSinkServerParams
}

type ChSinkClusterParamsWrapper

type ChSinkClusterParamsWrapper struct {
	Model *ChSinkClusterParams
}

type ChSinkMigrationOptions

type ChSinkMigrationOptions struct {
	// AddNewColumns
	// automatically alter table to add new columns
	AddNewColumns bool
}

type ChSinkParams

type ChSinkParams interface {
	ChSinkShardParams
	// Rotation
	// TODO - I think we don't need this (bcs of TTL in schema), and if need - we can make it by some universal mechanism
	Rotation() *server.RotatorConfig

	Shards() map[string][]string // shardName->[host]. It's used in sink.go to slice on shards

	// ColumnToShardIndex returns a user-provided exact mapping of shard key to shard name
	ColumnToShardName() map[string]string

	MakeChildShardParams(altHosts []string) ChSinkShardParams
	SetShards(shards map[string][]string)
}

type ChSinkParamsWrapper

type ChSinkParamsWrapper struct {
	Model *ChSinkParams
}

type ChSinkServerParams

type ChSinkServerParams interface {
	MdbClusterID() string
	ChClusterName() string
	User() string
	Password() string
	ResolvePassword() (string, error)
	Database() string
	// Partition
	// string, substitutes after 'PARTITION BY' in ddl. Field absent in UI.
	// 'ddl += fmt.Sprintf(" PARTITION BY (%v)", t.config.Partition)'
	Partition() string
	// Host
	// filled by SinkCluster for SinkServer.
	// the only field, which is absent in the model.
	Host() *string
	PemFileContent() string
	SSLEnabled() bool
	HTTPPort() int
	NativePort() int
	// TTL
	// string, substitutes after 'TTL' in ddl. Field absent in UI. Nobody used.
	// example: '_timestamp + INTERVAL 18 MONTH'
	TTL() string
	// IsUpdateable
	// automatically derived from transfer options.
	// Updateable - data-transfer term, means the table satisfies two conditions:
	//     1) ReplacingMergeTree engine family
	//     2) table contains data-transfer system columns: '__data_transfer_commit_time', '__data_transfer_delete_time'
	IsUpdateable() bool

	// UpsertAbsentToastedRows When batch push fails on TOAST, interpret as sequential independent upserts.
	// Useful in cases:
	//  1. YDB Source with 'Updates' changefeed mode
	//  2. Any IncrementOnly transfer in ClickHouse which can bring update for inexistent document (for instance PG->CH)
	UpsertAbsentToastedRows() bool
	InferSchema() bool // If table exists - get it schema
	// MigrationOptions
	// Sink table modification settings
	MigrationOptions() ChSinkMigrationOptions
	// UploadAsJSON enables JSON format upload. See CH destination model for details.
	UploadAsJSON() bool
	// AnyAsString
	// it's used only when UploadAsJSON=true.
	// for non-date/time & string types - when true, made one more json.Marshal. Why?
	AnyAsString() bool
	// SystemColumnsFirst
	// it seems we can derive it - just like we derive 'IsUpdateable' flag.
	// furthermore - we can get rid of 'system' columns term - just merge it with 'key' columns
	SystemColumnsFirst() bool
	Cleanup() server.CleanupType
	RootCertPaths() []string
	InsertSettings() InsertParams
}

type ChSinkServerParamsWrapper

type ChSinkServerParamsWrapper struct {
	Model *ChSinkServerParams
}

type ChSinkShardParams

type ChSinkShardParams interface {
	ChSinkClusterParams
	// RetryCount
	// amount of retries in sinkShard::upload - very, very bad design of this part. TODO - remove this ugly stuff
	RetryCount() int
	// UseSchemaInTableName
	// add schema to tableName. TODO - replace it by universal transformer
	UseSchemaInTableName() bool
	// ShardCol
	// column_name, which is used for sharding
	// Meaningful only for queue-sources! bcs there are one 'column' and big amount of data.
	// For replication-dst we automatically turning-off sharding!
	ShardCol() string
	// Interval returns the minimum interval between two subsequent Pushes
	Interval() time.Duration
	// Tables
	// it's 'AltNames'. TODO - replace it by universal transformer
	Tables() map[string]string
}

type ChSinkShardParamsWrapper

type ChSinkShardParamsWrapper struct {
	Model *ChSinkShardParams
}

type ChSource

type ChSource struct {
	MdbClusterID     string `json:"ClusterID"`
	ChClusterName    string // CH cluster from which data will be transfered. Other clusters would be ignored.
	ShardsList       []ClickHouseShard
	HTTPPort         int
	NativePort       int
	User             string
	Password         server.SecretString
	SSLEnabled       bool
	PemFileContent   string
	Database         string
	SubNetworkID     string
	SecurityGroupIDs []string
	IncludeTables    []string
	ExcludeTables    []string
	IsHomo           bool
	BufferSize       uint64
	IOHomoFormat     ClickhouseIOFormat // one of - https://clickhouse.com/docs/en/interfaces/formats
	RootCACertPaths  []string
}

func (*ChSource) AllIncludes

func (s *ChSource) AllIncludes() []string

func (*ChSource) ClusterID

func (s *ChSource) ClusterID() string

func (*ChSource) FulfilledIncludes

func (s *ChSource) FulfilledIncludes(tID abstract.TableID) (result []string)

func (*ChSource) GetProviderType

func (s *ChSource) GetProviderType() abstract.ProviderType

func (*ChSource) Include

func (s *ChSource) Include(tID abstract.TableID) bool

func (*ChSource) IsAbstract2

func (s *ChSource) IsAbstract2(dst server.Destination) bool

func (*ChSource) IsIncremental

func (*ChSource) IsIncremental()

func (*ChSource) IsSource

func (*ChSource) IsSource()

func (*ChSource) MDBClusterID

func (s *ChSource) MDBClusterID() string

func (*ChSource) SupportsStartCursorValue

func (*ChSource) SupportsStartCursorValue() bool

func (*ChSource) ToSinkParams

func (s *ChSource) ToSinkParams() ChSourceWrapper

func (*ChSource) ToStorageParams

func (s *ChSource) ToStorageParams() *ChStorageParams

func (*ChSource) Validate

func (s *ChSource) Validate() error

func (*ChSource) WithDefaults

func (s *ChSource) WithDefaults()

type ChSourceWrapper

type ChSourceWrapper struct {
	Model *ChSource
	// contains filtered or unexported fields
}

func (ChSourceWrapper) AltHosts

func (s ChSourceWrapper) AltHosts() []string

func (ChSourceWrapper) AnyAsString

func (s ChSourceWrapper) AnyAsString() bool

func (ChSourceWrapper) BufferTriggingSize

func (s ChSourceWrapper) BufferTriggingSize() uint64

func (ChSourceWrapper) ChClusterName

func (s ChSourceWrapper) ChClusterName() string

func (ChSourceWrapper) Cleanup

func (s ChSourceWrapper) Cleanup() server.CleanupType

func (ChSourceWrapper) ColumnToShardName

func (s ChSourceWrapper) ColumnToShardName() map[string]string

func (ChSourceWrapper) Database

func (s ChSourceWrapper) Database() string

func (ChSourceWrapper) HTTPPort

func (s ChSourceWrapper) HTTPPort() int

func (ChSourceWrapper) Host

func (s ChSourceWrapper) Host() *string

func (ChSourceWrapper) InferSchema

func (s ChSourceWrapper) InferSchema() bool

func (ChSourceWrapper) InsertSettings

func (s ChSourceWrapper) InsertSettings() InsertParams

func (ChSourceWrapper) Interval

func (s ChSourceWrapper) Interval() time.Duration

func (ChSourceWrapper) IsUpdateable

func (s ChSourceWrapper) IsUpdateable() bool

func (ChSourceWrapper) MakeChildServerParams

func (s ChSourceWrapper) MakeChildServerParams(host string) ChSinkServerParams

func (ChSourceWrapper) MakeChildShardParams

func (s ChSourceWrapper) MakeChildShardParams(altHosts []string) ChSinkShardParams

func (ChSourceWrapper) MdbClusterID

func (s ChSourceWrapper) MdbClusterID() string

func (ChSourceWrapper) MigrationOptions

func (s ChSourceWrapper) MigrationOptions() ChSinkMigrationOptions

func (ChSourceWrapper) NativePort

func (s ChSourceWrapper) NativePort() int

func (ChSourceWrapper) Partition

func (s ChSourceWrapper) Partition() string

func (ChSourceWrapper) Password

func (s ChSourceWrapper) Password() string

func (ChSourceWrapper) PemFileContent

func (s ChSourceWrapper) PemFileContent() string

func (ChSourceWrapper) ResolvePassword

func (s ChSourceWrapper) ResolvePassword() (string, error)

func (ChSourceWrapper) RetryCount

func (s ChSourceWrapper) RetryCount() int

func (ChSourceWrapper) RootCertPaths

func (s ChSourceWrapper) RootCertPaths() []string

func (ChSourceWrapper) Rotation

func (s ChSourceWrapper) Rotation() *server.RotatorConfig

func (ChSourceWrapper) SSLEnabled

func (s ChSourceWrapper) SSLEnabled() bool

func (ChSourceWrapper) SetShards

func (s ChSourceWrapper) SetShards(shards map[string][]string)

func (ChSourceWrapper) ShardByRoundRobin

func (s ChSourceWrapper) ShardByRoundRobin() bool

func (ChSourceWrapper) ShardByTransferID

func (s ChSourceWrapper) ShardByTransferID() bool

func (ChSourceWrapper) ShardCol

func (s ChSourceWrapper) ShardCol() string

func (ChSourceWrapper) Shards

func (s ChSourceWrapper) Shards() map[string][]string

func (ChSourceWrapper) SystemColumnsFirst

func (s ChSourceWrapper) SystemColumnsFirst() bool

func (ChSourceWrapper) TTL

func (s ChSourceWrapper) TTL() string

func (ChSourceWrapper) Tables

func (s ChSourceWrapper) Tables() map[string]string

func (ChSourceWrapper) UploadAsJSON

func (s ChSourceWrapper) UploadAsJSON() bool

func (ChSourceWrapper) UpsertAbsentToastedRows

func (s ChSourceWrapper) UpsertAbsentToastedRows() bool

func (ChSourceWrapper) UseSchemaInTableName

func (s ChSourceWrapper) UseSchemaInTableName() bool

func (ChSourceWrapper) User

func (s ChSourceWrapper) User() string

type ChStorageParams

type ChStorageParams struct {
	MdbClusterID   string
	Hosts          []string
	ChClusterName  string
	NativePort     int
	Secure         bool
	PemFileContent string
	Database       string
	User           string
	Password       string
	BufferSize     uint64
	HTTPPort       int
	Shards         map[string][]string
	IOHomoFormat   ClickhouseIOFormat // one of - https://clickhouse.com/docs/en/interfaces/formats
}

func (*ChStorageParams) IsManaged

func (c *ChStorageParams) IsManaged() bool

func (*ChStorageParams) ToConnParams

func (c *ChStorageParams) ToConnParams() connConfigWrapper

type ClickHouseColumnValueToShardName

type ClickHouseColumnValueToShardName struct {
	ColumnValue string
	ShardName   string
}

type ClickHouseShard

type ClickHouseShard struct {
	Name  string
	Hosts []string
}

type ClickhouseIOFormat

type ClickhouseIOFormat string

type InsertParams

type InsertParams struct {
	MaterializedViewsIgnoreErrors bool
}

func (InsertParams) AsQueryPart

func (p InsertParams) AsQueryPart() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL