Documentation ¶
Index ¶
- Constants
- func Convert(s StreamProperty, value sqltypes.Value) (interface{}, error)
- func Parse[T any](path string, obj T) (T, error)
- func ParseContents[T any](content []byte, obj T) (T, error)
- func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}
- func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, ...) error
- func TabletTypeToString(t psdbconnect.TabletType) string
- type BatchResponse
- type Bookmark
- type Catalog
- type DiscoverSettings
- type ImportBatch
- type ImportMessage
- type Logger
- type Metadata
- type MetadataCollection
- type NodeMetadata
- type OnCursor
- type OnResult
- type PlanetScaleDatabase
- type PlanetScaleEdgeDatabase
- type PlanetScaleEdgeMysqlAccess
- type PlanetScaleSource
- type ReadParams
- type Record
- type RecordWriter
- type SerializedCursor
- type ShardStates
- type State
- type StateMessage
- type StatusLogger
- type Stream
- type StreamProperty
- type StreamSchema
- type VitessTablet
- type WrappedState
Constants ¶
const ( MaxObjectsInBatch int = 1000 MaxBatchRequestSize int = 2 * 1024 * 1024 )
const MaxBatchSize = 10000
Variables ¶
This section is empty.
Functions ¶
func Convert ¶ added in v0.17.0
func Convert(s StreamProperty, value sqltypes.Value) (interface{}, error)
Convert will turn the mysql representation of a value into its equivalent JSONSchema compatible representation.
func ParseContents ¶ added in v0.11.0
func QueryResultToRecords ¶
func Sync ¶
func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State, recordWriter RecordWriter, tabletType psdbconnect.TabletType) error
func TabletTypeToString ¶
func TabletTypeToString(t psdbconnect.TabletType) string
Types ¶
type BatchResponse ¶
type Catalog ¶
type Catalog struct {
Streams []Stream `json:"streams,omitempty"`
}
func Discover ¶
func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess, settings DiscoverSettings) (Catalog, error)
type DiscoverSettings ¶ added in v0.7.0
type ImportBatch ¶
type ImportBatch struct { // The name of the destination table the data is being pushed to. // Table names must be unique in each destination schema, or loading issues will occur. Table string `json:"table_name"` // A Schema object containing the JSON schema describing the record(s) in the Message object’s data property. // Records must conform to this schema or an error will be returned when the request is sent. Schema StreamSchema `json:"schema"` // An array of Message objects, each representing a record to be upserted into the table. Messages []ImportMessage `json:"messages"` // An array of strings representing the Primary Key fields in the source table. // Each field in the list must be the name of a top-level property defined in the Schema object. // Primary Key fields cannot be contained in an object or an array. PrimaryKeys []string `json:"key_names"` }
ImportBatch is an object containing a table name, a table schema, and message objects representing records to be pushed to Stitch.
func (*ImportBatch) SizeOf ¶ added in v0.12.0
func (imb *ImportBatch) SizeOf() int
type ImportMessage ¶
type ImportMessage struct { // This will always be upsert. Action string `json:"action"` // An integer that tells the Import API the order in which // data points in the request body should be considered for loading. // This data will be stored in the destination table in the _sdc_sequence column. EmittedAt int64 `json:"sequence"` // The record to be upserted into a table. // The record data must conform to the JSON schema contained in the request’s Schema object. Data map[string]interface{} `json:"data"` }
ImportMessage contains information about a record to be upserted into a table.
type Logger ¶
type Logger interface { Log(message string) Info(message string) Error(message string) Schema(Catalog) error StreamSchema(Stream) error Record(Record, Stream) error State(State) error Flush(Stream) error }
func NewTestLogger ¶ added in v0.14.0
func NewTestLogger() Logger
type MetadataCollection ¶
type MetadataCollection []Metadata
func (MetadataCollection) GetPropertyMap ¶
func (m MetadataCollection) GetPropertyMap() map[string]Metadata
GetPropertyMap takes a MetadataCollection, which is a flat slice of metadata values, and turns it into a map of property name to metadata item. Input:
{ "metadata": { "inclusion": "available", "breadcrumb": [ "properties", "dept_no" ] } }, { "metadata": { "inclusion": "available", "breadcrumb": [ "properties", "dept_name"] } }
] Output:
{ "dept_no": { "metadata": { "inclusion": "available", "breadcrumb": [ "properties", "dept_no" ] } }, "dept_name": { "metadata": { "inclusion": "available", "breadcrumb": [ "properties", "dept_name" ] } } }
func (MetadataCollection) GetSelectedProperties ¶ added in v0.14.0
func (m MetadataCollection) GetSelectedProperties() []string
type NodeMetadata ¶
type NodeMetadata struct { // Either true or false. Indicates that this node in the schema has been selected by the user for replication. Selected bool `json:"selected"` // Either FULL_TABLE, INCREMENTAL, or LOG_BASED. The replication method to use for a stream. ReplicationMethod string `json:"replication-method,omitempty"` // The name of a property in the source to use as a "bookmark". // For example, this will often be an "updated-at" field or an auto-incrementing primary key (requires replication-method). ReplicationKey string `json:"replication-key,omitempty"` // Either available, automatic, or unsupported. // 1. "available" means the field is available for selection, // and the tap will only emit values for that field if it is marked with "selected": true. // 2. "automatic" means that the tap will emit values for the field. // 3. "unsupported" means that the field exists in the source data but the tap is unable to provide it. Inclusion string `json:"inclusion,omitempty"` // Either true or false. // Indicates if a node in the schema should be replicated if // a user has not expressed any opinion on whether or not to replicate it. SelectedByDefault bool `json:"selected-by-default,omitempty"` // List of the fields that could be used as replication keys. ValidReplicationKeys []string `json:"valid-replication-keys,omitempty"` // Used to force the replication method to either FULL_TABLE or INCREMENTAL. ForcedReplicationMethod string `json:"forced-replication-method,omitempty"` // List of key properties for a database table. TableKeyProperties []string `json:"table-key-properties,omitempty"` // The name of the stream. SchemaName string `json:"schema-name,omitempty"` // Either true or false. Indicates whether a stream corresponds to a database view. IsView bool `json:"is-view,omitempty"` // Name of database. DatabaseName string `json:"database-name,omitempty"` // Represents the datatype of a database column. 
SqlDataType string `json:"sql-datatype,omitempty"` // The breadcrumb object defines the path into the schema to the node to which the metadata belongs. // Metadata for a stream will have an empty breadcrumb. // example for a stream: "breadcrumb": [] // example for a property: "breadcrumb": ["properties", "id"] BreadCrumb []string `json:"breadcrumb"` }
NodeMetadata represents the metadata for a given database object. An example is: "metadata": [
{ "metadata": { "inclusion": "available", "table-key-properties": ["id"], "selected": true, "valid-replication-keys": ["date_modified"], "schema-name": "users", }, "breadcrumb": [] }, { "metadata": { "inclusion": "automatic" }, "breadcrumb": ["properties", "id"] }, { "metadata": { "inclusion": "available", "selected": true }, "breadcrumb": ["properties", "name"] }, { "metadata": { "inclusion": "automatic" }, "breadcrumb": ["properties", "date_modified"] } ]
type OnCursor ¶ added in v0.14.0
type OnCursor func(*psdbconnect.TableCursor) error
type PlanetScaleDatabase ¶
type PlanetScaleDatabase interface { CanConnect(ctx context.Context, ps PlanetScaleSource) error Read(ctx context.Context, params ReadParams) (*SerializedCursor, error) Close() error }
PlanetScaleDatabase is a general purpose interface that defines all the data access methods needed for the PlanetScale Singer Tap to function.
func NewEdge ¶
func NewEdge(mysql PlanetScaleEdgeMysqlAccess, logger Logger) PlanetScaleDatabase
type PlanetScaleEdgeDatabase ¶
type PlanetScaleEdgeDatabase struct { Logger Logger Mysql PlanetScaleEdgeMysqlAccess // contains filtered or unexported fields }
PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the mysql interface provided by PlanetScale for all schema/shard/tablet discovery and the grpc API for incrementally syncing rows from PlanetScale.
func (PlanetScaleEdgeDatabase) CanConnect ¶
func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
func (PlanetScaleEdgeDatabase) Close ¶
func (p PlanetScaleEdgeDatabase) Close() error
func (PlanetScaleEdgeDatabase) Read ¶
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, params ReadParams) (*SerializedCursor, error)
Read streams rows from a table given a starting cursor. 1. We will get the latest vgtid for a given table in a shard when a sync session starts. 2. This latest vgtid is now the stopping point for this sync session. 3. Ask vstream to stream from the last known vgtid 4. When we reach the stopping point, read all rows available at this vgtid 5. End the stream when (a) a vgtid newer than latest vgtid is encountered or (b) the timeout kicks in.
type PlanetScaleEdgeMysqlAccess ¶
type PlanetScaleEdgeMysqlAccess interface { PingContext(context.Context, PlanetScaleSource) error GetTableNames(context.Context, PlanetScaleSource) ([]string, error) GetTableSchema(context.Context, PlanetScaleSource, string, bool) (map[string]StreamProperty, error) GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) Close() error }
func NewMySQL ¶
func NewMySQL(psc *PlanetScaleSource) (PlanetScaleEdgeMysqlAccess, error)
type PlanetScaleSource ¶
type PlanetScaleSource struct { Host string `json:"host"` Database string `json:"database"` Username string `json:"username"` Password string `json:"password"` Shards string `json:"shards"` }
PlanetScaleSource defines a configured Singer Tap Source for a PlanetScale database
func (PlanetScaleSource) DSN ¶
func (psc PlanetScaleSource) DSN(tt psdbconnect.TabletType) string
DSN returns a DataSource that mysql libraries can use to connect to a PlanetScale database.
func (PlanetScaleSource) GetInitialState ¶
func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)
GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Singer.
type ReadParams ¶ added in v0.14.0
type ReadParams struct { Source PlanetScaleSource Table Stream LastKnownPosition *psdbconnect.TableCursor Columns []string OnResult OnResult OnCursor OnCursor TabletType psdbconnect.TabletType Cells []string }
type Record ¶
type Record struct { // a constant with value "Record" Type string `json:"type"` // The string name of the stream Stream string `json:"stream"` // The time this record was observed in the source. // This should be an RFC3339 formatted date-time, like "2017-11-20T16:45:33.000Z". TimeExtracted string `json:"time_extracted"` // A JSON map containing a streamed data point Data map[string]interface{} `json:"record"` }
Record messages contain the data from the data stream. Example:
{ "type": "RECORD", "stream": "users", "time_extracted": "2017-11-20T16:45:33.000Z", "record": { "id": 0, "name": "Chris" } }
type RecordWriter ¶ added in v0.14.0
type RecordWriter interface { Flush(stream Stream) error Record(record Record, stream Stream) error State(state State) error }
func NewHttpRecordWriter ¶ added in v0.14.0
func NewHttpRecordWriter(batchSize int, apiURL, apiToken, stateFileDir string, logger StatusLogger) RecordWriter
type SerializedCursor ¶
type SerializedCursor struct {
Cursor string `json:"cursor"`
}
func TableCursorToSerializedCursor ¶
func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)
func (SerializedCursor) SerializedCursorToTableCursor ¶
func (s SerializedCursor) SerializedCursorToTableCursor() (*psdbconnect.TableCursor, error)
type ShardStates ¶
type ShardStates struct {
Shards map[string]*SerializedCursor `json:"shards"`
}
type State ¶
type State struct {
Streams map[string]ShardStates `json:"bookmarks"`
}
State represents any previously known state about the last sync operation. Example:
{ "bookmarks": { "branch_query": { "shards": { "80-c0": { "cursor": "Base64-encoded-tablecursor" } } }, "branch_query_tag": { "shards": { "-40": { "cursor": "Base64-encoded-tablecursor" }, "c0-": { "cursor": "Base64-encoded-tablecursor" }, "40-80": { "cursor": "Base64-encoded-tablecursor" }, "80-c0": { "cursor": "Base64-encoded-tablecursor" } } } } }
func ParseSavedState ¶ added in v0.11.0
type StateMessage ¶
type StatusLogger ¶ added in v0.14.0
type Stream ¶
type Stream struct { // Type is a constant of value "SCHEMA" Type string `json:"type"` // The name of the stream. Name string `json:"stream"` // The unique identifier for the stream. // This is allowed to be different from the name of the stream // in order to allow for sources that have duplicate stream names. ID string `json:"tap_stream_id"` // The JSON schema for the stream. Schema StreamSchema `json:"schema"` // For a database source, the name of the table. TableName string `json:"table-name"` // Each piece of metadata has the following canonical shape: //{ // "metadata" : { // "selected" : true, // "some-other-metadata" : "whatever" // }, // "breadcrumb" : ["properties", "some-field-name"] //} Metadata MetadataCollection `json:"metadata"` // A list of strings indicating which properties make up the primary key for this stream. // Each item in the list must be the name of a top-level property defined in the schema KeyProperties []string `json:"key_properties"` // A list of strings indicating which properties the tap is using as bookmarks. // Each item in the list must be the name of a top-level property defined in the schema. CursorProperties []string `json:"bookmark_properties"` }
Stream represents the JSONSchema definition for a given database object. Example:
{ "streams": [ { "tap_stream_id": "users", "stream": "users", "schema": { "type": ["null", "object"], "additionalProperties": false, "properties": { "id": { "type": [ "null", "string" ], }, "name": { "type": [ "null", "string" ], }, "date_modified": { "type": [ "null", "string" ], "format": "date-time", } } } } ] }
func (*Stream) GenerateMetadata ¶
func (*Stream) GetTableMetadata ¶
GetTableMetadata iterates the Metadata collection for a stream and returns the metadata item that is associated with the stream.
func (*Stream) IncrementalSyncRequested ¶
type StreamProperty ¶
type StreamProperty struct { Types []string `json:"type"` CustomFormat string `json:"format,omitempty"` }
func (StreamProperty) IsBoolean ¶ added in v0.17.0
func (s StreamProperty) IsBoolean() bool
func (StreamProperty) IsDateTime ¶ added in v0.17.0
func (s StreamProperty) IsDateTime() bool
func (StreamProperty) IsInteger ¶ added in v0.17.0
func (s StreamProperty) IsInteger() bool
func (StreamProperty) IsNumber ¶ added in v0.17.0
func (s StreamProperty) IsNumber() bool
type StreamSchema ¶
type StreamSchema struct { Type []string `json:"type"` HasAdditionalProperties bool `json:"additionalProperties"` Properties map[string]StreamProperty `json:"properties"` }
type VitessTablet ¶
type WrappedState ¶ added in v0.11.0
type WrappedState struct {
Value State `json:"value"`
}