Documentation ¶
Index ¶
- Constants
- Variables
- func AddMetrics(pipelineName string, msg *Msg)
- func HashConfig(config string) string
- func SafeEncodeString(s string) string
- type AfterMsgCommitFunc
- type AsynchronousOutput
- type DDLMsg
- type DMLMsg
- type DMLOp
- type Emitter
- type IFilter
- type IFilterFactory
- type IMatcher
- type IMatcherFactory
- type IMatcherGroup
- type Input
- type Metrics
- type Msg
- type MsgAcker
- type MsgSubmitter
- type MsgType
- type Output
- type Scheduler
- type SynchronousOutput
- type TaskReportStage
- type TaskReportStatus
Constants ¶
View Source
const (
// PipelineTag is the label name passed (as the only label) to the
// histogram vectors declared in this package.
PipelineTag = "pipeline"
)
Variables ¶
View Source
var ( MsgCreateToEmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_emit_duration", Help: "Bucketed histogram of processing time (s) from msg create to msg emit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgEmitToSubmitDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_emit_to_submit_duration", Help: "Bucketed histogram of processing time (s) from msg emit to submit", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgSubmitToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_submit_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg submit to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) MsgCreateToAckDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "drc_v2", Subsystem: "gravity", Name: "msg_create_to_ack_duration", Help: "Bucketed histogram of processing time (s) from msg create to ack", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{PipelineTag}) )
Functions ¶
func AddMetrics ¶
func HashConfig ¶
func SafeEncodeString ¶ added in v0.9.1
Types ¶
type AfterMsgCommitFunc ¶
type AsynchronousOutput ¶
type IFilterFactory ¶
// IFilterFactory is implemented by types that can construct an IFilter.
type IFilterFactory interface {
// NewFilter returns an IFilter.
NewFilter() IFilter
}
type IMatcherFactory ¶
// IMatcherFactory is implemented by types that can construct an IMatcher.
type IMatcherFactory interface {
// NewMatcher returns an IMatcher.
NewMatcher() IMatcher
}
type IMatcherGroup ¶
// IMatcherGroup is a slice of IMatcher values that can be evaluated together
// against a *Msg through its Match method.
type IMatcherGroup []IMatcher
func (IMatcherGroup) Match ¶
func (matcherGroup IMatcherGroup) Match(msg *Msg) bool
Match returns true if all matchers return true.
type Input ¶
type Input interface { Start(emitter Emitter) error Close() Stage() config.InputMode // TODO position store can be hidden by input plugin // or we should use a configuration dedicated for position store NewPositionStore() (position_store.PositionStore, error) PositionStore() position_store.PositionStore Done() chan position_store.Position SendDeadSignal() error // for test only Wait() Identity() uint32 }
type Metrics ¶
// Metrics holds one timestamp per message lifecycle stage: create, emit,
// submit and ack. A zero time.Time means the field has not been set.
// NOTE(review): presumably these feed the Msg*DurationHistogram metrics —
// confirm against AddMetrics.
type Metrics struct {
	MsgCreateTime time.Time
	MsgEmitTime   time.Time
	MsgSubmitTime time.Time
	MsgAckTime    time.Time
}
Metrics holds the timestamps recorded for a message when it is created, emitted, submitted, and acknowledged.
type Msg ¶
type Msg struct { Metrics Type MsgType Host string Database string Table string DdlMsg *DDLMsg DmlMsg *DMLMsg // // Timestamp, TimeZone, Oplog will be deprecated. // Timestamp time.Time TimeZone *time.Location Oplog *gtm.Op InputStreamKey *string OutputStreamKey *string Done chan struct{} InputSequence *int64 InputContext interface{} AfterCommitCallback AfterMsgCommitFunc }
type MsgSubmitter ¶
type Scheduler ¶
type Scheduler interface { MsgSubmitter MsgAcker Healthy() bool Start(output Output) error Close() }
type SynchronousOutput ¶
type TaskReportStage ¶
// TaskReportStage is the string-typed stage value carried in a
// TaskReportStatus.
type TaskReportStage string

// Declared TaskReportStage values.
const (
	ReportStageFull        TaskReportStage = "Full"
	ReportStageIncremental TaskReportStage = "Incremental"
)
type TaskReportStatus ¶
type TaskReportStatus struct { Name string `json:"name"` ConfigHash string `json:"configHash"` Position string `json:"position"` Stage TaskReportStage `json:"stage"` Version string `json:"version"` }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.