ruleparser

package
v0.0.0-...-948e253 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2021 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// TOAddRule 添加规则
	TOAddRule = 3
	// TODelRule 删除规则
	TODelRule = 4
	// RuleInitial 规则初始化,可以直接加载到内存中
	RuleInitial = 0
)
View Source
var FusionRules = map[string][]Rule{}

FusionRules 解析规则

View Source
var KafkaKeyMap = map[string]string{}

KafkaKeyMap key: db.tb v: mysql中kafka_key_columns, 针对多partition场景,将column的值写到topic的key

View Source
var KafkaTopicMap = map[string]string{}

KafkaTopicMap key: db.tb v: column_name 针对多topic场景, 表示此表需要多topic

View Source
var KafkaTopicRuleMap = map[string]string{}

KafkaTopicRuleMap key: db.tb.column_value v: topic 针对多topic场景,不同的column路由到不同的topic

View Source
var NeedKafka = 0

NeedKafka 表示是否将解析数据发送到kafka, 如果需要发送的kafka, 不对rule 规则进行校验,

Functions

func GetCanalFilterTables

func GetCanalFilterTables() (map[string][]string, []string, []string)

GetCanalFilterTables 组装需要同步表的信息

func MonitorRuleChange

func MonitorRuleChange(cfg *RuleConfig, toKafka int) (bool, string, string)

MonitorRuleChange 监听规则变化

func NewRuleParser

func NewRuleParser(cfg *RuleConfig) error

NewRuleParser 重数据库中获取redis 存储规则

func ResetOperationColumn

func ResetOperationColumn(cfg *RuleConfig) error

ResetOperationColumn 将operation 字段置成 initial state

Types

type KafkaTopicRule

type KafkaTopicRule struct {
	ColumnName string      `json:"column_name"`
	TopicRule  interface{} `json:"topic_rule"`
}

KafkaTopicRule 规则表中KafkaTopicRule

type Rule

type Rule struct {
	ID                 int    `db:"id"`
	SID                int    `db:"sid"`
	DB                 string `db:"db"`
	TB                 string `db:"tb"`
	RefreshType        int    `db:"refresh_type"`
	ISDump             int    `db:"is_dump"`
	ISCompress         int    `db:"is_compress"`
	CompressLevel      int    `db:"compress_level"`
	CompressColumns    string `db:"compress_columns"`
	ExpireTime         int    `db:"expire_time"`
	ExpireRange        int    `db:"expire_range"`
	ISFullColumn       int    `db:"is_full_column"`
	ReplicColumns      string `db:"replic_columns"`
	FilterRule         string `db:"filter_rule"`
	RedisKeyPrefix     string `db:"redis_key_prefix"`
	RedisKeyColumns    string `db:"redis_key_columns"`
	RedisKeyType       string `db:"redis_key_type"`
	ZSetColumn         string `db:"redis_zset_score_column"`
	OP                 int    `db:"operation"`
	KafkaKeyColumns    string `db:"kafka_key_columns"`
	KafkaTopicRule     string `db:"kafka_topic_rule"`
	DelayDel           int    `db:"delay_del"`
	DelayExpireTime    int    `db:"delay_expire_time"`
	KafkaISFullColumn  int    `db:"kafka_is_full_column"`
	KafkaReplicColumns string `db:"kafka_replic_columns"`
}

Rule 配置的同步规则

func GetRules

func GetRules(key string) []Rule

GetRules 获取规则

type RuleConfig

type RuleConfig struct {
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`
	Port     int    `toml:"port"`
	SID      int    `toml:"sid"`
	Schema   string `toml:"schema"`
	Table    string `toml:"table"`
}

RuleConfig 在数据表中存储rule 规则

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL