events

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SrcKey is a system field: the "src" key of an event (ExtractSrc is documented as returning this field).
	SrcKey = "src"
	// TimeChunkKey is a system field.
	TimeChunkKey      = "_time_interval"
	// NOTE(review): TimeIntervalStart and TimeIntervalEnd are presumably the keys that
	// EnrichWithTimeInterval writes for its lower and upper bounds — confirm.
	TimeIntervalStart = "_interval_start"
	TimeIntervalEnd   = "_interval_end"
	// NOTE(review): presumably the key written by EnrichWithCollection — confirm.
	CollectionIDKey   = "_collection_id"
	// NOTE(review): presumably the key written by EnrichWithSourceId — confirm.
	SourceIDKey       = "_source_id"
)
View Source
const (
	// EventType is used in the Jitsu JavaScript SDK as the event type field.
	EventType = "event_type"
	// UserIdentify is used in the Jitsu JavaScript SDK as the type of an identification event.
	UserIdentify = "user_identify"

	// MaskedParameterValue is used to detect when a parameter is masked.
	MaskedParameterValue = "masked"
)
View Source
// HTTPContextField is the field name "__HTTP_CONTEXT__".
// NOTE(review): presumably the event key under which an HTTPContext value is stored — confirm.
const HTTPContextField = "__HTTP_CONTEXT__"
View Source
// SrcBulk is the "src" value for bulk events; BulkProcessor.Preprocess is
// documented as putting src = bulk when the field does not exist.
const SrcBulk = "bulk"

Variables

View Source
// ErrQueueClosed is a sentinel error carrying the message "queue is closed".
// NOTE(review): presumably returned by Queue operations once the queue has been closed — confirm.
var ErrQueueClosed = errors.New("queue is closed")
View Source
// HashedAnonymIDPath is the JSON path of the hashed anonymous user ID. The path
// expression lists two locations separated by "||": one under /eventn_ctx/user
// and one under /user.
// NOTE(review): presumably the alternatives are tried in order — confirm against jsonutils.NewJSONPath.
var HashedAnonymIDPath = jsonutils.NewJSONPath("/eventn_ctx/user/hashed_anonymous_id||/user/hashed_anonymous_id")
View Source
var UserAnonymIDPath = jsonutils.NewJSONPath("/eventn_ctx/user/anonymous_id||/user/anonymous_id")

UserAnonymIDPath is used for setting the generated user identifier in case of GDPR

Functions

func EnrichWithCollection

func EnrichWithCollection(object map[string]interface{}, collection string)

EnrichWithCollection puts the collection string into object

func EnrichWithSourceId

func EnrichWithSourceId(object map[string]interface{}, sourceId string)

EnrichWithSourceId puts the source id string into object

func EnrichWithTimeInterval

func EnrichWithTimeInterval(object map[string]interface{}, interval string, lower, upper time.Time)

EnrichWithTimeInterval puts the interval representation into object

func ExtractSrc

func ExtractSrc(event Event) string

ExtractSrc returns 'src' field from input event or an empty string

func ParseFallbackJSON

func ParseFallbackJSON(line []byte) (map[string]interface{}, error)

ParseFallbackJSON returns the event from events.FailedFact parsed into a map[string]interface{}

func TimedEventBuilder

func TimedEventBuilder() interface{}

TimedEventBuilder creates and returns a new *events.TimedEvent (must be pointer). This is used on deserialization

Types

type APIProcessor

type APIProcessor struct {
	// contains filtered or unexported fields
}

APIProcessor preprocesses server-to-server integration events

func NewAPIProcessor

func NewAPIProcessor(usersRecognition Recognition) *APIProcessor

NewAPIProcessor returns new API preprocessor

func (*APIProcessor) Postprocess

func (ap *APIProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)

Postprocess does nothing

func (*APIProcessor) Preprocess

func (ap *APIProcessor) Preprocess(event Event, requestContext *RequestContext)

Preprocess puts src = api if it doesn't exist

func (*APIProcessor) Type

func (ap *APIProcessor) Type() string

Type returns preprocessor type

type BulkProcessor

type BulkProcessor struct {
}

BulkProcessor preprocesses server-to-server bulk integration events

func NewBulkProcessor

func NewBulkProcessor() *BulkProcessor

NewBulkProcessor returns new BulkProcessor

func (*BulkProcessor) Postprocess

func (bp *BulkProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)

Postprocess does nothing

func (*BulkProcessor) Preprocess

func (bp *BulkProcessor) Preprocess(event Event, requestContext *RequestContext)

Preprocess puts src = bulk if it doesn't exist

func (*BulkProcessor) Type

func (bp *BulkProcessor) Type() string

Type returns preprocessor type

type Consumer

// Consumer accepts events one at a time and can be closed.
type Consumer interface {
	// io.Closer contributes Close() error.
	io.Closer
	// Consume takes one event as a generic map together with a token ID.
	// It returns nothing, so no failure is reported to the caller through this method.
	Consume(event map[string]interface{}, tokenID string)
}

type DummyQueue

type DummyQueue struct {
}

func (*DummyQueue) Close

func (d *DummyQueue) Close() error

func (*DummyQueue) Consume

func (d *DummyQueue) Consume(f map[string]interface{}, tokenID string)

func (*DummyQueue) ConsumeTimed

func (d *DummyQueue) ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)

func (*DummyQueue) DequeueBlock

func (d *DummyQueue) DequeueBlock() (Event, time.Time, string, error)

type DummyRecognition

type DummyRecognition struct{}

func (*DummyRecognition) Event

func (d *DummyRecognition) Event(event Event, eventID string, destinationIDs []string, tokenID string)

type Event

type Event map[string]interface{}

Event is a dto for deserialization input events

func (Event) Clone

func (f Event) Clone() Event

Clone returns copy of event

func (Event) DebugString

func (f Event) DebugString() string

DebugString returns the same JSON string representation of the event as Serialize but limited to 1024 bytes

func (Event) Serialize

func (f Event) Serialize() string

Serialize returns JSON string representation of the event

type FailedEvent

type FailedEvent struct {
	// MalformedEvent is serialized as "malformed_event" and omitted when empty.
	// NOTE(review): presumably holds the raw payload when it is not valid JSON — confirm.
	MalformedEvent  string          `json:"malformed_event,omitempty"`
	// Event is the raw JSON of the event, serialized as "event" and omitted when empty.
	Event           json.RawMessage `json:"event,omitempty"`
	// Error is serialized as "error" and omitted when empty.
	Error           string          `json:"error,omitempty"`
	// EventID is serialized as "event_id" and omitted when empty.
	EventID         string          `json:"event_id,omitempty"`
	// RecognizedEvent has no json tag, so encoding/json uses the Go field name
	// "RecognizedEvent" as the key and always emits it, including when false.
	RecognizedEvent bool
}

FailedEvent is a dto for serialization fallback events

type FailedEvents

type FailedEvents struct {
	// Events is the list of collected failed events.
	Events []*FailedEvent
	// Src maps a string key to an int.
	// NOTE(review): presumably a count of failed events per "src" value — confirm.
	Src    map[string]int
}

FailedEvents is a dto for keeping fallback events per src

func NewFailedEvents

func NewFailedEvents() *FailedEvents

NewFailedEvents returns FailedEvents

func (*FailedEvents) IsEmpty

func (ff *FailedEvents) IsEmpty() bool

IsEmpty returns true if nil or if events are empty

type HTTPContext

// HTTPContext carries HTTP headers in a JSON-serializable form.
type HTTPContext struct {
	// Headers is serialized as "headers" and omitted when empty.
	Headers http.Header `json:"headers,omitempty"`
}

type JsProcessor

type JsProcessor struct {
	// contains filtered or unexported fields
}

JsProcessor preprocesses client integration events

func NewJsProcessor

func NewJsProcessor(usersRecognition Recognition, userAgentPath string) *JsProcessor

NewJsProcessor returns configured JsProcessor

func (*JsProcessor) Postprocess

func (jp *JsProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)

Postprocess puts event into recognition Service

func (*JsProcessor) Preprocess

func (jp *JsProcessor) Preprocess(event Event, reqContext *RequestContext)

Preprocess sets user-agent from the request header to the configured nodes and sets the user anonymous ID if GDPR

func (*JsProcessor) Type

func (jp *JsProcessor) Type() string

Type returns preprocessor type

type Mapper

// Mapper transforms one generic object into another.
type Mapper interface {
	// Map takes an object and returns a mapped object or an error.
	// NOTE(review): whether the input map is modified in place is not visible here — confirm per implementation.
	Map(object map[string]interface{}) (map[string]interface{}, error)
}

type NativeQueue

type NativeQueue struct {
	// contains filtered or unexported fields
}

NativeQueue is an event queue implementation by Jitsu

func (*NativeQueue) Close

func (q *NativeQueue) Close() error

Close closes underlying queue

func (*NativeQueue) Consume

func (q *NativeQueue) Consume(f map[string]interface{}, tokenID string)

func (*NativeQueue) ConsumeTimed

func (q *NativeQueue) ConsumeTimed(payload map[string]interface{}, t time.Time, tokenID string)

func (*NativeQueue) DequeueBlock

func (q *NativeQueue) DequeueBlock() (Event, time.Time, string, error)

type Parser

type Parser interface {
	// ParseEventsBody reads from the gin request context and returns the parsed
	// events, or a *ParsingError describing the failure.
	// NOTE(review): presumably exactly one of the two results is non-nil — confirm.
	ParseEventsBody(c *gin.Context) ([]Event, *ParsingError)
}

Parser is used for parsing the incoming HTTP event body

func NewJitsuParser

func NewJitsuParser(maxEventSize, maxCachedEventsErrSize int) Parser

NewJitsuParser returns jitsuParser

func NewSegmentCompatParser

func NewSegmentCompatParser(mapper Mapper, globalUniqueID *identifiers.UniqueID, maxEventSize, maxCachedEventsErrSize int) Parser

NewSegmentCompatParser returns configured Segment Parser for old Jitsu data structures

func NewSegmentParser

func NewSegmentParser(mapper Mapper, globalUniqueID *identifiers.UniqueID, maxEventSize, maxCachedEventsErrSize int) Parser

NewSegmentParser returns configured Segment Parser for SDK 2.0 data structures

type ParsingError

// ParsingError is the failure result returned by Parser.ParseEventsBody.
type ParsingError struct {
	// LimitedPayload holds payload bytes.
	// NOTE(review): presumably the request body truncated to a size limit (cf. the
	// maxCachedEventsErrSize parameter of the parser constructors) — confirm.
	LimitedPayload []byte
	// Err is the underlying error.
	Err            error
}

type PixelProcessor

type PixelProcessor struct {
	// contains filtered or unexported fields
}

PixelProcessor preprocesses tracking pixel events

func NewPixelProcessor

func NewPixelProcessor() *PixelProcessor

NewPixelProcessor returns configured PixelProcessor

func (*PixelProcessor) Postprocess

func (pp *PixelProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)

func (*PixelProcessor) Preprocess

func (pp *PixelProcessor) Preprocess(event Event, reqContext *RequestContext)

Preprocess sets some values from the request header into the event

func (*PixelProcessor) Type

func (pp *PixelProcessor) Type() string

Type returns preprocessor type

type Processor

type Processor interface {
	// Preprocess receives the event and the request context. It returns nothing;
	// Event is a map type, so changes made to it are visible to the caller.
	Preprocess(event Event, requestContext *RequestContext)
	// Postprocess receives the event, its ID, the destination IDs and the token ID.
	Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)
	// Type returns the processor type as a string.
	Type() string
}

Processor is used for preprocessing and postprocessing events before and after consuming (storing). Implementations should be stateless

type ProcessorHolder

type ProcessorHolder struct {
	// contains filtered or unexported fields
}

ProcessorHolder is used for holding Processor instances per type

func NewProcessorHolder

func NewProcessorHolder(apiProcessor *APIProcessor, jsProcessor *JsProcessor, pixelProcessor *PixelProcessor,
	segmentProcessor *SegmentProcessor, bulkProcessor *BulkProcessor) *ProcessorHolder

NewProcessorHolder returns a configured ProcessorHolder with instances of all processor types

func (*ProcessorHolder) GetAPIPreprocessor

func (ph *ProcessorHolder) GetAPIPreprocessor() Processor

func (*ProcessorHolder) GetBulkPreprocessor

func (ph *ProcessorHolder) GetBulkPreprocessor() Processor

func (*ProcessorHolder) GetByType

func (ph *ProcessorHolder) GetByType(processorType string) Processor

func (*ProcessorHolder) GetJSPreprocessor

func (ph *ProcessorHolder) GetJSPreprocessor() Processor

func (*ProcessorHolder) GetPixelPreprocessor

func (ph *ProcessorHolder) GetPixelPreprocessor() Processor

func (*ProcessorHolder) GetSegmentPreprocessor

func (ph *ProcessorHolder) GetSegmentPreprocessor() Processor

type Queue

type Queue interface {
	// io.Closer contributes Close() error.
	io.Closer
	// Consume enqueues an event payload with its token ID.
	// NOTE(review): how the time is chosen for events added this way is not visible here — confirm.
	Consume(f map[string]interface{}, tokenID string)
	// ConsumeTimed enqueues an event payload with an explicit time and token ID.
	ConsumeTimed(f map[string]interface{}, t time.Time, tokenID string)
	// DequeueBlock returns an event, a time, a string and an error.
	// NOTE(review): the name suggests it blocks until an event is available, and the
	// string result is presumably the token ID passed on enqueue — confirm.
	DequeueBlock() (Event, time.Time, string, error)
}

Queue is an events queue. Possible implementations: dque, leveldbqueue, native

func NewNativeQueue

func NewNativeQueue(namespace, subsystem, identifier string, underlyingQueue queue.Queue) (Queue, error)

type QueueFactory

type QueueFactory struct {
	// contains filtered or unexported fields
}

func NewQueueFactory

func NewQueueFactory(redisPool *meta.RedisPool, redisReadTimeout time.Duration) *QueueFactory

func (*QueueFactory) Close

func (qf *QueueFactory) Close() error

func (*QueueFactory) CreateEventsQueue

func (qf *QueueFactory) CreateEventsQueue(subsystem, identifier string) (Queue, error)

func (*QueueFactory) CreateHTTPQueue

func (qf *QueueFactory) CreateHTTPQueue(identifier string, serializationModelBuilder func() interface{}) queue.Queue

type Recognition

// Recognition is the users recognition dependency that the processor
// constructors accept (NewAPIProcessor, NewJsProcessor, NewSegmentProcessor).
type Recognition interface {
	// Event receives an event with its ID, destination IDs and token ID.
	// The parameter list matches Processor.Postprocess.
	Event(event Event, eventID string, destinationIDs []string, tokenID string)
}

type RequestContext

type RequestContext struct {
	// UserAgent is serialized as "user_agent" and omitted when empty.
	UserAgent           string `json:"user_agent,omitempty"`
	// ClientIP is serialized as "client_ip" and omitted when empty.
	ClientIP            string `json:"client_ip,omitempty"`
	// Referer is serialized as "referer" and omitted when empty.
	Referer             string `json:"referer,omitempty"`
	// JitsuAnonymousID is serialized as "jitsu_anonymous_id" and omitted when empty.
	JitsuAnonymousID    string `json:"jitsu_anonymous_id,omitempty"`
	// HashedAnonymousID is serialized as "hashed_anonymous_id" and omitted when empty.
	HashedAnonymousID   string `json:"hashed_anonymous_id,omitempty"`
	// CookiesLawCompliant is serialized as "cookie_laws_compliant"; note that the JSON
	// key ("cookie_laws") is spelled differently from the Go field ("CookiesLaw").
	// With omitempty a false value is omitted from the output.
	CookiesLawCompliant bool   `json:"cookie_laws_compliant,omitempty"`
}

RequestContext is a dto for keeping request data like special headers or e.g. client IP address

type SegmentProcessor

type SegmentProcessor struct {
	// contains filtered or unexported fields
}

SegmentProcessor preprocesses client integration events

func NewSegmentProcessor

func NewSegmentProcessor(usersRecognition Recognition) *SegmentProcessor

NewSegmentProcessor returns configured SegmentProcessor

func (*SegmentProcessor) Postprocess

func (sp *SegmentProcessor) Postprocess(event Event, eventID string, destinationIDs []string, tokenID string)

Postprocess puts event into recognition Service

func (*SegmentProcessor) Preprocess

func (sp *SegmentProcessor) Preprocess(event Event, reqContext *RequestContext)

Preprocess adds the src value and sets the user anonymous ID if GDPR

func (*SegmentProcessor) Type

func (sp *SegmentProcessor) Type() string

Type returns preprocessor type

type SkippedEvent

type SkippedEvent struct {
	// Event is the raw JSON of the event, serialized as "event" and omitted when empty.
	Event           json.RawMessage `json:"event,omitempty"`
	// Error is serialized as "error" and omitted when empty.
	Error           string          `json:"error,omitempty"`
	// RecognizedEvent has no json tag, so encoding/json uses the Go field name
	// "RecognizedEvent" as the key and always emits it, including when false.
	RecognizedEvent bool
}

SkippedEvent is a dto for serialization in events cache

type SkippedEvents

type SkippedEvents struct {
	// Events is the list of collected skipped events.
	Events []*SkippedEvent
}

SkippedEvents is a dto for keeping skipped events per src

func (*SkippedEvents) IsEmpty

func (se *SkippedEvents) IsEmpty() bool

IsEmpty returns true if nil or if events are empty

type TimedEvent

type TimedEvent struct {
	// Payload is the event body as a generic map.
	Payload      map[string]interface{}
	// DequeuedTime is the time kept with the event.
	// NOTE(review): presumably the time.Time passed to Queue.ConsumeTimed and
	// returned by Queue.DequeueBlock — confirm.
	DequeuedTime time.Time
	// TokenID is the token ID kept with the event.
	TokenID      string
}

TimedEvent is used for keeping events with time in queue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL