connector

package
v5.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: MIT Imports: 26 Imported by: 4

Documentation

Index

Constants

View Source
const DefaultMaxPermittedPanics = 10

Variables

View Source
var ReloaderComponentNotFoundErr = errors.New("component not found")

Functions

func ApplyFieldForceUpdate

func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents

func ApplyFieldKeepEarliest

func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents

func ApplyFieldKeepLatest

func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents

func ApplyFieldMath

func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})

ApplyFieldMath applies all FieldMath merging configuration on input documents

func ApplyFieldMerge

func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldMerge applies all FieldReplace merging configuration on input documents Merge can merge a single field with a slice, or vice versa The result of a merge is always a slice with unique fields

func ApplyFieldReplace

func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplace applies all FieldReplace merging configuration on input documents

func ApplyFieldReplaceIfMissing

func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplaceIfMissing applies all FieldReplace merging configuration on input documents

func FilterHeaders

func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)

func GetHeader added in v5.1.4

func GetHeader(key string, headers []*sarama.RecordHeader) (value string, found bool)

func LookupNestedMap

func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)

LookupNestedMap lookup for a value corresponding to the exact specified path inside a map

func LookupNestedMapFullPaths

func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)

LookupNestedMapFullPaths Looks searches value of all paths in data and concatenates them with a separator

Types

type AvroToJSONTransformer

type AvroToJSONTransformer struct {
	// contains filtered or unexported fields
}

AvroToJSONTransformer : Deprecated

func NewAvroToJSONTransformer

func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)

NewAvroToJSONTransformer New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ? Deprecated

func (AvroToJSONTransformer) AvroBinaryToNative

func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)

AvroBinaryToNative : Deprecated

func (AvroToJSONTransformer) AvroBinaryToTextual

func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)

AvroBinaryToTextual : Deprecated

func (AvroToJSONTransformer) Transform

func (transformer AvroToJSONTransformer) Transform(msg Message) (Message, error)

Transform is the convertor transformer, it has to decode the AVRO message into a byte message (JSONMessage) Deprecated

type AvroToMapTransformer added in v5.1.4

type AvroToMapTransformer struct {
	// contains filtered or unexported fields
}

AvroToMapTransformer :

func NewAvroToMapTransformer added in v5.1.4

func NewAvroToMapTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToMapTransformer, error)

NewAvroToMapTransformer New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ?

func (AvroToMapTransformer) Transform added in v5.1.4

func (transformer AvroToMapTransformer) Transform(msg Message) (Message, error)

Transform is the convertor transformer, it decodes the AVRO message into an DecodedKafkaMessage message

type BatchSink

type BatchSink struct {
	TargetURL    string
	Send         chan Message
	Client       *retryablehttp.Client
	BufferSize   int
	FlushTimeout time.Duration
	FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{})
	DryRun       bool
}

BatchSink ..

func NewBatchSink

func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration,
	formatToBIRs FormatToBIRs, dryRun bool) *BatchSink

NewBatchSink constructor for BatchSink

func (*BatchSink) AddMessageToQueue

func (sink *BatchSink) AddMessageToQueue(message Message)

func (*BatchSink) SendToIngester

func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error

func (*BatchSink) Start added in v5.1.5

func (sink *BatchSink) Start(ctx context.Context)

func (*BatchSink) Stop added in v5.1.5

func (sink *BatchSink) Stop()

Stop closes sink data channel

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string     `json:"uuid"`
	DocumentType string     `json:"documentType"`
	MergeConfig  []Config   `json:"merge"`
	Docs         []Document `json:"docs"`
}

type Config

type Config struct {
	Mode             Mode    `json:"mode"`
	ExistingAsMaster bool    `json:"existingAsMaster"`
	Type             string  `json:"type,omitempty"`
	LinkKey          string  `json:"linkKey,omitempty"`
	Groups           []Group `json:"groups,omitempty"`
}

Config wraps all rules for document merging

func (*Config) Apply

func (config *Config) Apply(newDoc *models.Document, existingDoc *models.Document) *models.Document

Apply returns a pre-build merge function, configured with a specific merge config Merge is done in the following order : FieldMath, FieldReplace, FieldMerge

type ConsumerParams added in v5.1.5

type ConsumerParams struct {
	MaxPermittedPanics int
	Done               *chan os.Signal
}

func NewDefaultConsumerParams added in v5.1.5

func NewDefaultConsumerParams(done chan os.Signal) ConsumerParams

type ConsumerProcessor

type ConsumerProcessor interface {
	Process(message *sarama.ConsumerMessage)
}

type DecodedKafkaMessage

type DecodedKafkaMessage struct {
	Data map[string]interface{}
}

DecodedKafkaMessage holds a json decoded version of the kafka message It can be used to avoid decoding data multiple time (which is very consuming)

func (DecodedKafkaMessage) String

func (msg DecodedKafkaMessage) String() string

type DefaultConsumer

type DefaultConsumer struct {
	Ready chan bool

	ConsumerParams
	// contains filtered or unexported fields
}

DefaultConsumer represents a Sarama consumer group consumer

func NewDefaultConsumer

func NewDefaultConsumer(processor ConsumerProcessor, params ConsumerParams) DefaultConsumer

func (*DefaultConsumer) Cleanup

func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultConsumer) ConsumeClaim

func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultConsumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type DefaultMultiConsumer

type DefaultMultiConsumer struct {
	Ready chan bool

	ConsumerParams
	// contains filtered or unexported fields
}

DefaultMultiConsumer represents a Sarama consumer group consumer

func NewDefaultMultiConsumer

func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor, params ConsumerParams) DefaultMultiConsumer

func (*DefaultMultiConsumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultMultiConsumer) ConsumeClaim

func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultMultiConsumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type Document

type Document struct {
	ID        string                 `json:"id"`
	Index     string                 `json:"index"`
	IndexType string                 `json:"type"`
	Source    map[string]interface{} `json:"source"`
}

Document represent an es document

type FieldMath

type FieldMath struct {
	Expression  string `json:"expression"`
	OutputField string `json:"outputField"`
}

FieldMath specify a merge rule using a math expression

type FilterHeaderOption

type FilterHeaderOption struct {
	Key       string
	Value     string
	Values    []string
	Condition string
}

type FilteredJsonMessage

type FilteredJsonMessage struct {
	Data map[string]interface{}
}

FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages

func (FilteredJsonMessage) String

func (msg FilteredJsonMessage) String() string

type FormatToBIRs

type FormatToBIRs func([]Message) []*BulkIngestRequest

type Group

type Group struct {
	Condition             string      `json:"condition,omitempty"`
	FieldReplace          []string    `json:"fieldReplace,omitempty"`
	FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
	FieldMerge            []string    `json:"fieldMerge,omitempty"`
	FieldMath             []FieldMath `json:"fieldMath,omitempty"`
	FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
	FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
	FieldForceUpdate      []string    `json:"fieldForceUpdate,omitempty"`
}

Group allows to group un set of merge fields and to define an optional condition to applay the merge fields

type JSONMapper

type JSONMapper struct {
	// contains filtered or unexported fields
}

JSONMapper :

func NewJSONMapper

func NewJSONMapper(name, path string) (*JSONMapper, error)

NewJSONMapper :

func (JSONMapper) DecodeDocument

func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)

DecodeDocument not implemented here, it only uses msg

func (JSONMapper) FilterDocument

func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)

func (JSONMapper) MapToDocument

func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)

MapAvroToDocument :

type JSONMapperConfigItem

type JSONMapperConfigItem struct {
	Mandatory    bool
	FieldType    string
	DefaultValue interface{}
	DateFormat   string
	Paths        [][]string
	OtherPaths   [][]string
	ArrayPath    []string
	Separator    string
}

JSONMapperConfigItem :

type JSONMapperFilterItem

type JSONMapperFilterItem struct {
	Keep         bool
	FieldType    string
	DefaultValue string
	Paths        [][]string
	Condition    string
	Value        string
	Values       []string
	Separator    string
}

type JSONMapperJsoniter

type JSONMapperJsoniter struct {
	// contains filtered or unexported fields
}

JSONMapperJsoniter :

func NewJSONMapperJsoniter

func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)

NewJSONMapperJsoniter :

func (JSONMapperJsoniter) DecodeDocument

func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)

DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data

func (JSONMapperJsoniter) FilterDocument

func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)

FilterDocument checks if document is filtered or not, returns if documents valid and if invalid, the following reason

func (JSONMapperJsoniter) MapToDocument

func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)

MapToDocument Maps data to document

type JSONMapperMap added in v5.1.4

type JSONMapperMap struct {
	// contains filtered or unexported fields
}

JSONMapperMap :

func NewJSONMapperMap added in v5.1.4

func NewJSONMapperMap(name, path string) (*JSONMapperMap, error)

NewJSONMapperMap :

func (JSONMapperMap) DecodeDocument added in v5.1.4

func (mapper JSONMapperMap) DecodeDocument(_ Message) (Message, error)

DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data

func (JSONMapperMap) FilterDocument added in v5.1.4

func (mapper JSONMapperMap) FilterDocument(msg Message) (bool, string)

FilterDocument checks if document is filtered or not, returns if documents valid and if invalid, the following reason

func (JSONMapperMap) MapToDocument added in v5.1.4

func (mapper JSONMapperMap) MapToDocument(msg Message) (Message, error)

MapToDocument Maps data to document

type KafkaMessage

type KafkaMessage struct {
	Data []byte
}

KafkaMessage ...

func (KafkaMessage) GetData

func (kMessage KafkaMessage) GetData() []byte

GetData Getter for the message data

func (KafkaMessage) String

func (kMessage KafkaMessage) String() string

type Lookup

type Lookup func(path string, value string, index string) ([]string, error)

type Mapper

type Mapper interface {
	// FilterDocument checks if document is filtered or not, returns if documents valid and if invalid,
	// the following reason. It is using the given filters.
	FilterDocument(msg Message) (bool, string)

	// MapToDocument Maps data to document
	MapToDocument(msg Message) (Message, error)

	// DecodeDocument is a function that just decodes a document and returns it
	// You can use it if you want to decode a message only "once" and not in each
	// individual function
	DecodeDocument(msg Message) (Message, error)
}

type Message

type Message interface {
	String() string
}

Message ...

type MessageWithOptions

type MessageWithOptions struct {
	Data    map[string]interface{}
	Options map[string]interface{}
}

MessageWithOptions output once we've filtered the myrtea fields from the kafka messages

func (MessageWithOptions) String

func (msg MessageWithOptions) String() string

type Mode

type Mode int

Mode ...

const (
	// Self ...
	Self Mode = iota + 1
	// EnrichTo ...
	EnrichTo
	// EnrichFrom ...
	EnrichFrom
)

func (Mode) MarshalJSON

func (s Mode) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (Mode) String

func (s Mode) String() string

func (*Mode) UnmarshalJSON

func (s *Mode) UnmarshalJSON(b []byte) error

UnmarshalJSON unmashals a quoted json string to the enum value

type Processor

type Processor interface {
	Process(msg Message) ([]Message, error)
}

Processor is a general interface to processor message

type Reloader

type Reloader struct {
	// contains filtered or unexported fields
}

func NewReloader

func NewReloader(action func(string) error, cooldown time.Duration, apiKey string) *Reloader

func (*Reloader) BindEndpoint

func (re *Reloader) BindEndpoint(rg chi.Router)

BindEndpoint Binds the reload endpoint to a existing router

type Restarter

type Restarter struct {
	// contains filtered or unexported fields
}

func NewRestarter

func NewRestarter(doneChan chan os.Signal, apiKey string) *Restarter

func (*Restarter) BindEndpoint

func (re *Restarter) BindEndpoint(rg chi.Router)

BindEndpoint Binds the restart endpoint to an existing router

type SaramaDebugLogger

type SaramaDebugLogger struct {
	// contains filtered or unexported fields
}

SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaDebugLogger

func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger

NewSaramaLogger initializes a new SaramaDebugLogger with a zap.Logger

func (*SaramaDebugLogger) Print

func (s *SaramaDebugLogger) Print(v ...interface{})

func (*SaramaDebugLogger) Printf

func (s *SaramaDebugLogger) Printf(format string, v ...interface{})

func (*SaramaDebugLogger) Println

func (s *SaramaDebugLogger) Println(v ...interface{})

type SaramaLogger

type SaramaLogger struct {
	// contains filtered or unexported fields
}

SaramaLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaLogger

func NewSaramaLogger(zl *zap.Logger) *SaramaLogger

NewSaramaLogger initializes a new SaramaLogger with a zap.Logger

func (*SaramaLogger) Print

func (s *SaramaLogger) Print(v ...interface{})

func (*SaramaLogger) Printf

func (s *SaramaLogger) Printf(format string, v ...interface{})

func (*SaramaLogger) Println

func (s *SaramaLogger) Println(v ...interface{})

type Sink

type Sink interface {
	Start(context.Context)
	Stop()
	AddMessageToQueue(message Message)
}

type SinkManager added in v5.1.5

type SinkManager struct {
	// contains filtered or unexported fields
}

func NewSinkManager added in v5.1.5

func NewSinkManager() *SinkManager

func (*SinkManager) AddSink added in v5.1.5

func (sm *SinkManager) AddSink(sink Sink)

AddSink add a sink to the manager

func (*SinkManager) StartAll added in v5.1.5

func (sm *SinkManager) StartAll(ctx context.Context)

StartAll starts all sinks in separate go-routines

func (*SinkManager) StopAll added in v5.1.5

func (sm *SinkManager) StopAll()

StopAll call all sinks to stop

func (*SinkManager) Wait added in v5.1.5

func (sm *SinkManager) Wait()

Wait all sinks to stop

type Transformer

type Transformer interface {
	Transform(msg Message) (Message, error)
}

Transformer ..

type TypedDataMessage

type TypedDataMessage struct {
	Strings map[string]string
	Ints    map[string]int64
	Floats  map[string]float64
	Bools   map[string]bool
	Times   map[string]time.Time
}

func (TypedDataMessage) String

func (m TypedDataMessage) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL