Documentation ¶
Overview ¶
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Package dynamodb handles schema and data migrations from DynamoDB.
Index ¶
- Constants
- func NewDynamoDBStream(client dynamodbiface.DynamoDBAPI, srcTable string) (string, error)
- func ProcessDataRow(m map[string]*dynamodb.AttributeValue, conv *internal.Conv, tableId string, ...)
- func ProcessRecord(conv *internal.Conv, streamInfo *StreamingInfo, record *dynamodbstreams.Record, ...)
- func ProcessShard(wgShard *sync.WaitGroup, streamInfo *StreamingInfo, conv *internal.Conv, ...)
- func ProcessStream(wgStream *sync.WaitGroup, streamClient dynamodbstreamsiface.DynamoDBStreamsAPI, ...)
- type InfoSchemaImpl
- func (isi InfoSchemaImpl) GetColumns(conv *internal.Conv, table common.SchemaAndName, ...) (map[string]schema.Column, []string, error)
- func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error)
- func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
- func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, ...) (indexes []schema.Index, err error)
- func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)
- func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
- func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string
- func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error)
- func (isi InfoSchemaImpl) GetToDdl() common.ToDdl
- func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, ...) error
- func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
- func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, ...) (internal.DataflowOutput, error)
- type StreamingInfo
- func (info *StreamingInfo) CollectBadRecord(recordType, srcTable string, srcCols []string, vals []string)
- func (info *StreamingInfo) CollectDroppedRecord(recordType, spTable string, spCols []string, spVals []interface{}, err error)
- func (info *StreamingInfo) SetShardStatus(shardId string, status bool)
- func (info *StreamingInfo) StatsAddBadRecord(srcTable, recordType string)
- func (info *StreamingInfo) StatsAddDroppedRecord(srcTable, recordType string)
- func (info *StreamingInfo) StatsAddRecord(srcTable, recordType string)
- func (info *StreamingInfo) StatsAddRecordProcessed()
- func (info *StreamingInfo) TotalUnexpecteds() int64
- func (info *StreamingInfo) Unexpected(u string)
- type ToDdlImpl
Constants ¶
const (
ESC = 27
)
Variables ¶
This section is empty.
Functions ¶
func NewDynamoDBStream ¶
func NewDynamoDBStream(client dynamodbiface.DynamoDBAPI, srcTable string) (string, error)
NewDynamoDBStream initializes a new DynamoDB Stream for a table with the NEW_AND_OLD_IMAGES StreamViewType. If a stream already exists for the table, it must be of type NEW_IMAGE or NEW_AND_OLD_IMAGES; otherwise streaming changes for this table won't be captured. It returns the latest stream ARN for the table along with any error encountered.
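A minimal usage sketch follows. The table name "Orders" and the import path of this package (aliased dydb) are placeholders; the AWS SDK v1 client returned by dynamodb.New satisfies dynamodbiface.DynamoDBAPI.

package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	awsdynamodb "github.com/aws/aws-sdk-go/service/dynamodb"

	// Placeholder import path; replace with this package's actual module path.
	dydb "example.com/migration/sources/dynamodb"
)

func main() {
	// The AWS SDK v1 *dynamodb.DynamoDB client satisfies dynamodbiface.DynamoDBAPI.
	sess := session.Must(session.NewSession())
	client := awsdynamodb.New(sess)

	// Enable (or reuse) a stream on the hypothetical "Orders" table and fetch its ARN.
	streamArn, err := dydb.NewDynamoDBStream(client, "Orders")
	if err != nil {
		log.Fatalf("enabling stream for Orders: %v", err)
	}
	fmt.Println("latest stream ARN:", streamArn)
}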
func ProcessDataRow ¶
func ProcessRecord ¶
func ProcessRecord(conv *internal.Conv, streamInfo *StreamingInfo, record *dynamodbstreams.Record, srcTable string)
ProcessRecord processes records retrieved from shards. It first converts the data to Spanner data (based on the source and Spanner schemas), and then writes that data to Cloud Spanner.
func ProcessShard ¶
func ProcessShard(wgShard *sync.WaitGroup, streamInfo *StreamingInfo, conv *internal.Conv, streamClient dynamodbstreamsiface.DynamoDBStreamsAPI, shard *dynamodbstreams.Shard, streamArn, srcTable string)
ProcessShard processes records within a shard, starting from the first unexpired record. It doesn't start processing until the parent shard has been processed. For closed shards, processing completes once all records have been handled; for open shards, it keeps polling for new records until the shard is closed or the user requests an exit.
func ProcessStream ¶
func ProcessStream(wgStream *sync.WaitGroup, streamClient dynamodbstreamsiface.DynamoDBStreamsAPI, streamInfo *StreamingInfo, conv *internal.Conv, streamArn, srcTable string)
ProcessStream processes the latest enabled DynamoDB Stream for a table. It discovers the shards within the stream and, for each shard, creates a separate worker goroutine to process that shard's records.
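The sketch below shows the fan-out this implies. It is a hypothetical helper, not part of this package: streamsClient, streamInfo, and conv are assumed to be initialized by the caller, latestStreamArn is assumed to map table names to stream ARNs (as returned by StartChangeDataCapture), and ProcessStream is assumed to mark the WaitGroup as done when it returns. Imports (sync, the AWS dynamodbstreamsiface package, this package as dydb, and its internal package) follow the placeholder paths of the NewDynamoDBStream example above.

// processStreams is a hypothetical helper illustrating one worker per table stream.
func processStreams(
	streamsClient dynamodbstreamsiface.DynamoDBStreamsAPI,
	streamInfo *dydb.StreamingInfo,
	conv *internal.Conv,
	latestStreamArn map[string]interface{},
) {
	var wgStream sync.WaitGroup
	for table, arn := range latestStreamArn {
		streamArn, ok := arn.(string)
		if !ok {
			continue // skip entries whose ARN is not a plain string
		}
		wgStream.Add(1)
		// Assumes ProcessStream calls wgStream.Done() when it finishes.
		go dydb.ProcessStream(&wgStream, streamsClient, streamInfo, conv, streamArn, table)
	}
	wgStream.Wait() // wait for every table's stream worker to finish
}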
Types ¶
type InfoSchemaImpl ¶
type InfoSchemaImpl struct {
	DynamoClient        dynamodbiface.DynamoDBAPI
	DynamoStreamsClient dynamodbstreamsiface.DynamoDBStreamsAPI
	SampleSize          int64
}
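A minimal construction sketch, reusing the session and aliases from the NewDynamoDBStream example above plus the AWS SDK dynamodbstreams package; the SampleSize value is arbitrary and its exact semantics are assumed.

sess := session.Must(session.NewSession())
isi := dydb.InfoSchemaImpl{
	DynamoClient:        awsdynamodb.New(sess),
	DynamoStreamsClient: dynamodbstreams.New(sess), // github.com/aws/aws-sdk-go/service/dynamodbstreams
	SampleSize:          100000, // number of items sampled per table; assumed semantics
}
tables, err := isi.GetTables()
if err != nil {
	log.Fatalf("listing tables: %v", err)
}
fmt.Printf("source tables: %+v\n", tables)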
func (InfoSchemaImpl) GetColumns ¶
func (InfoSchemaImpl) GetConstraints ¶
func (isi InfoSchemaImpl) GetConstraints(conv *internal.Conv, table common.SchemaAndName) (primaryKeys []string, constraints map[string][]string, err error)
func (InfoSchemaImpl) GetForeignKeys ¶
func (isi InfoSchemaImpl) GetForeignKeys(conv *internal.Conv, table common.SchemaAndName) (foreignKeys []schema.ForeignKey, err error)
func (InfoSchemaImpl) GetIndexes ¶
func (isi InfoSchemaImpl) GetIndexes(conv *internal.Conv, table common.SchemaAndName, colNameIdMap map[string]string) (indexes []schema.Index, err error)
func (InfoSchemaImpl) GetRowCount ¶
func (isi InfoSchemaImpl) GetRowCount(table common.SchemaAndName) (int64, error)
func (InfoSchemaImpl) GetRowsFromTable ¶
func (isi InfoSchemaImpl) GetRowsFromTable(conv *internal.Conv, srcTable string) (interface{}, error)
func (InfoSchemaImpl) GetTableName ¶
func (isi InfoSchemaImpl) GetTableName(schema string, tableName string) string
func (InfoSchemaImpl) GetTables ¶
func (isi InfoSchemaImpl) GetTables() ([]common.SchemaAndName, error)
func (InfoSchemaImpl) GetToDdl ¶
func (isi InfoSchemaImpl) GetToDdl() common.ToDdl
func (InfoSchemaImpl) ProcessData ¶
func (isi InfoSchemaImpl) ProcessData(conv *internal.Conv, tableId string, srcSchema schema.Table, colIds []string, spSchema ddl.CreateTable, additionalAttributes internal.AdditionalDataAttributes) error
ProcessData performs data conversion for a DynamoDB database. For each table, we extract data using Scan requests, convert the data to Spanner data (based on the source and Spanner schemas), and write it to Spanner. If we can't get or process data for a table, we skip that table and process the remaining tables.
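For illustration, here is a standalone sketch of the paginated Scan extraction described above, using only AWS SDK v1 calls (aws, dynamodb, dynamodbiface). scanAll is a hypothetical helper, not part of this package; it counts items instead of converting and writing them.

func scanAll(client dynamodbiface.DynamoDBAPI, table string) (int, error) {
	input := &awsdynamodb.ScanInput{TableName: aws.String(table)}
	count := 0
	for {
		out, err := client.Scan(input)
		if err != nil {
			return count, err
		}
		// Each item is a map[string]*dynamodb.AttributeValue, the same shape
		// that ProcessDataRow receives.
		count += len(out.Items)
		if len(out.LastEvaluatedKey) == 0 {
			break // scan complete
		}
		// Resume the scan from where the previous page stopped.
		input.ExclusiveStartKey = out.LastEvaluatedKey
	}
	return count, nil
}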
func (InfoSchemaImpl) StartChangeDataCapture ¶
func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *internal.Conv) (map[string]interface{}, error)
StartChangeDataCapture initializes the DynamoDB Streams for the source database. It returns the latestStreamArn for all tables in the source database.
func (InfoSchemaImpl) StartStreamingMigration ¶
func (isi InfoSchemaImpl) StartStreamingMigration(ctx context.Context, client *sp.Client, conv *internal.Conv, latestStreamArn map[string]interface{}) (internal.DataflowOutput, error)
StartStreamingMigration starts the streaming migration process by creating a separate worker goroutine for each table's DynamoDB Stream. It catches the Ctrl+C signal so the user can stop the process.
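A hedged end-to-end sketch of the change-data-capture plus streaming flow, assuming isi is an initialized InfoSchemaImpl, conv an *internal.Conv, and spClient a *spanner.Client (the sp alias in the signature above); error handling is abbreviated.

ctx := context.Background()
latestStreamArn, err := isi.StartChangeDataCapture(ctx, conv)
if err != nil {
	log.Fatalf("enabling change data capture: %v", err)
}
// Runs until the streaming work completes or the user presses Ctrl+C (assumed behavior).
if _, err := isi.StartStreamingMigration(ctx, spClient, conv, latestStreamArn); err != nil {
	log.Fatalf("streaming migration: %v", err)
}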
type StreamingInfo ¶
type StreamingInfo struct {
	Records          map[string]map[string]int64 // Tablewise count of records received from DynamoDB Streams, broken down by record type i.e. INSERT, MODIFY & REMOVE.
	BadRecords       map[string]map[string]int64 // Tablewise count of records not converted successfully, broken down by record type.
	DroppedRecords   map[string]map[string]int64 // Tablewise count of records converted successfully but not written to Spanner, broken down by record type.
	ShardProcessed   map[string]bool             // Processing status of a shard (default false, i.e. unprocessed).
	UserExit         bool                        // Flag indicating whether the user wants to exit (false until the user presses Ctrl+C).
	Unexpecteds      map[string]int64            // Count of unexpected conditions, broken down by condition description.
	SampleBadRecords []string                    // Records that generated errors during conversion.
	SampleBadWrites  []string                    // Records that faced errors while writing to Cloud Spanner.
	// contains filtered or unexported fields
}
StreamingInfo contains information related to processing of DynamoDB Streams.
func MakeStreamingInfo ¶
func MakeStreamingInfo() *StreamingInfo
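A short sketch of collecting stats with the helpers documented below; the table name and shard id are hypothetical, and the package alias follows the earlier examples.

info := dydb.MakeStreamingInfo()
info.StatsAddRecord("Orders", "INSERT") // a record was read from the stream
info.StatsAddRecordProcessed()          // ...and fully processed to Cloud Spanner
info.SetShardStatus("shardId-00000000000000000001", true)
fmt.Println("unexpected conditions:", info.TotalUnexpecteds())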
func (*StreamingInfo) CollectBadRecord ¶
func (info *StreamingInfo) CollectBadRecord(recordType, srcTable string, srcCols []string, vals []string)
CollectBadRecord collects a record if it is not successfully converted to Cloud Spanner supported data types.
func (*StreamingInfo) CollectDroppedRecord ¶
func (info *StreamingInfo) CollectDroppedRecord(recordType, spTable string, spCols []string, spVals []interface{}, err error)
CollectDroppedRecord collects a record if it faces an error while writing to Cloud Spanner.
func (*StreamingInfo) SetShardStatus ¶
func (info *StreamingInfo) SetShardStatus(shardId string, status bool)
SetShardStatus changes the processing status of a shard: true means the shard has been processed, false that it has not.
func (*StreamingInfo) StatsAddBadRecord ¶
func (info *StreamingInfo) StatsAddBadRecord(srcTable, recordType string)
StatsAddBadRecord increases the count of records which are not successfully converted to Cloud Spanner supported data types based on the table name and record type.
func (*StreamingInfo) StatsAddDroppedRecord ¶
func (info *StreamingInfo) StatsAddDroppedRecord(srcTable, recordType string)
StatsAddDroppedRecord increases the count of records which failed while writing to Cloud Spanner based on the table name and record type.
func (*StreamingInfo) StatsAddRecord ¶
func (info *StreamingInfo) StatsAddRecord(srcTable, recordType string)
StatsAddRecord increases the count of records read from DynamoDB Streams based on the table name and record type.
func (*StreamingInfo) StatsAddRecordProcessed ¶
func (info *StreamingInfo) StatsAddRecordProcessed()
StatsAddRecordProcessed increases the count of total records processed to Cloud Spanner.
func (*StreamingInfo) TotalUnexpecteds ¶
func (info *StreamingInfo) TotalUnexpecteds() int64
TotalUnexpecteds returns the total number of distinct unexpected conditions encountered during processing of DynamoDB Streams.
func (*StreamingInfo) Unexpected ¶
func (info *StreamingInfo) Unexpected(u string)
Unexpected records stats about corner-cases and conditions that were not expected.
type ToDdlImpl ¶
type ToDdlImpl struct{}
ToDdlImpl implements the common.ToDdl interface for DynamoDB.
func (ToDdlImpl) ToSpannerType ¶
func (tdi ToDdlImpl) ToSpannerType(conv *internal.Conv, spType string, srcType schema.Type) (ddl.Type, []internal.SchemaIssue)
Functions below implement the common.ToDdl interface. ToSpannerType maps a scalar source schema type (defined by id and mods) into a Spanner type; this is the core source-to-Spanner type mapping. It returns the Spanner type and a list of type conversion issues encountered.
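A heavily hedged sketch of a direct call: internal.MakeConv and the Name field on schema.Type are assumptions based on sibling packages in this repository, and "S" is DynamoDB's attribute type for strings.

conv := internal.MakeConv() // assumed constructor for an empty *internal.Conv
tdi := dydb.ToDdlImpl{}
spType, issues := tdi.ToSpannerType(conv, "", schema.Type{Name: "S"}) // "S": DynamoDB string attribute (field name assumed)
fmt.Printf("spanner type: %+v, issues: %v\n", spType, issues)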