Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, ...)
- func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, ...)
- func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
- func GetHeader(key string, headers []*sarama.RecordHeader) (value string, found bool)
- func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)
- func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
- type AvroToJSONTransformer
- type AvroToMapTransformer
- type BatchSink
- type BulkIngestRequest
- type Config
- type ConsumerParams
- type ConsumerProcessor
- type DecodedKafkaMessage
- type DefaultConsumer
- type DefaultMultiConsumer
- type Document
- type FieldMath
- type FilterHeaderOption
- type FilteredJsonMessage
- type FormatToBIRs
- type Group
- type JSONMapper
- type JSONMapperConfigItem
- type JSONMapperFilterItem
- type JSONMapperJsoniter
- type JSONMapperMap
- type KafkaMessage
- type Lookup
- type Mapper
- type Message
- type MessageWithOptions
- type Mode
- type Processor
- type Reloader
- type Restarter
- type SaramaDebugLogger
- type SaramaLogger
- type Sink
- type SinkManager
- type Transformer
- type TypedDataMessage
Constants ¶
const DefaultMaxPermittedPanics = 10
Variables ¶
var ReloaderComponentNotFoundErr = errors.New("component not found")
Functions ¶
func ApplyFieldForceUpdate ¶
func ApplyFieldForceUpdate(fieldUpdate []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldForceUpdate applies FieldForceUpdate merging configuration on input documents
func ApplyFieldKeepEarliest ¶
func ApplyFieldKeepEarliest(fieldKeepEarliest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepEarliest applies all FieldKeepEarliest merging configuration on input documents
func ApplyFieldKeepLatest ¶
func ApplyFieldKeepLatest(fieldKeepLatest []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldKeepLatest applies all FieldKeepLatest merging configuration on input documents
func ApplyFieldMath ¶
func ApplyFieldMath(config []FieldMath, newDoc *models.Document, existingDoc *models.Document, outputSource map[string]interface{})
ApplyFieldMath applies all FieldMath merging configuration on input documents
func ApplyFieldMerge ¶
func ApplyFieldMerge(fieldMerge []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldMerge applies all FieldMerge merging configuration on input documents. Merge can merge a single field with a slice, or vice versa; the result of a merge is always a slice of unique values.
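A minimal usage sketch for ApplyFieldMerge (the connector import path is a placeholder and the expected result is inferred from the description above):

package main

import (
    "fmt"

    connector "example.com/connector" // placeholder: replace with this package's real import path
)

func main() {
    enricherSource := map[string]interface{}{"tags": []interface{}{"a", "b"}}
    outputSource := map[string]interface{}{"tags": "c"}

    // Merge the "tags" field from the enricher source into the output source.
    // Per the description above, outputSource["tags"] should end up as a slice
    // of unique values.
    connector.ApplyFieldMerge([]string{"tags"}, enricherSource, outputSource)

    fmt.Println(outputSource["tags"])
}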
func ApplyFieldReplace ¶
func ApplyFieldReplace(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplace applies all FieldReplace merging configuration on input documents
func ApplyFieldReplaceIfMissing ¶
func ApplyFieldReplaceIfMissing(fieldReplace []string, enricherSource map[string]interface{}, outputSource map[string]interface{})
ApplyFieldReplaceIfMissing applies all FieldReplaceIfMissing merging configuration on input documents
func FilterHeaders ¶
func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
func GetHeader ¶ added in v5.1.4
func GetHeader(key string, headers []*sarama.RecordHeader) (value string, found bool)
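A short sketch of reading a Kafka record header with GetHeader (the sarama import path and the connector import path are assumptions):

package main

import (
    "fmt"

    "github.com/Shopify/sarama" // or the sarama fork used by this module

    connector "example.com/connector" // placeholder import path
)

func main() {
    headers := []*sarama.RecordHeader{
        {Key: []byte("source"), Value: []byte("crm")},
    }

    // Look up a single header value by key.
    if value, found := connector.GetHeader("source", headers); found {
        fmt.Println("source header:", value)
    }
}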
func LookupNestedMap ¶
func LookupNestedMap(pathParts []string, data interface{}) (interface{}, bool)
LookupNestedMap looks up the value at the exact specified path inside a nested map
func LookupNestedMapFullPaths ¶
func LookupNestedMapFullPaths(data interface{}, paths [][]string, separator string) (interface{}, bool)
LookupNestedMapFullPaths looks up the values of all paths in data and concatenates them with the separator
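A sketch of both lookup helpers on a small nested map (import path is a placeholder; expected outputs are inferred from the descriptions above):

package main

import (
    "fmt"

    connector "example.com/connector" // placeholder import path
)

func main() {
    data := map[string]interface{}{
        "user": map[string]interface{}{
            "firstname": "Ada",
            "lastname":  "Lovelace",
        },
    }

    // Look up a single nested value by its exact path.
    if value, found := connector.LookupNestedMap([]string{"user", "firstname"}, data); found {
        fmt.Println(value) // Ada
    }

    // Look up several paths and concatenate their values with a separator.
    paths := [][]string{{"user", "firstname"}, {"user", "lastname"}}
    if value, found := connector.LookupNestedMapFullPaths(data, paths, " "); found {
        fmt.Println(value) // expected: "Ada Lovelace"
    }
}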
Types ¶
type AvroToJSONTransformer ¶
type AvroToJSONTransformer struct {
// contains filtered or unexported fields
}
AvroToJSONTransformer transforms Avro binary data into JSON. Deprecated.
func NewAvroToJSONTransformer ¶
func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)
NewAvroToJSONTransformer is the constructor for AvroToJSONTransformer. TODO: manage multiple schemaRegistryEndpoint values in case of server failure. Deprecated.
func (AvroToJSONTransformer) AvroBinaryToNative ¶
func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)
AvroBinaryToNative converts Avro binary data to its native Go representation. Deprecated.
func (AvroToJSONTransformer) AvroBinaryToTextual ¶
func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)
AvroBinaryToTextual converts Avro binary data to its textual (JSON) representation. Deprecated.
type AvroToMapTransformer ¶ added in v5.1.4
type AvroToMapTransformer struct {
// contains filtered or unexported fields
}
AvroToMapTransformer transforms Avro binary data into a map
func NewAvroToMapTransformer ¶ added in v5.1.4
func NewAvroToMapTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToMapTransformer, error)
NewAvroToMapTransformer is the constructor for AvroToMapTransformer. TODO: manage multiple schemaRegistryEndpoint values in case of server failure.
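A constructor usage sketch (the schema registry endpoint and import path are placeholders):

package main

import (
    "log"
    "time"

    connector "example.com/connector" // placeholder import path
)

func main() {
    // Build a transformer backed by a schema registry, caching schemas for 5 minutes.
    transformer, err := connector.NewAvroToMapTransformer("http://localhost:8081", 5*time.Minute)
    if err != nil {
        log.Fatal(err)
    }
    _ = transformer // use it in a processing pipeline
}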
type BatchSink ¶
type BatchSink struct {
    TargetURL    string
    Send         chan Message
    Client       *retryablehttp.Client
    BufferSize   int
    FlushTimeout time.Duration
    FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{})
    DryRun       bool
}
BatchSink buffers incoming messages and sends them in bulk to a target URL
func NewBatchSink ¶
func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration, formatToBIRs FormatToBIRs, dryRun bool) *BatchSink
NewBatchSink constructor for BatchSink
func (*BatchSink) AddMessageToQueue ¶
func (*BatchSink) SendToIngester ¶
func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error
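A construction sketch for BatchSink (target URL, buffer size and flush timeout are placeholders; the import paths are assumptions):

package main

import (
    "time"

    "github.com/hashicorp/go-retryablehttp"

    connector "example.com/connector" // placeholder import path
)

func main() {
    // FormatToBIRs turns buffered messages into bulk ingest requests; the real
    // formatting logic depends on your document model.
    formatter := connector.FormatToBIRs(func(msgs []connector.Message) []*connector.BulkIngestRequest {
        return nil // placeholder
    })

    sink := connector.NewBatchSink(
        "http://ingester:9000/bulk", // placeholder target URL
        retryablehttp.NewClient(),
        1000,           // buffer size
        10*time.Second, // flush timeout
        formatter,
        false, // dryRun
    )
    _ = sink
}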
type BulkIngestRequest ¶
type Config ¶
type Config struct {
    Mode             Mode    `json:"mode"`
    ExistingAsMaster bool    `json:"existingAsMaster"`
    Type             string  `json:"type,omitempty"`
    LinkKey          string  `json:"linkKey,omitempty"`
    Groups           []Group `json:"groups,omitempty"`
}
Config wraps all rules for document merging
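An illustrative merge configuration (field names and the math expression are assumptions, and Mode is left at its zero value because the available Mode values are not listed here):

package main

import (
    connector "example.com/connector" // placeholder import path
)

func main() {
    config := connector.Config{
        ExistingAsMaster: true,
        Groups: []connector.Group{
            {
                FieldReplace:      []string{"status"},
                FieldMerge:        []string{"tags"},
                FieldKeepEarliest: []string{"firstSeen"},
                FieldMath: []connector.FieldMath{
                    // Expression syntax is illustrative only.
                    {Expression: "new.count + existing.count", OutputField: "count"},
                },
            },
        },
    }
    _ = config
}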
type ConsumerParams ¶ added in v5.1.5
func NewDefaultConsumerParams ¶ added in v5.1.5
func NewDefaultConsumerParams(done chan os.Signal) ConsumerParams
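A sketch of building the default consumer parameters from an OS signal channel (import path is a placeholder):

package main

import (
    "os"
    "os/signal"
    "syscall"

    connector "example.com/connector" // placeholder import path
)

func main() {
    // Signal channel used to stop consumption gracefully.
    done := make(chan os.Signal, 1)
    signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)

    params := connector.NewDefaultConsumerParams(done)
    _ = params // pass to NewDefaultConsumer or NewDefaultMultiConsumer
}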
type ConsumerProcessor ¶
type ConsumerProcessor interface {
Process(message *sarama.ConsumerMessage)
}
type DecodedKafkaMessage ¶
type DecodedKafkaMessage struct {
Data map[string]interface{}
}
DecodedKafkaMessage holds a JSON-decoded version of the Kafka message. It can be used to avoid decoding the data multiple times (which is expensive).
func (DecodedKafkaMessage) String ¶
func (msg DecodedKafkaMessage) String() string
type DefaultConsumer ¶
type DefaultConsumer struct {
    Ready chan bool
    ConsumerParams
    // contains filtered or unexported fields
}
DefaultConsumer represents a Sarama consumer group consumer
func NewDefaultConsumer ¶
func NewDefaultConsumer(processor ConsumerProcessor, params ConsumerParams) DefaultConsumer
func (*DefaultConsumer) Cleanup ¶
func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultConsumer) ConsumeClaim ¶
func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultConsumer) Setup ¶
func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
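A sketch wiring DefaultConsumer into a sarama consumer group (broker address, group ID, topic and import paths are placeholders; the sarama fork in use may differ):

package main

import (
    "context"
    "log"
    "os"
    "os/signal"

    "github.com/Shopify/sarama" // or the sarama fork used by this module

    connector "example.com/connector" // placeholder import path
)

// printProcessor is a trivial ConsumerProcessor implementation.
type printProcessor struct{}

func (printProcessor) Process(message *sarama.ConsumerMessage) {
    log.Printf("message on %s: %s", message.Topic, string(message.Value))
}

func main() {
    done := make(chan os.Signal, 1)
    signal.Notify(done, os.Interrupt)

    consumer := connector.NewDefaultConsumer(printProcessor{}, connector.NewDefaultConsumerParams(done))

    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_1_0_0 // consumer groups require Kafka >= 0.10.2

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()

    // *DefaultConsumer implements sarama.ConsumerGroupHandler (Setup, ConsumeClaim, Cleanup).
    if err := group.Consume(context.Background(), []string{"my-topic"}, &consumer); err != nil {
        log.Fatal(err)
    }
}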
type DefaultMultiConsumer ¶
type DefaultMultiConsumer struct {
    Ready chan bool
    ConsumerParams
    // contains filtered or unexported fields
}
DefaultMultiConsumer represents a Sarama consumer group consumer
func NewDefaultMultiConsumer ¶
func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor, params ConsumerParams) DefaultMultiConsumer
func (*DefaultMultiConsumer) Cleanup ¶
func (consumer *DefaultMultiConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultMultiConsumer) ConsumeClaim ¶
func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultMultiConsumer) Setup ¶
func (consumer *DefaultMultiConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Document ¶
type Document struct {
    ID        string                 `json:"id"`
    Index     string                 `json:"index"`
    IndexType string                 `json:"type"`
    Source    map[string]interface{} `json:"source"`
}
Document represents an Elasticsearch document
type FieldMath ¶
type FieldMath struct {
    Expression  string `json:"expression"`
    OutputField string `json:"outputField"`
}
FieldMath specifies a merge rule using a math expression
type FilterHeaderOption ¶
type FilteredJsonMessage ¶
type FilteredJsonMessage struct {
Data map[string]interface{}
}
FilteredJsonMessage is the output produced once the myrtea fields have been filtered from the Kafka messages
func (FilteredJsonMessage) String ¶
func (msg FilteredJsonMessage) String() string
type FormatToBIRs ¶
type FormatToBIRs func([]Message) []*BulkIngestRequest
type Group ¶
type Group struct {
    Condition             string      `json:"condition,omitempty"`
    FieldReplace          []string    `json:"fieldReplace,omitempty"`
    FieldReplaceIfMissing []string    `json:"fieldReplaceIfMissing,omitempty"`
    FieldMerge            []string    `json:"fieldMerge,omitempty"`
    FieldMath             []FieldMath `json:"fieldMath,omitempty"`
    FieldKeepLatest       []string    `json:"fieldKeepLatest,omitempty"`
    FieldKeepEarliest     []string    `json:"fieldKeepEarliest,omitempty"`
    FieldForceUpdate      []string    `json:"fieldForceUpdate,omitempty"`
}
Group groups a set of merge fields and defines an optional condition under which they are applied
type JSONMapper ¶
type JSONMapper struct {
// contains filtered or unexported fields
}
JSONMapper maps JSON Kafka messages to documents
func (JSONMapper) DecodeDocument ¶
func (mapper JSONMapper) DecodeDocument(msg Message) (Message, error)
DecodeDocument is not implemented here; it only passes msg through
func (JSONMapper) FilterDocument ¶
func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)
func (JSONMapper) MapToDocument ¶
func (mapper JSONMapper) MapToDocument(msg Message) (Message, error)
MapToDocument maps data to a document
type JSONMapperConfigItem ¶
type JSONMapperConfigItem struct {
    Mandatory    bool
    FieldType    string
    DefaultValue interface{}
    DateFormat   string
    Paths        [][]string
    OtherPaths   [][]string
    ArrayPath    []string
    Separator    string
}
JSONMapperConfigItem describes the mapping configuration for a single output field
type JSONMapperFilterItem ¶
type JSONMapperJsoniter ¶
type JSONMapperJsoniter struct {
// contains filtered or unexported fields
}
JSONMapperJsoniter is a JSON mapper implementation based on jsoniter
func NewJSONMapperJsoniter ¶
func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)
NewJSONMapperJsoniter is the constructor for JSONMapperJsoniter
func (JSONMapperJsoniter) DecodeDocument ¶
func (mapper JSONMapperJsoniter) DecodeDocument(msg Message) (Message, error)
DecodeDocument returns a DecodedKafkaMessage containing a map with the JSON-decoded data
func (JSONMapperJsoniter) FilterDocument ¶
func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)
FilterDocument checks whether the document passes the configured filters; it returns whether the document is valid and, if it is not, the reason
func (JSONMapperJsoniter) MapToDocument ¶
func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)
MapToDocument maps data to a document
type JSONMapperMap ¶ added in v5.1.4
type JSONMapperMap struct {
// contains filtered or unexported fields
}
JSONMapperMap is a map-based JSON mapper implementation
func NewJSONMapperMap ¶ added in v5.1.4
func NewJSONMapperMap(name, path string) (*JSONMapperMap, error)
NewJSONMapperMap is the constructor for JSONMapperMap
func (JSONMapperMap) DecodeDocument ¶ added in v5.1.4
func (mapper JSONMapperMap) DecodeDocument(_ Message) (Message, error)
DecodeDocument returns a DecodedKafkaMessage containing a map with the JSON-decoded data
func (JSONMapperMap) FilterDocument ¶ added in v5.1.4
func (mapper JSONMapperMap) FilterDocument(msg Message) (bool, string)
FilterDocument checks whether the document passes the configured filters; it returns whether the document is valid and, if it is not, the reason
func (JSONMapperMap) MapToDocument ¶ added in v5.1.4
func (mapper JSONMapperMap) MapToDocument(msg Message) (Message, error)
MapToDocument maps data to a document
type KafkaMessage ¶
type KafkaMessage struct {
Data []byte
}
KafkaMessage wraps the raw data of a Kafka message
func (KafkaMessage) GetData ¶
func (kMessage KafkaMessage) GetData() []byte
GetData returns the message data
func (KafkaMessage) String ¶
func (kMessage KafkaMessage) String() string
type Mapper ¶
type Mapper interface {
    // FilterDocument checks whether the document passes the given filters;
    // it returns whether the document is valid and, if it is not, the reason.
    FilterDocument(msg Message) (bool, string)
    // MapToDocument maps data to a document
    MapToDocument(msg Message) (Message, error)
    // DecodeDocument decodes a document and returns it. Use it if you want to
    // decode a message only once rather than in each individual function.
    DecodeDocument(msg Message) (Message, error)
}
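A pipeline sketch driving a Mapper implementation (here JSONMapperJsoniter; the constructor arguments, the import path, and the assumption that KafkaMessage satisfies the Message interface are not confirmed by this documentation):

package main

import (
    "log"

    connector "example.com/connector" // placeholder import path
)

func main() {
    // Both constructor arguments below are placeholders.
    mapper, err := connector.NewJSONMapperJsoniter("my-mapper", "config/mappings")
    if err != nil {
        log.Fatal(err)
    }

    msg := connector.KafkaMessage{Data: []byte(`{"id":"42","name":"example"}`)}

    // Decode once, then filter and map the decoded message.
    decoded, err := mapper.DecodeDocument(msg)
    if err != nil {
        log.Fatal(err)
    }
    if ok, reason := mapper.FilterDocument(decoded); !ok {
        log.Printf("message filtered out: %s", reason)
        return
    }
    doc, err := mapper.MapToDocument(decoded)
    if err != nil {
        log.Fatal(err)
    }
    _ = doc
}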
type MessageWithOptions ¶
MessageWithOptions is the output produced once the myrtea fields have been filtered from the Kafka messages
func (MessageWithOptions) String ¶
func (msg MessageWithOptions) String() string
type Mode ¶
type Mode int
Mode defines the document merging mode used by Config
func (Mode) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*Mode) UnmarshalJSON ¶
UnmarshalJSON unmarshals a quoted json string to the enum value
type Reloader ¶
type Reloader struct {
// contains filtered or unexported fields
}
func NewReloader ¶
func (*Reloader) BindEndpoint ¶
func (re *Reloader) BindEndpoint(rg chi.Router)
BindEndpoint binds the reload endpoint to an existing router
type Restarter ¶
type Restarter struct {
// contains filtered or unexported fields
}
func (*Restarter) BindEndpoint ¶
func (re *Restarter) BindEndpoint(rg chi.Router)
BindEndpoint binds the restart endpoint to an existing router
type SaramaDebugLogger ¶
type SaramaDebugLogger struct {
// contains filtered or unexported fields
}
SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaDebugLogger ¶
func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger
NewSaramaDebugLogger initializes a new SaramaDebugLogger with a zap.Logger
func (*SaramaDebugLogger) Print ¶
func (s *SaramaDebugLogger) Print(v ...interface{})
func (*SaramaDebugLogger) Printf ¶
func (s *SaramaDebugLogger) Printf(format string, v ...interface{})
func (*SaramaDebugLogger) Println ¶
func (s *SaramaDebugLogger) Println(v ...interface{})
type SaramaLogger ¶
type SaramaLogger struct {
// contains filtered or unexported fields
}
SaramaLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaLogger ¶
func NewSaramaLogger(zl *zap.Logger) *SaramaLogger
NewSaramaLogger initializes a new SaramaLogger with a zap.Logger
func (*SaramaLogger) Print ¶
func (s *SaramaLogger) Print(v ...interface{})
func (*SaramaLogger) Printf ¶
func (s *SaramaLogger) Printf(format string, v ...interface{})
func (*SaramaLogger) Println ¶
func (s *SaramaLogger) Println(v ...interface{})
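A sketch routing sarama's standard logging through zap via SaramaLogger (import paths are assumptions):

package main

import (
    "log"

    "github.com/Shopify/sarama" // or the sarama fork used by this module
    "go.uber.org/zap"

    connector "example.com/connector" // placeholder import path
)

func main() {
    zl, err := zap.NewProduction()
    if err != nil {
        log.Fatal(err)
    }

    // Route sarama's standard logging through zap.
    sarama.Logger = connector.NewSaramaLogger(zl)
    // SaramaDebugLogger can be wired the same way if the sarama version in use
    // exposes a separate debug logger variable.
}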
type SinkManager ¶ added in v5.1.5
type SinkManager struct {
// contains filtered or unexported fields
}
func NewSinkManager ¶ added in v5.1.5
func NewSinkManager() *SinkManager
func (*SinkManager) AddSink ¶ added in v5.1.5
func (sm *SinkManager) AddSink(sink Sink)
AddSink adds a sink to the manager
func (*SinkManager) StartAll ¶ added in v5.1.5
func (sm *SinkManager) StartAll(ctx context.Context)
StartAll starts all sinks in separate goroutines
func (*SinkManager) StopAll ¶ added in v5.1.5
func (sm *SinkManager) StopAll()
StopAll signals all sinks to stop
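A lifecycle sketch for SinkManager (it assumes a concrete Sink implementation is available, for example a *BatchSink, which is not stated explicitly above; the import path is a placeholder):

package main

import (
    "context"
    "os"
    "os/signal"

    connector "example.com/connector" // placeholder import path
)

func main() {
    manager := connector.NewSinkManager()

    // Register a concrete sink here, e.g. one built with NewBatchSink
    // (assumption: it satisfies the Sink interface).
    var sink connector.Sink
    if sink != nil {
        manager.AddSink(sink)
    }

    ctx, cancel := context.WithCancel(context.Background())
    manager.StartAll(ctx) // each sink runs in its own goroutine

    // Wait for an interrupt, then shut everything down.
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, os.Interrupt)
    <-stop

    cancel()
    manager.StopAll()
}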
type Transformer ¶
Transformer is the common interface implemented by message transformers
Source Files ¶
- kafka_filters.go
- mapper.go
- mapper_json.go
- mapper_json_jsoniter.go
- mapper_json_jsonparser.go
- mapper_json_map.go
- mergeconfig.go
- message.go
- mode.go
- processor.go
- reloader.go
- restarter.go
- sink.go
- sink_batch.go
- sink_manager.go
- source_kafka_consumer.go
- source_kafka_logger.go
- transformer.go
- transformer_avro_json.go
- transformer_avro_map.go
- utils.go