Documentation ¶
Index ¶
- Constants
- func GetFuncName() string
- type Decoder
- type JSONDecoder
- type Message
- type Parser
- func (mp *Parser) CheckPrimaryKeys(destination sink.Destination, row client.Row) error
- func (mp *Parser) CheckTimeColumnExistence(schema *metaCom.Table, columnDict map[string]int, destination sink.Destination, ...) error
- func (mp *Parser) IsMessageValid(msg map[string]interface{}, destination sink.Destination) error
- func (mp *Parser) ParseMessage(msg map[string]interface{}, destination sink.Destination) (client.Row, error)
- type StringMessage
Constants ¶
const (
    // MsgPrefix is prefix keyword of message body
    MsgPrefix = "msg"
    // MsgMetaDataUUID is message metadata uuid keyword
    MsgMetaDataUUID = "uuid"
    // MsgMetaDataTS is message metadata timestamp keyword
    MsgMetaDataTS = "ts"
)
Variables ¶
This section is empty.
Functions ¶
func GetFuncName ¶
func GetFuncName() string
GetFuncName returns the name of the calling function
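The source does not show how GetFuncName is implemented. As a minimal sketch of one common way to look up the caller's name with the standard runtime package, the hypothetical helper callerName below (not part of this package) asks for the frame that invoked it:

```go
package main

import (
	"fmt"
	"runtime"
)

// callerName is a hypothetical stand-in, not the package's GetFuncName.
// It returns the fully qualified name of the function that called it.
func callerName() string {
	pcs := make([]uintptr, 8)
	// Skip 2 frames: runtime.Callers itself and callerName.
	n := runtime.Callers(2, pcs)
	if n == 0 {
		return "unknown"
	}
	// CallersFrames accounts for inlined functions.
	frame, _ := runtime.CallersFrames(pcs[:n]).Next()
	if frame.Function == "" {
		return "unknown"
	}
	return frame.Function
}

// handleBatch is a hypothetical caller used only to demonstrate the lookup.
func handleBatch() string {
	return callerName()
}

func main() {
	fmt.Println(handleBatch())
}
```

A helper like this is typically used to tag log lines or metrics with the reporting function without hard-coding its name.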
Types ¶
type Decoder ¶
type Decoder interface {
    // DecodeMsg will decode the given message into out variable
    DecodeMsg(msg consumer.Message) (*Message, error)
}
Decoder is an interface that Kafka message decoders implement
func NewDefaultDecoder ¶
func NewDefaultDecoder(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (decoder Decoder, err error)
NewDefaultDecoder initializes the JSON decoder based on the job type
type JSONDecoder ¶
type JSONDecoder struct{}
JSONDecoder is an implementation of the Decoder interface for the identity decoder
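To illustrate the Decoder contract, here is a minimal sketch of a JSON decoder written against stand-in types. The names rawMessage, decoded, decoder, jsonDecoder and bytesMessage are hypothetical and only mirror the shapes documented above. The real consumer.Message and Message types carry more fields, and the actual JSONDecoder may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// rawMessage is a hypothetical stand-in for consumer.Message,
// reduced to the single method this sketch needs.
type rawMessage interface {
	Value() []byte
}

// decoded mirrors part of the documented Message struct (assumption:
// only these two fields are needed for the illustration).
type decoded struct {
	MsgInSubTS     time.Time
	DecodedMessage map[string]interface{}
}

// decoder mirrors the shape of the Decoder interface.
type decoder interface {
	DecodeMsg(msg rawMessage) (*decoded, error)
}

// jsonDecoder unmarshals the message payload as a JSON object.
type jsonDecoder struct{}

func (jsonDecoder) DecodeMsg(msg rawMessage) (*decoded, error) {
	out := make(map[string]interface{})
	if err := json.Unmarshal(msg.Value(), &out); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	return &decoded{MsgInSubTS: time.Now(), DecodedMessage: out}, nil
}

// bytesMessage is a trivial rawMessage backed by a byte slice.
type bytesMessage []byte

func (b bytesMessage) Value() []byte { return b }

func main() {
	var d decoder = jsonDecoder{}
	m, err := d.DecodeMsg(bytesMessage(`{"city_id": 7, "status": "ok"}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(m.DecodedMessage["status"], m.DecodedMessage["city_id"])
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is map[string]interface{}, so any later column conversion has to account for that.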
type Message ¶
type Message struct {
    // MsgInSubTS is the timestamp when the message is consumed by the subscriber
    MsgInSubTS time.Time
    // MsgMetaDataTS is defined in encoder/decoder metadata
    MsgMetaDataTS time.Time
    // RawMessage is encoded message
    RawMessage consumer.Message
    // DecodedMessage is decoded message
    DecodedMessage map[string]interface{}
}
Message contains the raw message read from Kafka and the decoded message
type Parser ¶
type Parser struct {
    // ServiceConfig is ares-subscriber configure
    ServiceConfig config.ServiceConfig
    // JobName is job name
    JobName string
    // Cluster is ares cluster name
    Cluster string
    // destinations each message will be parsed and written into
    Destination sink.Destination
    // Transformations are keyed on the output column name
    Transformations map[string]*rules.TransformationConfig
    // contains filtered or unexported fields
}
Parser holds all resources needed to parse one message into one or multiple row objects with respect to different destinations
func NewParser ¶
func NewParser(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) *Parser
NewParser creates a Parser for the given JobConfig
func (*Parser) CheckPrimaryKeys ¶
func (mp *Parser) CheckPrimaryKeys(destination sink.Destination, row client.Row) error
CheckPrimaryKeys returns an error if the value of a primary key column is nil
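The nil check described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the row is modeled as a plain []interface{}, primary key columns are given as indexes into that row, and the names checkPrimaryKeys and errNilPrimaryKey are hypothetical. The real method takes a sink.Destination and a client.Row instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errNilPrimaryKey is a hypothetical sentinel error for this sketch.
var errNilPrimaryKey = errors.New("primary key value is nil")

// checkPrimaryKeys returns an error if any primary key column in row is nil.
// columnNames is parallel to row; pkColumns holds indexes of key columns.
func checkPrimaryKeys(columnNames []string, pkColumns []int, row []interface{}) error {
	for _, idx := range pkColumns {
		if idx < 0 || idx >= len(row) || idx >= len(columnNames) {
			return fmt.Errorf("primary key column index %d out of range", idx)
		}
		if row[idx] == nil {
			return fmt.Errorf("column %q: %w", columnNames[idx], errNilPrimaryKey)
		}
	}
	return nil
}

func main() {
	cols := []string{"trip_id", "city_id"}
	row := []interface{}{"t1", nil}

	// Only trip_id is a key here, so the nil city_id is acceptable.
	fmt.Println(checkPrimaryKeys(cols, []int{0}, row))

	// With city_id also a key, the nil value is rejected.
	err := checkPrimaryKeys(cols, []int{0, 1}, row)
	fmt.Println(errors.Is(err, errNilPrimaryKey))
}
```

Rejecting such rows before they reach the sink avoids writing records that cannot be uniquely identified.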
func (*Parser) CheckTimeColumnExistence ¶
func (mp *Parser) CheckTimeColumnExistence(schema *metaCom.Table, columnDict map[string]int, destination sink.Destination, row client.Row) error
CheckTimeColumnExistence checks whether the time column is missing for a fact table
func (*Parser) IsMessageValid ¶
func (mp *Parser) IsMessageValid(msg map[string]interface{}, destination sink.Destination) error
IsMessageValid checks if the message is valid
func (*Parser) ParseMessage ¶
func (mp *Parser) ParseMessage(msg map[string]interface{}, destination sink.Destination) (client.Row, error)
ParseMessage will parse given message to fit the destination
type StringMessage ¶
type StringMessage struct {
// contains filtered or unexported fields
}
StringMessage is an implementation of the consumer.Message interface for testing.
func NewStringMessage ¶
func NewStringMessage(topic, msg string) *StringMessage
func (*StringMessage) Ack ¶
func (m *StringMessage) Ack()
func (*StringMessage) Cluster ¶
func (m *StringMessage) Cluster() string
func (*StringMessage) Key ¶
func (m *StringMessage) Key() []byte
func (*StringMessage) Nack ¶
func (m *StringMessage) Nack()
func (*StringMessage) Offset ¶
func (m *StringMessage) Offset() int64
func (*StringMessage) Partition ¶
func (m *StringMessage) Partition() int32
func (*StringMessage) Topic ¶
func (m *StringMessage) Topic() string
func (*StringMessage) Value ¶
func (m *StringMessage) Value() []byte
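The method set listed above suggests a simple in-memory test double. The sketch below shows what such a stub might look like. The interface kafkaMessage and the type stubMessage are hypothetical stand-ins, and the return values for Key, Cluster, Partition and Offset are assumptions, since the source does not document what StringMessage returns for them.

```go
package main

import "fmt"

// kafkaMessage is a hypothetical stand-in for consumer.Message,
// listing the same methods documented for StringMessage.
type kafkaMessage interface {
	Key() []byte
	Value() []byte
	Topic() string
	Cluster() string
	Partition() int32
	Offset() int64
	Ack()
	Nack()
}

// stubMessage is a test double holding a topic and a string payload.
type stubMessage struct {
	topic string
	msg   string
}

func newStubMessage(topic, msg string) *stubMessage {
	return &stubMessage{topic: topic, msg: msg}
}

func (m *stubMessage) Key() []byte      { return nil }           // assumption: no key
func (m *stubMessage) Value() []byte    { return []byte(m.msg) } // payload as bytes
func (m *stubMessage) Topic() string    { return m.topic }
func (m *stubMessage) Cluster() string  { return "" } // assumption: no cluster
func (m *stubMessage) Partition() int32 { return 0 }  // assumption: fixed partition
func (m *stubMessage) Offset() int64    { return 0 }  // assumption: fixed offset
func (m *stubMessage) Ack()             {}            // no-op in this sketch
func (m *stubMessage) Nack()            {}            // no-op in this sketch

// Compile-time check that the stub satisfies the interface.
var _ kafkaMessage = (*stubMessage)(nil)

func main() {
	m := newStubMessage("trips", `{"status": "ok"}`)
	fmt.Println(m.Topic(), string(m.Value()))
}
```

A stub like this lets a decoder or parser be exercised in unit tests without a running Kafka cluster.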