internal

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Message type identifiers. The names line up one-to-one with the payload
// fields of AirbyteMessage (Record, State, Log, ConnectionStatus, Catalog).
// NOTE(review): presumably these are the values written to
// AirbyteMessage.Type — confirm against the logger implementation.
const (
	RECORD            = "RECORD"
	STATE             = "STATE"
	LOG               = "LOG"
	CONNECTION_STATUS = "CONNECTION_STATUS"
	CATALOG           = "CATALOG"
)
View Source
// Log level strings.
// NOTE(review): presumably the values passed as the level argument of
// AirbyteLogger.Log and stored in AirbyteLogMessage.Level — confirm.
const (
	LOGLEVEL_ERROR = "ERROR"
	LOGLEVEL_WARN  = "WARN"
	LOGLEVEL_INFO  = "INFO"
)
View Source
// Sync mode strings.
// NOTE(review): presumably the values expected in ConfiguredStream.SyncMode
// and listed in Stream.SupportedSyncModes — confirm.
const (
	SYNC_MODE_FULL_REFRESH = "full_refresh"
	SYNC_MODE_INCREMENTAL  = "incremental"
)
View Source
// MaxBatchSize is an untyped constant with the value 10000.
// NOTE(review): what is being batched (presumably records buffered before a
// flush) is not visible on this page — confirm at the point of use.
const MaxBatchSize = 10000

Variables

This section is empty.

Functions

func QueryResultToRecords

func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}

func TabletTypeToString

func TabletTypeToString(t psdbconnect.TabletType) string

Types

type AirbyteLogMessage

// AirbyteLogMessage is the log payload carried in AirbyteMessage.Log: a level
// string plus the message text. Both fields are tagged omitempty, so an empty
// string is left out of the encoded JSON.
type AirbyteLogMessage struct {
	Level   string `json:"level,omitempty"`
	Message string `json:"message,omitempty"`
}

type AirbyteLogger

// AirbyteLogger is the set of operations for emitting Airbyte output: log
// lines, catalogs, connection statuses, records and sync state. NewLogger
// returns an implementation built around an io.Writer.
type AirbyteLogger interface {
	// Log emits a message at the given level.
	Log(level, message string)
	// Catalog emits a Catalog.
	Catalog(catalog Catalog)
	// ConnectionStatus emits a ConnectionStatus.
	ConnectionStatus(status ConnectionStatus)
	// Record emits one row, identified by namespace and table name, with its
	// data as a string-keyed map.
	Record(tableNamespace, tableName string, data map[string]interface{})
	// Flush takes no arguments and returns nothing.
	// NOTE(review): presumably writes out records buffered by Record;
	// the buffering itself is not visible here — confirm.
	Flush()
	// State emits a SyncState.
	State(syncState SyncState)
	// Error emits an error message. The parameter is named "error", which
	// shadows the built-in error type inside implementations that keep the name.
	Error(error string)
}

func NewLogger

func NewLogger(w io.Writer) AirbyteLogger

type AirbyteMessage

// AirbyteMessage is the JSON envelope for everything emitted: a Type string
// plus one optional payload pointer per kind of message. Every payload pointer
// is tagged omitempty, so nil payloads are left out of the encoded JSON; Type
// is always written.
type AirbyteMessage struct {
	Type             string             `json:"type"`
	Log              *AirbyteLogMessage `json:"log,omitempty"`
	ConnectionStatus *ConnectionStatus  `json:"connectionStatus,omitempty"`
	Catalog          *Catalog           `json:"catalog,omitempty"`
	Record           *AirbyteRecord     `json:"record,omitempty"`
	State            *AirbyteState      `json:"state,omitempty"`
}

type AirbyteRecord

// AirbyteRecord is the record payload carried in AirbyteMessage.Record: the
// stream and namespace the row belongs to, an emission timestamp, and the row
// data as a string-keyed map. No field uses omitempty, so all four keys are
// always written.
type AirbyteRecord struct {
	Stream    string                 `json:"stream"`
	Namespace string                 `json:"namespace"`
	// NOTE(review): the unit of EmittedAt is not visible here — presumably
	// Unix epoch milliseconds as in the Airbyte protocol; confirm.
	EmittedAt int64                  `json:"emitted_at"`
	Data      map[string]interface{} `json:"data"`
}

type AirbyteState

// AirbyteState is the state payload carried in AirbyteMessage.State. It wraps
// a SyncState under the JSON key "data".
type AirbyteState struct {
	Data SyncState `json:"data"`
}

type Catalog

// Catalog is a list of Stream definitions, serialized under the JSON key
// "streams". It is the result type of PlanetScaleDatabase.DiscoverSchema and
// the payload carried in AirbyteMessage.Catalog.
type Catalog struct {
	Streams []Stream `json:"streams"`
}

type ConfiguredCatalog

// ConfiguredCatalog is a list of ConfiguredStream entries, serialized under
// the JSON key "streams".
type ConfiguredCatalog struct {
	Streams []ConfiguredStream `json:"streams"`
}

type ConfiguredStream

// ConfiguredStream pairs a Stream with the sync mode selected for it.
// NOTE(review): SyncMode presumably holds one of the SYNC_MODE_* constants and
// drives IncrementalSyncRequested / ResetRequested — confirm in their bodies.
type ConfiguredStream struct {
	Stream   Stream `json:"stream"`
	SyncMode string `json:"sync_mode"`
}

func (ConfiguredStream) IncrementalSyncRequested

func (cs ConfiguredStream) IncrementalSyncRequested() bool

func (ConfiguredStream) ResetRequested

func (cs ConfiguredStream) ResetRequested() bool

type ConnectionProperties

// ConnectionProperties holds one ConnectionProperty descriptor per connection
// setting. It is the type of ConnectionSpecification.Properties, and its JSON
// keys (host, shards, database, username, password) are the same keys used by
// PlanetScaleSource.
type ConnectionProperties struct {
	Host     ConnectionProperty `json:"host"`
	Shards   ConnectionProperty `json:"shards"`
	Database ConnectionProperty `json:"database"`
	Username ConnectionProperty `json:"username"`
	Password ConnectionProperty `json:"password"`
}

type ConnectionProperty

// ConnectionProperty describes a single connection setting: its description,
// title, type name, display order, whether it is a secret, numeric bounds and
// a default value.
//
// No field is tagged omitempty, so zero values are always written to the
// JSON: "minimum": 0, "maximum": 0 and "default": null appear even when the
// caller did not set them.
type ConnectionProperty struct {
	Description string      `json:"description"`
	Title       string      `json:"title"`
	Type        string      `json:"type"`
	Order       int         `json:"order"`
	// IsSecret is serialized under the key "airbyte_secret", not "isSecret".
	IsSecret    bool        `json:"airbyte_secret"`
	Minimum     int         `json:"minimum"`
	Maximum     int         `json:"maximum"`
	Default     interface{} `json:"default"`
}

type ConnectionSpecification

// ConnectionSpecification is the document stored in
// Spec.ConnectionSpecification. Its JSON keys ("$schema", "title", "type",
// "required", "additionalProperties", "properties") mirror JSON Schema
// keywords; Properties holds the per-setting descriptors.
type ConnectionSpecification struct {
	Schema               string               `json:"$schema"`
	Title                string               `json:"title"`
	Type                 string               `json:"type"`
	Required             []string             `json:"required"`
	AdditionalProperties bool                 `json:"additionalProperties"`
	Properties           ConnectionProperties `json:"properties"`
}

type ConnectionStatus

// ConnectionStatus is the payload carried in AirbyteMessage.ConnectionStatus
// and the argument of AirbyteLogger.ConnectionStatus: a status string plus a
// message.
// NOTE(review): the set of allowed Status values is not visible here.
type ConnectionStatus struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

type PlanetScaleDatabase

type PlanetScaleDatabase interface {
	// CanConnect checks connectivity for the given source and reports the
	// outcome as an error.
	CanConnect(ctx context.Context, ps PlanetScaleSource) error
	// DiscoverSchema returns the Catalog for the given source.
	DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error)
	// ListShards returns the shards of the given source as strings.
	ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error)
	// Read takes a writer, the source, the configured stream and a table
	// cursor, and returns a SerializedCursor.
	// NOTE(review): presumably tc is the position to start from and the
	// result is the position to resume from on the next sync — confirm.
	Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
	// Close releases the database; what it releases is implementation-specific.
	Close() error
}

PlanetScaleDatabase is a general-purpose interface that defines all the data access methods needed for the PlanetScale Airbyte source to function.

type PlanetScaleEdgeDatabase

type PlanetScaleEdgeDatabase struct {
	// Logger is the AirbyteLogger held by this database.
	Logger AirbyteLogger
	// Mysql is the MySQL-side access layer (ping, table/schema lookup,
	// Vitess shard and tablet listing — see PlanetScaleEdgeMysqlAccess).
	Mysql  PlanetScaleEdgeMysqlAccess
	// contains filtered or unexported fields
}

PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and the gRPC API for incrementally syncing rows from PlanetScale.

func (PlanetScaleEdgeDatabase) CanConnect

func (PlanetScaleEdgeDatabase) Close

func (p PlanetScaleEdgeDatabase) Close() error

func (PlanetScaleEdgeDatabase) DiscoverSchema

func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)

func (PlanetScaleEdgeDatabase) ListShards

type PlanetScaleEdgeMysqlAccess

// PlanetScaleEdgeMysqlAccess is the MySQL-side access interface held in
// PlanetScaleEdgeDatabase.Mysql. Every method except Close takes a context and
// the PlanetScaleSource to operate on.
type PlanetScaleEdgeMysqlAccess interface {
	// PingContext checks the source and reports the outcome as an error.
	PingContext(context.Context, PlanetScaleSource) error
	// GetTableNames returns table names as strings.
	GetTableNames(context.Context, PlanetScaleSource) ([]string, error)
	// GetTableSchema returns a string-keyed map of PropertyType.
	// NOTE(review): the unnamed string parameter is presumably the table
	// name and the map keys the column names — confirm.
	GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error)
	// GetTablePrimaryKeys returns primary keys as strings.
	// NOTE(review): the unnamed string parameter is presumably the table
	// name — confirm.
	GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error)
	// GetVitessShards returns the Vitess shards as strings.
	GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
	// GetVitessTablets returns the Vitess tablets as VitessTablet values.
	GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
	Close() error
}

type PlanetScaleSource

type PlanetScaleSource struct {
	Host     string `json:"host"`
	Database string `json:"database"`
	Username string `json:"username"`
	Password string `json:"password"`
	// Shards is a single string, not a slice.
	// NOTE(review): the expected format (presumably a comma-separated list
	// of shard names) is not visible here — confirm.
	Shards   string `json:"shards"`
}

PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database.

func (PlanetScaleSource) DSN

DSN returns a data source name (DSN) that MySQL libraries can use to connect to a PlanetScale database.

func (PlanetScaleSource) GetInitialState

func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)

GetInitialState returns the initial (blank) state for the given keyspace across all of its shards. This state can be round-tripped safely with Airbyte.

type PropertyType

// PropertyType holds the type name of one schema property. It is the value
// type of StreamSchema.Properties and of the map returned by
// PlanetScaleEdgeMysqlAccess.GetTableSchema.
type PropertyType struct {
	Type string `json:"type"`
}

type SerializedCursor

// SerializedCursor carries a cursor in string form under the JSON key
// "cursor". TableCursorToSerializedCursor and SerializedCursorToTableCursor
// convert to and from *psdbconnect.TableCursor.
// NOTE(review): the string encoding is not visible on this page.
type SerializedCursor struct {
	Cursor string `json:"cursor"`
}

func TableCursorToSerializedCursor

func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)

func (SerializedCursor) SerializedCursorToTableCursor

func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error)

type ShardStates

// ShardStates maps a string key to a *SerializedCursor, serialized under the
// JSON key "shards". It is the value type of SyncState.Streams and the result
// type of PlanetScaleSource.GetInitialState.
// NOTE(review): the map key is presumably the shard name — confirm.
type ShardStates struct {
	Shards map[string]*SerializedCursor `json:"shards"`
}

type Spec

// Spec describes the connector: a documentation URL, the connection
// specification, three capability flags and the supported destination sync
// modes. It is carried in SpecMessage.Spec.
//
// The JSON key style is mixed: most keys are camelCase, but
// SupportedDestinationSyncModes is written as
// "supported_destination_sync_modes".
type Spec struct {
	DocumentationURL              string                  `json:"documentationUrl"`
	ConnectionSpecification       ConnectionSpecification `json:"connectionSpecification"`
	SupportsIncremental           bool                    `json:"supportsIncremental"`
	SupportsNormalization         bool                    `json:"supportsNormalization"`
	SupportsDBT                   bool                    `json:"supportsDBT"`
	SupportedDestinationSyncModes []string                `json:"supported_destination_sync_modes"`
}

type SpecMessage

// SpecMessage is the JSON envelope for a Spec: a type string plus the Spec
// itself under the key "spec". Unlike AirbyteMessage, the payload is a value,
// not an omitempty pointer, so it is always written.
type SpecMessage struct {
	Type string `json:"type"`
	Spec Spec   `json:"spec"`
}

type Stream

// Stream describes one entry of a Catalog: its name and namespace, its schema,
// the sync modes it supports, its primary keys and its cursor settings. No
// field uses omitempty, so nil slices are encoded as null.
type Stream struct {
	Name                string       `json:"name"`
	Schema              StreamSchema `json:"json_schema"`
	SupportedSyncModes  []string     `json:"supported_sync_modes"`
	Namespace           string       `json:"namespace"`
	// PrimaryKeys is a list of keys, each of which is itself a list of strings.
	// NOTE(review): the inner list is presumably a field path — confirm.
	PrimaryKeys         [][]string   `json:"source_defined_primary_key"`
	SourceDefinedCursor bool         `json:"source_defined_cursor"`
	DefaultCursorFields []string     `json:"default_cursor_field"`
}

type StreamSchema

// StreamSchema is the schema stored in Stream.Schema (JSON key "json_schema"):
// a type string plus a map from property name to PropertyType.
type StreamSchema struct {
	Type       string                  `json:"type"`
	Properties map[string]PropertyType `json:"properties"`
}

type SyncState

// SyncState maps a string key to the ShardStates recorded for it, serialized
// under the JSON key "streams". It is wrapped by AirbyteState.Data and is the
// argument of AirbyteLogger.State.
// NOTE(review): the map key is presumably the stream name, possibly
// namespace-qualified — confirm.
type SyncState struct {
	Streams map[string]ShardStates `json:"streams"`
}

type VitessTablet

// VitessTablet describes one tablet, as returned by
// PlanetScaleEdgeMysqlAccess.GetVitessTablets. Every field is a plain string
// (including PrimaryTermStartTime, which is not parsed into a time.Time) and
// none carries a JSON tag.
type VitessTablet struct {
	Cell                 string
	Keyspace             string
	Shard                string
	TabletType           string
	State                string
	Alias                string
	Hostname             string
	PrimaryTermStartTime string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL