core

package
v0.9.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2019 License: Apache-2.0 Imports: 9 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// PipelineTag is the label name attached to every histogram vector
	// declared in this package.
	PipelineTag = "pipeline"
)

Variables

View Source
// Histogram vectors for the per-message stage durations. All four share the
// "drc_v2" namespace and "gravity" subsystem, carry the single PipelineTag
// label, and use the same bucket layout: 22 exponential buckets starting at
// 0.0005 with a factor of 2 (values are in seconds, per the Help strings).
//
// NOTE(review): nothing in this block registers the collectors with a
// Prometheus registry — confirm where registration happens.
var (
	// MsgCreateToEmitDurationHistogram buckets the time from message creation
	// to message emit.
	MsgCreateToEmitDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "drc_v2",
			Subsystem: "gravity",
			Name:      "msg_create_to_emit_duration",
			Help:      "Bucketed histogram of processing time (s) from msg create to msg emit",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{PipelineTag})

	// MsgEmitToSubmitDurationHistogram buckets the time from message emit to
	// message submit.
	MsgEmitToSubmitDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "drc_v2",
			Subsystem: "gravity",
			Name:      "msg_emit_to_submit_duration",
			Help:      "Bucketed histogram of processing time (s) from msg emit to submit",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{PipelineTag})

	// MsgSubmitToAckDurationHistogram buckets the time from message submit to
	// message ack.
	MsgSubmitToAckDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "drc_v2",
			Subsystem: "gravity",
			Name:      "msg_submit_to_ack_duration",
			Help:      "Bucketed histogram of processing time (s) from msg submit to ack",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{PipelineTag})

	// MsgCreateToAckDurationHistogram buckets the end-to-end time from message
	// creation to message ack.
	MsgCreateToAckDurationHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "drc_v2",
			Subsystem: "gravity",
			Name:      "msg_create_to_ack_duration",
			Help:      "Bucketed histogram of processing time (s) from msg create to ack",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{PipelineTag})
)

Functions

func AddMetrics

func AddMetrics(pipelineName string, msg *Msg)

func HashConfig

func HashConfig(config string) string

func SafeEncodeString added in v0.9.1

func SafeEncodeString(s string) string

Types

type AfterMsgCommitFunc

// AfterMsgCommitFunc is the callback type stored in Msg.AfterCommitCallback.
// It receives the message and reports failure through its error result.
// NOTE(review): the name suggests it runs once the message has been
// committed — confirm the exact call site.
type AfterMsgCommitFunc func(m *Msg) error

type AsynchronousOutput

// AsynchronousOutput is an Output whose Start method is handed a MsgAcker.
// NOTE(review): presumably the implementation acknowledges messages itself
// through msgAcker once they have been written — confirm against callers.
type AsynchronousOutput interface {
	// Start prepares the output for use and supplies the acker it may call.
	Start(msgAcker MsgAcker) error
	Output
}

type DDLMsg

// DDLMsg is the payload referenced by Msg.DdlMsg.
type DDLMsg struct {
	// Statement is presumably the raw DDL statement text — TODO confirm.
	Statement string
	// AST is presumably the parsed form of Statement — TODO confirm.
	AST       ast.StmtNode
}

type DMLMsg

// DMLMsg is the payload referenced by Msg.DmlMsg.
//
// NOTE(review): the three maps are presumably keyed by column name; which of
// them is populated for each Operation is not visible here — confirm.
type DMLMsg struct {
	// Operation is the kind of change: Insert, Update or Delete.
	Operation DMLOp
	// Data presumably holds the row values after the change.
	Data      map[string]interface{}
	// Old presumably holds the row values before the change.
	Old       map[string]interface{}
	// Pks presumably holds the primary-key values of the row.
	Pks       map[string]interface{}
}

type DMLOp

// DMLOp names the kind of row change carried by a DMLMsg.
type DMLOp string
// The DMLOp values declared by this package.
const (
	Insert DMLOp = "insert"
	Update DMLOp = "update"
	Delete DMLOp = "delete"
)

type Emitter

// Emitter is the interface handed to Input.Start; its single method passes a
// message onward.
type Emitter interface {
	// Emit uses fs to modify the message and submits a job to the scheduler.
	// NOTE(review): "fs" presumably refers to the configured filters — confirm.
	// msg is the message to send.
	//
	// TODO: find a better interface for Emit so that InputStreamKey is not needed.
	//
	Emit(msg *Msg) error
}

type IFilter

// IFilter is a configurable step applied to a message.
type IFilter interface {
	// Configure sets the filter up from a generic key/value configuration map.
	Configure(configData map[string]interface{}) error
	// Filter processes msg. NOTE(review): continueNext presumably tells the
	// caller whether to keep running subsequent filters — confirm.
	Filter(msg *Msg) (continueNext bool, err error)
}

type IFilterFactory

// IFilterFactory creates IFilter values.
type IFilterFactory interface {
	// NewFilter returns a filter. NOTE(review): presumably a fresh instance
	// that still needs Configure — confirm.
	NewFilter() IFilter
}

type IMatcher

// IMatcher is a configurable predicate over messages.
type IMatcher interface {
	// Configure sets the matcher up from an untyped argument; the expected
	// concrete type is up to the implementation.
	Configure(arg interface{}) error
	// Match reports whether msg satisfies the matcher.
	Match(msg *Msg) bool
}

type IMatcherFactory

// IMatcherFactory creates IMatcher values.
type IMatcherFactory interface {
	// NewMatcher returns a matcher. NOTE(review): presumably a fresh instance
	// that still needs Configure — confirm.
	NewMatcher() IMatcher
}

type IMatcherGroup

// IMatcherGroup is a list of matchers evaluated together through its Match
// method.
type IMatcherGroup []IMatcher

func (IMatcherGroup) Match

func (matcherGroup IMatcherGroup) Match(msg *Msg) bool

Match returns true if all matchers return true.

type Input

// Input is a message source. It is started with an Emitter and exposes its
// position store and lifecycle controls.
type Input interface {
	// Start begins the input, handing it the emitter to send messages to.
	Start(emitter Emitter) error
	// Close shuts the input down.
	Close()
	// Stage returns the input's current config.InputMode.
	Stage() config.InputMode
	// TODO position store can be hidden by input plugin
	// or we should use a configuration dedicated for position store
	NewPositionStore() (position_store.PositionStore, error)
	// PositionStore returns the input's position store.
	// NOTE(review): presumably the one created by NewPositionStore — confirm.
	PositionStore() position_store.PositionStore
	// Done returns a channel of positions.
	// NOTE(review): presumably it delivers the final position when the input
	// finishes — confirm.
	Done() chan position_store.Position
	SendDeadSignal() error // for test only
	// Wait presumably blocks until the input has stopped — TODO confirm.
	Wait()
	// Identity returns a numeric identifier for this input.
	Identity() uint32
}

type Metrics

// Metrics holds the timestamps of a message at four stages: create, emit,
// submit and ack. It is embedded in Msg.
// NOTE(review): presumably AddMetrics derives the duration histograms from
// these fields — confirm.
type Metrics struct {
	MsgCreateTime time.Time
	MsgEmitTime   time.Time
	MsgSubmitTime time.Time
	MsgAckTime    time.Time
}

Metrics holds the create, emit, submit, and ack timestamps of a message.

type Msg

// Msg is the unit passed between inputs, filters, matchers, schedulers and
// outputs. It embeds Metrics, so the stage timestamps are promoted fields.
type Msg struct {
	Metrics

	// Type selects the message kind (see the MsgType constants).
	Type     MsgType
	// Host, Database and Table presumably identify where the change
	// originated — TODO confirm.
	Host     string
	Database string
	Table    string

	// DdlMsg and DmlMsg carry the DDL and DML payloads.
	// NOTE(review): presumably only the one matching Type is non-nil — confirm.
	DdlMsg *DDLMsg
	DmlMsg *DMLMsg

	//
	// Timestamp, TimeZone, Oplog will be deprecated.
	//
	Timestamp time.Time
	TimeZone  *time.Location
	Oplog     *gtm.Op

	// InputStreamKey and OutputStreamKey are optional (pointer) stream keys.
	InputStreamKey  *string
	OutputStreamKey *string
	// Done is a signalling channel.
	// NOTE(review): presumably closed when processing of the message has
	// finished — confirm.
	Done            chan struct{}

	// InputSequence is an optional sequence number.
	// NOTE(review): presumably assigned by the input — confirm.
	InputSequence *int64

	// InputContext is an opaque value attached to the message.
	// NOTE(review): presumably owned by the input — confirm.
	InputContext        interface{}
	// AfterCommitCallback is an optional hook of type AfterMsgCommitFunc.
	AfterCommitCallback AfterMsgCommitFunc
}

func (Msg) GetPkSign

func (msg Msg) GetPkSign() string

type MsgAcker

// MsgAcker acknowledges messages. Scheduler embeds it, and
// AsynchronousOutput.Start receives one.
type MsgAcker interface {
	// AckMsg acknowledges msg.
	AckMsg(msg *Msg) error
}

type MsgSubmitter

// MsgSubmitter accepts messages for processing. Scheduler embeds it.
type MsgSubmitter interface {
	// SubmitMsg submits msg.
	SubmitMsg(msg *Msg) error
}

type MsgType

// MsgType names the kind of a Msg.
type MsgType string
// The MsgType values declared by this package.
const (
	// MsgDML and MsgDDL mark DML and DDL messages respectively.
	MsgDML MsgType = "dml"
	MsgDDL MsgType = "ddl"

	// MsgCtl marks internal control messages, such as heartbeat or barrier,
	// that are neither DML nor DDL.
	MsgCtl MsgType = "ctl"

	// MsgCloseInputStream is used to tell the scheduler to close a stream
	MsgCloseInputStream MsgType = "closeInput"
)

type Output

// Output is the common part of SynchronousOutput and AsynchronousOutput: it
// executes batches of messages and can be closed.
type Output interface {
	// Execute processes a batch of messages.
	Execute(msgs []*Msg) error
	// Close shuts the output down.
	Close()
}

type Scheduler

// Scheduler accepts submitted messages (MsgSubmitter) and acknowledgements
// (MsgAcker), and is started with the Output it works against.
type Scheduler interface {
	MsgSubmitter
	MsgAcker
	// Healthy reports whether the scheduler considers itself healthy.
	Healthy() bool
	// Start begins scheduling, handing the scheduler the output to use.
	Start(output Output) error
	// Close shuts the scheduler down.
	Close()
}

type SynchronousOutput

// SynchronousOutput is an Output whose Start takes no acker, in contrast to
// AsynchronousOutput.
// NOTE(review): presumably the caller acknowledges messages after Execute
// returns — confirm.
type SynchronousOutput interface {
	// Start prepares the output for use.
	Start() error
	Output
}

type TaskReportStage

// TaskReportStage is the stage value reported in TaskReportStatus.Stage.
type TaskReportStage string
// The TaskReportStage values declared by this package.
const (
	ReportStageFull        TaskReportStage = "Full"
	ReportStageIncremental TaskReportStage = "Incremental"
)

type TaskReportStatus

// TaskReportStatus is a status record for a task. Each field is serialized
// to JSON under the key given in its struct tag.
// NOTE(review): ConfigHash is presumably produced by HashConfig — confirm.
type TaskReportStatus struct {
	Name       string          `json:"name"`
	ConfigHash string          `json:"configHash"`
	Position   string          `json:"position"`
	Stage      TaskReportStage `json:"stage"`
	Version    string          `json:"version"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL