core

package
v0.9.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2019 License: Apache-2.0 Imports: 9 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
// PipelineName is a package-level string variable.
// NOTE(review): presumably holds the name of the running pipeline; where it
// is assigned is not visible here — confirm.
var PipelineName string

Functions

func HashConfig

func HashConfig(config string) string

func SafeEncodeString added in v0.9.1

func SafeEncodeString(s string) string

Types

type AfterMsgCommitFunc

// AfterMsgCommitFunc is the signature of a callback that receives a *Msg and
// returns an error. It is the type of the Msg.AfterCommitCallback field.
type AfterMsgCommitFunc func(m *Msg) error

type AsynchronousOutput

// AsynchronousOutput is an Output whose Start method takes a MsgAcker.
// NOTE(review): presumably the output uses msgAcker to acknowledge messages
// itself once they are written — confirm against an implementation.
type AsynchronousOutput interface {
	// Start starts the output with the given MsgAcker and returns an error
	// on failure.
	Start(msgAcker MsgAcker) error
	Output
}

type DDLMsg

// DDLMsg is the DDL payload of a Msg (see the Msg.DdlMsg field).
type DDLMsg struct {
	// Statement is the DDL statement as a string.
	Statement string
	// AST is an ast.StmtNode.
	// NOTE(review): presumably the parsed form of Statement — confirm.
	AST       ast.StmtNode
}

type DMLMsg

// DMLMsg is the DML payload of a Msg (see the Msg.DmlMsg field).
type DMLMsg struct {
	// Operation is the kind of DML operation (Insert, Update or Delete).
	Operation DMLOp
	// Data is a map keyed by string.
	// NOTE(review): presumably column name -> current value — confirm.
	Data      map[string]interface{}
	// Old is a map keyed by string.
	// NOTE(review): presumably column name -> previous value — confirm.
	Old       map[string]interface{}
	// Pks is a map keyed by string.
	// NOTE(review): presumably primary-key column name -> value — confirm.
	Pks       map[string]interface{}
}

type DMLOp

// DMLOp is a string-typed identifier for the kind of DML operation; it is
// the type of DMLMsg.Operation.
type DMLOp string
// Values of DMLOp declared by this package.
const (
	// Insert is the DMLOp with the string value "insert".
	Insert DMLOp = "insert"
	// Update is the DMLOp with the string value "update".
	Update DMLOp = "update"
	// Delete is the DMLOp with the string value "delete".
	Delete DMLOp = "delete"
)

type Emitter

// Emitter is the interface through which a message is emitted; an Emitter is
// passed to Input.Start.
type Emitter interface {
	// Emit uses fs to modify the message and submits a job to the scheduler.
	// msg is the message to send.
	//
	// NOTE(review): "fs" is not defined in this declaration — presumably the
	// configured filters; confirm.
	//
	// TODO: find a better interface for Emit, so that InputStreamKey is not
	// needed.
	//
	Emit(msg *Msg) error
}

type EmptyRouter added in v0.9.17

// EmptyRouter is a struct with no fields. It has an Exists(msg *Msg) bool
// method, which is the method set required by the Router interface.
type EmptyRouter struct{}

func (EmptyRouter) Exists added in v0.9.17

func (EmptyRouter) Exists(msg *Msg) bool

type IFilter

// IFilter is the interface for a configurable filter that is applied to a
// *Msg.
type IFilter interface {
	// Configure sets the filter up from configData and returns an error on
	// failure.
	Configure(configData map[string]interface{}) error
	// Filter is applied to msg and returns continueNext and err.
	// NOTE(review): continueNext presumably tells the caller whether to go on
	// to the next filter; whether Filter may mutate msg is not visible here —
	// confirm.
	Filter(msg *Msg) (continueNext bool, err error)
}

type IFilterFactory

// IFilterFactory is the interface for something that produces IFilter values.
type IFilterFactory interface {
	// NewFilter returns an IFilter.
	NewFilter() IFilter
}

type IMatcher

// IMatcher is the interface for a configurable predicate over a *Msg.
type IMatcher interface {
	// Configure sets the matcher up from arg and returns an error on failure.
	// NOTE(review): the expected dynamic type of arg is not visible here and
	// presumably depends on the implementation — confirm.
	Configure(arg interface{}) error
	// Match reports whether msg matches.
	Match(msg *Msg) bool
}

type IMatcherFactory

// IMatcherFactory is the interface for something that produces IMatcher
// values.
type IMatcherFactory interface {
	// NewMatcher returns an IMatcher.
	NewMatcher() IMatcher
}

type IMatcherGroup

// IMatcherGroup is a slice of IMatcher. Its Match method is documented to
// return true if all matchers return true.
type IMatcherGroup []IMatcher

func (IMatcherGroup) Match

func (matcherGroup IMatcherGroup) Match(msg *Msg) bool

Match returns true if every matcher in the group returns true.

type Input

// Input is the interface for an input that is started with an Emitter, a
// Router and a position cache.
// NOTE(review): the lifecycle notes below are inferred from method names and
// signatures only — confirm against an implementation.
type Input interface {
	// Start starts the input with the given emitter, router and position
	// cache, and returns an error on failure.
	Start(emitter Emitter, router Router, positionCache position_store.PositionCacheInterface) error
	// Close takes no arguments and returns nothing; presumably stops the
	// input.
	Close()
	// Stage returns a config.InputMode; presumably the mode the input is
	// currently in.
	Stage() config.InputMode
	// Done returns a channel of position_store.Position; presumably a
	// position is delivered on it when the input finishes.
	Done() chan position_store.Position
	SendDeadSignal() error // for test only
	// Wait takes no arguments and returns nothing; presumably blocks until
	// the input has finished.
	Wait()
}

type Msg

// Msg is the message type used by the interfaces of this package (Emitter,
// IFilter, IMatcher, Router, Output, MsgSubmitter, MsgAcker).
type Msg struct {
	// Phase is embedded, so its time.Time fields (EnterInput, EnterEmitter,
	// ...) are promoted onto Msg.
	Phase

	// Type is the kind of message: MsgDML, MsgDDL, MsgCtl or
	// MsgCloseInputStream.
	Type     MsgType
	// Host, Database and Table are strings.
	// NOTE(review): presumably they identify where the event originated —
	// confirm.
	Host     string
	Database string
	Table    string

	// DdlMsg and DmlMsg are pointers to the DDL and DML payloads.
	// NOTE(review): presumably only the one matching Type is non-nil —
	// confirm.
	DdlMsg *DDLMsg
	DmlMsg *DMLMsg

	//
	// Timestamp, TimeZone, Oplog will be deprecated.
	//
	Timestamp time.Time // event generated at source
	TimeZone  *time.Location
	Oplog     *gtm.Op

	// InputStreamKey and OutputStreamKey are optional (pointer) strings.
	// NOTE(review): presumably keys of the input and output streams the
	// message belongs to — confirm.
	InputStreamKey  *string
	OutputStreamKey *string
	// Done is a signal-only channel (element type struct{}).
	// NOTE(review): who closes or sends on it is not visible here — confirm.
	Done            chan struct{}

	// InputSequence is an optional (pointer) int64.
	// NOTE(review): presumably the value behind SequenceNumber() — confirm.
	InputSequence *int64

	// InputContext is an arbitrary value of any type.
	// NOTE(review): presumably attached by the input — confirm.
	InputContext        interface{}
	// AfterCommitCallback is an AfterMsgCommitFunc callback.
	// NOTE(review): presumably invoked after the message is committed, as
	// the name suggests — confirm.
	AfterCommitCallback AfterMsgCommitFunc
}

func (*Msg) BeforeWindowMoveForward added in v0.9.20

func (msg *Msg) BeforeWindowMoveForward()

func (*Msg) EventTime added in v0.9.20

func (msg *Msg) EventTime() time.Time

func (Msg) GetPkSign

func (msg Msg) GetPkSign() string

func (*Msg) ProcessTime added in v0.9.20

func (msg *Msg) ProcessTime() time.Time

func (*Msg) SequenceNumber added in v0.9.20

func (msg *Msg) SequenceNumber() int64

type MsgAcker

// MsgAcker is the interface for acknowledging a message. It is embedded in
// Scheduler and passed to AsynchronousOutput.Start.
type MsgAcker interface {
	// AckMsg acknowledges msg and returns an error on failure.
	AckMsg(msg *Msg) error
}

type MsgSubmitter

// MsgSubmitter is the interface for submitting a message. It is embedded in
// Scheduler.
type MsgSubmitter interface {
	// SubmitMsg submits msg and returns an error on failure.
	SubmitMsg(msg *Msg) error
}

type MsgType

// MsgType is a string-typed identifier for the kind of a Msg; it is the type
// of the Msg.Type field.
type MsgType string
// Values of MsgType declared by this package.
const (
	// MsgDML is the MsgType with the string value "dml".
	MsgDML MsgType = "dml"
	// MsgDDL is the MsgType with the string value "ddl".
	MsgDDL MsgType = "ddl"

	// MsgCtl marks internal control messages, such as heartbeats and
	// barriers, that are neither DML nor DDL.
	MsgCtl MsgType = "ctl"

	// MsgCloseInputStream is used to tell the scheduler to close a stream
	MsgCloseInputStream MsgType = "closeInput"
)

type Output

// Output is the base output interface. It is embedded in SynchronousOutput
// and AsynchronousOutput, and passed to Scheduler.Start.
type Output interface {
	// Execute is given a slice of messages and returns an error on failure.
	// NOTE(review): presumably writes msgs to the destination — confirm.
	Execute(msgs []*Msg) error
	// GetRouter returns a Router.
	GetRouter() Router
	// Close takes no arguments and returns nothing; presumably releases the
	// output's resources — confirm.
	Close()
}

type Phase added in v0.9.20

// Phase is a set of time.Time fields named after entering and leaving the
// input, emitter, scheduler, submitter, acker and output. It is embedded in
// Msg.
// NOTE(review): presumably each field is stamped as a message passes the
// corresponding stage; where they are set is not visible here — confirm.
type Phase struct {
	EnterInput     time.Time
	EnterEmitter   time.Time // also leave input
	LeaveEmitter   time.Time
	EnterScheduler time.Time // also enter submitter
	LeaveScheduler time.Time // also leave acker
	LeaveSubmitter time.Time
	EnterAcker     time.Time // also leave output
	EnterOutput    time.Time
}

type PositionCacheCreator added in v0.9.17

// PositionCacheCreator is the interface for something that can create a
// position cache.
type PositionCacheCreator interface {
	// NewPositionCache returns a position_store.PositionCacheInterface, or an
	// error on failure.
	NewPositionCache() (position_store.PositionCacheInterface, error)
}

type Router added in v0.9.17

// Router is the interface returned by Output.GetRouter and passed to
// Input.Start.
type Router interface {
	// Exists takes a message and returns a bool.
	// NOTE(review): presumably reports whether a route exists for msg —
	// confirm.
	Exists(msg *Msg) bool
}

type Scheduler

// Scheduler combines MsgSubmitter and MsgAcker with health-check and
// lifecycle methods.
type Scheduler interface {
	MsgSubmitter
	MsgAcker
	// Healthy returns a bool; presumably true while the scheduler is
	// operating normally — confirm.
	Healthy() bool
	// Start starts the scheduler with the given output and returns an error
	// on failure.
	Start(output Output) error
	// Close takes no arguments and returns nothing; presumably stops the
	// scheduler — confirm.
	Close()
}

type SynchronousOutput

// SynchronousOutput is an Output whose Start method takes no arguments, in
// contrast to AsynchronousOutput, whose Start takes a MsgAcker.
type SynchronousOutput interface {
	// Start starts the output and returns an error on failure.
	Start() error
	Output
}

type TaskReportStage

// TaskReportStage is a string-typed identifier for a stage; it is the type
// of the TaskReportStatus.Stage field.
type TaskReportStage string
// Values of TaskReportStage declared by this package.
const (
	// ReportStageFull is the TaskReportStage with the string value "Full".
	ReportStageFull        TaskReportStage = "Full"
	// ReportStageIncremental is the TaskReportStage with the string value
	// "Incremental".
	ReportStageIncremental TaskReportStage = "Incremental"
)

type TaskReportStatus

// TaskReportStatus is a struct whose fields all carry JSON tags: name,
// configHash, position, stage and version.
// NOTE(review): presumably a status report for a pipeline task; who produces
// and consumes it is not visible here — confirm.
type TaskReportStatus struct {
	// Name is encoded as "name".
	Name       string          `json:"name"`
	// ConfigHash is encoded as "configHash".
	// NOTE(review): presumably the result of HashConfig — confirm.
	ConfigHash string          `json:"configHash"`
	// Position is encoded as "position".
	Position   string          `json:"position"`
	// Stage is encoded as "stage"; its declared values are ReportStageFull
	// and ReportStageIncremental.
	Stage      TaskReportStage `json:"stage"`
	// Version is encoded as "version".
	Version    string          `json:"version"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL