Documentation ¶
Index ¶
- func DoubleUnescapeUnicode(s string) ([]byte, error)
- func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
- type AvroToJSONTransformer
- type BatchSink
- type BulkIngestRequest
- type Config
- type ConsumerProcessor
- type DefaultConsumer
- type DefaultMultiConsumer
- type Document
- type FieldMath
- type FilterHeaderOption
- type FilteredJsonMessage
- type FormatToBIRs
- type Group
- type JSONMapper
- type JSONMapperConfigItem
- type JSONMapperFilterItem
- type JSONMapperJsoniter
- type KafkaMessage
- type Lookup
- type Mapper
- type Message
- type MessageWithOptions
- type Mode
- type Processor
- type SaramaDebugLogger
- type SaramaLogger
- type Sink
- type Transformer
- type TypedDataMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoubleUnescapeUnicode ¶
DoubleUnescapeUnicode is a special function to extract avro binaries from a JSON encoded string This function has been built for a very specific usage and may not works on other messages
func FilterHeaders ¶ added in v4.2.1
func FilterHeaders(filters []FilterHeaderOption, headers []*sarama.RecordHeader) (bool, string)
Types ¶
type AvroToJSONTransformer ¶
type AvroToJSONTransformer struct {
// contains filtered or unexported fields
}
AvroToJSONTransformer :
func NewAvroToJSONTransformer ¶
func NewAvroToJSONTransformer(schemaRegistryEndpoint string, ttlCacheDuration time.Duration) (*AvroToJSONTransformer, error)
New transformer constructor TODO : Manage multiple schemaRegistryEndpoint ? In case of server failure ?
func (AvroToJSONTransformer) AvroBinaryToNative ¶
func (transformer AvroToJSONTransformer) AvroBinaryToNative(avroBinary []byte) (interface{}, error)
AvroBinaryToNative :
func (AvroToJSONTransformer) AvroBinaryToTextual ¶
func (transformer AvroToJSONTransformer) AvroBinaryToTextual(avroBinary []byte) ([]byte, error)
AvroBinaryToTextual :
type BatchSink ¶
type BatchSink struct { TargetURL string Send chan Message Client *retryablehttp.Client BufferSize int FlushTimeout time.Duration FormatToBIRs FormatToBIRs // TODO: Change to be more generic ? (sending []byte or interface{}) DryRun bool }
BatchSink ..
func NewBatchSink ¶
func NewBatchSink(targetURL string, client *retryablehttp.Client, bufferSize int, flushTimeout time.Duration, formatToBIRs FormatToBIRs, dryRun bool) *BatchSink
NewBatchSink constructor for BatchSink
func (*BatchSink) AddMessageToQueue ¶
func (*BatchSink) SendToIngester ¶
func (sink *BatchSink) SendToIngester(ctx context.Context, bir *BulkIngestRequest) error
type BulkIngestRequest ¶
type Config ¶
type Config struct { Mode Mode `json:"mode"` ExistingAsMaster bool `json:"existingAsMaster"` Type string `json:"type,omitempty"` LinkKey string `json:"linkKey,omitempty"` Groups []Group `json:"groups,omitempty"` }
Config wraps all rules for document merging
type ConsumerProcessor ¶ added in v4.2.0
type ConsumerProcessor interface {
Process(message *sarama.ConsumerMessage)
}
type DefaultConsumer ¶ added in v4.2.0
type DefaultConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultConsumer represents a Sarama consumer group consumer
func NewDefaultConsumer ¶ added in v4.2.0
func NewDefaultConsumer(processor ConsumerProcessor) DefaultConsumer
func (*DefaultConsumer) Cleanup ¶ added in v4.2.0
func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultConsumer) ConsumeClaim ¶ added in v4.2.0
func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultConsumer) Setup ¶ added in v4.2.0
func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type DefaultMultiConsumer ¶ added in v4.2.0
type DefaultMultiConsumer struct { Ready chan bool // contains filtered or unexported fields }
DefaultMultiConsumer represents a Sarama consumer group consumer
func NewDefaultMultiConsumer ¶ added in v4.2.0
func NewDefaultMultiConsumer(processors map[string]ConsumerProcessor) DefaultMultiConsumer
func (*DefaultMultiConsumer) Cleanup ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*DefaultMultiConsumer) ConsumeClaim ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*DefaultMultiConsumer) Setup ¶ added in v4.2.0
func (consumer *DefaultMultiConsumer) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Document ¶
type Document struct { ID string `json:"id"` Index string `json:"index"` IndexType string `json:"type"` Source interface{} `json:"source"` }
Document represent an es document
type FieldMath ¶
type FieldMath struct { Expression string `json:"expression"` OutputField string `json:"outputField"` }
FieldMath specify a merge rule using a math expression
type FilterHeaderOption ¶ added in v4.2.1
type FilteredJsonMessage ¶
type FilteredJsonMessage struct {
Data map[string]interface{}
}
FilteredJsonMessage output once we've filtered the myrtea fields from the kafka messages
func (FilteredJsonMessage) String ¶
func (msg FilteredJsonMessage) String() string
type FormatToBIRs ¶ added in v4.1.6
type FormatToBIRs func([]Message) []*BulkIngestRequest
type Group ¶
type Group struct { Condition string `json:"condition,omitempty"` FieldReplace []string `json:"fieldReplace,omitempty"` FieldReplaceIfMissing []string `json:"fieldReplaceIfMissing,omitempty"` FieldMerge []string `json:"fieldMerge,omitempty"` FieldMath []FieldMath `json:"fieldMath,omitempty"` FieldKeepLatest []string `json:"fieldKeepLatest,omitempty"` FieldKeepEarliest []string `json:"fieldKeepEarliest,omitempty"` FieldForceUpdate []string `json:"fieldForceUpdate,omitempty"` }
Group allows to group un set of merge fields and to define an optional condition to applay the merge fields
type JSONMapper ¶
type JSONMapper struct {
// contains filtered or unexported fields
}
JSONMapper :
func (JSONMapper) FilterDocument ¶
func (mapper JSONMapper) FilterDocument(msg Message) (bool, string)
func (JSONMapper) MapToDocument ¶
func (mapper JSONMapper) MapToDocument(msg Message) (FilteredJsonMessage, error)
MapAvroToDocument :
type JSONMapperConfigItem ¶
type JSONMapperConfigItem struct { Mandatory bool FieldType string DefaultValue interface{} DateFormat string Paths [][]string OtherPaths [][]string ArrayPath []string Separator string }
JSONMapperConfigItem :
type JSONMapperFilterItem ¶
type JSONMapperJsoniter ¶ added in v4.2.4
type JSONMapperJsoniter struct {
// contains filtered or unexported fields
}
JSONMapperJsoniter :
func NewJSONMapperJsoniter ¶ added in v4.2.4
func NewJSONMapperJsoniter(name, path string) (*JSONMapperJsoniter, error)
NewJSONMapperJsoniter :
func (JSONMapperJsoniter) FilterDocument ¶ added in v4.2.4
func (mapper JSONMapperJsoniter) FilterDocument(msg Message) (bool, string)
func (JSONMapperJsoniter) MapToDocument ¶ added in v4.2.4
func (mapper JSONMapperJsoniter) MapToDocument(msg Message) (Message, error)
MapAvroToDocument :
type KafkaMessage ¶
type KafkaMessage struct {
Data []byte
}
KafkaMessage ...
func (KafkaMessage) GetData ¶
func (kMessage KafkaMessage) GetData() []byte
GetData Getter for the message data
func (KafkaMessage) String ¶
func (kMessage KafkaMessage) String() string
type MessageWithOptions ¶
MessageWithOptions output once we've filtered the myrtea fields from the kafka messages
func (MessageWithOptions) String ¶
func (msg MessageWithOptions) String() string
type Mode ¶
type Mode int
Mode ...
func (Mode) MarshalJSON ¶
MarshalJSON marshals the enum as a quoted json string
func (*Mode) UnmarshalJSON ¶
UnmarshalJSON unmashals a quoted json string to the enum value
type SaramaDebugLogger ¶ added in v4.2.0
type SaramaDebugLogger struct {
// contains filtered or unexported fields
}
SaramaDebugLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaDebugLogger ¶ added in v4.2.0
func NewSaramaDebugLogger(zl *zap.Logger) *SaramaDebugLogger
NewSaramaLogger initializes a new SaramaDebugLogger with a zap.Logger
func (*SaramaDebugLogger) Print ¶ added in v4.2.0
func (s *SaramaDebugLogger) Print(v ...interface{})
func (*SaramaDebugLogger) Printf ¶ added in v4.2.0
func (s *SaramaDebugLogger) Printf(format string, v ...interface{})
func (*SaramaDebugLogger) Println ¶ added in v4.2.0
func (s *SaramaDebugLogger) Println(v ...interface{})
type SaramaLogger ¶ added in v4.2.0
type SaramaLogger struct {
// contains filtered or unexported fields
}
SaramaLogger wraps zap.Logger with the sarama.StdLogger interface
func NewSaramaLogger ¶ added in v4.2.0
func NewSaramaLogger(zl *zap.Logger) *SaramaLogger
NewSaramaLogger initializes a new SaramaLogger with a zap.Logger
func (*SaramaLogger) Print ¶ added in v4.2.0
func (s *SaramaLogger) Print(v ...interface{})
func (*SaramaLogger) Printf ¶ added in v4.2.0
func (s *SaramaLogger) Printf(format string, v ...interface{})
func (*SaramaLogger) Println ¶ added in v4.2.0
func (s *SaramaLogger) Println(v ...interface{})
type Transformer ¶
Transformer ..