Documentation
Index
- Constants
- Variables
- func IsPositionStoreEvent(schemaName string, tableName string) bool
- func StringDecoder(s string) (interface{}, error)
- func StringEncoder(v interface{}) (string, error)
- type MongoPosition
- type MongoPositionRet
- type Position
- type PositionCacheInterface
- type PositionEntity
- type PositionMeta
- type PositionRepo
- type PositionValueDecoder
- type PositionValueEncoder
- type PositionWrapper
Constants
const (
	Version = "1.0"
)
Variables
var DefaultFlushPeriod = 5 * time.Second
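Functions
func IsPositionStoreEvent
func IsPositionStoreEvent(schemaName string, tableName string) bool
func StringDecoder (added in v0.9.19)
func StringDecoder(s string) (interface{}, error)
func StringEncoder (added in v0.9.19)
func StringEncoder(v interface{}) (string, error)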
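The bodies of StringEncoder and StringDecoder are not shown on this page, so the following is only a sketch of what a codec pair with these two signatures could look like. The names encodeString and decodeString, the identity behavior, and the sample value are all assumptions made for illustration.

```go
package main

import "fmt"

// encodeString has the same signature as StringEncoder. Its behavior is an
// assumption: it accepts only string values and returns them unchanged.
func encodeString(v interface{}) (string, error) {
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("encodeString: expected string, got %T", v)
	}
	return s, nil
}

// decodeString has the same signature as StringDecoder and hands the stored
// string back as an interface{} value (assumed behavior).
func decodeString(s string) (interface{}, error) {
	return s, nil
}

func main() {
	// "binlog.000003:154" is a made-up position value.
	encoded, err := encodeString("binlog.000003:154")
	if err != nil {
		panic(err)
	}
	decoded, err := decodeString(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println("round trip:", decoded)

	// A non-string value is rejected rather than silently converted.
	if _, err := encodeString(42); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

A pair like this keeps the stored representation human-readable, at the cost of supporting only positions that are already strings.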
Types
type MongoPosition
type MongoPosition struct {
	StartPosition   config.MongoPosition `json:"start_position" bson:"start_position"`
	CurrentPosition config.MongoPosition `json:"current_position" bson:"current_position"`
}
MongoPosition and PositionEntity are kept for backward compatibility with the previous position format.
type MongoPositionRet (added in v0.9.19)
type MongoPositionRet struct {
	Version    string `json:"version" bson:"version"`
	Name       string `json:"name" bson:"name"`
	Stage      string `json:"stage" bson:"stage"`
	Value      string `json:"value" bson:"value"`
	LastUpdate string `json:"last_update" bson:"last_update"`
}
MongoPositionRet is the new position format.
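To make the new format concrete, here is a small sketch that marshals a MongoPositionRet value with encoding/json so the field names produced by the struct tags are visible. The struct is redeclared locally so the example compiles on its own; the helper encodeRet and every field value are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MongoPositionRet is a local copy of the struct documented above, repeated
// here only so that this sketch is self-contained.
type MongoPositionRet struct {
	Version    string `json:"version" bson:"version"`
	Name       string `json:"name" bson:"name"`
	Stage      string `json:"stage" bson:"stage"`
	Value      string `json:"value" bson:"value"`
	LastUpdate string `json:"last_update" bson:"last_update"`
}

// encodeRet is a hypothetical helper that renders a record as JSON using the
// json struct tags.
func encodeRet(r MongoPositionRet) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// All values below are made up; only the field names come from the struct.
	s, err := encodeRet(MongoPositionRet{
		Version:    "1.0",
		Name:       "demo-pipeline",
		Stage:      "stream",
		Value:      "opaque-encoded-position",
		LastUpdate: "2019-01-01T00:00:00Z",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

Note that Value is a plain string in this format, which suggests the position itself is stored already encoded rather than as a nested document.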
type Position
type Position struct {
	PositionMeta
	Value interface{} `bson:"-" json:"-"`
}
type PositionCacheInterface (added in v0.9.17)
type PositionCacheInterface interface {
	Start() error
	Close()
	Put(position Position) error
	// Get returns the value held in the cache. If the cache holds no value,
	// it tries to load one from the position repo.
	Get() (position Position, exist bool, err error)
	GetEncodedPersistentPosition() (position PositionMeta, v string, exist bool, err error)
	Flush() error
	Clear() error
}
func NewPositionCache (added in v0.9.17)
func NewPositionCache(pipelineName string, repo PositionRepo, encoder PositionValueEncoder, decoder PositionValueDecoder, flushDuration time.Duration) (PositionCacheInterface, error)
type PositionEntity (added in v0.9.17)
type PositionEntity struct {
	Name          string `json:"name" bson:"name"`
	Stage         string `json:"stage" bson:"stage"`
	MongoPosition `json:",inline" bson:",inline"`
	LastUpdate    string `json:"last_update" bson:"last_update"`
}
PositionEntity is the old format and will be deprecated.
type PositionMeta (added in v0.9.19)
type PositionMeta struct {
	// Version is the schema version of position
	Version string `bson:"version" json:"version"`
	// Name is the unique name of a pipeline
	Name       string
	Stage      config.InputMode
	UpdateTime time.Time
}
func (PositionMeta) Validate (added in v0.9.19)
func (meta PositionMeta) Validate() error
type PositionRepo (added in v0.9.17)
type PositionRepo interface {
	Get(pipelineName string) (PositionMeta, string, bool, error)
	Put(pipelineName string, positionMeta PositionMeta, v string) error
	Delete(pipelineName string) error
	Close() error
}
func NewMemoRepo (added in v0.9.17)
func NewMemoRepo() PositionRepo
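The body of NewMemoRepo is not shown on this page. As an illustration of what satisfying PositionRepo involves, the sketch below implements the four methods on top of a mutex-guarded map. The names mapRepo, record, and newMapRepo are hypothetical, and PositionMeta is trimmed to two fields so the example stays self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// PositionMeta is a trimmed local copy of the documented struct.
type PositionMeta struct {
	Version string
	Name    string
}

// PositionRepo is copied from the interface definition above.
type PositionRepo interface {
	Get(pipelineName string) (PositionMeta, string, bool, error)
	Put(pipelineName string, positionMeta PositionMeta, v string) error
	Delete(pipelineName string) error
	Close() error
}

// record pairs the metadata with the already-encoded position value.
type record struct {
	meta  PositionMeta
	value string
}

// mapRepo is a hypothetical in-memory PositionRepo keyed by pipeline name.
type mapRepo struct {
	mu      sync.RWMutex
	records map[string]record
}

func newMapRepo() PositionRepo {
	return &mapRepo{records: make(map[string]record)}
}

// Get reports exist=false, with a nil error, when the pipeline is unknown.
func (r *mapRepo) Get(pipelineName string) (PositionMeta, string, bool, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	rec, ok := r.records[pipelineName]
	return rec.meta, rec.value, ok, nil
}

// Put overwrites any previous record for the pipeline.
func (r *mapRepo) Put(pipelineName string, positionMeta PositionMeta, v string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.records[pipelineName] = record{meta: positionMeta, value: v}
	return nil
}

// Delete is a no-op for pipelines that were never stored.
func (r *mapRepo) Delete(pipelineName string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.records, pipelineName)
	return nil
}

// Close has nothing to release for an in-memory store.
func (r *mapRepo) Close() error { return nil }

func main() {
	repo := newMapRepo()
	defer repo.Close()

	// "demo-pipeline" and "offset-42" are made-up values.
	meta := PositionMeta{Version: "1.0", Name: "demo-pipeline"}
	if err := repo.Put(meta.Name, meta, "offset-42"); err != nil {
		panic(err)
	}
	_, v, exist, _ := repo.Get(meta.Name)
	fmt.Println("after Put:", v, exist)

	_ = repo.Delete(meta.Name)
	_, _, exist, _ = repo.Get(meta.Name)
	fmt.Println("after Delete:", exist)
}
```

A repo of this kind is useful mainly in tests, since every stored position is lost when the process exits.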
func NewMongoPositionRepo (added in v0.9.17)
func NewMongoPositionRepo(session *mgo.Session) (PositionRepo, error)
func NewMySQLRepo (added in v0.9.17)
func NewMySQLRepo(dbConfig *utils.DBConfig, annotation string) (PositionRepo, error)
type PositionValueDecoder (added in v0.9.19)
type PositionValueEncoder (added in v0.9.19)
type PositionWrapper (added in v0.9.19)
type PositionWrapper struct {
	PositionMeta
	MongoValue string `bson:"value" json:"value"`
}