check

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultFlushIntervalMS = 100
	DefaultFlushBatchSize  = 500
)
View Source
const (
	UpdateTimeTypeTime  = "time"
	UpdateTimeTypeSec   = "sec"
	UpdateTimeTypeMilli = "milli"
	UpdateTimeTypeNano  = "nano"
)
View Source
const (
	ErrorTypeRowMiss = "row_miss"
	ErrorTypeRowDiff = "row_diff"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type MysqlCheckOutput

type MysqlCheckOutput struct {
	*core.BaseOutput
	// contains filtered or unexported fields
}

func NewMysqlCheckOutput

func NewMysqlCheckOutput() *MysqlCheckOutput

func (*MysqlCheckOutput) Configure

func (o *MysqlCheckOutput) Configure(config core.StringMap) (err error)

func (*MysqlCheckOutput) Process

func (o *MysqlCheckOutput) Process(m *core.Message)

func (*MysqlCheckOutput) Start

func (o *MysqlCheckOutput) Start() (err error)

func (*MysqlCheckOutput) Stop

func (o *MysqlCheckOutput) Stop()

type MysqlCheckOutputConfig

type MysqlCheckOutputConfig struct {
	ID       string
	Host     string
	Port     uint16
	User     string
	Password string

	ResultFilePath string
	// if we need skip checking for most recently updated rows, we can use UpdateTimeColumn and UpdateTimeSkipSeconds
	// to construct a query condition like 'UpdateTimeColumn<Now()-UpdateTimeSkipSeconds'
	// this function only works if the pk values are not modified in the pipeline because implementing a reverse pipeline
	// operation is cumbersome
	UpdateTimeColumn string
	// can be time,sec(second),milli(milli second) and nano(nano second)
	UpdateTimeType        string
	UpdateTimeSkipSeconds int64
	TableBufferSize       int   // max messages buffered for each table
	TableFlushIntervalMS  int64 // max ms between each table buffer flushing
}

type TableProcessor

type TableProcessor struct {
	// contains filtered or unexported fields
}

func NewTableProcessor

func NewTableProcessor(db string, table string, output *MysqlCheckOutput) (*TableProcessor, error)

func (*TableProcessor) Flush

func (p *TableProcessor) Flush()

func (*TableProcessor) Process

func (p *TableProcessor) Process(m *core.Message)

func (*TableProcessor) Run

func (p *TableProcessor) Run()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL