Documentation ¶
Index ¶
- Constants
- Variables
- func ElemIDX(columns []string) int
- func IsGenericUnparsedSchema(schema *abstract.TableSchema) bool
- func NewUnparsed(partition abstract.Partition, name, line, reason string, idx int, ...) abstract.ChangeItem
- func OffsetIDX(columns []string) int
- func PartitionIDX(columns []string) int
- func TableSplitter(sourceName string, columns []string, item map[string]interface{}, ...) string
- func TimestampIDX(columns []string) int
- type AuxParserOpts
- type GenericParser
- func (p *GenericParser) Do(msg parsers.Message, partition abstract.Partition) []abstract.ChangeItem
- func (p *GenericParser) DoBatch(batch parsers.MessageBatch) []abstract.ChangeItem
- func (p *GenericParser) Name() string
- func (p *GenericParser) Opts() AuxParserOpts
- func (p *GenericParser) Parse(msg parsers.Message, partition abstract.Partition) base.EventBatch
- func (p *GenericParser) ParseBatch(batch parsers.MessageBatch) base.EventBatch
- func (p *GenericParser) ParseVal(v interface{}, typ string) (interface{}, error)
- func (p *GenericParser) ResultSchema() *abstract.TableSchema
- func (p *GenericParser) SetTopic(topicName string)
- func (p *GenericParser) TableSplitter(sourceName string, item map[string]interface{}, columnValues []interface{}) string
- func (p *GenericParser) Unmarshal(line string, item *map[string]interface{}) error
- type GenericParserConfig
- type LogfellerParserConfig
- type ParserConfig
Constants ¶
View Source
// ColNameIdx is the name of the "_idx" column. UnparsedSchema declares this
// column as a required uint32 primary-key column.
const ColNameIdx = "_idx"
View Source
// ColNameOffset is the name of the "_offset" column. UnparsedSchema declares
// this column as a required uint64 primary-key column.
const ColNameOffset = "_offset"
View Source
// ColNamePartition is the name of the "_partition" column. UnparsedSchema
// declares this column as a required bytes primary-key column.
const ColNamePartition = "_partition"
View Source
// ColNameTimestamp is the name of the "_timestamp" column. UnparsedSchema
// declares this column as a required timestamp primary-key column.
const ColNameTimestamp = "_timestamp"
Variables ¶
View Source
var (
	// UnparsedCols is the result of passing UnparsedSchema.Columns() to the
	// cols helper, which is not visible here.
	// NOTE(review): presumably the column names of UnparsedSchema, in schema
	// order — confirm against the definition of cols.
	UnparsedCols = cols(UnparsedSchema.Columns())
)
View Source
var (
	// UnparsedSchema is a table schema with six columns, in this order:
	// ColNameTimestamp (timestamp), ColNamePartition (bytes), ColNameOffset
	// (uint64) and ColNameIdx (uint32), each a required primary-key column,
	// followed by the bytes columns "unparsed_row" and "reason", which leave
	// PrimaryKey and Required unset.
	// NOTE(review): presumably the shape of the items built by NewUnparsed —
	// confirm against its implementation.
	UnparsedSchema = abstract.NewTableSchema([]abstract.ColSchema{
		{
			ColumnName: ColNameTimestamp,
			DataType:   schema.TypeTimestamp.String(),
			PrimaryKey: true,
			Required:   true,
		},
		{
			ColumnName: ColNamePartition,
			DataType:   schema.TypeBytes.String(),
			PrimaryKey: true,
			Required:   true,
		},
		{
			ColumnName: ColNameOffset,
			DataType:   schema.TypeUint64.String(),
			PrimaryKey: true,
			Required:   true,
		},
		{
			ColumnName: ColNameIdx,
			DataType:   schema.TypeUint32.String(),
			PrimaryKey: true,
			Required:   true,
		},
		{
			ColumnName: "unparsed_row",
			DataType:   schema.TypeBytes.String(),
		},
		{
			ColumnName: "reason",
			DataType:   schema.TypeBytes.String(),
		},
	})
)
Functions ¶
func IsGenericUnparsedSchema ¶
func IsGenericUnparsedSchema(schema *abstract.TableSchema) bool
func NewUnparsed ¶
func PartitionIDX ¶
func TableSplitter ¶
func TimestampIDX ¶
Types ¶
type AuxParserOpts ¶
type AuxParserOpts struct { Topic string // if true - add into schema (and add value) fields _timestamp/_partition/_offset/_idx // firstly needed only for cloudLogging parser - bcs these fields is unwanted there, and there was no way turn it off AddDedupeKeys bool // if true - if there are no pkey in changeItem - mark _timestamp/_partition/_offset/_idx as 'system' key MarkDedupeKeysAsSystem bool // if true - add into schema fields: _topic // if true - add into values: _lb_ctime, _lb_wtime, _lb_extra_%v AddSystemColumns bool // if true - fill value from 'msg.Headers["logtype"]' AddTopicColumn bool // if true - add into schema (and add value) field '_rest' AddRest bool TimeField *abstract.TimestampCol InferTimeZone bool NullKeysAllowed bool DropUnparsed bool MaskSecrets bool IgnoreColumnPaths bool TableSplitter *abstract.TableSplitter Sniff bool // if true - use json.Number to parse numbers in any // if false - use float64 to parse numbers in any UseNumbersInAny bool // if true - unescape string values: // for tskv - unescape special symbols in string values // for json - use repeatedly json.Unmarshal(val) for strings if it possible UnescapeStringValues bool // UnpackBytesBase64 will try to parse `bytes`-typed column as base64 encoded byte array UnpackBytesBase64 bool }
type GenericParser ¶
// GenericParser is the parser type returned by NewGenericParser. All of its
// fields are unexported (or filtered out of this listing); it is used through
// its methods, e.g. Do/DoBatch, Parse/ParseBatch, ResultSchema and Opts.
type GenericParser struct {
	// contains filtered or unexported fields
}
func NewGenericParser ¶
func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log.Logger, registry *stats.SourceStats) *GenericParser
func (*GenericParser) Do ¶
func (p *GenericParser) Do(msg parsers.Message, partition abstract.Partition) []abstract.ChangeItem
func (*GenericParser) DoBatch ¶
func (p *GenericParser) DoBatch(batch parsers.MessageBatch) []abstract.ChangeItem
func (*GenericParser) Name ¶
func (p *GenericParser) Name() string
func (*GenericParser) Opts ¶
func (p *GenericParser) Opts() AuxParserOpts
func (*GenericParser) Parse ¶
func (p *GenericParser) Parse(msg parsers.Message, partition abstract.Partition) base.EventBatch
func (*GenericParser) ParseBatch ¶
func (p *GenericParser) ParseBatch(batch parsers.MessageBatch) base.EventBatch
func (*GenericParser) ParseVal ¶
func (p *GenericParser) ParseVal(v interface{}, typ string) (interface{}, error)
func (*GenericParser) ResultSchema ¶
func (p *GenericParser) ResultSchema() *abstract.TableSchema
func (*GenericParser) SetTopic ¶
func (p *GenericParser) SetTopic(topicName string)
func (*GenericParser) TableSplitter ¶
func (p *GenericParser) TableSplitter(sourceName string, item map[string]interface{}, columnValues []interface{}) string
type GenericParserConfig ¶
// GenericParserConfig groups a format name, a schema resource name, a list of
// column schemas and the auxiliary parser options (AuxParserOpts).
// NOTE(review): presumably one of the ParserConfig implementations accepted
// by NewGenericParser — the implementing methods are not visible here.
type GenericParserConfig struct {
	Format             string
	SchemaResourceName string
	Fields             []abstract.ColSchema
	AuxOpts            AuxParserOpts
}
type LogfellerParserConfig ¶
// LogfellerParserConfig groups a parser name, a splitter name, a list of
// overridden column schemas and the auxiliary parser options (AuxParserOpts).
// NOTE(review): presumably one of the ParserConfig implementations accepted
// by NewGenericParser — the implementing methods are not visible here.
type LogfellerParserConfig struct {
	ParserName       string
	SplitterName     string
	OverriddenFields []abstract.ColSchema
	AuxOpts          AuxParserOpts
}
type ParserConfig ¶
// ParserConfig is the configuration interface accepted by NewGenericParser.
// Its method set is unexported (or filtered out of this listing), so which
// types implement it cannot be determined from here.
type ParserConfig interface {
	// contains filtered or unexported methods
}
Click to show internal directories.
Click to hide internal directories.