connector

package
v4.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2023 License: MIT Imports: 20 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoubleUnescapeUnicode

func DoubleUnescapeUnicode(s string) ([]byte, error)

DoubleUnescapeUnicode is a special function to extract Avro binaries from a JSON-encoded string. This function has been built for a very specific usage and may not work on other messages.

func FilterHeaders added in v4.2.1

func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)

Types

type AvroToJSONTransformer

type AvroToJSONTransformer struct {
	// contains filtered or unexported fields
}

AvroToJSONTransformer :

func NewAvroToJSONTransformer

func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)

NewAvroToJSONTransformer is the transformer constructor. TODO: manage multiple schemaRegistryEndpoint values, in case of server failure?

func (AvroToJSONTransformer) AvroBinaryToNative

func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)

AvroBinaryToNative :

func (AvroToJSONTransformer) AvroBinaryToTextual

func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)

AvroBinaryToTextual :

func (AvroToJSONTransformer) Transform

func (transformer AvroToJSONTransformer) Transform(msg Message) (Message, error)

Transform is the converter transformer; it decodes the Avro message into a byte message (JSONMessage).

type BatchSink

type BatchSink struct {
	TargetURL    string
	Send         chan Message
	Client       *retryablehttp.Client
	BufferSize   int
	FlushTimeout time.Duration
	FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{})
	DryRun       bool
}

BatchSink ..

func NewBatchSink

func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration,
	formatToBIRs FormatToBIRs, dryRun bool) *BatchSink

NewBatchSink constructor for BatchSink

func (*BatchSink) AddMessageToQueue

func (sink *BatchSink) AddMessageToQueue(message Message)

func (*BatchSink) SendToIngester

func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error

func (*BatchSink) Sender

func (sink *BatchSink) Sender(ctx context.Context)

type BulkIngestRequest

type BulkIngestRequest struct {
	UUID         string     `json:"uuid"`
	DocumentType string     `json:"documentType"`
	MergeConfig  []Config   `json:"merge"`
	Docs         []Document `json:"docs"`
}

type Config

type Config struct {
	Mode             Mode    `json:"mode"`
	ExistingAsMaster bool    `json:"existingAsMaster"`
	Type             string  `json:"type,omitempty"`
	LinkKey          string  `json:"linkKey,omitempty"`
	Groups           []Group `json:"groups,omitempty"`
}

Config wraps all rules for document merging

type ConsumerProcessor added in v4.2.0

type ConsumerProcessor interface {
	Process(message *sarama.ConsumerMessage)
}

type DefaultConsumer added in v4.2.0

type DefaultConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultConsumer represents a Sarama consumer group consumer

func NewDefaultConsumer added in v4.2.0

func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer

func (*DefaultConsumer) Cleanup added in v4.2.0

func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultConsumer) ConsumeClaim added in v4.2.0

func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultConsumer) Setup added in v4.2.0

Setup is run at the beginning of a new session, before ConsumeClaim

type DefaultMultiConsumer added in v4.2.0

type DefaultMultiConsumer struct {
	Ready chan bool
	// contains filtered or unexported fields
}

DefaultMultiConsumer represents a Sarama consumer group consumer

func NewDefaultMultiConsumer added in v4.2.0

func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer

func (*DefaultMultiConsumer) Cleanup added in v4.2.0

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*DefaultMultiConsumer) ConsumeClaim added in v4.2.0

func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*DefaultMultiConsumer) Setup added in v4.2.0

Setup is run at the beginning of a new session, before ConsumeClaim

type Document

type Document struct {
	ID        string      `json:"id"`
	Index     string      `json:"index"`
	IndexType string      `json:"type"`
	Source    interface{} `json:"source"`
}

Document represents an ES document

type FieldMath

type FieldMath struct {
	Expression  string `json:"expression"`
	OutputField string `json:"outputField"`
}

FieldMath specifies a merge rule using a math expression

type FilterHeaderOption added in v4.2.1

type FilterHeaderOption struct {
	Key       string
	Value     string
	Values    []string
	Condition string
}

type FilteredJsonMessage

type FilteredJsonMessage struct {
	Data map[string]interface{}
}

FilteredJsonMessage is the output once we've filtered the myrtea fields from the kafka messages

func (FilteredJsonMessage) String

func (msg FilteredJsonMessage) String() string

type FormatToBIRs added in v4.1.6

type FormatToBIRs func([]Message) []*BulkIngestRequest

type Group

type Group struct {
	Condition             string      `json:"condition,omitempty"`
	FieldReplace          []string    `json:"fieldReplace,omitempty"`
	FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
	FieldMerge            []string    `json:"fieldMerge,omitempty"`
	FieldMath             []FieldMath `json:"fieldMath,omitempty"`
	FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
	FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
	FieldForceUpdate      []string    `json:"fieldForceUpdate,omitempty"`
}

Group allows grouping a set of merge fields and defining an optional condition for applying those merge fields

type JSONMapper

type JSONMapper struct {
	// contains filtered or unexported fields
}

JSONMapper :

func NewJSONMapper

func NewJSONMapper(name, path string) (*JSONMapper, error)

NewJSONMapper :

func (JSONMapper) FilterDocument

func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)

func (JSONMapper) MapToDocument

func (mapper JSONMapper) MapToDocument(msg Message) (FilteredJsonMessage, error)

MapToDocument :

type JSONMapperConfigItem

type JSONMapperConfigItem struct {
	Mandatory    bool
	FieldType    string
	DefaultValue interface{}
	DateFormat   string
	Paths        [][]string
	OtherPaths   [][]string
	ArrayPath    []string
	Separator    string
}

JSONMapperConfigItem :

type JSONMapperFilterItem

type JSONMapperFilterItem struct {
	Keep         bool
	FieldType    string
	DefaultValue string
	Paths        [][]string
	Condition    string
	Value        string
	Values       []string
	Separator    string
}

type JSONMapperJsoniter added in v4.2.4

type JSONMapperJsoniter struct {
	// contains filtered or unexported fields
}

JSONMapperJsoniter :

func NewJSONMapperJsoniter added in v4.2.4

func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)

NewJSONMapperJsoniter :

func (JSONMapperJsoniter) FilterDocument added in v4.2.4

func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)

func (JSONMapperJsoniter) MapToDocument added in v4.2.4

func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)

MapToDocument :

type KafkaMessage

type KafkaMessage struct {
	Data []byte
}

KafkaMessage ...

func (KafkaMessage) GetData

func (kMessage KafkaMessage) GetData() []byte

GetData Getter for the message data

func (KafkaMessage) String

func (kMessage KafkaMessage) String() string

type Lookup

type Lookup func(path string, value string, index string) ([]string, error)

type Mapper

type Mapper interface {
	FilterDocument(msg Message) (bool, string)
	MapToDocument(msg Message) (Message, error)
}

type Message

type Message interface {
	String() string
}

Message ...

type MessageWithOptions

type MessageWithOptions struct {
	Data    map[string]interface{}
	Options map[string]interface{}
}

MessageWithOptions is the output once we've filtered the myrtea fields from the kafka messages

func (MessageWithOptions) String

func (msg MessageWithOptions) String() string

type Mode

type Mode int

Mode ...

const (
	// Self ...
	Self Mode = iota + 1
	// EnrichTo ...
	EnrichTo
	// EnrichFrom ...
	EnrichFrom
)

func (Mode) MarshalJSON

func (s Mode) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (Mode) String

func (s Mode) String() string

func (*Mode) UnmarshalJSON

func (s *Mode) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a quoted json string to the enum value

type Processor

type Processor interface {
	Process(msg Message) ([]Message, error)
}

Processor is a general interface for processing messages

type SaramaDebugLogger added in v4.2.0

type SaramaDebugLogger struct {
	// contains filtered or unexported fields
}

SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaDebugLogger added in v4.2.0

func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger

NewSaramaDebugLogger initializes a new SaramaDebugLogger with a zap.Logger

func (*SaramaDebugLogger) Print added in v4.2.0

func (s *SaramaDebugLogger) Print(v ...interface{})

func (*SaramaDebugLogger) Printf added in v4.2.0

func (s *SaramaDebugLogger) Printf(format string, v ...interface{})

func (*SaramaDebugLogger) Println added in v4.2.0

func (s *SaramaDebugLogger) Println(v ...interface{})

type SaramaLogger added in v4.2.0

type SaramaLogger struct {
	// contains filtered or unexported fields
}

SaramaLogger wraps zap.Logger with the sarama.StdLogger interface

func NewSaramaLogger added in v4.2.0

func NewSaramaLogger(zl *zap.Logger) *SaramaLogger

NewSaramaLogger initializes a new SaramaLogger with a zap.Logger

func (*SaramaLogger) Print added in v4.2.0

func (s *SaramaLogger) Print(v ...interface{})

func (*SaramaLogger) Printf added in v4.2.0

func (s *SaramaLogger) Printf(format string, v ...interface{})

func (*SaramaLogger) Println added in v4.2.0

func (s *SaramaLogger) Println(v ...interface{})

type Sink

type Sink interface {
	AddMessageToQueue(message Message)
}

type Transformer

type Transformer interface {
	Transform(Message) (Message, error)
}

Transformer ..

type TypedDataMessage added in v4.2.4

type TypedDataMessage struct {
	Strings map[string]string
	Ints    map[string]int64
	Bools   map[string]bool
	Times   map[string]time.Time
}

func (TypedDataMessage) String added in v4.2.4

func (m TypedDataMessage) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL