Documentation ¶
Index ¶
- func DoubleUnescapeUnicode(s string) ([]byte, error)
- type AvroToJSONTransformer
- type BatchSink
- type BulkIngestRequest
- type Config
- type Document
- type FieldMath
- type FilteredJsonMessage
- type Group
- type JSONMapper
- type JSONMapperConfigItem
- type JSONMapperFilterItem
- type KafkaMessage
- type KafkaSource
- type Lookup
- type Mapper
- type Message
- type MessageWithOptions
- type Mode
- type Processor
- type Sink
- type Transformer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoubleUnescapeUnicode ¶
DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string This function has been built for a very specific usage and may not works on other messages
Types ¶
type AvroToJSONTransformer ¶
type AvroToJSONTransformer struct {
// contains filtered or unexported fields
}
AvroToJSONTransformer :
func NewAvroToJSONTransformer ¶
func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)
New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ?
func (AvroToJSONTransformer) AvroBinaryToNative ¶
func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)
AvroBinaryToNative :
func (AvroToJSONTransformer) AvroBinaryToTextual ¶
func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)
AvroBinaryToTextual :
type BatchSink ¶
type BatchSink struct { TargetURL string Send chan Message Client *retryablehttp.Client BufferSize int FlushTimeout time.Duration FormatToBIR func([]FilteredJsonMessage) *BulkIngestRequest // TODO: Change to be more generic ? (sending []byte or interface{}) DryRun bool }
BatchSink ..
func NewBatchSink ¶
func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration, formatToBIR func([]FilteredJsonMessage) *BulkIngestRequest, dryRun bool) *BatchSink
NewBatchSink constructor for BatchSink
func (*BatchSink) AddMessageToQueue ¶
func (*BatchSink) SendToIngester ¶
func (sink *BatchSink) SendToIngester(bir *BulkIngestRequest) error
type BulkIngestRequest ¶
type Config ¶
type Config struct { Mode Mode `json:"mode"` ExistingAsMaster bool `json:"existingAsMaster"` Type string `json:"type,omitempty"` LinkKey string `json:"linkKey,omitempty"` Groups []Group `json:"groups,omitempty"` }
Config wraps all rules for document merging
type Document ¶
type Document struct { ID string `json:"id"` Index string `json:"index"` IndexType string `json:"type"` Source interface{} `json:"source"` }
Document represent an es document
type FieldMath ¶
type FieldMath struct { Expression string `json:"expression"` OutputField string `json:"outputField"` }
FieldMath specify a merge rule using a math expression
type FilteredJsonMessage ¶
type FilteredJsonMessage struct {
Data map[string]interface{}
}
FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages
func (FilteredJsonMessage) String ¶
func (msg FilteredJsonMessage) String() string
type Group ¶
type Group struct { Condition string `json:"condition,omitempty"` FieldReplace []string `json:"fieldReplace,omitempty"` FieldReplaceIfMissing []string `json:"fieldReplaceIfMissing,omitempty"` FieldMerge []string `json:"fieldMerge,omitempty"` FieldMath []FieldMath `json:"fieldMath,omitempty"` FieldKeepLatest []string `json:"fieldKeepLatest,omitempty"` FieldKeepEarliest []string `json:"fieldKeepEarliest,omitempty"` }
Group allows to group un set of merge fields and to define an optional condition to applay the merge fields
type JSONMapper ¶
type JSONMapper struct {
// contains filtered or unexported fields
}
JSONMapper :
func (JSONMapper) FilterDocument ¶
func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)
func (JSONMapper) MapToDocument ¶
func (mapper JSONMapper) MapToDocument(msg Message) (FilteredJsonMessage, error)
MapAvroToDocument :
type JSONMapperConfigItem ¶
type JSONMapperConfigItem struct { Mandatory bool FieldType string DefaultValue interface{} DateFormat string Paths [][]string OtherPaths [][]string ArrayPath []string Separator string }
JSONMapperConfigItem :
type JSONMapperFilterItem ¶
type KafkaMessage ¶
type KafkaMessage struct {
Data []byte
}
KafkaMessage ...
func (KafkaMessage) GetData ¶
func (kMessage KafkaMessage) GetData() []byte
GetData Getter for the message data
func (KafkaMessage) String ¶
func (kMessage KafkaMessage) String() string
type KafkaSource ¶
type KafkaSource struct { Topic string Consumer *cluster.Consumer OutChanel chan Message Signals chan os.Signal }
func NewKafkaSource ¶
func (*KafkaSource) Close ¶
func (src *KafkaSource) Close() error
func (*KafkaSource) Init ¶
func (src *KafkaSource) Init() error
func (*KafkaSource) Run ¶
func (src *KafkaSource) Run() error
type MessageWithOptions ¶
MessageWithOptions output once we've filtered the myrtea fields from the kafka messages
func (MessageWithOptions) String ¶
func (msg MessageWithOptions) String() string
type Mode ¶
type Mode int
Mode ...
func (Mode) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*Mode) UnmarshalJSON ¶
UnmarshalJSON unmashals a quoted json string to the enum value
type Transformer ¶
Transformer ..