Documentation ¶
Index ¶
- Variables
- func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, ...)
- func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func DoubleUnescapeUnicode(s string) ([]byte, error)
- func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
- func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)
- func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
- type AvroToJSONTransformer
- type BatchSink
- type BulkIngestRequest
- type Config
- type ConsumerProcessor
- type DecodedKafkaMessage
- type DefaultConsumer
- type DefaultMultiConsumer
- type Document
- type FieldMath
- type FilterHeaderOption
- type FilteredJsonMessage
- type FormatToBIRs
- type Group
- type JSONMapper
- type JSONMapperConfigItem
- type JSONMapperFilterItem
- type JSONMapperJsoniter
- type KafkaMessage
- type Lookup
- type Mapper
- type Message
- type MessageWithOptions
- type Mode
- type Processor
- type Reloader
- type Restarter
- type SaramaDebugLogger
- type SaramaLogger
- type Sink
- type Transformer
- type TypedDataMessage
Constants ¶
This section is empty.
Variables ¶
var ReloaderComponentNotFoundErr = errors.New("component not found")
Functions ¶
func ApplyFieldForceUpdate ¶
func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents
func ApplyFieldKeepEarliest ¶
func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents
func ApplyFieldKeepLatest ¶
func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents
func ApplyFieldMath ¶
func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})
ApplyFieldMath applies all FieldMath merging configuration on input documents
func ApplyFieldMerge ¶
func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldMerge applies all FieldMerge merging configuration on input documents
func ApplyFieldReplace ¶
func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplace applies all FieldReplace merging configuration on input documents
func ApplyFieldReplaceIfMissing ¶
func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplaceIfMissing applies all FieldReplaceIfMissing merging configuration on input documents
func DoubleUnescapeUnicode ¶
DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string. This function has been built for a very specific usage and may not work on other messages.
func FilterHeaders ¶
func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
func LookupNestedMap ¶
LookupNestedMap looks up the value corresponding to the exact specified path inside a map
func LookupNestedMapFullPaths ¶
func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
LookupNestedMapFullPaths searches for the values of all paths in data and concatenates them with a separator
Types ¶
type AvroToJSONTransformer ¶
type AvroToJSONTransformer struct {
// contains filtered or unexported fields
}
AvroToJSONTransformer :
func NewAvroToJSONTransformer ¶
func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)
NewAvroToJSONTransformer is the constructor for a new transformer. TODO: Manage multiple schemaRegistryEndpoint values, in case of server failure?
func (AvroToJSONTransformer) AvroBinaryToNative ¶
func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)
AvroBinaryToNative :
func (AvroToJSONTransformer) AvroBinaryToTextual ¶
func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)
AvroBinaryToTextual :
type BatchSink ¶
type BatchSink struct { TargetURL string Send chan Message Client *retryablehttp.Client BufferSize int FlushTimeout time.Duration FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{}) DryRun bool }
BatchSink ..
func NewBatchSink ¶
func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration, formatToBIRs FormatToBIRs, dryRun bool) *BatchSink
NewBatchSink constructor for BatchSink
func (*BatchSink) AddMessageToQueue ¶
func (*BatchSink) SendToIngester ¶
func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error
type BulkIngestRequest ¶
type Config ¶
type Config struct { Mode Mode `json:"mode"` ExistingAsMaster bool `json:"existingAsMaster"` Type string `json:"type,omitempty"` LinkKey string `json:"linkKey,omitempty"` Groups []Group `json:"groups,omitempty"` }
Config wraps all rules for document merging
type ConsumerProcessor ¶
type ConsumerProcessor interface {
Process(message *sarama.ConsumerMessage)
}
type DecodedKafkaMessage ¶
type DecodedKafkaMessage struct {
Data map[string]interface{}
}
DecodedKafkaMessage holds a JSON-decoded version of the kafka message. It can be used to avoid decoding data multiple times (which is very resource-consuming).
func (DecodedKafkaMessage) String ¶
func (msg DecodedKafkaMessage) String() string
type DefaultConsumer ¶
type DefaultConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultConsumer represents a Sarama consumer group consumer
func NewDefaultConsumer ¶
func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer
func (*DefaultConsumer) Cleanup ¶
func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultConsumer) ConsumeClaim ¶
func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultConsumer) Setup ¶
func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type DefaultMultiConsumer ¶
type DefaultMultiConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultMultiConsumer represents a Sarama consumer group consumer
func NewDefaultMultiConsumer ¶
func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer
func (*DefaultMultiConsumer) Cleanup ¶
func (consumer *DefaultMultiConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultMultiConsumer) ConsumeClaim ¶
func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultMultiConsumer) Setup ¶
func (consumer *DefaultMultiConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Document ¶
type Document struct { ID string `json:"id"` Index string `json:"index"` IndexType string `json:"type"` Source map[string]interface{} `json:"source"` }
Document represents an ES document
type FieldMath ¶
type FieldMath struct { Expression string `json:"expression"` OutputField string `json:"outputField"` }
FieldMath specifies a merge rule using a math expression
type FilterHeaderOption ¶
type FilteredJsonMessage ¶
type FilteredJsonMessage struct {
Data map[string]interface{}
}
FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages
func (FilteredJsonMessage) String ¶
func (msg FilteredJsonMessage) String() string
type FormatToBIRs ¶
type FormatToBIRs func([]Message) []*BulkIngestRequest
type Group ¶
type Group struct { Condition string `json:"condition,omitempty"` FieldReplace []string `json:"fieldReplace,omitempty"` FieldReplaceIfMissing []string `json:"fieldReplaceIfMissing,omitempty"` FieldMerge []string `json:"fieldMerge,omitempty"` FieldMath []FieldMath `json:"fieldMath,omitempty"` FieldKeepLatest []string `json:"fieldKeepLatest,omitempty"` FieldKeepEarliest []string `json:"fieldKeepEarliest,omitempty"` FieldForceUpdate []string `json:"fieldForceUpdate,omitempty"` }
Group allows grouping a set of merge fields and defining an optional condition for applying those merge fields
type JSONMapper ¶
type JSONMapper struct {
// contains filtered or unexported fields
}
JSONMapper :
func (JSONMapper) DecodeDocument ¶
func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)
DecodeDocument not implemented here, it only uses msg
func (JSONMapper) FilterDocument ¶
func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)
func (JSONMapper) MapToDocument ¶
func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)
MapToDocument :
type JSONMapperConfigItem ¶
type JSONMapperConfigItem struct { Mandatory bool FieldType string DefaultValue interface{} DateFormat string Paths [][]string OtherPaths [][]string ArrayPath []string Separator string }
JSONMapperConfigItem :
type JSONMapperFilterItem ¶
type JSONMapperJsoniter ¶
type JSONMapperJsoniter struct {
// contains filtered or unexported fields
}
JSONMapperJsoniter :
func NewJSONMapperJsoniter ¶
func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)
NewJSONMapperJsoniter :
func (JSONMapperJsoniter) DecodeDocument ¶
func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)
DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data
func (JSONMapperJsoniter) FilterDocument ¶
func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)
FilterDocument checks whether the document is filtered or not; it returns whether the document is valid and, if it is invalid, the reason why
func (JSONMapperJsoniter) MapToDocument ¶
func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)
MapToDocument Maps data to document
type KafkaMessage ¶
type KafkaMessage struct {
Data []byte
}
KafkaMessage ...
func (KafkaMessage) GetData ¶
func (kMessage KafkaMessage) GetData() []byte
GetData Getter for the message data
func (KafkaMessage) String ¶
func (kMessage KafkaMessage) String() string
type Mapper ¶
type Mapper interface { // FilterDocument checks if document is filtered or not, returns if documents valid and if invalid, // the following reason. It is using the given filters. FilterDocument(msg Message) (bool, string) // MapToDocument Maps data to document MapToDocument(msg Message) (Message, error) // DecodeDocument is a function that just decodes a document and returns it // You can use it if you want to decode a message only "once" and not in each // individual function DecodeDocument(msg Message) (Message, error) }
type MessageWithOptions ¶
MessageWithOptions output once we've filtered the myrtea fields from the kafka messages
func (MessageWithOptions) String ¶
func (msg MessageWithOptions) String() string
type Mode ¶
type Mode int
Mode ...
func (Mode) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*Mode) UnmarshalJSON ¶
UnmarshalJSON unmarshals a quoted json string to the enum value
type Reloader ¶
type Reloader struct {
// contains filtered or unexported fields
}
func NewReloader ¶
func (*Reloader) BindEndpoint ¶
func (re *Reloader) BindEndpoint(rg chi.Router)
BindEndpoint Binds the reload endpoint to an existing router
type Restarter ¶
type Restarter struct {
// contains filtered or unexported fields
}
func (*Restarter) BindEndpoint ¶
func (re *Restarter) BindEndpoint(rg chi.Router)
BindEndpoint Binds the restart endpoint to an existing router
type SaramaDebugLogger ¶
type SaramaDebugLogger struct {
// contains filtered or unexported fields
}
SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaDebugLogger ¶
func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger
NewSaramaDebugLogger initializes a new SaramaDebugLogger with a zap.Logger
func (*SaramaDebugLogger) Print ¶
func (s *SaramaDebugLogger) Print(v ...interface{})
func (*SaramaDebugLogger) Printf ¶
func (s *SaramaDebugLogger) Printf(format string, v ...interface{})
func (*SaramaDebugLogger) Println ¶
func (s *SaramaDebugLogger) Println(v ...interface{})
type SaramaLogger ¶
type SaramaLogger struct {
// contains filtered or unexported fields
}
SaramaLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaLogger ¶
func NewSaramaLogger(zl *zap.Logger) *SaramaLogger
NewSaramaLogger initializes a new SaramaLogger with a zap.Logger
func (*SaramaLogger) Print ¶
func (s *SaramaLogger) Print(v ...interface{})
func (*SaramaLogger) Printf ¶
func (s *SaramaLogger) Printf(format string, v ...interface{})
func (*SaramaLogger) Println ¶
func (s *SaramaLogger) Println(v ...interface{})
type Transformer ¶
Transformer ..