parsers

package
v0.0.0-rc12
Warning

This package is not in the latest version of its module.

Published: Dec 2, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

README

Parsers

Parser configs

Parser configurations (parser_config) can be of two types:

  • lb (logbroker)

  • common (kafka/yds/eventhub)

The split is historical: the lb-source had its own set of parsers (five of them), while the other source queues had only json.

Even the json parser, which was present in both lb and common, had more parsing settings for the lb-source.

So a parser may exist only for the lb-source, only for the other source queues, or for both, and when it exists for both, each side may have its own set of settings.

For example:

  • A tskv-parser is only present in lb-source

  • A json-parser is present both in the lb-source and other source queues; in the lb-source, the json has more settings.

So we arrive at a scheme where the parser is a set of the following entities:

  • A separate set of settings (parser_config) for each type of source: lb/common

  • A single parser, which can be initialized by any parser_config.
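The scheme above can be sketched as one parser with a constructor that accepts either config type. This is a minimal illustration, not the package's code: the names ParserConfigMyparserLb, ParserConfigMyparserCommon, myParser, newMyParser and the AddSystemCols setting are all hypothetical.

```go
package main

import "fmt"

// ParserConfigMyparserLb is a hypothetical lb-source config.
type ParserConfigMyparserLb struct {
	Fields        []string
	AddSystemCols bool // hypothetical setting that exists only for lb
}

// ParserConfigMyparserCommon is a hypothetical config for the other source queues.
type ParserConfigMyparserCommon struct {
	Fields []string
}

// myParser is the single parser shared by both config types.
type myParser struct {
	fields        []string
	addSystemCols bool
}

// newMyParser accepts either config and maps it onto the same parser.
func newMyParser(cfg interface{}) (*myParser, error) {
	switch c := cfg.(type) {
	case *ParserConfigMyparserLb:
		return &myParser{fields: c.Fields, addSystemCols: c.AddSystemCols}, nil
	case *ParserConfigMyparserCommon:
		// Settings that exist only for lb keep their zero values.
		return &myParser{fields: c.Fields}, nil
	default:
		return nil, fmt.Errorf("unsupported parser_config type: %T", cfg)
	}
}

func main() {
	lb, _ := newMyParser(&ParserConfigMyparserLb{Fields: []string{"id"}, AddSystemCols: true})
	common, _ := newMyParser(&ParserConfigMyparserCommon{Fields: []string{"id"}})
	fmt.Println(lb.addSystemCols, common.addSystemCols)
}
```

The type switch keeps all per-source differences inside the constructor, so the parsing code itself never needs to know which source it was configured for.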

Naming conventions

A new parser is added in such a way that you don't have to think about things like:

  • How it will be stored in the source queue models

  • How the dispatching works between proto-models and your parser_config.

This magic is implemented through reflection, and you'll have to use the following naming conventions when describing your parser:

  • parser_config should be named according to the template:

    • for common: parser_config_%PARSER_NAME%_common.go - and should implement the CommonParserConfig interface

    • for lb: parser_config_%PARSER_NAME%_lb.go - and should implement the LbParserConfig interface

  • The parser itself should be called parser_%PARSER_NAME%.go - and should implement the AbstractParser interface
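The README does not show how the reflection-based dispatch works. The following is only a guess at the general idea, assuming that struct names mirror the file-name template (as ParserConfigAuditTrailsV1Common does). The helper splitConfigName is hypothetical and is not part of the package.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// ParserConfigAuditTrailsV1Common mirrors the struct name used in the README.
type ParserConfigAuditTrailsV1Common struct{}

// splitConfigName is a hypothetical helper: it derives the parser name and
// the source kind (lb/common) from the config's Go type name.
func splitConfigName(cfg interface{}) (parserName, sourceKind string, err error) {
	t := reflect.TypeOf(cfg)
	if t == nil {
		return "", "", fmt.Errorf("nil parser_config")
	}
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	name := t.Name()
	if !strings.HasPrefix(name, "ParserConfig") {
		return "", "", fmt.Errorf("type %q does not follow the naming convention", name)
	}
	rest := strings.TrimPrefix(name, "ParserConfig")
	switch {
	case strings.HasSuffix(rest, "Common") && len(rest) > len("Common"):
		return strings.TrimSuffix(rest, "Common"), "common", nil
	case strings.HasSuffix(rest, "Lb") && len(rest) > len("Lb"):
		return strings.TrimSuffix(rest, "Lb"), "lb", nil
	}
	return "", "", fmt.Errorf("type %q has no Lb/Common suffix", name)
}

func main() {
	name, kind, err := splitConfigName(new(ParserConfigAuditTrailsV1Common))
	fmt.Println(name, kind, err)
}
```

Whatever the real mechanism is, a scheme like this only works if every config follows the convention exactly, which is why the naming rules above are mandatory rather than stylistic.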

There should be a constructor function for the parser, for example:

func NewParserAuditTrailsV1(inWrapped interface{}, topicName string, sniff bool, logger log.Logger, registry *stats.SourceStats) (registrylib.AbstractParser, error)

Such a constructor function takes any parser_config as its first parameter, and should be able to construct a parser from any of its parser_configs.

To register a parser and its parser_configs, call the registrylib.RegisterParser function from init() in the parser file:

func init() {
    registrylib.RegisterParser(
        NewParserAuditTrailsV1,
        []registrylib.AbstractParserConfig{new(ParserConfigAuditTrailsV1Common)},
    )
}

Where we pass:

  • A parser constructor function

  • A parser_config array
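To make the registration step concrete, here is a minimal self-contained sketch of a registry keyed by config type. It is not the registrylib implementation: registerParser, newParser, the parser struct, and the simplified one-argument factory signature are all assumptions made for illustration. Only the AbstractParserConfig interface is copied from the documentation of this package.

```go
package main

import (
	"fmt"
	"reflect"
)

// AbstractParserConfig is copied from the package documentation.
type AbstractParserConfig interface {
	IsNewParserConfig()
	IsAppendOnly() bool
	Validate() error
}

// parser and parserFactory are simplified stand-ins for the real types.
type parser struct{ name string }
type parserFactory func(cfg interface{}) (*parser, error)

// factories maps a config's concrete type to the constructor that handles it.
var factories = map[reflect.Type]parserFactory{}

// registerParser binds one constructor to every config type it supports.
func registerParser(factory parserFactory, configs []AbstractParserConfig) {
	for _, cfg := range configs {
		factories[reflect.TypeOf(cfg)] = factory
	}
}

// newParser looks up the constructor by the config's type and calls it.
func newParser(cfg AbstractParserConfig) (*parser, error) {
	factory, ok := factories[reflect.TypeOf(cfg)]
	if !ok {
		return nil, fmt.Errorf("no parser registered for %T", cfg)
	}
	return factory(cfg)
}

// ParserConfigMyparserCommon is a hypothetical config.
type ParserConfigMyparserCommon struct{}

func (*ParserConfigMyparserCommon) IsNewParserConfig() {}
func (*ParserConfigMyparserCommon) IsAppendOnly() bool { return true }
func (*ParserConfigMyparserCommon) Validate() error    { return nil }

// init registers the parser as a side effect of importing the package.
func init() {
	registerParser(
		func(cfg interface{}) (*parser, error) { return &parser{name: "myparser"}, nil },
		[]AbstractParserConfig{new(ParserConfigMyparserCommon)},
	)
}

func main() {
	p, err := newParser(new(ParserConfigMyparserCommon))
	fmt.Println(p.name, err)
}
```

Because registration happens in init(), a parser package that is never imported never registers itself.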

For the parser to be linked into the build, its package must be imported in pkg/parsers/registry/registry.go.

For reference, see how the tskv and json parsers are declared.

Recommendation: the parser directory already contains many files and entities, so if the parsing engine itself takes more than one file, put it in the /engine subdirectory.

Step-by-step algorithm to add a new parser

  • For each parser_config type (lb/common), describe the UI proto-models (transfer_manager/go/proto/api/console/form/parsers.proto) and add them to the oneof in ParserConfigLb/ParserConfigCommon (I suggest you forget about the API proto-model until they make this API more usable)

  • Create a subdirectory in pkg/parsers/registry - for example, myparser

  • For each type of parser_config (lb/common), create a parser_config (and tests), for example parser_config_myparser_lb.go and parser_config_myparser_common.go (converters to the proto API can be stubbed with return nil, nil)

  • Create the parser description (for example, parser_myparser.go) and add an init() function with the registrylib.RegisterParser call

  • Add an import of this parser to pkg/parsers/registry/registry.go

  • Add it to pkg/parsers/registrylib/canon/unit_test.go: if your parser gets there, it will get everywhere you need.
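As an illustration of the parser_config step, the sketch below shows what a parser_config_myparser_common.go might contain. The AbstractParserConfig interface is copied from the documentation of this package; the Delimiter field and the validation rule are hypothetical, and the return value of IsAppendOnly is chosen arbitrarily because the README does not describe its semantics.

```go
package main

import (
	"errors"
	"fmt"
)

// AbstractParserConfig is copied from the package documentation.
type AbstractParserConfig interface {
	IsNewParserConfig()
	IsAppendOnly() bool
	Validate() error
}

// ParserConfigMyparserCommon would live in parser_config_myparser_common.go.
// Its single field is hypothetical.
type ParserConfigMyparserCommon struct {
	Delimiter string
}

// Compile-time check that the config satisfies the interface.
var _ AbstractParserConfig = (*ParserConfigMyparserCommon)(nil)

// IsNewParserConfig is a marker method with no behavior.
func (c *ParserConfigMyparserCommon) IsNewParserConfig() {}

// IsAppendOnly returns true here only as a placeholder value.
func (c *ParserConfigMyparserCommon) IsAppendOnly() bool { return true }

// Validate rejects configs that the parser could not work with.
func (c *ParserConfigMyparserCommon) Validate() error {
	if c.Delimiter == "" {
		return errors.New("delimiter must not be empty")
	}
	return nil
}

func main() {
	cfg := &ParserConfigMyparserCommon{Delimiter: "\t"}
	fmt.Println(cfg.Validate())
}
```

The blank-identifier assignment makes the build fail early if the interface gains a method that the config does not yet implement.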

Documentation

Constants

const (
	SyntheticTimestampCol  = "_timestamp"
	SyntheticPartitionCol  = "_partition"
	SyntheticOffsetCol     = "_offset"
	SyntheticIdxCol        = "_idx"
	SystemLbCtimeCol       = "_lb_ctime"
	SystemLbWtimeCol       = "_lb_wtime"
	SystemLbExtraColPrefix = "_lb_extra_"
)

Variables

var (
	ErrParserColumns = []string{"_partition", "_offset", "_error", "data"}
	ErrParserSchema  = abstract.NewTableSchema([]abstract.ColSchema{
		{ColumnName: "_partition", DataType: string(ytschema.TypeString), PrimaryKey: true},
		{ColumnName: "_offset", DataType: string(ytschema.TypeUint64), PrimaryKey: true},
		{ColumnName: "_error", DataType: string(ytschema.TypeBytes)},
		{ColumnName: "data", DataType: string(ytschema.TypeBytes)},
	})
)

Functions

func ExtractErrorFromUnparsed

func ExtractErrorFromUnparsed(unparsed abstract.ChangeItem) string

func ExtractUnparsed

func ExtractUnparsed(items []abstract.ChangeItem) (res []abstract.ChangeItem)

func GetParserNameByMap

func GetParserNameByMap(parserConfig map[string]interface{}) string

func IsThisParserConfig

func IsThisParserConfig(parserConfig map[string]interface{}, config interface{}) bool

func IsUnparsed

func IsUnparsed(item abstract.ChangeItem) bool

func KnownParsers

func KnownParsers() []string

func KnownParsersConfigs

func KnownParsersConfigs() []string

func ParserConfigNameByStruct

func ParserConfigNameByStruct(in interface{}) string

func ParserConfigStructToMap

func ParserConfigStructToMap(in AbstractParserConfig) (map[string]interface{}, error)

func Register

func Register(foo parserFactory, configs []AbstractParserConfig)

func SchemaByFieldsAndResource

func SchemaByFieldsAndResource(logger log.Logger, res resources.AbstractResources, fields []abstract.ColSchema, schemaResourceName string) ([]abstract.ColSchema, error)

func ToMap

func ToMap(parserName string, in interface{}) (map[string]interface{}, error)

Types

type AbstractParserConfig

type AbstractParserConfig interface {
	IsNewParserConfig()
	IsAppendOnly() bool
	Validate() error
}

func KnownAbstractParserConfigs

func KnownAbstractParserConfigs() []AbstractParserConfig

func ParserConfigMapToStruct

func ParserConfigMapToStruct(in map[string]interface{}) (AbstractParserConfig, error)

type Message

type Message struct {
	// Offset is the server-side sequence number of the message in the topic. Must be monotonically growing.
	Offset uint64
	// Key is a unique identifier of the sequence.
	Key []byte
	// Value is the actual data.
	Value []byte
	// CreateTime is when the data was created on the client (if present).
	CreateTime time.Time
	// WriteTime is when the data was written to the queue (if present).
	WriteTime time.Time
	// Headers are labels attached to the read message.
	Headers map[string]string
	// Deprecated: SeqNo is a client-set mark of the message. Must be growing.
	// Set it to 0 in new code; prefer using Offset.
	SeqNo uint64
}

Message is a struct describing an incoming message.

type MessageBatch

type MessageBatch struct {
	Topic     string
	Partition uint32
	Messages  []Message
}

MessageBatch is a group of messages.
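As a small illustration of these two types, the sketch below builds a batch and checks the offset requirement stated in the Message comments. The struct definitions are copied from the declarations above; the helper offsetsGrow is hypothetical, and reading "monotone growing" as strictly increasing is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Message and MessageBatch are copied from the package documentation.
type Message struct {
	Offset     uint64
	Key        []byte
	Value      []byte
	CreateTime time.Time
	WriteTime  time.Time
	Headers    map[string]string
	SeqNo      uint64
}

type MessageBatch struct {
	Topic     string
	Partition uint32
	Messages  []Message
}

// offsetsGrow is a hypothetical helper: it reports whether every offset in
// the batch is strictly greater than the previous one.
func offsetsGrow(batch MessageBatch) bool {
	for i := 1; i < len(batch.Messages); i++ {
		if batch.Messages[i].Offset <= batch.Messages[i-1].Offset {
			return false
		}
	}
	return true
}

func main() {
	batch := MessageBatch{
		Topic:     "events",
		Partition: 0,
		Messages: []Message{
			{Offset: 10, Value: []byte(`{"id":1}`), WriteTime: time.Now()},
			{Offset: 11, Value: []byte(`{"id":2}`), WriteTime: time.Now()},
		},
	}
	fmt.Println(offsetsGrow(batch))
}
```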

type Parser

type Parser interface {
	Do(msg Message, partition abstract.Partition) []abstract.ChangeItem
	DoBatch(batch MessageBatch) []abstract.ChangeItem
}
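A minimal sketch of a type satisfying the Parser interface follows. To stay self-contained it replaces abstract.ChangeItem and abstract.Partition with simplified stand-in structs, so the field names on those two types are assumptions; rawParser is hypothetical. The synthetic column names are taken from the constants listed above.

```go
package main

import "fmt"

// ChangeItem and Partition are simplified stand-ins for the abstract package types.
type ChangeItem struct {
	ColumnNames  []string
	ColumnValues []interface{}
}

type Partition struct {
	Topic     string
	Partition uint32
}

// Message and MessageBatch are reduced to the fields this sketch uses.
type Message struct {
	Offset uint64
	Value  []byte
}

type MessageBatch struct {
	Topic     string
	Partition uint32
	Messages  []Message
}

// Parser has the same method set as the documented interface.
type Parser interface {
	Do(msg Message, partition Partition) []ChangeItem
	DoBatch(batch MessageBatch) []ChangeItem
}

const (
	SyntheticPartitionCol = "_partition"
	SyntheticOffsetCol    = "_offset"
)

// rawParser is a hypothetical parser that emits one item per message
// without interpreting the payload.
type rawParser struct{}

func (rawParser) Do(msg Message, partition Partition) []ChangeItem {
	return []ChangeItem{{
		ColumnNames: []string{SyntheticPartitionCol, SyntheticOffsetCol, "data"},
		ColumnValues: []interface{}{
			fmt.Sprintf("%s@%d", partition.Topic, partition.Partition),
			msg.Offset,
			string(msg.Value),
		},
	}}
}

// DoBatch parses every message of the batch with the batch's partition.
func (p rawParser) DoBatch(batch MessageBatch) []ChangeItem {
	part := Partition{Topic: batch.Topic, Partition: batch.Partition}
	var items []ChangeItem
	for _, msg := range batch.Messages {
		items = append(items, p.Do(msg, part)...)
	}
	return items
}

func main() {
	var p Parser = rawParser{}
	items := p.DoBatch(MessageBatch{Topic: "events", Messages: []Message{{Offset: 7, Value: []byte("a")}}})
	fmt.Println(len(items), items[0].ColumnNames)
}
```

Implementing DoBatch on top of Do keeps the per-message logic in one place; a real parser might instead parse the whole batch at once for efficiency.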

func NewParserFromMap

func NewParserFromMap(in map[string]interface{}, sniff bool, logger log.Logger, stats *stats.SourceStats) (Parser, error)

func NewParserFromParserConfig

func NewParserFromParserConfig(parserConfig AbstractParserConfig, sniff bool, logger log.Logger, stats *stats.SourceStats) (Parser, error)

func WithResource

func WithResource(parser Parser, res resources.AbstractResources) Parser

type ResourceableParser

type ResourceableParser struct {
	// contains filtered or unexported fields
}

func (*ResourceableParser) Do

func (*ResourceableParser) DoBatch

func (p *ResourceableParser) DoBatch(batch MessageBatch) []abstract.ChangeItem

func (*ResourceableParser) ResourcesObj

func (*ResourceableParser) Unwrap

func (p *ResourceableParser) Unwrap() Parser

type WrappedParser

type WrappedParser interface {
	Parser
	Unwrap() Parser
}

Parsers can be layered by wrapping them in extra layers. WrappedParser adds an extra method for extracting the actual parser from such a wrapper.
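The layering idea can be sketched as a loop that peels wrappers until it reaches a parser that does not implement Unwrap. This is an illustration under simplifying assumptions: Parser is reduced to a single hypothetical Name method instead of Do/DoBatch, and unwrapAll, baseParser, and layer are not part of the package.

```go
package main

import "fmt"

// Parser is reduced to one hypothetical method to keep the sketch short.
type Parser interface {
	Name() string
}

// WrappedParser has the same shape as the documented interface.
type WrappedParser interface {
	Parser
	Unwrap() Parser
}

// baseParser is the innermost, actual parser.
type baseParser struct{}

func (baseParser) Name() string { return "base" }

// layer wraps another parser and exposes it through Unwrap.
type layer struct {
	inner Parser
	label string
}

func (l layer) Name() string   { return l.label + "(" + l.inner.Name() + ")" }
func (l layer) Unwrap() Parser { return l.inner }

// unwrapAll strips every wrapping layer and returns the innermost parser.
func unwrapAll(p Parser) Parser {
	for {
		w, ok := p.(WrappedParser)
		if !ok {
			return p
		}
		p = w.Unwrap()
	}
}

func main() {
	outer := layer{inner: layer{inner: baseParser{}, label: "resources"}, label: "metrics"}
	fmt.Println(outer.Name(), unwrapAll(outer).Name())
}
```

The same pattern is used by errors.Unwrap in the standard library: callers that need a specific concrete type walk the chain instead of assuming how many layers were applied.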
