chord

package module
v0.0.0-...-7295f39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2019 License: MIT Imports: 14 Imported by: 0

README

Chord

[WIP] An implementation of the Chord distributed lookup protocol described in the paper below.

Paper

https://pdos.csail.mit.edu/papers/ton:chord/paper-ton.pdf

Example Usage

package main

import (
	"github.com/arriqaaq/chord"
	"github.com/arriqaaq/chord/internal"
	"log"
	"os"
	"os/signal"
	"time"
)

// createNode builds a configuration from chord.DefaultConfig, overrides its
// Id, Addr, Timeout and MaxIdle fields, and hands it to chord.NewNode together
// with joinNode. The node and error from chord.NewNode are returned as-is.
func createNode(id string, addr string, joinNode *internal.Node) (*chord.Node, error) {
	config := chord.DefaultConfig()
	config.Id, config.Addr = id, addr
	config.Timeout = 10 * time.Millisecond
	config.MaxIdle = 100 * time.Millisecond

	return chord.NewNode(config, joinNode)
}


// main creates a node with id "8" listening on 0.0.0.0:8003, passing the node
// described by id "1" at 0.0.0.0:8001 as the join node, then waits for an
// interrupt signal and stops the node.
func main() {
	joinNode := chord.NewInode("1", "0.0.0.0:8001")

	h, err := createNode("8", "0.0.0.0:8003", joinNode)
	if err != nil {
		// log.Fatalln exits the process, so nothing after it would run.
		log.Fatalln(err)
	}

	// Block until an interrupt arrives. The channel is buffered because
	// signal.Notify does not block when sending to it.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
	h.Stop()
}

References

This implementation helped me a lot in designing the code base: https://github.com/r-medina/gmaj

TODO

  • Add more test cases
  • Add stats (e.g. Prometheus metrics)

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package. Each is a distinct value created
// with errors.New, so callers can compare a returned error against it.
// NOTE(review): which functions return which of these is not visible here —
// confirm against the implementation.
var (
	ERR_NO_SUCCESSOR  = errors.New("cannot find successor")
	ERR_NODE_EXISTS   = errors.New("node with id already exists")
	ERR_KEY_NOT_FOUND = errors.New("key not found")
)

Functions

func Dial

func Dial(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func GetHashID

func GetHashID(key string) []byte

For testing

func NewInode

func NewInode(id string, addr string) *models.Node

Types

type Config

// Config holds the settings accepted by NewNode and NewGrpcTransport.
// DefaultConfig returns a pre-populated value and Validate checks one.
type Config struct {
	// Id and Addr are the node's identifier and address.
	// NOTE(review): Addr looks like a host:port listen address — confirm.
	Id   string
	Addr string

	// gRPC server options and gRPC dial options.
	// NOTE(review): presumably applied by GrpcTransport when serving and
	// when dialing other nodes — confirm.
	ServerOpts []grpc.ServerOption
	DialOpts   []grpc.DialOption

	Hash     func() hash.Hash // Hash function to use
	// NOTE(review): unit of HashSize (bits vs. bytes) is not visible here — confirm.
	HashSize int

	StabilizeMin time.Duration // Minimum stabilization time
	StabilizeMax time.Duration // Maximum stabilization time

	// NOTE(review): Timeout and MaxIdle look like RPC-timeout and
	// idle-connection limits — confirm against the transport code.
	Timeout time.Duration
	MaxIdle time.Duration
}

func DefaultConfig

func DefaultConfig() *Config

func (*Config) Validate

func (c *Config) Validate() error

type GrpcTransport

type GrpcTransport struct {
	// contains filtered or unexported fields
}

func NewGrpcTransport

func NewGrpcTransport(config *Config) (*GrpcTransport, error)

func NewGrpcTransport(config *Config) (models.ChordClient, error) {

func (*GrpcTransport) CheckPredecessor

func (g *GrpcTransport) CheckPredecessor(node *models.Node) error

func (*GrpcTransport) DeleteKey

func (g *GrpcTransport) DeleteKey(node *models.Node, key string) error

func (*GrpcTransport) DeleteKeys

func (g *GrpcTransport) DeleteKeys(node *models.Node, keys []string) error

func (*GrpcTransport) FindSuccessor

func (g *GrpcTransport) FindSuccessor(node *models.Node, id []byte) (*models.Node, error)

FindSuccessor finds the successor of the given ID on a remote node.

func (*GrpcTransport) GetKey

func (g *GrpcTransport) GetKey(node *models.Node, key string) (*models.GetResponse, error)

func (*GrpcTransport) GetPredecessor

func (g *GrpcTransport) GetPredecessor(node *models.Node) (*models.Node, error)

GetPredecessor gets the predecessor of a remote node.

func (*GrpcTransport) GetServer

func (g *GrpcTransport) GetServer() *grpc.Server

func (*GrpcTransport) GetSuccessor

func (g *GrpcTransport) GetSuccessor(node *models.Node) (*models.Node, error)

GetSuccessor gets the successor of a remote node.

func (*GrpcTransport) Notify

func (g *GrpcTransport) Notify(node, pred *models.Node) error

func (*GrpcTransport) RequestKeys

func (g *GrpcTransport) RequestKeys(node *models.Node, from, to []byte) ([]*models.KV, error)

func (*GrpcTransport) SetKey

func (g *GrpcTransport) SetKey(node *models.Node, key, value string) error

func (*GrpcTransport) SetPredecessor

func (g *GrpcTransport) SetPredecessor(node *models.Node, pred *models.Node) error

func (*GrpcTransport) SetSuccessor

func (g *GrpcTransport) SetSuccessor(node *models.Node, succ *models.Node) error

func (*GrpcTransport) Start

func (g *GrpcTransport) Start() error

func (*GrpcTransport) Stop

func (g *GrpcTransport) Stop() error

Stop shuts down the transport.

type Node

type Node struct {
	*models.Node
	// contains filtered or unexported fields
}

func NewNode

func NewNode(cnf *Config, joinNode *models.Node) (*Node, error)

NewNode creates a new Chord node. It returns an error if the node already exists in the Chord ring.

func (*Node) CheckPredecessor

func (n *Node) CheckPredecessor(ctx context.Context, id *models.ID) (*models.ER, error)

func (*Node) Delete

func (n *Node) Delete(key string) error

func (*Node) Find

func (n *Node) Find(key string) (*models.Node, error)

func (*Node) FindSuccessor

func (n *Node) FindSuccessor(ctx context.Context, id *models.ID) (*models.Node, error)

func (*Node) Get

func (n *Node) Get(key string) ([]byte, error)

func (*Node) GetPredecessor

func (n *Node) GetPredecessor(ctx context.Context, r *models.ER) (*models.Node, error)

func (*Node) GetSuccessor

func (n *Node) GetSuccessor(ctx context.Context, r *models.ER) (*models.Node, error)

GetSuccessor gets the successor on the node.

func (*Node) Notify

func (n *Node) Notify(ctx context.Context, node *models.Node) (*models.ER, error)

func (*Node) Set

func (n *Node) Set(key, value string) error

func (*Node) SetPredecessor

func (n *Node) SetPredecessor(ctx context.Context, pred *models.Node) (*models.ER, error)

SetPredecessor sets the predecessor on the node.

func (*Node) SetSuccessor

func (n *Node) SetSuccessor(ctx context.Context, succ *models.Node) (*models.ER, error)

SetSuccessor sets the successor on the node.

func (*Node) Stop

func (n *Node) Stop()

func (*Node) XDelete

func (n *Node) XDelete(ctx context.Context, req *models.DeleteRequest) (*models.DeleteResponse, error)

func (*Node) XGet

func (n *Node) XGet(ctx context.Context, req *models.GetRequest) (*models.GetResponse, error)

func (*Node) XMultiDelete

func (n *Node) XMultiDelete(ctx context.Context, req *models.MultiDeleteRequest) (*models.DeleteResponse, error)

func (*Node) XRequestKeys

func (*Node) XSet

func (n *Node) XSet(ctx context.Context, req *models.SetRequest) (*models.SetResponse, error)

type Storage

// Storage is the key/value store interface; NewMapStore returns an
// implementation of it.
type Storage interface {
	// Get takes a key and returns a byte slice or an error.
	Get(string) ([]byte, error)
	// Set takes two strings.
	// NOTE(review): presumably key then value — confirm.
	Set(string, string) error
	// Delete takes a single string key.
	Delete(string) error
	// Between takes two byte slices and returns a list of KV entries.
	// NOTE(review): presumably the entries whose hashed keys fall between
	// the two IDs; bound inclusivity is not visible here — confirm.
	Between([]byte, []byte) ([]*models.KV, error)
	// MDelete takes any number of string keys.
	// NOTE(review): presumably deletes all of them — confirm.
	MDelete(...string) error
}

func NewMapStore

func NewMapStore(hashFunc func() hash.Hash) Storage

type Transport

// Transport enables a node to talk to the other nodes in the ring.
// Every RPC and storage method takes the target node as its first argument.
type Transport interface {
	// Lifecycle of the transport itself.
	Start() error
	Stop() error

	//RPC
	// Ring-maintenance calls addressed to a remote node.
	GetSuccessor(*models.Node) (*models.Node, error)
	FindSuccessor(*models.Node, []byte) (*models.Node, error)
	GetPredecessor(*models.Node) (*models.Node, error)
	Notify(*models.Node, *models.Node) error
	CheckPredecessor(*models.Node) error
	SetPredecessor(*models.Node, *models.Node) error
	SetSuccessor(*models.Node, *models.Node) error

	//Storage
	// Key/value operations addressed to a remote node.
	GetKey(*models.Node, string) (*models.GetResponse, error)
	SetKey(*models.Node, string, string) error
	DeleteKey(*models.Node, string) error
	RequestKeys(*models.Node, []byte, []byte) ([]*models.KV, error)
	DeleteKeys(*models.Node, []string) error
}

Transport enables a node to talk to the other nodes in the ring

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL