Documentation ¶
Index ¶
- Constants
- Variables
- func New(lgr log.Logger, registry metrics.Registry, cp cpclient.Coordinator, ...) providers.Provider
- func NewStreamRecord(stream *AirbyteStream, cols []string, record *Record, ...) events.InsertEvent
- func StateKey(table abstract.TableID) string
- type AirbyteSource
- func (s *AirbyteSource) DataDir() string
- func (s *AirbyteSource) DockerImage() string
- func (s *AirbyteSource) GetProviderType() abstract.ProviderType
- func (*AirbyteSource) IsAbstract2(model.Destination) bool
- func (*AirbyteSource) IsIncremental()
- func (*AirbyteSource) IsSource()
- func (*AirbyteSource) IsStrictSource()
- func (s *AirbyteSource) SupportMultiThreads() bool
- func (s *AirbyteSource) SupportMultiWorkers() bool
- func (*AirbyteSource) SupportsStartCursorValue() bool
- func (s *AirbyteSource) Validate() error
- func (s *AirbyteSource) WithDefaults()
- type AirbyteStream
- type AirbyteSyncStream
- type Catalog
- type ConfiguredCatalog
- type ConfiguredStream
- type ConnectionStatus
- type DataObject
- func (d *DataObject) Close()
- func (d *DataObject) Err() error
- func (d *DataObject) FullName() string
- func (d *DataObject) Name() string
- func (d *DataObject) Next() bool
- func (d *DataObject) Part() (base.DataObjectPart, error)
- func (d *DataObject) ToOldTableDescription() (*abstract.TableDescription, error)
- func (d *DataObject) ToOldTableID() (*abstract.TableID, error)
- func (d *DataObject) ToTablePart() (*abstract.TableDescription, error)
- type DataObjects
- type DestinationSyncMode
- type EndpointType
- type JSONProperty
- type JSONSchema
- type Log
- type LogLevel
- type LogRecord
- type Message
- type MessageType
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, ...) error
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) Storage() (abstract.Storage, error)
- func (p *Provider) Test(ctx context.Context) *abstract.TestResult
- func (p *Provider) TestChecks() []abstract.CheckType
- func (p *Provider) Type() abstract.ProviderType
- type Record
- type RecordBatch
- type RowsRecord
- type Source
- type Spec
- type State
- type Status
- type Storage
- func (a *Storage) Close()
- func (a *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (a *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (a *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
- func (a *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (a *Storage) Ping() error
- func (a *Storage) SetInitialState(tables []abstract.TableDescription, ...)
- func (a *Storage) TableExists(table abstract.TableID) (bool, error)
- func (a *Storage) TableList(filter abstract.IncludeTableList) (abstract.TableMap, error)
- func (a *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- type Stream
- type StreamState
- type StringOrArray
- type SyncStreams
Constants ¶
View Source
const ( S3 = EndpointType(24) Redshift = EndpointType(25) MSSQL = EndpointType(26) Bigquery = EndpointType(27) Salesforce = EndpointType(28) AmazonAds = EndpointType(29) AmazonSellerPartner = EndpointType(30) AwsCloudtrail = EndpointType(31) GoogleAds = EndpointType(32) GoogleAnalytics = EndpointType(33) GoogleSheets = EndpointType(34) BingAds = EndpointType(35) LinkedinAds = EndpointType(36) FacebookMarketing = EndpointType(37) FacebookPages = EndpointType(38) TiktokMarketing = EndpointType(39) SnapchatMarketing = EndpointType(40) Instagram = EndpointType(41) Snowflake = EndpointType(47) Jira = EndpointType(50) Hubspot = EndpointType(51) )
View Source
const AirbyteStateKey = "airbyte_state"
View Source
const (
DiscoveryCheck = abstract.CheckType("discover")
)
View Source
const (
ProviderType = abstract.ProviderType("airbyte")
)
Variables ¶
View Source
var AllDestinationSyncModes = []DestinationSyncMode{ DestinationSyncModeAppend, DestinationSyncModeOverwrite, DestinationSyncModeAppendDedup, }
View Source
var DefaultImages = map[EndpointType]string{ AmazonAds: "airbyte/source-amazon-ads:0.1.3", AmazonSellerPartner: "airbyte/source-amazon-seller-partner:0.2.14", AwsCloudtrail: "airbyte/source-aws-cloudtrail:0.1.4", Bigquery: "airbyte/source-bigquery:0.1.8", BingAds: "airbyte/source-bing-ads:0.1.3", FacebookMarketing: "airbyte/source-facebook-marketing:0.2.35", FacebookPages: "airbyte/source-facebook-pages:0.1.6", GoogleAds: "airbyte/source-google-ads:0.1.27", GoogleAnalytics: "airbyte/source-google-analytics-v4:0.1.16", GoogleSheets: "airbyte/source-google-sheets:0.2.9", Instagram: "airbyte/source-instagram:0.1.9", LinkedinAds: "airbyte/source-linkedin-ads:2.1.2", MSSQL: "airbyte/source-mssql:0.3.17", Redshift: "airbyte/source-redshift:0.3.8", S3: "airbyte/source-s3:0.1.18", Salesforce: "airbyte/source-salesforce:0.1.23", SnapchatMarketing: "airbyte/source-snapchat-marketing:0.1.4", TiktokMarketing: "airbyte/source-tiktok-marketing:0.1.3", Snowflake: "airbyte/source-snowflake:0.1.32", Jira: "airbyte/source-jira:0.10.2", Hubspot: "airbyte/source-hubspot:0.2.1", }
Functions ¶
func NewStreamRecord ¶
func NewStreamRecord( stream *AirbyteStream, cols []string, record *Record, tableSchema *abstract.TableSchema, colToIndex map[string]int, rowIndex int, ) events.InsertEvent
Types ¶
type AirbyteSource ¶
type AirbyteSource struct { Config string BaseDir string ProtoConfig string BatchSizeLimit model.BytesSize RecordsLimit int EndpointType EndpointType MaxRowSize int Image string }
func (*AirbyteSource) DataDir ¶
func (s *AirbyteSource) DataDir() string
func (*AirbyteSource) DockerImage ¶
func (s *AirbyteSource) DockerImage() string
func (*AirbyteSource) GetProviderType ¶
func (s *AirbyteSource) GetProviderType() abstract.ProviderType
func (*AirbyteSource) IsAbstract2 ¶
func (*AirbyteSource) IsAbstract2(model.Destination) bool
func (*AirbyteSource) IsIncremental ¶
func (*AirbyteSource) IsIncremental()
func (*AirbyteSource) IsSource ¶
func (*AirbyteSource) IsSource()
func (*AirbyteSource) IsStrictSource ¶
func (*AirbyteSource) IsStrictSource()
func (*AirbyteSource) SupportMultiThreads ¶
func (s *AirbyteSource) SupportMultiThreads() bool
func (*AirbyteSource) SupportMultiWorkers ¶
func (s *AirbyteSource) SupportMultiWorkers() bool
func (*AirbyteSource) SupportsStartCursorValue ¶
func (*AirbyteSource) SupportsStartCursorValue() bool
func (*AirbyteSource) Validate ¶
func (s *AirbyteSource) Validate() error
func (*AirbyteSource) WithDefaults ¶
func (s *AirbyteSource) WithDefaults()
type AirbyteStream ¶
type AirbyteStream struct { Name string `json:"name"` JSONSchema JSONSchema `json:"json_schema"` SupportedSyncModes []string `json:"supported_sync_modes"` DefaultCursorField []string `json:"default_cursor_field"` SourceDefinedPrimaryKey [][]string `json:"source_defined_primary_key"` Namespace string `json:"namespace"` }
func (AirbyteStream) TableID ¶
func (s AirbyteStream) TableID() abstract.TableID
type AirbyteSyncStream ¶
type AirbyteSyncStream struct { Stream AirbyteStream `json:"stream"` SyncMode string `json:"sync_mode"` CursorField []string `json:"cursor_field"` DestinationSyncMode string `json:"destination_sync_mode"` }
func (AirbyteSyncStream) TableID ¶
func (s AirbyteSyncStream) TableID() abstract.TableID
type ConfiguredCatalog ¶
type ConfiguredCatalog struct {
Streams []ConfiguredStream `json:"streams"`
}
func (*ConfiguredCatalog) Validate ¶
func (c *ConfiguredCatalog) Validate() error
type ConfiguredStream ¶
type ConfiguredStream struct { Stream Stream `json:"stream"` SyncMode string `json:"sync_mode"` DestinationSyncMode DestinationSyncMode `json:"destination_sync_mode"` CursorField []string `json:"cursor_field,omitempty"` PrimaryKey [][]string `json:"primary_key,omitempty"` }
func FindStream ¶
func FindStream(streams []ConfiguredStream, obj string) (*ConfiguredStream, error)
func (*ConfiguredStream) Validate ¶
func (c *ConfiguredStream) Validate() error
type ConnectionStatus ¶
type DataObject ¶
type DataObject struct {
// contains filtered or unexported fields
}
func (*DataObject) Close ¶
func (d *DataObject) Close()
func (*DataObject) Err ¶
func (d *DataObject) Err() error
func (*DataObject) FullName ¶
func (d *DataObject) FullName() string
func (*DataObject) Name ¶
func (d *DataObject) Name() string
func (*DataObject) Next ¶
func (d *DataObject) Next() bool
func (*DataObject) Part ¶
func (d *DataObject) Part() (base.DataObjectPart, error)
func (*DataObject) ToOldTableDescription ¶
func (d *DataObject) ToOldTableDescription() (*abstract.TableDescription, error)
func (*DataObject) ToOldTableID ¶
func (d *DataObject) ToOldTableID() (*abstract.TableID, error)
func (*DataObject) ToTablePart ¶
func (d *DataObject) ToTablePart() (*abstract.TableDescription, error)
type DataObjects ¶
type DataObjects struct {
// contains filtered or unexported fields
}
func NewDataObjects ¶
func NewDataObjects(catalog *Catalog, filter base.DataObjectFilter) (*DataObjects, error)
func (*DataObjects) Close ¶
func (d *DataObjects) Close()
func (*DataObjects) Err ¶
func (d *DataObjects) Err() error
func (*DataObjects) Next ¶
func (d *DataObjects) Next() bool
func (*DataObjects) Object ¶
func (d *DataObjects) Object() (base.DataObject, error)
func (*DataObjects) ToOldTableMap ¶
func (d *DataObjects) ToOldTableMap() (abstract.TableMap, error)
type DestinationSyncMode ¶
type DestinationSyncMode string
const ( DestinationSyncModeAppend DestinationSyncMode = "append" DestinationSyncModeOverwrite DestinationSyncMode = "overwrite" DestinationSyncModeAppendDedup DestinationSyncMode = "append_dedup" )
type EndpointType ¶
type EndpointType int
type JSONProperty ¶
type JSONProperty struct { Type []string `json:"type"` Format string `json:"format"` AirbyteType string `json:"airbyte_type"` }
The JSONProperty type can be an array or a single string, so a custom unmarshal had to be implemented.
func (*JSONProperty) UnmarshalJSON ¶
func (b *JSONProperty) UnmarshalJSON(data []byte) error
type JSONSchema ¶
type JSONSchema struct { Type StringOrArray `json:"type"` Properties map[string]JSONProperty `json:"properties"` }
type Message ¶
type Message struct { Type MessageType `json:"type"` Log *Log `json:"log,omitempty"` State *State `json:"state,omitempty"` Record *Record `json:"record,omitempty"` ConnectionStatus *ConnectionStatus `json:"connectionStatus,omitempty"` Spec *Spec `json:"spec,omitempty"` Catalog *Catalog `json:"catalog,omitempty"` }
type MessageType ¶
type MessageType string
const ( MessageTypeRecord MessageType = "RECORD" MessageTypeState MessageType = "STATE" MessageTypeLog MessageType = "LOG" MessageTypeSpec MessageType = "SPEC" MessageTypeConnectionStatus MessageType = "CONNECTION_STATUS" MessageTypeCatalog MessageType = "CATALOG" )
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *model.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) TestChecks ¶
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type Record ¶
type RecordBatch ¶
type RecordBatch struct {
// contains filtered or unexported fields
}
func NewRecordBatch ¶
func NewRecordBatch(rowIndexOffset int, stream *AirbyteStream) *RecordBatch
func (*RecordBatch) AsChangeItems ¶
func (r *RecordBatch) AsChangeItems() ([]abstract.ChangeItem, error)
func (*RecordBatch) Count ¶
func (r *RecordBatch) Count() int
func (*RecordBatch) Next ¶
func (r *RecordBatch) Next() bool
func (*RecordBatch) Size ¶
func (r *RecordBatch) Size() int
type RowsRecord ¶
type RowsRecord struct { Stream *AirbyteStream Record *Record TableSchema *abstract.TableSchema // contains filtered or unexported fields }
func (*RowsRecord) NewValuesCount ¶
func (s *RowsRecord) NewValuesCount() int
func (*RowsRecord) Table ¶
func (s *RowsRecord) Table() base.Table
func (*RowsRecord) ToOldChangeItem ¶
func (s *RowsRecord) ToOldChangeItem() (*abstract.ChangeItem, error)
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
func NewSource ¶
func NewSource(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, cfg *AirbyteSource, transfer *model.Transfer) *Source
type Spec ¶
type Spec struct { DocumentationURL string `json:"documentationUrl,omitempty"` ChangelogURL string `json:"changelogUrl,omitempty"` ConnectionSpecification json.RawMessage `json:"connectionSpecification"` SupportsIncremental bool `json:"supportsIncremental,omitempty"` // SupportedDestinationSyncModes is ignored by Flow SupportedDestinationSyncModes []DestinationSyncMode `json:"supported_destination_sync_modes,omitempty"` // SupportsNormalization is not currently used or supported by Flow or estuary-developed // connectors SupportsNormalization bool `json:"supportsNormalization,omitempty"` // SupportsDBT is not currently used or supported by Flow or estuary-developed // connectors SupportsDBT bool `json:"supportsDBT,omitempty"` // AuthSpecification is not currently used or supported by Flow or estuary-developed // connectors AuthSpecification json.RawMessage `json:"authSpecification,omitempty"` }
type State ¶
type State struct { // Data is the actual state associated with the ingestion. This must be a JSON _Object_ in order // to comply with the airbyte specification. Data json.RawMessage `json:"data"` // Merge indicates that Data is an RFC 7396 JSON Merge Patch, and should // be reduced into the previous state accordingly. Merge bool `json:"estuary.dev/merge,omitempty"` }
func (*State) UnmarshalJSON ¶
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func NewStorage ¶
func NewStorage(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, cfg *AirbyteSource, transfer *model.Transfer) (*Storage, error)
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) GetIncrementalState ¶
func (a *Storage) GetIncrementalState(ctx context.Context, incremental []abstract.IncrementalTable) ([]abstract.TableDescription, error)
func (*Storage) SetInitialState ¶
func (a *Storage) SetInitialState(tables []abstract.TableDescription, incrementalTables []abstract.IncrementalTable)
SetInitialState should have done nothing, since state is handled inside the loadTable method.
func (*Storage) TableSchema ¶
type Stream ¶
type Stream struct { Name string `json:"name"` JSONSchema json.RawMessage `json:"json_schema"` SupportedSyncModes []string `json:"supported_sync_modes"` SourceDefinedCursor bool `json:"source_defined_cursor,omitempty"` DefaultCursorField []string `json:"default_cursor_field,omitempty"` SourceDefinedPrimaryKey [][]string `json:"source_defined_primary_key,omitempty"` Namespace string `json:"namespace,omitempty"` }
func (*Stream) AsModel ¶
func (s *Stream) AsModel() *AirbyteStream
func (*Stream) ParsedJSONSchema ¶
func (s *Stream) ParsedJSONSchema() JSONSchema
func (*Stream) SupportMode ¶
type StreamState ¶
type StreamState struct { StreamName string `json:"stream_name"` StreamNamespace string `json:"stream_namespace"` CursorField []string `json:"cursor_field"` Cursor string `json:"cursor,omitempty"` }
func (StreamState) TableID ¶
func (s StreamState) TableID() abstract.TableID
type StringOrArray ¶
type StringOrArray struct {
Array []string
}
func (*StringOrArray) MarshalJSON ¶
func (b *StringOrArray) MarshalJSON() ([]byte, error)
func (*StringOrArray) UnmarshalJSON ¶
func (b *StringOrArray) UnmarshalJSON(data []byte) error
type SyncStreams ¶
type SyncStreams struct {
Streams []AirbyteSyncStream `json:"streams"`
}
Click to show internal directories.
Click to hide internal directories.