connector

package
v5.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2024 License: MIT Imports: 24 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ReloaderComponentNotFoundErr = errors.New("component not found")

Functions

func ApplyFieldForceUpdate

func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents

func ApplyFieldKeepEarliest

func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents

func ApplyFieldKeepLatest

func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents

func ApplyFieldMath

func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})

ApplyFieldMath applies all FieldMath merging configuration on input documents

func ApplyFieldMerge

func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldMerge applies all FieldReplace merging configuration on input documents

func ApplyFieldReplace

func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplace applies all FieldReplace merging configuration on input documents

func ApplyFieldReplaceIfMissing

func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})

ApplyFieldReplaceIfMissing applies all FieldReplace merging configuration on input documents

func DoubleUnescapeUnicode

func DoubleUnescapeUnicode(s string) ([]byte, error)

DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string This function has been built for a very specific usage and may not works on other messages

func FilterHeaders

func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)

func LookupNestedMap

func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)

LookupNestedMap lookup for a value corresponding to the exact specified path inside a map

func LookupNestedMapFullPaths

func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)

LookupNestedMapFullPaths Looks searches value of all paths in data and concatenates them with a separator

Types

type AvroToJSONTransformer

type AvroToJSONTransformer struct {
	// contains filtered or unexported fields
}

AvroToJSONTransformer :

func NewAvroToJSONTransformer

func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)

New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ?

func (AvroToJSONTransformer) AvroBinaryToNative

func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)

AvroBinaryToNative :

func (AvroToJSONTransformer) AvroBinaryToTextual

func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)

AvroBinaryToTextual :

func (AvroToJSONTransformer) Transform

func (transformer AvroToJSONTransformer) Transform(msg Message) (Message, error)

Transform is the convertor transformer, it has to decode the AVRO message into a byte message (JSONMessage)

type BatchSink

type BatchSink struct {
	TargetURL    string
	Send         chan Message
	Client       *retryablehttp.Client
	BufferSize   int
	FlushTimeout time.Duration
	FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{})
	DryRun       bool
}

BatchSink ..

func NewBatchSink

func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration,
	formatToBIRs FormatToBIRs, dryRun bool) *BatchSink

NewBatchSink constructor for BatchSink

func (*BatchSink) AddMessageToQueue

func (sink *BatchSink) AddMessageToQueue(message Message)

func (*BatchSink) SendToIngester

func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error

func (*BatchSink) Sender

func (sink *BatchSink) Sender(ctx context.Context)

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string     `json:"uuid"`
	DocumentType string     `json:"documentType"`
	MergeConfig  []Config   `json:"merge"`
	Docs         []Document `json:"docs"`
}

type Config

type Config struct {
	Mode             Mode    `json:"mode"`
	ExistingAsMaster bool    `json:"existingAsMaster"`
	Type             string  `json:"type,omitempty"`
	LinkKey          string  `json:"linkKey,omitempty"`
	Groups           []Group `json:"groups,omitempty"`
}

Config wraps all rules for document merging

func (*Config) Apply

func (config *Config) Apply(newDoc *models.Document, existingDoc *models.Document) *models.Document

Apply returns a pre-build merge function, configured with a specific merge config Merge is done in the following order : FieldMath, FieldReplace, FieldMerge

type ConsumerProcessor

type ConsumerProcessor interface {
	Process(message *sarama.ConsumerMessage)
}

type DecodedKafkaMessage

type DecodedKafkaMessage struct {
	Data map[string]interface{}
}

DecodedKafkaMessage holds a json decoded version of the kafka message It can be used to avoid decoding data multiple time (which is very consuming)

func (DecodedKafkaMessage) String

func (msg DecodedKafkaMessage) String() string

type DefaultConsumer

type DefaultConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultConsumer represents a Sarama consumer group consumer

func NewDefaultConsumer

func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer

func (*DefaultConsumer) Cleanup

func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultConsumer) ConsumeClaim

func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultConsumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type DefaultMultiConsumer

type DefaultMultiConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultMultiConsumer represents a Sarama consumer group consumer

func NewDefaultMultiConsumer

func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer

func (*DefaultMultiConsumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultMultiConsumer) ConsumeClaim

func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultMultiConsumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type Document

type Document struct {
	ID        string                 `json:"id"`
	Index     string                 `json:"index"`
	IndexType string                 `json:"type"`
	Source    map[string]interface{} `json:"source"`
}

Document represent an es document

type FieldMath

type FieldMath struct {
	Expression  string `json:"expression"`
	OutputField string `json:"outputField"`
}

FieldMath specify a merge rule using a math expression

type FilterHeaderOption

type FilterHeaderOption struct {
	Key       string
	Value     string
	Values    []string
	Condition string
}

type FilteredJsonMessage

type FilteredJsonMessage struct {
	Data map[string]interface{}
}

FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages

func (FilteredJsonMessage) String

func (msg FilteredJsonMessage) String() string

type FormatToBIRs

type FormatToBIRs func([]Message) []*BulkIngestRequest

type Group

type Group struct {
	Condition             string      `json:"condition,omitempty"`
	FieldReplace          []string    `json:"fieldReplace,omitempty"`
	FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
	FieldMerge            []string    `json:"fieldMerge,omitempty"`
	FieldMath             []FieldMath `json:"fieldMath,omitempty"`
	FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
	FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
	FieldForceUpdate      []string    `json:"fieldForceUpdate,omitempty"`
}

Group allows to group un set of merge fields and to define an optional condition to applay the merge fields

type JSONMapper

type JSONMapper struct {
	// contains filtered or unexported fields
}

JSONMapper :

func NewJSONMapper

func NewJSONMapper(name, path string) (*JSONMapper, error)

NewJSONMapper :

func (JSONMapper) DecodeDocument

func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)

DecodeDocument not implemented here, it only uses msg

func (JSONMapper) FilterDocument

func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)

func (JSONMapper) MapToDocument

func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)

MapAvroToDocument :

type JSONMapperConfigItem

type JSONMapperConfigItem struct {
	Mandatory    bool
	FieldType    string
	DefaultValue interface{}
	DateFormat   string
	Paths        [][]string
	OtherPaths   [][]string
	ArrayPath    []string
	Separator    string
}

JSONMapperConfigItem :

type JSONMapperFilterItem

type JSONMapperFilterItem struct {
	Keep         bool
	FieldType    string
	DefaultValue string
	Paths        [][]string
	Condition    string
	Value        string
	Values       []string
	Separator    string
}

type JSONMapperJsoniter

type JSONMapperJsoniter struct {
	// contains filtered or unexported fields
}

JSONMapperJsoniter :

func NewJSONMapperJsoniter

func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)

NewJSONMapperJsoniter :

func (JSONMapperJsoniter) DecodeDocument

func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)

DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data

func (JSONMapperJsoniter) FilterDocument

func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)

FilterDocument checks if document is filtered or not, returns if documents valid and if invalid, the following reason

func (JSONMapperJsoniter) MapToDocument

func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)

MapToDocument Maps data to document

type KafkaMessage

type KafkaMessage struct {
	Data []byte
}

KafkaMessage ...

func (KafkaMessage) GetData

func (kMessage KafkaMessage) GetData() []byte

GetData Getter for the message data

func (KafkaMessage) String

func (kMessage KafkaMessage) String() string

type Lookup

type Lookup func(path string, value string, index string) ([]string, error)

type Mapper

type Mapper interface {
	// FilterDocument checks if document is filtered or not, returns if documents valid and if invalid,
	// the following reason. It is using the given filters.
	FilterDocument(msg Message) (bool, string)

	// MapToDocument Maps data to document
	MapToDocument(msg Message) (Message, error)

	// DecodeDocument is a function that just decodes a document and returns it
	// You can use it if you want to decode a message only "once" and not in each
	// individual function
	DecodeDocument(msg Message) (Message, error)
}

type Message

type Message interface {
	String() string
}

Message ...

type MessageWithOptions

type MessageWithOptions struct {
	Data    map[string]interface{}
	Options map[string]interface{}
}

MessageWithOptions output once we've filtered the myrtea fields from the kafka messages

func (MessageWithOptions) String

func (msg MessageWithOptions) String() string

type Mode

type Mode int

Mode ...

const (
	// Self ...
	Self Mode = iota + 1
	// EnrichTo ...
	EnrichTo
	// EnrichFrom ...
	EnrichFrom
)

func (Mode) MarshalJSON

func (s Mode) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (Mode) String

func (s Mode) String() string

func (*Mode) UnmarshalJSON

func (s *Mode) UnmarshalJSON(b []byte) error

UnmarshalJSON unmashals a quoted json string to the enum value

type Processor

type Processor interface {
	Process(msg Message) ([]Message, error)
}

Processor is a general interface to processor message

type Reloader

type Reloader struct {
	// contains filtered or unexported fields
}

func NewReloader

func NewReloader(action func(string) error, cooldown time.Duration, apiKey string) *Reloader

func (*Reloader) BindEndpoint

func (re *Reloader) BindEndpoint(rg chi.Router)

BindEndpoint Binds the reload endpoint to a existing router

type Restarter

type Restarter struct {
	// contains filtered or unexported fields
}

func NewRestarter

func NewRestarter(doneChan chan os.Signal, apiKey string) *Restarter

func (*Restarter) BindEndpoint

func (re *Restarter) BindEndpoint(rg chi.Router)

BindEndpoint Binds the restart endpoint to an existing router

type SaramaDebugLogger

type SaramaDebugLogger struct {
	// contains filtered or unexported fields
}

SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaDebugLogger

func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger

NewSaramaLogger initializes a new SaramaDebugLogger with a zap.Logger

func (*SaramaDebugLogger) Print

func (s *SaramaDebugLogger) Print(v ...interface{})

func (*SaramaDebugLogger) Printf

func (s *SaramaDebugLogger) Printf(format string, v ...interface{})

func (*SaramaDebugLogger) Println

func (s *SaramaDebugLogger) Println(v ...interface{})

type SaramaLogger

type SaramaLogger struct {
	// contains filtered or unexported fields
}

SaramaLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaLogger

func NewSaramaLogger(zl *zap.Logger) *SaramaLogger

NewSaramaLogger initializes a new SaramaLogger with a zap.Logger

func (*SaramaLogger) Print

func (s *SaramaLogger) Print(v ...interface{})

func (*SaramaLogger) Printf

func (s *SaramaLogger) Printf(format string, v ...interface{})

func (*SaramaLogger) Println

func (s *SaramaLogger) Println(v ...interface{})

type Sink

type Sink interface {
	AddMessageToQueue(message Message)
}

type Transformer

type Transformer interface {
	Transform(Message) (Message, error)
}

Transformer ..

type TypedDataMessage

type TypedDataMessage struct {
	Strings map[string]string
	Ints    map[string]int64
	Bools   map[string]bool
	Times   map[string]time.Time
}

func (TypedDataMessage) String

func (m TypedDataMessage) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL