Documentation ¶
Index ¶
- Constants
- Variables
- func FillPreconfiguredOauth(sourceType string, config interface{})
- func ParseProperties(system, prefix string, properties map[string]*Property, ...)
- func RegisterDriver(driverType string, ...)
- func RegisterTestConnectionFunc(driverType string, testConnectionFunc func(config *SourceConfig) error)
- func StreamIdentifier(namespace, name string) string
- func WaitReadiness(driver CLIDriver, taskLogger logging.TaskLogger) (bool, error)
- type AbstractCLIDriver
- func (acd *AbstractCLIDriver) GetAllAvailableIntervals() ([]*TimeInterval, error)
- func (acd *AbstractCLIDriver) GetCatalogPath() string
- func (acd *AbstractCLIDriver) GetCollectionMetaKey() string
- func (acd *AbstractCLIDriver) GetCollectionTable() string
- func (acd *AbstractCLIDriver) GetConfigPath() string
- func (acd *AbstractCLIDriver) GetObjectsFor(interval *TimeInterval, objectsLoader ObjectsLoader) error
- func (acd *AbstractCLIDriver) GetPropertiesPath() string
- func (acd *AbstractCLIDriver) GetRefreshWindow() (time.Duration, error)
- func (acd *AbstractCLIDriver) GetStateFilePath(state string) (string, error)
- func (acd *AbstractCLIDriver) GetStreamTableNameMapping() map[string]string
- func (acd *AbstractCLIDriver) GetTableNamePrefix() string
- func (acd *AbstractCLIDriver) GetTap() string
- func (acd *AbstractCLIDriver) ID() string
- func (acd *AbstractCLIDriver) Ready() (bool, error)
- func (acd *AbstractCLIDriver) SetCatalogPath(catalogPath string)
- func (acd *AbstractCLIDriver) SetPropertiesPath(propertiesPath string)
- func (acd *AbstractCLIDriver) SetStreamTableNameMappingIfNotExists(streamTableNameMappings map[string]string)
- func (acd *AbstractCLIDriver) Type() string
- type CLIDataConsumer
- type CLIDriver
- type CLIOutputRepresentation
- func (c *CLIOutputRepresentation) AddStream(streamName string, stream *StreamRepresentation)
- func (c *CLIOutputRepresentation) CurrentStream() *StreamRepresentation
- func (c *CLIOutputRepresentation) GetStream(streamName string) (*StreamRepresentation, bool)
- func (c *CLIOutputRepresentation) GetStreams() []*StreamRepresentation
- type CLIParser
- type CLITaskCloser
- type Collection
- type DatePartition
- type DeleteCondition
- type DeleteConditions
- type Driver
- type DriversInfo
- type ExecCommand
- type GoogleAuthConfig
- type GoogleAuthorizedUserJSON
- type IntervalDriver
- type ObjectsLoader
- type Property
- type SourceConfig
- type StreamConfiguration
- type StreamRepresentation
- type SyncCommand
- type TimeInterval
- func (ti *TimeInterval) CalculateSignatureFrom(t time.Time, window time.Duration) string
- func (ti *TimeInterval) Granularity() schema.Granularity
- func (ti *TimeInterval) IsAll() bool
- func (ti *TimeInterval) LowerEndpoint() time.Time
- func (ti *TimeInterval) String() string
- func (ti *TimeInterval) UpperEndpoint() time.Time
Constants ¶
const ( StateFileName = "state.json" ConfigFileName = "config.json" CatalogFileName = "catalog.json" PropertiesFileName = "properties.json" )
const ( AmplitudeType = "amplitude" FbMarketingType = "facebook_marketing" FirebaseType = "firebase" GoogleAnalyticsType = "google_analytics" GooglePlayType = "google_play" GoogleAdsType = "google_ads" RedisType = "redis" SingerType = "singer" AirbyteType = "airbyte" SdkSourceType = "sdk_source" NativeConnectorType = "native" GoogleOAuthAuthorizationType = "OAuth" DefaultDaysBackToLoad = 365 )
const ConfigSignatureSuffix = "_JITSU_config"
const SignatureLayout = "2006-01-02T15:04:05.000Z"
Variables ¶
var ( DriverConstructors = make(map[string]func(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error)) DriverTestConnectionFuncs = make(map[string]func(config *SourceConfig) error) )
Functions ¶
func FillPreconfiguredOauth ¶
func FillPreconfiguredOauth(sourceType string, config interface{})
func ParseProperties ¶
func ParseProperties(system, prefix string, properties map[string]*Property, resultFields schema.Fields)
ParseProperties recursively parses singer/airbyte catalog properties and enriches resultFields
func RegisterDriver ¶
func RegisterDriver(driverType string, createDriverFunc func(ctx context.Context, config *SourceConfig, collection *Collection) (Driver, error))
RegisterDriver registers function to create new driver instance
func RegisterTestConnectionFunc ¶
func RegisterTestConnectionFunc(driverType string, testConnectionFunc func(config *SourceConfig) error)
RegisterTestConnectionFunc registers function to test driver connection
func StreamIdentifier ¶
func StreamIdentifier(namespace, name string) string
func WaitReadiness ¶
func WaitReadiness(driver CLIDriver, taskLogger logging.TaskLogger) (bool, error)
WaitReadiness waits up to 90 seconds for the driver to become ready; if it does not become ready in time, it returns false and a not-ready error.
Types ¶
type AbstractCLIDriver ¶
type AbstractCLIDriver struct {
// contains filtered or unexported fields
}
AbstractCLIDriver is an abstract implementation of CLI drivers such as Singer or Airbyte
func NewAbstractCLIDriver ¶
func NewAbstractCLIDriver(sourceID, tap, configPath, catalogPath, propertiesPath, initialStatePath, prefix, pathToConfigs string, tableNameMappings map[string]string) *AbstractCLIDriver
NewAbstractCLIDriver returns configured AbstractCLIDriver
func (*AbstractCLIDriver) GetAllAvailableIntervals ¶
func (acd *AbstractCLIDriver) GetAllAvailableIntervals() ([]*TimeInterval, error)
GetAllAvailableIntervals is unsupported.
func (*AbstractCLIDriver) GetCatalogPath ¶
func (acd *AbstractCLIDriver) GetCatalogPath() string
GetCatalogPath returns catalog path
func (*AbstractCLIDriver) GetCollectionMetaKey ¶
func (acd *AbstractCLIDriver) GetCollectionMetaKey() string
func (*AbstractCLIDriver) GetCollectionTable ¶
func (acd *AbstractCLIDriver) GetCollectionTable() string
GetCollectionTable is unsupported.
func (*AbstractCLIDriver) GetConfigPath ¶
func (acd *AbstractCLIDriver) GetConfigPath() string
GetConfigPath returns config path
func (*AbstractCLIDriver) GetObjectsFor ¶
func (acd *AbstractCLIDriver) GetObjectsFor(interval *TimeInterval, objectsLoader ObjectsLoader) error
GetObjectsFor is unsupported.
func (*AbstractCLIDriver) GetPropertiesPath ¶
func (acd *AbstractCLIDriver) GetPropertiesPath() string
GetPropertiesPath returns properties path
func (*AbstractCLIDriver) GetRefreshWindow ¶
func (acd *AbstractCLIDriver) GetRefreshWindow() (time.Duration, error)
GetRefreshWindow is unsupported.
func (*AbstractCLIDriver) GetStateFilePath ¶
func (acd *AbstractCLIDriver) GetStateFilePath(state string) (string, error)
GetStateFilePath returns input state as a filepath or returns initial state
func (*AbstractCLIDriver) GetStreamTableNameMapping ¶
func (acd *AbstractCLIDriver) GetStreamTableNameMapping() map[string]string
GetStreamTableNameMapping returns stream - table names mapping
func (*AbstractCLIDriver) GetTableNamePrefix ¶
func (acd *AbstractCLIDriver) GetTableNamePrefix() string
GetTableNamePrefix returns stream table name prefix or sourceID_
func (*AbstractCLIDriver) GetTap ¶
func (acd *AbstractCLIDriver) GetTap() string
func (*AbstractCLIDriver) Ready ¶
func (acd *AbstractCLIDriver) Ready() (bool, error)
Ready returns the CLI driver's ready flag. It should be overridden in every implementation.
func (*AbstractCLIDriver) SetCatalogPath ¶
func (acd *AbstractCLIDriver) SetCatalogPath(catalogPath string)
SetCatalogPath sets catalog path
func (*AbstractCLIDriver) SetPropertiesPath ¶
func (acd *AbstractCLIDriver) SetPropertiesPath(propertiesPath string)
SetPropertiesPath sets properties path
func (*AbstractCLIDriver) SetStreamTableNameMappingIfNotExists ¶
func (acd *AbstractCLIDriver) SetStreamTableNameMappingIfNotExists(streamTableNameMappings map[string]string)
SetStreamTableNameMappingIfNotExists sets stream table name mapping if not exists
func (*AbstractCLIDriver) Type ¶
func (acd *AbstractCLIDriver) Type() string
Type returns the CLI driver type. It should be overridden in every implementation.
type CLIDataConsumer ¶
type CLIDataConsumer interface { Consume(representation *CLIOutputRepresentation) error CleanupAfterError(representation *CLIOutputRepresentation) }
CLIDataConsumer is used for consuming CLI drivers output
type CLIDriver ¶
type CLIDriver interface { Driver //IsClosed returns true if the driver is already closed IsClosed() bool //Load runs CLI command and consumes output Load(config string, state string, taskLogger logging.TaskLogger, dataConsumer CLIDataConsumer, taskCloser CLITaskCloser) error //Ready returns true if the driver is ready otherwise returns ErrNotReady Ready() (bool, error) //GetTap returns npm package for sdk_source, Singer tap or airbyte docker image (without prefix 'airbyte/': source-mixpanel) GetTap() string //GetTableNamePrefix returns stream table name prefix or sourceID_ GetTableNamePrefix() string //GetStreamTableNameMapping returns stream - table name mappings from configuration GetStreamTableNameMapping() map[string]string //GetConfigPath returns path to config file GetConfigPath() string }
The CLIDriver interface must be implemented by every CLI source type (Singer, Airbyte, or SDK source).
type CLIOutputRepresentation ¶
type CLIOutputRepresentation struct { State interface{} // contains filtered or unexported fields }
CLIOutputRepresentation is a singer/airbyte output representation
func NewCLIOutputRepresentation ¶
func NewCLIOutputRepresentation() *CLIOutputRepresentation
func (*CLIOutputRepresentation) AddStream ¶
func (c *CLIOutputRepresentation) AddStream(streamName string, stream *StreamRepresentation)
func (*CLIOutputRepresentation) CurrentStream ¶
func (c *CLIOutputRepresentation) CurrentStream() *StreamRepresentation
func (*CLIOutputRepresentation) GetStream ¶
func (c *CLIOutputRepresentation) GetStream(streamName string) (*StreamRepresentation, bool)
func (*CLIOutputRepresentation) GetStreams ¶
func (c *CLIOutputRepresentation) GetStreams() []*StreamRepresentation
type CLIParser ¶
type CLIParser interface {
Parse(stdout io.ReadCloser) error
}
type CLITaskCloser ¶
type CLITaskCloser interface { TaskID() string CloseWithError(msg string, systemErr bool) HandleCanceling() error }
CLITaskCloser is used for closing tasks
type Collection ¶
type Collection struct { DaysBackToLoad int `json:"-" yaml:"-"` //without serialization SourceID string `json:"-" yaml:"-"` //without serialization Name string `mapstructure:"name" json:"name,omitempty" yaml:"name,omitempty"` Type string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"` TableName string `mapstructure:"table_name" json:"table_name,omitempty" yaml:"table_name,omitempty"` StartDateStr string `mapstructure:"start_date" json:"start_date,omitempty" yaml:"start_date,omitempty"` Schedule string `mapstructure:"schedule" json:"schedule,omitempty" yaml:"schedule,omitempty"` SyncMode string `mapstructure:"mode" json:"mode,omitempty" yaml:"mode,omitempty"` Parameters map[string]interface{} `mapstructure:"parameters" json:"parameters,omitempty" yaml:"parameters,omitempty"` }
Collection is a dto for report unit serialization
func (*Collection) GetTableName ¶
func (c *Collection) GetTableName() string
GetTableName returns TableName if it's set; otherwise it returns SourceID_CollectionName.
func (*Collection) Init ¶
func (c *Collection) Init() error
func (*Collection) Validate ¶
func (c *Collection) Validate() error
Validate returns an error if the collection is invalid.
type DatePartition ¶
type DatePartition struct { Field string Value time.Time Granularity schema.Granularity }
type DeleteCondition ¶
DeleteCondition is a representation of SQL delete condition
type DeleteConditions ¶
type DeleteConditions struct { Conditions []DeleteCondition Partition DatePartition JoinCondition string }
DeleteConditions is a dto for multiple DeleteCondition instances combined with a JoinCondition.
func DeleteByTimeChunkCondition ¶
func DeleteByTimeChunkCondition(timeInterval *TimeInterval) *DeleteConditions
DeleteByTimeChunkCondition returns a delete condition that removes objects based on the eventn_ctx_time_interval value, or an empty condition if timeInterval is empty.
func (*DeleteConditions) IsEmpty ¶
func (dc *DeleteConditions) IsEmpty() bool
IsEmpty returns true if there are no conditions.
type Driver ¶
type Driver interface { io.Closer //GetAllAvailableIntervals returns all the available time intervals for data loading. For example, if you want //your driver to load the last year in monthly chunks, you need to return 12 time intervals, each covering one //month. Use drivers/granularity.ALL for data sources that store data which cannot be split by date. GetAllAvailableIntervals() ([]*TimeInterval, error) //GetRefreshWindow returns the time duration during which Jitsu will keep reloading stream data. //Necessary for sources where data may change retroactively (analytics, ads) GetRefreshWindow() (time.Duration, error) /*GetObjectsFor loads objects for the given time interval through objectsLoader. Each loaded element is one object from the data source. pos - current position (object number) total - number of objects available to load; -1 if there is no way to know the exact number in advance percent - percent of total objects processed [0..100]; an estimated value when there is no way to know the exact number in advance*/ GetObjectsFor(interval *TimeInterval, objectsLoader ObjectsLoader) error //ReplaceTables reports whether full syncs should replace tables or clear and append to them ReplaceTables() bool //Type returns string type of driver. Should be unique among drivers Type() string //GetCollectionTable returns table name GetCollectionTable() string //GetCollectionMetaKey returns key for storing signature in meta.Storage GetCollectionMetaKey() string //GetDriversInfo returns telemetry information about the driver GetDriversInfo() *DriversInfo Delete() error }
Driver interface must be implemented by every source type
type DriversInfo ¶
type DriversInfo struct { SourceType string ConnectorOrigin string ConnectorVersion string Streams int }
DriversInfo is a dto for sharing information about the driver into telemetry
type ExecCommand ¶
type GoogleAuthConfig ¶
type GoogleAuthConfig struct { Type string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"` ClientID string `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"` ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"` RefreshToken string `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"` ServiceAccountKey interface{} `mapstructure:"service_account_key" json:"service_account_key,omitempty" yaml:"service_account_key,omitempty"` Subject string `mapstructure:"subject" json:"subject,omitempty" yaml:"subject,omitempty"` }
func (*GoogleAuthConfig) FillPreconfiguredOauth ¶
func (gac *GoogleAuthConfig) FillPreconfiguredOauth(sourceType string)
func (*GoogleAuthConfig) Marshal ¶
func (gac *GoogleAuthConfig) Marshal() ([]byte, error)
func (*GoogleAuthConfig) ToGoogleAuthJSON ¶
func (gac *GoogleAuthConfig) ToGoogleAuthJSON() GoogleAuthorizedUserJSON
ToGoogleAuthJSON returns configured GoogleAuthorizedUserJSON structure for Google authorization
func (*GoogleAuthConfig) Validate ¶
func (gac *GoogleAuthConfig) Validate() error
Validate checks the service account JSON or OAuth fields and returns an error if both authorization parameters are empty.
type GoogleAuthorizedUserJSON ¶
type GoogleAuthorizedUserJSON struct { ClientID string `mapstructure:"client_id" json:"client_id,omitempty" yaml:"client_id,omitempty"` ClientSecret string `mapstructure:"client_secret" json:"client_secret,omitempty" yaml:"client_secret,omitempty"` RefreshToken string `mapstructure:"refresh_token" json:"refresh_token,omitempty" yaml:"refresh_token,omitempty"` AuthType string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"` }
GoogleAuthorizedUserJSON is a Google dto for authorization
type IntervalDriver ¶
type IntervalDriver struct {
SourceType string
}
IntervalDriver is a base driver for native drivers
func (*IntervalDriver) Delete ¶
func (ind *IntervalDriver) Delete() error
func (*IntervalDriver) GetDriversInfo ¶
func (ind *IntervalDriver) GetDriversInfo() *DriversInfo
GetDriversInfo returns telemetry information about the driver
type ObjectsLoader ¶
type Property ¶
type Property struct { //might be string or []string or nil Type interface{} `json:"type,omitempty"` Format string `json:"format,omitempty"` Properties map[string]*Property `json:"properties,omitempty"` }
Property is a dto for catalog properties representation
type SourceConfig ¶
type SourceConfig struct { SourceID string `json:"source_id" yaml:"-"` Type string `mapstructure:"type" json:"type,omitempty" yaml:"type,omitempty"` Destinations []string `mapstructure:"destinations" json:"destinations,omitempty" yaml:"destinations,omitempty"` PostHandleDestinations []string `mapstructure:"post_handle_destinations" json:"post_handle_destinations,omitempty" yaml:"post_handle_destinations,omitempty"` Collections []interface{} `mapstructure:"collections" json:"collections,omitempty" yaml:"collections,omitempty"` Schedule string `mapstructure:"schedule" json:"schedule,omitempty" yaml:"schedule,omitempty"` Config map[string]interface{} `mapstructure:"config" json:"config,omitempty" yaml:"config,omitempty"` Notifications map[string]interface{} `mapstructure:"notifications" json:"notifications,omitempty" yaml:"notifications,omitempty"` ProjectName string `mapstructure:"project_name" json:"project_name,omitempty" yaml:"project_name,omitempty"` }
SourceConfig is a dto for api connector source config serialization
type StreamConfiguration ¶
type StreamConfiguration struct { Name string `mapstructure:"name" json:"name,omitempty" yaml:"name,omitempty"` Namespace string `mapstructure:"namespace" json:"namespace,omitempty" yaml:"namespace,omitempty"` SyncMode string `mapstructure:"sync_mode" json:"sync_mode,omitempty" yaml:"sync_mode,omitempty"` CursorField []string `mapstructure:"cursor_field" json:"cursor_field,omitempty" yaml:"cursor_field,omitempty"` }
StreamConfiguration is a dto for serialization selected streams configuration
type StreamRepresentation ¶
type StreamRepresentation struct { Namespace string ChunkNumber int StreamName string IntermediateTableName string BatchHeader *schema.BatchHeader KeyFields []string Objects []map[string]interface{} KeepKeysUnhashed bool RemoveSourceKeyFields bool NeedClean bool //Replace Stream table with IntermediateTableName (swap tables) Set only on final chunk SwapWithIntermediateTable bool DeleteConditions *DeleteConditions }
StreamRepresentation is a singer/airbyte stream representation
type SyncCommand ¶
type SyncCommand struct { Cmd ExecCommand TaskCloser CLITaskCloser }
SyncCommand is a dto for keeping sync command (airbyte/singer) for graceful closing
func (*SyncCommand) Cancel ¶
func (sc *SyncCommand) Cancel() error
Cancel uses Kill() under the hood
func (*SyncCommand) Kill ¶
func (sc *SyncCommand) Kill(msg string) error
Kill closes the runner and reports msg as an error through the taskCloser.
func (*SyncCommand) Shutdown ¶
func (sc *SyncCommand) Shutdown() error
Shutdown uses Kill() under the hood
type TimeInterval ¶
type TimeInterval struct { TimeZoneID string // contains filtered or unexported fields }
func NewTimeInterval ¶
func NewTimeInterval(granularity schema.Granularity, t time.Time) *TimeInterval
func (*TimeInterval) CalculateSignatureFrom ¶
func (ti *TimeInterval) CalculateSignatureFrom(t time.Time, window time.Duration) string
func (*TimeInterval) Granularity ¶
func (ti *TimeInterval) Granularity() schema.Granularity
func (*TimeInterval) IsAll ¶
func (ti *TimeInterval) IsAll() bool
func (*TimeInterval) LowerEndpoint ¶
func (ti *TimeInterval) LowerEndpoint() time.Time
func (*TimeInterval) String ¶
func (ti *TimeInterval) String() string
func (*TimeInterval) UpperEndpoint ¶
func (ti *TimeInterval) UpperEndpoint() time.Time