raftkv

package
v0.0.0-...-ea8e5dc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2020 License: MIT Imports: 14 Imported by: 0

README

使用GO 语言实现一个分布式KV数据库

#part1

主要是依赖之前的 分布式raft 一直性协议,这里即是写一个client 和server 来存储 或读取数据,代码又有注释这里就不在赘述了

主要提供 Get, Put Append 操作

Get 操作代码

主要是通过,key 来获取相应的value, 如果key 不存在,那么会是空"" '''

func (kv *RaftKV) Get(args *GetArgs, reply *GetReply) { // Your code here. entry := Op{Kind:"Get",Key:args.Key,Id:args.Id,ReqId:args.ReqID}

ok := kv.AppendEntryToLog(entry)
if !ok {
	reply.WrongLeader = true
} else {
	reply.WrongLeader = false

	reply.Err = OK
	kv.mu.Lock()
	reply.Value = kv.db[args.Key]
	kv.ack[args.Id] = args.ReqID
	//log.Printf("%d get:%v value:%s\n",kv.me,entry,reply.Value)
	kv.mu.Unlock()
}

} '''

方法

func (kv *RaftKV) AppendEntryToLog(entry Op) bool 主要调用之前 raft 那边的Start 方法,将日志记录到raft 中的log[]之中,之所以这样写是更好地方便判断该KEY 是否存在

方法Put 操作 不言而喻,将相应的key value 放入到log[] 中,当然这里为了方便用于更好地实现,这里将数据里面的值 存在 map 中 在apply 方法中是实现 '''

func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	entry := Op{Kind:args.Op,Key:args.Key,Value:args.Value,Id:args.Id,ReqId:args.ReqID}
	ok := kv.AppendEntryToLog(entry)
	if !ok {
		reply.WrongLeader = true
	} else {
		reply.WrongLeader = false
		reply.Err = OK
	}
}

'''

''' func (kv *RaftKV) Apply(args Op) { switch args.Kind { case "Put": kv.db[args.Key] = args.Value case "Append": kv.db[args.Key] += args.Value } kv.ack[args.Id] = args.ReqId }

'''

由于raft sever 需要等待 raft 达成一致,在开始的时候操作 put get 的时候 需要时用 applyCh chan,避免找出死锁

raft server 在启动之后需要立即返回,所以在真正实现的逻辑的时候使用go routing,通过不同的类型,不同的处理

'''

go func() {
		for {
			msg := <-kv.applyCh
			if msg.UseSnapshot {
				var LastIncludedIndex int
				var LastIncludedTerm int

				r := bytes.NewBuffer(msg.Snapshot)
				d := gob.NewDecoder(r)

				kv.mu.Lock()
				d.Decode(&LastIncludedIndex)
				d.Decode(&LastIncludedTerm)
				kv.db = make(map[string]string)
				kv.ack = make(map[int64]int)
				d.Decode(&kv.db)
				d.Decode(&kv.ack)
				kv.mu.Unlock()
			} else {
				op := msg.Command.(Op)
				kv.mu.Lock()
				if !kv.CheckDup(op.Id,op.ReqId) {
					kv.Apply(op)
				}

				ch,ok := kv.result[msg.Index]
				if ok {
					select {
					case <-kv.result[msg.Index]:
					default:
					}
					ch <- op
				} else {
					kv.result[msg.Index] = make(chan Op, 1)
				}

				//need snapshot
				if maxraftstate != -1 && kv.rf.GetPerisistSize() > maxraftstate {
					w := new(bytes.Buffer)
					e := gob.NewEncoder(w)
					e.Encode(kv.db)
					e.Encode(kv.ack)
					data := w.Bytes()
					go kv.rf.StartSnapshot(data,msg.Index)
				}
				kv.mu.Unlock()
			}
		}
	}()

'''

Documentation

Index

Constants

View Source
const (
	OK       = "OK"
	ErrNoKey = "ErrNoKey"
)
View Source
const Debug = 0

Variables

This section is empty.

Functions

func DPrintf

func DPrintf(format string, a ...interface{}) (n int, err error)

Types

type Clerk

type Clerk struct {
	// contains filtered or unexported fields
}

func MakeClerk

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk

func (*Clerk) Append

func (ck *Clerk) Append(key string, value string)

func (*Clerk) Get

func (ck *Clerk) Get(key string) string

fetch the current value for a key. returns "" if the key does not exist. keeps trying forever in the face of all other errors.

you can send an RPC with code like this: ok := ck.servers[i].Call("RaftKV.Get", &args, &reply)

the types of args and reply (including whether they are pointers) must match the declared types of the RPC handler function's arguments. and reply must be passed as a pointer.

func (*Clerk) Put

func (ck *Clerk) Put(key string, value string)

func (*Clerk) PutAppend

func (ck *Clerk) PutAppend(key string, value string, op string)

shared by Put and Append.

you can send an RPC with code like this: ok := ck.servers[i].Call("RaftKV.PutAppend", &args, &reply)

the types of args and reply (including whether they are pointers) must match the declared types of the RPC handler function's arguments. and reply must be passed as a pointer.

type Err

type Err string

type GetArgs

type GetArgs struct {
	Key string
	// You'll have to add definitions here.
	Id    int64
	ReqID int
}

type GetReply

type GetReply struct {
	WrongLeader bool
	Err         Err
	Value       string
}

type Op

type Op struct {
	// Your definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	Kind  string //"Put" or "Append" "Get"
	Key   string
	Value string
	Id    int64
	ReqId int
}

type PutAppendArgs

type PutAppendArgs struct {
	// You'll have to add definitions here.
	Key   string
	Value string
	Op    string // "Put" or "Append"
	// You'll have to add definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	Id    int64
	ReqID int
}

Put or Append

type PutAppendReply

type PutAppendReply struct {
	WrongLeader bool
	Err         Err
}

type RaftKV

type RaftKV struct {
	// contains filtered or unexported fields
}

func StartKVServer

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *RaftKV

servers[] contains the ports of the set of servers that will cooperate via Raft to form the fault-tolerant key/value service. me is the index of the current server in servers[]. the k/v server should store snapshots with persister.SaveSnapshot(), and Raft should save its state (including log) with persister.SaveRaftState(). the k/v server should snapshot when Raft's saved state exceeds maxraftstate bytes, in order to allow Raft to garbage-collect its log. if maxraftstate is -1, you don't need to snapshot. StartKVServer() must return quickly, so it should start goroutines for any long-running work.

func (*RaftKV) AppendEntryToLog

func (kv *RaftKV) AppendEntryToLog(entry Op) bool

func (*RaftKV) Apply

func (kv *RaftKV) Apply(args Op)

func (*RaftKV) CheckDup

func (kv *RaftKV) CheckDup(id int64, reqid int) bool

func (*RaftKV) Get

func (kv *RaftKV) Get(args *GetArgs, reply *GetReply)

func (*RaftKV) Kill

func (kv *RaftKV) Kill()

the tester calls Kill() when a RaftKV instance won't be needed again. you are not required to do anything in Kill(), but it might be convenient to (for example) turn off debug output from this instance.

func (*RaftKV) PutAppend

func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL