fullcache

package module
v0.0.0-...-634e303 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2022 License: MIT Imports: 18 Imported by: 0

README

Binlog Fullcache

Feature Overview

  • Support for MySQL-Redis triggering cache updates
  • Support for MySQL-Elasticsearch triggering cache updates

##Example

func main() {
	SyncerRun()
}

func SyncerRun() {
	// demo mysql binlog parser
	/*
	1. local myslq server binlog ON and row
	2.
	 */
	//tables := make(map[replication.EventType]string)
	cfg := replication.BinlogSyncerConfig {
		ServerID: 100,
		Flavor:   "mysql",
		Host:     "127.0.0.1",
		Port:     3306,
		User:     "root",
		Password: "123456",
	}
	syncer := replication.NewBinlogSyncer(cfg)
	// Start sync with specified binlog file and position
	streamer, _ := syncer.StartSync(mysql.Position{"mysql-bin.000003", 154})
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		ev, err := streamer.GetEvent(ctx)
		cancel()

		if err == context.DeadlineExceeded {
			// meet timeout
			continue
		}
		switch ev.Header.EventType {
		case replication.TABLE_MAP_EVENT:
			e, err := ev.Event.(*replication.TableMapEvent)
			fmt.Println(string(e.Table),e.TableID, err)

		}
		ev.Dump(os.Stdout)
		fmt.Println("============")
	}
}

Contribute

If you want to pull request, please see CONTRIBUTING.

Refers

Canal

License

MIT

Documentation

Index

Constants

View Source
const (
	EsCreated = "created"
	EsUpdated = "updated"
	EsDeleted = "deleted"
)
View Source
const BufferSize int = 256 // experiential
View Source
const DefaultInternal = 30 * time.Second
View Source
const MaxQueueLen int = 64 // experiential
View Source
const OneYear = 24 * 365 * time.Hour
View Source
const SearchTimeOut = "请求搜索超时"

Variables

View Source
var ErrNil = errors.New("err: Nil Cache")

Functions

This section is empty.

Types

type BinlogListener

type BinlogListener struct {
	// contains filtered or unexported fields
}

Listener to and parse binlog

func InitBinlogListener

func InitBinlogListener(serverId int, host string, port int, db, user, password string,
	iLogWriter, eLogWriter io.Writer) *BinlogListener

func (*BinlogListener) Loop

func (bl *BinlogListener) Loop()

func (*BinlogListener) Pub

func (bl *BinlogListener) Pub(msg Msg)

func (*BinlogListener) Subscribe

func (bl *BinlogListener) Subscribe(ch chan Msg)

type DispatchFunc

type DispatchFunc func(table, pk string) error

type Empty

type Empty struct{}

type EsCache

type EsCache struct {
	// contains filtered or unexported fields
}

func NewEsKV

func NewEsKV(db *xorm.Engine, es *elastic.Client, index string, onMiss func(db *xorm.Session, key string) (value string, err error)) *EsCache

func (*EsCache) Del

func (es *EsCache) Del(key string) (err error)

func (*EsCache) Get

func (es *EsCache) Get(key string) (value string, err error)

func (*EsCache) OnMiss

func (es *EsCache) OnMiss(key string) (value string, err error)

func (*EsCache) Set

func (es *EsCache) Set(key, value string) error

type EsGetResp

type EsGetResp struct {
	Id     string      `json:"_id"`
	Found  bool        `json:"found"`
	Source interface{} `json:"_source"`
}

GET

{
    "_index": "warehouse",
    "_type": "order",
    "_id": "43020156",
    "found": false
}

type EsUpdateResp

type EsUpdateResp struct {
	Id     string `json:"_id"`
	Result string `json:"result"`
}

type ICache

type ICache interface {
	Get(key string) (string, error)
	Set(key, value string) error
	Del(key string) error
	OnMiss(key string) (value string, err error)
}

type Msg

type Msg struct {
	Type MsgType
	Data string
}

TODO other caches subscribe this listener

type MsgType

type MsgType uint8
const (
	MsgUpdateRow  MsgType = 0
	MsgAlterTable MsgType = 1
	MsgDeleteRow  MsgType = 2
)

type PKCache

type PKCache struct {
	// contains filtered or unexported fields
}

schema.table.PK -> data

func InitPKCache

func InitPKCache(schema string, bl *BinlogListener) *PKCache

func (*PKCache) Del

func (c *PKCache) Del(table, pk string) (err error)

func (*PKCache) Dispatch

func (c *PKCache) Dispatch(table string, f DispatchFunc)

func (*PKCache) Get

func (c *PKCache) Get(table, pk string) (data string, err error)

func (*PKCache) NewICache

func (c *PKCache) NewICache(ic ICache, table string)

func (*PKCache) Register

func (c *PKCache) Register(bl *BinlogListener)

func (*PKCache) Set

func (c *PKCache) Set(table, pk, data string) (err error)

func (*PKCache) Update

func (c *PKCache) Update(table, pk string) (err error)

type RedisCache

type RedisCache struct {
	// contains filtered or unexported fields
}

func NewRedisKV

func NewRedisKV(db *xorm.Engine, r *redis.Client, prefix string, onMiss func(db *xorm.Session,
	key string) (value string,
	err error)) *RedisCache

func (*RedisCache) Del

func (r *RedisCache) Del(key string) (err error)

func (*RedisCache) Get

func (r *RedisCache) Get(key string) (value string, err error)

func (*RedisCache) K

func (r *RedisCache) K(key string) string

func (*RedisCache) OnMiss

func (r *RedisCache) OnMiss(key string) (value string, err error)

func (*RedisCache) Set

func (r *RedisCache) Set(key, value string) error

type TableSchema

type TableSchema struct {
	Fields  []string
	PKIndex []int
	PK      string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL