Documentation ¶
Index ¶
- Constants
- func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}
- func TabletTypeToString(t psdbconnect.TabletType) string
- type AirbyteLogMessage
- type AirbyteLogger
- type AirbyteMessage
- type AirbyteRecord
- type AirbyteState
- type Catalog
- type ConfiguredCatalog
- type ConfiguredStream
- type ConnectionProperties
- type ConnectionProperty
- type ConnectionSpecification
- type ConnectionStatus
- type PlanetScaleDatabase
- type PlanetScaleEdgeDatabase
- func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
- func (p PlanetScaleEdgeDatabase) Close() error
- func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
- func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
- func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, ...) (*SerializedCursor, error)
- type PlanetScaleEdgeMysqlAccess
- type PlanetScaleSource
- type PropertyType
- type SerializedCursor
- type ShardStates
- type Spec
- type SpecMessage
- type Stream
- type StreamSchema
- type SyncState
- type VitessTablet
Constants ¶
const ( RECORD = "RECORD" STATE = "STATE" LOG = "LOG" CONNECTION_STATUS = "CONNECTION_STATUS" CATALOG = "CATALOG" )
const ( LOGLEVEL_ERROR = "ERROR" LOGLEVEL_WARN = "WARN" LOGLEVEL_INFO = "INFO" )
const ( SYNC_MODE_FULL_REFRESH = "full_refresh" SYNC_MODE_INCREMENTAL = "incremental" )
const MaxBatchSize = 10000
Variables ¶
This section is empty.
Functions ¶
func QueryResultToRecords ¶
func TabletTypeToString ¶
func TabletTypeToString(t psdbconnect.TabletType) string
Types ¶
type AirbyteLogMessage ¶
type AirbyteLogger ¶
type AirbyteLogger interface { Log(level, message string) Catalog(catalog Catalog) ConnectionStatus(status ConnectionStatus) Record(tableNamespace, tableName string, data map[string]interface{}) Flush() State(syncState SyncState) Error(error string) }
func NewLogger ¶
func NewLogger(w io.Writer) AirbyteLogger
type AirbyteMessage ¶
type AirbyteMessage struct { Type string `json:"type"` Log *AirbyteLogMessage `json:"log,omitempty"` ConnectionStatus *ConnectionStatus `json:"connectionStatus,omitempty"` Catalog *Catalog `json:"catalog,omitempty"` Record *AirbyteRecord `json:"record,omitempty"` State *AirbyteState `json:"state,omitempty"` }
type AirbyteRecord ¶
type AirbyteState ¶
type AirbyteState struct {
Data SyncState `json:"data"`
}
type ConfiguredCatalog ¶
type ConfiguredCatalog struct {
Streams []ConfiguredStream `json:"streams"`
}
type ConfiguredStream ¶
func (ConfiguredStream) IncrementalSyncRequested ¶
func (cs ConfiguredStream) IncrementalSyncRequested() bool
func (ConfiguredStream) ResetRequested ¶
func (cs ConfiguredStream) ResetRequested() bool
type ConnectionProperties ¶
type ConnectionProperties struct { Host ConnectionProperty `json:"host"` Shards ConnectionProperty `json:"shards"` Database ConnectionProperty `json:"database"` Username ConnectionProperty `json:"username"` Password ConnectionProperty `json:"password"` }
type ConnectionProperty ¶
type ConnectionSpecification ¶
type ConnectionStatus ¶
type PlanetScaleDatabase ¶
type PlanetScaleDatabase interface { CanConnect(ctx context.Context, ps PlanetScaleSource) error DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error) ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) Close() error }
PlanetScaleDatabase is a general-purpose interface that defines all the data access methods needed for the PlanetScale Airbyte source to function.
type PlanetScaleEdgeDatabase ¶
type PlanetScaleEdgeDatabase struct { Logger AirbyteLogger Mysql PlanetScaleEdgeMysqlAccess // contains filtered or unexported fields }
PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and the gRPC API for incrementally syncing rows from PlanetScale.
func (PlanetScaleEdgeDatabase) CanConnect ¶
func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
func (PlanetScaleEdgeDatabase) Close ¶
func (p PlanetScaleEdgeDatabase) Close() error
func (PlanetScaleEdgeDatabase) DiscoverSchema ¶
func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
func (PlanetScaleEdgeDatabase) ListShards ¶
func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
func (PlanetScaleEdgeDatabase) Read ¶
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
type PlanetScaleEdgeMysqlAccess ¶
type PlanetScaleEdgeMysqlAccess interface { PingContext(context.Context, PlanetScaleSource) error GetTableNames(context.Context, PlanetScaleSource) ([]string, error) GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error) GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) Close() error }
func NewMySQL ¶
func NewMySQL(psc *PlanetScaleSource) (PlanetScaleEdgeMysqlAccess, error)
type PlanetScaleSource ¶
type PlanetScaleSource struct { Host string `json:"host"` Database string `json:"database"` Username string `json:"username"` Password string `json:"password"` Shards string `json:"shards"` }
PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database.
func (PlanetScaleSource) DSN ¶
func (psc PlanetScaleSource) DSN(tt psdbconnect.TabletType) string
DSN returns a data source name (DSN) that MySQL libraries can use to connect to a PlanetScale database.
func (PlanetScaleSource) GetInitialState ¶
func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)
GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Airbyte.
type PropertyType ¶
type PropertyType struct {
Type string `json:"type"`
}
type SerializedCursor ¶
type SerializedCursor struct {
Cursor string `json:"cursor"`
}
func TableCursorToSerializedCursor ¶
func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)
func (SerializedCursor) SerializedCursorToTableCursor ¶
func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error)
type ShardStates ¶
type ShardStates struct {
Shards map[string]*SerializedCursor `json:"shards"`
}
type Spec ¶
type Spec struct { DocumentationURL string `json:"documentationUrl"` ConnectionSpecification ConnectionSpecification `json:"connectionSpecification"` SupportsIncremental bool `json:"supportsIncremental"` SupportsNormalization bool `json:"supportsNormalization"` SupportsDBT bool `json:"supportsDBT"` SupportedDestinationSyncModes []string `json:"supported_destination_sync_modes"` }
type SpecMessage ¶
type Stream ¶
type Stream struct { Name string `json:"name"` Schema StreamSchema `json:"json_schema"` SupportedSyncModes []string `json:"supported_sync_modes"` Namespace string `json:"namespace"` PrimaryKeys [][]string `json:"source_defined_primary_key"` SourceDefinedCursor bool `json:"source_defined_cursor"` DefaultCursorFields []string `json:"default_cursor_field"` }
type StreamSchema ¶
type StreamSchema struct { Type string `json:"type"` Properties map[string]PropertyType `json:"properties"` }
type SyncState ¶
type SyncState struct {
Streams map[string]ShardStates `json:"streams"`
}