airbyte

package
v0.4.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2023 License: GPL-3.0 Imports: 27 Imported by: 1

Documentation

Index

Constants

View Source
const SourceDefinitionsURL = "" /* 126-byte string literal not displayed */

SourceDefinitionsURL is where the master source list is

Variables

View Source
var AirbyteFolder embed.FS

AirbyteFolder is the airbyte folder

Functions

func ContainerPull added in v0.3.241

func ContainerPull(ctx context.Context, image string) (err error)

ContainerPull pulls the docker image

func GetAirbyteSpecs added in v0.1.0

func GetAirbyteSpecs() (abs map[string]ConnectionSpecification, err error)

GetAirbyteSpecs returns a map of key to connection specification

Types

type Airbyte added in v0.1.0

type Airbyte struct {
	Context   *g.Context
	Connector *Connector
	Catalog   AirbyteCatalog
	// contains filtered or unexported fields
}

Airbyte is for connections for Airbyte

func NewAirbyteConnection added in v0.1.0

func NewAirbyteConnection(name string, opts AirbyteOptions) (a *Airbyte, err error)

NewAirbyteConnection creates a new airbyte connection object

func (*Airbyte) Close added in v0.1.0

func (a *Airbyte) Close() (err error)

Close closes the connection

func (*Airbyte) Discover added in v0.3.49

func (a *Airbyte) Discover() (streams AirbyteStreams, err error)

Discover returns the list of streams with their properties

func (*Airbyte) GetConfiguredStream added in v0.3.52

func (a *Airbyte) GetConfiguredStream(name string, sc StreamConfig) (cas ConfiguredAirbyteStream, err error)

func (*Airbyte) GetProp added in v0.1.0

func (a *Airbyte) GetProp(key string) (val string)

GetProp returns the value of a property

func (*Airbyte) Init added in v0.1.0

func (a *Airbyte) Init(check bool) (err error)

Init initializes the airbyte connection

func (*Airbyte) SetProp added in v0.1.0

func (a *Airbyte) SetProp(key string, val string)

SetProp sets the value of a property

func (*Airbyte) Stream added in v0.1.0

func (a *Airbyte) Stream(name string, sc StreamConfig) (ds *iop.Datastream, err error)

Stream streams the object data. It needs work to provide the `state` for incremental reading. There seem to be 2 ways: providing the `start_date` as part of the config, or providing the `state` object when reading. This doesn't look to be consistent across connectors: GitHub uses `state`, Salesforce uses `start_date`.

type AirbyteCatalog

type AirbyteCatalog struct {
	Streams AirbyteStreams `json:"streams"`
}

AirbyteCatalog is the Airbyte stream schema catalog

func (AirbyteCatalog) GetStream

func (ac AirbyteCatalog) GetStream(name string) (s AirbyteStream)

GetStream returns the stream by name

type AirbyteConnectionStatus

type AirbyteConnectionStatus struct {
	Status  Status `json:"status"`
	Message string `json:"message"`
}

AirbyteConnectionStatus is the Airbyte connection status

type AirbyteLogMessage

type AirbyteLogMessage struct {
	Level   Level       `json:"level"`
	Message interface{} `json:"message"`
}

AirbyteLogMessage is the AirbyteLogMessage

type AirbyteMessage

type AirbyteMessage struct {
	Type             Type                     `json:"type"`
	Log              *AirbyteLogMessage       `json:"log,omitempty"`
	Trace            *AirbyteTraceMessage     `json:"trace,omitempty"`
	Spec             *ConnectorSpecification  `json:"spec,omitempty"`
	ConnectionStatus *AirbyteConnectionStatus `json:"connectionStatus,omitempty"`
	Catalog          *AirbyteCatalog          `json:"catalog,omitempty"`
	Record           *AirbyteRecordMessage    `json:"record,omitempty"`
	State            *AirbyteStateMessage     `json:"state,omitempty"`
}

AirbyteMessage is the AirbyteMessage

func NewAirbyteMessage

func NewAirbyteMessage() (am *AirbyteMessage)

NewAirbyteMessage creates a new airbyte message

func (AirbyteMessage) CheckError added in v0.3.47

func (msg AirbyteMessage) CheckError() (err error)

CheckError checks the message for an error and returns it, if any

type AirbyteMessages

type AirbyteMessages []AirbyteMessage

AirbyteMessages is a list of messages

func (AirbyteMessages) CheckError added in v0.3.47

func (msgs AirbyteMessages) CheckError() (err error)

CheckError checks the messages for an error and returns it, if any

func (AirbyteMessages) First

func (msgs AirbyteMessages) First(t Type) (msg AirbyteMessage)

First returns the first message of specified kind

type AirbyteOptions added in v0.3.49

type AirbyteOptions struct {
	Config     map[string]interface{}
	TempFolder string
	DateLayout string
	DateField  string
}

type AirbyteRecordMessage

type AirbyteRecordMessage struct {
	Stream    string                 `json:"stream"`
	Data      map[string]interface{} `json:"data"`
	EmittedAt int64                  `json:"emitted_at"`
}

AirbyteRecordMessage is the AirbyteRecordMessage

type AirbyteStateMessage

type AirbyteStateMessage struct {
	Data map[string]interface{} `json:"data"`
}

AirbyteStateMessage is the AirbyteStateMessage

type AirbyteStream

type AirbyteStream struct {
	Name                    string           `json:"name"`
	JsonSchema              StreamJsonSchema `json:"json_schema"`
	SupportedSyncModes      []SyncMode       `json:"supported_sync_modes"`
	SourceDefinedCursor     bool             `json:"source_defined_cursor"`
	DefaultCursorField      []string         `json:"default_cursor_field"`
	SourceDefinedPrimaryKey [][]string       `json:"source_defined_primary_key"`
}

AirbyteStream is the AirbyteStream

func (AirbyteStream) Columns added in v0.3.49

func (as AirbyteStream) Columns() iop.Columns

Columns returns the stream columns

func (AirbyteStream) Select added in v0.3.49

func (as AirbyteStream) Select(columns []string) (nas AirbyteStream)

Select returns the stream with columns selected

type AirbyteStreams

type AirbyteStreams []AirbyteStream

AirbyteStreams is a list of AirbyteStream

func (AirbyteStreams) Names

func (ass AirbyteStreams) Names() []string

Names returns the stream names

type AirbyteTraceMessage added in v0.3.45

type AirbyteTraceMessage struct {
	Type      Level                  `json:"type"`
	EmittedAt float64                `json:"emitted_at"`
	Error     map[string]interface{} `json:"error"`
}

AirbyteTraceMessage is the AirbyteTraceMessage

type ConatinerLogMsg added in v0.3.241

type ConatinerLogMsg struct {
	ID     string `json:"id"`
	Status string `json:"status"`
}

type ConfiguredAirbyteCatalog

type ConfiguredAirbyteCatalog struct {
	Streams []ConfiguredAirbyteStream `json:"streams"`
}

ConfiguredAirbyteCatalog is the catalog of configured Airbyte streams

type ConfiguredAirbyteStream

type ConfiguredAirbyteStream struct {
	Stream              AirbyteStream       `json:"stream"`
	SyncMode            SyncMode            `json:"sync_mode"`
	CursorField         []string            `json:"cursor_field"`
	DestinationSyncMode DestinationSyncMode `json:"destination_sync_mode"`
	PrimaryKey          []string            `json:"primary_key"`
}

ConfiguredAirbyteStream is the ConfiguredAirbyteStream

type ConnectionProperties

type ConnectionProperties map[string]ConnectionProperty

type ConnectionProperty

type ConnectionProperty struct {
	Title         string                    `json:"title,omitempty" yaml:"title,omitempty"`
	Const         string                    `json:"const,omitempty" yaml:"const,omitempty"`
	Description   string                    `json:"description,omitempty" yaml:"description,omitempty"`
	AirbyteSecret bool                      `json:"airbyte_secret,omitempty" yaml:"airbyte_secret,omitempty"`
	Type          string                    `json:"type,omitempty" yaml:"type,omitempty"`
	Order         *int                      `json:"order,omitempty" yaml:"order,omitempty"`
	OneOf         []ConnectionSpecification `json:"oneOf,omitempty" yaml:"oneOf,omitempty"`
	Minimum       interface{}               `json:"minimum,omitempty" yaml:"minimum,omitempty"`
	Maximum       interface{}               `json:"maximum,omitempty" yaml:"maximum,omitempty"`
	Default       interface{}               `json:"default,omitempty" yaml:"default,omitempty"`
	Examples      []interface{}             `json:"examples,omitempty" yaml:"examples,omitempty"`
}

type ConnectionSpecification

type ConnectionSpecification struct {
	Name                 string               `json:"name,omitempty" yaml:"name,omitempty"`
	Title                string               `json:"title" yaml:"title"`
	Type                 string               `json:"type" yaml:"type"`
	AdditionalProperties bool                 `json:"additionalProperties" yaml:"additionalProperties"`
	Required             []string             `json:"required" yaml:"required"`
	Properties           ConnectionProperties `json:"properties" yaml:"properties"`
}

type Connector

type Connector struct {
	Definition    ConnectorDefinition
	Specification ConnectorSpecification
	State         map[string]interface{}
	// contains filtered or unexported fields
}

Connector is an airbyte connector

func (*Connector) Check

func (c *Connector) Check(config map[string]interface{}) (s AirbyteConnectionStatus, err error)

Check attempts to connect to the connector with the provided config credentials

func (*Connector) Discover

func (c *Connector) Discover(config map[string]interface{}) (ac AirbyteCatalog, err error)

Discover detects the structure of the data in the data source.

func (*Connector) DockerRun

func (c *Connector) DockerRun(args ...string) (messages AirbyteMessages, err error)

DockerRun runs a docker command and waits for the end

func (*Connector) DockerStart

func (c *Connector) DockerStart(args ...string) (msgChan chan AirbyteMessage, err error)

DockerStart starts the process and returns the channel of messages

func (*Connector) GetSpec

func (c *Connector) GetSpec() (err error)

GetSpec retrieves the spec from the docker command

func (*Connector) InitTempDir

func (c *Connector) InitTempDir() (err error)

InitTempDir initializes the temp directory

func (*Connector) Key added in v0.1.0

func (c *Connector) Key() string

func (*Connector) Pull added in v0.3.47

func (c *Connector) Pull() (err error)

Pull pulls the docker image. Useful for outputting

func (*Connector) Read

func (c *Connector) Read(config map[string]interface{}, catalog ConfiguredAirbyteCatalog, state map[string]interface{}, props map[string]string) (ds *iop.Datastream, err error)

Read reads the data of the configured catalog from the data source and returns it as a datastream.

type ConnectorDefinition

type ConnectorDefinition struct {
	SourceDefinitionId string `yaml:"sourceDefinitionId"`
	Name               string `yaml:"name"`
	DockerRepository   string `yaml:"dockerRepository"`
	DockerImageTag     string `yaml:"dockerImageTag"`
	DocumentationUrl   string `yaml:"documentationUrl"`
}

ConnectorDefinition is a connector's information. See https://github.com/airbytehq/airbyte/blob/master/airbyte-config/init/src/main/resources/seed/source_definitions.yaml

func (ConnectorDefinition) Image

func (cd ConnectorDefinition) Image() string

Image returns the docker image

type ConnectorSpecification

type ConnectorSpecification struct {
	DocumentationUrl              string                  `json:"documentationUrl" yaml:"documentationUrl"`
	ChangelogUrl                  string                  `json:"changelogUrl" yaml:"changelogUrl"`
	ConnectionSpecification       ConnectionSpecification `json:"connectionSpecification" yaml:"connectionSpecification"`
	SupportsIncremental           bool                    `json:"supportsIncremental" yaml:"supportsIncremental"`
	SupportedDestinationSyncModes []DestinationSyncMode   `json:"supported_destination_sync_modes" yaml:"supported_destination_sync_modes"`
}

ConnectorSpecification is the Specification of a connector (source/destination)

type Connectors

type Connectors []Connector

Connectors is a list of Connector

func GetSourceConnectors

func GetSourceConnectors(fetch bool) (connectors Connectors, err error)

GetSourceConnectors polls and retrieves the latest source connectors

func (Connectors) Get

func (cs Connectors) Get(key string) (c Connector, err error)

Get returns the Connector with the spec

func (Connectors) Names

func (cs Connectors) Names() (n []string)

Names returns the Connector names

type Container added in v0.3.241

type Container struct {
	ID      string
	Err     error
	Context *g.Context
	DoneOK  <-chan container.ContainerWaitOKBody
	DoneErr <-chan error

	StdoutReader io.ReadCloser
	StderrReader io.ReadCloser
	StdinWriter  io.Writer
	Config       *container.Config
	HostConfig   *container.HostConfig
	Options      *ContainerOptions
	// contains filtered or unexported fields
}

func ContainerRun added in v0.3.241

func ContainerRun(ctx context.Context, opts *ContainerOptions) (c *Container, err error)

func ContainerStart added in v0.3.241

func ContainerStart(ctx context.Context, opts *ContainerOptions) (c *Container, err error)

ContainerStart starts a docker command

func (*Container) GetCommandString added in v0.3.241

func (c *Container) GetCommandString() (cmd string)

func (*Container) IsAlive added in v0.3.241

func (c *Container) IsAlive() bool

IsAlive checks if the container is part of the list from docker. doneOK and doneErr don't always return, which is very strange

func (*Container) Wait added in v0.3.241

func (c *Container) Wait() (err error)

type ContainerOptions added in v0.3.241

type ContainerOptions struct {
	Image      string
	Cmd        []string
	WorkingDir string
	Env        []string
	Mounts     map[string]string
	AutoRemove bool
	OpenStdin  bool
	Nice       int
	Scanner    func(stderr bool, text string)
	Print      bool
}

type DestinationSyncMode

type DestinationSyncMode string

DestinationSyncMode is the DestinationSyncMode

const DestinationSyncModeAppend DestinationSyncMode = "append"
const DestinationSyncModeAppendDedup DestinationSyncMode = "append_dedup"
const DestinationSyncModeOverwrite DestinationSyncMode = "overwrite"
const DestinationSyncModeUpsertDedup DestinationSyncMode = "upsert_dedup"

type Level

type Level string

Level is for AirbyteLogMessage

const LevelDebug Level = "DEBUG"
const LevelError Level = "ERROR"
const LevelFatal Level = "FATAL"
const LevelInfo Level = "INFO"
const LevelTrace Level = "TRACE"
const LevelWarn Level = "WARN"

type Status

type Status string

Status is for the AirbyteConnectionStatus

const StatusFailed Status = "FAILED"
const StatusSucceeded Status = "SUCCEEDED"

type StreamConfig added in v0.3.49

type StreamConfig struct {
	Columns    []string
	PrimaryKey []string
	SyncMode   SyncMode
	StartDate  string

	// TODO: how to store and retrieve / reconstruct state?
	State map[string]interface{}
}

type StreamJsonSchema

type StreamJsonSchema struct {
	AdditionalProperties bool                   `json:"additionalProperties"`
	Properties           map[string]interface{} `json:"properties"`
}

func (StreamJsonSchema) Columns

func (sjs StreamJsonSchema) Columns() (cols iop.Columns)

Columns returns the properties as columns

type SyncMode

type SyncMode string

SyncMode is the SyncMode

const SyncModeFullRefresh SyncMode = "full_refresh"
const SyncModeIncremental SyncMode = "incremental"

type Type

type Type string

Type is for AirbyteMessage

const TypeCatalog Type = "CATALOG"
const TypeConnectionStatus Type = "CONNECTION_STATUS"
const TypeLog Type = "LOG"
const TypeRecord Type = "RECORD"
const TypeSpec Type = "SPEC"
const TypeState Type = "STATE"
const TypeTrace Type = "TRACE"

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL