core

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2023 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TypeDML       = "dml"
	TypeJsonBytes = "json_bytes"
	TypeBytes     = "bytes"
	TypeDBChange  = "db_change"
)
View Source
const (
	MetaOther int = iota
	CustomVariable
	MetaMySqlPos
	MetaMySqlScanPos
	MetaTableSchema
	AckWaitGroup
	MetaError
	MetaKafkaConsumerSession
	MetaKafkaConsumerPosition
)
View Source
const (
	DBInsert = "insert"
	DBUpdate = "update"
	DBDelete = "delete"
)

db operation types

View Source
const (
	FileStateStore      = "file"
	ZooKeeperStateStore = "zookeeper"
)

Variables

This section is empty.

Functions

func MarshalMysqlBinlogPosition

func MarshalMysqlBinlogPosition(p *MysqlBinlogPosition) ([]byte, error)

func NoPKError

func NoPKError(db string, table string) error

func NoSchemaError

func NoSchemaError(db string, table string) error

func SetComponentBuilderInstance

func SetComponentBuilderInstance(b ComponentBuilder)

func TypeError

func TypeError(value interface{}, typeName string) error

func UnmarshalMysqlBinlogPosition

func UnmarshalMysqlBinlogPosition(p *MysqlBinlogPosition, data []byte) (err error)

Types

type BaseComponent

type BaseComponent struct {
	ID string
	// contains filtered or unexported fields
}

func NewBaseComponent

func NewBaseComponent() *BaseComponent

func (*BaseComponent) Configure

func (c *BaseComponent) Configure(config StringMap) error

func (*BaseComponent) Errors

func (c *BaseComponent) Errors() chan error

func (*BaseComponent) GetID

func (c *BaseComponent) GetID() string

func (*BaseComponent) GetLogger

func (c *BaseComponent) GetLogger() *log.Logger

func (*BaseComponent) RaiseError

func (c *BaseComponent) RaiseError(err error)

RaiseError is used when there's no msg to ack.

func (*BaseComponent) SetErrors

func (c *BaseComponent) SetErrors(errChan chan error)

func (*BaseComponent) SetLogger

func (c *BaseComponent) SetLogger(logger *log.Logger)

func (*BaseComponent) Start

func (c *BaseComponent) Start() error

func (*BaseComponent) Stop

func (c *BaseComponent) Stop()

type BaseInput

type BaseInput struct {
	*BaseComponent
	// contains filtered or unexported fields
}

func NewBaseInput

func NewBaseInput() *BaseInput

func (*BaseInput) GetLastAckError

func (in *BaseInput) GetLastAckError() error

func (*BaseInput) GetOutput

func (in *BaseInput) GetOutput() Output

func (*BaseInput) SetLastAckError

func (in *BaseInput) SetLastAckError(err error)

func (*BaseInput) SetOutput

func (in *BaseInput) SetOutput(output Output)

type BaseOutput

type BaseOutput struct {
	*BaseComponent
	// contains filtered or unexported fields
}

func NewBaseOutput

func NewBaseOutput() *BaseOutput

func (*BaseOutput) GetInput

func (out *BaseOutput) GetInput() Input

func (*BaseOutput) SetInput

func (out *BaseOutput) SetInput(input Input)

type Column

type Column struct {
	Name  string
	Index int
	Type  ColumnType
	// Description from mysql metadata
	RawType      string
	IsPrimaryKey bool
	IsNullable   bool
	IsUnsigned   bool
	//some types may have length property
	Length       int
	DefaultValue sql.NullString
}

Column represents a mysql table column

type ColumnType

type ColumnType = int
const (
	TypeOther ColumnType = iota + 1
	TypeTinyInt
	TypeSmallInt
	TypeMediumInt
	TypeInt
	TypeBigInt
	TypeDecimal
	TypeFloat
	TypeDouble
	TypeBit
	TypeDate
	TypeDatetime
	TypeTimestamp
	TypeTime
	TypeYear
	TypeChar
	TypeVarchar
	TypeBinary
	TypeVarBinary
	TypeTinyBlob
	TypeBlob
	TypeMediumBlob
	TypeLongBlob
	TypeTinyText
	TypeText
	TypeMediumText
	TypeLongText
	TypeEnum
	TypeSet
	TypeJson
)

mysql column types, see https://dev.mysql.com/doc/refman/8.0/en/data-types.html

type Component

type Component interface {
	Configurable
	LifeCycle
	Errorable
}

Component is an entity which has id and lifecycle and can be configured and error able.

type ComponentBuilder

type ComponentBuilder interface {
	// RegisterComponent after registering builder is able to create component of
	RegisterComponent(typeName string, constructor ComponentConstructor)
	CreatePipeline(config StringMap) (pipe Pipeline, err error)
	CreateProcessor(config StringMap) (processor Processor, err error)
}

ComponentBuilder is the factory of Components

func GetComponentBuilderInstance

func GetComponentBuilderInstance() ComponentBuilder

type ComponentConstructor

type ComponentConstructor func() Component

ComponentConstructor represents a Component's constructor.

type Configurable

type Configurable interface {
	// Configure reads a StringMap and make corresponding configurations.
	Configure(config StringMap) error
	// GetID returns the ID property.
	GetID() string
}

type DBChangeEvent

type DBChangeEvent struct {
	ID        string                 // id
	Database  string                 // database name
	Table     string                 // table name
	DBTime    uint64                 // binlog time
	EventTime uint64                 // time event is created
	Operation string                 // insert/update/delete
	OldRow    map[string]interface{} // DB row values before change
	NewRow    map[string]interface{} // DB row values after change
	ExtraInfo map[string]interface{} // can put everything else here
}

DBChangeEvent is the standard object describes a database row change

func (*DBChangeEvent) GetColumns

func (e *DBChangeEvent) GetColumns() []string

func (*DBChangeEvent) GetRow

func (e *DBChangeEvent) GetRow() map[string]interface{}

type Errorable

type Errorable interface {
	Errors() chan error
	SetErrors(errChan chan error)
}

type Input

type Input interface {
	Component
	SetOutput(output Output)
	Ack(msg *Message, err error)
	SetState(state []byte) error
	GetState() (state []byte, done bool)
}

type LifeCycle

type LifeCycle interface {
	// Start represents start phase in lifecycle.
	Start() error
	// Stop represents stop phase in lifecycle.
	Stop()
}

type LogAware

type LogAware interface {
	SetLogger(logger *log.Logger)
}

LogAware is used to inject logger to other components

type Message

type Message struct {
	Type   string         // message type
	Header *MessageHeader // message header
	Data   interface{}    // message data
}

Message is the basic data container which is transferred throughout the whole system.

func NewMessage

func NewMessage(typeName string) *Message

func (*Message) ColumnNames

func (m *Message) ColumnNames() []string

func (*Message) GetMeta

func (m *Message) GetMeta(id int) (interface{}, bool)

GetMeta get system meta value.

func (*Message) GetTableSchema

func (m *Message) GetTableSchema() (*Table, bool)

func (*Message) GetVariable

func (m *Message) GetVariable(name string) (interface{}, bool)

GetVariable get user customized variable

func (*Message) SetMeta

func (m *Message) SetMeta(id int, data interface{})

SetMeta set system meta value.

func (*Message) SetVariable

func (m *Message) SetVariable(name string, data interface{})

SetVariable set user customized variable

type MessageHeader

type MessageHeader struct {
	ID         string              // ID of the message
	Sequence   uint64              // In process unique message identifier
	CreateTime uint64              // Unix timestamp in nanoseconds
	MetaMap    map[int]interface{} // Other meta data
}

type MysqlBinlogPosition

type MysqlBinlogPosition struct {
	BinlogName        string        // binlog filename
	BinlogPos         uint32        // binlog position
	TxBinlogPos       uint32        // last committed transaction binlog position
	Timestamp         uint32        // binlog timestamp
	ServerID          uint32        // mysql server_id
	ServerUUID        string        // server UUID
	TransactionID     int64         // transaction ID
	GTIDSetString     string        // all GTIDs as string (for serialization)
	RowOffset         int           // offset of the row in current batch
	TransactionOffset int           //offset in a transaction
	GTIDSet           mysql.GTIDSet `json:"-"` // parsed GTID set (for runtime)
}

MysqlBinlogPosition describes position in mysql binlog.

func (*MysqlBinlogPosition) SimpleCopy

func (p *MysqlBinlogPosition) SimpleCopy() *MysqlBinlogPosition

SimpleCopy clones a new position instance but skip GTIDSet which costs much.

type MysqlDMLEvent

type MysqlDMLEvent struct {
	Pos           *MysqlBinlogPosition     // replication position
	BinlogEvent   *replication.BinlogEvent // binlog event from go mysql
	FullTableName string                   // mysql full table  name - db.table
	Operation     string                   // mysql operation type
	OldRow        []interface{}            // old DB row values
	NewRow        []interface{}            // new DB row values
}

MysqlDMLEvent describes mysql binlog event

type Output

type Output interface {
	Component
	SetInput(input Input)
	Process(msg *Message)
}

type Pipeline

type Pipeline interface {
	Input
	Output
	SetProcessors(processors []Processor)
}

Pipeline is a component receives data from an input and sends it to an output, and during the process the data maybe processed by several Processors.

type Processor

type Processor interface {
	Component
	Process(msg *Message) (skip bool, err error)
}

type StateStore

type StateStore interface {
	Configurable
	Save(key string, value []byte) error
	Load(key string) ([]byte, error)
	GetType() string
	Close()
}

StateStore stores states like position, statistics of a pipeline.

type StringMap

type StringMap = map[string]interface{}

type Table

type Table struct {
	TableName string
	DBName    string
	Columns   []*Column
	PKColumns []*Column
}

Table represents a mysql table

func (*Table) ColumnNames

func (t *Table) ColumnNames() []string

func (*Table) PKColumnNames

func (t *Table) PKColumnNames() []string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL