clickhouse

package
v0.0.0-rc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 72 Imported by: 0

Documentation

Overview

Package ch cluster - it behaves like a stand-alone cluster with multiple masters: []*SinkServer holds the masters (AltHosts). We don't care which SinkServer we write to - it works like a multi-master system. We choose the alive masters (via bestSinkServer()) and then round-robin among them.

Package ch

SinkServer - a destination that acts like a master (in a multi-master system).

Index

Constants

View Source
const (
	CredentialsCheckType = abstract.CheckType("credentials")
	ConnectivityNative   = abstract.CheckType("connection-native")
	ConnectivityHTTP     = abstract.CheckType("connection-http")
)
View Source
const ProviderType = abstract.ProviderType("ch")

Variables

View Source
var ClickHouseSinkClosedErr = xerrors.New("ClickHouse sink has already been closed")

Functions

func ColumnDefinitions

func ColumnDefinitions(cols []abstract.ColSchema) (result []string, keyIsNullable bool)

func ColumnShouldBeSelected

func ColumnShouldBeSelected(col abstract.ColSchema, isHomo bool) bool

func InitValuesForScan

func InitValuesForScan(rows *sql.Rows) ([]interface{}, error)

func IsColVirtual

func IsColVirtual(colSchema abstract.ColSchema) bool

func MakeAltNames

func MakeAltNames(config model.ChSinkShardParams) map[string]string

func MakeConnection

func MakeConnection(cfg *model.ChStorageParams) (*sql.DB, error)

func MarshalFields

func MarshalFields(vals []interface{}, schema abstract.TableColumns) ([]interface{}, uint64)

func New

func NewClickhouseProvider

func NewClickhouseProvider(logger log.Logger, registry metrics.Registry, config *model.ChSource, transfer *dp_model.Transfer) (base.SnapshotProvider, error)

func NewSink

func NewSink(transfer *dp_model.Transfer, logger log.Logger, metrics metrics.Registry, runtime abstract.Runtime, middlewaresConfig middlewares.Config) (abstract.Sinker, error)

func NewSourcesChain

func NewSourcesChain(logger log.Logger, sources ...base.EventSource) base.ProgressableEventSource

Types

type BufWithPos

type BufWithPos struct {
	bytes.Buffer
	// contains filtered or unexported fields
}

func NewBufWithPos

func NewBufWithPos() *BufWithPos

func (*BufWithPos) BufFromRememberedPos

func (b *BufWithPos) BufFromRememberedPos() []byte

func (*BufWithPos) RememberPos

func (b *BufWithPos) RememberPos()

type ClickhouseStorage

type ClickhouseStorage interface {
	abstract.SampleableStorage
	LoadTablesDDL(tables []abstract.TableID) ([]schema.TableDDL, error)
	BuildTableQuery(table abstract.TableDescription) (*abstract.TableSchema, string, string, error)
	GetRowsCount(tableID abstract.TableID) (uint64, error)
	TableParts(ctx context.Context, table abstract.TableID) ([]TablePart, error)
	Version() semver.Version
}

func NewShardedFromUrls

func NewShardedFromUrls(shardUrls map[string][]string, config *model.ChStorageParams, transfer *dp_model.Transfer, opts ...StorageOpt) (ClickhouseStorage, error)

func NewShardedStorage

func NewShardedStorage(shards map[string]*Storage) ClickhouseStorage

func NewStorage

func NewStorage(config *model.ChStorageParams, transfer *dp_model.Transfer, opts ...StorageOpt) (ClickhouseStorage, error)

type ClusterTables

type ClusterTables struct {
	// contains filtered or unexported fields
}

func NewClusterTables

func NewClusterTables(storage ClickhouseStorage, config *model.ChSource, inputFilter base.DataObjectFilter) (*ClusterTables, error)

func (*ClusterTables) AddTable

func (s *ClusterTables) AddTable(tid abstract.TableID) error

func (*ClusterTables) AddTableDescription

func (s *ClusterTables) AddTableDescription(desc abstract.TableDescription) error

func (*ClusterTables) Close

func (s *ClusterTables) Close()

func (*ClusterTables) Err

func (s *ClusterTables) Err() error

func (*ClusterTables) Next

func (s *ClusterTables) Next() bool

func (*ClusterTables) Object

func (s *ClusterTables) Object() (base.DataObject, error)

func (*ClusterTables) ParsePartKey

func (s *ClusterTables) ParsePartKey(data string) (*abstract.TableID, error)

func (*ClusterTables) ToOldTableMap

func (s *ClusterTables) ToOldTableMap() (abstract.TableMap, error)

type DataProvider

type DataProvider struct {
	// contains filtered or unexported fields
}

func (*DataProvider) BeginSnapshot

func (c *DataProvider) BeginSnapshot() error

func (*DataProvider) Close

func (c *DataProvider) Close() error

func (*DataProvider) CreateSnapshotSource

func (c *DataProvider) CreateSnapshotSource(part base.DataObjectPart) (base.ProgressableEventSource, error)

func (*DataProvider) DataObjects

func (c *DataProvider) DataObjects(filter base.DataObjectFilter) (base.DataObjects, error)

func (*DataProvider) DataObjectsToTableParts

func (c *DataProvider) DataObjectsToTableParts(filter base.DataObjectFilter) ([]abstract.TableDescription, error)

func (*DataProvider) EndSnapshot

func (c *DataProvider) EndSnapshot() error

func (*DataProvider) Init

func (c *DataProvider) Init() error

func (*DataProvider) Ping

func (c *DataProvider) Ping() error

func (*DataProvider) ResolveOldTableDescriptionToDataPart

func (c *DataProvider) ResolveOldTableDescriptionToDataPart(tableDesc abstract.TableDescription) (base.DataObjectPart, error)

func (*DataProvider) TablePartToDataObjectPart

func (c *DataProvider) TablePartToDataObjectPart(tableDescription *abstract.TableDescription) (base.DataObjectPart, error)

func (*DataProvider) TableSchema

func (c *DataProvider) TableSchema(part base.DataObjectPart) (*abstract.TableSchema, error)

type HTTPEventsBatch

type HTTPEventsBatch struct {
	Data []byte
	Cols *abstract.TableSchema

	Part     *TablePartA2
	ColNames []string

	Format    model.ClickhouseIOFormat
	RowCount  int
	SizeBytes int
	// contains filtered or unexported fields
}

func NewHTTPEventsBatch

func NewHTTPEventsBatch(part *TablePartA2, data []byte, cols *abstract.TableSchema, readerStart time.Time, format model.ClickhouseIOFormat, count int, size int) *HTTPEventsBatch

func NewJSONCompactBatch

func NewJSONCompactBatch(part *TablePartA2, data []byte, cols *abstract.TableSchema, readerStart time.Time, count int, size int) *HTTPEventsBatch

func (*HTTPEventsBatch) ColumnNames

func (b *HTTPEventsBatch) ColumnNames() []string

func (*HTTPEventsBatch) Count

func (b *HTTPEventsBatch) Count() int

func (*HTTPEventsBatch) Event

func (b *HTTPEventsBatch) Event() (base.Event, error)

func (*HTTPEventsBatch) Next

func (b *HTTPEventsBatch) Next() bool

func (*HTTPEventsBatch) Size

func (b *HTTPEventsBatch) Size() int

type HTTPSource

type HTTPSource struct {
	// contains filtered or unexported fields
}

func NewHTTPSource

func NewHTTPSource(
	logger log.Logger,
	query string,
	countQuery string,
	cols *abstract.TableSchema,
	hosts []string,
	config *model.ChSource,
	part *TablePartA2,
	sourceStats *stats.SourceStats,
) (*HTTPSource, error)

func NewHTTPSourceImpl

func NewHTTPSourceImpl(
	logger log.Logger,
	query string,
	countQuery string,
	cols *abstract.TableSchema,
	hosts []string,
	config *model.ChSource,
	part *TablePartA2,
	sourceStats *stats.SourceStats,
	client httpclient.HTTPClient,
) *HTTPSource

func (*HTTPSource) IOFormat

func (s *HTTPSource) IOFormat() model.ClickhouseIOFormat

func (*HTTPSource) Progress

func (s *HTTPSource) Progress() (base.EventSourceProgress, error)

func (*HTTPSource) Running

func (s *HTTPSource) Running() bool

func (*HTTPSource) Start

func (s *HTTPSource) Start(ctx context.Context, target base.EventTarget) error

func (*HTTPSource) Stop

func (s *HTTPSource) Stop() error

type HTTPTarget

type HTTPTarget struct {
	// contains filtered or unexported fields
}

func NewHTTPTarget

func NewHTTPTarget(transfer *dp_model.Transfer, mtrc metrics.Registry, logger log.Logger) (*HTTPTarget, error)

func (*HTTPTarget) AsyncPush

func (c *HTTPTarget) AsyncPush(input base.EventBatch) chan error

func (*HTTPTarget) Close

func (c *HTTPTarget) Close() error

func (*HTTPTarget) HostByPart

func (c *HTTPTarget) HostByPart(part *TablePartA2) string

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (*Provider) AsyncSink

