Documentation ¶
Index ¶
- type CheckPoint
- type LogCanal
- type ServiceCanal
- func (sc *ServiceCanal) Collect(ilogtail.Collector) error
- func (sc *ServiceCanal) Description() string
- func (sc *ServiceCanal) GetBinlogLatestPos() mysql.Position
- func (sc *ServiceCanal) Init(context ilogtail.Context) (int, error)
- func (sc *ServiceCanal) OnDDL(pos mysql.Position, e *replication.QueryEvent) error
- func (sc *ServiceCanal) OnGTID(s mysql.GTIDSet) error
- func (sc *ServiceCanal) OnPosSynced(pos mysql.Position, _ mysql.GTIDSet, force bool) error
- func (sc *ServiceCanal) OnRotate(r *replication.RotateEvent) error
- func (sc *ServiceCanal) OnRow(e *canal.RowsEvent) error
- func (sc *ServiceCanal) OnTableChanged(schema string, table string) error
- func (sc *ServiceCanal) OnXID(p mysql.Position) error
- func (sc *ServiceCanal) Start(c ilogtail.Collector) error
- func (sc *ServiceCanal) Stop() error
- func (sc *ServiceCanal) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ServiceCanal ¶
type ServiceCanal struct { Host string Port int User string Password string Flavor string ServerID int // Array of regex string, only when the name of table (included schema, such as db.test) // matches any regex in this array, its binlog will be collected. // Example: "IncludeTables": [".*\\..*"], collect all tables, notes the backslash for escaping. IncludeTables []string // Array of regex string, if the name of table (included schema, too) matches any regex // in this array, its binlog will not be collected. // Example: "ExcludeTables": ["^mysql\\..*$"], exclude all tables in schema named 'mysql'. ExcludeTables []string // Start* are used to specify start synchronization point. StartGTID is preferred if // both of them are specified. **Only works when no checkpoint is exiting.** // Start* will fail if any error is encountered during synchronization because plugin will // recovery from latest checkpoint (got from server). // StartGTID will not synchrnoize event specified by it, for example, set StartGTID to // 'uuid:1-9', then plugin will skip 1 to 9, and synchrnoize from interval 10. // However, StartBinName and StartBinLogPos will start synchrnoize from the event identified // by them. // For StartGTID: // - If its format is invalid (parse error), plugin starts synchronization from latest, // because in such case, the first synchronization will fail, and plugin will get the // latest checkpoint from server to failover. // - If its format is valid but value is invalid, plugin's behavior depends on server. It // might start synchrnoize from beginning or ERROR to start from latest. // For StartBinName and StartBinLogPos: // - If any of them are invalid, plugins starts synchronization from latest (failover). StartGTID string StartBinName string StartBinLogPos int HeartBeatPeriod int ReadTimeout int EnableDDL bool EnableXID bool EnableGTID bool EnableInsert bool EnableUpdate bool EnableDelete bool TextToString bool EnableEventMeta bool // True by default, convert field with SET type to string in format [elem1, elem2, ...]. SetToString bool // True by default, convert field in Go []byte type to string, such as JSON type. ByteValueToString bool // TODO: This parameter is not exposed in document. // StartFromBegining only works when no checkpoint and all Start* parameters are missing. // If StartFromBegining is set, plugin will not try to get latest position from server. StartFromBegining bool Charset string // Pack values into two fields: new_data and old_data. False by default. PackValues bool // contains filtered or unexported fields }
ServiceCanal is a service input plugin to collect binlog from MySQL. It works as a slave node to collect binlog from master (source) continually. It supports two kinds of replication mode: GTID or binlog-file.
GTID mode uses GTID as checkpoint, which needs the server to enable this feature (by setting gtid_mode=ON). At the beginning, plugin will check this necessary condition, it will fall down to binlog-file mode if gtid_mode is OFF. One special case: the server has turned on gtid_mode, but the latest GTID is empty. In such case, plugin will fall down to binlog-file mode at beginning.
Binlog-file mode uses the sequence number of file as checkpoint, which might leads to data repetition in master-slave arch. Because for same binlog content, master and slave nodes can use different sequence number to store it, once the sequence number on slave is higher and HA switch happens, data repetition happened. That's why we prefer GTID mode.
func NewServiceCanal ¶
func NewServiceCanal() *ServiceCanal
func (*ServiceCanal) Collect ¶
func (sc *ServiceCanal) Collect(ilogtail.Collector) error
Collect takes in an accumulator and adds the metrics that the Input gathers. This is called every "interval"
func (*ServiceCanal) Description ¶
func (sc *ServiceCanal) Description() string
func (*ServiceCanal) GetBinlogLatestPos ¶
func (sc *ServiceCanal) GetBinlogLatestPos() mysql.Position
GetBinlogLatestPos gets the latest binlog position from server.
func (*ServiceCanal) OnDDL ¶
func (sc *ServiceCanal) OnDDL(pos mysql.Position, e *replication.QueryEvent) error
OnDDL...
func (*ServiceCanal) OnGTID ¶
func (sc *ServiceCanal) OnGTID(s mysql.GTIDSet) error
OnGTID reports the GTID of the following event (OnRow, OnDDL). So we can not update checkpoint here, just record GTID and update in OnRow.
This strategy brings a potential problem, checkpoint will only be updated when OnRow is called, however, because of IncludeTables and ExcludeTables, calls to OnRow will be filtered. So, if plugin restarts before the next OnRow call comes, it will rerun from a old checkpoint. But this should be trivial for cases that valid data comes continuously.
func (*ServiceCanal) OnPosSynced ¶
func (*ServiceCanal) OnRotate ¶
func (sc *ServiceCanal) OnRotate(r *replication.RotateEvent) error
func (*ServiceCanal) OnRow ¶
func (sc *ServiceCanal) OnRow(e *canal.RowsEvent) error
OnRow processes the row event, according user's config, constructs data to send.
func (*ServiceCanal) OnTableChanged ¶
func (sc *ServiceCanal) OnTableChanged(schema string, table string) error
func (*ServiceCanal) Start ¶
func (sc *ServiceCanal) Start(c ilogtail.Collector) error
Start starts the ServiceInput's service, whatever that may be
func (*ServiceCanal) Stop ¶
func (sc *ServiceCanal) Stop() error
Stop stops the services and closes any necessary channels and connections
func (*ServiceCanal) String ¶
func (sc *ServiceCanal) String() string