Documentation ¶
Index ¶
- Constants
- Variables
- func ExtractErrorFromUnparsed(unparsed abstract.ChangeItem) string
- func ExtractUnparsed(items []abstract.ChangeItem) (res []abstract.ChangeItem)
- func GetParserNameByMap(parserConfig map[string]interface{}) string
- func IsThisParserConfig(parserConfig map[string]interface{}, config interface{}) bool
- func IsUnparsed(item abstract.ChangeItem) bool
- func KnownParsers() []string
- func KnownParsersConfigs() []string
- func ParserConfigNameByStruct(in interface{}) string
- func ParserConfigStructToMap(in AbstractParserConfig) (map[string]interface{}, error)
- func Register(foo parserFactory, configs []AbstractParserConfig)
- func SchemaByFieldsAndResource(logger log.Logger, res resources.AbstractResources, ...) ([]abstract.ColSchema, error)
- func ToMap(parserName string, in interface{}) (map[string]interface{}, error)
- type AbstractParserConfig
- type Message
- type MessageBatch
- type Parser
- type ResourceableParser
- type WrappedParser
Constants ¶
View Source
const ( SyntheticTimestampCol = "_timestamp" SyntheticPartitionCol = "_partition" SyntheticOffsetCol = "_offset" SyntheticIdxCol = "_idx" SystemLbCtimeCol = "_lb_ctime" SystemLbWtimeCol = "_lb_wtime" SystemLbExtraColPrefix = "_lb_extra_" )
Variables ¶
View Source
var ( ErrParserColumns = []string{"_partition", "_offset", "_error", "data"} ErrParserSchema = abstract.NewTableSchema([]abstract.ColSchema{ {ColumnName: "_partition", DataType: string(ytschema.TypeString), PrimaryKey: true}, {ColumnName: "_offset", DataType: string(ytschema.TypeUint64), PrimaryKey: true}, {ColumnName: "_error", DataType: string(ytschema.TypeBytes)}, {ColumnName: "data", DataType: string(ytschema.TypeBytes)}, }) )
Functions ¶
func ExtractErrorFromUnparsed ¶
func ExtractErrorFromUnparsed(unparsed abstract.ChangeItem) string
func ExtractUnparsed ¶
func ExtractUnparsed(items []abstract.ChangeItem) (res []abstract.ChangeItem)
func GetParserNameByMap ¶
func IsThisParserConfig ¶
func IsUnparsed ¶
func IsUnparsed(item abstract.ChangeItem) bool
func KnownParsers ¶
func KnownParsers() []string
func KnownParsersConfigs ¶
func KnownParsersConfigs() []string
func ParserConfigNameByStruct ¶
func ParserConfigNameByStruct(in interface{}) string
func ParserConfigStructToMap ¶
func ParserConfigStructToMap(in AbstractParserConfig) (map[string]interface{}, error)
func Register ¶
func Register(foo parserFactory, configs []AbstractParserConfig)
Types ¶
type AbstractParserConfig ¶
func KnownAbstractParserConfigs ¶
func KnownAbstractParserConfigs() []AbstractParserConfig
func ParserConfigMapToStruct ¶
func ParserConfigMapToStruct(in map[string]interface{}) (AbstractParserConfig, error)
type Message ¶
type Message struct {
	// Offset is the server sequence number of the message in the topic. Must be monotonically increasing.
	Offset uint64
	// Key is a unique identifier of the sequence.
	Key []byte
	// Value is the actual data.
	Value []byte
	// CreateTime is when the data was created on the client (if present).
	CreateTime time.Time
	// WriteTime is when the data was written to the queue (if present).
	WriteTime time.Time
	// Headers are labels attached to the read message.
	Headers map[string]string
	// Deprecated: SeqNo is a client-set mark of the message. Must be increasing.
	// Set it to 0 in new code; prefer using Offset.
	SeqNo uint64
}
Message is a struct describing an incoming message.
type MessageBatch ¶
MessageBatch is a group of messages.
type Parser ¶
type Parser interface { Do(msg Message, partition abstract.Partition) []abstract.ChangeItem DoBatch(batch MessageBatch) []abstract.ChangeItem }
func NewParserFromMap ¶
func NewParserFromParserConfig ¶
func NewParserFromParserConfig(parserConfig AbstractParserConfig, sniff bool, logger log.Logger, stats *stats.SourceStats) (Parser, error)
func WithResource ¶
func WithResource(parser Parser, res resources.AbstractResources) Parser
type ResourceableParser ¶
type ResourceableParser struct {
// contains filtered or unexported fields
}
func (*ResourceableParser) Do ¶
func (p *ResourceableParser) Do(msg Message, partition abstract.Partition) []abstract.ChangeItem
func (*ResourceableParser) DoBatch ¶
func (p *ResourceableParser) DoBatch(batch MessageBatch) []abstract.ChangeItem
func (*ResourceableParser) ResourcesObj ¶
func (p *ResourceableParser) ResourcesObj() resources.AbstractResources
func (*ResourceableParser) Unwrap ¶
func (p *ResourceableParser) Unwrap() Parser
type WrappedParser ¶
WrappedParser: parsers can be layered by wrapping them in extra layers. For wrapped parsers, we should add an extra method for extracting the actual parser.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.