base

package
v0.0.0-...-303e327 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Byte-slice keys exported by this package.
var (
	// ApplyIndexKey is the key "raft_apply_index".
	// NOTE(review): presumably the key under which the raft apply index is
	// persisted — confirm against RecordApplyIndex.
	ApplyIndexKey = []byte("raft_apply_index")
	// RaftMemberKey is the key "#raft_members".
	// NOTE(review): presumably the key under which the raft member list is
	// persisted — confirm against RecordRaftMember.
	RaftMemberKey = []byte("#raft_members")
)

Functions

func EncodeProposeInfo

func EncodeProposeInfo(module string, operType int32, data []byte, ctx ProposeContext) []byte

EncodeProposeInfo encodes the propose info (module, operation type, data and context) into a []byte.

func EncodeSnapshotData

func EncodeSnapshotData(src *SnapshotData) ([]byte, error)

Types

type ProposeContext

type ProposeContext struct {
	// ReqID is a string identifier carried along with a proposal.
	// NOTE(review): presumably the ID of the originating request — confirm.
	ReqID string
}

ProposeContext holds the propose context info for the duration of the request life cycle.

func (ProposeContext) Marshal

func (p ProposeContext) Marshal() (ret []byte, err error)

func (*ProposeContext) Unmarshal

func (p *ProposeContext) Unmarshal(r io.Reader) (err error)

type ProposeInfo

type ProposeInfo struct {
	// Module is a module name.
	// NOTE(review): presumably selects the RaftApplier whose GetModuleName
	// matches — confirm.
	Module   string
	// OperType is the operation type code; its meaning is defined by the module.
	OperType int32
	// Data is the raw payload of the proposal.
	Data     []byte
	// Context is the ProposeContext attached to the proposal.
	Context  ProposeContext
}

ProposeInfo is the raft propose info.

func DecodeProposeInfo

func DecodeProposeInfo(src []byte) *ProposeInfo

DecodeProposeInfo decodes a ProposeInfo from a []byte.

type RaftApplier

type RaftApplier interface {
	// GetModuleName returns the module name of the applier.
	GetModuleName() string
	// SetModuleName sets the module name of the applier.
	SetModuleName(module string)
	// Apply applies the specified data to the module manager.
	// NOTE(review): operTypes, data and contexts look like parallel slices
	// (one entry per proposal) — confirm against the implementations.
	Apply(ctx context.Context, operTypes []int32, data [][]byte, contexts []ProposeContext) error
	// Flush should flush all in-memory data into persistent storage, such as rocksdb.
	Flush(ctx context.Context) error
	// NotifyLeaderChange notifies the applier that the leader (and its host) has changed.
	NotifyLeaderChange(ctx context.Context, leader uint64, host string)
	// LoadData loads data from the applier's db.
	LoadData(ctx context.Context) error
}

type RaftMember

type RaftMember struct {
	// ID is the member's numeric ID (JSON key "id").
	ID       uint64 `json:"id"`
	// Host is the member's host string (JSON key "host").
	Host     string `json:"host"`
	// Learner is the member's learner flag (JSON key "learner").
	// NOTE(review): presumably true for a raft learner (non-voting) member — confirm.
	Learner  bool   `json:"learner"`
	// NodeHost is the member's node host string (JSON key "node_host").
	// NOTE(review): how this differs from Host is not visible here — confirm.
	NodeHost string `json:"node_host"`
}

type RaftMembers

type RaftMembers struct {
	// Mbs is the list of members, encoded under the JSON key "members".
	Mbs []RaftMember `json:"members"`
}

type RaftNode

type RaftNode struct {
	// RaftServer is embedded, so its methods are promoted onto RaftNode.
	raftserver.RaftServer
	// RaftNodeConfig is embedded by pointer, so its fields are promoted onto RaftNode.
	*RaftNodeConfig
	// contains filtered or unexported fields
}

func NewRaftNode

func NewRaftNode(cfg *RaftNodeConfig, raftDB *raftdb.RaftDB, snapshotDBs map[string]SnapshotDB) (*RaftNode, error)

func (*RaftNode) ApplyRaftSnapshot

func (r *RaftNode) ApplyRaftSnapshot(ctx context.Context, st raftserver.Snapshot) error

ApplyRaftSnapshot applies the snapshot's data into the db.

func (*RaftNode) CreateRaftSnapshot

func (r *RaftNode) CreateRaftSnapshot(patchNum int) raftserver.Snapshot

func (*RaftNode) GetCurrentApplyIndex

func (r *RaftNode) GetCurrentApplyIndex() uint64

func (*RaftNode) GetLeaderHost

func (r *RaftNode) GetLeaderHost() string

func (*RaftNode) GetNodes

func (r *RaftNode) GetNodes() map[uint64]string

func (*RaftNode) GetRaftMembers

func (r *RaftNode) GetRaftMembers(ctx context.Context) ([]RaftMember, error)

func (*RaftNode) GetStableApplyIndex

func (r *RaftNode) GetStableApplyIndex() uint64

func (*RaftNode) ModuleApply

func (r *RaftNode) ModuleApply(ctx context.Context, module string, operTypes []int32, datas [][]byte, contexts []ProposeContext) error

func (*RaftNode) NotifyLeaderChange

func (r *RaftNode) NotifyLeaderChange(ctx context.Context, leader uint64, host string)

func (*RaftNode) RecordApplyIndex

func (r *RaftNode) RecordApplyIndex(ctx context.Context, index uint64, isFlush bool) (err error)

func (*RaftNode) RecordRaftMember

func (r *RaftNode) RecordRaftMember(ctx context.Context, member RaftMember, isDelete bool) error

func (*RaftNode) RegistRaftApplier

func (r *RaftNode) RegistRaftApplier(target interface{})

RegistRaftApplier uses reflection to find all RaftAppliers and registers them.

func (*RaftNode) SetLeaderHost

func (r *RaftNode) SetLeaderHost(idx uint64, host string)

func (*RaftNode) SetRaftServer

func (r *RaftNode) SetRaftServer(raftServer raftserver.RaftServer)

func (*RaftNode) Start

func (r *RaftNode) Start()

func (*RaftNode) Stop

func (r *RaftNode) Stop()

type RaftNodeConfig

type RaftNodeConfig struct {
	// FlushNumInterval is read from the JSON key "flush_num_interval".
	// NOTE(review): presumably the number of applied entries between flushes — confirm.
	FlushNumInterval    uint64       `json:"flush_num_interval"`
	// FlushTimeIntervalS is read from the JSON key "flush_time_interval_s".
	// NOTE(review): presumably the flush interval in seconds — confirm.
	FlushTimeIntervalS  int          `json:"flush_time_interval_s"`
	// TruncateNumInterval is read from the JSON key "truncate_num_interval".
	// NOTE(review): presumably the number of entries between log truncations — confirm.
	TruncateNumInterval uint64       `json:"truncate_num_interval"`
	// NodeProtocol is read from the JSON key "node_protocol".
	NodeProtocol        string       `json:"node_protocol"`
	// Members is the configured list of raft members (JSON key "members").
	Members             []RaftMember `json:"members"`
	// ApplyFlush is read from the JSON key "apply_flush".
	// NOTE(review): presumably enables flushing on apply — confirm.
	ApplyFlush          bool         `json:"apply_flush"`

	// ApplyIndex is excluded from JSON encoding and decoding (tag "-"), so it
	// must be set in code rather than through configuration.
	ApplyIndex uint64 `json:"-"`
}

type SnapshotDB

type SnapshotDB interface {
	// GetAllCfNames returns a list of names.
	// NOTE(review): presumably the names of all column families in the db — confirm.
	GetAllCfNames() []string
	// KVStore is embedded, so a SnapshotDB must also implement every kvstore.KVStore method.
	kvstore.KVStore
}

type SnapshotData

type SnapshotData struct {
	// Header is the SnapshotItem (db name and column-family name) attached to this entry.
	Header SnapshotItem
	// Key is the raw key bytes of the entry.
	Key    []byte
	// Value is the raw value bytes of the entry.
	Value  []byte
}

func DecodeSnapshotData

func DecodeSnapshotData(reader io.Reader) (ret *SnapshotData, err error)

type SnapshotItem

type SnapshotItem struct {
	// DbName is a database name (JSON key "db_name").
	DbName string `json:"db_name"`
	// CfName is a column-family name (JSON key "cf_name").
	// NOTE(review): "Cf" presumably stands for column family — confirm.
	CfName string `json:"cf_name"`
	// contains filtered or unexported fields
}

type TaskDistribution

type TaskDistribution struct {
	// No exported fields; values are obtained from NewTaskDistribution.
	// contains filtered or unexported fields
}

func NewTaskDistribution

func NewTaskDistribution(concurrency int, taskBuffLength int) *TaskDistribution

func (*TaskDistribution) Close

func (t *TaskDistribution) Close()

func (*TaskDistribution) Run

func (t *TaskDistribution) Run(taskIdx int, task func())

type VolumeTaskType

type VolumeTaskType uint8

VolumeTaskType is the volume task type.

// Values of VolumeTaskType. Numbering starts at 1 (iota + 1), so the zero
// value of VolumeTaskType does not correspond to any declared constant.
const (
	// VolumeTaskTypeLock has the value 1.
	VolumeTaskTypeLock VolumeTaskType = VolumeTaskType(iota + 1)
	// VolumeTaskTypeUnlock has the value 2.
	VolumeTaskTypeUnlock
)

func (VolumeTaskType) String

func (t VolumeTaskType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL