Documentation ¶
Index ¶
- Constants
- func GetRowDataFromOp(op *gtm.Op) *map[string]interface{}
- func OplogPositionValueDecoder(v string) (interface{}, error)
- func OplogPositionValueEncoder(v interface{}) (string, error)
- func SetupInitialPosition(cache position_store.PositionCacheInterface, ...) error
- func UpdateCurrentPositionValue(cache position_store.PositionCacheInterface, ...) error
- type OplogChecker
- type OplogHeartbeat
- type OplogPositionValue
- type OplogTailer
- type OplogTailerOpt
- type PluginConfig
Constants ¶
View Source
const Name = "mongo-stream"
View Source
const OplogCheckerCollectionName = "heartbeat"
Variables ¶
This section is empty.
Functions ¶
func GetRowDataFromOp ¶
func SetupInitialPosition ¶
func SetupInitialPosition(cache position_store.PositionCacheInterface, startPositionInSpec *config.MongoPosition) error
func UpdateCurrentPositionValue ¶
func UpdateCurrentPositionValue(cache position_store.PositionCacheInterface, positionValue config.MongoPosition) error
Types ¶
type OplogChecker ¶
type OplogChecker struct {
// contains filtered or unexported fields
}
func NewOplogChecker ¶
func (*OplogChecker) MarkActive ¶
func (checker *OplogChecker) MarkActive(data map[string]interface{})
func (*OplogChecker) Run ¶
func (checker *OplogChecker) Run()
func (*OplogChecker) Stop ¶
func (checker *OplogChecker) Stop()
type OplogHeartbeat ¶
type OplogPositionValue ¶
type OplogPositionValue struct {
	StartPosition   *config.MongoPosition `json:"start_position" bson:"start_position"`
	CurrentPosition config.MongoPosition  `json:"current_position" bson:"current_position"`
}
func GetPositionValue ¶
func GetPositionValue(cache position_store.PositionCacheInterface) (OplogPositionValue, error)
type OplogTailer ¶
type OplogTailer struct {
// contains filtered or unexported fields
}
func NewOplogTailer ¶
func NewOplogTailer(opts *OplogTailerOpt) *OplogTailer
func (*OplogTailer) AfterMsgCommit ¶
func (tailer *OplogTailer) AfterMsgCommit(msg *core.Msg) error
func (*OplogTailer) Run ¶
func (tailer *OplogTailer) Run()
func (*OplogTailer) SendDeadSignal ¶
func (tailer *OplogTailer) SendDeadSignal() error
func (*OplogTailer) Stop ¶
func (tailer *OplogTailer) Stop()
func (*OplogTailer) Wait ¶
func (tailer *OplogTailer) Wait()
type OplogTailerOpt ¶
type OplogTailerOpt struct {
// contains filtered or unexported fields
}
type PluginConfig ¶
type PluginConfig struct {
	// MongoSource *config.MongoSource `mapstructure:"source" toml:"source" json:"source"`
	Source        *config.MongoConnConfig `mapstructure:"source" toml:"source" json:"source"`
	StartPosition *config.MongoPosition   `mapstructure:"start-position" toml:"start-position" json:"start-position"`
	GtmConfig     *config.GtmConfig       `mapstructure:"gtm-config" toml:"gtm-config" json:"gtm-config"`
}
Click to show internal directories.
Click to hide internal directories.