types

package
v0.0.0-...-48e7155 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const ConditionTypeAnd = "and" // 多条件与关系
View Source
const ConditionTypeOr = "or" // 多条件或关系
View Source
const DataSourceElasticSearch = "es" // es 类型数据源
View Source
const DataSourceMysql = "mysql" // mysql 类型数据源
View Source
const EventTypeDelete = "delete" // 删除事件
View Source
const EventTypeInsert = "insert" // 插入事件
View Source
const EventTypeUpdate = "update" // 更新事件
View Source
const InnerSyncTypeDefaultKey = "innerSyncTypeDefaultKey"
View Source
const ReaderTypeKafka = "kafka" // kafka 类型读取器
View Source
const ReaderTypeWeb = "web" // web 类型读取器
View Source
const SyncTypeCopy = "copy" // 可复制的字段全部拷贝到目标数据源作为一条新的记录
View Source
const SyncTypeInner = "inner" // 只允许复制一个字段,复制到目标数据源符合记录的某个字段,这个字段一定是一个数组
View Source
const SyncTypeJoin = "join" // 可复制的字段全部拷贝到目标数据源中符合条件的记录,作为这条记录的某个对象字段
View Source
const TimestampCreatedAt = "created_at" // 创建时间戳
View Source
const TimestampUpdatedAt = "updated_at" // 更新时间戳

Variables

This section is empty.

Functions

func NewSyncWaitGroup

func NewSyncWaitGroup() *syncWaitGroup

Types

type BinlogParams

type BinlogParams struct {
	EventId   string              `json:"event_id" binding:"required"` // 事件ID 唯一
	Database  string              `json:"database" binding:"required"` // 库名
	Table     string              `json:"table" binding:"required"`    // 表名
	EventAt   int64               `json:"ts" binding:"required"`       // 事件时间
	EventType string              `json:"type" binding:"required"`     // 事件类型
	IsDdl     bool                `json:"isDdl" binding:"omitempty"`   // 是否 ddl修改(ddl 修改不处理)
	Data      []map[string]string `json:"data" binding:"required"`     // 更新后数据 (全量数据,根据 canal: canal.instance.filter.regex 的字段规则,没有字段规则就是全量)
	Old       []map[string]string `json:"old" binding:"omitempty"`     // 更新前数据 (只存在被更新的字段)
	Source    interface{}         `json:"-" binding:"omitempty"`       // 原始数据
}

func (*BinlogParams) UnmarshalJSON

func (c *BinlogParams) UnmarshalJSON(bytes []byte) error

UnmarshalJSON 重写 json 解析方法,如果是更新事件,记录本次更新的字段

type DataFilterCondition

type DataFilterCondition struct {
	Column      string                           `json:"column,omitempty"`       // 字段名
	Operator    string                           `json:"operator,omitempty"`     // 运算符
	Value       string                           `json:"value,omitempty"`        // 值
	ValueColumn string                           `json:"value_column,omitempty"` // 值字段  value 和 value_column 同时只存在其中一个
	Children    map[string][]DataFilterCondition `json:"children,omitempty"`     // 子条件
}

DataFilterCondition 数据规则条件

type Filter

type Filter interface {
	InsertEventRecord(params *SyncParams, updatedColumns []string) error
	FilterColumns(params *SyncParams, columns []string) ([]string, error, bool)
}

type SyncParams

type SyncParams struct {
	Rule SyncRule          `json:"rule"` // 只读,不用指针传递
	Data map[string]string `json:"data"`
	Old  map[string]string `json:"old"`

	RealEventType string `json:"real_event_type"` // 和 BinlogParams 的 EventType 重复,用于记录真实执行同步的事件类型
	// contains filtered or unexported fields
}

SyncParams 同步参数 一条 sql 可能修改多条数据,每条数据都会拆分成不同都子任务, 每个子任务都有自己都 SyncParams 所以 SyncParams 不涉及并发

func NewSyncParams

func NewSyncParams(wg *syncWaitGroup, rule *SyncRule, data, old map[string]string, binLog *BinlogParams) *SyncParams

func (*SyncParams) Clone

func (s *SyncParams) Clone(eventType string) *SyncParams

Clone 克隆一个新的同步数据,只有事件类型不同 主要用户软删除字段更新时,更新事件并更为 删除/插入事件

func (*SyncParams) GetBingLogParams

func (s *SyncParams) GetBingLogParams() *BinlogParams

func (*SyncParams) GetIdentifyId

func (s *SyncParams) GetIdentifyId() string

GetIdentifyId 获取标识id

func (*SyncParams) GetJoinColumn

func (s *SyncParams) GetJoinColumn() string

GetJoinColumn 当 SyncType 等于 SyncTypeInner 时只同步一个字段, 暂时缓存起来 同一个 SyncParams 只会被一个协程使用, 所以不存在并发问题

func (*SyncParams) GetUpdateValues

func (s *SyncParams) GetUpdateValues(updatedColumns []string) interface{}

GetUpdateValues 获取当次更新的数据 格式: {"column": "value"}

func (*SyncParams) GetWg

func (s *SyncParams) GetWg() *syncWaitGroup

func (*SyncParams) IsPrimaryKeyUpdated

func (s *SyncParams) IsPrimaryKeyUpdated() bool

IsPrimaryKeyUpdated 判断主键 或关联字段是否更新

func (*SyncParams) MergeOldToData

func (s *SyncParams) MergeOldToData() map[string]string

MergeOldToData 获取这条记录 更新之前的所有数据

func (*SyncParams) Recycle

func (s *SyncParams) Recycle()

func (*SyncParams) SetBinLogParams

func (s *SyncParams) SetBinLogParams(params *BinlogParams)

type SyncRule

type SyncRule struct {
	Database          string                           `json:"database" yaml:"database"`                                       // 需要同步的库
	Table             string                           `json:"table" yaml:"table"`                                             // 需要同步的表
	PrimaryKey        string                           `json:"primary_key" yaml:"primary_key"`                                 // 来源表中主键名称
	LockColumns       []string                         `json:"lock_columns" yaml:"lock_columns"`                               // 加锁时 依赖的字段
	Columns           map[string]string                `json:"columns" yaml:"columns"`                                         // 字段映射表 local:target 格式
	SystemColumns     []string                         `json:"-" yaml:"-"`                                                     // 系统字段 (特殊逻辑)
	SoftDeleteField   string                           `json:"soft_delete_field,omitempty" yaml:"soft_delete_field,omitempty"` // 软删除字段名称 为空代表不支持软删除
	UnSoftDeleteValue string                           `json:"un_soft_delete_value" yaml:"un_soft_delete_value"`               // 未软删除值 SoftDeleteField 不为空且作为key获取到的值不相等及被软删除
	DataConditions    map[string][]DataFilterCondition `json:"data_conditions,omitempty" yaml:"data_conditions,omitempty"`     // 同步条件 key为 and或or  比对结果false 不同步
	TargetType        string                           `json:"-" yaml:"target_type"`                                           // 目标类型 mysql|es
	Target            string                           `json:"target" yaml:"target"`                                           // 目标 mysql:connect.database.table es:connect.index
	TargetDatabase    string                           `json:"-" yaml:"target_database"`
	TargetTable       string                           `json:"-" yaml:"target_table"`
	//Type              string                     `json:"type"`                 // 同步类型 stats:统计 sync:同步
	SyncType      string `json:"sync_type" yaml:"sync_type"`                                 // 具体同步或统计类型
	JoinFieldName string `json:"join_field_name,omitempty" yaml:"join_field_name,omitempty"` // 加入字段名 sync_type:join|inner 时存在
	//SyncConditions    []SyncCondition            `json:"sync_conditions"`      // 同步条件 只允许and条件
	TargetExtraParams map[string]string `json:"target_extra_params,omitempty" yaml:"target_extra_params,omitempty"` // 目标额外参数,常量同步时一起写入目标表
}

SyncRule 同步规则

func (*SyncRule) EvaluateFilterConditions

func (sr *SyncRule) EvaluateFilterConditions(data map[string]string) bool

EvaluateFilterConditions 判断是否符合同步条件

func (*SyncRule) UnmarshalJSON

func (sr *SyncRule) UnmarshalJSON(bytes []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL