Documentation ¶
Index ¶
- Constants
- Variables
- type AgentHeader
- type Column
- type DBSQLQuery
- func (d *DBSQLQuery) ExtractDatabaseTable(query string) (string, string)
- func (d *DBSQLQuery) GetCapabilities() map[string]*utils.TaskDescription
- func (d *DBSQLQuery) GetPrimary(schema, table string) string
- func (d *DBSQLQuery) GetSchema() map[string]map[string]*Column
- func (d *DBSQLQuery) GetStatus() interface{}
- func (d *DBSQLQuery) HealthCheck() bool
- func (d *DBSQLQuery) ProcessLines(columns []string, lines [][]interface{}, info QueryInfo, ...)
- func (d *DBSQLQuery) Query(database string, query string) (err error)
- func (d *DBSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
- func (d *DBSQLQuery) QuerySchema(q string) (err error)
- type DBSQLQueryConfig
- type FileReadingFollower
- func (f *FileReadingFollower) GetCapabilities() map[string]*utils.TaskDescription
- func (f *FileReadingFollower) GetMeta() map[string]utils.Meta
- func (f *FileReadingFollower) GetSchema() map[string]map[string]*Column
- func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}
- func (f *FileReadingFollower) Start(i ...interface{}) (err error)
- func (f *FileReadingFollower) UpdateCommittedLsn()
- type FileReadingFollowerConfig
- type Message
- type Messages
- type Meta
- type MySQLQuery
- func (m *MySQLQuery) Connect(schema string) error
- func (m *MySQLQuery) GetStatus() interface{}
- func (m *MySQLQuery) HealthCheck() bool
- func (m *MySQLQuery) Init()
- func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}
- func (m *MySQLQuery) Query(query string) error
- func (m *MySQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
- func (m *MySQLQuery) QuerySchema() (err error)
- type MysqlCDC
- func (m *MysqlCDC) GetFirstBinlog() (mysql.Position, error)
- func (m *MysqlCDC) GetGTIDFromMariaDBPosition(pos mysql.Position) (mysql.GTIDSet, error)
- func (m *MysqlCDC) GetLastBinlog() (mysql.Position, error)
- func (m *MysqlCDC) GetMariaDBPosGTID() (mysql.GTIDSet, error)
- func (m *MysqlCDC) GetMeta() map[string]utils.Meta
- func (m *MysqlCDC) GetSchema() map[string]map[string]*Column
- func (m *MysqlCDC) GetValidBinlogFromOffset(offset string) mysql.Position
- func (m *MysqlCDC) GetValidMariaDBGTIDFromOffset(offset string) (mysql.GTIDSet, error)
- func (m *MysqlCDC) GetValidMysqlGTIDFromOffset(offset string) (mysql.GTIDSet, error)
- func (m *MysqlCDC) GetValidOffset(mode string, flavor string, offset string) error
- func (m *MysqlCDC) Init()
- func (m *MysqlCDC) OnDDL(header *replication.EventHeader, nextPos mysql.Position, ...) error
- func (m *MysqlCDC) OnGTID(header *replication.EventHeader, gset mysql.GTIDSet) error
- func (m *MysqlCDC) OnPosSynced(header *replication.EventHeader, pos mysql.Position, gset mysql.GTIDSet, ...) error
- func (m *MysqlCDC) OnRotate(header *replication.EventHeader, e *replication.RotateEvent) error
- func (m *MysqlCDC) OnRow(e *canal.RowsEvent) error
- func (m *MysqlCDC) OnTableChanged(header *replication.EventHeader, schema string, table string) error
- func (m *MysqlCDC) OnXID(header *replication.EventHeader, pos mysql.Position) error
- func (m *MysqlCDC) ParsePosition(offset string) (mysql.Position, error)
- func (m *MysqlCDC) Process(action string, params ...interface{}) interface{}
- func (m *MysqlCDC) Start(i ...interface{}) (err error)
- func (m *MysqlCDC) StartCanal() error
- func (m *MysqlCDC) String() string
- func (m *MysqlCDC) UpdateCommittedLsn()
- type MysqlCDCConfig
- type MysqlCDCMeta
- type MysqlOffset
- type MysqlQueryConfig
- type OffsetCommittedState
- type Oldkeys
- type PostgreSQLCDC
- func (p *PostgreSQLCDC) GetConfirmedFlushLsn() (pglogrepl.LSN, error)
- func (p *PostgreSQLCDC) GetMeta() map[string]utils.Meta
- func (p *PostgreSQLCDC) GetOffsetTimeline(tli int32, lsn pglogrepl.LSN) (int32, error)
- func (p *PostgreSQLCDC) GetSchema() map[string]map[string]*Column
- func (p *PostgreSQLCDC) GetSlotStatus() bool
- func (p *PostgreSQLCDC) HealthCheck() bool
- func (p *PostgreSQLCDC) Init()
- func (p *PostgreSQLCDC) NewConn() (*pgconn.PgConn, error)
- func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}
- func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)
- func (p *PostgreSQLCDC) StartReplication()
- func (p *PostgreSQLCDC) UpdateCommittedLsn()
- type PostgreSQLCDCConf
- type PostgreSQLQuery
- func (p *PostgreSQLQuery) Connect()
- func (p *PostgreSQLQuery) GetStatus() interface{}
- func (p *PostgreSQLQuery) HealthCheck() bool
- func (p *PostgreSQLQuery) Init()
- func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}
- func (p *PostgreSQLQuery) Query(query string) error
- func (p *PostgreSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
- func (p *PostgreSQLQuery) QuerySchema() (err error)
- type PostgreSQLQueryConfig
- type Query
- type QueryInfo
- type Random
- func (r *Random) GetMeta() map[string]utils.Meta
- func (r *Random) GetSchema() map[string]map[string]*Column
- func (r *Random) GetStatus() interface{}
- func (r *Random) HealthCheck() bool
- func (r *Random) Process(action string, params ...interface{}) interface{}
- func (r *Random) Start(i ...interface{}) error
- type RandomConfig
- type SQLSchema
- type Source
- func (s *Source) GetCapabilities() map[string]*utils.TaskDescription
- func (s *Source) GetCommitChan() chan interface{}
- func (s *Source) GetMeta() map[string]utils.Meta
- func (s *Source) GetName() string
- func (s *Source) GetOutputChan() chan events.LookatchEvent
- func (s *Source) GetStatus() interface{}
- func (s *Source) HealthCheck() bool
- func (s *Source) Init()
- func (s *Source) IsEnable() bool
- func (s *Source) Start(i ...interface{}) (err error)
- func (s *Source) Stop() error
- func (s *Source) UpdateCommittedLsn()
- type SourceI
- func New(name string, sourceType string, config *viper.Viper) (s SourceI, err error)
- func NewFileReadingFollower(s *Source) (SourceI, error)
- func NewMysqlCdc(s *Source) (SourceI, error)
- func NewMysqlQuery(s *Source) (SourceI, error)
- func NewPostgreSQLCdc(s *Source) (SourceI, error)
- func NewPostgreSQLQuery(s *Source) (SourceI, error)
- func NewRandom(s *Source) (SourceI, error)
- func NewSqlserverCDC(s *Source) (SourceI, error)
- func NewSqlserverSQLQuery(s *Source) (SourceI, error)
- func NewSyslog(s *Source) (SourceI, error)
- type SqlserverCDC
- func (s *SqlserverCDC) Connect()
- func (s *SqlserverCDC) GetCapabilities() map[string]*utils.TaskDescription
- func (s *SqlserverCDC) GetChangesForTables(table string, fromLsn []byte, toLsn []byte) []map[string]interface{}
- func (s *SqlserverCDC) GetMaxLsn() []byte
- func (s *SqlserverCDC) GetMeta() map[string]utils.Meta
- func (s *SqlserverCDC) GetMinLsn(table string) []byte
- func (s *SqlserverCDC) GetNextLsn(lsn []byte) []byte
- func (s *SqlserverCDC) GetRecordedChances()
- func (s *SqlserverCDC) GetSchema() map[string]map[string]*Column
- func (s *SqlserverCDC) GetTimestampFromLsn(lsn []byte) int64
- func (s *SqlserverCDC) HealthCheck() bool
- func (s *SqlserverCDC) Init()
- func (s *SqlserverCDC) Process(action string, params ...interface{}) interface{}
- func (s *SqlserverCDC) ProcessRow(row map[string]interface{}, schema string, table string, pk string, ...)
- func (s *SqlserverCDC) Query(query string) []map[string]interface{}
- func (s *SqlserverCDC) Start(i ...interface{}) (err error)
- func (s *SqlserverCDC) Stop() error
- func (s *SqlserverCDC) UpdateChangeTables()
- func (s *SqlserverCDC) UpdateCommittedLsn()
- type SqlserverCDCConfig
- type SqlserverCDCMeta
- type SqlserverQuery
- func (m *SqlserverQuery) Connect()
- func (m *SqlserverQuery) GetStatus() interface{}
- func (m *SqlserverQuery) HealthCheck() bool
- func (m *SqlserverQuery) Init()
- func (m *SqlserverQuery) Process(action string, params ...interface{}) interface{}
- func (m *SqlserverQuery) Query(query string) error
- func (m *SqlserverQuery) QueryMeta(query string) ([]map[string]interface{}, error)
- func (m *SqlserverQuery) QuerySchema() (err error)
- type SqlserverQueryConfig
- type Syslog
- func (s *Syslog) GetCapabilities() map[string]*utils.TaskDescription
- func (s *Syslog) GetCommitChan() chan interface{}
- func (s *Syslog) GetMeta() map[string]utils.Meta
- func (s *Syslog) GetName() string
- func (s *Syslog) GetOutputChan() chan events.LookatchEvent
- func (s *Syslog) GetSchema() map[string]map[string]*Column
- func (s *Syslog) GetStatus() interface{}
- func (s *Syslog) HealthCheck() bool
- func (s *Syslog) Init()
- func (s *Syslog) IsEnable() bool
- func (s *Syslog) Process(action string, params ...interface{}) interface{}
- func (s *Syslog) Start(i ...interface{}) error
- func (s *Syslog) Stop() error
- type SyslogConfig
Constants ¶
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" ModeGTID = "GTID" ModeBinlog = "binlog" MariadDB = "mariadb" Mysql = "mysql" )
const ( // Source Possible Statuses SourceStatusOnError = "ON_ERROR" SourceStatusRunning = "RUNNING" SourceStatusWaiting = "WAITING" SourceStatusWaitingForMETA = "WAITING_FOR_META" SourceStatusInit = "INITIALIZING" DefaultChannelSize = 100 )
Possible Statuses
const FileReadingFollowerType = "FileReadingFollower"
FileReadingFollowerType type of source
const MysqlCDCType = "MysqlCDC"
MysqlCDCType type of source
const MysqlQueryType = "MysqlQuery"
MysqlQueryType type of source
const PRI = "PRI"
const PostgreSQLCDCType = "PostgresqlCDC"
PostgreSQLCDCType type of source
const PostgreSQLQueryType = "PostgresqlQuery"
PostgreSQLQueryType type of source
const RandomType = "Random"
RandomType type of source
const SqlserverCDCType = "SqlserverCDC"
SqlserverCDCType type of source
const SqlserverQueryType = "SqlserverQuery"
SqlserverQueryType type of source
const SyslogType = "Syslog"
SyslogType type of source
const TickerValue = 10
TickerValue number of seconds to wait between two ticks
Variables ¶
var Factory = map[string]sourceCreatorT{ RandomType: NewRandom, MysqlQueryType: NewMysqlQuery, MysqlCDCType: NewMysqlCdc, PostgreSQLQueryType: NewPostgreSQLQuery, PostgreSQLCDCType: NewPostgreSQLCdc, SqlserverQueryType: NewSqlserverSQLQuery, SqlserverCDCType: NewSqlserverCDC, SyslogType: NewSyslog, FileReadingFollowerType: NewFileReadingFollower, }
Factory source Factory
Functions ¶
This section is empty.
Types ¶
type AgentHeader ¶
type AgentHeader struct { Tenant events.LookatchTenantInfo Hostname string UUID string }
AgentHeader representation of Agent Header contain agent auth information for events
type Column ¶ added in v0.2.0
type Column struct { Database string `json:"database,omitempty"` Schema string `json:"schema,omitempty"` Table string `json:"table,omitempty"` Column string `json:"column"` ColumnOrdPos int `json:"column_ord_pos"` Nullable bool `json:"nullable"` DataType string `json:"data_type"` CharacterMaximumLength null.Int `json:"character_maximum_length,omitempty"` NumericPrecision null.Int `json:"numeric_precision,omitempty"` NumericScale null.Int `json:"numeric_scale,omitempty"` ColumnType string `json:"column_type"` ColumnKey string `json:"column_key,omitempty"` }
Column representation of schema
type DBSQLQuery ¶ added in v0.2.0
type DBSQLQuery struct { *Source Config DBSQLQueryConfig // contains filtered or unexported fields }
DBSQLQuery representation of DBSQL query
func NewDBSQLQuery ¶ added in v0.2.0
func NewDBSQLQuery(s *Source) DBSQLQuery
NewDBSQLQuery create new DBSQL query client
func (*DBSQLQuery) ExtractDatabaseTable ¶ added in v0.2.0
func (d *DBSQLQuery) ExtractDatabaseTable(query string) (string, string)
ExtractDatabaseTable Extract the database (or schema, depending on your RDBMS) name and table name from a SQL query
func (*DBSQLQuery) GetCapabilities ¶ added in v0.2.0
func (d *DBSQLQuery) GetCapabilities() map[string]*utils.TaskDescription
GetCapabilities returns the actions the collector can perform on this source
func (*DBSQLQuery) GetPrimary ¶ added in v0.2.0
func (d *DBSQLQuery) GetPrimary(schema, table string) string
GetPrimary check if columns is primary key
func (*DBSQLQuery) GetSchema ¶ added in v0.2.0
func (d *DBSQLQuery) GetSchema() map[string]map[string]*Column
GetSchema returns source schema
func (*DBSQLQuery) GetStatus ¶ added in v0.2.0
func (d *DBSQLQuery) GetStatus() interface{}
GetStatus returns the collector's source status
func (*DBSQLQuery) HealthCheck ¶ added in v0.2.0
func (d *DBSQLQuery) HealthCheck() bool
HealthCheck returns true if the source is correctly configured and the collector is connected to it
func (*DBSQLQuery) ProcessLines ¶ added in v0.2.0
func (d *DBSQLQuery) ProcessLines(columns []string, lines [][]interface{}, info QueryInfo, wg *sizedwaitgroup.SizedWaitGroup)
ProcessLines process batches of lines from a resultset to map them before sending them to a marshall goroutine
func (*DBSQLQuery) Query ¶ added in v0.2.0
func (d *DBSQLQuery) Query(database string, query string) (err error)
Query send a SQL query to the configured source
func (*DBSQLQuery) QueryMeta ¶ added in v0.2.0
func (d *DBSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
QueryMeta execute query metadata
func (*DBSQLQuery) QuerySchema ¶ added in v0.2.0
func (d *DBSQLQuery) QuerySchema(q string) (err error)
QuerySchema retrieves source's schema directly from the source itself
type DBSQLQueryConfig ¶ added in v0.2.0
type DBSQLQueryConfig struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` BatchSize int `json:"batch_size" mapstructure:"batch_size"` NbWorker int `json:"nb_worker" mapstructure:"nb_worker"` ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"` DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"` }
DBSQLQueryConfig representation of DBSQL query configuration
type FileReadingFollower ¶ added in v0.1.0
type FileReadingFollower struct { *Source // contains filtered or unexported fields }
FileReadingFollower representation of FileReadingFollower
func (*FileReadingFollower) GetCapabilities ¶ added in v0.2.0
func (f *FileReadingFollower) GetCapabilities() map[string]*utils.TaskDescription
GetCapabilities returns available actions
func (*FileReadingFollower) GetMeta ¶ added in v0.1.0
func (f *FileReadingFollower) GetMeta() map[string]utils.Meta
GetMeta get source meta
func (*FileReadingFollower) GetSchema ¶ added in v0.1.0
func (f *FileReadingFollower) GetSchema() map[string]map[string]*Column
GetSchema Get source Schema
func (*FileReadingFollower) Process ¶ added in v0.1.0
func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}
Process action
func (*FileReadingFollower) Start ¶ added in v0.1.0
func (f *FileReadingFollower) Start(i ...interface{}) (err error)
Start source
func (*FileReadingFollower) UpdateCommittedLsn ¶ added in v0.2.0
func (f *FileReadingFollower) UpdateCommittedLsn()
UpdateCommittedLsn update CommittedLsn
type FileReadingFollowerConfig ¶ added in v0.1.0
FileReadingFollowerConfig representation of FileReadingFollower Config
type Message ¶
type Message struct { Columnnames []string `json:"columnnames"` Columntypes []string `json:"columntypes"` Columnvalues []interface{} `json:"columnvalues"` Kind string `json:"kind"` Schema string `json:"schema"` Table string `json:"table"` Oldkeys Oldkeys `json:"oldkeys"` }
Message representation of message
type Messages ¶
type Messages struct {
Change []Message `json:"change"`
}
Messages representation of messages
type Meta ¶
type Meta struct { LastState string `json:"laststate"` CurrentLsn pglogrepl.LSN `json:"current_lsn"` CommittedLsn pglogrepl.LSN `json:"committed_lsn"` SlotStatus bool `json:"slotstatus"` ServerWALEnd pglogrepl.LSN `json:"werver_wal_end"` }
Meta representation of metadata
type MySQLQuery ¶
type MySQLQuery struct { *DBSQLQuery // contains filtered or unexported fields }
MySQLQuery representation of MySQL Query source
func (*MySQLQuery) Connect ¶
func (m *MySQLQuery) Connect(schema string) error
Connect connection to database
func (*MySQLQuery) GetStatus ¶
func (m *MySQLQuery) GetStatus() interface{}
GetStatus returns current status of connection
func (*MySQLQuery) HealthCheck ¶
func (m *MySQLQuery) HealthCheck() bool
HealthCheck returns true if source is ok
func (*MySQLQuery) Process ¶
func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*MySQLQuery) Query ¶
func (m *MySQLQuery) Query(query string) error
Query execute query string
func (*MySQLQuery) QueryMeta ¶
func (m *MySQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
QueryMeta execute query meta string
func (*MySQLQuery) QuerySchema ¶
func (m *MySQLQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type MysqlCDC ¶
type MysqlCDC struct { *Source // contains filtered or unexported fields }
MysqlCDC representation of Mysql change data capture
func (*MysqlCDC) GetFirstBinlog ¶
GetFirstBinlog Get First Binlog offset
func (*MysqlCDC) GetGTIDFromMariaDBPosition ¶ added in v0.2.0
GetGTIDFromMariaDBPosition return mariadb GTID from binlog position
func (*MysqlCDC) GetLastBinlog ¶ added in v0.2.0
GetLastBinlog Get last Binlog offset
func (*MysqlCDC) GetMariaDBPosGTID ¶ added in v0.2.2
GetMariaDBPosGTID return mariadb GTIDSet from slave
func (*MysqlCDC) GetValidBinlogFromOffset ¶ added in v0.2.0
GetValidBinlogFromOffset return a valid binlog offset: if offset is invalid, return first binlog offset; if offset is greater than master position offset, return master position; else return parsed offset
func (*MysqlCDC) GetValidMariaDBGTIDFromOffset ¶ added in v0.2.0
GetValidMariaDBGTIDFromOffset return a valid Mariadb GTID offset: if offset is invalid, return first GTID offset; if offset is greater than master position offset, return master position; else return parsed GTID offset
func (*MysqlCDC) GetValidMysqlGTIDFromOffset ¶ added in v0.2.0
GetValidMysqlGTIDFromOffset return a valid Mysql GTID offset: parse string offset and add purged GTIDset if it exists
func (*MysqlCDC) GetValidOffset ¶ added in v0.2.0
GetValidOffset return a valid offset
func (*MysqlCDC) OnDDL ¶ added in v0.2.0
func (m *MysqlCDC) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnDDL
func (*MysqlCDC) OnGTID ¶ added in v0.2.0
func (m *MysqlCDC) OnGTID(header *replication.EventHeader, gset mysql.GTIDSet) error
OnGTID store GTID Position
func (*MysqlCDC) OnPosSynced ¶ added in v0.2.0
func (m *MysqlCDC) OnPosSynced(header *replication.EventHeader, pos mysql.Position, gset mysql.GTIDSet, force bool) error
OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
func (*MysqlCDC) OnRotate ¶ added in v0.2.0
func (m *MysqlCDC) OnRotate(header *replication.EventHeader, e *replication.RotateEvent) error
OnRotate store binlog Position
func (*MysqlCDC) OnTableChanged ¶ added in v0.2.0
func (m *MysqlCDC) OnTableChanged(header *replication.EventHeader, schema string, table string) error
OnTableChanged
func (*MysqlCDC) OnXID ¶ added in v0.2.0
func (m *MysqlCDC) OnXID(header *replication.EventHeader, pos mysql.Position) error
OnXID store binlog Position
func (*MysqlCDC) ParsePosition ¶ added in v0.2.0
ParsePosition parse binlog offset from string
func (*MysqlCDC) StartCanal ¶ added in v0.2.0
StartCanal decode event
func (*MysqlCDC) UpdateCommittedLsn ¶ added in v0.2.0
func (m *MysqlCDC) UpdateCommittedLsn()
UpdateCommittedLsn update CommittedLsn
type MysqlCDCConfig ¶
type MysqlCDCConfig struct { Enabled bool `json:"enabled"` OldValue bool `json:"old_value" mapstructure:"old_value"` ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"` SlaveID uint32 `json:"slave_id" mapstructure:"slave_id"` Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` Offset string `json:"offset"` Flavor string `json:"flavor"` Mode string `json:"mode"` FilterPolicy string `json:"filter_policy" mapstructure:"filter_policy"` Filter map[string]interface{} `json:"filter"` DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"` }
MysqlCDCConfig representation of Mysql change data capture configuration
type MysqlCDCMeta ¶ added in v0.2.0
type MysqlCDCMeta struct { ErrorEventNumber int `json:"error_event_number"` CommittedOffset string `json:"committed_offset"` }
MysqlCDCMeta representation of metadata
type MysqlOffset ¶ added in v0.2.0
MysqlOffset representation of binlog and GTID Offset
func (*MysqlOffset) GTIDSet ¶ added in v0.2.0
func (m *MysqlOffset) GTIDSet() mysql.GTIDSet
GTIDSet get GTID
func (*MysqlOffset) OffsetString ¶ added in v0.2.0
func (m *MysqlOffset) OffsetString(mode string) string
OffsetString convert offset to string
func (*MysqlOffset) Position ¶ added in v0.2.0
func (m *MysqlOffset) Position() mysql.Position
Position get Position
func (*MysqlOffset) Update ¶ added in v0.2.0
func (m *MysqlOffset) Update(pos mysql.Position)
Update set pos
func (*MysqlOffset) UpdateGTIDSet ¶ added in v0.2.0
func (m *MysqlOffset) UpdateGTIDSet(gset mysql.GTIDSet)
UpdateGTIDSet set GTID pos
func (*MysqlOffset) UpdatePos ¶ added in v0.2.2
func (m *MysqlOffset) UpdatePos(pos uint32)
UpdatePos set pos
type MysqlQueryConfig ¶
type MysqlQueryConfig struct { *DBSQLQueryConfig Schema string `json:"schema"` Exclude []string `json:"exclude"` }
MysqlQueryConfig representation MySQL Query configuration
type OffsetCommittedState ¶ added in v0.2.2
OffsetCommittedState keeps track of sent events
func NewOffsetCommittedState ¶ added in v0.2.2
func NewOffsetCommittedState() *OffsetCommittedState
func (*OffsetCommittedState) Add ¶ added in v0.2.2
func (c *OffsetCommittedState) Add(lsn pglogrepl.LSN)
Add add new lsn to state
func (*OffsetCommittedState) CleanFromLsn ¶ added in v0.2.2
func (c *OffsetCommittedState) CleanFromLsn(lsn pglogrepl.LSN)
CleanFromLsn clean old lsn state
func (*OffsetCommittedState) IsEmpty ¶ added in v0.2.2
func (c *OffsetCommittedState) IsEmpty() bool
IsEmpty check if no offset in State
type Oldkeys ¶
type Oldkeys struct { Keynames []string `json:"keynames"` Keytypes []string `json:"keytypes"` Keyvalues []interface{} `json:"keyvalues"` }
Oldkeys representation of old event
type PostgreSQLCDC ¶
type PostgreSQLCDC struct { *Source CommittedState *OffsetCommittedState // contains filtered or unexported fields }
PostgreSQLCDC representation of PostgreSQL change data capture
func (*PostgreSQLCDC) GetConfirmedFlushLsn ¶ added in v0.2.2
func (p *PostgreSQLCDC) GetConfirmedFlushLsn() (pglogrepl.LSN, error)
GetConfirmedFlushLsn get confirmed flush Lsn
func (*PostgreSQLCDC) GetMeta ¶
func (p *PostgreSQLCDC) GetMeta() map[string]utils.Meta
GetMeta get metadata
func (*PostgreSQLCDC) GetOffsetTimeline ¶ added in v0.2.4
func (*PostgreSQLCDC) GetSchema ¶
func (p *PostgreSQLCDC) GetSchema() map[string]map[string]*Column
GetSchema get schema
func (*PostgreSQLCDC) GetSlotStatus ¶ added in v0.2.0
func (p *PostgreSQLCDC) GetSlotStatus() bool
GetSlotStatus get slot status
func (*PostgreSQLCDC) HealthCheck ¶
func (p *PostgreSQLCDC) HealthCheck() bool
HealthCheck returns true if ok
func (*PostgreSQLCDC) NewConn ¶ added in v0.2.0
func (p *PostgreSQLCDC) NewConn() (*pgconn.PgConn, error)
NewConn create new pg connection
func (*PostgreSQLCDC) Process ¶
func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}
Process action
func (*PostgreSQLCDC) Start ¶
func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)
Start source
func (*PostgreSQLCDC) StartReplication ¶ added in v0.1.0
func (p *PostgreSQLCDC) StartReplication()
StartReplication Start Replication
func (*PostgreSQLCDC) UpdateCommittedLsn ¶ added in v0.2.0
func (p *PostgreSQLCDC) UpdateCommittedLsn()
UpdateCommittedLsn update CommittedLsn
type PostgreSQLCDCConf ¶
type PostgreSQLCDCConf struct { Enabled bool `json:"enabled"` OldValue bool `json:"old_value" mapstructure:"old_value"` ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"` Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` SslMode string `json:"sslmode"` Database string `json:"database"` SlotName string `json:"slot_name" mapstructure:"slot_name"` FilterPolicy string `json:"filter_policy" mapstructure:"filter_policy"` Filter map[string]interface{} `json:"filter"` DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"` }
PostgreSQLCDCConf representation of PostgreSQL change data capture configuration
type PostgreSQLQuery ¶
type PostgreSQLQuery struct { *DBSQLQuery // contains filtered or unexported fields }
PostgreSQLQuery representation of PostgreSQL Query source
func (*PostgreSQLQuery) Connect ¶
func (p *PostgreSQLQuery) Connect()
Connect connection to database
func (*PostgreSQLQuery) GetStatus ¶
func (p *PostgreSQLQuery) GetStatus() interface{}
GetStatus returns current status of connexion
func (*PostgreSQLQuery) HealthCheck ¶
func (p *PostgreSQLQuery) HealthCheck() bool
HealthCheck returns true if the source is correctly configured and the collector is connected to it
func (*PostgreSQLQuery) Init ¶
func (p *PostgreSQLQuery) Init()
Init initialisation of PostgreSQL Query source
func (*PostgreSQLQuery) Process ¶
func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*PostgreSQLQuery) Query ¶
func (p *PostgreSQLQuery) Query(query string) error
Query execute query string
func (*PostgreSQLQuery) QueryMeta ¶
func (p *PostgreSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)
QueryMeta execute query meta string
func (*PostgreSQLQuery) QuerySchema ¶
func (p *PostgreSQLQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type PostgreSQLQueryConfig ¶
type PostgreSQLQueryConfig struct { *DBSQLQueryConfig SslMode string `json:"sslmode"` Database string `json:"database"` }
PostgreSQLQueryConfig representation PostgreSQL Query configuration
type Query ¶
type Query struct {
Query string `name:"query" description:"SQL query to execute on the source by the collector" required:"true"`
}
Query representation of query action
type QueryInfo ¶ added in v0.2.2
type QueryInfo struct { Database string Schema string Table string PrimaryKey string ExecTimestamp string ColumnMeta map[string]events.ColumnsMeta }
QueryInfo needed information for query Event
type Random ¶
Random representation of Random
func (*Random) HealthCheck ¶
HealthCheck returns true if the source is correctly configured and the collector is connected to it
type RandomConfig ¶
type RandomConfig struct {
Wait string `json:"wait"`
}
RandomConfig representation of Random Config
type Source ¶
type Source struct { Name string OutputChannel chan events.LookatchEvent CommitChannel chan interface{} AgentInfo *AgentHeader Conf *viper.Viper Offset int64 Status string }
Source representation of source
func (*Source) GetCapabilities ¶ added in v0.2.0
func (s *Source) GetCapabilities() map[string]*utils.TaskDescription
GetCapabilities returns available actions
func (*Source) GetCommitChan ¶ added in v0.2.0
func (s *Source) GetCommitChan() chan interface{}
GetCommitChan returns the commit channel attached to the source
func (*Source) GetOutputChan ¶ added in v0.2.0
func (s *Source) GetOutputChan() chan events.LookatchEvent
GetOutputChan get output channel
func (*Source) GetStatus ¶ added in v0.2.0
func (s *Source) GetStatus() interface{}
GetStatus returns the collector's source status
func (*Source) HealthCheck ¶ added in v0.2.0
HealthCheck returns true if the source is correctly configured and the collector is connected to it
func (*Source) UpdateCommittedLsn ¶ added in v0.2.0
func (s *Source) UpdateCommittedLsn()
UpdateCommittedLsn does nothing, to avoid deadlock
type SourceI ¶
type SourceI interface { Init() Stop() error Start(...interface{}) error GetName() string GetOutputChan() chan events.LookatchEvent GetCommitChan() chan interface{} UpdateCommittedLsn() GetMeta() map[string]utils.Meta GetSchema() map[string]map[string]*Column GetStatus() interface{} IsEnable() bool HealthCheck() bool GetCapabilities() map[string]*utils.TaskDescription Process(string, ...interface{}) interface{} }
SourceI interface of source
func NewFileReadingFollower ¶ added in v0.2.2
NewFileReadingFollower create new FileReadingFollower source
func NewMysqlCdc ¶ added in v0.2.2
NewMysqlCdc create new mysql CDC source
func NewMysqlQuery ¶ added in v0.2.2
NewMysqlQuery create a Mysql Query source
func NewPostgreSQLCdc ¶ added in v0.2.2
NewPostgreSQLCdc create new PostgreSQL CDC source
func NewPostgreSQLQuery ¶ added in v0.2.2
NewPostgreSQLQuery create a PostgreSQL Query source
func NewSqlserverCDC ¶ added in v0.2.2
NewSqlserverCDC create a Sqlserver CDC source
func NewSqlserverSQLQuery ¶ added in v0.2.2
NewSqlserverSQLQuery create a Sqlserver Query source
type SqlserverCDC ¶ added in v0.2.0
type SqlserverCDC struct { *Source // contains filtered or unexported fields }
SqlserverCDC representation of Sqlserver CDC source
func (*SqlserverCDC) Connect ¶ added in v0.2.0
func (s *SqlserverCDC) Connect()
Connect connection to database
func (*SqlserverCDC) GetCapabilities ¶ added in v0.2.0
func (s *SqlserverCDC) GetCapabilities() map[string]*utils.TaskDescription
GetCapabilities returns available actions
func (*SqlserverCDC) GetChangesForTables ¶ added in v0.2.0
func (s *SqlserverCDC) GetChangesForTables(table string, fromLsn []byte, toLsn []byte) []map[string]interface{}
GetChangesForTables get all change for table between fromLsn and toLsn
func (*SqlserverCDC) GetMaxLsn ¶ added in v0.2.0
func (s *SqlserverCDC) GetMaxLsn() []byte
GetMaxLsn get max lsn of server
func (*SqlserverCDC) GetMeta ¶ added in v0.2.0
func (s *SqlserverCDC) GetMeta() map[string]utils.Meta
GetMeta get metadata
func (*SqlserverCDC) GetMinLsn ¶ added in v0.2.0
func (s *SqlserverCDC) GetMinLsn(table string) []byte
GetMinLsn get start lsn of a table
func (*SqlserverCDC) GetNextLsn ¶ added in v0.2.0
func (s *SqlserverCDC) GetNextLsn(lsn []byte) []byte
GetNextLsn get the lsn following the given lsn
func (*SqlserverCDC) GetRecordedChances ¶ added in v0.2.0
func (s *SqlserverCDC) GetRecordedChances()
GetRecordedChances get cdc change for table list
func (*SqlserverCDC) GetSchema ¶ added in v0.2.0
func (s *SqlserverCDC) GetSchema() map[string]map[string]*Column
GetSchema get schema
func (*SqlserverCDC) GetTimestampFromLsn ¶ added in v0.2.0
func (s *SqlserverCDC) GetTimestampFromLsn(lsn []byte) int64
GetTimestampFromLsn get Timestamp associated to lsn
func (*SqlserverCDC) HealthCheck ¶ added in v0.2.0
func (s *SqlserverCDC) HealthCheck() bool
HealthCheck returns true if source is ok
func (*SqlserverCDC) Init ¶ added in v0.2.0
func (s *SqlserverCDC) Init()
Init initialisation of Sqlserver CDC source
func (*SqlserverCDC) Process ¶ added in v0.2.0
func (s *SqlserverCDC) Process(action string, params ...interface{}) interface{}
Process process an action
func (*SqlserverCDC) ProcessRow ¶ added in v0.2.0
func (s *SqlserverCDC) ProcessRow(row map[string]interface{}, schema string, table string, pk string, method string)
ProcessRow send row to chan
func (*SqlserverCDC) Query ¶ added in v0.2.0
func (s *SqlserverCDC) Query(query string) []map[string]interface{}
Query execute query
func (*SqlserverCDC) Start ¶ added in v0.2.0
func (s *SqlserverCDC) Start(i ...interface{}) (err error)
Start source
func (*SqlserverCDC) UpdateChangeTables ¶ added in v0.2.0
func (s *SqlserverCDC) UpdateChangeTables()
UpdateChangeTables update list of activated cdc tables
func (*SqlserverCDC) UpdateCommittedLsn ¶ added in v0.2.0
func (s *SqlserverCDC) UpdateCommittedLsn()
UpdateCommittedLsn update CommittedLsn
type SqlserverCDCConfig ¶ added in v0.2.0
type SqlserverCDCConfig struct { Host string `json:"host"` Port int `json:"port"` User string `json:"user"` Password string `json:"password"` SslMode string `json:"sslmode"` Database string `json:"database"` PollInterval string `json:"poll_interval" mapstructure:"poll_interval"` FilterPolicy string `json:"filter_policy" mapstructure:"filter_policy"` Filter map[string]interface{} `json:"filter"` Enabled bool `json:"enabled"` Lsn string `json:"lsn"` }
SqlserverCDCConfig representation of Sqlserver CDC configuration
type SqlserverCDCMeta ¶ added in v0.2.0
type SqlserverCDCMeta struct {
CurrentLsn string `json:"current_lsn"`
}
SqlserverCDCMeta representation of metadata
type SqlserverQuery ¶ added in v0.2.0
type SqlserverQuery struct { *DBSQLQuery // contains filtered or unexported fields }
SqlserverQuery representation of Sqlserver Query source
func (*SqlserverQuery) Connect ¶ added in v0.2.0
func (m *SqlserverQuery) Connect()
Connect connection to database
func (*SqlserverQuery) GetStatus ¶ added in v0.2.0
func (m *SqlserverQuery) GetStatus() interface{}
GetStatus returns the collector's source status
func (*SqlserverQuery) HealthCheck ¶ added in v0.2.0
func (m *SqlserverQuery) HealthCheck() bool
HealthCheck returns true if the source is correctly configured and the collector is connected to it
func (*SqlserverQuery) Init ¶ added in v0.2.0
func (m *SqlserverQuery) Init()
Init initialisation of Sqlserver Query source
func (*SqlserverQuery) Process ¶ added in v0.2.0
func (m *SqlserverQuery) Process(action string, params ...interface{}) interface{}
Process process an action
func (*SqlserverQuery) Query ¶ added in v0.2.0
func (m *SqlserverQuery) Query(query string) error
Query execute query string
func (*SqlserverQuery) QueryMeta ¶ added in v0.2.0
func (m *SqlserverQuery) QueryMeta(query string) ([]map[string]interface{}, error)
QueryMeta execute query meta string
func (*SqlserverQuery) QuerySchema ¶ added in v0.2.0
func (m *SqlserverQuery) QuerySchema() (err error)
QuerySchema extract schema from database
type SqlserverQueryConfig ¶ added in v0.2.0
type SqlserverQueryConfig struct { *DBSQLQueryConfig SslMode string `json:"sslmode"` Database string `json:"database"` }
SqlserverQueryConfig representation Sqlserver Query configuration
type Syslog ¶ added in v0.1.0
Syslog representation of Syslog
func (*Syslog) GetCapabilities ¶ added in v0.2.0
func (s *Syslog) GetCapabilities() map[string]*utils.TaskDescription
GetCapabilities returns available actions
func (*Syslog) GetCommitChan ¶ added in v0.2.0
func (s *Syslog) GetCommitChan() chan interface{}
GetCommitChan returns the commit channel attached to the source
func (*Syslog) GetOutputChan ¶ added in v0.1.0
func (s *Syslog) GetOutputChan() chan events.LookatchEvent
GetOutputChan get output channel
func (*Syslog) GetStatus ¶ added in v0.1.0
func (s *Syslog) GetStatus() interface{}
GetStatus get source status
func (*Syslog) HealthCheck ¶ added in v0.1.0
HealthCheck returns true if the source is correctly configured and the collector is connected to it
type SyslogConfig ¶ added in v0.1.0
SyslogConfig representation of Syslog Config