internal

package
v1.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RECORD            = "RECORD"
	STATE             = "STATE"
	LOG               = "LOG"
	CONNECTION_STATUS = "CONNECTION_STATUS"
	CATALOG           = "CATALOG"
)
View Source
const (
	LOGLEVEL_ERROR = "ERROR"
	LOGLEVEL_WARN  = "WARN"
	LOGLEVEL_INFO  = "INFO"
)
View Source
const (
	SYNC_MODE_FULL_REFRESH = "full_refresh"
	SYNC_MODE_INCREMENTAL  = "incremental"
)
View Source
const MaxBatchSize = 10000

Variables

This section is empty.

Functions

func QueryResultToRecords

func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}

func TabletTypeToString

func TabletTypeToString(t psdbconnect.TabletType) string

Types

type AirbyteLogMessage

type AirbyteLogMessage struct {
	Level   string `json:"level,omitempty"`
	Message string `json:"message,omitempty"`
}

type AirbyteLogger

type AirbyteLogger interface {
	Log(level, message string)
	Catalog(catalog Catalog)
	ConnectionStatus(status ConnectionStatus)
	Record(tableNamespace, tableName string, data map[string]interface{})
	Flush()
	State(syncState SyncState)
	Error(error string)
}

func NewLogger

func NewLogger(w io.Writer) AirbyteLogger

type AirbyteMessage

type AirbyteMessage struct {
	Type             string             `json:"type"`
	Log              *AirbyteLogMessage `json:"log,omitempty"`
	ConnectionStatus *ConnectionStatus  `json:"connectionStatus,omitempty"`
	Catalog          *Catalog           `json:"catalog,omitempty"`
	Record           *AirbyteRecord     `json:"record,omitempty"`
	State            *AirbyteState      `json:"state,omitempty"`
}

type AirbyteRecord

type AirbyteRecord struct {
	Stream    string                 `json:"stream"`
	Namespace string                 `json:"namespace"`
	EmittedAt int64                  `json:"emitted_at"`
	Data      map[string]interface{} `json:"data"`
}

type AirbyteState

type AirbyteState struct {
	Data SyncState `json:"data"`
}

type Catalog

type Catalog struct {
	Streams []Stream `json:"streams"`
}

type ConfiguredCatalog

type ConfiguredCatalog struct {
	Streams []ConfiguredStream `json:"streams"`
}

type ConfiguredStream

type ConfiguredStream struct {
	Stream   Stream `json:"stream"`
	SyncMode string `json:"sync_mode"`
}

func (ConfiguredStream) IncrementalSyncRequested

func (cs ConfiguredStream) IncrementalSyncRequested() bool

func (ConfiguredStream) ResetRequested

func (cs ConfiguredStream) ResetRequested() bool

type ConnectionProperties

type ConnectionProperties struct {
	Host     ConnectionProperty         `json:"host"`
	Shards   ConnectionProperty         `json:"shards"`
	Database ConnectionProperty         `json:"database"`
	Username ConnectionProperty         `json:"username"`
	Password ConnectionProperty         `json:"password"`
	Options  CustomOptionsSpecification `json:"options"`
}

type ConnectionProperty

type ConnectionProperty struct {
	Description string      `json:"description"`
	Title       string      `json:"title"`
	Type        string      `json:"type"`
	Order       int         `json:"order"`
	IsSecret    bool        `json:"airbyte_secret,omitempty"`
	Minimum     int         `json:"minimum,omitempty"`
	Maximum     int         `json:"maximum,omitempty"`
	Default     interface{} `json:"default,omitempty"`
}

type ConnectionPropertyHash added in v1.5.0

type ConnectionPropertyHash struct {
	Description string                             `json:"description"`
	Title       string                             `json:"title"`
	Type        string                             `json:"type"`
	Order       int                                `json:"order"`
	Options     []ConnectionPropertyHashProperties `json:"oneOf"`
}

type ConnectionPropertyHashProperties added in v1.5.0

type ConnectionPropertyHashProperties struct {
	DoNotTreatTinyIntAsBoolean ConnectionProperty `json:"do_not_treat_tiny_int_as_boolean"`
}

type ConnectionSpecification

type ConnectionSpecification struct {
	Schema               string               `json:"$schema"`
	Title                string               `json:"title"`
	Type                 string               `json:"type"`
	Required             []string             `json:"required"`
	AdditionalProperties bool                 `json:"additionalProperties"`
	Properties           ConnectionProperties `json:"properties"`
}

type ConnectionStatus

type ConnectionStatus struct {
	Status  string `json:"status"`
	Message string `json:"message"`
}

type CustomOptions added in v1.5.0

type CustomOptions struct {
	Description string                  `json:"description"`
	Title       string                  `json:"title"`
	Type        string                  `json:"type"`
	Order       int                     `json:"order"`
	Properties  CustomOptionsProperties `json:"properties"`
}

type CustomOptionsProperties added in v1.5.0

type CustomOptionsProperties struct {
	DoNotTreatTinyIntAsBoolean ConnectionProperty `json:"do_not_treat_tiny_int_as_boolean"`
}

type CustomOptionsSpecification added in v1.5.0

type CustomOptionsSpecification struct {
	Description string          `json:"description"`
	Title       string          `json:"title"`
	Type        string          `json:"type"`
	Order       int             `json:"order"`
	Options     []CustomOptions `json:"oneOf"`
}

type CustomSourceOptions added in v1.5.0

type CustomSourceOptions struct {
	DoNotTreatTinyIntAsBoolean bool `json:"do_not_treat_tiny_int_as_boolean"`
}

type PlanetScaleDatabase

type PlanetScaleDatabase interface {
	CanConnect(ctx context.Context, ps PlanetScaleSource) error
	DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error)
	ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error)
	Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
	Close() error
}

PlanetScaleDatabase is a general purpose interface that defines all the data access methods needed for the PlanetScale Airbyte source to function.

type PlanetScaleEdgeDatabase

type PlanetScaleEdgeDatabase struct {
	Logger AirbyteLogger
	Mysql  PlanetScaleEdgeMysqlAccess
	// contains filtered or unexported fields
}

PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the mysql interface provided by PlanetScale for all schema/shard/tablet discovery and the grpc API for incrementally syncing rows from PlanetScale.

func (PlanetScaleEdgeDatabase) CanConnect

func (PlanetScaleEdgeDatabase) Close

func (p PlanetScaleEdgeDatabase) Close() error

func (PlanetScaleEdgeDatabase) DiscoverSchema

func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)

func (PlanetScaleEdgeDatabase) ListShards

func (PlanetScaleEdgeDatabase) Read

Read streams rows from a table given a starting cursor. 1. We will get the latest vgtid for a given table in a shard when a sync session starts. 2. This latest vgtid is now the stopping point for this sync session. 3. Ask vstream to stream from the last known vgtid 4. When we reach the stopping point, read all rows available at this vgtid 5. End the stream when (a) a vgtid newer than latest vgtid is encountered or (b) the timeout kicks in.

type PlanetScaleEdgeMysqlAccess

type PlanetScaleEdgeMysqlAccess interface {
	PingContext(context.Context, PlanetScaleSource) error
	GetTableNames(context.Context, PlanetScaleSource) ([]string, error)
	GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error)
	GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error)
	GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
	GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
	Close() error
}

type PlanetScaleSource

type PlanetScaleSource struct {
	Host     string              `json:"host"`
	Database string              `json:"database"`
	Username string              `json:"username"`
	Password string              `json:"password"`
	Shards   string              `json:"shards"`
	Options  CustomSourceOptions `json:"options"`
}

PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database

func (PlanetScaleSource) DSN

DSN returns a DataSource that mysql libraries can use to connect to a PlanetScale database.

func (PlanetScaleSource) GetInitialState

func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)

GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Airbyte.

type PropertyType

type PropertyType struct {
	Type         string `json:"type"`
	CustomFormat string `json:"format,omitempty"`
	AirbyteType  string `json:"airbyte_type,omitempty"`
}

type SerializedCursor

type SerializedCursor struct {
	Cursor string `json:"cursor"`
}

func TableCursorToSerializedCursor

func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)

func (SerializedCursor) SerializedCursorToTableCursor

func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error)

type ShardStates

type ShardStates struct {
	Shards map[string]*SerializedCursor `json:"shards"`
}

type Spec

type Spec struct {
	DocumentationURL              string                  `json:"documentationUrl"`
	ConnectionSpecification       ConnectionSpecification `json:"connectionSpecification"`
	SupportsIncremental           bool                    `json:"supportsIncremental"`
	SupportsNormalization         bool                    `json:"supportsNormalization"`
	SupportsDBT                   bool                    `json:"supportsDBT"`
	SupportedDestinationSyncModes []string                `json:"supported_destination_sync_modes"`
}

type SpecMessage

type SpecMessage struct {
	Type string `json:"type"`
	Spec Spec   `json:"spec"`
}

type Stream

type Stream struct {
	Name                string       `json:"name"`
	Schema              StreamSchema `json:"json_schema"`
	SupportedSyncModes  []string     `json:"supported_sync_modes"`
	Namespace           string       `json:"namespace"`
	PrimaryKeys         [][]string   `json:"source_defined_primary_key"`
	SourceDefinedCursor bool         `json:"source_defined_cursor"`
	DefaultCursorFields []string     `json:"default_cursor_field"`
}

type StreamSchema

type StreamSchema struct {
	Type       string                  `json:"type"`
	Properties map[string]PropertyType `json:"properties"`
}

type SyncState

type SyncState struct {
	Streams map[string]ShardStates `json:"streams"`
}

type VitessTablet

type VitessTablet struct {
	Cell                 string
	Keyspace             string
	Shard                string
	TabletType           string
	State                string
	Alias                string
	Hostname             string
	PrimaryTermStartTime string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL