Documentation ¶
Index ¶
Constants ¶
View Source
const (
	DefaultFlushIntervalMS = 100
	DefaultFlushBatchSize  = 500
)
View Source
const (
	UpdateTimeTypeTime  = "time"
	UpdateTimeTypeSec   = "sec"
	UpdateTimeTypeMilli = "milli"
	UpdateTimeTypeNano  = "nano"
)
View Source
const (
	ErrorTypeRowMiss = "row_miss"
	ErrorTypeRowDiff = "row_diff"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MysqlCheckOutput ¶
type MysqlCheckOutput struct {
	*core.BaseOutput
	// contains filtered or unexported fields
}
func NewMysqlCheckOutput ¶
func NewMysqlCheckOutput() *MysqlCheckOutput
func (*MysqlCheckOutput) Configure ¶
func (o *MysqlCheckOutput) Configure(config core.StringMap) (err error)
func (*MysqlCheckOutput) Process ¶
func (o *MysqlCheckOutput) Process(m *core.Message)
func (*MysqlCheckOutput) Start ¶
func (o *MysqlCheckOutput) Start() (err error)
func (*MysqlCheckOutput) Stop ¶
func (o *MysqlCheckOutput) Stop()
type MysqlCheckOutputConfig ¶
type MysqlCheckOutputConfig struct {
	ID             string
	Host           string
	Port           uint16
	User           string
	Password       string
	ResultFilePath string
	// If we need to skip checking the most recently updated rows, we can use
	// UpdateTimeColumn and UpdateTimeSkipSeconds to construct a query condition
	// like 'UpdateTimeColumn<Now()-UpdateTimeSkipSeconds'.
	// This feature only works if the pk (primary key) values are not modified in
	// the pipeline, because implementing a reverse pipeline operation is cumbersome.
	UpdateTimeColumn string
	// can be time, sec (second), milli (millisecond) or nano (nanosecond)
	UpdateTimeType        string
	UpdateTimeSkipSeconds int64
	TableBufferSize       int   // max messages buffered for each table
	TableFlushIntervalMS  int64 // max ms between each table buffer flushing
}
type TableProcessor ¶
type TableProcessor struct {
// contains filtered or unexported fields
}
func NewTableProcessor ¶
func NewTableProcessor(db string, table string, output *MysqlCheckOutput) (*TableProcessor, error)
func (*TableProcessor) Flush ¶
func (p *TableProcessor) Flush()
func (*TableProcessor) Process ¶
func (p *TableProcessor) Process(m *core.Message)
func (*TableProcessor) Run ¶
func (p *TableProcessor) Run()
Click to show internal directories.
Click to hide internal directories.