connector

package
v4.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2022 License: MIT Imports: 19 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoubleUnescapeUnicode

func DoubleUnescapeUnicode(s string) ([]byte, error)

DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string This function has been built for a very specific usage and may not works on other messages

Types

type AvroToJSONTransformer

type AvroToJSONTransformer struct {
	// contains filtered or unexported fields
}

AvroToJSONTransformer :

func NewAvroToJSONTransformer

func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)

New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ?

func (AvroToJSONTransformer) AvroBinaryToNative

func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)

AvroBinaryToNative :

func (AvroToJSONTransformer) AvroBinaryToTextual

func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)

AvroBinaryToTextual :

func (AvroToJSONTransformer) Transform

func (transformer AvroToJSONTransformer) Transform(msg Message) (Message, error)

Transform is the convertor transformer, it has to decode the AVRO message into a byte message (JSONMessage)

type BatchSink

type BatchSink struct {
	TargetURL    string
	Send         chan Message
	Client       *retryablehttp.Client
	BufferSize   int
	FlushTimeout time.Duration
	FormatToBIR  func([]FilteredJsonMessage) *BulkIngestRequest // TODO: Change to be more generic ? (sending []byte or interface{})
	DryRun       bool
}

BatchSink ..

func NewBatchSink

func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration,
	formatToBIR func([]FilteredJsonMessage) *BulkIngestRequest, dryRun bool) *BatchSink

NewBatchSink constructor for BatchSink

func (*BatchSink) AddMessageToQueue

func (sink *BatchSink) AddMessageToQueue(message Message)

func (*BatchSink) SendToIngester

func (sink *BatchSink) SendToIngester(bir *BulkIngestRequest) error

func (*BatchSink) Sender

func (sink *BatchSink) Sender()

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string     `json:"uuid"`
	DocumentType string     `json:"documentType"`
	MergeConfig  []Config   `json:"merge"`
	Docs         []Document `json:"docs"`
}

type Config

type Config struct {
	Mode             Mode    `json:"mode"`
	ExistingAsMaster bool    `json:"existingAsMaster"`
	Type             string  `json:"type,omitempty"`
	LinkKey          string  `json:"linkKey,omitempty"`
	Groups           []Group `json:"groups,omitempty"`
}

Config wraps all rules for document merging

type Document

type Document struct {
	ID        string      `json:"id"`
	Index     string      `json:"index"`
	IndexType string      `json:"type"`
	Source    interface{} `json:"source"`
}

Document represent an es document

type FieldMath

type FieldMath struct {
	Expression  string `json:"expression"`
	OutputField string `json:"outputField"`
}

FieldMath specify a merge rule using a math expression

type FilteredJsonMessage

type FilteredJsonMessage struct {
	Data map[string]interface{}
}

FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages

func (FilteredJsonMessage) String

func (msg FilteredJsonMessage) String() string

type Group

type Group struct {
	Condition             string      `json:"condition,omitempty"`
	FieldReplace          []string    `json:"fieldReplace,omitempty"`
	FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
	FieldMerge            []string    `json:"fieldMerge,omitempty"`
	FieldMath             []FieldMath `json:"fieldMath,omitempty"`
	FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
	FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
}

Group allows to group un set of merge fields and to define an optional condition to applay the merge fields

type JSONMapper

type JSONMapper struct {
	// contains filtered or unexported fields
}

JSONMapper :

func NewJSONMapper

func NewJSONMapper(name, path string) (*JSONMapper, error)

NewJSONMapper :

func (JSONMapper) FilterDocument

func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)

func (JSONMapper) MapToDocument

func (mapper JSONMapper) MapToDocument(msg Message) (FilteredJsonMessage, error)

MapAvroToDocument :

type JSONMapperConfigItem

type JSONMapperConfigItem struct {
	Mandatory    bool
	FieldType    string
	DefaultValue interface{}
	DateFormat   string
	Paths        [][]string
	OtherPaths   [][]string
	ArrayPath    []string
	Separator    string
}

JSONMapperConfigItem :

type JSONMapperFilterItem

type JSONMapperFilterItem struct {
	Keep         bool
	FieldType    string
	DefaultValue string
	Paths        [][]string
	Condition    string
	Value        string
	Values       []string
	Separator    string
}

type KafkaMessage

type KafkaMessage struct {
	Data []byte
}

KafkaMessage ...

func (KafkaMessage) GetData

func (kMessage KafkaMessage) GetData() []byte

GetData Getter for the message data

func (KafkaMessage) String

func (kMessage KafkaMessage) String() string

type KafkaSource

type KafkaSource struct {
	Topic     string
	Consumer  *cluster.Consumer
	OutChanel chan Message
	Signals   chan os.Signal
}

func NewKafkaSource

func NewKafkaSource(topic string, consumer *cluster.Consumer, out chan Message, signals chan os.Signal) *KafkaSource

func (*KafkaSource) Close

func (src *KafkaSource) Close() error

func (*KafkaSource) Init

func (src *KafkaSource) Init() error

func (*KafkaSource) Run

func (src *KafkaSource) Run() error

type Lookup

type Lookup func(path string, value string, index string) ([]string, error)

type Mapper

type Mapper interface {
	FilterDocument(msg Message) (bool, string)
	MapToDocument(msg Message) (FilteredJsonMessage, error)
}

type Message

type Message interface {
	String() string
}

Message ...

type MessageWithOptions

type MessageWithOptions struct {
	Data    map[string]interface{}
	Options map[string]interface{}
}

MessageWithOptions output once we've filtered the myrtea fields from the kafka messages

func (MessageWithOptions) String

func (msg MessageWithOptions) String() string

type Mode

type Mode int

Mode ...

const (
	// Self ...
	Self Mode = iota + 1
	// EnrichTo ...
	EnrichTo
	// EnrichFrom ...
	EnrichFrom
)

func (Mode) MarshalJSON

func (s Mode) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (Mode) String

func (s Mode) String() string

func (*Mode) UnmarshalJSON

func (s *Mode) UnmarshalJSON(b []byte) error

UnmarshalJSON unmashals a quoted json string to the enum value

type Processor

type Processor interface {
	Process(msg Message) ([]Message, error)
}

Processor is a general interface to processor message

type Sink

type Sink interface {
	AddMessageToQueue(message Message)
}

type Transformer

type Transformer interface {
	Transform(Message) (Message, error)
}

Transformer ..

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL