Documentation ¶
Index ¶
- Constants
- func ConvertArrayToString(array []string) (string, error)
- func ConvertStringToArray(jsonString string) ([]string, error)
- func InferChunkSize(storage abstract.SampleableStorage, tableID abstract.TableID, ...) (uint64, error)
- func MakeNextWhereStatement(primaryKey, lowBound []string) abstract.WhereStatement
- func MakeSQLTuple(stringArray []string) string
- func PKeysToStringArr(item *abstract.ChangeItem, primaryKey []string, converter ChangeItemConverter) ([]string, error)
- func ResolveChunkMapFromArr(items []abstract.ChangeItem, primaryKey []string, ...) (map[string]abstract.ChangeItem, error)
- func ResolvePrimaryKeyColumns(ctx context.Context, storage abstract.Storage, tableID abstract.TableID, ...) ([]string, error)
- type ChangeItemConverter
- type IncrementalAsyncSink
- type IncrementalIterator
- type MockSignalTable
- type SignalTable
- type WatermarkType
Constants ¶
View Source
const ( LowWatermarkType = "L" HighWatermarkType = "H" SuccessWatermarkType = "S" BadWatermarkType = "B" )
View Source
const ( FallbackChunkSize = uint64(100_000) DefaultChunkSizeInBytes = uint64(10_000_000) AlwaysTrueWhereStatement = abstract.WhereStatement("1 = 1") )
Variables ¶
This section is empty.
Functions ¶
func ConvertArrayToString ¶
func ConvertStringToArray ¶
func InferChunkSize ¶
func MakeNextWhereStatement ¶
func MakeNextWhereStatement(primaryKey, lowBound []string) abstract.WhereStatement
func MakeSQLTuple ¶
func PKeysToStringArr ¶
func PKeysToStringArr(item *abstract.ChangeItem, primaryKey []string, converter ChangeItemConverter) ([]string, error)
func ResolveChunkMapFromArr ¶
func ResolveChunkMapFromArr(items []abstract.ChangeItem, primaryKey []string, converter ChangeItemConverter) (map[string]abstract.ChangeItem, error)
Types ¶
type ChangeItemConverter ¶
type IncrementalAsyncSink ¶
type IncrementalAsyncSink struct {
// contains filtered or unexported fields
}
func NewIncrementalAsyncSink ¶
func NewIncrementalAsyncSink( ctx context.Context, logger log.Logger, signalTable SignalTable, table abstract.TableID, tableIterator *IncrementalIterator, primaryKey []string, chunk map[string]abstract.ChangeItem, itemConverter ChangeItemConverter, stopCallback func(), outputPusher abstract.Pusher, ) *IncrementalAsyncSink
func (*IncrementalAsyncSink) AsyncPush ¶
func (s *IncrementalAsyncSink) AsyncPush(items []abstract.ChangeItem) chan error
func (*IncrementalAsyncSink) Close ¶
func (s *IncrementalAsyncSink) Close() error
type IncrementalIterator ¶
type IncrementalIterator struct { LowWatermarkUUID uuid.UUID HighWatermarkUUID uuid.UUID // contains filtered or unexported fields }
func NewIncrementalIterator ¶
func NewIncrementalIterator( logger log.Logger, storage tablequery.StorageTableQueryable, tableQuery *tablequery.TableQuery, signalTable SignalTable, itemConverter ChangeItemConverter, pkColNames []string, lowBound []string, limit uint64, betweenMarksOpts ...func(), ) (*IncrementalIterator, error)
func (*IncrementalIterator) Next ¶
func (i *IncrementalIterator) Next(ctx context.Context) ([]abstract.ChangeItem, error)
type MockSignalTable ¶
type MockSignalTable struct { CreateWatermarkF func(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error) IsWatermarkF func(item *abstract.ChangeItem, tableID abstract.TableID, mark uuid.UUID) (bool, WatermarkType) }
func NewMockSignalTable ¶
func NewMockSignalTable() *MockSignalTable
func (*MockSignalTable) CreateWatermark ¶
func (m *MockSignalTable) CreateWatermark(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error)
func (*MockSignalTable) IsWatermark ¶
func (m *MockSignalTable) IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, mark uuid.UUID) (bool, WatermarkType)
type SignalTable ¶
type SignalTable interface { CreateWatermark(ctx context.Context, tableID abstract.TableID, watermarkType WatermarkType, lowBound []string) (uuid.UUID, error) IsWatermark(item *abstract.ChangeItem, tableID abstract.TableID, markUUID uuid.UUID) (bool, WatermarkType) }
SignalTable is used to create watermarks in the WAL and to check whether an item read from the WAL is a watermark.
It is assumed that the low watermark is placed before reading data from the table, and the high watermark after reading data from the table.
When reading the WAL, there are two types of watermarks to check for (low and high):
- low - marks the point just before we start reading from the database
- high - means that we have completed reading
There are also two types that address additional problems (success watermark and bad watermark):
- success - needed to save the last successfully transferred chunk
- bad - used to mark an invalid watermark type
type WatermarkType ¶
type WatermarkType string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.