Documentation ¶
Index ¶
- Constants
- Variables
- func ClearTypeMetaFields(object map[string]interface{})
- func ConvertOldMappings(mappingType config.FieldMappingType, oldStyleMappings []string) (*config.Mapping, error)
- func IsLetterOrNumber(symbol int32) bool
- func NewFieldMapper(newStyleMappings *config.Mapping) (events.Mapper, typing.SQLTypes, error)
- func Reformat(key string) string
- type BatchHeader
- type CSVMarshaller
- type DatePartition
- type DummyFlattener
- type DummyMapper
- type DummyTypeResolver
- type Envelope
- type Field
- type FieldMapper
- type Fields
- type Flattener
- type FlattenerImpl
- type Granularity
- type JSLogListener
- type JSONMarshaller
- type MappingRule
- type Marshaller
- type ParquetMarshaller
- type ProcessedFile
- func (pf *ProcessedFile) GetEventsPerSrc() map[string]int
- func (pf *ProcessedFile) GetOriginalRawEvents() []string
- func (pf *ProcessedFile) GetPayload() []map[string]interface{}
- func (pf *ProcessedFile) GetPayloadBytes(marshaller Marshaller) ([]byte, error)
- func (pf *ProcessedFile) GetPayloadBytesWithHeader(marshaller Marshaller) ([]byte, []string, error)
- func (pf *ProcessedFile) GetPayloadLen() int
- func (pf *ProcessedFile) GetPayloadUsingStronglyTypedMarshaller(stm StronglyTypedMarshaller) ([]byte, error)
- type Processor
- func (p *Processor) AddJavaScript(js string)
- func (p *Processor) AddJavaScriptVariables(jsVar map[string]interface{})
- func (p *Processor) Close()
- func (p *Processor) CloseJavaScriptTemplates()
- func (p *Processor) DestinationType() string
- func (p *Processor) GetTransformer() templates.TemplateExecutor
- func (p *Processor) InitJavaScriptTemplates() (err error)
- func (p *Processor) ProcessEvent(event map[string]interface{}, needCopyEvent bool) ([]Envelope, error)
- func (p *Processor) ProcessEvents(fileName string, objects []map[string]interface{}, ...) (flatData map[string]*ProcessedFile, ...)
- func (p *Processor) ProcessPulledEvents(tableName string, objects []map[string]interface{}) (map[string]*ProcessedFile, error)
- func (p *Processor) SetBuiltinTransformer(builtinTransformer templates.TemplateExecutor)
- func (p *Processor) SetDefaultUserTransform(defaultUserTransform string)
- type SQLTypeSuggestion
- type StronglyTypedMarshaller
- type TableNameExtractor
- type TypeResolver
- type TypeResolverImpl
Constants ¶
const ( JitsuEnvelopParameter = "JITSU_ENVELOP" JitsuUserRecognizedEvent = "JITSU_UR_EVENT" )
const SqlTypeKeyword = "__sql_type_"
Variables ¶
var ( JSONMarshallerInstance = JSONMarshaller{} CSVMarshallerInstance = CSVMarshaller{} )
var ErrSkipObject = errors.New("Transform or table name filter marked object to be skipped. This object will be skipped.")
var ( EventSpecialParameters = []string{ templates.TableNameParameter, JitsuEnvelopParameter, JitsuUserRecognizedEvent, } )
Functions ¶
func ClearTypeMetaFields ¶
func ClearTypeMetaFields(object map[string]interface{})
func ConvertOldMappings ¶
func ConvertOldMappings(mappingType config.FieldMappingType, oldStyleMappings []string) (*config.Mapping, error)
ConvertOldMappings converts old style mappings into new style return new style mappings or error
func IsLetterOrNumber ¶
IsLetterOrNumber returns true if input symbol is:
A - Z: 65-90 a - z: 97-122
func NewFieldMapper ¶
NewFieldMapper return FieldMapper, sql typecast and err
Types ¶
type BatchHeader ¶
type BatchHeader struct { TableName string Fields Fields Partition DatePartition }
BatchHeader is the schema result of parsing JSON objects
func (*BatchHeader) Exists ¶
func (bh *BatchHeader) Exists() bool
Exists returns true if there is at least one field
type CSVMarshaller ¶
type CSVMarshaller struct { }
func (CSVMarshaller) Marshal ¶
func (cm CSVMarshaller) Marshal(fields []string, object map[string]interface{}, buf *bytes.Buffer) error
Marshal marshals input object as csv values string with delimiter
func (CSVMarshaller) NeedHeader ¶
func (cm CSVMarshaller) NeedHeader() bool
type DatePartition ¶
type DatePartition struct { Field string Granularity Granularity }
type DummyFlattener ¶
type DummyFlattener struct { }
func NewDummyFlattener ¶
func NewDummyFlattener() *DummyFlattener
func (*DummyFlattener) FlattenObject ¶
func (df *DummyFlattener) FlattenObject(json map[string]interface{}) (map[string]interface{}, error)
FlattenObject return the same json object
type DummyMapper ¶
type DummyMapper struct{}
type DummyTypeResolver ¶
type DummyTypeResolver struct { }
DummyTypeResolver doesn't do anything
func NewDummyTypeResolver ¶
func NewDummyTypeResolver() *DummyTypeResolver
NewDummyTypeResolver return DummyTypeResolver
type Envelope ¶
type Envelope struct { Header *BatchHeader Event events.Event OriginalEvent string }
type Field ¶
type Field struct {
// contains filtered or unexported fields
}
Field is a data type holder with occurrences
func NewFieldWithSQLType ¶
func NewFieldWithSQLType(t typing.DataType, sqlTypeSuggestion *SQLTypeSuggestion) Field
NewFieldWithSQLType returns Field instance with configured suggested sql types
func (Field) GetSuggestedSQLType ¶
GetSuggestedSQLType returns suggested SQL type if configured is used in case when source overrides destination type
type FieldMapper ¶
type FieldMapper struct {
// contains filtered or unexported fields
}
type Fields ¶
func (Fields) OverrideTypes ¶
OverrideTypes check if field exists in other then put its type
type Flattener ¶
func NewFlattener ¶
func NewFlattener() Flattener
type FlattenerImpl ¶
type FlattenerImpl struct {
// contains filtered or unexported fields
}
func (*FlattenerImpl) FlattenObject ¶
func (f *FlattenerImpl) FlattenObject(json map[string]interface{}) (map[string]interface{}, error)
FlattenObject flatten object e.g. from {"key1":{"key2":123}} to {"key1_key2":123} from {"$key1":1} to {"_key1":1} from {"(key1)":1} to {"_key1_":1}
type Granularity ¶
type Granularity string
Granularity is a granularity of TimeInterval
const ( HOUR Granularity = "HOUR" DAY Granularity = "DAY" WEEK Granularity = "WEEK" MONTH Granularity = "MONTH" QUARTER Granularity = "QUARTER" YEAR Granularity = "YEAR" ALL Granularity = "ALL" )
func (Granularity) Format ¶
func (g Granularity) Format(t time.Time) string
Format returns formatted string value representation
func (Granularity) Lower ¶
func (g Granularity) Lower(t time.Time) time.Time
Lower returns the lower value of interval
func (Granularity) String ¶
func (g Granularity) String() string
String returns string value representation
type JSLogListener ¶
type JSLogListener struct {
// contains filtered or unexported fields
}
func (*JSLogListener) Data ¶
func (j *JSLogListener) Data(data []byte)
func (*JSLogListener) Log ¶
func (j *JSLogListener) Log(level, message string)
func (*JSLogListener) Timeout ¶
func (j *JSLogListener) Timeout() time.Duration
type JSONMarshaller ¶
type JSONMarshaller struct { }
func (JSONMarshaller) Marshal ¶
func (jm JSONMarshaller) Marshal(fields []string, object map[string]interface{}, buf *bytes.Buffer) error
Marshal object as json
func (JSONMarshaller) NeedHeader ¶
func (jm JSONMarshaller) NeedHeader() bool
type MappingRule ¶
type MappingRule struct {
// contains filtered or unexported fields
}
type Marshaller ¶
type ParquetMarshaller ¶
func (*ParquetMarshaller) Marshal ¶
func (pm *ParquetMarshaller) Marshal(bh *BatchHeader, data []map[string]interface{}) ([]byte, error)
type ProcessedFile ¶
type ProcessedFile struct { FileName string BatchHeader *BatchHeader RecognitionPayload bool // contains filtered or unexported fields }
ProcessedFile collect data in payload and return it in two formats
func (*ProcessedFile) GetEventsPerSrc ¶
func (pf *ProcessedFile) GetEventsPerSrc() map[string]int
GetEventsPerSrc returns events quantity per src
func (*ProcessedFile) GetOriginalRawEvents ¶
func (pf *ProcessedFile) GetOriginalRawEvents() []string
GetOriginalRawEvents return payload as is
func (*ProcessedFile) GetPayload ¶
func (pf *ProcessedFile) GetPayload() []map[string]interface{}
GetPayload return payload as is
func (*ProcessedFile) GetPayloadBytes ¶
func (pf *ProcessedFile) GetPayloadBytes(marshaller Marshaller) ([]byte, error)
GetPayloadBytes returns marshaling by marshaller func, joined with \n, bytes assume that payload can't be empty
func (*ProcessedFile) GetPayloadBytesWithHeader ¶
func (pf *ProcessedFile) GetPayloadBytesWithHeader(marshaller Marshaller) ([]byte, []string, error)
GetPayloadBytesWithHeader returns marshaling by marshaller func, joined with \n, bytes assume that payload can't be empty
func (*ProcessedFile) GetPayloadLen ¶
func (pf *ProcessedFile) GetPayloadLen() int
GetPayloadLen return count of rows(objects)
func (*ProcessedFile) GetPayloadUsingStronglyTypedMarshaller ¶
func (pf *ProcessedFile) GetPayloadUsingStronglyTypedMarshaller(stm StronglyTypedMarshaller) ([]byte, error)
GetPayloadUsingStronglyTypedMarshaller returns bytes, containing marshalled payload StronglyTypedMarshaller needs to know payload schema (types of fields) to convert payload to byte slice
type Processor ¶
type Processor struct { MappingStyle string // contains filtered or unexported fields }
func NewProcessor ¶
func NewProcessor(destinationID string, destinationConfig *config.DestinationConfig, isSQLType bool, tableNameFuncExpression string, fieldMapper events.Mapper, enrichmentRules []enrichment.Rule, flattener Flattener, typeResolver TypeResolver, uniqueIDField *identifiers.UniqueID, maxColumnNameLen int, mappingStyle string, userRecognitionEnabled bool) (*Processor, error)
func (*Processor) AddJavaScript ¶
AddJavaScript loads javascript to transformation template's vm
func (*Processor) AddJavaScriptVariables ¶
AddJavaScriptVariables loads variable to globalThis object of transformation template's vm
func (*Processor) CloseJavaScriptTemplates ¶
func (p *Processor) CloseJavaScriptTemplates()
func (*Processor) DestinationType ¶
func (*Processor) GetTransformer ¶
func (p *Processor) GetTransformer() templates.TemplateExecutor
func (*Processor) InitJavaScriptTemplates ¶
InitJavaScriptTemplates loads destination transform javascript, inits context variables. and sets up template executor
func (*Processor) ProcessEvent ¶
func (p *Processor) ProcessEvent(event map[string]interface{}, needCopyEvent bool) ([]Envelope, error)
ProcessEvent returns table representation, processed flatten object
func (*Processor) ProcessEvents ¶
func (p *Processor) ProcessEvents(fileName string, objects []map[string]interface{}, alreadyUploadedTables map[string]bool, needCopyEvent bool) (flatData map[string]*ProcessedFile, recognizedFlatData map[string]*ProcessedFile, failedEvents *events.FailedEvents, skippedEvents *events.SkippedEvents, err error)
ProcessEvents processes events objects returns array of processed objects per table like {"table1": []objects, "table2": []objects}, All failed events are moved to separate collection for sending to fallback
func (*Processor) ProcessPulledEvents ¶
func (p *Processor) ProcessPulledEvents(tableName string, objects []map[string]interface{}) (map[string]*ProcessedFile, error)
ProcessPulledEvents processes events objects without applying mapping rules returns array of processed objects under tablename or error if at least 1 was occurred
func (*Processor) SetBuiltinTransformer ¶
func (p *Processor) SetBuiltinTransformer(builtinTransformer templates.TemplateExecutor)
SetBuiltinTransformer javascript executor for builtin js code (e.g. npm destination)
func (*Processor) SetDefaultUserTransform ¶
SetDefaultUserTransform set default transformation code that will be used if no transform or mapping settings provided
type SQLTypeSuggestion ¶
type SQLTypeSuggestion struct {
// contains filtered or unexported fields
}
SQLTypeSuggestion is a struct which keeps certain SQL types per certain destination type
func NewSQLTypeSuggestion ¶
func NewSQLTypeSuggestion(sqlType typing.SQLColumn, sqlTypePerDestination map[string]typing.SQLColumn) *SQLTypeSuggestion
NewSQLTypeSuggestion returns configured SQLTypeSuggestion instance
type StronglyTypedMarshaller ¶
type StronglyTypedMarshaller interface {
Marshal(bh *BatchHeader, data []map[string]interface{}) ([]byte, error)
}
func NewParquetMarshaller ¶
func NewParquetMarshaller(useGZIP bool) StronglyTypedMarshaller
type TableNameExtractor ¶
type TableNameExtractor struct { Expression string // contains filtered or unexported fields }
TableNameExtractor extracts table name from every JSON event
func NewTableNameExtractor ¶
func NewTableNameExtractor(tableNameExtractExpression string, funcMap template.FuncMap) (*TableNameExtractor, error)
NewTableNameExtractor returns configured TableNameExtractor
func (*TableNameExtractor) Close ¶
func (tne *TableNameExtractor) Close()
func (*TableNameExtractor) Extract ¶
func (tne *TableNameExtractor) Extract(object map[string]interface{}) (result string, err error)
Extract returns table name string. Extracts it from JSON event with text/template expression or javascript code. replaces all empty fields with 'null': {{.field1}} with object {'field2':2} => returns 'null'
func (*TableNameExtractor) Format ¶
func (tne *TableNameExtractor) Format() string
type TypeResolver ¶
TypeResolver resolves schema.Fields from input object
type TypeResolverImpl ¶
type TypeResolverImpl struct { }
TypeResolverImpl resolves types based on converter.go rules
func NewTypeResolver ¶
func NewTypeResolver() *TypeResolverImpl
NewTypeResolver returns TypeResolverImpl
func (*TypeResolverImpl) Resolve ¶
func (tr *TypeResolverImpl) Resolve(object map[string]interface{}) (Fields, error)
Resolve return Fields representation of input object apply default typecast and define column types reformat from json.Number into int64 or float64 and put back reformat from string with timestamp into time.Time and put back