func (p *Provider) AsyncSink(middleware abstract.Middleware) (abstract.AsyncSink, error)

func (*Provider) DataProvider

func (p *Provider) DataProvider() (base.DataProvider, error)

func (*Provider) Sink

func (p *Provider) Sink(config middlewares.Config) (abstract.Sinker, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Target

func (p *Provider) Target(options ...abstract.SinkOption) (base.EventTarget, error)

func (*Provider) Test

func (p *Provider) Test(ctx context.Context) *abstract.TestResult

func (*Provider) TestChecks

func (p *Provider) TestChecks() []abstract.CheckType

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type Schema

type Schema struct {
	// contains filtered or unexported fields
}

func NewSchema

func NewSchema(cols []abstract.ColSchema, systemColumnsFirst bool, name string) *Schema

type ShardStorage

type ShardStorage struct {
	// contains filtered or unexported fields
}

func (ShardStorage) BuildTableQuery

func (ShardStorage) Close

func (s ShardStorage) Close()

func (ShardStorage) EstimateTableRowsCount

func (s ShardStorage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (ShardStorage) ExactTableRowsCount

func (s ShardStorage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*ShardStorage) GetIncrementalState

func (s *ShardStorage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)

func (ShardStorage) GetRowsCount

func (s ShardStorage) GetRowsCount(tableID abstract.TableID) (uint64, error)

func (ShardStorage) LoadRandomSample

func (s ShardStorage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (ShardStorage) LoadSampleBySet

func (s ShardStorage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, pusher abstract.Pusher) error

func (ShardStorage) LoadTable

func (s ShardStorage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (ShardStorage) LoadTablesDDL

func (s ShardStorage) LoadTablesDDL(tables []abstract.TableID) ([]schema.TableDDL, error)

func (ShardStorage) LoadTopBottomSample

func (s ShardStorage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (ShardStorage) Ping

func (s ShardStorage) Ping() error

func (*ShardStorage) SetInitialState

func (s *ShardStorage) SetInitialState(tables []abstract.TableDescription, incremental []abstract.IncrementalTable)

func (ShardStorage) TableAccessible

func (s ShardStorage) TableAccessible(table abstract.TableDescription) bool

func (ShardStorage) TableExists

func (s ShardStorage) TableExists(table abstract.TableID) (bool, error)

func (ShardStorage) TableList

func (*ShardStorage) TableParts

func (s *ShardStorage) TableParts(ctx context.Context, table abstract.TableID) ([]TablePart, error)

func (*ShardStorage) TableSchema

func (s *ShardStorage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

func (ShardStorage) TableSizeInBytes

func (s ShardStorage) TableSizeInBytes(table abstract.TableID) (uint64, error)

func (*ShardStorage) Version

func (s *ShardStorage) Version() semver.Version

type SinkServer

type SinkServer struct {
	// contains filtered or unexported fields
}

func NewSinkServer

func NewSinkServer(cfg model.ChSinkServerParams, lgr log.Logger, metrics *stats.ChStats, cluster *sinkCluster) (*SinkServer, error)

func NewSinkServerImpl

func NewSinkServerImpl(cfg model.ChSinkServerParams, lgr log.Logger, metrics *stats.ChStats, cluster *sinkCluster) (*SinkServer, error)

func (*SinkServer) Alive

func (s *SinkServer) Alive() bool

func (*SinkServer) CleanupPartitions

func (s *SinkServer) CleanupPartitions(keepParts int, table string) error

func (*SinkServer) Close

func (s *SinkServer) Close() error

func (*SinkServer) DropTable

func (s *SinkServer) DropTable(ctx context.Context, tableName string, onCluster bool) error

func (*SinkServer) ExecDDL

func (s *SinkServer) ExecDDL(ctx context.Context, ddl string) error

func (*SinkServer) GetTable

func (s *SinkServer) GetTable(table string, schema *abstract.TableSchema) (*sinkTable, error)

func (*SinkServer) Insert

func (s *SinkServer) Insert(spec *TableSpec, rows []abstract.ChangeItem) error

func (*SinkServer) QuerySingleValue

func (s *SinkServer) QuerySingleValue(query string, target interface{}) error

func (*SinkServer) RunGoroutines

func (s *SinkServer) RunGoroutines()

func (*SinkServer) TestSetCallbackOnPing

func (s *SinkServer) TestSetCallbackOnPing(onPing *SinkServerCallbacks)

func (*SinkServer) TruncateTable

func (s *SinkServer) TruncateTable(ctx context.Context, tableName string, onCluster bool) error

type SinkServerCallbacks

type SinkServerCallbacks struct {
	OnPing func(sinkServer *SinkServer)
}

type SourcesChain

type SourcesChain struct {
	// contains filtered or unexported fields
}

func (*SourcesChain) Progress

func (p *SourcesChain) Progress() (base.EventSourceProgress, error)

func (*SourcesChain) Running

func (p *SourcesChain) Running() bool

func (*SourcesChain) Start

func (p *SourcesChain) Start(ctx context.Context, target base.EventTarget) error

func (*SourcesChain) Stop

func (p *SourcesChain) Stop() error

type Storage

type Storage struct {
	IsHomo bool
	// contains filtered or unexported fields
}

func WithOpts

func WithOpts(storage *Storage, opts ...StorageOpt) *Storage

func (*Storage) BuildTableQuery

func (s *Storage) BuildTableQuery(table abstract.TableDescription) (*abstract.TableSchema, string, string, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) CopySchema

func (s *Storage) CopySchema(tables abstract.TableMap, pusher abstract.Pusher) error

func (*Storage) EstimateTableRowsCount

func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) GetIncrementalState

func (s *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)

func (*Storage) GetRowsCount

func (s *Storage) GetRowsCount(tableID abstract.TableID) (uint64, error)

func (*Storage) LoadRandomSample

func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadSampleBySet

func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, pusher abstract.Pusher) error

func (*Storage) LoadSchema

func (s *Storage) LoadSchema() (dbSchema abstract.DBSchema, err error)

func (*Storage) LoadTable

func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) LoadTablesDDL

func (s *Storage) LoadTablesDDL(tables []abstract.TableID) ([]schema.TableDDL, error)

func (*Storage) LoadTopBottomSample

func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (s *Storage) Ping() error

func (*Storage) SetInitialState

func (s *Storage) SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)

func (*Storage) TableAccessible

func (s *Storage) TableAccessible(table abstract.TableDescription) bool

func (*Storage) TableExists

func (s *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableParts

func (s *Storage) TableParts(ctx context.Context, table abstract.TableID) ([]TablePart, error)

func (*Storage) TableSchema

func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

func (*Storage) TableSizeInBytes

func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)

func (*Storage) Version

func (s *Storage) Version() semver.Version

type StorageOpt

type StorageOpt func(storage *Storage) *Storage

func WithHomo

func WithHomo() StorageOpt

func WithMetrics

func WithMetrics(registry metrics.Registry) StorageOpt

func WithShardName

func WithShardName(shardName string) StorageOpt

func WithTableFilter

func WithTableFilter(f abstract.Includeable) StorageOpt

type Table

type Table struct {
	// contains filtered or unexported fields
}

func NewTable

func NewTable(tableID abstract.TableID, parts []TablePart, shards map[string][]string, rowFilter abstract.WhereStatement) *Table

func (*Table) Close

func (s *Table) Close()

func (*Table) Err

func (s *Table) Err() error

func (*Table) FullName

func (s *Table) FullName() string

func (*Table) Name

func (s *Table) Name() string

func (*Table) Next

func (s *Table) Next() bool

func (*Table) Part

func (s *Table) Part() (base.DataObjectPart, error)

func (*Table) ToOldTableID

func (s *Table) ToOldTableID() (*abstract.TableID, error)

type TablePart

type TablePart struct {
	Table      abstract.TableID
	Name       string // clickhouse `_partition_id` virtual column
	Rows       int64
	Bytes      int64
	Shard      string
	ShardCount int
}

func (TablePart) Key

func (p TablePart) Key() string

type TablePartA2

type TablePartA2 struct {
	Shard      string
	TableID    abstract.TableID
	ShardNum   int
	ShardCount int
	Part       TablePart
	RowFilter  abstract.WhereStatement
}

func NewTablePart

func NewTablePart(shardCount int, shardNum int, shardName string, tID abstract.TableID, part TablePart, rowFilter abstract.WhereStatement) *TablePartA2

func (*TablePartA2) FullName

func (t *TablePartA2) FullName() string

func (*TablePartA2) Name

func (t *TablePartA2) Name() string

func (*TablePartA2) ToOldTableDescription

func (t *TablePartA2) ToOldTableDescription() (*abstract.TableDescription, error)

func (*TablePartA2) ToTablePart

func (t *TablePartA2) ToTablePart() (*abstract.TableDescription, error)

type TableSpec

type TableSpec struct {
	Name   string
	Schema *abstract.TableSchema
}

Directories

Path Synopsis
dao
Code generated by MockGen.
Code generated by MockGen.
tests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL