Documentation ¶
Index ¶
- Constants
- Variables
- func EnrichWithCollection(object map[string]interface{}, collection string)
- func EnrichWithSourceId(object map[string]interface{}, sourceId string)
- func EnrichWithTimeInterval(object map[string]interface{}, interval string, lower, upper time.Time)
- func ExtractSrc(event Event) string
- func ParseFallbackJSON(line []byte) (map[string]interface{}, error)
- func TimedEventBuilder() interface{}
- type APIProcessor
- type BulkProcessor
- type Consumer
- type DummyQueue
- type DummyRecognition
- type Event
- type FailedEvent
- type FailedEvents
- type HTTPContext
- type JsProcessor
- type Mapper
- type NativeQueue
- type Parser
- type ParsingError
- type PixelProcessor
- type Processor
- type ProcessorHolder
- func (ph *ProcessorHolder) GetAPIPreprocessor() Processor
- func (ph *ProcessorHolder) GetBulkPreprocessor() Processor
- func (ph *ProcessorHolder) GetByType(processorType string) Processor
- func (ph *ProcessorHolder) GetJSPreprocessor() Processor
- func (ph *ProcessorHolder) GetPixelPreprocessor() Processor
- func (ph *ProcessorHolder) GetSegmentPreprocessor() Processor
- type Queue
- type QueueFactory
- type Recognition
- type RequestContext
- type SegmentProcessor
- type SkippedEvent
- type SkippedEvents
- type TimedEvent
Constants ¶
const ( //SrcKey is a system field SrcKey = "src" //TimeChunkKey is a system field TimeChunkKey = "_time_interval" TimeIntervalStart = "_interval_start" TimeIntervalEnd = "_interval_end" CollectionIDKey = "_collection_id" SourceIDKey = "_source_id" )
const ( //EventType is used in Jitsu javascript SDK as a event type field EventType = "event_type" //UserIdentify is used in Jitsu javascript SDK as a type of identification event UserIdentify = "user_identify" //MaskedParameterValue is used to detect when parameter is masked MaskedParameterValue = "masked" )
const HTTPContextField = "__HTTP_CONTEXT__"
const SrcBulk = "bulk"
Variables ¶
var ErrQueueClosed = errors.New("queue is closed")
var HashedAnonymIDPath = jsonutils.NewJSONPath("/eventn_ctx/user/hashed_anonymous_id||/user/hashed_anonymous_id")
var UserAnonymIDPath = jsonutils.NewJSONPath("/eventn_ctx/user/anonymous_id||/user/anonymous_id")
UserAnonymIDPath is used for setting generated user identifier in case of GDPR
Functions ¶
func EnrichWithCollection ¶
EnrichWithCollection puts collection string to object
func EnrichWithSourceId ¶
EnrichWithSourceId puts source id string to object
func EnrichWithTimeInterval ¶
EnrichWithTimeInterval puts interval representation to object
func ExtractSrc ¶
ExtractSrc returns 'src' field from input event or an empty string
func ParseFallbackJSON ¶
ParseFallbackJSON returns parsed into map[string]interface{} event from events.FailedFact
func TimedEventBuilder ¶
func TimedEventBuilder() interface{}
TimedEventBuilder creates and returns a new *events.TimedEvent (must be pointer). This is used on deserialization
Types ¶
type APIProcessor ¶
type APIProcessor struct {
// contains filtered or unexported fields
}
APIProcessor preprocess server 2 server integration events
func NewAPIProcessor ¶
func NewAPIProcessor(usersRecognition Recognition) *APIProcessor
NewAPIProcessor returns new API preprocessor
func (*APIProcessor) Postprocess ¶
func (ap *APIProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
Postprocess does nothing
func (*APIProcessor) Preprocess ¶
func (ap *APIProcessor) Preprocess(event Event, requestContext *RequestContext)
Preprocess puts src = api if doesn't exist
type BulkProcessor ¶
type BulkProcessor struct { }
BulkProcessor preprocess server 2 server bulk integration events
func NewBulkProcessor ¶
func NewBulkProcessor() *BulkProcessor
NewBulkProcessor returns new BulkProcessor
func (*BulkProcessor) Postprocess ¶
func (bp *BulkProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
Postprocess does nothing
func (*BulkProcessor) Preprocess ¶
func (bp *BulkProcessor) Preprocess(event Event, requestContext *RequestContext)
Preprocess puts src = bulk if doesn't exist
type DummyQueue ¶
type DummyQueue struct { }
func (*DummyQueue) Close ¶
func (d *DummyQueue) Close() error
func (*DummyQueue) Consume ¶
func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string)
func (*DummyQueue) ConsumeTimed ¶
func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)
func (*DummyQueue) DequeueBlock ¶
type DummyRecognition ¶
type DummyRecognition struct{}
type Event ¶
type Event map[string]interface{}
Event is a dto for deserialization input events
func (Event) DebugString ¶
DebugString returns the same JSON string representation of the event as Serialize but limited to 1024 bytes
type FailedEvent ¶
type FailedEvent struct { MalformedEvent string `json:"malformed_event,omitempty"` Event json.RawMessage `json:"event,omitempty"` Error string `json:"error,omitempty"` EventID string `json:"event_id,omitempty"` RecognizedEvent bool }
FailedEvent is a dto for serialization fallback events
type FailedEvents ¶
type FailedEvents struct { Events []*FailedEvent Src map[string]int }
FailedEvents is a dto for keeping fallback events per src
func (*FailedEvents) IsEmpty ¶
func (ff *FailedEvents) IsEmpty() bool
IsEmpty return true if nil or events are empty
type HTTPContext ¶
type JsProcessor ¶
type JsProcessor struct {
// contains filtered or unexported fields
}
JsProcessor preprocess client integration events
func NewJsProcessor ¶
func NewJsProcessor(usersRecognition Recognition, userAgentPath string) *JsProcessor
NewJsProcessor returns configured JsProcessor
func (*JsProcessor) Postprocess ¶
func (jp *JsProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
Postprocess puts event into recognition Service
func (*JsProcessor) Preprocess ¶
func (jp *JsProcessor) Preprocess(event Event, reqContext *RequestContext)
Preprocess sets user-agent from request header to configured nodes sets user anonymous ID if GDPR
type NativeQueue ¶
type NativeQueue struct {
// contains filtered or unexported fields
}
NativeQueue is a event queue implementation by Jitsu
func (*NativeQueue) Consume ¶
func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string)
func (*NativeQueue) ConsumeTimed ¶
func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string)
func (*NativeQueue) DequeueBlock ¶
type Parser ¶
type Parser interface {
ParseEventsBody(c *gin.Context) ([]Event, *ParsingError)
}
Parser is used for parsing income HTTP event body
func NewJitsuParser ¶
NewJitsuParser returns jitsuParser
func NewSegmentCompatParser ¶
func NewSegmentCompatParser(mapper Mapper, globalUniqueID *identifiers.UniqueID, maxEventSize, maxCachedEventsErrSize int) Parser
NewSegmentCompatParser returns configured Segment Parser for old Jitsu data structures
func NewSegmentParser ¶
func NewSegmentParser(mapper Mapper, globalUniqueID *identifiers.UniqueID, maxEventSize, maxCachedEventsErrSize int) Parser
NewSegmentParser returns configured Segment Parser for SDK 2.0 data structures
type ParsingError ¶
type PixelProcessor ¶
type PixelProcessor struct {
// contains filtered or unexported fields
}
PixelProcessor preprocess tracking pixel events
func NewPixelProcessor ¶
func NewPixelProcessor() *PixelProcessor
NewPixelProcessor returns configured PixelProcessor
func (*PixelProcessor) Postprocess ¶
func (pp *PixelProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
func (*PixelProcessor) Preprocess ¶
func (pp *PixelProcessor) Preprocess(event Event, reqContext *RequestContext)
Preprocess set some values from request header into event
func (*PixelProcessor) Type ¶
func (pp *PixelProcessor) Type() string
Type returns preprocessor type
type Processor ¶
type Processor interface { Preprocess(event Event, requestContext *RequestContext) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string) Type() string }
Processor is used in preprocessing and postprocessing events before and after consuming(storing) should be stateless
type ProcessorHolder ¶
type ProcessorHolder struct {
// contains filtered or unexported fields
}
ProcessorHolder is used for holding Processor instances per type
func NewProcessorHolder ¶
func NewProcessorHolder(apiProcessor *APIProcessor, jsProcessor *JsProcessor, pixelProcessor *PixelProcessor, segmentProcessor *SegmentProcessor, bulkProcessor *BulkProcessor) *ProcessorHolder
NewProcessorHolder returns configured ProcessHolder with all processor types instances
func (*ProcessorHolder) GetAPIPreprocessor ¶
func (ph *ProcessorHolder) GetAPIPreprocessor() Processor
func (*ProcessorHolder) GetBulkPreprocessor ¶
func (ph *ProcessorHolder) GetBulkPreprocessor() Processor
func (*ProcessorHolder) GetByType ¶
func (ph *ProcessorHolder) GetByType(processorType string) Processor
func (*ProcessorHolder) GetJSPreprocessor ¶
func (ph *ProcessorHolder) GetJSPreprocessor() Processor
func (*ProcessorHolder) GetPixelPreprocessor ¶
func (ph *ProcessorHolder) GetPixelPreprocessor() Processor
func (*ProcessorHolder) GetSegmentPreprocessor ¶
func (ph *ProcessorHolder) GetSegmentPreprocessor() Processor
type Queue ¶
type Queue interface { io.Closer Consume(f map[string]interface{}, tokenID string) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string) DequeueBlock() (Event, time.Time, string, error) }
Queue is an events queue. Possible implementations (dque, leveldbqueue, native)
type QueueFactory ¶
type QueueFactory struct {
// contains filtered or unexported fields
}
func NewQueueFactory ¶
func NewQueueFactory(redisPool *meta.RedisPool, redisReadTimeout time.Duration) *QueueFactory
func (*QueueFactory) Close ¶
func (qf *QueueFactory) Close() error
func (*QueueFactory) CreateEventsQueue ¶
func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, error)
func (*QueueFactory) CreateHTTPQueue ¶
func (qf *QueueFactory) CreateHTTPQueue(identifier string, serializationModelBuilder func() interface{}) queue.Queue
type Recognition ¶
type RequestContext ¶
type RequestContext struct { UserAgent string `json:"user_agent,omitempty"` ClientIP string `json:"client_ip,omitempty"` Referer string `json:"referer,omitempty"` JitsuAnonymousID string `json:"jitsu_anonymous_id,omitempty"` HashedAnonymousID string `json:"hashed_anonymous_id,omitempty"` CookiesLawCompliant bool `json:"cookie_laws_compliant,omitempty"` }
RequestContext is a dto for keeping request data like special headers or e.g. client IP address
type SegmentProcessor ¶
type SegmentProcessor struct {
// contains filtered or unexported fields
}
SegmentProcessor preprocess client integration events
func NewSegmentProcessor ¶
func NewSegmentProcessor(usersRecognition Recognition) *SegmentProcessor
NewSegmentProcessor returns configured SegmentProcessor
func (*SegmentProcessor) Postprocess ¶
func (sp *SegmentProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
Postprocess puts event into recognition Service
func (*SegmentProcessor) Preprocess ¶
func (sp *SegmentProcessor) Preprocess(event Event, reqContext *RequestContext)
Preprocess adds src value sets user anonymous ID if GDPR
func (*SegmentProcessor) Type ¶
func (sp *SegmentProcessor) Type() string
Type returns preprocessor type
type SkippedEvent ¶
type SkippedEvent struct { Event json.RawMessage `json:"event,omitempty"` Error string `json:"error,omitempty"` RecognizedEvent bool }
SkippedEvent is a dto for serialization in events cache
type SkippedEvents ¶
type SkippedEvents struct {
Events []*SkippedEvent
}
SkippedEvents is a dto for keeping skipped events per src
func (*SkippedEvents) IsEmpty ¶
func (se *SkippedEvents) IsEmpty() bool
IsEmpty return true if nil or events are empty