airbyte

package
v0.0.0-rc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

README

Airbyte provider

This package is a bridge between native transfers and Airbyte connectors.

We support Airbyte source connectors.

How to add new airbyte connector

1. Add proto-model

Each Airbyte provider has its own config. To get the specification of that config, you can run:

docker run airbyte/source-snowflake:0.1.31 spec

This prints the Airbyte spec for the connector. The output may also contain log lines, but one of the lines should have type SPEC. Something like this:

{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.b.IntegrationRunner(runInternal):107 Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}}
{"type":"SPEC","spec":{"documentationUrl":"https://docs.airbyte.com/integrations/sources/snowflake","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"Snowflake Source Spec","type":"object","required":["host","role","warehouse","database"],"properties":{"credentials":{"title":"Authorization Method","type":"object","oneOf":[{"type":"object","title":"OAuth2.0","order":0,"required":["client_id","client_secret","auth_type"],"properties":{"auth_type":{"type":"string","const":"OAuth","order":0},"client_id":{"type":"string","title":"Client ID","description":"The Client ID of your Snowflake developer application.","airbyte_secret":true,"order":1},"client_secret":{"type":"string","title":"Client Secret","description":"The Client Secret of your Snowflake developer application.","airbyte_secret":true,"order":2},"access_token":{"type":"string","title":"Access Token","description":"Access Token for making authenticated requests.","airbyte_secret":true,"order":3},"refresh_token":{"type":"string","title":"Refresh Token","description":"Refresh Token for making authenticated requests.","airbyte_secret":true,"order":4}}},{"title":"Username and Password","type":"object","required":["username","password","auth_type"],"order":1,"properties":{"auth_type":{"type":"string","const":"username/password","order":0},"username":{"description":"The username you created to allow Airbyte to access the database.","examples":["AIRBYTE_USER"],"type":"string","title":"Username","order":1},"password":{"description":"The password associated with the username.","type":"string","airbyte_secret":true,"title":"Password","order":2}}}],"order":0},"host":{"description":"The host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com).","examples":["accountname.us-east-2.aws.snowflakecomputing.com"],"type":"string","title":"Account Name","order":1},"role":{"description":"The role you created for Airbyte to 
access Snowflake.","examples":["AIRBYTE_ROLE"],"type":"string","title":"Role","order":2},"warehouse":{"description":"The warehouse you created for Airbyte to access data.","examples":["AIRBYTE_WAREHOUSE"],"type":"string","title":"Warehouse","order":3},"database":{"description":"The database you created for Airbyte to access data.","examples":["AIRBYTE_DATABASE"],"type":"string","title":"Database","order":4},"schema":{"description":"The source Snowflake schema tables. Leave empty to access tables from multiple schemas.","examples":["AIRBYTE_SCHEMA"],"type":"string","title":"Schema","order":5},"jdbc_url_params":{"description":"Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).","title":"JDBC URL Params","type":"string","order":6}}},"supportsNormalization":false,"supportsDBT":false,"supported_destination_sync_modes":[],"advanced_auth":{"auth_flow_type":"oauth2.0","predicate_key":["credentials","auth_type"],"predicate_value":"OAuth","oauth_config_specification":{"oauth_user_input_from_connector_config_specification":{"type":"object","properties":{"host":{"type":"string","path_in_connector_config":["host"]},"role":{"type":"string","path_in_connector_config":["role"]}}},"complete_oauth_output_specification":{"type":"object","properties":{"access_token":{"type":"string","path_in_connector_config":["credentials","access_token"]},"refresh_token":{"type":"string","path_in_connector_config":["credentials","refresh_token"]}}},"complete_oauth_server_input_specification":{"type":"object","properties":{"client_id":{"type":"string"},"client_secret":{"type":"string"}}},"complete_oauth_server_output_specification":{"type":"object","properties":{"client_id":{"type":"string","path_in_connector_config":["credentials","client_id"]},"client_secret":{"type":"string","path_in_connector_config":["credentials","client_secret"]}}}}}}}
{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.b.IntegrationRunner(runInternal):182 Completed integration: io.airbyte.integrations.source.snowflake.SnowflakeSource"}}

The SPEC line contains a connectionSpecification key holding a JSON Schema model; this model can serve as the basis for our proto model.

This specification can be the baseline for a new proto; for example, we can use semi-automatic tools to convert JSON Schema into proto.

As an example, you can refer to the existing protos.

2. Adding docker image and provider type

Each user-facing provider has its own value in the API-Providers enum. If the provider is not in the enum yet, you need to add a new value.

Besides that, this enum value should be added to the Known Airbyte Providers list.

To link the API enum value to the specific Airbyte provider code, we need to add mappings:

  1. Between Provider Enum and proto-model here
  2. Between Provider Enum and Airbyte-docker image here

By default, we map the proto message into a JSON message as is, using the standard proto-JSON mapper, but in some cases (for example, one-of fields) we should add extra mapping code like we do here.

3. Enable new provider

To manage providers across installations we use a grants mechanism, so as the last step we should add the new Airbyte grant like we do here.

In the initial phase it's recommended to enable the new provider in preview mode until a certain level of stability has been proven.

	NewAirbyteGrant(api.EndpointType_SNOWFLAKE, "Snowflake",
		AWS().Preview(),
	)
4. Optional e2e test

We have a fat pre-commit test for the airbyte connectors which is run by a special script go/tests/e2e/run_teamcity_docker_tests.sh, so if you have a stable instance that can be used as CI-source you can add a new test like it's done in s3csv or another test in that directory.

Example PR adding Airbyte Snowflake connector

For a full example of adding a new Airbyte connector, you can have a look at this PR.

Documentation

Index

Constants

View Source
// Endpoint types of the Airbyte-backed sources known to this package.
// Every constant is an EndpointType with an explicit numeric value, and the
// same constants are used as keys of DefaultImages.
// NOTE(review): the numbers presumably mirror the API providers enum
// described in the README — confirm before changing any of them.
const (
	S3                  = EndpointType(24)
	Redshift            = EndpointType(25)
	MSSQL               = EndpointType(26)
	Bigquery            = EndpointType(27)
	Salesforce          = EndpointType(28)
	AmazonAds           = EndpointType(29)
	AmazonSellerPartner = EndpointType(30)
	AwsCloudtrail       = EndpointType(31)
	// GoogleAds is referenced as a DefaultImages key, so it has to be declared
	// here. 32 is the only value skipped between AwsCloudtrail and
	// GoogleAnalytics.
	// NOTE(review): confirm 32 against the API endpoint-type enum.
	GoogleAds           = EndpointType(32)
	GoogleAnalytics     = EndpointType(33)
	GoogleSheets        = EndpointType(34)
	BingAds             = EndpointType(35)
	LinkedinAds         = EndpointType(36)
	FacebookMarketing   = EndpointType(37)
	FacebookPages       = EndpointType(38)
	TiktokMarketing     = EndpointType(39)
	SnapchatMarketing   = EndpointType(40)
	Instagram           = EndpointType(41)
	Snowflake           = EndpointType(47)
	Jira                = EndpointType(50)
	Hubspot             = EndpointType(51)
)
View Source
// AirbyteStateKey is the string key "airbyte_state".
// NOTE(review): presumably the key under which connector state is stored —
// confirm against the callers.
const AirbyteStateKey = "airbyte_state"
View Source
// DiscoveryCheck is the abstract.CheckType named "discover".
// NOTE(review): presumably one of the checks listed by Provider.TestChecks —
// confirm.
const (
	DiscoveryCheck = abstract.CheckType("discover")
)
View Source
// ProviderType is the abstract.ProviderType value "airbyte" that identifies
// this package's provider.
// NOTE(review): presumably what AirbyteSource.GetProviderType and
// Provider.Type return — confirm.
const (
	ProviderType = abstract.ProviderType("airbyte")
)

Variables

View Source
// DefaultImages maps an EndpointType to the docker image reference
// ("repository:tag") of the matching Airbyte source connector.
// The image versions are pinned per connector.
// NOTE(review): presumably consulted by AirbyteSource.DockerImage when no
// explicit Image is configured — confirm.
var DefaultImages = map[EndpointType]string{
	AmazonAds:           "airbyte/source-amazon-ads:0.1.3",
	AmazonSellerPartner: "airbyte/source-amazon-seller-partner:0.2.14",
	AwsCloudtrail:       "airbyte/source-aws-cloudtrail:0.1.4",
	Bigquery:            "airbyte/source-bigquery:0.1.8",
	BingAds:             "airbyte/source-bing-ads:0.1.3",
	FacebookMarketing:   "airbyte/source-facebook-marketing:0.2.35",
	FacebookPages:       "airbyte/source-facebook-pages:0.1.6",
	GoogleAds:           "airbyte/source-google-ads:0.1.27",
	GoogleAnalytics:     "airbyte/source-google-analytics-v4:0.1.16",
	GoogleSheets:        "airbyte/source-google-sheets:0.2.9",
	Instagram:           "airbyte/source-instagram:0.1.9",
	LinkedinAds:         "airbyte/source-linkedin-ads:2.1.2",
	MSSQL:               "airbyte/source-mssql:0.3.17",
	Redshift:            "airbyte/source-redshift:0.3.8",
	S3:                  "airbyte/source-s3:0.1.18",
	Salesforce:          "airbyte/source-salesforce:0.1.23",
	SnapchatMarketing:   "airbyte/source-snapchat-marketing:0.1.4",
	TiktokMarketing:     "airbyte/source-tiktok-marketing:0.1.3",
	Snowflake:           "airbyte/source-snowflake:0.1.32",
	Jira:                "airbyte/source-jira:0.10.2",
	Hubspot:             "airbyte/source-hubspot:0.2.1",
}
View Source
// RecordIndexCol is the column schema of the synthetic "__dt_record_index"
// column: a required int64 column that is part of the primary key and is not
// bound to any table, path or expression.
// NOTE(review): presumably filled from the row index passed to
// NewStreamRecord — confirm.
var RecordIndexCol = abstract.ColSchema{
	TableSchema:  "",
	TableName:    "",
	Path:         "",
	ColumnName:   "__dt_record_index",
	DataType:     ytschema.TypeInt64.String(),
	PrimaryKey:   true,
	FakeKey:      false,
	Required:     true,
	Expression:   "",
	OriginalType: "",
	Properties:   nil,
}

Functions

func New

func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, transfer *model.Transfer) providers.Provider

func NewStreamRecord

func NewStreamRecord(
	stream *AirbyteStream,
	cols []string,
	record *Record,
	tableSchema *abstract.TableSchema,
	colToIndex map[string]int,
	rowIndex int,
) events.InsertEvent

func StateKey

func StateKey(table abstract.TableID) string

Types

type AirbyteSource

// AirbyteSource is the configuration of an Airbyte-backed source endpoint; it
// is the cfg argument taken by NewSource and NewStorage.
//
// EndpointType selects the connector kind and Image names a docker image.
// NOTE(review): presumably Config/ProtoConfig carry the connector config in
// JSON and proto form, BatchSizeLimit/RecordsLimit/MaxRowSize bound batching,
// and an empty Image falls back to DefaultImages[EndpointType] — confirm
// against WithDefaults, Validate and DockerImage.
type AirbyteSource struct {
	Config         string
	BaseDir        string
	ProtoConfig    string
	BatchSizeLimit model.BytesSize
	RecordsLimit   int
	EndpointType   EndpointType
	MaxRowSize     int
	Image          string
}

func (*AirbyteSource) DataDir

func (s *AirbyteSource) DataDir() string

func (*AirbyteSource) DockerImage

func (s *AirbyteSource) DockerImage() string

func (*AirbyteSource) GetProviderType

func (s *AirbyteSource) GetProviderType() abstract.ProviderType

func (*AirbyteSource) IsAbstract2

func (*AirbyteSource) IsAbstract2(model.Destination) bool

func (*AirbyteSource) IsIncremental

func (*AirbyteSource) IsIncremental()

func (*AirbyteSource) IsSource

func (*AirbyteSource) IsSource()

func (*AirbyteSource) IsStrictSource

func (*AirbyteSource) IsStrictSource()

func (*AirbyteSource) SupportMultiThreads

func (s *AirbyteSource) SupportMultiThreads() bool

func (*AirbyteSource) SupportMultiWorkers

func (s *AirbyteSource) SupportMultiWorkers() bool

func (*AirbyteSource) SupportsStartCursorValue

func (*AirbyteSource) SupportsStartCursorValue() bool

func (*AirbyteSource) Validate

func (s *AirbyteSource) Validate() error

func (*AirbyteSource) WithDefaults

func (s *AirbyteSource) WithDefaults()

type AirbyteStream

// AirbyteStream is the decoded model of a stream: it is the type returned by
// Stream.AsModel, and it carries the JSON schema as a parsed JSONSchema value
// rather than the raw JSON kept by Stream.
// Unlike Stream, none of its JSON tags use omitempty, so every field is always
// emitted when marshalling.
type AirbyteStream struct {
	Name                    string     `json:"name"`
	JSONSchema              JSONSchema `json:"json_schema"`
	SupportedSyncModes      []string   `json:"supported_sync_modes"`
	DefaultCursorField      []string   `json:"default_cursor_field"`
	SourceDefinedPrimaryKey [][]string `json:"source_defined_primary_key"`
	Namespace               string     `json:"namespace"`
}

func (AirbyteStream) TableID

func (s AirbyteStream) TableID() abstract.TableID

type AirbyteSyncStream

// AirbyteSyncStream pairs an AirbyteStream with the settings used to sync it:
// the sync mode, the cursor field and the destination sync mode (both modes
// are plain strings here). It is the element type of SyncStreams.Streams.
type AirbyteSyncStream struct {
	Stream              AirbyteStream `json:"stream"`
	SyncMode            string        `json:"sync_mode"`
	CursorField         []string      `json:"cursor_field"`
	DestinationSyncMode string        `json:"destination_sync_mode"`
}

func (AirbyteSyncStream) TableID

func (s AirbyteSyncStream) TableID() abstract.TableID

type Catalog

// Catalog is a list of Stream descriptions, (un)marshalled under the JSON key
// "streams". It is the payload type of Message.Catalog and the input of
// NewDataObjects.
type Catalog struct {
	Streams []Stream `json:"streams"`
}

type ConfiguredCatalog

// ConfiguredCatalog is a list of ConfiguredStream entries, (un)marshalled
// under the JSON key "streams". It can be checked with its Validate method.
type ConfiguredCatalog struct {
	Streams []ConfiguredStream `json:"streams"`
}

func (*ConfiguredCatalog) Validate

func (c *ConfiguredCatalog) Validate() error

type ConfiguredStream

// ConfiguredStream wraps a Stream with the settings chosen for syncing it.
// SyncMode is a plain string while DestinationSyncMode is the typed
// DestinationSyncMode; CursorField and PrimaryKey are left out of the JSON
// when empty.
type ConfiguredStream struct {
	Stream              Stream              `json:"stream"`
	SyncMode            string              `json:"sync_mode"`
	DestinationSyncMode DestinationSyncMode `json:"destination_sync_mode"`
	CursorField         []string            `json:"cursor_field,omitempty"`
	PrimaryKey          [][]string          `json:"primary_key,omitempty"`
}

func FindStream

func FindStream(streams []ConfiguredStream, obj string) (*ConfiguredStream, error)

func (*ConfiguredStream) Validate

func (c *ConfiguredStream) Validate() error

type ConnectionStatus

// ConnectionStatus is the payload type of Message.ConnectionStatus: a Status
// value plus a free-form message string.
type ConnectionStatus struct {
	Status  Status `json:"status"`
	Message string `json:"message"`
}

type DataObject

// DataObject is an opaque type with iterator-style methods (Next, Part, Err,
// Close), naming methods (Name, FullName) and conversions to the older
// abstract.TableDescription / abstract.TableID types.
// NOTE(review): presumably one DataObject corresponds to one catalog stream —
// confirm in NewDataObjects.
type DataObject struct {
	// contains filtered or unexported fields
}

func (*DataObject) Close

func (d *DataObject) Close()

func (*DataObject) Err

func (d *DataObject) Err() error

func (*DataObject) FullName

func (d *DataObject) FullName() string

func (*DataObject) Name

func (d *DataObject) Name() string

func (*DataObject) Next

func (d *DataObject) Next() bool

func (*DataObject) Part

func (d *DataObject) Part() (base.DataObjectPart, error)

func (*DataObject) ToOldTableDescription

func (d *DataObject) ToOldTableDescription() (*abstract.TableDescription, error)

func (*DataObject) ToOldTableID

func (d *DataObject) ToOldTableID() (*abstract.TableID, error)

func (*DataObject) ToTablePart

func (d *DataObject) ToTablePart() (*abstract.TableDescription, error)

type DataObjects

// DataObjects is an opaque iterator built by NewDataObjects from a Catalog and
// a base.DataObjectFilter. It is walked with Next/Object/Err/Close and can be
// converted to an abstract.TableMap via ToOldTableMap.
type DataObjects struct {
	// contains filtered or unexported fields
}

func NewDataObjects

func NewDataObjects(catalog *Catalog, filter base.DataObjectFilter) (*DataObjects, error)

func (*DataObjects) Close

func (d *DataObjects) Close()

func (*DataObjects) Err

func (d *DataObjects) Err() error

func (*DataObjects) Next

func (d *DataObjects) Next() bool

func (*DataObjects) Object

func (d *DataObjects) Object() (base.DataObject, error)

func (*DataObjects) ToOldTableMap

func (d *DataObjects) ToOldTableMap() (abstract.TableMap, error)

type DestinationSyncMode

// DestinationSyncMode is a string-typed destination sync mode. The declared
// values are "append", "overwrite" and "append_dedup".
type DestinationSyncMode string
const (
	DestinationSyncModeAppend      DestinationSyncMode = "append"
	DestinationSyncModeOverwrite   DestinationSyncMode = "overwrite"
	DestinationSyncModeAppendDedup DestinationSyncMode = "append_dedup"
)

type EndpointType

// EndpointType is an integer identifier of a connector kind. The known values
// are the package-level constants (S3, Redshift, …, Hubspot), and it is the
// key type of DefaultImages.
type EndpointType int

type JSONProperty

// JSONProperty describes a single property of a stream's JSON schema: its
// list of types, its format and its airbyte_type. It is the value type of
// JSONSchema.Properties and has a custom UnmarshalJSON method.
type JSONProperty struct {
	Type        []string `json:"type"`
	Format      string   `json:"format"`
	AirbyteType string   `json:"airbyte_type"`
}

The "type" of a JSONProperty can be either an array or a single string, hence the custom UnmarshalJSON implementation.

func (*JSONProperty) UnmarshalJSON

func (b *JSONProperty) UnmarshalJSON(data []byte) error

type JSONSchema

// JSONSchema is the parsed form of a stream's JSON schema: the schema "type"
// (a StringOrArray) and the per-property descriptions keyed by property name.
// It is what Stream.ParsedJSONSchema returns.
type JSONSchema struct {
	Type       StringOrArray           `json:"type"`
	Properties map[string]JSONProperty `json:"properties"`
}

type Log

// Log is the payload type of Message.Log: a typed LogLevel plus the log text.
type Log struct {
	Level   LogLevel `json:"level"`
	Message string   `json:"message"`
}

type LogLevel

// LogLevel is a string-typed log severity. The declared values are "TRACE",
// "DEBUG", "INFO", "WARN", "ERROR" and "FATAL".
type LogLevel string
const (
	LogLevelTrace LogLevel = "TRACE"
	LogLevelDebug LogLevel = "DEBUG"
	LogLevelInfo  LogLevel = "INFO"
	LogLevelWarn  LogLevel = "WARN"
	LogLevelError LogLevel = "ERROR"
	LogLevelFatal LogLevel = "FATAL"
)

type LogRecord

// LogRecord has the same JSON shape as Log ("level" and "message") but keeps
// the level as a plain string instead of a LogLevel.
type LogRecord struct {
	Level   string `json:"level"`
	Message string `json:"message"`
}

type Message

// Message is the JSON envelope of one connector output line, such as the
// {"type":"LOG","log":{...}} and {"type":"SPEC","spec":{...}} lines shown in
// the README. Type names the message kind; every payload is an optional
// pointer that is omitted from the JSON when nil.
// NOTE(review): presumably exactly one payload matching Type is set per
// message — confirm.
type Message struct {
	Type             MessageType       `json:"type"`
	Log              *Log              `json:"log,omitempty"`
	State            *State            `json:"state,omitempty"`
	Record           *Record           `json:"record,omitempty"`
	ConnectionStatus *ConnectionStatus `json:"connectionStatus,omitempty"`
	Spec             *Spec             `json:"spec,omitempty"`
	Catalog          *Catalog          `json:"catalog,omitempty"`
}

type MessageType

// MessageType is the string discriminator stored in Message.Type. The
// declared values are "RECORD", "STATE", "LOG", "SPEC", "CONNECTION_STATUS"
// and "CATALOG".
type MessageType string
const (
	MessageTypeRecord           MessageType = "RECORD"
	MessageTypeState            MessageType = "STATE"
	MessageTypeLog              MessageType = "LOG"
	MessageTypeSpec             MessageType = "SPEC"
	MessageTypeConnectionStatus MessageType = "CONNECTION_STATUS"
	MessageTypeCatalog          MessageType = "CATALOG"
)

type Provider

// Provider is an opaque type exposing Activate, Source, Storage, Test,
// TestChecks and Type methods.
// NOTE(review): presumably the concrete value behind the providers.Provider
// returned by New — confirm.
type Provider struct {
	// contains filtered or unexported fields
}

func (*Provider) Activate

func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error

func (*Provider) Source

func (p *Provider) Source() (abstract.Source, error)

func (*Provider) Storage

func (p *Provider) Storage() (abstract.Storage, error)

func (*Provider) Test

func (p *Provider) Test(ctx context.Context) *abstract.TestResult

func (*Provider) TestChecks

func (p *Provider) TestChecks() []abstract.CheckType

func (*Provider) Type

func (p *Provider) Type() abstract.ProviderType

type Record

// Record is the payload type of Message.Record: one data row of a stream.
// Data holds the row as raw, undecoded JSON; ParsedData is excluded from JSON
// (tag "-"); Namespace is omitted from the JSON when empty.
// NOTE(review): presumably LazyParse decodes Data into ParsedData on demand —
// confirm. The unit of EmittedAt is not visible here.
type Record struct {
	Stream     string                 `json:"stream"`
	Data       json.RawMessage        `json:"data"`
	EmittedAt  int64                  `json:"emitted_at"`
	Namespace  string                 `json:"namespace,omitempty"`
	ParsedData map[string]interface{} `json:"-"`
}

func (*Record) LazyParse

func (r *Record) LazyParse() error

func (Record) TableID

func (r Record) TableID() abstract.TableID

type RecordBatch

// RecordBatch is an opaque batch type created by NewRecordBatch from a row
// index offset and an AirbyteStream. It offers iteration (Next, Event), size
// accessors (Count, Size) and conversion to []abstract.ChangeItem via
// AsChangeItems.
type RecordBatch struct {
	// contains filtered or unexported fields
}

func NewRecordBatch

func NewRecordBatch(rowIndexOffset int, stream *AirbyteStream) *RecordBatch

func (*RecordBatch) AsChangeItems

func (r *RecordBatch) AsChangeItems() ([]abstract.ChangeItem, error)

func (*RecordBatch) Count

func (r *RecordBatch) Count() int

func (*RecordBatch) Event

func (r *RecordBatch) Event() (base.Event, error)

func (*RecordBatch) Next

func (r *RecordBatch) Next() bool

func (*RecordBatch) Size

func (r *RecordBatch) Size() int

type RowsRecord

// RowsRecord bundles a Record with the AirbyteStream it belongs to and the
// abstract.TableSchema used for it. Its methods expose the values (NewValue,
// NewValuesCount), the table (Table) and a conversion to abstract.ChangeItem
// (ToOldChangeItem).
// NOTE(review): presumably the concrete events.InsertEvent built by
// NewStreamRecord — confirm.
type RowsRecord struct {
	Stream *AirbyteStream
	Record *Record

	TableSchema *abstract.TableSchema
	// contains filtered or unexported fields
}

func (*RowsRecord) NewValue

func (s *RowsRecord) NewValue(i int) (base.Value, error)

func (*RowsRecord) NewValuesCount

func (s *RowsRecord) NewValuesCount() int

func (*RowsRecord) Table

func (s *RowsRecord) Table() base.Table

func (*RowsRecord) ToOldChangeItem

func (s *RowsRecord) ToOldChangeItem() (*abstract.ChangeItem, error)

type Source

// Source is an opaque type created by NewSource from a logger, a metrics
// registry, a coordinator, an AirbyteSource config and a transfer. Run takes
// an abstract.AsyncSink and returns an error; Stop takes no arguments.
type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, cfg *AirbyteSource, transfer *model.Transfer) *Source

func (*Source) Run

func (s *Source) Run(sink abstract.AsyncSink) error

func (*Source) Stop

func (s *Source) Stop()

type Spec

// Spec is the payload type of Message.Spec, i.e. the "spec" object of a SPEC
// line such as the one shown in the README. ConnectionSpecification is kept
// as raw JSON; per the README it holds the connector's JSON Schema model.
// NOTE(review): the field comments below mention "Flow" and
// "estuary-developed connectors", which looks inherited from another
// codebase — confirm they still apply here.
type Spec struct {
	DocumentationURL        string          `json:"documentationUrl,omitempty"`
	ChangelogURL            string          `json:"changelogUrl,omitempty"`
	ConnectionSpecification json.RawMessage `json:"connectionSpecification"`
	SupportsIncremental     bool            `json:"supportsIncremental,omitempty"`

	// SupportedDestinationSyncModes is ignored by Flow
	SupportedDestinationSyncModes []DestinationSyncMode `json:"supported_destination_sync_modes,omitempty"`
	// SupportsNormalization is not currently used or supported by Flow or estuary-developed
	// connectors
	SupportsNormalization bool `json:"supportsNormalization,omitempty"`
	// SupportsDBT is not currently used or supported by Flow or estuary-developed
	// connectors
	SupportsDBT bool `json:"supportsDBT,omitempty"`
	// AuthSpecification is not currently used or supported by Flow or estuary-developed
	// connectors
	AuthSpecification json.RawMessage `json:"authSpecification,omitempty"`
}

type State

// State is the payload type of Message.State. It has a custom UnmarshalJSON
// method.
type State struct {
	// Data is the actual state associated with the ingestion. This must be a JSON _Object_ in order
	// to comply with the airbyte specification.
	Data json.RawMessage `json:"data"`
	// Merge indicates that Data is an RFC 7396 JSON Merge Patch, and should
	// be reduced into the previous state accordingly.
	Merge bool `json:"estuary.dev/merge,omitempty"`
}

func (*State) UnmarshalJSON

func (s *State) UnmarshalJSON(b []byte) error

type Status

// Status is the string-typed outcome stored in ConnectionStatus.Status. The
// declared values are "SUCCEEDED" and "FAILED".
type Status string
const (
	StatusSucceeded Status = "SUCCEEDED"
	StatusFailed    Status = "FAILED"
)

type Storage

// Storage is an opaque type created by NewStorage from a logger, a metrics
// registry, a coordinator, an AirbyteSource config and a transfer. Its
// methods cover table listing and schema lookup (TableList, TableSchema,
// TableExists), row counts, loading a table into an abstract.Pusher
// (LoadTable), incremental state and Ping/Close.
type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, cfg *AirbyteSource, transfer *model.Transfer) (*Storage, error)

func (*Storage) Close

func (a *Storage) Close()

func (*Storage) EstimateTableRowsCount

func (a *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) ExactTableRowsCount

func (a *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)

func (*Storage) GetIncrementalState

func (a *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)

func (*Storage) LoadTable

func (a *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error

func (*Storage) Ping

func (a *Storage) Ping() error

func (*Storage) SetInitialState

func (a *Storage) SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)

SetInitialState is expected to be a no-op, since state is handled inside the LoadTable method.

func (*Storage) TableExists

func (a *Storage) TableExists(table abstract.TableID) (bool, error)

func (*Storage) TableList

func (a *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)

func (*Storage) TableSchema

func (a *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)

type Stream

// Stream is the wire-level description of a stream and the element type of
// Catalog.Streams. The JSON schema is kept as raw JSON (ParsedJSONSchema
// returns it as a JSONSchema), and the optional fields are left out of the
// JSON when empty. AsModel converts a Stream to an *AirbyteStream.
type Stream struct {
	Name                    string          `json:"name"`
	JSONSchema              json.RawMessage `json:"json_schema"`
	SupportedSyncModes      []string        `json:"supported_sync_modes"`
	SourceDefinedCursor     bool            `json:"source_defined_cursor,omitempty"`
	DefaultCursorField      []string        `json:"default_cursor_field,omitempty"`
	SourceDefinedPrimaryKey [][]string      `json:"source_defined_primary_key,omitempty"`
	Namespace               string          `json:"namespace,omitempty"`
}

func (*Stream) AsModel

func (s *Stream) AsModel() *AirbyteStream

func (*Stream) Fqtn

func (s *Stream) Fqtn() string

func (*Stream) ParsedJSONSchema

func (s *Stream) ParsedJSONSchema() JSONSchema

func (*Stream) SupportMode

func (s *Stream) SupportMode(mode string) bool

func (*Stream) TableID

func (s *Stream) TableID() abstract.TableID

func (*Stream) Validate

func (s *Stream) Validate() error

type StreamState

// StreamState records, for one stream identified by name and namespace, the
// cursor field and the cursor value as a string; Cursor is left out of the
// JSON when empty.
type StreamState struct {
	StreamName      string   `json:"stream_name"`
	StreamNamespace string   `json:"stream_namespace"`
	CursorField     []string `json:"cursor_field"`
	Cursor          string   `json:"cursor,omitempty"`
}

func (StreamState) TableID

func (s StreamState) TableID() abstract.TableID

type StringOrArray

// StringOrArray holds a list of strings and has custom MarshalJSON and
// UnmarshalJSON methods. It is the type of JSONSchema.Type.
// NOTE(review): judging by the name, the JSON form may be either a single
// string or an array of strings — confirm in UnmarshalJSON.
type StringOrArray struct {
	Array []string
}

func (*StringOrArray) MarshalJSON

func (b *StringOrArray) MarshalJSON() ([]byte, error)

func (*StringOrArray) UnmarshalJSON

func (b *StringOrArray) UnmarshalJSON(data []byte) error

type SyncStreams

// SyncStreams is a list of AirbyteSyncStream entries, (un)marshalled under the
// JSON key "streams".
type SyncStreams struct {
	Streams []AirbyteSyncStream `json:"streams"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL