Documentation ¶
Index ¶
- Constants
- func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}
- func TabletTypeToString(t psdbconnect.TabletType) string
- type AirbyteLogMessage
- type AirbyteLogger
- type AirbyteMessage
- type AirbyteRecord
- type AirbyteState
- type Catalog
- type ConfiguredCatalog
- type ConfiguredStream
- type ConnectionProperties
- type ConnectionProperty
- type ConnectionPropertyHash
- type ConnectionPropertyHashProperties
- type ConnectionSpecification
- type ConnectionStatus
- type CustomOptions
- type CustomOptionsProperties
- type CustomOptionsSpecification
- type CustomSourceOptions
- type PlanetScaleDatabase
- type PlanetScaleEdgeDatabase
- func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
- func (p PlanetScaleEdgeDatabase) Close() error
- func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
- func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
- func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, ...) (*SerializedCursor, error)
- type PlanetScaleEdgeMysqlAccess
- type PlanetScaleSource
- type PropertyType
- type SerializedCursor
- type ShardStates
- type Spec
- type SpecMessage
- type Stream
- type StreamSchema
- type SyncState
- type VitessTablet
Constants ¶
const ( RECORD = "RECORD" STATE = "STATE" LOG = "LOG" CONNECTION_STATUS = "CONNECTION_STATUS" CATALOG = "CATALOG" )
const ( LOGLEVEL_ERROR = "ERROR" LOGLEVEL_WARN = "WARN" LOGLEVEL_INFO = "INFO" )
const ( SYNC_MODE_FULL_REFRESH = "full_refresh" SYNC_MODE_INCREMENTAL = "incremental" )
const MaxBatchSize = 10000
Variables ¶
This section is empty.
Functions ¶
func QueryResultToRecords ¶
func TabletTypeToString ¶
func TabletTypeToString(t psdbconnect.TabletType) string
Types ¶
type AirbyteLogMessage ¶
type AirbyteLogger ¶
type AirbyteLogger interface { Log(level, message string) Catalog(catalog Catalog) ConnectionStatus(status ConnectionStatus) Record(tableNamespace, tableName string, data map[string]interface{}) Flush() State(syncState SyncState) Error(error string) }
func NewLogger ¶
func NewLogger(w io.Writer) AirbyteLogger
type AirbyteMessage ¶
type AirbyteMessage struct { Type string `json:"type"` Log *AirbyteLogMessage `json:"log,omitempty"` ConnectionStatus *ConnectionStatus `json:"connectionStatus,omitempty"` Catalog *Catalog `json:"catalog,omitempty"` Record *AirbyteRecord `json:"record,omitempty"` State *AirbyteState `json:"state,omitempty"` }
type AirbyteRecord ¶
type AirbyteState ¶
type AirbyteState struct {
Data SyncState `json:"data"`
}
type ConfiguredCatalog ¶
type ConfiguredCatalog struct {
Streams []ConfiguredStream `json:"streams"`
}
type ConfiguredStream ¶
func (ConfiguredStream) IncrementalSyncRequested ¶
func (cs ConfiguredStream) IncrementalSyncRequested() bool
func (ConfiguredStream) ResetRequested ¶
func (cs ConfiguredStream) ResetRequested() bool
type ConnectionProperties ¶
type ConnectionProperties struct { Host ConnectionProperty `json:"host"` Shards ConnectionProperty `json:"shards"` Database ConnectionProperty `json:"database"` Username ConnectionProperty `json:"username"` Password ConnectionProperty `json:"password"` Options CustomOptionsSpecification `json:"options"` }
type ConnectionProperty ¶
type ConnectionProperty struct { Description string `json:"description"` Title string `json:"title"` Type string `json:"type"` Order int `json:"order"` IsSecret bool `json:"airbyte_secret,omitempty"` Minimum int `json:"minimum,omitempty"` Maximum int `json:"maximum,omitempty"` Default interface{} `json:"default,omitempty"` }
type ConnectionPropertyHash ¶ added in v1.5.0
type ConnectionPropertyHash struct { Description string `json:"description"` Title string `json:"title"` Type string `json:"type"` Order int `json:"order"` Options []ConnectionPropertyHashProperties `json:"oneOf"` }
type ConnectionPropertyHashProperties ¶ added in v1.5.0
type ConnectionPropertyHashProperties struct {
DoNotTreatTinyIntAsBoolean ConnectionProperty `json:"do_not_treat_tiny_int_as_boolean"`
}
type ConnectionSpecification ¶
type ConnectionStatus ¶
type CustomOptions ¶ added in v1.5.0
type CustomOptions struct { Description string `json:"description"` Title string `json:"title"` Type string `json:"type"` Order int `json:"order"` Properties CustomOptionsProperties `json:"properties"` }
type CustomOptionsProperties ¶ added in v1.5.0
type CustomOptionsProperties struct {
DoNotTreatTinyIntAsBoolean ConnectionProperty `json:"do_not_treat_tiny_int_as_boolean"`
}
type CustomOptionsSpecification ¶ added in v1.5.0
type CustomOptionsSpecification struct { Description string `json:"description"` Title string `json:"title"` Type string `json:"type"` Order int `json:"order"` Options []CustomOptions `json:"oneOf"` }
type CustomSourceOptions ¶ added in v1.5.0
type CustomSourceOptions struct {
DoNotTreatTinyIntAsBoolean bool `json:"do_not_treat_tiny_int_as_boolean"`
}
type PlanetScaleDatabase ¶
type PlanetScaleDatabase interface { CanConnect(ctx context.Context, ps PlanetScaleSource) error DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error) ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) Close() error }
PlanetScaleDatabase is a general-purpose interface that defines all the data access methods needed for the PlanetScale Airbyte source to function.
type PlanetScaleEdgeDatabase ¶
type PlanetScaleEdgeDatabase struct { Logger AirbyteLogger Mysql PlanetScaleEdgeMysqlAccess // contains filtered or unexported fields }
PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and the gRPC API for incrementally syncing rows from PlanetScale.
func (PlanetScaleEdgeDatabase) CanConnect ¶
func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
func (PlanetScaleEdgeDatabase) Close ¶
func (p PlanetScaleEdgeDatabase) Close() error
func (PlanetScaleEdgeDatabase) DiscoverSchema ¶
func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
func (PlanetScaleEdgeDatabase) ListShards ¶
func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
func (PlanetScaleEdgeDatabase) Read ¶
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, lastKnownPosition *psdbconnect.TableCursor) (*SerializedCursor, error)
Read streams rows from a table given a starting cursor.
1. We will get the latest vgtid for a given table in a shard when a sync session starts.
2. This latest vgtid is now the stopping point for this sync session.
3. Ask vstream to stream from the last known vgtid.
4. When we reach the stopping point, read all rows available at this vgtid.
5. End the stream when (a) a vgtid newer than the latest vgtid is encountered or (b) the timeout kicks in.
type PlanetScaleEdgeMysqlAccess ¶
type PlanetScaleEdgeMysqlAccess interface { PingContext(context.Context, PlanetScaleSource) error GetTableNames(context.Context, PlanetScaleSource) ([]string, error) GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error) GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) Close() error }
func NewMySQL ¶
func NewMySQL(psc *PlanetScaleSource) (PlanetScaleEdgeMysqlAccess, error)
type PlanetScaleSource ¶
type PlanetScaleSource struct { Host string `json:"host"` Database string `json:"database"` Username string `json:"username"` Password string `json:"password"` Shards string `json:"shards"` Options CustomSourceOptions `json:"options"` }
PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database.
func (PlanetScaleSource) DSN ¶
func (psc PlanetScaleSource) DSN(tt psdbconnect.TabletType) string
DSN returns a data source name (DSN) that MySQL libraries can use to connect to a PlanetScale database.
func (PlanetScaleSource) GetInitialState ¶
func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)
GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Airbyte.
type PropertyType ¶
type SerializedCursor ¶
type SerializedCursor struct {
Cursor string `json:"cursor"`
}
func TableCursorToSerializedCursor ¶
func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)
func (SerializedCursor) SerializedCursorToTableCursor ¶
func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error)
type ShardStates ¶
type ShardStates struct {
Shards map[string]*SerializedCursor `json:"shards"`
}
type Spec ¶
type Spec struct { DocumentationURL string `json:"documentationUrl"` ConnectionSpecification ConnectionSpecification `json:"connectionSpecification"` SupportsIncremental bool `json:"supportsIncremental"` SupportsNormalization bool `json:"supportsNormalization"` SupportsDBT bool `json:"supportsDBT"` SupportedDestinationSyncModes []string `json:"supported_destination_sync_modes"` }
type SpecMessage ¶
type Stream ¶
type Stream struct { Name string `json:"name"` Schema StreamSchema `json:"json_schema"` SupportedSyncModes []string `json:"supported_sync_modes"` Namespace string `json:"namespace"` PrimaryKeys [][]string `json:"source_defined_primary_key"` SourceDefinedCursor bool `json:"source_defined_cursor"` DefaultCursorFields []string `json:"default_cursor_field"` }
type StreamSchema ¶
type StreamSchema struct { Type string `json:"type"` Properties map[string]PropertyType `json:"properties"` }
type SyncState ¶
type SyncState struct {
Streams map[string]ShardStates `json:"streams"`
}