generic

package
v0.0.0-rc1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ColNameIdx is the name of the "_idx" column, one of the dedupe-key
	// fields (_timestamp/_partition/_offset/_idx). In UnparsedSchema it is a
	// required uint32 primary-key column.
	ColNameIdx = "_idx"
)
View Source
const (
	// ColNameOffset is the name of the "_offset" column, one of the
	// dedupe-key fields (_timestamp/_partition/_offset/_idx). In
	// UnparsedSchema it is a required uint64 primary-key column.
	ColNameOffset = "_offset"
)
View Source
const (
	// ColNamePartition is the name of the "_partition" column, one of the
	// dedupe-key fields (_timestamp/_partition/_offset/_idx). In
	// UnparsedSchema it is a required bytes-typed primary-key column.
	ColNamePartition = "_partition"
)
View Source
const (
	// ColNameTimestamp is the name of the "_timestamp" column, one of the
	// dedupe-key fields (_timestamp/_partition/_offset/_idx). In
	// UnparsedSchema it is a required timestamp primary-key column.
	ColNameTimestamp = "_timestamp"
)

Variables

View Source
// UnparsedCols is derived from the columns of UnparsedSchema by passing them
// through the package's cols helper (defined elsewhere in this package).
var UnparsedCols = cols(UnparsedSchema.Columns())
View Source
// UnparsedSchema is a table schema made of four required primary-key columns
// (_timestamp, _partition, _offset, _idx) followed by two optional byte
// columns, "unparsed_row" and "reason".
// NOTE(review): presumably the schema of rows built by NewUnparsed and
// recognised by IsGenericUnparsedSchema — confirm.
var UnparsedSchema = abstract.NewTableSchema(func() []abstract.ColSchema {
	// key builds a required primary-key column.
	key := func(name, dataType string) abstract.ColSchema {
		return abstract.ColSchema{
			ColumnName: name,
			DataType:   dataType,
			PrimaryKey: true,
			Required:   true,
		}
	}
	// payload builds an optional, non-key column.
	payload := func(name, dataType string) abstract.ColSchema {
		return abstract.ColSchema{ColumnName: name, DataType: dataType}
	}

	// Column order matters: it is the order of the resulting schema.
	return []abstract.ColSchema{
		key(ColNameTimestamp, schema.TypeTimestamp.String()),
		key(ColNamePartition, schema.TypeBytes.String()),
		key(ColNameOffset, schema.TypeUint64.String()),
		key(ColNameIdx, schema.TypeUint32.String()),
		payload("unparsed_row", schema.TypeBytes.String()),
		payload("reason", schema.TypeBytes.String()),
	}
}())

Functions

func ElemIDX

func ElemIDX(columns []string) int

func IsGenericUnparsedSchema

func IsGenericUnparsedSchema(schema *abstract.TableSchema) bool

func NewUnparsed

func NewUnparsed(partition abstract.Partition, name, line, reason string, idx int, offset uint64, writeTime time.Time) abstract.ChangeItem

func OffsetIDX

func OffsetIDX(columns []string) int

func PartitionIDX

func PartitionIDX(columns []string) int

func TableSplitter

func TableSplitter(sourceName string, columns []string, item map[string]interface{}, columnValues []interface{}, auxFieldsToIndex map[string]int) string

func TimestampIDX

func TimestampIDX(columns []string) int

Types

type AuxParserOpts

// AuxParserOpts holds auxiliary parser options. Both GenericParserConfig and
// LogfellerParserConfig carry it in their AuxOpts field, and
// GenericParser.Opts returns it.
type AuxParserOpts struct {
	// Topic holds a topic name.
	// NOTE(review): presumably related to GenericParser.SetTopic — confirm.
	Topic string

	// AddDedupeKeys, if true, adds the _timestamp/_partition/_offset/_idx
	// fields to the schema and fills in their values.
	// It was originally introduced for the cloudLogging parser, where these
	// fields are unwanted and previously could not be turned off.
	AddDedupeKeys bool
	// MarkDedupeKeysAsSystem, if true and the change item has no primary key,
	// marks _timestamp/_partition/_offset/_idx as a 'system' key.
	MarkDedupeKeysAsSystem bool

	// AddSystemColumns, if true, adds the _topic field to the schema and adds
	// the _lb_ctime, _lb_wtime and _lb_extra_%v values.
	AddSystemColumns bool

	// AddTopicColumn, if true, fills the value from msg.Headers["logtype"].
	AddTopicColumn bool

	// AddRest, if true, adds the '_rest' field to the schema and fills its value.
	AddRest bool

	// TimeField and InferTimeZone configure timestamp handling.
	// NOTE(review): their exact semantics are not visible here — confirm
	// against the parser implementation.
	TimeField     *abstract.TimestampCol
	InferTimeZone bool

	// Miscellaneous parser switches.
	// NOTE(review): their semantics are not documented in this file; the names
	// suggest allowing null keys, dropping unparsed rows, masking secrets,
	// ignoring column paths, a custom table splitter and schema sniffing —
	// confirm before relying on this.
	NullKeysAllowed   bool
	DropUnparsed      bool
	MaskSecrets       bool
	IgnoreColumnPaths bool
	TableSplitter     *abstract.TableSplitter
	Sniff             bool

	// UseNumbersInAny selects how numbers inside `any`-typed values are parsed:
	// if true, as json.Number; if false, as float64.
	UseNumbersInAny bool

	// UnescapeStringValues, if true, unescapes string values:
	//	- for tskv: unescapes special symbols in string values;
	//	- for json: repeatedly applies json.Unmarshal to string values where possible.
	UnescapeStringValues bool

	// UnpackBytesBase64, if true, tries to parse `bytes`-typed columns as
	// base64-encoded byte arrays.
	UnpackBytesBase64 bool
}

type GenericParser

// GenericParser is the parser built by NewGenericParser from a ParserConfig,
// a list of column schemas, a logger and source stats. Its fields are not
// shown in this listing.
type GenericParser struct {
	// contains filtered or unexported fields
}

func NewGenericParser

func NewGenericParser(cfg ParserConfig, fields []abstract.ColSchema, logger log.Logger, registry *stats.SourceStats) *GenericParser

func (*GenericParser) Do

func (*GenericParser) DoBatch

func (*GenericParser) Name

func (p *GenericParser) Name() string

func (*GenericParser) Opts

func (p *GenericParser) Opts() AuxParserOpts

func (*GenericParser) Parse

func (p *GenericParser) Parse(msg parsers.Message, partition abstract.Partition) base.EventBatch

func (*GenericParser) ParseBatch

func (p *GenericParser) ParseBatch(batch parsers.MessageBatch) base.EventBatch

func (*GenericParser) ParseVal

func (p *GenericParser) ParseVal(v interface{}, typ string) (interface{}, error)

func (*GenericParser) ResultSchema

func (p *GenericParser) ResultSchema() *abstract.TableSchema

func (*GenericParser) SetTopic

func (p *GenericParser) SetTopic(topicName string)

func (*GenericParser) TableSplitter

func (p *GenericParser) TableSplitter(sourceName string, item map[string]interface{}, columnValues []interface{}) string

func (*GenericParser) Unmarshal

func (p *GenericParser) Unmarshal(line string, item *map[string]interface{}) error

type GenericParserConfig

// GenericParserConfig configures a generic parser by format name, schema
// resource name, explicit field list and auxiliary options.
// NOTE(review): presumably one of the ParserConfig implementations accepted
// by NewGenericParser — confirm.
type GenericParserConfig struct {
	// Format names the input format; SchemaResourceName names a schema
	// resource; Fields lists column schemas explicitly.
	// NOTE(review): the accepted Format values and how SchemaResourceName and
	// Fields interact are not visible here — confirm.
	Format             string
	SchemaResourceName string
	Fields             []abstract.ColSchema

	// AuxOpts holds the auxiliary parser options.
	AuxOpts AuxParserOpts
}

type LogfellerParserConfig

// LogfellerParserConfig configures a parser by parser name, splitter name,
// overridden fields and auxiliary options.
// NOTE(review): presumably one of the ParserConfig implementations accepted
// by NewGenericParser — confirm.
type LogfellerParserConfig struct {
	// ParserName and SplitterName select the parser and splitter by name;
	// OverriddenFields lists column schemas.
	// NOTE(review): presumably OverriddenFields replaces the named parser's
	// own field definitions — confirm.
	ParserName       string
	SplitterName     string
	OverriddenFields []abstract.ColSchema

	// AuxOpts holds the auxiliary parser options.
	AuxOpts AuxParserOpts
}

type ParserConfig

// ParserConfig is the configuration type accepted by NewGenericParser.
// Its methods are not shown in this listing (they are filtered or unexported).
// NOTE(review): GenericParserConfig and LogfellerParserConfig presumably
// implement it — confirm.
type ParserConfig interface {
	// contains filtered or unexported methods
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL