emo

package module
v0.0.0-...-6e8f25a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2024 License: MIT Imports: 27 Imported by: 0

README

Emo

Emo is a Kademlia DHT implementation for Go with a focus on performance and ease of use. It is not seek to conform to any existing standards or implementations.

  • implements a 256 bit keyspace using Keccak256 hashing.
  • replication factor (K) of 20
  • wire protocol using flatbuffers
  • SO_REUSEPORT to concurrently handle requests on the same port
  • asynchronous api
  • supports values larger than MTU

Usage

In order to start a cluster of nodes, you will first need a bootstrap node that all other nodes can connect to first. To start a bootstrap node:

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9000", // udp address to bind to
        Listeners: 4,                    // number of socket listeners, defaults to GOMAXPROCS
        Timeout: time.Minute / 2         // request timeout, defaults to 1 minute
    }

    dht, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("bootstrap node started!")
}

Once a bootstrap node is up and runing, you can add other nodes to the network:

func main() {
    cfg := &dht.Config{
        ListenAddress: "127.0.0.1:9001", // udp address to bind to
        BootstrapAddresses: []string{
            "127.0.0.1:9000",
        },
        Listeners: 4,                    // number of socket listeners, defaults to GOMAXPROCS
        Timeout: time.Minute / 2         // request timeout, defaults to 1 minute
    }

    dht, err := dht.New(cfg)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("node started!")
}

From any node you can then store values as follows:

func main() {
    ...

    // helper function to construct a sha1 hash that
    // will be used as the values key
    myKey := dht.Key("my-awesome-key")
    myValue := []byte("my-even-more-awesome-value")

    // stores a value for a given amount of time
    dht.Store(myKey, myValue, time.Hour, func(err error) {
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("successfully stored key: %s -> %s", string(myKey), string(myValue))
    })
}

Once your value is stored, you can retreive it from the network as follows:

func main() {
    ...

    // finds the value. please note it is not safe to use the value outside
    // of the provided callback unless it is copied
    dht.Find(myKey, func(value []byte, err error) {
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("successfully retrieved key: %s -> %s !\n", string(myKey), string(value))
    })
}

OS Tuning

For most linux distros, socket send and receive buffers are set very low. This will almost certainly result in large amounts of packet loss at higher throughput levels as these buffers get overrun.

How large these buffers will need to be will be dependent on your workload, so you should experiment to find the correct value.

You can temporarily increase the of the read and write buffers via sysctl:

# set the rmem and wmem buffers to ~128 MB each
$ sysctl -w net.core.rmem_max=536870912 && sysctl -w net.core.rmem_default=134217728 && sysctl -w net.core.wmem_max=536870912 && sysctl -w net.core.wmem_default=134217728

Development

To re-generate the flatbuffer definitions for the wire protocol:

$ make generate

Build the CLI emo executable:

$ go build -o emo ./cmd/emo

This will generate an executable named emo.

To run tests:

$ go test -v -race

To run benchmarks:

$ go test -v -bench=.

Running the Daemon To run the emo daemon as a background process (similar to a service), you can use the following methods:

$ ./emo daemon

This will start the emo daemon listening on 0.0.0.0:9000.

Running the Daemon in the Background

$ nohup ./emo daemon > emo.log 2>&1 &

This will start the emo daemon in the background and output the log to emo.log.

Implemented Features

  • [✅] routing
  • [✅] storage (custom)
  • [✅] storage (in-memory)
  • [✅] ping
  • [✅] store
  • [✅] findNode
  • [✅] findValue
  • [✅] benchmarks
  • [✅] node join/leave
  • [✅] user defined storage
  • [✅] multiple values per store request
  • [✅] handles packets larger than MTU
  • [✅] multiple values per key
  • [✅] batch socket reads and writes
  • [✅] peer refresh
  • [✅] key refresh
  • [✅] latency based route selection
  • [✅] storage (persistent)

Documentation

Index

Constants

View Source
const (
	// K number of nodes in a bucket
	K = 20
	// ALPHA number of nodes to query in parallel
	ALPHA = 3
	// KEY_BITS number of bits in a key
	KEY_BITS = 256
	// KEY_BYTES number of bytes in a key
	KEY_BYTES = KEY_BITS / 8
	// VALUE_BYTES maximum bytecode size that a value can have.
	VALUE_BYTES = 32 * 1024 // 32KiB
)
View Source
const (
	// PacketHeaderSize the size of the header we use to reconstruct data
	PacketHeaderSize = KEY_BYTES + 4

	// MaxEventSize the maximum size of an event packet size
	MaxEventSize = 65024

	// MaxPacketSize the size of packets we will send according to MTU,
	// minus a 8 bytes for the UDP header
	MaxPacketSize = 1472

	// MaxPayloadSize the maximum payload of our packet. The max packet size,
	// minus 24 bytes for our fragment header
	MaxPayloadSize = MaxPacketSize - PacketHeaderSize
)

Variables

View Source
var (
	// ErrRequestTimeout returned when a pending request has not recevied a response before the TTL period
	ErrRequestTimeout = errors.New("request timeout")
)

Functions

func ChaindataDir

func ChaindataDir(dataDir string) string

ChaindataDir returns the path to the LevelDB database.

func DefaultDataDir

func DefaultDataDir() string

DefaultDataDir returns the default data directory path based on the operating system.

func Keccak256

func Keccak256(data []byte) []byte

func Key

func Key(k any) []byte

Key creates a new 32 byte key hasehed with Keccak256 from a string, byte slice or int

func NewDatabase

func NewDatabase(path string) (*database, error)

Newdatabase initializes a new database instance.

func NewLatencyRouter

func NewLatencyRouter(dht *DHT) *latencyRouter

Types

type Config

type Config struct {
	// LocalID the id of this node. If not specified, a random id will be generated
	LocalID []byte
	// ListenAddress the udp ip and port to listen on
	ListenAddress string
	// BootstrapAddresses the udp ip and port of the bootstrap nodes
	BootstrapAddresses []string
	// Listeners the number of threads that will listen on the designated udp port
	Listeners int
	// Timeout the amount of time before a peer is declared unresponsive and removed
	Timeout time.Duration
	// Storage implementation to use for storing key value pairs
	Storage Storage
	// StorageBackend the type of storage to use
	StorageBackend StorageType
	// LevelDBPath the path to the LevelDB database
	LevelDBPath string
	// DataDir the path to the data directory
	DataDir string
	// SocketBufferSize sets the size of the udp sockets send and receive buffer
	SocketBufferSize int
	// SocketBatchSize the batch size of udp messages that will be written to the underlying socket
	SocketBatchSize int
	// SocketBatchInterval the period with which the current batch of udp messages will be written to the underlying socket if not full
	SocketBatchInterval time.Duration
	// Logging enables basic logging
	Logging bool
}

Config configuration parameters for the dht

type DHT

type DHT struct {
	// contains filtered or unexported fields
}

DHT represents the distributed hash table

func New

func New(cfg *Config) (*DHT, error)

New creates a new dht

func (*DHT) Close

func (d *DHT) Close() error

Close shuts down the dht

func (*DHT) Find

func (d *DHT) Find(key []byte, callback func(value []byte, err error), opts ...*FindOption)

Find finds a value on the network if it exists. If the key being queried has multiple values, the callback will be invoked for each result Any returned value will not be safe to use outside of the callback, so you should copy it if its needed elsewhere

func (*DHT) Store

func (d *DHT) Store(key, value []byte, ttl time.Duration, callback func(err error))

Store a value on the network. If the value fails to store, the provided callback will be returned with the error

type FindOption

type FindOption struct {
	// contains filtered or unexported fields
}

FindOption for configuring find requests

func ValuesFrom

func ValuesFrom(from time.Time) *FindOption

ValuesFrom filters results to only those that were created after a given timestmap this is useful for repeat queries where duplicates ideally should be avoided

type Storage

type Storage interface {
	Get(key []byte, from time.Time) ([]*Value, bool)
	Set(key, value []byte, created time.Time, ttl time.Duration) bool
	Iterate(cb func(value *Value) bool)
}

Storage defines the storage interface used by the DLT

func InitializeStorage

func InitializeStorage(cfg *Config) (Storage, error)

InitializeStorage initializes the storage based on the configuration.

type StorageType

type StorageType string

StorageType defines the type of storage to use.

const (
	InMemoryStorage StorageType = "inmemory"
	LevelDBStorage  StorageType = "leveldb"
)

type Value

type Value struct {
	Key     []byte
	Value   []byte
	TTL     time.Duration
	Created time.Time
	// contains filtered or unexported fields
}

Value represents the value to be stored

Directories

Path Synopsis
cmd
emo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL