position_store

package
v0.9.24 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Version is the version string "1.0".
	// NOTE(review): presumably the position schema version recorded in
	// PositionMeta.Version — confirm against the code that writes positions.
	Version = "1.0"
)

Variables

View Source
// DefaultFlushPeriod is a five-second interval.
// NOTE(review): presumably the default flushDuration handed to
// NewPositionCache — confirm against callers.
var DefaultFlushPeriod = 5 * time.Second

Functions

func IsPositionStoreEvent

func IsPositionStoreEvent(schemaName string, tableName string) bool

func StringDecoder added in v0.9.19

func StringDecoder(s string) (interface{}, error)

func StringEncoder added in v0.9.19

func StringEncoder(v interface{}) (string, error)

Types

type MongoPosition

// MongoPosition pairs a start position with a current position, both of type
// config.MongoPosition. The two fields are serialized under the keys
// "start_position" and "current_position" in JSON and BSON alike.
type MongoPosition struct {
	StartPosition   config.MongoPosition `json:"start_position" bson:"start_position"`
	CurrentPosition config.MongoPosition `json:"current_position" bson:"current_position"`
}

MongoPosition and PositionEntity are kept for backward compatibility with the previous position format.

type MongoPositionRet added in v0.9.19

// MongoPositionRet is a position record made up entirely of string fields:
// version, name, stage, value and last update. Each field uses the same key
// for JSON and BSON.
// NOTE(review): Value presumably holds the encoded position value and
// LastUpdate a formatted timestamp — confirm against the code that fills them.
type MongoPositionRet struct {
	Version    string `json:"version" bson:"version"`
	Name       string `json:"name" bson:"name"`
	Stage      string `json:"stage" bson:"stage"`
	Value      string `json:"value" bson:"value"`
	LastUpdate string `json:"last_update" bson:"last_update"`
}

MongoPositionRet is the new format

type Position

// Position combines the embedded PositionMeta with the position value itself.
type Position struct {
	PositionMeta
	// Value is tagged "-" for both bson and json, so it is left out when a
	// Position is serialized with either encoder.
	// NOTE(review): presumably it is converted to and from a string through
	// PositionValueEncoder / PositionValueDecoder instead — confirm.
	Value interface{} `bson:"-" json:"-"`
}

func (Position) Validate added in v0.9.17

func (p Position) Validate() error

type PositionCacheInterface added in v0.9.17

// PositionCacheInterface is the interface returned by NewPositionCache.
type PositionCacheInterface interface {
	Start() error
	Close()
	Put(position Position) error

	// Get returns a value from the cache. If the cache holds no value, it
	// tries to load one from the position repo.
	Get() (position Position, exist bool, err error)

	// GetEncodedPersistentPosition returns position metadata together with a
	// string value v.
	// NOTE(review): presumably v is the encoded value as currently stored in
	// the position repo, bypassing the cache — confirm in the implementation.
	GetEncodedPersistentPosition() (position PositionMeta, v string, exist bool, err error)
	Flush() error
	Clear() error
}

func NewPositionCache added in v0.9.17

func NewPositionCache(pipelineName string, repo PositionRepo, encoder PositionValueEncoder, decoder PositionValueDecoder, flushDuration time.Duration) (PositionCacheInterface, error)

type PositionEntity added in v0.9.17

// PositionEntity is a record with a name, a stage and a last-update string
// that also embeds MongoPosition. The embedded struct carries the ",inline"
// tag option for both json and bson.
// NOTE(review): with the bson encoder, ",inline" places start_position and
// current_position at the same level as the other keys; encoding/json
// flattens untagged-name embedded structs as well — confirm the stored
// document shape if it matters.
type PositionEntity struct {
	Name          string `json:"name" bson:"name"`
	Stage         string `json:"stage" bson:"stage"`
	MongoPosition `json:",inline" bson:",inline"`
	LastUpdate    string `json:"last_update" bson:"last_update"`
}

PositionEntity is the old format and will be deprecated.

type PositionMeta added in v0.9.19

// PositionMeta holds the metadata kept alongside a position value: schema
// version, pipeline name, stage and update time. Only Version declares
// explicit bson/json keys; Name, Stage and UpdateTime carry no struct tags,
// so each encoder applies its own default key naming to them.
type PositionMeta struct {
	// Version is the schema version of position
	Version string `bson:"version" json:"version"`
	// Name is the unique name of a pipeline
	Name       string
	Stage      config.InputMode
	UpdateTime time.Time
}

func (PositionMeta) Validate added in v0.9.19

func (meta PositionMeta) Validate() error

type PositionRepo added in v0.9.17

// PositionRepo is the storage interface returned by NewMemoRepo,
// NewMongoPositionRepo and NewMySQLRepo. Get, Put and Delete all take the
// pipeline name as their first argument.
type PositionRepo interface {
	// Get looks up the entry for pipelineName.
	// NOTE(review): the unnamed string and bool results presumably are the
	// encoded value and an "exists" flag, mirroring the named results of
	// PositionCacheInterface.GetEncodedPersistentPosition — confirm.
	Get(pipelineName string) (PositionMeta, string, bool, error)
	// Put takes the metadata and the string value v for pipelineName.
	Put(pipelineName string, positionMeta PositionMeta, v string) error
	Delete(pipelineName string) error
	Close() error
}

func NewMemoRepo added in v0.9.17

func NewMemoRepo() PositionRepo

func NewMongoPositionRepo added in v0.9.17

func NewMongoPositionRepo(session *mgo.Session) (PositionRepo, error)

func NewMySQLRepo added in v0.9.17

func NewMySQLRepo(dbConfig *utils.DBConfig, annotation string) (PositionRepo, error)

type PositionValueDecoder added in v0.9.19

// PositionValueDecoder is a function that turns a string into a value of any
// type, or returns an error. StringDecoder has this signature, and
// NewPositionCache accepts a decoder of this type.
type PositionValueDecoder func(s string) (interface{}, error)

type PositionValueEncoder added in v0.9.19

// PositionValueEncoder is a function that turns a value of any type into a
// string, or returns an error. StringEncoder has this signature, and
// NewPositionCache accepts an encoder of this type.
type PositionValueEncoder func(v interface{}) (string, error)

type PositionWrapper added in v0.9.19

// PositionWrapper embeds PositionMeta and adds a string value that is
// serialized under the key "value" in both BSON and JSON.
// NOTE(review): judging by the field name, this is presumably the document
// shape used by the Mongo-backed repo — confirm in NewMongoPositionRepo.
type PositionWrapper struct {
	PositionMeta
	MongoValue string `bson:"value" json:"value"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL