Documentation ¶
Index ¶
- Constants
- Variables
- func CreateMySQLAdapter(ctx context.Context, config adapters.DataSourceConfig, ...) (*adapters.MySQL, error)
- func CreateSnowflakeAdapter(ctx context.Context, s3Config *adapters.S3Config, ...) (*adapters.Snowflake, error)
- func IsConnectionError(err error) bool
- func RegisterFileStorage(storageType string, createStorage CreateStorage, extractConfig ExtractConfig)
- func RegisterStorage(storageType StorageType)
- type Abstract
- func (a *Abstract) AccountResult(eventContext *adapters.EventContext, err error)
- func (a *Abstract) Clean(tableName string) error
- func (a *Abstract) DropTable(tableName string) error
- func (a *Abstract) DryRun(payload events.Event) ([][]adapters.TableField, error)
- func (a *Abstract) ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
- func (a *Abstract) Fallback(failedEvents ...*events.FailedEvent)
- func (a *Abstract) GetSyncWorker() *SyncWorker
- func (a *Abstract) GetUniqueIDField() *identifiers.UniqueID
- func (a *Abstract) ID() string
- func (a *Abstract) Init(config *Config, impl Storage, preinstalledJavaScript string, ...) error
- func (a *Abstract) Insert(eventContext *adapters.EventContext) (insertErr error)
- func (a *Abstract) IsCachingDisabled() bool
- func (a *Abstract) IsStaging() bool
- func (a *Abstract) Processor() *schema.Processor
- func (a *Abstract) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error
- func (a *Abstract) SkipEvent(eventCtx *adapters.EventContext, err error)
- func (a *Abstract) Start(config *Config) error
- func (a *Abstract) Store(fileName string, objects []map[string]interface{}, ...) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
- func (a *Abstract) SuccessEvent(eventCtx *adapters.EventContext)
- func (a *Abstract) Update(eventContext *adapters.EventContext) error
- type Amplitude
- type AwsRedshift
- func (ar *AwsRedshift) Clean(tableName string) error
- func (ar *AwsRedshift) Close() (multiErr error)
- func (ar *AwsRedshift) GetUsersRecognition() *UserRecognitionConfiguration
- func (ar *AwsRedshift) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (ar *AwsRedshift) Type() string
- type BigQuery
- func (bq *BigQuery) Clean(tableName string) error
- func (bq *BigQuery) Close() (multiErr error)
- func (bq *BigQuery) GetUsersRecognition() *UserRecognitionConfiguration
- func (bq *BigQuery) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (bq *BigQuery) Type() string
- func (bq *BigQuery) Update(eventContext *adapters.EventContext) error
- type ClickHouse
- func (ch *ClickHouse) Clean(tableName string) error
- func (ch *ClickHouse) Close() (multiErr error)
- func (ch *ClickHouse) GetUsersRecognition() *UserRecognitionConfiguration
- func (ch *ClickHouse) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (ch *ClickHouse) Type() string
- type Config
- type CreateStorage
- type DbtCloud
- type ExtractConfig
- type Facebook
- type Factory
- type FactoryImpl
- type FileAdapter
- type FileStorage
- func (fs *FileStorage) Close() (multiErr error)
- func (fs *FileStorage) DryRun(events.Event) ([][]adapters.TableField, error)
- func (fs *FileStorage) GetUsersRecognition() *UserRecognitionConfiguration
- func (fs *FileStorage) Store(fileName string, objects []map[string]interface{}, ...) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
- func (fs *FileStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (fs *FileStorage) Type() string
- func (fs *FileStorage) Update(map[string]interface{}) error
- type GoogleAnalytics
- type HTTPStorage
- func (h *HTTPStorage) Close() (multiErr error)
- func (h *HTTPStorage) DryRun(payload events.Event) ([][]adapters.TableField, error)
- func (h *HTTPStorage) GetUniqueIDField() *identifiers.UniqueID
- func (h *HTTPStorage) GetUsersRecognition() *UserRecognitionConfiguration
- func (h *HTTPStorage) Insert(eventContext *adapters.EventContext) error
- func (h *HTTPStorage) IsCachingDisabled() bool
- func (h *HTTPStorage) Store(fileName string, objects []map[string]interface{}, ...) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
- func (h *HTTPStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (h *HTTPStorage) Type() string
- func (h *HTTPStorage) Update(eventContext *adapters.EventContext) error
- type HubSpot
- type MockFactory
- type MySQL
- func (m *MySQL) Clean(tableName string) error
- func (m *MySQL) Close() (multiErr error)
- func (m *MySQL) DryRun(payload events.Event) ([][]adapters.TableField, error)
- func (m *MySQL) GetUsersRecognition() *UserRecognitionConfiguration
- func (m *MySQL) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (m *MySQL) Type() string
- type NpmDestination
- type NpmValidatorResult
- type Postgres
- func (p *Postgres) Clean(tableName string) error
- func (p *Postgres) Close() (multiErr error)
- func (p *Postgres) GetUsersRecognition() *UserRecognitionConfiguration
- func (p *Postgres) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (p *Postgres) Type() string
- type RetryableProxy
- func (rsp *RetryableProxy) Close() error
- func (rsp *RetryableProxy) Get() (Storage, bool)
- func (rsp *RetryableProxy) GetGeoResolverID() string
- func (rsp *RetryableProxy) GetPostHandleDestinations() []string
- func (rsp *RetryableProxy) GetUniqueIDField() *identifiers.UniqueID
- func (rsp *RetryableProxy) ID() string
- func (rsp *RetryableProxy) IsCachingDisabled() bool
- func (rsp *RetryableProxy) Mode() string
- func (rsp *RetryableProxy) Type() string
- type Snowflake
- func (s *Snowflake) Clean(tableName string) error
- func (s *Snowflake) Close() (multiErr error)
- func (s *Snowflake) GetUsersRecognition() *UserRecognitionConfiguration
- func (s *Snowflake) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (s *Snowflake) Type() string
- type Storage
- func NewAmplitude(config *Config) (storage Storage, err error)
- func NewAwsRedshift(config *Config) (storage Storage, err error)
- func NewBigQuery(config *Config) (storage Storage, err error)
- func NewClickHouse(config *Config) (storage Storage, err error)
- func NewDbtCloud(config *Config) (storage Storage, err error)
- func NewFacebook(config *Config) (storage Storage, err error)
- func NewGoogleAnalytics(config *Config) (storage Storage, err error)
- func NewGoogleCloudStorage(config *Config) (Storage, error)
- func NewHubSpot(config *Config) (storage Storage, err error)
- func NewMySQL(config *Config) (storage Storage, err error)
- func NewNpmDestination(config *Config) (storage Storage, err error)
- func NewPostgres(config *Config) (storage Storage, err error)
- func NewS3(config *Config) (Storage, error)
- func NewSnowflake(config *Config) (storage Storage, err error)
- func NewTagDestination(config *Config) (storage Storage, err error)
- func NewWebHook(config *Config) (storage Storage, err error)
- type StorageProxy
- type StorageType
- type StoreResult
- type StreamingStorage
- type StreamingWorker
- type SyncStorage
- type SyncWorker
- type TableHelper
- func (th *TableHelper) EnsureTable(destinationID string, dataSchema *adapters.Table, cacheTable bool) (*adapters.Table, error)
- func (th *TableHelper) EnsureTableWithCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)
- func (th *TableHelper) EnsureTableWithoutCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)
- func (th *TableHelper) MapTableSchema(batchHeader *schema.BatchHeader) *adapters.Table
- func (th *TableHelper) RefreshTableSchema(destinationName string, dataSchema *adapters.Table) (*adapters.Table, error)
- type TagDestination
- func (t *TagDestination) Close() error
- func (t *TagDestination) GetSyncWorker() *SyncWorker
- func (t *TagDestination) GetUsersRecognition() *UserRecognitionConfiguration
- func (t *TagDestination) ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error)
- func (t *TagDestination) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, ...) error
- func (t *TagDestination) Type() string
- type URSetup
- type UserRecognitionConfiguration
- type WebHook
Constants ¶
const (
    SourceSuccessEventType    = "SOURCE_SUCCESSFUL_RUN"
    DestinationBatchEventType = "DESTINATION_BATCH_RUN"
)
const (
    //BatchMode is a mode in which destinations store data in batches
    BatchMode = "batch"
    //StreamMode is a mode in which destinations store data row by row
    StreamMode = "stream"
    //SynchronousMode is a mode in which destinations process an event immediately, during the HTTP request lifetime, and can put the result in the HTTP response body
    SynchronousMode = "synchronous"
)
const (
    RedshiftType        = "redshift"
    BigQueryType        = "bigquery"
    PostgresType        = "postgres"
    MySQLType           = "mysql"
    ClickHouseType      = "clickhouse"
    S3Type              = "s3"
    SnowflakeType       = "snowflake"
    GoogleAnalyticsType = "google_analytics"
    GCSType             = "gcs"
    FacebookType        = "facebook"
    WebHookType         = "webhook"
    NpmType             = "npm"
    TagType             = "tag"
    AmplitudeType       = "amplitude"
    HubSpotType         = "hubspot"
    DbtCloudType        = "dbtcloud"
)
Variables ¶
var (
    //ErrUnknownDestination error for checking unknown destination type
    ErrUnknownDestination = errors.New("Unknown destination type")
    //StorageTypes is used in all destinations init() methods
    StorageTypes = make(map[string]StorageType)
)
var DefaultHTTPConfiguration = &adapters.HTTPConfiguration{
    GlobalClientTimeout:       10 * time.Second,
    RetryDelay:                10 * time.Second,
    RetryCount:                9,
    ClientMaxIdleConns:        1000,
    ClientMaxIdleConnsPerHost: 1000,
    QueueFullnessThreshold:    100_000,
}
DefaultHTTPConfiguration contains default HTTP timeouts, retries, delays, etc. for HTTPAdapters
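A destination that needs different behavior can copy these defaults and adjust the copy rather than mutating the shared pointer; a minimal sketch (the overridden values are arbitrary illustrations):

    // copy the defaults so the package-level configuration stays untouched
    cfg := *DefaultHTTPConfiguration
    cfg.RetryCount = 3                         // hypothetical: fail faster than the default 9 retries
    cfg.GlobalClientTimeout = 30 * time.Second // hypothetical: allow slower endpoints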
var (
    UserRecognitionStorages = map[string]URSetup{
        MySQLType:      {true},
        PostgresType:   {true},
        RedshiftType:   {true},
        SnowflakeType:  {true},
        ClickHouseType: {false},
    }
)
Functions ¶
func CreateMySQLAdapter ¶
func CreateMySQLAdapter(ctx context.Context, config adapters.DataSourceConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*adapters.MySQL, error)
CreateMySQLAdapter creates a MySQL adapter for the configured database. If the database doesn't exist, MySQL returns an error; in that case the adapter connects without a database and creates it.
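A minimal sketch of invoking it (the empty DataSourceConfig literal, the nil query logger, and the empty SQL type overrides are placeholder assumptions, not working settings; the deferred Close assumes the adapter is an io.Closer like the other adapters):

    ctx := context.Background()
    cfg := adapters.DataSourceConfig{ /* host, port, db name and credentials go here */ }
    mysql, err := CreateMySQLAdapter(ctx, cfg, nil, typing.SQLTypes{})
    if err != nil {
        // a missing database is handled internally (created on the fly);
        // other errors surface here
        log.Fatalf("create mysql adapter: %v", err)
    }
    defer mysql.Close()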
func CreateSnowflakeAdapter ¶
func CreateSnowflakeAdapter(ctx context.Context, s3Config *adapters.S3Config, config adapters.SnowflakeConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*adapters.Snowflake, error)
CreateSnowflakeAdapter creates a Snowflake adapter for the configured schema. If the schema doesn't exist, Snowflake returns an error; in that case the adapter connects without a schema and creates it.
func IsConnectionError ¶
func IsConnectionError(err error) bool
func RegisterFileStorage ¶
func RegisterFileStorage(storageType string, createStorage CreateStorage, extractConfig ExtractConfig)
func RegisterStorage ¶
func RegisterStorage(storageType StorageType)
RegisterStorage registers a function that creates a new storage (destination) instance
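Registration happens from each destination's init() within this package, as the StorageTypes variable comment notes; a rough sketch (the unexported fields of StorageType, such as the type name and constructor func, are filtered from these docs and only indicated in a comment):

    func init() {
        RegisterStorage(StorageType{
            IsSynchronous: false,
            // the unexported fields (type name, New* constructor func)
            // are set here in the real destination files
        })
    }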
Types ¶
type Abstract ¶
type Abstract struct {
// contains filtered or unexported fields
}
Abstract is an abstract destination storage that contains common destination funcs (aka an abstract class)
func (*Abstract) AccountResult ¶
func (a *Abstract) AccountResult(eventContext *adapters.EventContext, err error)
AccountResult checks input error and calls ErrorEvent or SuccessEvent
func (*Abstract) ErrorEvent ¶
func (a *Abstract) ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
ErrorEvent writes error to metrics/counters/telemetry/events cache
func (*Abstract) Fallback ¶
func (a *Abstract) Fallback(failedEvents ...*events.FailedEvent)
Fallback logs event with error to fallback logger
func (*Abstract) GetSyncWorker ¶
func (a *Abstract) GetSyncWorker() *SyncWorker
func (*Abstract) GetUniqueIDField ¶
func (a *Abstract) GetUniqueIDField() *identifiers.UniqueID
GetUniqueIDField returns unique ID field configuration
func (*Abstract) Insert ¶
func (a *Abstract) Insert(eventContext *adapters.EventContext) (insertErr error)
Insert ensures the table exists and sends the input event to the destination (with 1 retry on error)
func (*Abstract) IsCachingDisabled ¶
func (a *Abstract) IsCachingDisabled() bool
IsCachingDisabled returns true if caching is disabled in the destination configuration
func (*Abstract) ReplaceTable ¶
func (a *Abstract) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error
func (*Abstract) SkipEvent ¶
func (a *Abstract) SkipEvent(eventCtx *adapters.EventContext, err error)
SkipEvent writes skip to metrics/counters/telemetry and error to events cache
func (*Abstract) Store ¶
func (a *Abstract) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
Store processes events and stores them with the StoreTable() func. It returns store results per table, failed events (the group of events that failed to process), and an error.
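A sketch of consuming the multi-value result (variable names are illustrative):

    results, failed, skipped, err := a.Store(fileName, objects, map[string]bool{}, false)
    if err != nil {
        return err // global failure: nothing was stored
    }
    for table, res := range results {
        log.Printf("table %s: %+v", table, res) // per-table StoreResult
    }
    _ = failed  // events that failed to process (candidates for Fallback)
    _ = skipped // events skipped by the processor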
func (*Abstract) SuccessEvent ¶
func (a *Abstract) SuccessEvent(eventCtx *adapters.EventContext)
SuccessEvent writes success to metrics/counters/telemetry/events cache
type Amplitude ¶
type Amplitude struct {
    HTTPStorage
}
Amplitude is a destination that can send data into Amplitude
type AwsRedshift ¶
type AwsRedshift struct {
    Abstract
    // contains filtered or unexported fields
}
AwsRedshift stores files to AWS Redshift in two modes:
batch: via AWS S3 (1 file = 1 statement)
stream: via the events queue (1 object = 1 statement)
func (*AwsRedshift) Clean ¶
func (ar *AwsRedshift) Clean(tableName string) error
func (*AwsRedshift) Close ¶
func (ar *AwsRedshift) Close() (multiErr error)
Close closes AwsRedshift adapter, fallback logger and streaming worker
func (*AwsRedshift) GetUsersRecognition ¶
func (ar *AwsRedshift) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns users recognition configuration
func (*AwsRedshift) SyncStore ¶
func (ar *AwsRedshift) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
SyncStore is used to store a chunk of pulled data to AwsRedshift with processing
type BigQuery ¶
type BigQuery struct {
    Abstract
    // contains filtered or unexported fields
}
BigQuery stores files to Google BigQuery in two modes:
batch: via Google Cloud Storage (1 file = 1 operation)
stream: via the events queue (1 object = 1 operation)
func (*BigQuery) GetUsersRecognition ¶
func (bq *BigQuery) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns disabled users recognition configuration
type ClickHouse ¶
type ClickHouse struct {
    Abstract
    // contains filtered or unexported fields
}
ClickHouse stores files to ClickHouse in two modes:
batch: (1 file = 1 statement)
stream: (1 object = 1 statement)
func (*ClickHouse) Clean ¶
func (ch *ClickHouse) Clean(tableName string) error
func (*ClickHouse) Close ¶
func (ch *ClickHouse) Close() (multiErr error)
Close closes ClickHouse adapters, fallback logger and streaming worker
func (*ClickHouse) GetUsersRecognition ¶
func (ch *ClickHouse) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns users recognition configuration
func (*ClickHouse) SyncStore ¶
func (ch *ClickHouse) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
SyncStore is used to store a chunk of pulled data to ClickHouse with processing
type Config ¶
type Config struct {
    PostHandleDestinations []string
    // contains filtered or unexported fields
}
Config is a model passed to destination creator funcs
type CreateStorage ¶
type DbtCloud ¶
type DbtCloud struct {
    HTTPStorage
    // contains filtered or unexported fields
}
DbtCloud is a destination that can send API requests to cloud.getdbt.com. It is not a general-purpose destination; it is designed for special kinds of events, such as a successful run of a Source.
type ExtractConfig ¶
type ExtractConfig = func(config *config.DestinationConfig) map[string]interface{}
type Facebook ¶
type Facebook struct {
    HTTPStorage
}
Facebook stores events to Facebook Conversion API in stream mode
type Factory ¶
type Factory interface {
    Create(name string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)
    Configure(destinationID string, destination config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)
}
Factory is a factory for creating destinations
func NewFactory ¶
func NewFactory(ctx context.Context, logEventPath string, geoService *geo.Service, coordinationService *coordination.Service, eventsCache *caching.EventsCache, globalLoggerFactory *logevents.Factory, globalConfiguration *config.UsersRecognition, metaStorage meta.Storage, eventsQueueFactory *events.QueueFactory, maxColumns int, streamingThreadsCount int) Factory
NewFactory returns configured Factory
type FactoryImpl ¶
type FactoryImpl struct {
    // contains filtered or unexported fields
}
FactoryImpl is a destination's factory implementation
func (*FactoryImpl) Configure ¶
func (f *FactoryImpl) Configure(destinationID string, destination config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)
func (*FactoryImpl) Create ¶
func (f *FactoryImpl) Create(destinationID string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)
Create builds an event storage proxy and an event consumer (logger or event queue). It enriches incoming configs with default values if needed.
type FileAdapter ¶
type FileAdapter interface {
    io.Closer
    UploadBytes(fileName string, fileBytes []byte) error
    Compression() adapters.FileCompression
    Format() adapters.FileEncodingFormat
}
type FileStorage ¶
type FileStorage struct {
    Abstract
    // contains filtered or unexported fields
}
func (*FileStorage) Close ¶
func (fs *FileStorage) Close() (multiErr error)
Close closes fallback logger
func (*FileStorage) DryRun ¶
func (fs *FileStorage) DryRun(events.Event) ([][]adapters.TableField, error)
func (*FileStorage) GetUsersRecognition ¶
func (fs *FileStorage) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns disabled users recognition configuration
func (*FileStorage) Store ¶
func (fs *FileStorage) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
Store processes events and stores them with the storeTable() func. It returns store results per table, failed events (the group of events that failed to process), and an error.
func (*FileStorage) SyncStore ¶
func (fs *FileStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
SyncStore isn't supported
func (*FileStorage) Update ¶
func (fs *FileStorage) Update(map[string]interface{}) error
Update isn't supported
type GoogleAnalytics ¶
type GoogleAnalytics struct {
    HTTPStorage
}
GoogleAnalytics stores events to Google Analytics in stream mode
func (*GoogleAnalytics) Type ¶
func (ga *GoogleAnalytics) Type() string
Type returns Google Analytics type
type HTTPStorage ¶
type HTTPStorage struct {
    Abstract
    // contains filtered or unexported fields
}
HTTPStorage is an abstract destination storage for HTTP destinations that contains common HTTP destination funcs (aka an abstract class)
func (*HTTPStorage) Close ¶
func (h *HTTPStorage) Close() (multiErr error)
Close closes adapter, fallback logger and streaming worker
func (*HTTPStorage) DryRun ¶
func (h *HTTPStorage) DryRun(payload events.Event) ([][]adapters.TableField, error)
func (*HTTPStorage) GetUniqueIDField ¶
func (h *HTTPStorage) GetUniqueIDField() *identifiers.UniqueID
GetUniqueIDField returns unique ID field configuration
func (*HTTPStorage) GetUsersRecognition ¶
func (h *HTTPStorage) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns disabled users recognition configuration
func (*HTTPStorage) Insert ¶
func (h *HTTPStorage) Insert(eventContext *adapters.EventContext) error
Insert sends event into adapters.Adapter
func (*HTTPStorage) IsCachingDisabled ¶
func (h *HTTPStorage) IsCachingDisabled() bool
IsCachingDisabled returns true if caching is disabled in destination configuration
func (*HTTPStorage) Store ¶
func (h *HTTPStorage) Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
Store isn't supported
func (*HTTPStorage) SyncStore ¶
func (h *HTTPStorage) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
SyncStore isn't supported
func (*HTTPStorage) Type ¶
func (h *HTTPStorage) Type() string
Type returns storage type. Should be overridden in every implementation
func (*HTTPStorage) Update ¶
func (h *HTTPStorage) Update(eventContext *adapters.EventContext) error
Update isn't supported
type HubSpot ¶
type HubSpot struct {
    HTTPStorage
}
HubSpot is a destination that can send data into HubSpot
type MockFactory ¶
type MockFactory struct{}
MockFactory is a mock destinations storage factory
func (*MockFactory) Configure ¶
func (mf *MockFactory) Configure(_ string, _ config.DestinationConfig) (func(config *Config) (Storage, error), *Config, error)
func (*MockFactory) Create ¶
func (mf *MockFactory) Create(id string, destination config.DestinationConfig) (StorageProxy, events.Queue, error)
Create returns proxy Mock and events queue
type MySQL ¶
type MySQL struct {
    Abstract
    // contains filtered or unexported fields
}
MySQL stores files to MySQL in two modes:
batch: (1 file = 1 statement)
stream: (1 object = 1 statement)
func (*MySQL) GetUsersRecognition ¶
func (m *MySQL) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns users recognition configuration
type NpmDestination ¶
type NpmDestination struct {
    WebHook
}
type NpmValidatorResult ¶
type Postgres ¶
type Postgres struct {
    Abstract
    // contains filtered or unexported fields
}
Postgres stores files to Postgres in two modes:
batch: (1 file = 1 statement)
stream: (1 object = 1 statement)
func (*Postgres) GetUsersRecognition ¶
func (p *Postgres) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns users recognition configuration
type RetryableProxy ¶
type RetryableProxy struct {
    // contains filtered or unexported fields
}
RetryableProxy creates a Storage with retries (if creation fails, e.g. because of a connection issue)
func (*RetryableProxy) Close ¶
func (rsp *RetryableProxy) Close() error
Close stops the underlying goroutine and closes the storage
func (*RetryableProxy) Get ¶
func (rsp *RetryableProxy) Get() (Storage, bool)
Get returns underlying destination storage and ready flag
func (*RetryableProxy) GetGeoResolverID ¶
func (rsp *RetryableProxy) GetGeoResolverID() string
func (*RetryableProxy) GetPostHandleDestinations ¶
func (rsp *RetryableProxy) GetPostHandleDestinations() []string
func (*RetryableProxy) GetUniqueIDField ¶
func (rsp *RetryableProxy) GetUniqueIDField() *identifiers.UniqueID
GetUniqueIDField returns unique ID field configuration
func (*RetryableProxy) IsCachingDisabled ¶
func (rsp *RetryableProxy) IsCachingDisabled() bool
IsCachingDisabled returns true if caching is disabled in destination configuration
func (*RetryableProxy) Mode ¶
func (rsp *RetryableProxy) Mode() string
Mode returns destination mode
func (*RetryableProxy) Type ¶
func (rsp *RetryableProxy) Type() string
Type returns destination type
type Snowflake ¶
type Snowflake struct {
    Abstract
    // contains filtered or unexported fields
}
Snowflake stores files to Snowflake in two modes:
batch: via AWS S3 (or GCP) (1 file = 1 transaction)
stream: via the events queue (1 object = 1 transaction)
func (*Snowflake) Close ¶
func (s *Snowflake) Close() (multiErr error)
Close closes the Snowflake adapter, stage adapter, fallback logger and streaming worker
func (*Snowflake) GetUsersRecognition ¶
func (s *Snowflake) GetUsersRecognition() *UserRecognitionConfiguration
GetUsersRecognition returns users recognition configuration
type Storage ¶
type Storage interface {
    io.Closer
    DryRun(payload events.Event) ([][]adapters.TableField, error)
    Store(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (map[string]*StoreResult, *events.FailedEvents, *events.SkippedEvents, error)
    SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
    ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error
    DropTable(tableName string) error
    //Update(object map[string]interface{}) error
    Fallback(events ...*events.FailedEvent)
    GetUsersRecognition() *UserRecognitionConfiguration
    GetSyncWorker() *SyncWorker
    GetUniqueIDField() *identifiers.UniqueID
    Processor() *schema.Processor
    Init(config *Config, impl Storage, preinstalledJavaScript string, defaultUserTransform string) error
    Start(config *Config) error
    ID() string
    Type() string
    IsStaging() bool
    IsCachingDisabled() bool
    Clean(tableName string) error
    // contains filtered or unexported methods
}
Storage is a destination representation
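Since Storage contains unexported methods, only types inside this package can implement it. Within the package, compile-time assertions such as the following sketch (not part of the documented API) verify that concrete destinations satisfy the interface:

    var (
        _ Storage = (*Postgres)(nil)
        _ Storage = (*BigQuery)(nil)
        _ Storage = (*ClickHouse)(nil)
    )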
func NewAmplitude ¶
func NewAmplitude(config *Config) (storage Storage, err error)
NewAmplitude returns a configured Amplitude destination
func NewAwsRedshift ¶
func NewAwsRedshift(config *Config) (storage Storage, err error)
NewAwsRedshift returns an AwsRedshift instance and starts a goroutine: for the batch storage or for the stream consumer, depending on the destination mode
func NewBigQuery ¶
func NewBigQuery(config *Config) (storage Storage, err error)
NewBigQuery returns a configured BigQuery instance
func NewClickHouse ¶
func NewClickHouse(config *Config) (storage Storage, err error)
NewClickHouse returns a configured ClickHouse instance
func NewDbtCloud ¶
func NewDbtCloud(config *Config) (storage Storage, err error)
NewDbtCloud returns a configured DbtCloud destination
func NewFacebook ¶
func NewFacebook(config *Config) (storage Storage, err error)
NewFacebook returns a configured Facebook destination
func NewGoogleAnalytics ¶
func NewGoogleAnalytics(config *Config) (storage Storage, err error)
NewGoogleAnalytics returns a GoogleAnalytics instance and starts the streaming worker goroutine
func NewGoogleCloudStorage ¶
func NewGoogleCloudStorage(config *Config) (Storage, error)
func NewHubSpot ¶
func NewHubSpot(config *Config) (storage Storage, err error)
NewHubSpot returns a configured HubSpot destination
func NewMySQL ¶
func NewMySQL(config *Config) (storage Storage, err error)
func NewNpmDestination ¶
func NewNpmDestination(config *Config) (storage Storage, err error)
NewNpmDestination returns a configured NpmDestination
func NewPostgres ¶
func NewPostgres(config *Config) (storage Storage, err error)
NewPostgres returns a configured Postgres destination
func NewS3 ¶
func NewS3(config *Config) (Storage, error)
func NewSnowflake ¶
func NewSnowflake(config *Config) (storage Storage, err error)
NewSnowflake returns a Snowflake instance and starts a goroutine: for the batch storage or for the stream consumer, depending on the destination mode
func NewTagDestination ¶
func NewTagDestination(config *Config) (storage Storage, err error)
NewTagDestination returns a configured TagDestination
func NewWebHook ¶
func NewWebHook(config *Config) (storage Storage, err error)
NewWebHook returns a configured WebHook destination
type StorageProxy ¶
type StorageProxy interface {
    io.Closer
    Get() (Storage, bool)
    GetUniqueIDField() *identifiers.UniqueID
    GetPostHandleDestinations() []string
    GetGeoResolverID() string
    IsCachingDisabled() bool
    ID() string
    Type() string
    Mode() string
}
StorageProxy is a storage proxy
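Callers typically go through Get() and check the ready flag, since a RetryableProxy may still be (re)creating the underlying storage; a minimal sketch (variable names are illustrative):

    storage, ready := proxy.Get()
    if !ready {
        // RetryableProxy is still creating (or re-creating) the underlying storage
        return fmt.Errorf("destination %s (%s) is not ready", proxy.ID(), proxy.Type())
    }
    // the Storage interface is safe to use from here on
    _ = storage.Type()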
type StorageType ¶
type StorageType struct {
    IsSynchronous bool
    // contains filtered or unexported fields
}
type StoreResult ¶
StoreResult is used as a Batch storing result
type StreamingStorage ¶
type StreamingStorage interface {
    Storage
    //Insert uses errCallback in async adapters (e.g. adapters.HTTPAdapter)
    Insert(eventContext *adapters.EventContext) (err error)
    Update(eventContext *adapters.EventContext) (err error)
    //SuccessEvent writes metrics/counters/events cache, etc
    SuccessEvent(eventCtx *adapters.EventContext)
    //ErrorEvent writes metrics/counters/events cache, etc
    ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
    //SkipEvent writes metrics/counters/events cache, etc
    SkipEvent(eventCtx *adapters.EventContext, err error)
}
StreamingStorage supports Insert operation
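A sketch of the contract a streaming worker follows for a synchronous adapter (for async adapters, the success/error accounting may instead happen in the adapter's errCallback, as the Insert comment notes):

    if err := ss.Insert(eventContext); err != nil {
        // fallback=true also routes the failed event to the fallback logger
        ss.ErrorEvent(true, eventContext, err)
    } else {
        ss.SuccessEvent(eventContext)
    }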
type StreamingWorker ¶
type StreamingWorker struct {
    // contains filtered or unexported fields
}
StreamingWorker reads events from the queue and writes them using a StreamingStorage
func (*StreamingWorker) Close ¶
func (sw *StreamingWorker) Close() error
type SyncStorage ¶
type SyncStorage interface {
    Storage
    //ProcessEvent processes an event in a sync fashion. Returns the resulting object immediately
    ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error)
    //SuccessEvent writes metrics/counters/events cache, etc
    SuccessEvent(eventCtx *adapters.EventContext)
    //ErrorEvent writes metrics/counters/events cache, etc
    ErrorEvent(fallback bool, eventCtx *adapters.EventContext, err error)
    //SkipEvent writes metrics/counters/events cache, etc
    SkipEvent(eventCtx *adapters.EventContext, err error)
}
SyncStorage supports ProcessEvent synchronous operation
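A similar sketch for the synchronous path, where the processing result is returned to the caller (and ultimately into the HTTP response body via SyncWorker):

    result, err := ss.ProcessEvent(eventContext)
    if err != nil {
        ss.ErrorEvent(false, eventContext, err)
        return nil, err
    }
    ss.SuccessEvent(eventContext)
    return result, nil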
type SyncWorker ¶
type SyncWorker struct {
    // contains filtered or unexported fields
}
SyncWorker processes events synchronously, allowing the result of processing to be returned in the body of the HTTP response
func (*SyncWorker) Close ¶
func (sw *SyncWorker) Close() error
func (*SyncWorker) ProcessEvent ¶
func (sw *SyncWorker) ProcessEvent(fact events.Event, tokenID string) []map[string]interface{}
type TableHelper ¶
type TableHelper struct {
    // contains filtered or unexported fields
}
TableHelper keeps table schema state in memory and updates it according to incoming new data. It considers that all tables are in one destination schema. Note: assume that after any outside changes in the DB, the table version in Service must be incremented.
func NewTableHelper ¶
func NewTableHelper(dbSchema string, sqlAdapter adapters.SQLAdapter, coordinationService *coordination.Service, pkFields map[string]bool, columnTypesMapping map[typing.DataType]string, maxColumns int, destinationType string) *TableHelper
NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (or fields will be ignored).
func (*TableHelper) EnsureTable ¶
func (th *TableHelper) EnsureTable(destinationID string, dataSchema *adapters.Table, cacheTable bool) (*adapters.Table, error)
EnsureTable returns the DB table schema and an error if one occurred:
if the table doesn't exist, it creates a new one and increments the version;
if it exists, it calculates the diff, patches the existing table with the diff, and increments the version.
It returns the actual DB table schema (with actual DB types).
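A hedged sketch of the typical batch flow, mapping the processed batch header to a SQL table and ensuring it in the destination before loading (variable names are illustrative):

    table := th.MapTableSchema(batchHeader) // JSON types -> SQL types
    dbTable, err := th.EnsureTableWithoutCaching(destinationID, table)
    if err != nil {
        return err
    }
    _ = dbTable // the actual DB schema: newly created, or patched with the diff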
func (*TableHelper) EnsureTableWithCaching ¶
func (th *TableHelper) EnsureTableWithCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)
EnsureTableWithCaching calls EnsureTable with cacheTable = true. It is used in stream destinations (because we don't have time to select the table schema, but there is a retry on error).
func (*TableHelper) EnsureTableWithoutCaching ¶
func (th *TableHelper) EnsureTableWithoutCaching(destinationID string, dataSchema *adapters.Table) (*adapters.Table, error)
EnsureTableWithoutCaching calls EnsureTable with cacheTable = false. It is used in batch destinations and syncStore (because we have time to select the table schema).
func (*TableHelper) MapTableSchema ¶
func (th *TableHelper) MapTableSchema(batchHeader *schema.BatchHeader) *adapters.Table
MapTableSchema maps a schema.BatchHeader (a JSON structure with JSON data types) into an adapters.Table (a structure with SQL types) and applies the column type mappings
func (*TableHelper) RefreshTableSchema ¶
func (th *TableHelper) RefreshTableSchema(destinationName string, dataSchema *adapters.Table) (*adapters.Table, error)
RefreshTableSchema forcibly gets (or creates) the DB table schema and updates it in memory
type TagDestination ¶
type TagDestination struct {
    Abstract
    // contains filtered or unexported fields
}
func (*TagDestination) Close ¶
func (t *TagDestination) Close() error
func (*TagDestination) GetSyncWorker ¶
func (t *TagDestination) GetSyncWorker() *SyncWorker
func (*TagDestination) GetUsersRecognition ¶
func (t *TagDestination) GetUsersRecognition() *UserRecognitionConfiguration
func (*TagDestination) ProcessEvent ¶
func (t *TagDestination) ProcessEvent(eventContext *adapters.EventContext) (map[string]interface{}, error)
func (*TagDestination) SyncStore ¶
func (t *TagDestination) SyncStore(overriddenDataSchema *schema.BatchHeader, objects []map[string]interface{}, deleteConditions *base.DeleteConditions, cacheTable bool, needCopyEvent bool) error
type UserRecognitionConfiguration ¶
type UserRecognitionConfiguration struct {
    AnonymousIDJSONPath      jsonutils.JSONPath
    IdentificationJSONPathes *jsonutils.JSONPaths
    Enabled                  bool
}
UserRecognitionConfiguration is the users recognition configuration
func (*UserRecognitionConfiguration) IsEnabled ¶
func (urc *UserRecognitionConfiguration) IsEnabled() bool
IsEnabled returns true if not nil and enabled
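Because the receiver may be nil when recognition is not configured, call sites can use IsEnabled without a separate nil check; a minimal sketch:

    var urc *UserRecognitionConfiguration // nil when the destination has no recognition config
    if urc.IsEnabled() {
        // not reached: IsEnabled is nil-safe and returns false for a nil receiver
    }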