rendezvous

package module
v0.0.0-...-01831bb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2022 License: MIT Imports: 22 Imported by: 0

README

Rendezvous Protocol

Overview

Similar to status-im/rendezvous, this module uses a short liveness TTL for records (20s) and does not support unregistering records, on the assumption that such a low TTL makes explicit unregistration unnecessary. This makes it incompatible with the original libp2p rendezvous spec. The module is intended to be used in go-waku as a lightweight mechanism for generalized peer discovery.

A difference compared to status-im/rendezvous is the use of routing records and signed envelopes instead of ENR records.

Protocol identifier: /vac/waku/rendezvous/0.0.1

Usage

Adding discovery to gossipsub

import (
  "context"

  "github.com/libp2p/go-libp2p"
  "github.com/libp2p/go-libp2p-core/host"
  "github.com/libp2p/go-libp2p-core/peer"
  pubsub "github.com/status-im/go-libp2p-pubsub"
  rendezvous "github.com/status-im/go-waku-rendezvous"
)

// Create a new libp2p host that listens on a random TCP port.
h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
if err != nil {
  panic(err)
}

// Create the rendezvous-backed discovery instance. The variable is
// deliberately not named "rendezvous": that would shadow the imported
// package and make it unreachable for the rest of the scope.
disc := rendezvous.NewRendezvousDiscovery(h)

// Context passed to the pubsub service. Replace it with a cancellable
// context if the service must be shut down independently of the process.
ctx := context.Background()

// Create a new PubSub service using the GossipSub router, with peer
// discovery delegated to the rendezvous instance created above.
ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithDiscovery(disc))
if err != nil {
  panic(err)
}

Creating a rendezvous server

import (
  "database/sql"
  "github.com/syndtr/goleveldb/leveldb"
  "github.com/syndtr/goleveldb/leveldb/opt"
  "github.com/syndtr/goleveldb/leveldb/util"
  "github.com/libp2p/go-libp2p"
  "github.com/libp2p/go-libp2p-core/host"
  "github.com/libp2p/go-libp2p-core/peer"
  pubsub "github.com/status-im/go-libp2p-pubsub"
  rendezvous "github.com/status-im/go-waku-rendezvous"
)

// RendezVousLevelDB wraps a goleveldb database handle and exposes the
// Put/Delete/NewIterator methods defined below, so that it can be passed
// to rendezvous.NewStorage as the backing store for peer records.
type RendezVousLevelDB struct {
	// db is the underlying LevelDB handle that every method delegates to.
	db *leveldb.DB
}

// NewRendezVousLevelDB opens the LevelDB database stored at dBPath and
// returns it wrapped in a RendezVousLevelDB. The database is opened with an
// open-files cache capacity of 3. If the database cannot be opened, the
// error from leveldb.OpenFile is returned unchanged and the result is nil.
func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) {
	options := &opt.Options{OpenFilesCacheCapacity: 3}

	handle, openErr := leveldb.OpenFile(dBPath, options)
	if openErr != nil {
		return nil, openErr
	}

	wrapper := &RendezVousLevelDB{db: handle}
	return wrapper, nil
}

// Delete removes the entry stored under key from the underlying LevelDB,
// passing nil write options, and returns whatever error LevelDB reports.
func (r *RendezVousLevelDB) Delete(key []byte) error {
	store := r.db
	return store.Delete(key, nil)
}

// Put stores value under key in the underlying LevelDB, passing nil write
// options, and returns whatever error LevelDB reports.
func (r *RendezVousLevelDB) Put(key []byte, value []byte) error {
	store := r.db
	return store.Put(key, value, nil)
}

// NewIterator returns an iterator over the key range that util.BytesPrefix
// derives from prefix, using nil read options. The LevelDB iterator is
// returned as a rendezvous.Iterator.
func (r *RendezVousLevelDB) NewIterator(prefix []byte) rendezvous.Iterator {
	keyRange := util.BytesPrefix(prefix)
	return r.db.NewIterator(keyRange, nil)
}


// Create a new libp2p host that listens on a random TCP port.
h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
if err != nil {
  panic(err)
}

// Open the LevelDB-backed store that holds the peer records.
db, err := NewRendezVousLevelDB("/tmp/rendezvous")
if err != nil {
  panic(err)
}
storage := rendezvous.NewStorage(db)

// Create and start the rendezvous service. The variable is declared with :=
// because it has not been declared earlier in this snippet; a plain
// assignment would not compile.
rendezvousService := rendezvous.NewRendezvousService(h, storage)
if err := rendezvousService.Start(); err != nil {
  panic(err)
}
Protobuf
// Message is the single wire envelope of the rendezvous protocol. The type
// field selects which of the optional sub-messages below is expected to be
// populated. There is no unregister message type in this schema.
message Message {
  // MessageType identifies the kind of request or response carried.
  enum MessageType {
    REGISTER = 0;
    REGISTER_RESPONSE = 1;
    DISCOVER = 2;
    DISCOVER_RESPONSE = 3;
  }

  // ResponseStatus is the result code carried in the response messages.
  // Going by the names, codes appear to be grouped by hundreds:
  // 1xx invalid request fields, 2xx authorization, 3xx internal failure,
  // 4xx unavailability.
  enum ResponseStatus {
    OK                  = 0;
    E_INVALID_NAMESPACE = 100;
    E_INVALID_PEER_INFO = 101;
    E_INVALID_TTL       = 102;
    E_NOT_AUTHORIZED    = 200;
    E_INTERNAL_ERROR    = 300;
    E_UNAVAILABLE       = 400;
  }

  // Register carries one registration. It is also the element type of
  // DiscoverResponse.registrations.
  message Register {
    // Namespace of the registration.
    string ns = 1;
    // Serialized signed peer record (presumably a signed envelope wrapping
    // a routing record, per the README -- confirm against the implementation).
    bytes signedPeerRecord = 2;
    int64 ttl = 3; // in seconds
  }

  // RegisterResponse is the reply to a Register message.
  message RegisterResponse {
    ResponseStatus status = 1;
    // Human-readable text accompanying status.
    string statusText = 2;
    // TTL reported back for the registration (presumably in seconds, like
    // Register.ttl -- confirm).
    int64 ttl = 3;
  }

  // Discover asks for registrations in a namespace.
  message Discover {
    // Namespace to query.
    string ns = 1;
    // Requested maximum number of registrations to return.
    int64 limit = 2;
  }

  // DiscoverResponse is the reply to a Discover message.
  // Field number 2 is not used by this schema.
  message DiscoverResponse {
    repeated Register registrations = 1;
    ResponseStatus status = 3;
    // Human-readable text accompanying status.
    string statusText = 4;
  }

  MessageType type = 1;
  Register register = 2;
  RegisterResponse registerResponse = 3;
  Discover discover = 4;
  DiscoverResponse discoverResponse = 5;
}

Documentation

Index

Constants

View Source
// Protocol identifier and default record lifetime.
const (
	// RendezvousID_v001 is the libp2p protocol ID of this rendezvous
	// protocol, version 0.0.1.
	RendezvousID_v001 = protocol.ID("/vac/waku/rendezvous/0.0.1")
	// DefaultTTL is the default record TTL, expressed in seconds.
	DefaultTTL        = 20 // 20 seconds
)
View Source
// Byte values used when building and parsing storage keys.
const (
	// RecordsPrefix evaluates to byte(1) (1 + iota with iota == 0).
	// NOTE(review): presumably the leading byte of every records key --
	// confirm against NewRecordsKey.
	RecordsPrefix byte = 1 + iota

	// TopicBodyDelimiter is the byte that TopicPart looks for in a key to
	// find where the topic prefix ends.
	TopicBodyDelimiter = 0xff
)
View Source
// Upper bounds for protocol parameters.
// NOTE(review): where and how these limits are enforced is not visible
// here -- presumably in the service's request validation; confirm.
const (
	// MaxTTL is the largest TTL, in seconds. It equals DefaultTTL.
	MaxTTL = 20 // 20sec

	// MaxNamespaceLength bounds the length of a namespace string.
	MaxNamespaceLength   = 256
	// MaxPeerAddressLength bounds the length of peer address data.
	MaxPeerAddressLength = 2048
	// MaxDiscoverLimit bounds the limit of a discover request; it is typed
	// int64, like the limit field of the Discover protobuf message.
	MaxDiscoverLimit     = int64(1000)
)

Variables

View Source
var (
	// DiscoverAsyncInterval is a two-minute duration. It is a package-level
	// variable, so callers can override it.
	// NOTE(review): presumably the polling interval between discovery
	// rounds in DiscoverAsync -- confirm against the implementation.
	DiscoverAsyncInterval = 2 * time.Minute
)

Functions

func NewRendezvousDiscovery

func NewRendezvousDiscovery(host host.Host) discovery.Discovery

func TopicPart

func TopicPart(key []byte) []byte

TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key. It doesn't allocate memory for topic prefix.

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

func NewCleaner

func NewCleaner() *Cleaner

TODO: definitely rename. Rewrite the cleaner to operate on LevelDB directly; if it is impossible to query on topic+timestamp (big endian) for purging, store an additional key.

func (*Cleaner) Add

func (c *Cleaner) Add(deadlineTime time.Time, key string)

func (*Cleaner) Exist

func (c *Cleaner) Exist(key string) bool

func (*Cleaner) Id

func (c *Cleaner) Id(index int) string

func (*Cleaner) Len

func (c *Cleaner) Len() int

func (*Cleaner) Less

func (c *Cleaner) Less(i, j int) bool

func (*Cleaner) Pop

func (c *Cleaner) Pop() interface{}

func (*Cleaner) PopSince

func (c *Cleaner) PopSince(now time.Time) (rst []string)

func (*Cleaner) Push

func (c *Cleaner) Push(record interface{})

func (*Cleaner) Swap

func (c *Cleaner) Swap(i, j int)

type DB

// DB is the key-value store abstraction that NewStorage accepts.
// Implementations must support writing, deleting and prefix iteration.
type DB interface {
	// Put stores a value (second argument) under a key (first argument).
	Put([]byte, []byte) error
	// Delete removes the entry stored under the given key.
	Delete([]byte) error
	// NewIterator returns an Iterator for the given byte slice
	// (a key prefix in the README's LevelDB example).
	NewIterator([]byte) Iterator
}

type Iterator

// Iterator is the cursor type returned by DB.NewIterator. The method set
// matches what the README example gets from a goleveldb iterator.
type Iterator interface {
	// Release frees the resources held by the iterator.
	Release()
	// Next advances to the next entry and reports whether one exists.
	Next() bool
	// Prev moves to the previous entry and reports whether one exists.
	Prev() bool
	// Value returns the value of the current entry.
	Value() []byte
	// Key returns the key of the current entry.
	Key() []byte
	// Seek positions the iterator relative to the given key and reports
	// whether an entry was found.
	// NOTE(review): exact seek semantics depend on the implementation.
	Seek([]byte) bool
}

type RecordsKey

type RecordsKey []byte

func NewRecordsKey

func NewRecordsKey(ns string, id peer.ID) RecordsKey

func (RecordsKey) SamePrefix

func (k RecordsKey) SamePrefix(prefix []byte) bool

func (RecordsKey) String

func (k RecordsKey) String() string

type Registration

// Registration is the element type returned by RendezvousPoint.Discover
// and delivered on the channel returned by RendezvousPoint.DiscoverAsync.
type Registration struct {
	// Peer holds the address information of the registered peer.
	Peer peer.AddrInfo
	// Ns is the namespace of the registration.
	Ns   string
	// Ttl is the registration's time to live.
	// NOTE(review): presumably in seconds, like the protobuf ttl -- confirm.
	Ttl  int
}

type RegistrationRecord

// RegistrationRecord is the record type returned by Storage.GetRandom.
// Its fields mirror the arguments accepted by Storage.Add.
type RegistrationRecord struct {
	// PeerEnvelope is the peer's envelope in serialized form.
	PeerEnvelope []byte
	// Ns is the namespace of the registration.
	Ns           string
	// Ttl is the registration's time to live.
	// NOTE(review): presumably in seconds -- confirm.
	Ttl          int
	// Deadline is the point in time associated with the record's expiry.
	Deadline     time.Time
}

type RendezvousClient

// RendezvousClient has the same method names as RendezvousPoint, but its
// discovery methods yield peer.AddrInfo values rather than Registration
// values. Instances are created with NewRendezvousClient or
// NewRendezvousClientWithPoint.
type RendezvousClient interface {
	// Register registers in namespace ns with the requested ttl and
	// returns a duration and an error.
	// NOTE(review): the duration is presumably the TTL granted by the
	// server -- confirm.
	Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
	// Discover looks up peers in namespace ns; limit is the requested
	// maximum number of results.
	Discover(ctx context.Context, ns string, limit int) ([]peer.AddrInfo, error)
	// DiscoverAsync returns a receive-only channel of peers found in
	// namespace ns.
	DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
}

func NewRendezvousClient

func NewRendezvousClient(host host.Host) RendezvousClient

func NewRendezvousClientWithPoint

func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient

type RendezvousError

// RendezvousError is an error value carrying a protocol response status
// and its accompanying text. It implements error through its Error method.
type RendezvousError struct {
	// Status is the protobuf ResponseStatus code.
	Status pb.Message_ResponseStatus
	// Text is the human-readable message accompanying Status.
	Text   string
}

func (RendezvousError) Error

func (e RendezvousError) Error() string

type RendezvousPoint

// RendezvousPoint exposes registration and discovery operations whose
// discovery results are full Registration values. Instances are created
// with NewRendezvousPoint and can be wrapped into a RendezvousClient via
// NewRendezvousClientWithPoint.
type RendezvousPoint interface {
	// Register registers in namespace ns with the requested ttl and
	// returns a duration and an error.
	// NOTE(review): the duration is presumably the TTL granted by the
	// server -- confirm.
	Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
	// Discover looks up registrations in namespace ns; limit is the
	// requested maximum number of results.
	Discover(ctx context.Context, ns string, limit int) ([]Registration, error)
	// DiscoverAsync returns a receive-only channel of registrations found
	// in namespace ns.
	DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
}

func NewRendezvousPoint

func NewRendezvousPoint(host host.Host) RendezvousPoint

type RendezvousService

type RendezvousService struct {
	// contains filtered or unexported fields
}

func NewRendezvousService

func NewRendezvousService(host host.Host, storage Storage) *RendezvousService

func (*RendezvousService) Start

func (rz *RendezvousService) Start() error

func (*RendezvousService) Stop

func (rz *RendezvousService) Stop()

Stop closes listener and waits till all helper goroutines are stopped.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage manages records.

func NewStorage

func NewStorage(db DB) Storage

NewStorage creates instance of the storage.

func (Storage) Add

func (s Storage) Add(ns string, id peer.ID, envelope []byte, ttl int, deadline time.Time) (string, error)

Add stores record using specified topic.

func (*Storage) GetRandom

func (s *Storage) GetRandom(ns string, limit int64) (rst []RegistrationRecord, err error)

GetRandom reads random records for specified topic up to specified limit.

func (*Storage) IterateAllKeys

func (s *Storage) IterateAllKeys(iterator func(key RecordsKey, Deadline time.Time) error) error

func (*Storage) RemoveByKey

func (s *Storage) RemoveByKey(key string) error

RemoveByKey removes record from storage.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL