Versions in this module Expand all Collapse all v0 v0.1.0 Mar 16, 2023 Changes in this version + const AckWaitGroup + const CustomVariable + const DBDelete + const DBInsert + const DBUpdate + const FileStateStore + const MetaError + const MetaKafkaConsumerPosition + const MetaKafkaConsumerSession + const MetaMySqlPos + const MetaMySqlScanPos + const MetaOther + const MetaTableSchema + const TypeBytes + const TypeDBChange + const TypeDML + const TypeJsonBytes + const ZooKeeperStateStore + func MarshalMysqlBinlogPosition(p *MysqlBinlogPosition) ([]byte, error) + func NoPKError(db string, table string) error + func NoSchemaError(db string, table string) error + func SetComponentBuilderInstance(b ComponentBuilder) + func TypeError(value interface{}, typeName string) error + func UnmarshalMysqlBinlogPosition(p *MysqlBinlogPosition, data []byte) (err error) + type BaseComponent struct + ID string + func NewBaseComponent() *BaseComponent + func (c *BaseComponent) Configure(config StringMap) error + func (c *BaseComponent) Errors() chan error + func (c *BaseComponent) GetID() string + func (c *BaseComponent) GetLogger() *log.Logger + func (c *BaseComponent) RaiseError(err error) + func (c *BaseComponent) SetErrors(errChan chan error) + func (c *BaseComponent) SetLogger(logger *log.Logger) + func (c *BaseComponent) Start() error + func (c *BaseComponent) Stop() + type BaseInput struct + func NewBaseInput() *BaseInput + func (in *BaseInput) GetLastAckError() error + func (in *BaseInput) GetOutput() Output + func (in *BaseInput) SetLastAckError(err error) + func (in *BaseInput) SetOutput(output Output) + type BaseOutput struct + func NewBaseOutput() *BaseOutput + func (out *BaseOutput) GetInput() Input + func (out *BaseOutput) SetInput(input Input) + type Column struct + DefaultValue sql.NullString + Index int + IsNullable bool + IsPrimaryKey bool + IsUnsigned bool + Length int + Name string + RawType string + Type ColumnType + type ColumnType = int + const TypeBigInt + const 
TypeBinary + const TypeBit + const TypeBlob + const TypeChar + const TypeDate + const TypeDatetime + const TypeDecimal + const TypeDouble + const TypeEnum + const TypeFloat + const TypeInt + const TypeJson + const TypeLongBlob + const TypeLongText + const TypeMediumBlob + const TypeMediumInt + const TypeMediumText + const TypeOther + const TypeSet + const TypeSmallInt + const TypeText + const TypeTime + const TypeTimestamp + const TypeTinyBlob + const TypeTinyInt + const TypeTinyText + const TypeVarBinary + const TypeVarchar + const TypeYear + type Component interface + type ComponentBuilder interface + CreatePipeline func(config StringMap) (pipe Pipeline, err error) + CreateProcessor func(config StringMap) (processor Processor, err error) + RegisterComponent func(typeName string, constructor ComponentConstructor) + func GetComponentBuilderInstance() ComponentBuilder + type ComponentConstructor func() Component + type Configurable interface + Configure func(config StringMap) error + GetID func() string + type DBChangeEvent struct + DBTime uint64 + Database string + EventTime uint64 + ExtraInfo map[string]interface{} + ID string + NewRow map[string]interface{} + OldRow map[string]interface{} + Operation string + Table string + func (e *DBChangeEvent) GetColumns() []string + func (e *DBChangeEvent) GetRow() map[string]interface{} + type Errorable interface + Errors func() chan error + SetErrors func(errChan chan error) + type Input interface + Ack func(msg *Message, err error) + GetState func() (state []byte, done bool) + SetOutput func(output Output) + SetState func(state []byte) error + type LifeCycle interface + Start func() error + Stop func() + type LogAware interface + SetLogger func(logger *log.Logger) + type Message struct + Data interface{} + Header *MessageHeader + Type string + func NewMessage(typeName string) *Message + func (m *Message) ColumnNames() []string + func (m *Message) GetMeta(id int) (interface{}, bool) + func (m *Message) GetTableSchema() 
(*Table, bool) + func (m *Message) GetVariable(name string) (interface{}, bool) + func (m *Message) SetMeta(id int, data interface{}) + func (m *Message) SetVariable(name string, data interface{}) + type MessageHeader struct + CreateTime uint64 + ID string + MetaMap map[int]interface{} + Sequence uint64 + type MysqlBinlogPosition struct + BinlogName string + BinlogPos uint32 + GTIDSet mysql.GTIDSet + GTIDSetString string + RowOffset int + ServerID uint32 + ServerUUID string + Timestamp uint32 + TransactionID int64 + TransactionOffset int + TxBinlogPos uint32 + func (p *MysqlBinlogPosition) SimpleCopy() *MysqlBinlogPosition + type MysqlDMLEvent struct + BinlogEvent *replication.BinlogEvent + FullTableName string + NewRow []interface{} + OldRow []interface{} + Operation string + Pos *MysqlBinlogPosition + type Output interface + Process func(msg *Message) + SetInput func(input Input) + type Pipeline interface + SetProcessors func(processors []Processor) + type Processor interface + Process func(msg *Message) (skip bool, err error) + type StateStore interface + Close func() + GetType func() string + Load func(key string) ([]byte, error) + Save func(key string, value []byte) error + type StringMap = map[string]interface{} + type Table struct + Columns []*Column + DBName string + PKColumns []*Column + TableName string + func (t *Table) ColumnNames() []string + func (t *Table) PKColumnNames() []string