kafka

package
v0.0.0-...-948e253 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 25, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClearPos

func ClearPos(sid int) error

ClearPos clears the synchronization information.

func FusionProcessing

func FusionProcessing(cfg *config.KafkaConfig, kafkaData []utility.ConsumerDataStruct) error

FusionProcessing processes and packages the data, then sends it to Kafka.

func GetPos

func GetPos(sid int) (string, int)

GetPos retrieves from ZooKeeper the position that needs to be synchronized.

func InitKafkaProducer

func InitKafkaProducer(kafkaCFG *config.KafkaConfig) error

InitKafkaProducer initializes the Kafka producer.

Types

type Column

type Column struct {
	Name  string      `xml:"name,attr"`
	Value interface{} `xml:",chardata"`
}

Column holds the name and value of a single column.

type Columns

type Columns struct {
	XMLName xml.Name `xml:"columns"`
	Column  []Column `xml:"column"`
}

Columns holds the columns that make up one row.
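
As a rough illustration of how the struct tags on Column and Columns translate into XML, the sketch below copies the two declarations and marshals one row with the standard encoding/xml package. The helper name encodeRow and the sample column names and values are assumptions made for this example; they are not part of the package.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

// Column and Columns are copied from the declarations above so that
// this sketch compiles without importing the package.
type Column struct {
	Name  string      `xml:"name,attr"`
	Value interface{} `xml:",chardata"`
}

type Columns struct {
	XMLName xml.Name `xml:"columns"`
	Column  []Column `xml:"column"`
}

// encodeRow is a hypothetical helper: it marshals one row to XML
// and returns the result as a string.
func encodeRow(row Columns) (string, error) {
	out, err := xml.Marshal(row)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// Made-up sample data: an integer column and a string column.
	row := Columns{Column: []Column{
		{Name: "id", Value: 7},
		{Name: "status", Value: "paid"},
	}}
	s, err := encodeRow(row)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Each Column becomes a column element whose name attribute carries the column name and whose character data carries the value.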

type JSONKafkaMsg

type JSONKafkaMsg struct {
	SID       int                      `json:"sid"`
	DBName    string                   `json:"dbname"`
	TBName    string                   `json:"tbname"`
	FileName  string                   `json:"filename"`
	Position  uint32                   `json:"position"`
	EventTime uint32                   `json:"event_time"`
	Event     string                   `json:"event"`
	Columns   []map[string]interface{} `json:"columns"`
}

JSONKafkaMsg is the JSON data format saved in Kafka.
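
To make the wire format concrete, the following sketch copies the JSONKafkaMsg declaration and encodes one message with encoding/json. The helper encodeMsg and every field value (database, table, file name, event name) are invented for illustration; the values the package actually writes are not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSONKafkaMsg is copied from the declaration above so that this
// sketch compiles without importing the package.
type JSONKafkaMsg struct {
	SID       int                      `json:"sid"`
	DBName    string                   `json:"dbname"`
	TBName    string                   `json:"tbname"`
	FileName  string                   `json:"filename"`
	Position  uint32                   `json:"position"`
	EventTime uint32                   `json:"event_time"`
	Event     string                   `json:"event"`
	Columns   []map[string]interface{} `json:"columns"`
}

// encodeMsg is a hypothetical helper that serializes a message to JSON.
func encodeMsg(msg JSONKafkaMsg) (string, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// All values below are made-up sample data.
	msg := JSONKafkaMsg{
		SID:       1,
		DBName:    "shop",
		TBName:    "orders",
		FileName:  "binlog.000001",
		Position:  120,
		EventTime: 1624579200,
		Event:     "insert",
		Columns:   []map[string]interface{}{{"id": 7, "status": "paid"}},
	}
	s, err := encodeMsg(msg)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Struct fields are written in declaration order, while encoding/json sorts the keys inside each Columns map, so the column order within a row is alphabetical rather than table order.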

type JSONKafkaMsgQuery

type JSONKafkaMsgQuery struct {
	SID       int                      `json:"sid"`
	DBName    string                   `json:"dbname"`
	TBName    string                   `json:"tbname"`
	FileName  string                   `json:"filename"`
	Position  uint32                   `json:"position"`
	EventTime uint32                   `json:"event_time"`
	Event     string                   `json:"event"`
	Query     string                   `json:"query"`
	Columns   []map[string]interface{} `json:"columns"`
}

JSONKafkaMsgQuery is the JSON data format saved in Kafka, with an added Query field for DDL statements.
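
On the consuming side, a reader of the topic could decode into the same shape and inspect Query. The sketch below is one possible approach, not the package's own consumer: the helper decodeMsg, the sample payload, and the event name "query" are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSONKafkaMsgQuery is copied from the declaration above so that this
// sketch compiles without importing the package.
type JSONKafkaMsgQuery struct {
	SID       int                      `json:"sid"`
	DBName    string                   `json:"dbname"`
	TBName    string                   `json:"tbname"`
	FileName  string                   `json:"filename"`
	Position  uint32                   `json:"position"`
	EventTime uint32                   `json:"event_time"`
	Event     string                   `json:"event"`
	Query     string                   `json:"query"`
	Columns   []map[string]interface{} `json:"columns"`
}

// decodeMsg is a hypothetical helper that parses one Kafka message value.
func decodeMsg(raw []byte) (JSONKafkaMsgQuery, error) {
	var msg JSONKafkaMsgQuery
	err := json.Unmarshal(raw, &msg)
	return msg, err
}

func main() {
	// Made-up payload resembling a DDL event.
	raw := []byte(`{"sid":1,"dbname":"shop","tbname":"orders",` +
		`"filename":"binlog.000001","position":120,"event_time":1624579200,` +
		`"event":"query","query":"ALTER TABLE orders ADD COLUMN note TEXT","columns":null}`)

	msg, err := decodeMsg(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	if msg.Query != "" {
		fmt.Println("DDL statement:", msg.Query)
	} else {
		fmt.Println("row event with", len(msg.Columns), "row(s)")
	}
}
```

Because the two JSON types differ only in the query key, decoding every message as JSONKafkaMsgQuery and checking whether Query is empty avoids having to know the message type in advance.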

type XMLkafkaMsg

type XMLkafkaMsg struct {
	XMLName   xml.Name  `xml:"contenet"`
	SID       int       `xml:"sid"`
	DBName    string    `xml:"dbname"`
	TBName    string    `xml:"tbname"`
	FileName  string    `xml:"filename"`
	Position  uint32    `xml:"position"`
	EventTime uint32    `xml:"event_time"`
	Event     string    `xml:"event"`
	Columns   []Columns `xml:"columns"`
}

XMLkafkaMsg is the XML data format saved in Kafka.
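
The sketch below shows what a complete XML message might look like when the declared types are marshaled with encoding/xml. It copies the three XML types from this page; the helper encodeXMLMsg and all field values are assumptions made for the example.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

// The three types below are copied from the declarations on this page
// so that the sketch compiles without importing the package.
type Column struct {
	Name  string      `xml:"name,attr"`
	Value interface{} `xml:",chardata"`
}

type Columns struct {
	XMLName xml.Name `xml:"columns"`
	Column  []Column `xml:"column"`
}

type XMLkafkaMsg struct {
	XMLName   xml.Name  `xml:"contenet"`
	SID       int       `xml:"sid"`
	DBName    string    `xml:"dbname"`
	TBName    string    `xml:"tbname"`
	FileName  string    `xml:"filename"`
	Position  uint32    `xml:"position"`
	EventTime uint32    `xml:"event_time"`
	Event     string    `xml:"event"`
	Columns   []Columns `xml:"columns"`
}

// encodeXMLMsg is a hypothetical helper that serializes a message to XML.
func encodeXMLMsg(msg XMLkafkaMsg) (string, error) {
	out, err := xml.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// All values below are made-up sample data.
	msg := XMLkafkaMsg{
		SID: 1, DBName: "shop", TBName: "orders", FileName: "binlog.000001",
		Position: 120, EventTime: 1624579200, Event: "insert",
		Columns: []Columns{{Column: []Column{{Name: "id", Value: 7}}}},
	}
	s, err := encodeXMLMsg(msg)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

The root element name comes from the XMLName tag exactly as declared, so a consumer parsing these messages has to match the element name contenet as spelled in the struct tag.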
