position_store

package
v0.9.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Version = "1.0"
)

Variables

View Source
var DefaultFlushPeriod = 5 * time.Second

Functions

func IsPositionStoreEvent

func IsPositionStoreEvent(schemaName string, tableName string) bool

Types

type MongoPosition

type MongoPosition struct {
	StartPosition   config.MongoPosition `json:"start_position" bson:"start_position"`
	CurrentPosition config.MongoPosition `json:"current_position" bson:"current_position"`
}

MongoPosition and PositionEntity is here to keep backward compatible with previous position format

type Position

type Position struct {
	// Version is the schema version of position
	Version string
	// Name is the unique name of a pipeline
	Name       string
	Stage      config.InputMode
	Value      string
	UpdateTime time.Time
}

func (Position) Validate added in v0.9.17

func (p Position) Validate() error

type PositionCacheInterface added in v0.9.17

type PositionCacheInterface interface {
	Start() error
	Close()
	Put(position Position) error
	Get() (position Position, exist bool, err error)
	Flush() error
	Clear() error
}

func NewPositionCache added in v0.9.17

func NewPositionCache(pipelineName string, repo PositionRepo, flushDuration time.Duration) (PositionCacheInterface, error)

type PositionEntity added in v0.9.17

type PositionEntity struct {
	Name          string `json:"name" bson:"name"`
	Stage         string `json:"stage" bson:"stage"`
	MongoPosition `json:",inline" bson:",inline"`
	LastUpdate    string `json:"last_update" bson:"last_update"`
}

type PositionRepo added in v0.9.17

type PositionRepo interface {
	Get(pipelineName string) (Position, bool, error)
	Put(pipelineName string, position Position) error
	Delete(pipelineName string) error
	Close() error
}

func NewMemoRepo added in v0.9.17

func NewMemoRepo() PositionRepo

func NewMongoPositionRepo added in v0.9.17

func NewMongoPositionRepo(session *mgo.Session) (PositionRepo, error)

func NewMySQLRepo added in v0.9.17

func NewMySQLRepo(dbConfig *utils.DBConfig, annotation string) (PositionRepo, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL