Documentation ¶
Index ¶
- Constants
- func MarshalMysqlBinlogPosition(p *MysqlBinlogPosition) ([]byte, error)
- func NoPKError(db string, table string) error
- func NoSchemaError(db string, table string) error
- func SetComponentBuilderInstance(b ComponentBuilder)
- func TypeError(value interface{}, typeName string) error
- func UnmarshalMysqlBinlogPosition(p *MysqlBinlogPosition, data []byte) (err error)
- type BaseComponent
- func (c *BaseComponent) Configure(config StringMap) error
- func (c *BaseComponent) Errors() chan error
- func (c *BaseComponent) GetID() string
- func (c *BaseComponent) GetLogger() *log.Logger
- func (c *BaseComponent) RaiseError(err error)
- func (c *BaseComponent) SetErrors(errChan chan error)
- func (c *BaseComponent) SetLogger(logger *log.Logger)
- func (c *BaseComponent) Start() error
- func (c *BaseComponent) Stop()
- type BaseInput
- type BaseOutput
- type Column
- type ColumnType
- type Component
- type ComponentBuilder
- type ComponentConstructor
- type Configurable
- type DBChangeEvent
- type Errorable
- type Input
- type LifeCycle
- type LogAware
- type Message
- func (m *Message) ColumnNames() []string
- func (m *Message) GetMeta(id int) (interface{}, bool)
- func (m *Message) GetTableSchema() (*Table, bool)
- func (m *Message) GetVariable(name string) (interface{}, bool)
- func (m *Message) SetMeta(id int, data interface{})
- func (m *Message) SetVariable(name string, data interface{})
- type MessageHeader
- type MysqlBinlogPosition
- type MysqlDMLEvent
- type Output
- type Pipeline
- type Processor
- type StateStore
- type StringMap
- type Table
Constants ¶
const ( TypeDML = "dml" TypeJsonBytes = "json_bytes" TypeBytes = "bytes" TypeDBChange = "db_change" )
const ( MetaOther int = iota CustomVariable MetaMySqlPos MetaMySqlScanPos MetaTableSchema AckWaitGroup MetaError MetaKafkaConsumerSession MetaKafkaConsumerPosition )
const ( DBInsert = "insert" DBUpdate = "update" DBDelete = "delete" )
Database operation types.
const ( FileStateStore = "file" ZooKeeperStateStore = "zookeeper" )
Variables ¶
This section is empty.
Functions ¶
func MarshalMysqlBinlogPosition ¶
func MarshalMysqlBinlogPosition(p *MysqlBinlogPosition) ([]byte, error)
func NoSchemaError ¶
func SetComponentBuilderInstance ¶
func SetComponentBuilderInstance(b ComponentBuilder)
func UnmarshalMysqlBinlogPosition ¶
func UnmarshalMysqlBinlogPosition(p *MysqlBinlogPosition, data []byte) (err error)
Types ¶
type BaseComponent ¶
type BaseComponent struct { ID string // contains filtered or unexported fields }
func NewBaseComponent ¶
func NewBaseComponent() *BaseComponent
func (*BaseComponent) Configure ¶
func (c *BaseComponent) Configure(config StringMap) error
func (*BaseComponent) Errors ¶
func (c *BaseComponent) Errors() chan error
func (*BaseComponent) GetID ¶
func (c *BaseComponent) GetID() string
func (*BaseComponent) GetLogger ¶
func (c *BaseComponent) GetLogger() *log.Logger
func (*BaseComponent) RaiseError ¶
func (c *BaseComponent) RaiseError(err error)
RaiseError is used to raise an error when there is no message to ack.
func (*BaseComponent) SetErrors ¶
func (c *BaseComponent) SetErrors(errChan chan error)
func (*BaseComponent) SetLogger ¶
func (c *BaseComponent) SetLogger(logger *log.Logger)
func (*BaseComponent) Start ¶
func (c *BaseComponent) Start() error
func (*BaseComponent) Stop ¶
func (c *BaseComponent) Stop()
type BaseInput ¶
type BaseInput struct { *BaseComponent // contains filtered or unexported fields }
func NewBaseInput ¶
func NewBaseInput() *BaseInput
func (*BaseInput) GetLastAckError ¶
func (*BaseInput) SetLastAckError ¶
type BaseOutput ¶
type BaseOutput struct { *BaseComponent // contains filtered or unexported fields }
func NewBaseOutput ¶
func NewBaseOutput() *BaseOutput
func (*BaseOutput) GetInput ¶
func (out *BaseOutput) GetInput() Input
func (*BaseOutput) SetInput ¶
func (out *BaseOutput) SetInput(input Input)
type Column ¶
type Column struct { Name string Index int Type ColumnType // Description from mysql metadata RawType string IsPrimaryKey bool IsNullable bool IsUnsigned bool //some types may have length property Length int DefaultValue sql.NullString }
Column represents a MySQL table column.
type ColumnType ¶
type ColumnType = int
const ( TypeOther ColumnType = iota + 1 TypeTinyInt TypeSmallInt TypeMediumInt TypeInt TypeBigInt TypeDecimal TypeFloat TypeDouble TypeBit TypeDate TypeDatetime TypeTimestamp TypeTime TypeYear TypeChar TypeVarchar TypeBinary TypeVarBinary TypeTinyBlob TypeBlob TypeMediumBlob TypeLongBlob TypeTinyText TypeText TypeMediumText TypeLongText TypeEnum TypeSet TypeJson )
MySQL column types; see https://dev.mysql.com/doc/refman/8.0/en/data-types.html
type Component ¶
type Component interface { Configurable LifeCycle Errorable }
Component is an entity that has an ID and a lifecycle, can be configured, and can report errors.
type ComponentBuilder ¶
type ComponentBuilder interface { // RegisterComponent registers a constructor under typeName; after registering, the builder is able to create components of that type RegisterComponent(typeName string, constructor ComponentConstructor) CreatePipeline(config StringMap) (pipe Pipeline, err error) CreateProcessor(config StringMap) (processor Processor, err error) }
ComponentBuilder is the factory of Components
func GetComponentBuilderInstance ¶
func GetComponentBuilderInstance() ComponentBuilder
type ComponentConstructor ¶
type ComponentConstructor func() Component
ComponentConstructor represents a Component's constructor.
type Configurable ¶
type DBChangeEvent ¶
type DBChangeEvent struct { ID string // id Database string // database name Table string // table name DBTime uint64 // binlog time EventTime uint64 // time event is created Operation string // insert/update/delete OldRow map[string]interface{} // DB row values before change NewRow map[string]interface{} // DB row values after change ExtraInfo map[string]interface{} // can put everything else here }
DBChangeEvent is the standard object that describes a database row change.
func (*DBChangeEvent) GetColumns ¶
func (e *DBChangeEvent) GetColumns() []string
func (*DBChangeEvent) GetRow ¶
func (e *DBChangeEvent) GetRow() map[string]interface{}
type LifeCycle ¶
type LifeCycle interface { // Start represents start phase in lifecycle. Start() error // Stop represents stop phase in lifecycle. Stop() }
type Message ¶
type Message struct { Type string // message type Header *MessageHeader // message header Data interface{} // message data }
Message is the basic data container which is transferred throughout the whole system.
func NewMessage ¶
func (*Message) ColumnNames ¶
func (*Message) GetTableSchema ¶
func (*Message) GetVariable ¶
GetVariable gets a user-customized variable.
func (*Message) SetVariable ¶
SetVariable sets a user-customized variable.
type MessageHeader ¶
type MysqlBinlogPosition ¶
type MysqlBinlogPosition struct { BinlogName string // binlog filename BinlogPos uint32 // binlog position TxBinlogPos uint32 // last committed transaction binlog position Timestamp uint32 // binlog timestamp ServerID uint32 // mysql server_id ServerUUID string // server UUID TransactionID int64 // transaction ID GTIDSetString string // all GTIDs as string (for serialization) RowOffset int // offset of the row in current batch TransactionOffset int //offset in a transaction GTIDSet mysql.GTIDSet `json:"-"` // parsed GTID set (for runtime) }
MysqlBinlogPosition describes position in mysql binlog.
func (*MysqlBinlogPosition) SimpleCopy ¶
func (p *MysqlBinlogPosition) SimpleCopy() *MysqlBinlogPosition
SimpleCopy clones the position into a new instance but skips GTIDSet, which is costly to copy.
type MysqlDMLEvent ¶
type MysqlDMLEvent struct { Pos *MysqlBinlogPosition // replication position BinlogEvent *replication.BinlogEvent // binlog event from go mysql FullTableName string // mysql full table name - db.table Operation string // mysql operation type OldRow []interface{} // old DB row values NewRow []interface{} // new DB row values }
MysqlDMLEvent describes a MySQL binlog event.
type Pipeline ¶
Pipeline is a component that receives data from an input and sends it to an output; along the way, the data may be processed by several Processors.
type StateStore ¶
type StateStore interface { Configurable Save(key string, value []byte) error Load(key string) ([]byte, error) GetType() string Close() }
StateStore stores the state of a pipeline, such as its position and statistics.