consumers

package
v0.0.0-...-cb7d7c7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2020 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BasePath    = "./oplog/" // root directory for output
	// FileMaxSize is a size limit of 100.
	// NOTE(review): the unit (MB? entries?) and where it is enforced are not
	// visible here; presumably it caps files written under BasePath. Confirm.
	FileMaxSize = 100
)
View Source
const (
	// TimeoutCtx is a fixed duration of three seconds.
	// NOTE(review): judging by the name, presumably used as a context
	// timeout for client operations; confirm against the callers.
	TimeoutCtx = 3 * time.Second
)

Variables

View Source
var (
	// ConsumerMap is a package-level registry of Consumer values keyed by string.
	ConsumerMap = make(map[string]Consumer) // each key corresponds to one sync configuration
)

Functions

func HandleData

func HandleData(key string, data *models.ChangeEvent)

统一处理消息

func NewElasticsearchConsumer

func NewElasticsearchConsumer(cfg *config.SyncConfig) error

NewElasticsearchConsumer 创建一个elasticsearch消费对象

func NewFileLogConsumer

func NewFileLogConsumer(cfg *config.SyncConfig) error

func NewMongoConsumer

func NewMongoConsumer(cfg *config.SyncConfig) error

NewMongoConsumer 创建一个mongo消费对象

func NewMysqlConsumer

func NewMysqlConsumer(cfg *config.SyncConfig, debug bool) error

Types

type Consumer

// Consumer is the interface a data sink must provide: connection setup and
// teardown, handling of a single change event, and filtering of a document.
type Consumer interface {
	// InitClient initializes the connection from the given sync configuration.
	InitClient(cfg *config.SyncConfig) error
	// Disconnect tears down the connection.
	Disconnect() error
	// HandleData processes a single message (one change event).
	HandleData(data *models.ChangeEvent) error
	// FilterField removes unneeded content from document.
	// NOTE(review): presumably it strips unwanted fields for the named
	// collection, going by the method name; confirm against an implementation.
	FilterField(collection string, document bson.M) error
}

type ElasticsearchConsumer

// ElasticsearchConsumer is the sink that lands data in an Elasticsearch target.
// Its pointer type declares the four methods listed in the Consumer interface.
type ElasticsearchConsumer struct {
	// NOTE(review): presumably the name of the Elasticsearch index written to; confirm.
	Index string
	// contains filtered or unexported fields
}

elasticsearch目标数据落地

func (*ElasticsearchConsumer) Disconnect

func (ec *ElasticsearchConsumer) Disconnect() error

销毁连接

func (*ElasticsearchConsumer) FilterField

func (ec *ElasticsearchConsumer) FilterField(collection string, document bson.M) error

FilterField 删除无用

func (*ElasticsearchConsumer) HandleData

func (ec *ElasticsearchConsumer) HandleData(data *models.ChangeEvent) error

处理一条消息

func (*ElasticsearchConsumer) InitClient

func (ec *ElasticsearchConsumer) InitClient(cfg *config.SyncConfig) error

初始化连接

type FileLogConsumer

// FileLogConsumer is a sink whose pointer type declares the four methods
// listed in the Consumer interface. All of its fields are unexported.
// NOTE(review): presumably it writes events to log files; confirm against the implementation.
type FileLogConsumer struct {
	// contains filtered or unexported fields
}

func (*FileLogConsumer) Disconnect

func (fl *FileLogConsumer) Disconnect() error

销毁连接

func (*FileLogConsumer) FilterField

func (fl *FileLogConsumer) FilterField(collection string, document bson.M) error

FilterField 删除无用

func (*FileLogConsumer) HandleData

func (fl *FileLogConsumer) HandleData(data *models.ChangeEvent) error

处理一条消息

func (*FileLogConsumer) InitClient

func (fl *FileLogConsumer) InitClient(cfg *config.SyncConfig) error

初始化连接

type MongoConsumer

// MongoConsumer is the sink that lands data in a MongoDB target.
// Its pointer type declares the four methods listed in the Consumer interface.
// All of its fields are unexported.
type MongoConsumer struct {
	// contains filtered or unexported fields
}

mongo目标数据落地

func (*MongoConsumer) Disconnect

func (mc *MongoConsumer) Disconnect() error

销毁连接

func (*MongoConsumer) FilterField

func (mc *MongoConsumer) FilterField(collection string, document bson.M) error

FilterField 删除无用

func (*MongoConsumer) HandleData

func (mc *MongoConsumer) HandleData(data *models.ChangeEvent) (err error)

处理一条消息

func (*MongoConsumer) InitClient

func (mc *MongoConsumer) InitClient(cfg *config.SyncConfig) error

初始化连接

type MysqlConsumer

// MysqlConsumer is a sink whose pointer type declares the four methods
// listed in the Consumer interface. All of its fields are unexported.
// NOTE(review): presumably it writes events to a MySQL target; confirm against the implementation.
type MysqlConsumer struct {
	// contains filtered or unexported fields
}

func (*MysqlConsumer) Disconnect

func (mc *MysqlConsumer) Disconnect() error

销毁连接

func (*MysqlConsumer) FilterField

func (mc *MysqlConsumer) FilterField(collection string, document bson.M) error

删除无用

func (*MysqlConsumer) HandleData

func (mc *MysqlConsumer) HandleData(data *models.ChangeEvent) (err error)

处理一条消息

func (*MysqlConsumer) InitClient

func (mc *MysqlConsumer) InitClient(cfg *config.SyncConfig) (err error)

初始化连接

type MysqlCount

// MysqlCount is used to count the number of data rows in MySQL.
type MysqlCount struct {
	// CountTable holds the count; it maps to the "count_table" column (gorm)
	// and the "count_table" key (JSON).
	CountTable int64 `gorm:"column:count_table" json:"count_table"`
}

MysqlCount 用于统计mysql数据行数

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL