adapters

package
v0.0.0-...-1a33ab8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2023 License: MIT Imports: 57 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FileFormatFlatJSON  FileEncodingFormat = "flat_json" //flattened json objects with \n delimiter
	FileFormatJSON      FileEncodingFormat = "json"      //file with json objects with \n delimiter (not flattened)
	FileFormatCSV       FileEncodingFormat = "csv"       //flattened csv objects with \n delimiter
	FileFormatParquet   FileEncodingFormat = "parquet"   //flattened objects which are marshalled in apache parquet file
	FileCompressionGZIP FileCompression    = "gzip"      //gzip compression
)
View Source
const CtxDestinationId = "CtxDestinationId"
View Source
const (
	JitsuUserAgent = "Jitsu.com/1.0"
)
View Source
const (
	MySQLValuesLimit = 65535 // this is a limitation of parameters one can pass as query values. If more parameters are passed, error is returned

)
View Source
const (
	PostgresValuesLimit = 65535 // this is a limitation of parameters one can pass as query values. If more parameters are passed, error is returned
)
View Source
const (
	RedshiftValuesLimit = 32767 // this is a limitation of parameters one can pass as query values. If more parameters are passed, error is returned

)

Variables

View Source
var (
	//DefaultSchemaTypeMappings is dummy mappings
	DefaultSchemaTypeMappings = map[typing.DataType]string{
		typing.STRING:    "string",
		typing.INT64:     "string",
		typing.FLOAT64:   "string",
		typing.TIMESTAMP: "string",
		typing.BOOL:      "string",
		typing.UNKNOWN:   "string",
	}
)
View Source
var ErrMalformedBQDataset = errors.New("bq_dataset must be alphanumeric (plus underscores) and must be at most 1024 characters long")
View Source
var ErrTableNotExist = errors.New("table doesn't exist")
View Source
var (
	SchemaToClickhouse = map[typing.DataType]string{
		typing.STRING:    "String",
		typing.INT64:     "Int64",
		typing.FLOAT64:   "Float64",
		typing.TIMESTAMP: "DateTime",
		typing.BOOL:      "UInt8",
		typing.UNKNOWN:   "String",
	}
)
View Source
var (
	SchemaToMySQL = map[typing.DataType]string{
		typing.STRING:    "TEXT",
		typing.INT64:     "BIGINT",
		typing.FLOAT64:   "DOUBLE",
		typing.TIMESTAMP: "DATETIME",
		typing.BOOL:      "BOOLEAN",
		typing.UNKNOWN:   "TEXT",
	}
)
View Source
var (
	SchemaToPostgres = map[typing.DataType]string{
		typing.STRING:    "text",
		typing.INT64:     "bigint",
		typing.FLOAT64:   "double precision",
		typing.TIMESTAMP: "timestamp",
		typing.BOOL:      "boolean",
		typing.UNKNOWN:   "text",
	}
)
View Source
var (
	SchemaToRedshift = map[typing.DataType]string{
		typing.STRING:    "character varying(65535)",
		typing.INT64:     "bigint",
		typing.FLOAT64:   "double precision",
		typing.TIMESTAMP: "timestamp",
		typing.BOOL:      "boolean",
		typing.UNKNOWN:   "character varying(65535)",
	}
)
View Source
var (
	SchemaToSnowflake = map[typing.DataType]string{
		typing.STRING:    "text",
		typing.INT64:     "bigint",
		typing.FLOAT64:   "double precision",
		typing.TIMESTAMP: "timestamp(6)",
		typing.BOOL:      "boolean",
		typing.UNKNOWN:   "text",
	}
)

Functions

func BuildConstraintName

func BuildConstraintName(schemaName string, tableName string) string

func GranularityToPartitionIds

func GranularityToPartitionIds(g schema.Granularity, t time.Time) []string

func ObjectValuesToString

func ObjectValuesToString(header []string, valueArgs []interface{}) string

func ProcessSSL

func ProcessSSL(dir string, dsc *DataSourceConfig) error

ProcessSSL serializes the SSL payload (ca, client cert, key) into files and enriches the input DataSourceConfig parameters with the SSL config. The SSL configuration might be a file path as well as string content.

func QueuedRequestBuilder

func QueuedRequestBuilder() interface{}

QueuedRequestBuilder creates and returns a new *adapters.QueuedRequest (must be pointer).

func SSLDir

func SSLDir(dir, identifier string) string

SSLDir returns SSL dir /:path_to_configs/ssl/:ID

Types

type AbstractHTTP

type AbstractHTTP struct {
	// contains filtered or unexported fields
}

AbstractHTTP is an Abstract HTTP adapter for keeping default funcs

func (*AbstractHTTP) Close

func (a *AbstractHTTP) Close() error

Close closes underlying HTTPAdapter

func (*AbstractHTTP) Insert

func (a *AbstractHTTP) Insert(insertContext *InsertContext) error

Insert passes object to HTTPAdapter

func (*AbstractHTTP) Type

func (a *AbstractHTTP) Type() string

Type returns adapter type. Should be overridden in every implementation

type Adapter

type Adapter interface {
	io.Closer
	Insert(insertContext *InsertContext) error
}

Adapter is an adapter for all destinations

type Amplitude

type Amplitude struct {
	AbstractHTTP
	// contains filtered or unexported fields
}

Amplitude is an adapter for sending HTTP requests to Amplitude

func NewAmplitude

func NewAmplitude(config *AmplitudeConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*Amplitude, error)

NewAmplitude returns configured Amplitude adapter instance

func NewTestAmplitude

func NewTestAmplitude(config *AmplitudeConfig) *Amplitude

NewTestAmplitude returns test instance of adapter

func (*Amplitude) TestAccess

func (a *Amplitude) TestAccess() error

TestAccess sends a test request (empty POST) to Amplitude and checks if an error has occurred

func (*Amplitude) Type

func (a *Amplitude) Type() string

Type returns adapter type

type AmplitudeConfig

type AmplitudeConfig struct {
	APIKey   string `mapstructure:"api_key" json:"api_key,omitempty" yaml:"api_key,omitempty"`
	Endpoint string `mapstructure:"endpoint" json:"endpoint,omitempty" yaml:"endpoint,omitempty"`
}

AmplitudeConfig is a dto for parsing Amplitude configuration

func (*AmplitudeConfig) Validate

func (ac *AmplitudeConfig) Validate() error

Validate returns err if invalid

type AmplitudeRequest

type AmplitudeRequest struct {
	APIKey string                   `json:"api_key"`
	Events []map[string]interface{} `json:"events"`
}

AmplitudeRequest is a dto for sending requests to Amplitude

type AmplitudeRequestFactory

type AmplitudeRequestFactory struct {
	// contains filtered or unexported fields
}

AmplitudeRequestFactory is a factory for building Amplitude HTTP requests from input events

func (*AmplitudeRequestFactory) Close

func (arf *AmplitudeRequestFactory) Close()

func (*AmplitudeRequestFactory) Create

func (arf *AmplitudeRequestFactory) Create(object map[string]interface{}) (*Request, error)

Create returns the created Amplitude request. It puts an empty array in the body if object is nil (this is used in test connection).

type AmplitudeResponse

type AmplitudeResponse struct {
	Code  int    `json:"code"`
	Error string `json:"error"`
}

AmplitudeResponse is a dto for receiving response from Amplitude

type AwsRedshift

type AwsRedshift struct {
	// contains filtered or unexported fields
}

AwsRedshift is an adapter for creating, patching (schema or table), inserting and copying data from S3 to Redshift

func NewAwsRedshift

func NewAwsRedshift(ctx context.Context, dsConfig *DataSourceConfig, s3Config *S3Config,
	queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*AwsRedshift, error)

NewAwsRedshift returns configured AwsRedshift adapter instance

func (*AwsRedshift) Close

func (ar *AwsRedshift) Close() error

Close underlying sql.DB

func (*AwsRedshift) Copy

func (ar *AwsRedshift) Copy(fileKey, tableName string) error

Copy transfers data from S3 to Redshift by passing a COPY request to Redshift

func (*AwsRedshift) CreateDbSchema

func (ar *AwsRedshift) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates the database schema instance if it doesn't exist

func (*AwsRedshift) CreateTable

func (ar *AwsRedshift) CreateTable(tableSchema *Table) error

CreateTable creates a database table with the name and columns provided in the Table representation

func (*AwsRedshift) DropTable

func (ar *AwsRedshift) DropTable(table *Table) error

DropTable drops the table in a transaction. It uses the underlying Postgres datasource.

func (*AwsRedshift) GetTableSchema

func (ar *AwsRedshift) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns the table (name, columns, primary key) representation wrapped in a Table struct

func (*AwsRedshift) Insert

func (ar *AwsRedshift) Insert(insertContext *InsertContext) error

Insert inserts data with InsertContext as a single object or a batch into Redshift

func (*AwsRedshift) OpenTx

func (ar *AwsRedshift) OpenTx() (*Transaction, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance

func (*AwsRedshift) PatchTableSchema

func (ar *AwsRedshift) PatchTableSchema(patchSchema *Table) error

PatchTableSchema adds new columns/primary keys to, or deletes the primary key from, an existing table. On a primary key creation error it gets the table schema, re-creates the column and tries one more time.

func (*AwsRedshift) ReplaceTable

func (ar *AwsRedshift) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) (err error)

func (*AwsRedshift) Truncate

func (ar *AwsRedshift) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (AwsRedshift) Type

func (AwsRedshift) Type() string

func (*AwsRedshift) Update

func (ar *AwsRedshift) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

Update one record in Redshift

type BQItem

type BQItem struct {
	// contains filtered or unexported fields
}

BQItem struct for streaming inserts to BigQuery

func (*BQItem) Save

func (bqi *BQItem) Save() (row map[string]bigquery.Value, insertID string, err error)

type BigQuery

type BigQuery struct {
	// contains filtered or unexported fields
}

BigQuery adapter for creating,patching (schema or table), inserting and copying data from gcs to BigQuery

func NewBigQuery

func NewBigQuery(ctx context.Context, config *GoogleConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*BigQuery, error)

NewBigQuery returns a configured BigQuery adapter instance

func (*BigQuery) Close

func (bq *BigQuery) Close() error

func (*BigQuery) Copy

func (bq *BigQuery) Copy(fileKey, tableName string) error

Copy transfers data from google cloud storage file to google BigQuery table as one batch

func (*BigQuery) CreateDataset

func (bq *BigQuery) CreateDataset(dataset string) error

CreateDataset creates the Google BigQuery dataset if it doesn't exist

func (*BigQuery) CreateTable

func (bq *BigQuery) CreateTable(table *Table) error

CreateTable creates google BigQuery table from Table

func (*BigQuery) DeletePartition

func (bq *BigQuery) DeletePartition(tableName string, datePartiton *base.DatePartition) error

func (*BigQuery) DropTable

func (bq *BigQuery) DropTable(table *Table) error

DropTable drops table from BigQuery

func (*BigQuery) GetTableSchema

func (bq *BigQuery) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns the Google BigQuery table (name, columns) representation wrapped in a Table struct

func (*BigQuery) Insert

func (bq *BigQuery) Insert(insertContext *InsertContext) error

Insert inserts data with InsertContext as a single object or a batch into BigQuery

func (*BigQuery) PatchTableSchema

func (bq *BigQuery) PatchTableSchema(patchSchema *Table) error

PatchTableSchema adds Table columns to google BigQuery table

func (*BigQuery) ReplaceTable

func (bq *BigQuery) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error

func (*BigQuery) Test

func (bq *BigQuery) Test() error

func (*BigQuery) Truncate

func (bq *BigQuery) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (*BigQuery) Update

func (bq *BigQuery) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

type ClickHouse

type ClickHouse struct {
	// contains filtered or unexported fields
}

ClickHouse is adapter for creating,patching (schema or table), inserting data to clickhouse

func NewClickHouse

func NewClickHouse(ctx context.Context, connectionString, database, cluster string, tlsConfig map[string]string,
	tableStatementFactory *TableStatementFactory, nullableFields map[string]bool,
	queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*ClickHouse, error)

NewClickHouse returns configured ClickHouse adapter instance

func (*ClickHouse) Close

func (ch *ClickHouse) Close() error

Close underlying sql.DB

func (*ClickHouse) CreateDB

func (ch *ClickHouse) CreateDB(dbName string) error

CreateDB creates the database instance if it doesn't exist

func (*ClickHouse) CreateTable

func (ch *ClickHouse) CreateTable(table *Table) error

CreateTable creates a database table with the name and columns provided in the Table representation. New tables will have the MergeTree() or ReplicatedMergeTree() engine, depending on whether config.cluster is empty or not.

func (*ClickHouse) DropTable

func (ch *ClickHouse) DropTable(table *Table) error

func (*ClickHouse) GetTableSchema

func (ch *ClickHouse) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns the table (name, columns with names and types) representation wrapped in a Table struct

func (*ClickHouse) Insert

func (ch *ClickHouse) Insert(insertContext *InsertContext) error

Insert inserts provided object in ClickHouse as a single record or batch

func (*ClickHouse) PatchTableSchema

func (ch *ClickHouse) PatchTableSchema(patchSchema *Table) error

PatchTableSchema adds new columns (from the provided Table) to an existing table, and drops and creates the distributed table

func (*ClickHouse) ReplaceTable

func (ch *ClickHouse) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error

func (*ClickHouse) Truncate

func (ch *ClickHouse) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (ClickHouse) Type

func (ClickHouse) Type() string

func (*ClickHouse) Update

func (ch *ClickHouse) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

type ClickHouseConfig

type ClickHouseConfig struct {
	Dsns     []string          `mapstructure:"dsns,omitempty" json:"dsns,omitempty" yaml:"dsns,omitempty"`
	Database string            `mapstructure:"db,omitempty" json:"db,omitempty" yaml:"db,omitempty"`
	TLS      map[string]string `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"`
	Cluster  string            `mapstructure:"cluster,omitempty" json:"cluster,omitempty" yaml:"cluster,omitempty"`
	Engine   *EngineConfig     `mapstructure:"engine,omitempty" json:"engine,omitempty" yaml:"engine,omitempty"`
}

ClickHouseConfig dto for deserialized clickhouse config

func (*ClickHouseConfig) Validate

func (chc *ClickHouseConfig) Validate() error

Validate required fields in ClickHouseConfig

type Columns

type Columns map[string]typing.SQLColumn

Columns is a list of columns representation

type DataSourceConfig

type DataSourceConfig struct {
	Host             string            `mapstructure:"host,omitempty" json:"host,omitempty" yaml:"host,omitempty"`
	Port             int               `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	Db               string            `mapstructure:"db,omitempty" json:"db,omitempty" yaml:"db,omitempty"`
	Schema           string            `mapstructure:"schema,omitempty" json:"schema,omitempty" yaml:"schema,omitempty"`
	Username         string            `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password         string            `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Parameters       map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
	SSLConfiguration *SSLConfig        `mapstructure:"ssl,omitempty" json:"ssl,omitempty" yaml:"ssl,omitempty"`
	S3               *S3Config         `mapstructure:"s3,omitempty" json:"s3,omitempty" yaml:"s3,omitempty"`
}

DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)

func ReadRedshiftConfig

func ReadRedshiftConfig(t *testing.T) (*DataSourceConfig, bool)

func (*DataSourceConfig) Validate

func (dsc *DataSourceConfig) Validate() error

Validate required fields in DataSourceConfig

type DbtCloud

type DbtCloud struct {
	AbstractHTTP
	// contains filtered or unexported fields
}

DbtCloud is an adapter for sending HTTP requests with predefined headers and templates for URL, body

func NewDbtCloud

func NewDbtCloud(config *DbtCloudConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*DbtCloud, error)

NewDbtCloud returns configured DbtCloud adapter instance

func NewTestDbtCloud

func NewTestDbtCloud(config *DbtCloudConfig) *DbtCloud

NewTestDbtCloud returns configured DbtCloud adapter instance for testing connection

func (*DbtCloud) TestAccess

func (dbt *DbtCloud) TestAccess() error

TestAccess sends Get Job object request to dbt cloud and checks job state

func (*DbtCloud) Type

func (dbt *DbtCloud) Type() string

Type returns adapter type

type DbtCloudConfig

type DbtCloudConfig struct {
	AccountId int    `mapstructure:"account_id,omitempty" json:"account_id,omitempty" yaml:"account_id,omitempty"`
	JobId     int    `mapstructure:"job_id,omitempty" json:"job_id,omitempty" yaml:"job_id,omitempty"`
	Cause     string `mapstructure:"cause,omitempty" json:"cause,omitempty" yaml:"cause,omitempty"`
	Token     string `mapstructure:"token,omitempty" json:"token,omitempty" yaml:"token,omitempty"`
	Enabled   bool   `mapstructure:"enabled,omitempty" json:"enabled,omitempty" yaml:"enabled,omitempty"`
}

DbtCloudConfig is a dto for parsing DbtCloud configuration

func (*DbtCloudConfig) Validate

func (dcc *DbtCloudConfig) Validate() error

Validate returns err if invalid

type DbtCloudRequestFactory

type DbtCloudRequestFactory struct {
	// contains filtered or unexported fields
}

func (*DbtCloudRequestFactory) Close

func (dcc *DbtCloudRequestFactory) Close()

func (*DbtCloudRequestFactory) Create

func (dcc *DbtCloudRequestFactory) Create(object map[string]interface{}) (req *Request, err error)

Create implements HTTPRequestFactory interface

type EngineConfig

type EngineConfig struct {
	RawStatement    string        `mapstructure:"raw_statement,omitempty" json:"raw_statement,omitempty" yaml:"raw_statement,omitempty"`
	NullableFields  []string      `mapstructure:"nullable_fields,omitempty" json:"nullable_fields,omitempty" yaml:"nullable_fields,omitempty"`
	PartitionFields []FieldConfig `mapstructure:"partition_fields,omitempty" json:"partition_fields,omitempty" yaml:"partition_fields,omitempty"`
	OrderFields     []FieldConfig `mapstructure:"order_fields,omitempty" json:"order_fields,omitempty" yaml:"order_fields,omitempty"`
	PrimaryKeys     []string      `mapstructure:"primary_keys,omitempty" json:"primary_keys,omitempty" yaml:"primary_keys,omitempty"`
}

EngineConfig dto for deserialized clickhouse engine config

type Envelop

type Envelop struct {
	URL     string            `mapstructure:"url"`
	Method  string            `mapstructure:"method"`
	Headers map[string]string `mapstructure:"headers"`
	Body    interface{}       `mapstructure:"body"`
}

type ErrorPayload

type ErrorPayload struct {
	Dataset         string
	Bucket          string
	Project         string
	Database        string
	Cluster         string
	Schema          string
	Table           string
	Partition       string
	PrimaryKeys     []string
	Statement       string
	Values          []interface{}
	ValuesMapString string
	TotalObjects    int
}

func (*ErrorPayload) String

func (ep *ErrorPayload) String() string

type EventContext

type EventContext struct {
	CacheDisabled   bool
	DestinationID   string
	EventID         string
	TokenID         string
	Src             string
	RawEvent        events.Event
	ProcessedEvent  events.Event
	Table           *Table
	RecognizedEvent bool

	SerializedOriginalEvent string

	//HTTPRequest is applicable only for HTTP events
	HTTPRequest       *Request
	SynchronousResult map[string]interface{}
}

EventContext holds extracted serializable event identifiers. It is used in counters/metrics/cache.

func (*EventContext) GetSerializedOriginalEvent

func (ec *EventContext) GetSerializedOriginalEvent() string

type FacebookConversionAPI

type FacebookConversionAPI struct {
	AbstractHTTP
	// contains filtered or unexported fields
}

FacebookConversionAPI adapter for Facebook Conversion API

func NewFacebookConversion

func NewFacebookConversion(config *FacebookConversionAPIConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*FacebookConversionAPI, error)

NewFacebookConversion returns new instance of adapter

func NewTestFacebookConversion

func NewTestFacebookConversion(config *FacebookConversionAPIConfig) *FacebookConversionAPI

NewTestFacebookConversion returns test instance of adapter

func (*FacebookConversionAPI) TestAccess

func (fc *FacebookConversionAPI) TestAccess() error

TestAccess sends a test request (empty POST) to Facebook and checks if the pixel id or access token is invalid

func (*FacebookConversionAPI) Type

func (fc *FacebookConversionAPI) Type() string

Type returns adapter type

type FacebookConversionAPIConfig

type FacebookConversionAPIConfig struct {
	PixelID     string `mapstructure:"pixel_id,omitempty" json:"pixel_id,omitempty" yaml:"pixel_id,omitempty"`
	AccessToken string `mapstructure:"access_token,omitempty" json:"access_token,omitempty" yaml:"access_token,omitempty"`
}

FacebookConversionAPIConfig dto for deserialized datasource config (e.g. in Facebook destination)

func (*FacebookConversionAPIConfig) Validate

func (fmc *FacebookConversionAPIConfig) Validate() error

Validate required fields in FacebookConversionAPIConfig

type FacebookConversionEventsReq

type FacebookConversionEventsReq struct {
	Data          []map[string]interface{} `json:"data,omitempty"`
	TestEventCode string                   `json:"test_event_code,omitempty"`
}

FacebookConversionEventsReq is sent to Facebook Conversion API https://developers.facebook.com/docs/marketing-api/conversions-api/using-the-api#

type FacebookRequestFactory

type FacebookRequestFactory struct {
	// contains filtered or unexported fields
}

FacebookRequestFactory is a factory for building facebook POST HTTP requests from input events

func (*FacebookRequestFactory) Close

func (frf *FacebookRequestFactory) Close()

func (*FacebookRequestFactory) Create

func (frf *FacebookRequestFactory) Create(object map[string]interface{}) (*Request, error)

Create returns the created http.Request. It transforms parameters (event_time -> unix timestamp), maps the input event_type (event_name) to the standard one, and hashes fields according to the documentation.

type FacebookResponse

type FacebookResponse struct {
	Error FacebookResponseErr `json:"error,omitempty"`
}

FacebookResponse is a dto for parsing Facebook response

type FacebookResponseErr

type FacebookResponseErr struct {
	Message string `json:"message,omitempty"`
	Type    string `json:"type,omitempty"`
	Code    int    `json:"code,omitempty"`
}

FacebookResponseErr is a dto for parsing Facebook response error

type FieldConfig

type FieldConfig struct {
	Function string `mapstructure:"function,omitempty" json:"function,omitempty" yaml:"function,omitempty"`
	Field    string `mapstructure:"field,omitempty" json:"field,omitempty" yaml:"field,omitempty"`
}

FieldConfig dto for deserialized clickhouse engine fields

type FileCompression

type FileCompression string

type FileConfig

type FileConfig struct {
	Folder      string             `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"`
	Format      FileEncodingFormat `mapstructure:"format,omitempty" json:"format,omitempty" yaml:"format,omitempty"`
	Compression FileCompression    `mapstructure:"compression,omitempty" json:"compression,omitempty" yaml:"compression,omitempty"`
}

func (FileConfig) PrepareFile

func (c FileConfig) PrepareFile(fileName *string, fileBytes *[]byte) error

func (*FileConfig) RequireDefaultStage

func (c *FileConfig) RequireDefaultStage(storageType string)

type FileEncodingFormat

type FileEncodingFormat string

type GoogleAnalytics

type GoogleAnalytics struct {
	AbstractHTTP
}

GoogleAnalytics is an adapter for sending events into GoogleAnalytics

func NewGoogleAnalytics

func NewGoogleAnalytics(config *GoogleAnalyticsConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*GoogleAnalytics, error)

NewGoogleAnalytics returns configured GoogleAnalytics instance

func (*GoogleAnalytics) Type

func (ga *GoogleAnalytics) Type() string

Type returns adapter type

type GoogleAnalyticsConfig

type GoogleAnalyticsConfig struct {
	TrackingID string `mapstructure:"tracking_id" json:"tracking_id,omitempty" yaml:"tracking_id,omitempty"`
}

GoogleAnalyticsConfig is a GA configuration

func (*GoogleAnalyticsConfig) Validate

func (gac *GoogleAnalyticsConfig) Validate() error

Validate returns err if some fields are empty

type GoogleAnalyticsRequestFactory

type GoogleAnalyticsRequestFactory struct {
	// contains filtered or unexported fields
}

GoogleAnalyticsRequestFactory is a HTTPRequestFactory for GA

func (*GoogleAnalyticsRequestFactory) Close

func (garf *GoogleAnalyticsRequestFactory) Close()

func (*GoogleAnalyticsRequestFactory) Create

func (garf *GoogleAnalyticsRequestFactory) Create(object map[string]interface{}) (*Request, error)

Create returns an HTTP GET request with query parameters. It removes system fields and maps the event type.

type GoogleCloudStorage

type GoogleCloudStorage struct {
	// contains filtered or unexported fields
}

func NewGoogleCloudStorage

func NewGoogleCloudStorage(ctx context.Context, config *GoogleConfig) (*GoogleCloudStorage, error)

func (*GoogleCloudStorage) Close

func (gcs *GoogleCloudStorage) Close() error

Close closes gcp client and returns err if occurred

func (*GoogleCloudStorage) Compression

func (gcs *GoogleCloudStorage) Compression() FileCompression

func (*GoogleCloudStorage) DeleteObject

func (gcs *GoogleCloudStorage) DeleteObject(key string) (err error)

DeleteObject deletes object from google cloud storage bucket

func (*GoogleCloudStorage) Format

func (gcs *GoogleCloudStorage) Format() FileEncodingFormat

func (*GoogleCloudStorage) UploadBytes

func (gcs *GoogleCloudStorage) UploadBytes(fileName string, fileBytes []byte) (err error)

UploadBytes creates named file on google cloud storage with payload

func (*GoogleCloudStorage) ValidateWritePermission

func (gcs *GoogleCloudStorage) ValidateWritePermission() error

ValidateWritePermission tries to create temporary file and remove it. returns nil if file creation was successful.

type GoogleConfig

type GoogleConfig struct {
	Bucket     string      `mapstructure:"gcs_bucket,omitempty" json:"gcs_bucket,omitempty" yaml:"gcs_bucket,omitempty"`
	Project    string      `mapstructure:"bq_project,omitempty" json:"bq_project,omitempty" yaml:"bq_project,omitempty"`
	Dataset    string      `mapstructure:"bq_dataset,omitempty" json:"bq_dataset,omitempty" yaml:"bq_dataset,omitempty"`
	KeyFile    interface{} `mapstructure:"key_file,omitempty" json:"key_file,omitempty" yaml:"key_file,omitempty"`
	FileConfig `mapstructure:",squash" yaml:"-,inline"`
	// contains filtered or unexported fields
}

func (*GoogleConfig) Validate

func (gc *GoogleConfig) Validate() error

func (*GoogleConfig) ValidateBatchMode

func (gc *GoogleConfig) ValidateBatchMode() error

ValidateBatchMode checks that google cloud storage is set

type HTTPAdapter

type HTTPAdapter struct {
	// contains filtered or unexported fields
}

HTTPAdapter is an adapter for sending HTTP requests with retries. It has a persistent request queue and a workers pool under the hood.

func NewHTTPAdapter

func NewHTTPAdapter(config *HTTPAdapterConfiguration) (*HTTPAdapter, error)

NewHTTPAdapter returns configured HTTPAdapter and starts queue observing goroutine

func (*HTTPAdapter) Close

func (h *HTTPAdapter) Close() (err error)

Close closes the underlying queue, workers pool and HTTP client. It returns err if one occurred.

func (*HTTPAdapter) SendAsync

func (h *HTTPAdapter) SendAsync(eventContext *EventContext) error

SendAsync puts the request to the queue. It returns err if the request can't be put to the queue.

type HTTPAdapterConfiguration

type HTTPAdapterConfiguration struct {
	DestinationID  string
	Dir            string
	HTTPConfig     *HTTPConfiguration
	HTTPReqFactory HTTPRequestFactory
	QueueFactory   *events.QueueFactory
	PoolWorkers    int
	DebugLogger    *logging.QueryLogger
	ErrorHandler   func(fallback bool, eventContext *EventContext, err error)
	SuccessHandler func(eventContext *EventContext)
}

HTTPAdapterConfiguration is a dto for creating HTTPAdapter

type HTTPConfiguration

type HTTPConfiguration struct {
	GlobalClientTimeout time.Duration
	RetryDelay          time.Duration
	RetryCount          int

	ClientMaxIdleConns        int
	ClientMaxIdleConnsPerHost int

	QueueFullnessThreshold uint64
}

HTTPConfiguration is a dto for HTTP adapter (client) configuration

type HTTPRequestFactory

type HTTPRequestFactory interface {
	Create(object map[string]interface{}) (*Request, error)
	Close()
}

HTTPRequestFactory is a factory for creating http.Request from input event object

func NewWebhookRequestFactory

func NewWebhookRequestFactory(destinationID, destinationType, httpMethod, urlTmplStr, bodyTmplStr string, headers map[string]string) (HTTPRequestFactory, error)

NewWebhookRequestFactory returns configured HTTPRequestFactory instance for webhook requests

type HTTPRequestQueue

type HTTPRequestQueue struct {
	// contains filtered or unexported fields
}

HTTPRequestQueue is a queue (persisted on file system) with requests

func NewHTTPRequestQueue

func NewHTTPRequestQueue(identifier string, queueFactory *events.QueueFactory) *HTTPRequestQueue

NewHTTPRequestQueue returns configured HTTPRequestQueue instance

func (*HTTPRequestQueue) Add

func (pq *HTTPRequestQueue) Add(req *Request, eventContext *EventContext) error

Add puts HTTP request and error callback to the queue

func (*HTTPRequestQueue) AddRequest

func (pq *HTTPRequestQueue) AddRequest(req *RetryableRequest) error

AddRequest puts request to the queue with retryCount

func (*HTTPRequestQueue) Close

func (pq *HTTPRequestQueue) Close() error

Close closes underlying persistent queue

func (*HTTPRequestQueue) DequeueBlock

func (pq *HTTPRequestQueue) DequeueBlock() (*RetryableRequest, error)

DequeueBlock waits until an enqueued request is ready and returns it

func (*HTTPRequestQueue) Size

func (pq *HTTPRequestQueue) Size() uint64

Size returns queue size

type HubSpot

type HubSpot struct {
	AbstractHTTP
	// contains filtered or unexported fields
}

HubSpot is an adapter for sending HTTP requests to HubSpot

func NewHubSpot

func NewHubSpot(config *HubSpotConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*HubSpot, error)

NewHubSpot returns configured HubSpot adapter instance

func NewTestHubSpot

func NewTestHubSpot(config *HubSpotConfig) *HubSpot

NewTestHubSpot returns test instance of adapter

func (*HubSpot) TestAccess

func (h *HubSpot) TestAccess() error

TestAccess sends a get user properties request to HubSpot and checks if an error has occurred

func (*HubSpot) Type

func (h *HubSpot) Type() string

Type returns adapter type

type HubSpotConfig

type HubSpotConfig struct {
	APIKey      string `mapstructure:"api_key,omitempty" json:"api_key,omitempty" yaml:"api_key,omitempty"`
	AccessToken string `mapstructure:"access_token,omitempty" json:"access_token,omitempty" yaml:"access_token,omitempty"`
	HubID       string `mapstructure:"hub_id,omitempty" json:"hub_id,omitempty" yaml:"hub_id,omitempty"`
}

HubSpotConfig is a dto for parsing HubSpot configuration

func (*HubSpotConfig) Validate

func (hc *HubSpotConfig) Validate() error

Validate returns err if invalid

type HubSpotContactProperty

type HubSpotContactProperty struct {
	Name string `json:"name"`
}

HubSpotContactProperty is a dto for serializing contact (user) properties from HubSpot

type HubSpotContactPropertyWithValues

type HubSpotContactPropertyWithValues struct {
	Property string      `json:"property"`
	Value    interface{} `json:"value"`
}

HubSpotContactPropertyWithValues is a dto for serializing contact (user) properties that are sent to HubSpot

type HubSpotContactRequest

type HubSpotContactRequest struct {
	Properties []HubSpotContactPropertyWithValues `json:"properties"`
}

HubSpotContactRequest is a dto for sending contact requests to HubSpot

type HubSpotRequestFactory

type HubSpotRequestFactory struct {
	// contains filtered or unexported fields
}

HubSpotRequestFactory is a factory for building HubSpot HTTP requests from input events. It reloads the properties configuration every minute in a background goroutine

func (*HubSpotRequestFactory) Close

func (hf *HubSpotRequestFactory) Close()

Close closes underlying goroutine

func (*HubSpotRequestFactory) Create

func (hf *HubSpotRequestFactory) Create(object map[string]interface{}) (*Request, error)

Create returns a HubSpot request built depending on the event type

type HubSpotResponse

type HubSpotResponse struct {
	Category string `json:"category"`
	Status   string `json:"status"`
	Message  string `json:"message"`
}

HubSpotResponse is a dto for receiving response from HubSpot

type InsertContext

type InsertContext struct {
	// contains filtered or unexported fields
}

InsertContext is used as a dto for insert operation

func NewBatchInsertContext

func NewBatchInsertContext(table *Table, objects []map[string]interface{}, merge bool, deleteConditions *base.DeleteConditions) *InsertContext

func NewSingleInsertContext

func NewSingleInsertContext(eventContext *EventContext) *InsertContext

type MySQL

type MySQL struct {
	// contains filtered or unexported fields
}

MySQL is adapter for creating, patching (schema or table), inserting data to mySQL database

func NewMySQL

func NewMySQL(ctx context.Context, config *DataSourceConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*MySQL, error)

NewMySQL returns configured MySQL adapter instance

func (*MySQL) Close

func (m *MySQL) Close() error

Close underlying sql.DB

func (*MySQL) CreateDB

func (m *MySQL) CreateDB(dbSchemaName string) error

CreateDB creates database instance if doesn't exist

func (*MySQL) CreateTable

func (m *MySQL) CreateTable(table *Table) (err error)

CreateTable creates database table with name,columns provided in Table representation

func (*MySQL) DropTable

func (m *MySQL) DropTable(table *Table) (err error)

DropTable drops table in transaction

func (*MySQL) GetTableSchema

func (m *MySQL) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*MySQL) Insert

func (m *MySQL) Insert(insertContext *InsertContext) error

Insert inserts the provided object into MySQL with typecasts. It uses upsert (merge on conflict) if primary_keys are configured

func (*MySQL) OpenTx

func (m *MySQL) OpenTx() (*Transaction, error)

OpenTx opens the underlying sql transaction and returns a wrapped instance

func (*MySQL) PatchTableSchema

func (m *MySQL) PatchTableSchema(patchTable *Table) (err error)

PatchTableSchema adds new columns(from provided Table) to existing table

func (*MySQL) ReplaceTable

func (m *MySQL) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) (err error)

func (*MySQL) Truncate

func (m *MySQL) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (MySQL) Type

func (MySQL) Type() string

Type returns MySQL type

func (*MySQL) Update

func (m *MySQL) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

Update one record in MySQL

type Npm

type Npm struct {
	AbstractHTTP
}

Npm is an adapter for sending HTTP request based on result of running javascript SDK destinations

func NewNpm

func NewNpm(httpAdapterConfiguration *HTTPAdapterConfiguration) (*Npm, error)

NewNpm returns configured Npm adapter instance

func (*Npm) Type

func (n *Npm) Type() string

Type returns adapter type

type NpmRequestFactory

type NpmRequestFactory struct {
}

func (*NpmRequestFactory) Close

func (n *NpmRequestFactory) Close()

func (*NpmRequestFactory) Create

func (n *NpmRequestFactory) Create(object map[string]interface{}) (req *Request, err error)

Create returns created http.Request with templates

type Postgres

type Postgres struct {
	// contains filtered or unexported fields
}

Postgres is adapter for creating,patching (schema or table), inserting data to postgres

func NewPostgres

func NewPostgres(ctx context.Context, config *DataSourceConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*Postgres, error)

NewPostgres returns configured Postgres adapter instance

func NewPostgresUnderRedshift

func NewPostgresUnderRedshift(ctx context.Context, config *DataSourceConfig, queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*Postgres, error)

NewPostgresUnderRedshift returns configured Postgres adapter instance without mapping old types

func (*Postgres) Close

func (p *Postgres) Close() error

Close underlying sql.DB

func (*Postgres) CreateDbSchema

func (p *Postgres) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates database schema instance if doesn't exist

func (*Postgres) CreateTable

func (p *Postgres) CreateTable(table *Table) (err error)

CreateTable creates database table with name,columns provided in Table representation

func (*Postgres) DropTable

func (p *Postgres) DropTable(table *Table) (err error)

DropTable drops table in transaction

func (*Postgres) GetTableSchema

func (p *Postgres) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*Postgres) Insert

func (p *Postgres) Insert(insertContext *InsertContext) error

Insert inserts data with InsertContext as a single object or a batch into Postgres

func (*Postgres) OpenTx

func (p *Postgres) OpenTx() (*Transaction, error)

OpenTx opens the underlying sql transaction and returns a wrapped instance

func (*Postgres) PatchTableSchema

func (p *Postgres) PatchTableSchema(patchTable *Table) (err error)

PatchTableSchema adds new columns(from provided Table) to existing table

func (*Postgres) ReplaceTable

func (p *Postgres) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) (err error)

func (*Postgres) Truncate

func (p *Postgres) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (Postgres) Type

func (Postgres) Type() string

Type returns Postgres type

func (*Postgres) Update

func (p *Postgres) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

Update one record in Postgres

type QueuedRequest

type QueuedRequest struct {
	SerializedRetryableRequest []byte
}

QueuedRequest is a dto for serialization in persistent queue

type Request

type Request struct {
	URL     string
	Method  string
	Body    []byte
	Headers map[string]string
}

Request is a dto for serializing a custom http.Request

type RetryableRequest

type RetryableRequest struct {
	Request      *Request
	Retry        int
	DequeuedTime time.Time
	EventContext *EventContext
}

RetryableRequest is an HTTP request with retry count

type S3

type S3 struct {
	// contains filtered or unexported fields
}

S3 is an S3 adapter for uploading/deleting files

func NewS3

func NewS3(s3Config *S3Config) (*S3, error)

NewS3 returns configured S3 adapter

func (*S3) Close

func (a *S3) Close() error

Close returns nil

func (*S3) Compression

func (a *S3) Compression() FileCompression

func (*S3) DeleteObject

func (a *S3) DeleteObject(key string) error

DeleteObject deletes object from s3 bucket by key

func (*S3) Format

func (a *S3) Format() FileEncodingFormat

func (*S3) UploadBytes

func (a *S3) UploadBytes(fileName string, fileBytes []byte) error

UploadBytes creates named file on s3 with payload

func (*S3) ValidateWritePermission

func (a *S3) ValidateWritePermission() error

ValidateWritePermission tries to create a temporary file and remove it. Returns nil if file creation was successful.

type S3Config

type S3Config struct {
	AccessKeyID string `mapstructure:"access_key_id,omitempty" json:"access_key_id,omitempty" yaml:"access_key_id,omitempty"`
	SecretKey   string `mapstructure:"secret_access_key,omitempty" json:"secret_access_key,omitempty" yaml:"secret_access_key,omitempty"`
	Bucket      string `mapstructure:"bucket,omitempty" json:"bucket,omitempty" yaml:"bucket,omitempty"`
	Region      string `mapstructure:"region,omitempty" json:"region,omitempty" yaml:"region,omitempty"`
	Endpoint    string `mapstructure:"endpoint,omitempty" json:"endpoint,omitempty" yaml:"endpoint,omitempty"`
	FileConfig  `mapstructure:",squash" yaml:"-,inline"`
}

S3Config is a dto for config deserialization

func (*S3Config) Validate

func (s3c *S3Config) Validate() error

Validate returns err if invalid

type SQLAdapter

type SQLAdapter interface {
	Adapter
	GetTableSchema(tableName string) (*Table, error)
	CreateTable(schemaToCreate *Table) error
	PatchTableSchema(schemaToAdd *Table) error
	Truncate(tableName string) error
	Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error
	DropTable(table *Table) (err error)
	ReplaceTable(originalTable, replacementTable string, dropOldTable bool) error
}

SQLAdapter is a manager for DWH tables

type SSLConfig

type SSLConfig struct {
	Mode       SSLMode `mapstructure:"mode,omitempty" json:"mode,omitempty" yaml:"mode,omitempty"`
	ServerCA   string  `mapstructure:"server_ca,omitempty" json:"server_ca,omitempty" yaml:"server_ca,omitempty"`
	ClientCert string  `mapstructure:"client_cert,omitempty" json:"client_cert,omitempty" yaml:"client_cert,omitempty"`
	ClientKey  string  `mapstructure:"client_key,omitempty" json:"client_key,omitempty" yaml:"client_key,omitempty"`
}

SSLConfig is a dto for deserialized SSL configuration for Postgres

func (*SSLConfig) Validate

func (sc *SSLConfig) Validate() error

Validate returns err if the ssl configuration is invalid

type SSLMode

type SSLMode string
const (
	SSLModeRequire    SSLMode = "require"
	SSLModeDisable    SSLMode = "disable"
	SSLModeVerifyCA   SSLMode = "verify-ca"
	SSLModeVerifyFull SSLMode = "verify-full"

	Unknown SSLMode = ""
)

func FromString

func FromString(sslMode string) SSLMode

func (SSLMode) String

func (s SSLMode) String() string

type Snowflake

type Snowflake struct {
	// contains filtered or unexported fields
}

Snowflake is adapter for creating,patching (schema or table), inserting data to snowflake

func NewSnowflake

func NewSnowflake(ctx context.Context, config *SnowflakeConfig, s3Config *S3Config,
	queryLogger *logging.QueryLogger, sqlTypes typing.SQLTypes) (*Snowflake, error)

NewSnowflake returns configured Snowflake adapter instance

func (*Snowflake) Close

func (s *Snowflake) Close() (multiErr error)

Close underlying sql.DB

func (*Snowflake) Copy

func (s *Snowflake) Copy(fileName, tableName string, header []string) error

Copy transfers data from s3 to Snowflake by passing a COPY request to Snowflake

func (*Snowflake) CreateDbSchema

func (s *Snowflake) CreateDbSchema(dbSchemaName string) error

CreateDbSchema creates database schema instance if it doesn't exist

func (*Snowflake) CreateTable

func (s *Snowflake) CreateTable(table *Table) (err error)

CreateTable runs createTableInTransaction

func (*Snowflake) DropTable

func (s *Snowflake) DropTable(table *Table) (err error)

DropTable drops table in transaction

func (*Snowflake) GetTableSchema

func (s *Snowflake) GetTableSchema(tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*Snowflake) Insert

func (s *Snowflake) Insert(insertContext *InsertContext) error

Insert inserts data with InsertContext as a single object or a batch into Snowflake

func (*Snowflake) OpenTx

func (s *Snowflake) OpenTx() (*Transaction, error)

OpenTx opens the underlying sql transaction and returns a wrapped instance

func (*Snowflake) PatchTableSchema

func (s *Snowflake) PatchTableSchema(patchTable *Table) (err error)

PatchTableSchema adds new columns (from provided Table) to existing table

func (*Snowflake) ReplaceTable

func (s *Snowflake) ReplaceTable(originalTable, replacementTable string, dropOldTable bool) (err error)

func (*Snowflake) Truncate

func (s *Snowflake) Truncate(tableName string) error

Truncate deletes all records in tableName table

func (Snowflake) Type

func (Snowflake) Type() string

func (*Snowflake) Update

func (s *Snowflake) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error

Update one record in Snowflake

type SnowflakeConfig

type SnowflakeConfig struct {
	Account    string             `mapstructure:"account,omitempty" json:"account,omitempty" yaml:"account,omitempty"`
	Port       int                `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string             `mapstructure:"db,omitempty" json:"db,omitempty" yaml:"db,omitempty"`
	Schema     string             `mapstructure:"schema,omitempty" json:"schema,omitempty" yaml:"schema,omitempty"`
	Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Warehouse  string             `mapstructure:"warehouse,omitempty" json:"warehouse,omitempty" yaml:"warehouse,omitempty"`
	Stage      string             `mapstructure:"stage,omitempty" json:"stage,omitempty" yaml:"stage,omitempty"`
	Parameters map[string]*string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
	S3         *S3Config          `mapstructure:"s3,omitempty" json:"s3,omitempty" yaml:"s3,omitempty"`
	Google     *GoogleConfig      `mapstructure:"google,omitempty" json:"google,omitempty" yaml:"google,omitempty"`
}

SnowflakeConfig is a dto for deserialized datasource config for Snowflake

func ReadSFConfig

func ReadSFConfig(t *testing.T) (*SnowflakeConfig, bool)

func (*SnowflakeConfig) Validate

func (sc *SnowflakeConfig) Validate() error

Validate required fields in SnowflakeConfig

type SqlParams

type SqlParams struct {
	// contains filtered or unexported fields
}

type Stage

type Stage interface {
	io.Closer
	UploadBytes(fileName string, fileBytes []byte) error
	DeleteObject(key string) error
}

Stage is an intermediate layer (for BQ, Snowflake, Redshift, etc)

type Table

type Table struct {
	Schema string
	Name   string

	Columns        Columns
	PKFields       map[string]bool
	PrimaryKeyName string
	Partition      schema.DatePartition

	DeletePkFields bool
}

Table is a dto for DWH Table representation

func (*Table) Clone

func (t *Table) Clone() *Table

Clone returns clone of current table

func (Table) Diff

func (t Table) Diff(another *Table) *Table

Diff calculates the diff between the current schema and another one. It returns the schema to add to the current schema (to make them equal), or an empty one if 1) the other schema is empty or 2) all fields from the other schema exist in the current schema. NOTE: the Diff method doesn't take types into account.

func (*Table) Exists

func (t *Table) Exists() bool

Exists returns true if there is at least one column

func (*Table) GetPKFields

func (t *Table) GetPKFields() []string

GetPKFields returns primary keys list

func (*Table) GetPKFieldsMap

func (t *Table) GetPKFieldsMap() map[string]bool

GetPKFieldsMap returns primary keys set

func (*Table) SortedColumnNames

func (t *Table) SortedColumnNames() []string

SortedColumnNames returns column names sorted in alphabetical order

type TableField

type TableField struct {
	Field string      `json:"field,omitempty"`
	Type  string      `json:"type,omitempty"`
	Value interface{} `json:"value,omitempty"`
}

TableField is a table column representation

type TableStatementFactory

type TableStatementFactory struct {
	// contains filtered or unexported fields
}

TableStatementFactory is used for creating CREATE TABLE statements depending on config

func NewTableStatementFactory

func NewTableStatementFactory(config *ClickHouseConfig) (*TableStatementFactory, error)

func (TableStatementFactory) CreateTableStatement

func (tsf TableStatementFactory) CreateTableStatement(tableName, columnsClause string) string

CreateTableStatement returns the ClickHouse DDL statement for creating a table

type Tag

type Tag struct {
	// contains filtered or unexported fields
}

Tag returns an HTML tag based on the incoming event. The HTML tag is supposed to be added to the page with the javascript-sdk

func NewTag

func NewTag(config *TagConfig, destinationId string) (*Tag, error)

func (*Tag) Close

func (t *Tag) Close() error

func (*Tag) Insert

func (t *Tag) Insert(insertContext *InsertContext) error

func (*Tag) ProcessEvent

func (t *Tag) ProcessEvent(event map[string]interface{}) (map[string]interface{}, error)

func (*Tag) Type

func (t *Tag) Type() string

Type returns adapter type

type TagConfig

type TagConfig struct {
	TagID    string `mapstructure:"tagid,omitempty" json:"tagid,omitempty" yaml:"tagid,omitempty"`
	Template string `mapstructure:"template,omitempty" json:"template,omitempty" yaml:"template,omitempty"`
	Filter   string `mapstructure:"filter,omitempty" json:"filter,omitempty" yaml:"filter,omitempty"`
}

func (*TagConfig) Validate

func (tc *TagConfig) Validate() error

Validate returns err if invalid

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is a sql transaction wrapper. It is used for handling and logging errors with the db type (postgres, mySQL, redshift or snowflake) on Commit() and Rollback() calls

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit commits underlying transaction and returns err if occurred

func (*Transaction) Rollback

func (t *Transaction) Rollback() error

Rollback cancels underlying transaction and logs system err if occurred

type WebHook

type WebHook struct {
	AbstractHTTP
}

WebHook is an adapter for sending HTTP requests with configurable HTTP parameters (URL, body, headers)

func NewWebHook

func NewWebHook(config *WebHookConfig, httpAdapterConfiguration *HTTPAdapterConfiguration) (*WebHook, error)

NewWebHook returns configured WebHook adapter instance

func (*WebHook) Type

func (wh *WebHook) Type() string

Type returns adapter type

type WebHookConfig

type WebHookConfig struct {
	URL     string            `mapstructure:"url,omitempty" json:"url,omitempty" yaml:"url,omitempty"`
	Method  string            `mapstructure:"method,omitempty" json:"method,omitempty" yaml:"method,omitempty"`
	Body    string            `mapstructure:"body,omitempty" json:"body,omitempty" yaml:"body,omitempty"`
	Headers map[string]string `mapstructure:"headers,omitempty" json:"headers,omitempty" yaml:"headers,omitempty"`
}

WebHookConfig is a dto for parsing Webhook configuration

func (*WebHookConfig) Validate

func (whc *WebHookConfig) Validate() error

Validate returns err if invalid

type WebhookRequestFactory

type WebhookRequestFactory struct {
	// contains filtered or unexported fields
}

WebhookRequestFactory is a factory for building webhook (templating) HTTP requests from input events

func (*WebhookRequestFactory) Close

func (wrf *WebhookRequestFactory) Close()

func (*WebhookRequestFactory) Create

func (wrf *WebhookRequestFactory) Create(object map[string]interface{}) (req *Request, err error)

Create returns created http.Request with templates

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL