Documentation ¶
Index ¶
- Variables
- func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, ...)
- func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func DoubleUnescapeUnicode(s string) ([]byte, error)
- func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
- func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)
- func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
- type AvroToJSONTransformer
- type BatchSink
- type BulkIngestRequest
- type Config
- type ConsumerProcessor
- type DecodedKafkaMessage
- type DefaultConsumer
- type DefaultMultiConsumer
- type Document
- type FieldMath
- type FilterHeaderOption
- type FilteredJsonMessage
- type FormatToBIRs
- type Group
- type JSONMapper
- type JSONMapperConfigItem
- type JSONMapperFilterItem
- type JSONMapperJsoniter
- type KafkaMessage
- type Lookup
- type Mapper
- type Message
- type MessageWithOptions
- type Mode
- type Processor
- type Reloader
- type Restarter
- type SaramaDebugLogger
- type SaramaLogger
- type Sink
- type Transformer
- type TypedDataMessage
Constants ¶
This section is empty.
Variables ¶
var ReloaderComponentNotFoundErr = errors.New("component not found")
Functions ¶
func ApplyFieldForceUpdate ¶ added in v4.2.9
func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents
func ApplyFieldKeepEarliest ¶ added in v4.2.9
func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents
func ApplyFieldKeepLatest ¶ added in v4.2.9
func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents
func ApplyFieldMath ¶ added in v4.2.9
func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})
ApplyFieldMath applies all FieldMath merging configuration on input documents
func ApplyFieldMerge ¶ added in v4.2.9
func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldMerge applies all FieldMerge merging configuration on input documents
func ApplyFieldReplace ¶ added in v4.2.9
func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplace applies all FieldReplace merging configuration on input documents
func ApplyFieldReplaceIfMissing ¶ added in v4.2.9
func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplaceIfMissing applies all FieldReplaceIfMissing merging configuration on input documents
func DoubleUnescapeUnicode ¶
DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string. This function has been built for a very specific usage and may not work on other messages.
func FilterHeaders ¶ added in v4.2.1
func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
func LookupNestedMap ¶ added in v4.5.7
LookupNestedMap looks up the value corresponding to the exact specified path inside a map
func LookupNestedMapFullPaths ¶ added in v4.5.7
func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
LookupNestedMapFullPaths looks up the values of all paths in data and concatenates them with a separator
Types ¶
type AvroToJSONTransformer ¶
type AvroToJSONTransformer struct {
// contains filtered or unexported fields
}
AvroToJSONTransformer :
func NewAvroToJSONTransformer ¶
func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)
NewAvroToJSONTransformer is the transformer constructor. TODO: manage multiple schemaRegistryEndpoint values (e.g. in case of server failure)?
func (AvroToJSONTransformer) AvroBinaryToNative ¶
func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)
AvroBinaryToNative :
func (AvroToJSONTransformer) AvroBinaryToTextual ¶
func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)
AvroBinaryToTextual :
type BatchSink ¶
type BatchSink struct { TargetURL string Send chan Message Client *retryablehttp.Client BufferSize int FlushTimeout time.Duration FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{}) DryRun bool }
BatchSink ..
func NewBatchSink ¶
func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration, formatToBIRs FormatToBIRs, dryRun bool) *BatchSink
NewBatchSink constructor for BatchSink
func (*BatchSink) AddMessageToQueue ¶
func (*BatchSink) SendToIngester ¶
func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error
type BulkIngestRequest ¶
type Config ¶
type Config struct { Mode Mode `json:"mode"` ExistingAsMaster bool `json:"existingAsMaster"` Type string `json:"type,omitempty"` LinkKey string `json:"linkKey,omitempty"` Groups []Group `json:"groups,omitempty"` }
Config wraps all rules for document merging
type ConsumerProcessor ¶ added in v4.2.0
type ConsumerProcessor interface {
Process(message *sarama.ConsumerMessage)
}
type DecodedKafkaMessage ¶ added in v4.5.6
type DecodedKafkaMessage struct {
Data map[string]interface{}
}
DecodedKafkaMessage holds a JSON-decoded version of the kafka message. It can be used to avoid decoding data multiple times (which is very resource-consuming).
func (DecodedKafkaMessage) String ¶ added in v4.5.6
func (msg DecodedKafkaMessage) String() string
type DefaultConsumer ¶ added in v4.2.0
type DefaultConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultConsumer represents a Sarama consumer group consumer
func NewDefaultConsumer ¶ added in v4.2.0
func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer
func (*DefaultConsumer) Cleanup ¶ added in v4.2.0
func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultConsumer) ConsumeClaim ¶ added in v4.2.0
func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultConsumer) Setup ¶ added in v4.2.0
func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type DefaultMultiConsumer ¶ added in v4.2.0
type DefaultMultiConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultMultiConsumer represents a Sarama consumer group consumer
func NewDefaultMultiConsumer ¶ added in v4.2.0
func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer
func (*DefaultMultiConsumer) Cleanup ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultMultiConsumer) ConsumeClaim ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultMultiConsumer) Setup ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Document ¶
type Document struct { ID string `json:"id"` Index string `json:"index"` IndexType string `json:"type"` Source map[string]interface{} `json:"source"` }
Document represents an es document
type FieldMath ¶
type FieldMath struct { Expression string `json:"expression"` OutputField string `json:"outputField"` }
FieldMath specifies a merge rule using a math expression
type FilterHeaderOption ¶ added in v4.2.1
type FilteredJsonMessage ¶
type FilteredJsonMessage struct {
Data map[string]interface{}
}
FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages
func (FilteredJsonMessage) String ¶
func (msg FilteredJsonMessage) String() string
type FormatToBIRs ¶ added in v4.1.6
type FormatToBIRs func([]Message) []*BulkIngestRequest
type Group ¶
type Group struct { Condition string `json:"condition,omitempty"` FieldReplace []string `json:"fieldReplace,omitempty"` FieldReplaceIfMissing []string `json:"fieldReplaceIfMissing,omitempty"` FieldMerge []string `json:"fieldMerge,omitempty"` FieldMath []FieldMath `json:"fieldMath,omitempty"` FieldKeepLatest []string `json:"fieldKeepLatest,omitempty"` FieldKeepEarliest []string `json:"fieldKeepEarliest,omitempty"` FieldForceUpdate []string `json:"fieldForceUpdate,omitempty"` }
Group allows grouping a set of merge fields and defining an optional condition to apply the merge fields
type JSONMapper ¶
type JSONMapper struct {
// contains filtered or unexported fields
}
JSONMapper :
func (JSONMapper) DecodeDocument ¶ added in v4.5.6
func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)
DecodeDocument not implemented here, it only uses msg
func (JSONMapper) FilterDocument ¶
func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)
func (JSONMapper) MapToDocument ¶
func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)
MapToDocument :
type JSONMapperConfigItem ¶
type JSONMapperConfigItem struct { Mandatory bool FieldType string DefaultValue interface{} DateFormat string Paths [][]string OtherPaths [][]string ArrayPath []string Separator string }
JSONMapperConfigItem :
type JSONMapperFilterItem ¶
type JSONMapperJsoniter ¶ added in v4.2.4
type JSONMapperJsoniter struct {
// contains filtered or unexported fields
}
JSONMapperJsoniter :
func NewJSONMapperJsoniter ¶ added in v4.2.4
func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)
NewJSONMapperJsoniter :
func (JSONMapperJsoniter) DecodeDocument ¶ added in v4.5.6
func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)
DecodeDocument returns a DecodedKafkaMessage and contains a map with json decoded data
func (JSONMapperJsoniter) FilterDocument ¶ added in v4.2.4
func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)
FilterDocument checks whether the document is filtered or not; it returns whether the document is valid and, if it is invalid, the reason why
func (JSONMapperJsoniter) MapToDocument ¶ added in v4.2.4
func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)
MapToDocument Maps data to document
type KafkaMessage ¶
type KafkaMessage struct {
Data []byte
}
KafkaMessage ...
func (KafkaMessage) GetData ¶
func (kMessage KafkaMessage) GetData() []byte
GetData Getter for the message data
func (KafkaMessage) String ¶
func (kMessage KafkaMessage) String() string
type Mapper ¶
type Mapper interface { // FilterDocument checks if document is filtered or not, returns if documents valid and if invalid, // the following reason. It is using the given filters. FilterDocument(msg Message) (bool, string) // MapToDocument Maps data to document MapToDocument(msg Message) (Message, error) // DecodeDocument is a function that just decodes a document and returns it // You can use it if you want to decode a message only "once" and not in each // individual function DecodeDocument(msg Message) (Message, error) }
type MessageWithOptions ¶
MessageWithOptions output once we've filtered the myrtea fields from the kafka messages
func (MessageWithOptions) String ¶
func (msg MessageWithOptions) String() string
type Mode ¶
type Mode int
Mode ...
func (Mode) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*Mode) UnmarshalJSON ¶
UnmarshalJSON unmarshals a quoted json string to the enum value
type Reloader ¶ added in v4.4.9
type Reloader struct {
// contains filtered or unexported fields
}
func NewReloader ¶ added in v4.4.9
func (*Reloader) BindEndpoint ¶ added in v4.5.4
func (re *Reloader) BindEndpoint(rg chi.Router)
BindEndpoint Binds the reload endpoint to an existing router
type Restarter ¶ added in v4.5.0
type Restarter struct {
// contains filtered or unexported fields
}
func NewRestarter ¶ added in v4.5.0
func (*Restarter) BindEndpoint ¶ added in v4.5.4
func (re *Restarter) BindEndpoint(rg chi.Router)
BindEndpoint Binds the restart endpoint to an existing router
type SaramaDebugLogger ¶ added in v4.2.0
type SaramaDebugLogger struct {
// contains filtered or unexported fields
}
SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaDebugLogger ¶ added in v4.2.0
func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger
NewSaramaDebugLogger initializes a new SaramaDebugLogger with a zap.Logger
func (*SaramaDebugLogger) Print ¶ added in v4.2.0
func (s *SaramaDebugLogger) Print(v ...interface{})
func (*SaramaDebugLogger) Printf ¶ added in v4.2.0
func (s *SaramaDebugLogger) Printf(format string, v ...interface{})
func (*SaramaDebugLogger) Println ¶ added in v4.2.0
func (s *SaramaDebugLogger) Println(v ...interface{})
type SaramaLogger ¶ added in v4.2.0
type SaramaLogger struct {
// contains filtered or unexported fields
}
SaramaLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaLogger ¶ added in v4.2.0
func NewSaramaLogger(zl *zap.Logger) *SaramaLogger
NewSaramaLogger initializes a new SaramaLogger with a zap.Logger
func (*SaramaLogger) Print ¶ added in v4.2.0
func (s *SaramaLogger) Print(v ...interface{})
func (*SaramaLogger) Printf ¶ added in v4.2.0
func (s *SaramaLogger) Printf(format string, v ...interface{})
func (*SaramaLogger) Println ¶ added in v4.2.0
func (s *SaramaLogger) Println(v ...interface{})
type Transformer ¶
Transformer ..