rangesync

package
v1.7.6
Published: Oct 22, 2024 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package rangesync implements a pairwise set reconciliation protocol.

The protocol is based on this paper: Range-Based Set Reconciliation by Aljoscha Meyer https://arxiv.org/pdf/2212.13567.pdf

The protocol has two advantages: reusable synchronization helper structures (ordered sets) can be built once, and the same structure can be used to efficiently reconcile subranges of the ordered sets against multiple peers. The disadvantage is that the algorithm is not very efficient for large differences (such as tens of thousands of elements) when the elements have no apparent ordering, which is the case with hash-based IDs like ATX IDs. To mitigate this problem, the protocol supports exchanging recently received items between peers before beginning the actual reconciliation. When the difference is large, the algorithm can automatically degrade to a dumb "send me the whole set" mode. The resulting increased load on the network and the peer computers is mitigated by splitting the element range between multiple peers.

The core concepts are the following:

  1. Ordered sets. The sets to be reconciled are ordered. The elements in the set are also called "keys" because they represent the IDs of the actual objects being synchronized, such as ATXs.
  2. Ranges. [x, y) range denotes a range of items in the set, where x is inclusive and y is exclusive. The ranges may wrap around. This means:
     - x == y: the whole set
     - x < y: a normal range starting with x and ending below y
     - x > y: a wrapped around range, that is from x (inclusive) to the end of the set and from the beginning of the set to y, non-inclusive.
  3. Fingerprint. Each range has a fingerprint, which is all the IDs (keys) in the range XORed together. The fingerprint is used to quickly check if the range is in sync with the peer.
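The range and fingerprint rules above can be illustrated with a small standalone sketch. It does not use the package's types: `inRange` and `rangeFingerprint` are hypothetical helpers, keys are plain byte slices of equal length, and the fingerprint here spans the full key width rather than FingerprintSize bytes.

```go
package main

import (
	"bytes"
	"fmt"
)

// inRange reports whether key k lies in [x, y), covering the three cases
// listed above. Keys are compared bytewise.
func inRange(x, y, k []byte) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0: // x == y: the whole set
		return true
	case c < 0: // x < y: a normal range
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default: // x > y: wraps around the end of the set
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

// rangeFingerprint XORs together all keys that fall within [x, y).
// Assumption: all keys have the same length as x.
func rangeFingerprint(keys [][]byte, x, y []byte) []byte {
	fp := make([]byte, len(x))
	for _, k := range keys {
		if inRange(x, y, k) {
			for i := range fp {
				fp[i] ^= k[i]
			}
		}
	}
	return fp
}

func main() {
	keys := [][]byte{{0x10}, {0x40}, {0x80}, {0xf0}}
	fmt.Printf("%x\n", rangeFingerprint(keys, []byte{0x40}, []byte{0xf0})) // 40 ^ 80
	fmt.Printf("%x\n", rangeFingerprint(keys, []byte{0xf0}, []byte{0x40})) // f0 ^ 10 (wrapped)
	fmt.Printf("%x\n", rangeFingerprint(keys, []byte{0x10}, []byte{0x10})) // all four keys
}
```

Two peers whose fingerprints for the same [x, y) match can treat that range as in sync without exchanging its items.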

Each OrderedSet is supposed to be able to provide the number of items and the fingerprint of items in a range relatively cheaply. Additionally, the OrderedSet has a Recent method that retrieves recently added keys (elements) based on the provided timestamp. This reconciliation helper mechanism is optional, so Recent may just return an empty sequence.
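One way to make range queries cheap, shown here only as an illustration, is to keep prefix XORs over a sorted slice. The `prefixSet` type below is hypothetical, uses uint64 keys instead of KeyBytes, and only suits a static set; it is not how any OrderedSet implementation in this package is known to work.

```go
package main

import (
	"fmt"
	"sort"
)

// prefixSet is a hypothetical static ordered set of uint64 keys.
type prefixSet struct {
	keys   []uint64 // sorted
	prefix []uint64 // prefix[i] is the XOR of keys[:i]
}

func newPrefixSet(keys []uint64) *prefixSet {
	s := &prefixSet{keys: append([]uint64(nil), keys...)}
	sort.Slice(s.keys, func(i, j int) bool { return s.keys[i] < s.keys[j] })
	s.prefix = make([]uint64, len(s.keys)+1)
	for i, k := range s.keys {
		s.prefix[i+1] = s.prefix[i] ^ k
	}
	return s
}

// rangeInfo returns the fingerprint and item count for [x, y) using two
// binary searches and a constant number of XORs.
func (s *prefixSet) rangeInfo(x, y uint64) (fp uint64, count int) {
	n := len(s.keys)
	i := sort.Search(n, func(m int) bool { return s.keys[m] >= x })
	j := sort.Search(n, func(m int) bool { return s.keys[m] >= y })
	switch {
	case x == y: // the whole set
		return s.prefix[n], n
	case x < y: // a normal range
		return s.prefix[j] ^ s.prefix[i], j - i
	default: // wrapped: [x, end) plus [start, y)
		return s.prefix[n] ^ s.prefix[i] ^ s.prefix[j], (n - i) + j
	}
}

func main() {
	s := newPrefixSet([]uint64{5, 1, 9, 3})
	fmt.Println(s.rangeInfo(3, 9)) // keys 3 and 5
	fmt.Println(s.rangeInfo(9, 3)) // keys 9 and 1 (wrapped)
	fmt.Println(s.rangeInfo(0, 0)) // all keys
}
```

A set that changes over time would need a structure that supports updates, such as a balanced tree with per-node fingerprints and counts.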

Below are logs of sample interactions between two peers, A and B.

A: empty set; B: empty set

A -> B:

EmptySet
EndRound

B -> A:

Done

A: empty set; B: non-empty set

A -> B:

EmptySet
EndRound

B -> A:

ItemBatch
ItemBatch
...

A -> B:

Done

A: small set (< maxSendRange); B: non-empty set

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...

A -> B:

Done

A: large set; B: non-empty set; maxDiff < 0

A -> B:

Fingerprint [x, y)
EndRound

B -> A:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

B -> A:

Done

A: large set; B: non-empty set; maxDiff >= 0; differenceMetric <= maxDiff

NOTE: Sample includes fingerprint

A -> B:

Sample [x, y)
EndRound

B -> A:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

B -> A:

Done

A: large set; B: non-empty set; maxDiff >= 0; differenceMetric > maxDiff

A -> B:

Sample [x, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, y)
EndRound

A -> B:

Done

A: large set; B: non-empty set; sync priming; maxDiff >= 0; differenceMetric <= maxDiff (after priming)

A -> B:

ItemBatch
ItemBatch
...
Recent
EndRound

B -> A:

ItemBatch
ItemBatch
...
Sample [x, y)
EndRound

A -> B:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done

A: large set; B: non-empty set; sync priming; maxDiff < 0

A -> B:

ItemBatch
ItemBatch
...
Recent
EndRound

B -> A:

ItemBatch
ItemBatch
...
Fingerprint [x, y)
EndRound

A -> B:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done
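The fingerprint-splitting flow in the logs above can be approximated by a single-process simulation. This is a sketch under several assumptions, not the package's implementation: both sets live in one process instead of exchanging messages, keys are uint64 values, ranges never wrap, and `set`, `info` and `reconcile` are hypothetical names. With such short keys XOR fingerprints can collide; real keys are hashes.

```go
package main

import (
	"fmt"
	"sort"
)

// maxSendRange is deliberately tiny here; the package default is 16.
const maxSendRange = 2

type set map[uint64]bool

func newSet(keys ...uint64) set {
	s := set{}
	for _, k := range keys {
		s[k] = true
	}
	return s
}

// info returns the XOR fingerprint and the sorted keys of s within [lo, hi).
func (s set) info(lo, hi uint64) (fp uint64, keys []uint64) {
	for k := range s {
		if k >= lo && k < hi {
			fp ^= k
			keys = append(keys, k)
		}
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return fp, keys
}

// reconcile makes a and b agree on [lo, hi) and returns the number of
// range comparisons it performed.
func reconcile(a, b set, lo, hi uint64) int {
	fa, ka := a.info(lo, hi)
	fb, kb := b.info(lo, hi)
	switch {
	case fa == fb && len(ka) == len(kb):
		return 1 // fingerprints match: the range is in sync
	case len(ka) <= maxSendRange || len(kb) <= maxSendRange:
		// Small range: exchange the items themselves (ItemBatch + RangeContents).
		for _, k := range ka {
			b[k] = true
		}
		for _, k := range kb {
			a[k] = true
		}
		return 1
	default:
		// Large mismatching range: split in two and compare each part.
		m := ka[len(ka)/2]
		return 1 + reconcile(a, b, lo, m) + reconcile(a, b, m, hi)
	}
}

func main() {
	a := newSet(1, 2, 3, 4, 5, 6, 7, 8)
	b := newSet(1, 2, 3, 4, 5, 6, 7, 9)
	comparisons := reconcile(a, b, 0, 100)
	fmt.Println(len(a), len(b), comparisons)
}
```

Only the subrange that actually differs ends up being transferred; matching subranges are dismissed after a single fingerprint comparison.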

Constants

const (
	DefaultMaxSendRange  = 16
	DefaultItemChunkSize = 1024
	DefaultSampleSize    = 200
)
const (
	// FingerprintSize is the size of a fingerprint in bytes.
	FingerprintSize = 12
)

Variables

var ErrLimitExceeded = errors.New("sync traffic/message limit exceeded")

Functions

func CalcSim

func CalcSim(a, b []MinhashSampleItem) float64

CalcSim estimates the Jaccard similarity coefficient between two sets based on the samples, which are derived from the N lowest-valued elements of each set. The return value is in the 0..1 range, with 0 meaning almost no intersection and 1 meaning the sets are mostly equal. The precision of the estimate suffers if neither set is empty and the sets differ in size, with the return value tending to be lower than the actual Jaccard coefficient.
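For intuition, here is a sketch of the textbook bottom-k estimator over two sorted samples. It is not claimed to match what CalcSim computes internally: `estimateJaccard` is a hypothetical helper, samples are plain uint32 values, and returning 0 for an empty sample is an arbitrary choice made for this example.

```go
package main

import "fmt"

// estimateJaccard takes two ascending samples, each holding the lowest-valued
// elements of its set, and estimates the Jaccard coefficient: among the k
// smallest elements of the merged samples, it counts those present in both.
func estimateJaccard(a, b []uint32) float64 {
	k := len(a)
	if len(b) < k {
		k = len(b)
	}
	if k == 0 {
		return 0 // arbitrary choice for this sketch
	}
	common := 0
	for i, j, taken := 0, 0, 0; taken < k && i < len(a) && j < len(b); taken++ {
		switch {
		case a[i] == b[j]:
			common++
			i++
			j++
		case a[i] < b[j]:
			i++
		default:
			j++
		}
	}
	return float64(common) / float64(k)
}

func main() {
	fmt.Println(estimateJaccard([]uint32{1, 2, 3, 4}, []uint32{1, 2, 3, 5}))
	fmt.Println(estimateJaccard([]uint32{1, 2}, []uint32{3, 4}))
}
```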

func SyncMessageToString

func SyncMessageToString(m SyncMessage) string

SyncMessageToString returns string representation of a sync message.

Types

type CompactHash

type CompactHash struct {
	H KeyBytes
}

CompactHash encodes hashes in a compact form, skipping trailing zeroes. It also supports a nil hash (no value). The encoding format is as follows:

  byte 0:     spec byte
  bytes 1..n: data bytes

The format of the spec byte is as follows:

  bits 0..5: number of non-zero leading bytes
  bits 6..7: hash type

The following hash types are supported:

  0:   nil hash
  1:   32-byte hash
  2,3: reserved

NOTE: when adding new hash types, we need to add a mechanism that makes sure that every received hash is of the expected type. Alternatively, we need to add some kind of context to the scale.Decoder / scale.Encoder, which may contain the size of hashes to be used.

func (*CompactHash) DecodeScale

func (c *CompactHash) DecodeScale(dec *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (*CompactHash) EncodeScale

func (c *CompactHash) EncodeScale(enc *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

func (*CompactHash) ToOrdered

func (c *CompactHash) ToOrdered() KeyBytes

type Conduit

type Conduit interface {
	// NextMessage returns the next SyncMessage, or nil if there are no more
	// SyncMessages for this session.
	NextMessage() (SyncMessage, error)
	// Send sends a SyncMessage to the peer.
	Send(SyncMessage) error
}

Conduit handles receiving and sending peer messages.

type ConduitOption

type ConduitOption func(c *wireConduit)

ConduitOption specifies an option for a message conduit.

func WithMessageLimit

func WithMessageLimit(limit int) ConduitOption

WithMessageLimit sets a limit on the total number of messages sent and received. Zero or negative values disable the limit.

func WithTrafficLimit

func WithTrafficLimit(limit int) ConduitOption

WithTrafficLimit sets a limit on the total number of bytes sent and received. Zero or negative values disable the limit.

type Dispatcher

type Dispatcher struct {
	*server.Server
	// contains filtered or unexported fields
}

Dispatcher multiplexes a P2P Server to multiple set reconcilers.

func NewDispatcher

func NewDispatcher(logger *zap.Logger) *Dispatcher

NewDispatcher creates a new Dispatcher.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(
	ctx context.Context,
	req []byte,
	stream io.ReadWriter,
) (err error)

Dispatch dispatches a request to a handler.

func (*Dispatcher) Register

func (d *Dispatcher) Register(name string, h Handler)

Register registers a handler with a Dispatcher.

func (*Dispatcher) SetupServer

func (d *Dispatcher) SetupServer(
	host host.Host,
	proto string,
	opts ...server.Opt,
) *server.Server

SetupServer creates a new P2P Server for the Dispatcher.

type DoneMessage

type DoneMessage struct{ Marker }

DoneMessage is a SyncMessage that denotes the end of the synchronization. The peer should stop any further processing after receiving this message.

func (*DoneMessage) Type

func (*DoneMessage) Type() MessageType

type EmptyRangeMessage

type EmptyRangeMessage struct {
	RangeX, RangeY CompactHash
}

EmptyRangeMessage notifies the peer that it needs to send all of its items in the specified range.

func (*EmptyRangeMessage) Count

func (m *EmptyRangeMessage) Count() int

func (*EmptyRangeMessage) DecodeScale

func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*EmptyRangeMessage) EncodeScale

func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*EmptyRangeMessage) Fingerprint

func (m *EmptyRangeMessage) Fingerprint() Fingerprint

func (*EmptyRangeMessage) Keys

func (m *EmptyRangeMessage) Keys() []KeyBytes

func (*EmptyRangeMessage) Sample

func (m *EmptyRangeMessage) Sample() []MinhashSampleItem

func (*EmptyRangeMessage) Since

func (m *EmptyRangeMessage) Since() time.Time

func (*EmptyRangeMessage) Type

func (m *EmptyRangeMessage) Type() MessageType

func (*EmptyRangeMessage) X

func (m *EmptyRangeMessage) X() KeyBytes

func (*EmptyRangeMessage) Y

func (m *EmptyRangeMessage) Y() KeyBytes

type EmptySetMessage

type EmptySetMessage struct{ Marker }

EmptySetMessage is a SyncMessage that denotes an empty set, requesting the peer to send all of its items.

func (*EmptySetMessage) Type

func (*EmptySetMessage) Type() MessageType

type EndRoundMessage

type EndRoundMessage struct{ Marker }

EndRoundMessage is a SyncMessage that denotes the end of the sync round.

func (*EndRoundMessage) Type

func (*EndRoundMessage) Type() MessageType

type Fingerprint

type Fingerprint [FingerprintSize]byte

Fingerprint represents a fingerprint of a set of keys. The fingerprint is obtained by XORing together the keys in the set.

func EmptyFingerprint

func EmptyFingerprint() Fingerprint

EmptyFingerprint returns an empty fingerprint.

func MustParseHexFingerprint

func MustParseHexFingerprint(s string) Fingerprint

MustParseHexFingerprint converts a hex string to Fingerprint.

func RandomFingerprint

func RandomFingerprint() Fingerprint

RandomFingerprint generates a random fingerprint.

func (*Fingerprint) BitFromLeft

func (fp *Fingerprint) BitFromLeft(i int) bool

BitFromLeft returns the i-th bit from the left in the fingerprint.

func (Fingerprint) Compare

func (fp Fingerprint) Compare(other Fingerprint) int

Compare compares two fingerprints.

func (Fingerprint) ShortString

func (fp Fingerprint) ShortString() string

ShortString implements log.ShortString.

func (Fingerprint) String

func (fp Fingerprint) String() string

String implements fmt.Stringer.

func (*Fingerprint) Update

func (fp *Fingerprint) Update(h []byte)

Update includes the byte slice in the fingerprint.

type FingerprintMessage

type FingerprintMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	NumItems         uint32
}

FingerprintMessage contains range fingerprint for comparison against the peer's fingerprint of the range with the same bounds [RangeX, RangeY).

func (*FingerprintMessage) Count

func (m *FingerprintMessage) Count() int

func (*FingerprintMessage) DecodeScale

func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*FingerprintMessage) EncodeScale

func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*FingerprintMessage) Fingerprint

func (m *FingerprintMessage) Fingerprint() Fingerprint

func (*FingerprintMessage) Keys

func (m *FingerprintMessage) Keys() []KeyBytes

func (*FingerprintMessage) Sample

func (m *FingerprintMessage) Sample() []MinhashSampleItem

func (*FingerprintMessage) Since

func (m *FingerprintMessage) Since() time.Time

func (*FingerprintMessage) Type

func (m *FingerprintMessage) Type() MessageType

func (*FingerprintMessage) X

func (*FingerprintMessage) Y

type Handler

type Handler func(ctx context.Context, s io.ReadWriter) error

Handler is a function that handles a request for a Dispatcher.

type ItemBatchMessage

type ItemBatchMessage struct {
	ContentKeys KeyCollection `scale:"max=1024"`
}

ItemBatchMessage denotes a batch of items to be added to the peer's set.

func (*ItemBatchMessage) Count

func (m *ItemBatchMessage) Count() int

func (*ItemBatchMessage) DecodeScale

func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ItemBatchMessage) EncodeScale

func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*ItemBatchMessage) Fingerprint

func (m *ItemBatchMessage) Fingerprint() Fingerprint

func (*ItemBatchMessage) Keys

func (m *ItemBatchMessage) Keys() []KeyBytes

func (*ItemBatchMessage) Sample

func (m *ItemBatchMessage) Sample() []MinhashSampleItem

func (*ItemBatchMessage) Since

func (m *ItemBatchMessage) Since() time.Time

func (*ItemBatchMessage) Type

func (m *ItemBatchMessage) Type() MessageType

func (*ItemBatchMessage) X

func (m *ItemBatchMessage) X() KeyBytes

func (*ItemBatchMessage) Y

func (m *ItemBatchMessage) Y() KeyBytes

type KeyBytes

type KeyBytes []byte

KeyBytes represents an item (key) in a reconciliable set.

func MustParseHexKeyBytes

func MustParseHexKeyBytes(s string) KeyBytes

MustParseHexKeyBytes converts a hex string to KeyBytes.

func RandomKeyBytes

func RandomKeyBytes(size int) KeyBytes

RandomKeyBytes generates random data in bytes for testing.

func (KeyBytes) Clone

func (k KeyBytes) Clone() KeyBytes

Clone returns a copy of the key.

func (KeyBytes) Compare

func (k KeyBytes) Compare(other KeyBytes) int

Compare compares two keys.

func (KeyBytes) Inc

func (k KeyBytes) Inc() (overflow bool)

Inc increments the key by one in place, keeping the number of bytes unchanged. It returns true if the increment has caused an overflow.
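A minimal sketch of such an increment, assuming the key is treated as a big-endian number (which agrees with bytewise comparison order). The function `inc` is a hypothetical standalone helper, not the package's method.

```go
package main

import "fmt"

// inc adds one to the big-endian key in place and reports whether the
// addition wrapped around to all zeroes.
func inc(k []byte) (overflow bool) {
	for i := len(k) - 1; i >= 0; i-- {
		k[i]++
		if k[i] != 0 {
			return false // no carry into the next byte
		}
	}
	return true
}

func main() {
	k := []byte{0x00, 0xff}
	fmt.Println(inc(k), k) // carry moves into the first byte
	k = []byte{0xff, 0xff}
	fmt.Println(inc(k), k) // wraps to all zeroes
}
```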

func (KeyBytes) IsZero

func (k KeyBytes) IsZero() bool

IsZero returns true if all bytes in the key are zero.

func (KeyBytes) ShortString

func (k KeyBytes) ShortString() string

ShortString implements log.ShortString.

func (KeyBytes) String

func (k KeyBytes) String() string

String implements fmt.Stringer.

func (KeyBytes) Zero

func (k KeyBytes) Zero()

Zero sets all bytes in the key to zero.

type KeyCollection

type KeyCollection struct {
	Keys []KeyBytes
}

KeyCollection represents a collection of keys of the same size.

func (*KeyCollection) DecodeScale

func (c *KeyCollection) DecodeScale(dec *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (*KeyCollection) EncodeScale

func (c *KeyCollection) EncodeScale(enc *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

type Marker

type Marker struct{}

func (*Marker) Count

func (*Marker) Count() int

func (*Marker) Fingerprint

func (*Marker) Fingerprint() Fingerprint

func (*Marker) Keys

func (*Marker) Keys() []KeyBytes

func (*Marker) Sample

func (*Marker) Sample() []MinhashSampleItem

func (*Marker) Since

func (*Marker) Since() time.Time

func (*Marker) X

func (*Marker) X() KeyBytes

func (*Marker) Y

func (*Marker) Y() KeyBytes

type MessageType

type MessageType byte

MessageType specifies the type of a sync message.

const (
	// Done message is sent to indicate the completion of the whole sync run.
	MessageTypeDone MessageType = iota
	// EndRoundMessage is sent to indicate the completion of a single round.
	MessageTypeEndRound
	// EmptySetMessage is sent to indicate that the set is empty. In response, the
	// receiving side sends its whole set using ItemBatch and RangeContents messages.
	MessageTypeEmptySet
	// EmptyRangeMessage is sent to indicate that the specified range is empty. In
	// response, the receiving side sends the contents of the range using ItemBatch
	// and RangeContents messages.
	MessageTypeEmptyRange
	// Fingerprint carries a range fingerprint. Depending on the local fingerprint of
	// the same range, the receiving side may not reply to this message (range
	// completed), may send the contents of the range using ItemBatch and
	// RangeContents messages if the range is small enough, or it can split the range
	// in two and send back the Fingerprint messages for each of resulting parts.
	MessageTypeFingerprint
	// RangeContents message is sent after ItemBatch messages to indicate what range
	// the messages belong to. If the receiving side has any items within the same
	// range, these items are sent back using ItemBatch and RangeContents messages.
	MessageTypeRangeContents
	// ItemBatchMessage is sent to carry a batch of items.
	MessageTypeItemBatch
	// ProbeMessage is sent to request the count of items and an estimate of the
	// Jaccard similarity coefficient for the specified range.
	MessageTypeProbe
	// Sample message carries a minhash sample along with the fingerprint and
	// number of items within the specified range.
	MessageTypeSample
	// Recent message is sent after ItemBatch messages to indicate that the batches
	// contain items recently added to the set.
	MessageTypeRecent
)

func (MessageType) String

func (mtype MessageType) String() string

String implements Stringer.

type MinhashSampleItem

type MinhashSampleItem uint32

MinhashSampleItem represents an item of minhash sample subset.

func MinhashSampleItemFromKeyBytes

func MinhashSampleItemFromKeyBytes(h KeyBytes) MinhashSampleItem

MinhashSampleItemFromKeyBytes uses the lower 32 bits of a hash as a MinhashSampleItem.

func Sample

func Sample(sr SeqResult, count, sampleSize int) ([]MinhashSampleItem, error)

Sample retrieves min(count, sampleSize) items from the ordered sequence, extracting MinhashSampleItem from each value.

func (MinhashSampleItem) Compare

func (m MinhashSampleItem) Compare(other MinhashSampleItem) int

func (*MinhashSampleItem) DecodeScale

func (m *MinhashSampleItem) DecodeScale(d *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (MinhashSampleItem) EncodeScale

func (m MinhashSampleItem) EncodeScale(e *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

func (MinhashSampleItem) String

func (m MinhashSampleItem) String() string

type OrderedSet

type OrderedSet interface {
	// Receive handles a new key received from the peer.
	// It may or may not add it to the set immediately; this doesn't affect set
	// reconciliation operation.
	Receive(k KeyBytes) error
	// GetRangeInfo returns RangeInfo for the item range in the ordered set,
	// bounded by [x, y).
	// x == y indicates the whole set.
	// x < y indicates a normal range starting with x and ending below y.
	// x > y indicates a wrapped around range, that is from x (inclusive) to the end
	// of the set and from the beginning of the set to y, non-inclusive.
	// If count >= 0, at most count items are returned, and RangeInfo
	// is returned for the corresponding subrange of the requested range.
	// If both x and y are nil, the information for the entire set is returned.
	// If either x or y is nil, the other one must be nil as well.
	GetRangeInfo(x, y KeyBytes, count int) (RangeInfo, error)
	// SplitRange splits the range roughly after the specified count of items,
	// returning RangeInfo for the first half and the second half of the range.
	SplitRange(x, y KeyBytes, count int) (SplitInfo, error)
	// Items returns the sequence of items in the set.
	Items() SeqResult
	// Empty returns true if the set is empty.
	Empty() (bool, error)
	// Copy makes a shallow copy of the OrderedSet.
	// syncScope argument is a hint that can be used to optimize resource usage.
	// If syncScope is true, then the copy is intended to be used for the duration of
	// a synchronization run.
	// If syncScope is false, then the lifetime of the copy is not clearly defined.
	Copy(syncScope bool) OrderedSet
	// Recent returns an Iterator that yields the items added since the specified
	// timestamp. Some OrderedSet implementations may not have Recent implemented, in
	// which case it should return an empty sequence.
	Recent(since time.Time) (SeqResult, int)
}

OrderedSet represents the set that can be synced against a remote peer.

func NewDumbSet

func NewDumbSet(disableReAdd bool) OrderedSet

NewDumbSet creates a new dumbSet instance. If disableReAdd is true, the set will panic if the same item is received twice.

type PairwiseSetSyncer

type PairwiseSetSyncer struct {
	// contains filtered or unexported fields
}

func NewPairwiseSetSyncer

func NewPairwiseSetSyncer(
	r Requester,
	name string,
	opts []RangeSetReconcilerOption,
	conduitOpts []ConduitOption,
) *PairwiseSetSyncer

func (*PairwiseSetSyncer) Probe

func (pss *PairwiseSetSyncer) Probe(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) (ProbeResult, error)

func (*PairwiseSetSyncer) Received

func (pss *PairwiseSetSyncer) Received() int

func (*PairwiseSetSyncer) Register

func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)

func (*PairwiseSetSyncer) Sent

func (pss *PairwiseSetSyncer) Sent() int

func (*PairwiseSetSyncer) Serve

func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error

func (*PairwiseSetSyncer) Sync

func (pss *PairwiseSetSyncer) Sync(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) error

type ProbeMessage

type ProbeMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	SampleSize       uint32
}

ProbeMessage requests bounded range fingerprint and count from the peer, along with a minhash sample if fingerprints differ.

func (*ProbeMessage) Count

func (m *ProbeMessage) Count() int

func (*ProbeMessage) DecodeScale

func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ProbeMessage) EncodeScale

func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*ProbeMessage) Fingerprint

func (m *ProbeMessage) Fingerprint() Fingerprint

func (*ProbeMessage) Keys

func (m *ProbeMessage) Keys() []KeyBytes

func (*ProbeMessage) Sample

func (m *ProbeMessage) Sample() []MinhashSampleItem

func (*ProbeMessage) Since

func (m *ProbeMessage) Since() time.Time

func (*ProbeMessage) Type

func (m *ProbeMessage) Type() MessageType

func (*ProbeMessage) X

func (m *ProbeMessage) X() KeyBytes

func (*ProbeMessage) Y

func (m *ProbeMessage) Y() KeyBytes

type ProbeResult

type ProbeResult struct {
	// Fingerprint of the range.
	FP any
	// Number of items in the range.
	Count int
	// An estimate of Jaccard similarity coefficient between the sets.
	// The range is 0..1, 0 being mostly disjoint sets and 1 being mostly equal sets.
	Sim float64
}

ProbeResult contains the result of a probe.

type RangeContentsMessage

type RangeContentsMessage struct {
	RangeX, RangeY CompactHash
	NumItems       uint32
}

RangeContentsMessage denotes a range for which the set of items has been sent. The peer needs to send back any items it has in the same range bounded by [RangeX, RangeY).

func (*RangeContentsMessage) Count

func (m *RangeContentsMessage) Count() int

func (*RangeContentsMessage) DecodeScale

func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RangeContentsMessage) EncodeScale

func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*RangeContentsMessage) Fingerprint

func (m *RangeContentsMessage) Fingerprint() Fingerprint

func (*RangeContentsMessage) Keys

func (m *RangeContentsMessage) Keys() []KeyBytes

func (*RangeContentsMessage) Sample

func (*RangeContentsMessage) Since

func (m *RangeContentsMessage) Since() time.Time

func (*RangeContentsMessage) Type

func (*RangeContentsMessage) X

func (*RangeContentsMessage) Y

type RangeInfo

type RangeInfo struct {
	// Fingerprint of the interval
	Fingerprint Fingerprint
	// Number of items in the interval
	Count int
	// Items is the sequence of set elements in the interval.
	Items SeqResult
}

RangeInfo contains information about a range of items in the OrderedSet as returned by OrderedSet.GetRangeInfo.

type RangeSetReconciler

type RangeSetReconciler struct {
	// contains filtered or unexported fields
}

RangeSetReconciler reconciles two sets of items using the recursive set reconciliation protocol.

func NewRangeSetReconciler

func NewRangeSetReconciler(os OrderedSet, opts ...RangeSetReconcilerOption) *RangeSetReconciler

NewRangeSetReconciler creates a new RangeSetReconciler.

func (*RangeSetReconciler) HandleProbeResponse

func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)

HandleProbeResponse processes the probe response message and returns the probe result. info is the range info returned by InitiateProbe.

func (*RangeSetReconciler) Initiate

func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error

Initiate initiates the reconciliation process with the peer. If x and y are non-nil, [x, y) range is reconciled. If x and y are nil, the whole range is reconciled.

func (*RangeSetReconciler) InitiateProbe

func (rsr *RangeSetReconciler) InitiateProbe(
	c Conduit,
	x, y KeyBytes,
) (RangeInfo, error)

InitiateProbe initiates a probe to retrieve the item count and Jaccard similarity coefficient from the peer.

func (*RangeSetReconciler) Run

func (rsr *RangeSetReconciler) Run(c Conduit) error

Run performs a sync reconciliation run using the specified Conduit to send and receive messages.

type RangeSetReconcilerOption

type RangeSetReconcilerOption func(r *RangeSetReconciler)

RangeSetReconcilerOption is a configuration option for RangeSetReconciler.

func WithClock

WithClock specifies the clock for RangeSetReconciler.

func WithItemChunkSize

func WithItemChunkSize(n int) RangeSetReconcilerOption

WithItemChunkSize sets the size of the item chunk to use when sending the set items.

func WithLogger

func WithLogger(log *zap.Logger) RangeSetReconcilerOption

WithLogger specifies the logger for RangeSetReconciler.

func WithMaxDiff

func WithMaxDiff(d float64) RangeSetReconcilerOption

WithMaxDiff sets the maximum set difference metric (0..1) allowed for recursive reconciliation, with a value of 0 meaning equal sets and 1 meaning completely disjoint sets. If the difference metric exceeds the MaxDiff value, the whole set is transmitted instead of applying the recursive algorithm.
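The decision this option controls can be sketched as a single predicate. Two assumptions are made here: that the difference metric is 1 minus the estimated Jaccard similarity, and that a negative maxDiff disables the check (as the "maxDiff < 0" sample interactions in the overview suggest). The function `useDumbSync` is a hypothetical name.

```go
package main

import "fmt"

// useDumbSync reports whether the whole set should be sent instead of
// running the recursive algorithm.
// Assumptions: difference metric = 1 - sim; maxDiff < 0 disables the check.
func useDumbSync(sim, maxDiff float64) bool {
	if maxDiff < 0 {
		return false
	}
	return 1-sim > maxDiff
}

func main() {
	fmt.Println(useDumbSync(0.95, 0.25)) // sets are similar enough: recursive sync
	fmt.Println(useDumbSync(0.50, 0.25)) // too different: send the whole set
	fmt.Println(useDumbSync(0.00, -1))   // check disabled
}
```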

func WithMaxSendRange

func WithMaxSendRange(n int) RangeSetReconcilerOption

WithMaxSendRange sets the maximum range size to send instead of further subdividing the input range.

func WithRecentTimeSpan

func WithRecentTimeSpan(d time.Duration) RangeSetReconcilerOption

WithRecentTimeSpan specifies the time span for recent items.

func WithSampleSize

func WithSampleSize(s int) RangeSetReconcilerOption

WithSampleSize sets the size of the MinHash sample to be sent to the peer.

func WithTracer

func WithTracer(t Tracer) RangeSetReconcilerOption

WithTracer specifies a tracer for RangeSetReconciler.

type RecentMessage

type RecentMessage struct {
	SinceTime uint64
}

RecentMessage is a SyncMessage that denotes a set of items that have been added to the peer's set since the specific point in time.

func (*RecentMessage) Count

func (m *RecentMessage) Count() int

func (*RecentMessage) DecodeScale

func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RecentMessage) EncodeScale

func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*RecentMessage) Fingerprint

func (m *RecentMessage) Fingerprint() Fingerprint

func (*RecentMessage) Keys

func (m *RecentMessage) Keys() []KeyBytes

func (*RecentMessage) Sample

func (m *RecentMessage) Sample() []MinhashSampleItem

func (*RecentMessage) Since

func (m *RecentMessage) Since() time.Time

func (*RecentMessage) Type

func (m *RecentMessage) Type() MessageType

func (*RecentMessage) X

func (m *RecentMessage) X() KeyBytes

func (*RecentMessage) Y

func (m *RecentMessage) Y() KeyBytes

type Requester

type Requester interface {
	Run(context.Context) error
	StreamRequest(context.Context, p2p.Peer, []byte, server.StreamRequestCallback, ...string) error
}

type SampleMessage

type SampleMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	NumItems         uint32
	// NOTE: max must be in sync with maxSampleSize in hashsync/rangesync.go
	SampleItems []MinhashSampleItem `scale:"max=1000"`
}

SampleMessage is a sample of set items.

func (*SampleMessage) Count

func (m *SampleMessage) Count() int

func (*SampleMessage) DecodeScale

func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*SampleMessage) EncodeScale

func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*SampleMessage) Fingerprint

func (m *SampleMessage) Fingerprint() Fingerprint

func (*SampleMessage) Keys

func (m *SampleMessage) Keys() []KeyBytes

func (*SampleMessage) Sample

func (m *SampleMessage) Sample() []MinhashSampleItem

func (*SampleMessage) Since

func (m *SampleMessage) Since() time.Time

func (*SampleMessage) Type

func (m *SampleMessage) Type() MessageType

func (*SampleMessage) X

func (m *SampleMessage) X() KeyBytes

func (*SampleMessage) Y

func (m *SampleMessage) Y() KeyBytes

type Seq

type Seq iter.Seq[KeyBytes]

Seq represents an ordered sequence of elements. Unless the sequence is empty or an error occurs while iterating, it yields elements endlessly, wrapping around to the first element after the last one.
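The wrap-around behaviour means a consumer has to stop the iteration itself. The sketch below uses a plain function type with the same shape as iter.Seq so that it does not depend on the Go version; `seq`, `cyclic` and `firstN` are hypothetical names, and `cyclic` assumes 0 <= start < len(keys) for a non-empty slice.

```go
package main

import "fmt"

// seq has the same shape as iter.Seq[[]byte].
type seq func(yield func([]byte) bool)

// cyclic yields keys endlessly starting at index start, wrapping around to
// the first key after the last one. An empty slice yields nothing.
func cyclic(keys [][]byte, start int) seq {
	return func(yield func([]byte) bool) {
		if len(keys) == 0 {
			return
		}
		for i := start % len(keys); ; i = (i + 1) % len(keys) {
			if !yield(keys[i]) {
				return
			}
		}
	}
}

// firstN stops the endless sequence after n elements.
func firstN(s seq, n int) [][]byte {
	var out [][]byte
	if n <= 0 {
		return out
	}
	s(func(k []byte) bool {
		out = append(out, k)
		return len(out) < n
	})
	return out
}

func main() {
	keys := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	fmt.Printf("%s\n", firstN(cyclic(keys, 1), 5)) // [b c a b c]
}
```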

func EmptySeq

func EmptySeq() Seq

EmptySeq returns an empty sequence.

func (Seq) Collect

func (s Seq) Collect() []KeyBytes

Collect returns all elements in the sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.

func (Seq) First

func (s Seq) First() KeyBytes

First returns the first element from the sequence, if any. If the sequence is empty, it returns nil.

func (Seq) FirstN

func (s Seq) FirstN(n int) []KeyBytes

FirstN returns the first n elements from the sequence.

func (Seq) MarshalLogArray

func (s Seq) MarshalLogArray(enc zapcore.ArrayEncoder) error

MarshalLogArray implements zapcore.ArrayMarshaler.

type SeqErrorFunc

type SeqErrorFunc func() error

SeqErrorFunc is a function that returns an error that happened during iteration, if any.

var NoSeqError SeqErrorFunc = func() error { return nil }

NoSeqError is a SeqErrorFunc that always returns nil (no error).

func SeqError

func SeqError(err error) SeqErrorFunc

SeqError returns a SeqErrorFunc that always returns the given error.

type SeqResult

type SeqResult struct {
	Seq   Seq
	Error SeqErrorFunc
}

SeqResult represents the result of a function that returns a sequence. The Error method must be called to check if an error occurred after processing the sequence. Error is reset at the beginning of each Seq call (iteration over the sequence).
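The iterate-then-check pattern can be sketched as follows. The types are simplified stand-ins (`seqResult`, `failAfter` and `collect` are hypothetical), and the sequence here is finite rather than wrapping, to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

type seq func(yield func([]byte) bool)

// seqResult pairs a sequence with a function reporting the iteration error.
type seqResult struct {
	seq seq
	err func() error
}

// failAfter yields keys until index n and then stops with an error,
// imitating a storage failure in the middle of an iteration.
func failAfter(keys [][]byte, n int) seqResult {
	var err error
	return seqResult{
		seq: func(yield func([]byte) bool) {
			err = nil // reset at the beginning of each iteration
			for i, k := range keys {
				if i == n {
					err = errors.New("storage read failed")
					return
				}
				if !yield(k) {
					return
				}
			}
		},
		err: func() error { return err },
	}
}

// collect drains the sequence first and only then checks the error.
func collect(sr seqResult) ([][]byte, error) {
	var out [][]byte
	sr.seq(func(k []byte) bool {
		out = append(out, k)
		return true
	})
	return out, sr.err()
}

func main() {
	keys := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	items, err := collect(failAfter(keys, 1))
	fmt.Printf("%s %v\n", items, err)
}
```

Skipping the error check would make a truncated sequence indistinguishable from a complete one.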

func EmptySeqResult

func EmptySeqResult() SeqResult

EmptySeqResult returns an empty sequence result.

func ErrorSeqResult

func ErrorSeqResult(err error) SeqResult

ErrorSeqResult returns a sequence result with an empty sequence and an error.

func (SeqResult) Collect

func (s SeqResult) Collect() ([]KeyBytes, error)

Collect returns all elements in the result's sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.

func (SeqResult) First

func (s SeqResult) First() (KeyBytes, error)

First returns the first element from the result's sequence, if any. If the sequence is empty, it returns nil.

func (SeqResult) FirstN

func (s SeqResult) FirstN(n int) ([]KeyBytes, error)

FirstN returns the first n elements from the result's sequence.

func (SeqResult) MarshalLogArray

func (s SeqResult) MarshalLogArray(enc zapcore.ArrayEncoder) error

MarshalLogArray implements zapcore.ArrayMarshaler.

type SplitInfo

type SplitInfo struct {
	// 2 parts of the range
	Parts [2]RangeInfo
	// Middle point between the ranges
	Middle KeyBytes
}

SplitInfo contains information about range split in two.

type SyncMessage

type SyncMessage interface {
	// Type returns the type of the message.
	Type() MessageType
	// X returns the beginning of the range.
	X() KeyBytes
	// Y returns the end of the range.
	Y() KeyBytes
	// Fingerprint returns the fingerprint of the range.
	Fingerprint() Fingerprint
	// Count returns the number of items in the range.
	Count() int
	// Keys returns the keys of the items in the range.
	Keys() []KeyBytes
	// Since returns the time since when the recent items are being sent.
	Since() time.Time
	// Sample returns the minhash sample of the items in the range.
	Sample() []MinhashSampleItem
}

SyncMessage is a message that is a part of the sync protocol.

type Tracer

type Tracer interface {
	// OnDumbSync is called when the difference metric exceeds maxDiff and dumb
	// reconciliation process is used
	OnDumbSync()
	// OnRecent is invoked when Recent message is received
	OnRecent(receivedItems, sentItems int)
}

Tracer tracks the reconciliation process.
