Documentation ¶
Overview ¶
Package rangesync implements a pairwise set reconciliation protocol.
The protocol is based on the paper "Range-Based Set Reconciliation" by Aljoscha Meyer: https://arxiv.org/pdf/2212.13567.pdf
The protocol makes it possible to build reusable synchronization helper structures (ordered sets) and to use the same structure to efficiently reconcile subranges of an ordered set against multiple peers. Its disadvantage is that the algorithm is not very efficient for large differences (tens of thousands of elements) when the elements have no apparent ordering, which is the case with hash-based IDs such as ATX IDs. To mitigate this problem, the protocol supports exchanging recently received items between peers before the actual reconciliation begins (sync priming). When the difference is large, the algorithm can automatically degrade to a simple "send me the whole set" mode. The resulting increase in load on the network and on the peers' machines is mitigated by splitting the element range between multiple peers.
The core concepts are the following:
- Ordered sets. The sets to be reconciled are ordered. The elements of a set are also called "keys" because they represent the IDs of the actual objects being synchronized, such as ATXs.
- Ranges. A range [x, y) denotes the items in the set from x (inclusive) up to y (exclusive). Ranges may wrap around:
  - x == y: the whole set
  - x < y: a normal range starting with x and ending below y
  - x > y: a wrapped-around range, that is, from x (inclusive) to the end of the set and from the beginning of the set to y (exclusive)
- Fingerprint. Each range has a fingerprint, which is the XOR of all the IDs (keys) in the range. The fingerprint is used to quickly check whether the range is in sync with the peer.
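The range and fingerprint rules above can be illustrated with a short Go sketch. The helpers `inRange` and `rangeFingerprint` are hypothetical and not part of the package API, and using only the first FingerprintSize bytes of each key is an assumption of this sketch. It shows the wrap-around semantics and the property that the fingerprint of [x, y) equals the XOR of the fingerprints of [x, m) and [m, y).

```go
package main

import (
	"bytes"
	"fmt"
)

// fingerprintSize mirrors the documented FingerprintSize constant.
const fingerprintSize = 12

// inRange reports whether key k belongs to the range [x, y) under the
// documented rules: x == y is the whole set, x < y is a normal range,
// and x > y wraps around past the end of the set to its beginning.
func inRange(k, x, y []byte) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0:
		return true
	case c < 0:
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default:
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

// rangeFingerprint XORs together the keys that fall into [x, y) and counts
// them. Only the first fingerprintSize bytes of each key are used (an
// assumption of this sketch).
func rangeFingerprint(keys [][]byte, x, y []byte) (fp [fingerprintSize]byte, count int) {
	for _, k := range keys {
		if !inRange(k, x, y) {
			continue
		}
		count++
		for i := 0; i < fingerprintSize && i < len(k); i++ {
			fp[i] ^= k[i]
		}
	}
	return fp, count
}

func main() {
	keys := [][]byte{{0x10}, {0x20}, {0x30}, {0x40}}
	// x > y: the range wraps around and covers 0x30, 0x40 and 0x10.
	fp, n := rangeFingerprint(keys, []byte{0x30}, []byte{0x20})
	fmt.Printf("%d items, fp[0]=%#x\n", n, fp[0]) // 3 items, fp[0]=0x60
}
```

Because XOR is associative and commutative, a peer can split a mismatching range at any point m and compare the two halves independently; this is what the recursive Fingerprint exchange relies on.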
Each OrderedSet is expected to provide the number of items and the fingerprint of the items in a range relatively cheaply. Additionally, OrderedSet has a Recent method that retrieves recently added keys (elements) based on the provided timestamp. This reconciliation helper mechanism is optional, so Recent may simply return an empty sequence.
Below are logs of sample interactions between two peers, A and B, in several scenarios.
A: empty set; B: empty set
    A -> B: EmptySet EndRound
    B -> A: Done
A: empty set; B: non-empty set
    A -> B: EmptySet EndRound
    B -> A: ItemBatch ItemBatch ...
    A -> B: Done
A: small set (< maxSendRange); B: non-empty set
    A -> B: ItemBatch ItemBatch ... RangeContents [x, y) EndRound
    B -> A: ItemBatch ItemBatch ...
    A -> B: Done
A: large set; B: non-empty set; maxDiff < 0
    A -> B: Fingerprint [x, y) EndRound
    B -> A: Fingerprint [x, m) Fingerprint [m, y) EndRound
    A -> B: ItemBatch ItemBatch ... RangeContents [x, m) EndRound
    B -> A: Done
A: large set; B: non-empty set; maxDiff >= 0; differenceMetric <= maxDiff
NOTE: the Sample message includes the range fingerprint.
    A -> B: Sample [x, y) EndRound
    B -> A: Fingerprint [x, m) Fingerprint [m, y) EndRound
    A -> B: ItemBatch ItemBatch ... RangeContents [x, m) EndRound
    B -> A: Done
A: large set; B: non-empty set; maxDiff >= 0; differenceMetric > maxDiff
    A -> B: Sample [x, y) EndRound
    B -> A: ItemBatch ItemBatch ... RangeContents [x, y) EndRound
    A -> B: Done
A: large set; B: non-empty set; sync priming; maxDiff >= 0; differenceMetric <= maxDiff (after priming)
    A -> B: ItemBatch ItemBatch ... Recent EndRound
    B -> A: ItemBatch ItemBatch ... Sample [x, y) EndRound
    A -> B: Fingerprint [x, m) Fingerprint [m, y) EndRound
    B -> A: ItemBatch ItemBatch ... RangeContents [x, m) EndRound
    A -> B: Done
A: large set; B: non-empty set; sync priming; maxDiff < 0
    A -> B: ItemBatch ItemBatch ... Recent EndRound
    B -> A: ItemBatch ItemBatch ... Fingerprint [x, y) EndRound
    A -> B: Fingerprint [x, m) Fingerprint [m, y) EndRound
    B -> A: ItemBatch ItemBatch ... RangeContents [x, m) EndRound
    A -> B: Done
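The choice of the opening message in these scenarios can be summarized as a small decision function. This is a hedged Go sketch: `firstMessage` and `answerSample` are hypothetical helpers, sync priming is ignored, and treating the difference metric as 1 minus the estimated Jaccard similarity is an assumption, not something the package documentation states.

```go
package main

import "fmt"

// firstMessage is a hypothetical helper, not part of the package API. It
// returns the message that ends A's first round in the scenarios above
// (ignoring sync priming). count is the number of A's items in [x, y).
func firstMessage(count, maxSendRange int, maxDiff float64) string {
	switch {
	case count == 0:
		return "EmptySet"
	case count < maxSendRange:
		return "RangeContents" // preceded by ItemBatch messages
	case maxDiff >= 0:
		return "Sample"
	default:
		return "Fingerprint"
	}
}

// answerSample is a hypothetical helper showing how B may react to a Sample.
// As an assumption, the difference metric is 1 - sim, where sim is the
// Jaccard similarity estimated from the two minhash samples.
func answerSample(sim, maxDiff float64) string {
	if 1-sim > maxDiff {
		return "RangeContents" // send the whole range, preceded by ItemBatch
	}
	return "Fingerprint" // split [x, y) into [x, m) and [m, y)
}

func main() {
	fmt.Println(firstMessage(0, 16, -1))     // EmptySet
	fmt.Println(firstMessage(5, 16, -1))     // RangeContents
	fmt.Println(firstMessage(1000, 16, 0.1)) // Sample
	fmt.Println(answerSample(0.5, 0.1))      // RangeContents
}
```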
Index ¶
- Constants
- Variables
- func CalcSim(a, b []MinhashSampleItem) float64
- func SyncMessageToString(m SyncMessage) string
- type CompactHash
- type Conduit
- type ConduitOption
- type Dispatcher
- type DoneMessage
- type EmptyRangeMessage
- func (m *EmptyRangeMessage) Count() int
- func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *EmptyRangeMessage) Fingerprint() Fingerprint
- func (m *EmptyRangeMessage) Keys() []KeyBytes
- func (m *EmptyRangeMessage) Sample() []MinhashSampleItem
- func (m *EmptyRangeMessage) Since() time.Time
- func (m *EmptyRangeMessage) Type() MessageType
- func (m *EmptyRangeMessage) X() KeyBytes
- func (m *EmptyRangeMessage) Y() KeyBytes
- type EmptySetMessage
- type EndRoundMessage
- type Fingerprint
- type FingerprintMessage
- func (m *FingerprintMessage) Count() int
- func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *FingerprintMessage) Fingerprint() Fingerprint
- func (m *FingerprintMessage) Keys() []KeyBytes
- func (m *FingerprintMessage) Sample() []MinhashSampleItem
- func (m *FingerprintMessage) Since() time.Time
- func (m *FingerprintMessage) Type() MessageType
- func (m *FingerprintMessage) X() KeyBytes
- func (m *FingerprintMessage) Y() KeyBytes
- type Handler
- type ItemBatchMessage
- func (m *ItemBatchMessage) Count() int
- func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *ItemBatchMessage) Fingerprint() Fingerprint
- func (m *ItemBatchMessage) Keys() []KeyBytes
- func (m *ItemBatchMessage) Sample() []MinhashSampleItem
- func (m *ItemBatchMessage) Since() time.Time
- func (m *ItemBatchMessage) Type() MessageType
- func (m *ItemBatchMessage) X() KeyBytes
- func (m *ItemBatchMessage) Y() KeyBytes
- type KeyBytes
- type KeyCollection
- type Marker
- type MessageType
- type MinhashSampleItem
- type OrderedSet
- type PairwiseSetSyncer
- func (pss *PairwiseSetSyncer) Probe(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) (ProbeResult, error)
- func (pss *PairwiseSetSyncer) Received() int
- func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)
- func (pss *PairwiseSetSyncer) Sent() int
- func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error
- func (pss *PairwiseSetSyncer) Sync(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) error
- type ProbeMessage
- func (m *ProbeMessage) Count() int
- func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *ProbeMessage) Fingerprint() Fingerprint
- func (m *ProbeMessage) Keys() []KeyBytes
- func (m *ProbeMessage) Sample() []MinhashSampleItem
- func (m *ProbeMessage) Since() time.Time
- func (m *ProbeMessage) Type() MessageType
- func (m *ProbeMessage) X() KeyBytes
- func (m *ProbeMessage) Y() KeyBytes
- type ProbeResult
- type RangeContentsMessage
- func (m *RangeContentsMessage) Count() int
- func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *RangeContentsMessage) Fingerprint() Fingerprint
- func (m *RangeContentsMessage) Keys() []KeyBytes
- func (m *RangeContentsMessage) Sample() []MinhashSampleItem
- func (m *RangeContentsMessage) Since() time.Time
- func (m *RangeContentsMessage) Type() MessageType
- func (m *RangeContentsMessage) X() KeyBytes
- func (m *RangeContentsMessage) Y() KeyBytes
- type RangeInfo
- type RangeSetReconciler
- func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)
- func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error
- func (rsr *RangeSetReconciler) InitiateProbe(c Conduit, x, y KeyBytes) (RangeInfo, error)
- func (rsr *RangeSetReconciler) Run(c Conduit) error
- type RangeSetReconcilerOption
- func WithClock(c clockwork.Clock) RangeSetReconcilerOption
- func WithItemChunkSize(n int) RangeSetReconcilerOption
- func WithLogger(log *zap.Logger) RangeSetReconcilerOption
- func WithMaxDiff(d float64) RangeSetReconcilerOption
- func WithMaxSendRange(n int) RangeSetReconcilerOption
- func WithRecentTimeSpan(d time.Duration) RangeSetReconcilerOption
- func WithSampleSize(s int) RangeSetReconcilerOption
- func WithTracer(t Tracer) RangeSetReconcilerOption
- type RecentMessage
- func (m *RecentMessage) Count() int
- func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *RecentMessage) Fingerprint() Fingerprint
- func (m *RecentMessage) Keys() []KeyBytes
- func (m *RecentMessage) Sample() []MinhashSampleItem
- func (m *RecentMessage) Since() time.Time
- func (m *RecentMessage) Type() MessageType
- func (m *RecentMessage) X() KeyBytes
- func (m *RecentMessage) Y() KeyBytes
- type Requester
- type SampleMessage
- func (m *SampleMessage) Count() int
- func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *SampleMessage) Fingerprint() Fingerprint
- func (m *SampleMessage) Keys() []KeyBytes
- func (m *SampleMessage) Sample() []MinhashSampleItem
- func (m *SampleMessage) Since() time.Time
- func (m *SampleMessage) Type() MessageType
- func (m *SampleMessage) X() KeyBytes
- func (m *SampleMessage) Y() KeyBytes
- type Seq
- type SeqErrorFunc
- type SeqResult
- type SplitInfo
- type SyncMessage
- type Tracer
Constants ¶
const (
	DefaultMaxSendRange  = 16
	DefaultItemChunkSize = 1024
	DefaultSampleSize    = 200
)
const (
// FingerprintSize is the size of a fingerprint in bytes.
FingerprintSize = 12
)
Variables ¶
var ErrLimitExceeded = errors.New("sync traffic/message limit exceeded")
Functions ¶
func CalcSim ¶
func CalcSim(a, b []MinhashSampleItem) float64
CalcSim estimates the Jaccard similarity coefficient between two sets based on their samples, each derived from the N lowest-valued elements of the corresponding set. The return value is in the 0..1 range, with 0 meaning almost no intersection and 1 meaning the sets are mostly equal. The precision of the estimate suffers if neither set is empty and the sets differ in size, with the return value tending to be lower than the actual Jaccard coefficient.
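For illustration, here is a standard bottom-k MinHash estimator in Go. It is a sketch under stated assumptions, not necessarily how CalcSim is implemented: `estimateJaccard` is a hypothetical name, and both samples are assumed to be sorted ascending, to contain distinct values, and to hold the lowest-valued hashes of each set (or the whole set if it is smaller than the sample size).

```go
package main

import "fmt"

// estimateJaccard is a bottom-k MinHash estimator (hypothetical helper).
// It walks the k lowest values of the union of the two samples and returns
// the fraction of them that are present in both samples.
func estimateJaccard(a, b []uint32) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 1 // two empty sets are equal
	}
	k := len(a)
	if len(b) > k {
		k = len(b)
	}
	i, j, seen, common := 0, 0, 0, 0
	for seen < k && (i < len(a) || j < len(b)) {
		switch {
		case j == len(b) || (i < len(a) && a[i] < b[j]):
			i++ // value only in a
		case i == len(a) || b[j] < a[i]:
			j++ // value only in b
		default: // a[i] == b[j]: value present in both samples
			common++
			i++
			j++
		}
		seen++
	}
	return float64(common) / float64(seen)
}

func main() {
	fmt.Println(estimateJaccard([]uint32{1, 2, 3}, []uint32{1, 2, 3})) // 1
	fmt.Println(estimateJaccard([]uint32{1, 2}, []uint32{1, 2, 3, 4})) // 0.5
}
```

Taking k as the larger sample length keeps the estimate exact when one of the sets is smaller than the sample size and its sample therefore is the whole set.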
func SyncMessageToString ¶
func SyncMessageToString(m SyncMessage) string
SyncMessageToString returns the string representation of a sync message.
Types ¶
type CompactHash ¶
type CompactHash struct {
H KeyBytes
}
CompactHash encodes hashes in a compact form, skipping trailing zeroes. It also supports a nil hash (no value). The encoding format is as follows:
- byte 0: spec byte
- bytes 1..n: data bytes
The format of the spec byte is as follows:
- bits 0..5: number of non-zero leading bytes
- bits 6..7: hash type
The following hash types are supported:
- 0: nil hash
- 1: 32-byte hash
- 2, 3: reserved
NOTE: when adding new hash types, we need to add a mechanism that makes sure that every received hash is of the expected type. Alternatively, we need to add some kind of context to the scale.Decoder / scale.Encoder, which may contain the size of hashes to be used.
func (*CompactHash) DecodeScale ¶
func (c *CompactHash) DecodeScale(dec *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (*CompactHash) EncodeScale ¶
func (c *CompactHash) EncodeScale(enc *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
func (*CompactHash) ToOrdered ¶
func (c *CompactHash) ToOrdered() KeyBytes
type Conduit ¶
type Conduit interface {
	// NextMessage returns the next SyncMessage, or nil if there are no more
	// SyncMessages for this session.
	NextMessage() (SyncMessage, error)
	// Send sends a SyncMessage to the peer.
	Send(SyncMessage) error
}
Conduit handles receiving and sending peer messages.
type ConduitOption ¶
type ConduitOption func(c *wireConduit)
ConduitOption specifies an option for a message conduit.
func WithMessageLimit ¶
func WithMessageLimit(limit int) ConduitOption
WithMessageLimit sets a limit on the total number of messages sent and received. Zero or negative values disable the limit.
func WithTrafficLimit ¶
func WithTrafficLimit(limit int) ConduitOption
WithTrafficLimit sets a limit on the total number of bytes sent and received. Zero or negative values disable the limit.
type Dispatcher ¶
Dispatcher multiplexes a P2P Server to multiple set reconcilers.
func NewDispatcher ¶
func NewDispatcher(logger *zap.Logger) *Dispatcher
NewDispatcher creates a new Dispatcher.
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(ctx context.Context, req []byte, stream io.ReadWriter) (err error)
Dispatch dispatches a request to a handler.
func (*Dispatcher) Register ¶
func (d *Dispatcher) Register(name string, h Handler)
Register registers a handler with a Dispatcher.
func (*Dispatcher) SetupServer ¶
func (d *Dispatcher) SetupServer(host host.Host, proto string, opts ...server.Opt) *server.Server
SetupServer creates a new P2P Server for the Dispatcher.
type DoneMessage ¶
type DoneMessage struct{ Marker }
DoneMessage is a SyncMessage that denotes the end of the synchronization. The peer should stop any further processing after receiving this message.
func (*DoneMessage) Type ¶
func (*DoneMessage) Type() MessageType
type EmptyRangeMessage ¶
type EmptyRangeMessage struct {
RangeX, RangeY CompactHash
}
EmptyRangeMessage notifies the peer that it needs to send all of its items in the specified range.
func (*EmptyRangeMessage) Count ¶
func (m *EmptyRangeMessage) Count() int
func (*EmptyRangeMessage) DecodeScale ¶
func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*EmptyRangeMessage) EncodeScale ¶
func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*EmptyRangeMessage) Fingerprint ¶
func (m *EmptyRangeMessage) Fingerprint() Fingerprint
func (*EmptyRangeMessage) Keys ¶
func (m *EmptyRangeMessage) Keys() []KeyBytes
func (*EmptyRangeMessage) Sample ¶
func (m *EmptyRangeMessage) Sample() []MinhashSampleItem
func (*EmptyRangeMessage) Since ¶
func (m *EmptyRangeMessage) Since() time.Time
func (*EmptyRangeMessage) Type ¶
func (m *EmptyRangeMessage) Type() MessageType
func (*EmptyRangeMessage) X ¶
func (m *EmptyRangeMessage) X() KeyBytes
func (*EmptyRangeMessage) Y ¶
func (m *EmptyRangeMessage) Y() KeyBytes
type EmptySetMessage ¶
type EmptySetMessage struct{ Marker }
EmptySetMessage is a SyncMessage that denotes an empty set, requesting the peer to send all of its items.
func (*EmptySetMessage) Type ¶
func (*EmptySetMessage) Type() MessageType
type EndRoundMessage ¶
type EndRoundMessage struct{ Marker }
EndRoundMessage is a SyncMessage that denotes the end of the sync round.
func (*EndRoundMessage) Type ¶
func (*EndRoundMessage) Type() MessageType
type Fingerprint ¶
type Fingerprint [FingerprintSize]byte
Fingerprint represents a fingerprint of a set of keys. The fingerprint is obtained by XORing together the keys in the set.
func EmptyFingerprint ¶
func EmptyFingerprint() Fingerprint
EmptyFingerprint returns an empty fingerprint.
func MustParseHexFingerprint ¶
func MustParseHexFingerprint(s string) Fingerprint
MustParseHexFingerprint converts a hex string to Fingerprint.
func RandomFingerprint ¶
func RandomFingerprint() Fingerprint
RandomFingerprint generates a random fingerprint.
func (*Fingerprint) BitFromLeft ¶
func (fp *Fingerprint) BitFromLeft(i int) bool
BitFromLeft returns the i-th bit from the left in the fingerprint.
func (Fingerprint) Compare ¶
func (fp Fingerprint) Compare(other Fingerprint) int
Compare compares two fingerprints.
func (Fingerprint) ShortString ¶
func (fp Fingerprint) ShortString() string
ShortString implements log.ShortString.
func (*Fingerprint) Update ¶
func (fp *Fingerprint) Update(h []byte)
Update includes the byte slice in the fingerprint.
type FingerprintMessage ¶
type FingerprintMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
NumItems uint32
}
FingerprintMessage contains a range fingerprint for comparison against the peer's fingerprint of the range with the same bounds, [RangeX, RangeY).
func (*FingerprintMessage) Count ¶
func (m *FingerprintMessage) Count() int
func (*FingerprintMessage) DecodeScale ¶
func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*FingerprintMessage) EncodeScale ¶
func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*FingerprintMessage) Fingerprint ¶
func (m *FingerprintMessage) Fingerprint() Fingerprint
func (*FingerprintMessage) Keys ¶
func (m *FingerprintMessage) Keys() []KeyBytes
func (*FingerprintMessage) Sample ¶
func (m *FingerprintMessage) Sample() []MinhashSampleItem
func (*FingerprintMessage) Since ¶
func (m *FingerprintMessage) Since() time.Time
func (*FingerprintMessage) Type ¶
func (m *FingerprintMessage) Type() MessageType
func (*FingerprintMessage) X ¶
func (m *FingerprintMessage) X() KeyBytes
func (*FingerprintMessage) Y ¶
func (m *FingerprintMessage) Y() KeyBytes
type Handler ¶
type Handler func(ctx context.Context, s io.ReadWriter) error
Handler is a function that handles a request for a Dispatcher.
type ItemBatchMessage ¶
type ItemBatchMessage struct {
ContentKeys KeyCollection `scale:"max=1024"`
}
ItemBatchMessage denotes a batch of items to be added to the peer's set.
func (*ItemBatchMessage) Count ¶
func (m *ItemBatchMessage) Count() int
func (*ItemBatchMessage) DecodeScale ¶
func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ItemBatchMessage) EncodeScale ¶
func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*ItemBatchMessage) Fingerprint ¶
func (m *ItemBatchMessage) Fingerprint() Fingerprint
func (*ItemBatchMessage) Keys ¶
func (m *ItemBatchMessage) Keys() []KeyBytes
func (*ItemBatchMessage) Sample ¶
func (m *ItemBatchMessage) Sample() []MinhashSampleItem
func (*ItemBatchMessage) Since ¶
func (m *ItemBatchMessage) Since() time.Time
func (*ItemBatchMessage) Type ¶
func (m *ItemBatchMessage) Type() MessageType
func (*ItemBatchMessage) X ¶
func (m *ItemBatchMessage) X() KeyBytes
func (*ItemBatchMessage) Y ¶
func (m *ItemBatchMessage) Y() KeyBytes
type KeyBytes ¶
type KeyBytes []byte
KeyBytes represents an item (key) in a reconcilable set.
func MustParseHexKeyBytes ¶
MustParseHexKeyBytes converts a hex string to KeyBytes.
func RandomKeyBytes ¶
RandomKeyBytes generates random data in bytes for testing.
func (KeyBytes) Inc ¶
Inc returns the key with the same number of bytes as this one, obtained by incrementing the key by one. It returns true if the increment has caused an overflow.
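The increment-with-overflow behaviour described for Inc can be sketched in Go. `incKey` is a hypothetical stand-in for KeyBytes.Inc, and treating the key as a big-endian unsigned integer (which agrees with lexicographic key ordering) is an assumption of this sketch.

```go
package main

import "fmt"

// incKey is a hypothetical stand-in for KeyBytes.Inc. It returns a new key
// of the same length, incremented by one as a big-endian integer, and
// reports whether the increment overflowed (wrapped to all zeroes).
// The input key is left unmodified.
func incKey(k []byte) ([]byte, bool) {
	r := append([]byte(nil), k...)
	for i := len(r) - 1; i >= 0; i-- {
		r[i]++
		if r[i] != 0 {
			return r, false // no carry out of this byte
		}
	}
	return r, true // every byte carried: overflow
}

func main() {
	next, overflow := incKey([]byte{0x01, 0xff})
	fmt.Printf("% x %v\n", next, overflow) // 02 00 false
	next, overflow = incKey([]byte{0xff, 0xff})
	fmt.Printf("% x %v\n", next, overflow) // 00 00 true
}
```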
func (KeyBytes) ShortString ¶
ShortString implements log.ShortString.
type KeyCollection ¶
type KeyCollection struct {
Keys []KeyBytes
}
KeyCollection represents a collection of keys of the same size.
func (*KeyCollection) DecodeScale ¶
func (c *KeyCollection) DecodeScale(dec *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (*KeyCollection) EncodeScale ¶
func (c *KeyCollection) EncodeScale(enc *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
type Marker ¶
type Marker struct{}
func (*Marker) Fingerprint ¶
func (*Marker) Fingerprint() Fingerprint
func (*Marker) Sample ¶
func (*Marker) Sample() []MinhashSampleItem
type MessageType ¶
type MessageType byte
MessageType specifies the type of a sync message.
const (
	// Done message is sent to indicate the completion of the whole sync run.
	MessageTypeDone MessageType = iota
	// EndRoundMessage is sent to indicate the completion of a single round.
	MessageTypeEndRound
	// EmptySetMessage is sent to indicate that the set is empty. In response, the
	// receiving side sends its whole set using ItemBatch and RangeContents messages.
	MessageTypeEmptySet
	// EmptyRangeMessage is sent to indicate that the specified range is empty. In
	// response, the receiving side sends the contents of the range using ItemBatch
	// and RangeContents messages.
	MessageTypeEmptyRange
	// Fingerprint carries a range fingerprint. Depending on the local fingerprint of
	// the same range, the receiving side may not reply to this message (range
	// completed), may send the contents of the range using ItemBatch and
	// RangeContents messages if the range is small enough, or it can split the range
	// in two and send back Fingerprint messages for each of the resulting parts.
	MessageTypeFingerprint
	// RangeContents message is sent after ItemBatch messages to indicate what range
	// the messages belong to. If the receiving side has any items within the same
	// range, these items are sent back using ItemBatch and RangeContents messages.
	MessageTypeRangeContents
	// ItemBatchMessage is sent to carry a batch of items.
	MessageTypeItemBatch
	// ProbeMessage is sent to request the count of items and an estimate of the
	// Jaccard similarity coefficient for the specified range.
	MessageTypeProbe
	// Sample message carries a minhash sample along with the fingerprint and
	// number of items within the specified range.
	MessageTypeSample
	// Recent message is sent after ItemBatch messages to indicate that the batches
	// contain items recently added to the set.
	MessageTypeRecent
)
type MinhashSampleItem ¶
type MinhashSampleItem uint32
MinhashSampleItem represents an item of minhash sample subset.
func MinhashSampleItemFromKeyBytes ¶
func MinhashSampleItemFromKeyBytes(h KeyBytes) MinhashSampleItem
MinhashSampleItemFromKeyBytes uses the lower 32 bits of a hash as a MinhashSampleItem.
func Sample ¶
func Sample(sr SeqResult, count, sampleSize int) ([]MinhashSampleItem, error)
Sample retrieves min(count, sampleSize) items from the ordered sequence, extracting a MinhashSampleItem from each value.
func (MinhashSampleItem) Compare ¶
func (m MinhashSampleItem) Compare(other MinhashSampleItem) int
func (*MinhashSampleItem) DecodeScale ¶
func (m *MinhashSampleItem) DecodeScale(d *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (MinhashSampleItem) EncodeScale ¶
func (m MinhashSampleItem) EncodeScale(e *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
func (MinhashSampleItem) String ¶
func (m MinhashSampleItem) String() string
type OrderedSet ¶
type OrderedSet interface {
	// Receive handles a new key received from the peer.
	// It may or may not add it to the set immediately; this doesn't affect set
	// reconciliation operation.
	Receive(k KeyBytes) error
	// GetRangeInfo returns RangeInfo for the item range in the ordered set,
	// bounded by [x, y).
	// x == y indicates the whole set.
	// x < y indicates a normal range starting with x and ending below y.
	// x > y indicates a wrapped-around range, that is, from x (inclusive) to the end
	// of the set and from the beginning of the set to y, non-inclusive.
	// If count >= 0, at most count items are returned, and RangeInfo
	// is returned for the corresponding subrange of the requested range.
	// If both x and y are nil, the information for the entire set is returned.
	// If either x or y is nil, the other one must be nil as well.
	GetRangeInfo(x, y KeyBytes, count int) (RangeInfo, error)
	// SplitRange splits the range roughly after the specified count of items,
	// returning RangeInfo for the first half and the second half of the range.
	SplitRange(x, y KeyBytes, count int) (SplitInfo, error)
	// Items returns the sequence of items in the set.
	Items() SeqResult
	// Empty returns true if the set is empty.
	Empty() (bool, error)
	// Copy makes a shallow copy of the OrderedSet.
	// The syncScope argument is a hint that can be used to optimize resource usage.
	// If syncScope is true, then the copy is intended to be used for the duration of
	// a synchronization run.
	// If syncScope is false, then the lifetime of the copy is not clearly defined.
	Copy(syncScope bool) OrderedSet
	// Recent returns a sequence that yields the items added since the specified
	// timestamp. Some OrderedSet implementations may not have Recent implemented, in
	// which case it should return an empty sequence.
	Recent(since time.Time) (SeqResult, int)
}
OrderedSet represents the set that can be synced against a remote peer.
func NewDumbSet ¶
func NewDumbSet(disableReAdd bool) OrderedSet
NewDumbSet creates a new dumbSet instance. If disableReAdd is true, the set will panic if the same item is received twice.
type PairwiseSetSyncer ¶
type PairwiseSetSyncer struct {
// contains filtered or unexported fields
}
func NewPairwiseSetSyncer ¶
func NewPairwiseSetSyncer(r Requester, name string, opts []RangeSetReconcilerOption, conduitOpts []ConduitOption) *PairwiseSetSyncer
func (*PairwiseSetSyncer) Probe ¶
func (pss *PairwiseSetSyncer) Probe(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) (ProbeResult, error)
func (*PairwiseSetSyncer) Received ¶
func (pss *PairwiseSetSyncer) Received() int
func (*PairwiseSetSyncer) Register ¶
func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)
func (*PairwiseSetSyncer) Sent ¶
func (pss *PairwiseSetSyncer) Sent() int
func (*PairwiseSetSyncer) Serve ¶
func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error
func (*PairwiseSetSyncer) Sync ¶
func (pss *PairwiseSetSyncer) Sync(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) error
type ProbeMessage ¶
type ProbeMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
SampleSize uint32
}
ProbeMessage requests the fingerprint and item count of the bounded range from the peer, along with a minhash sample if the fingerprints differ.
func (*ProbeMessage) Count ¶
func (m *ProbeMessage) Count() int
func (*ProbeMessage) DecodeScale ¶
func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ProbeMessage) EncodeScale ¶
func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*ProbeMessage) Fingerprint ¶
func (m *ProbeMessage) Fingerprint() Fingerprint
func (*ProbeMessage) Keys ¶
func (m *ProbeMessage) Keys() []KeyBytes
func (*ProbeMessage) Sample ¶
func (m *ProbeMessage) Sample() []MinhashSampleItem
func (*ProbeMessage) Since ¶
func (m *ProbeMessage) Since() time.Time
func (*ProbeMessage) Type ¶
func (m *ProbeMessage) Type() MessageType
func (*ProbeMessage) X ¶
func (m *ProbeMessage) X() KeyBytes
func (*ProbeMessage) Y ¶
func (m *ProbeMessage) Y() KeyBytes
type ProbeResult ¶
type ProbeResult struct {
	// Fingerprint of the range.
	FP any
	// Number of items in the range.
	Count int
	// An estimate of the Jaccard similarity coefficient between the sets.
	// The range is 0..1, 0 being mostly disjoint sets and 1 being mostly equal sets.
	Sim float64
}
ProbeResult contains the result of a probe.
type RangeContentsMessage ¶
type RangeContentsMessage struct {
RangeX, RangeY CompactHash
NumItems uint32
}
RangeContentsMessage denotes a range for which the set of items has been sent. The peer needs to send back any items it has in the same range bounded by [RangeX, RangeY).
func (*RangeContentsMessage) Count ¶
func (m *RangeContentsMessage) Count() int
func (*RangeContentsMessage) DecodeScale ¶
func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RangeContentsMessage) EncodeScale ¶
func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*RangeContentsMessage) Fingerprint ¶
func (m *RangeContentsMessage) Fingerprint() Fingerprint
func (*RangeContentsMessage) Keys ¶
func (m *RangeContentsMessage) Keys() []KeyBytes
func (*RangeContentsMessage) Sample ¶
func (m *RangeContentsMessage) Sample() []MinhashSampleItem
func (*RangeContentsMessage) Since ¶
func (m *RangeContentsMessage) Since() time.Time
func (*RangeContentsMessage) Type ¶
func (m *RangeContentsMessage) Type() MessageType
func (*RangeContentsMessage) X ¶
func (m *RangeContentsMessage) X() KeyBytes
func (*RangeContentsMessage) Y ¶
func (m *RangeContentsMessage) Y() KeyBytes
type RangeInfo ¶
type RangeInfo struct {
	// Fingerprint of the interval
	Fingerprint Fingerprint
	// Number of items in the interval
	Count int
	// Items is the sequence of set elements in the interval.
	Items SeqResult
}
RangeInfo contains information about a range of items in the OrderedSet as returned by OrderedSet.GetRangeInfo.
type RangeSetReconciler ¶
type RangeSetReconciler struct {
// contains filtered or unexported fields
}
RangeSetReconciler reconciles two sets of items using the recursive set reconciliation protocol.
func NewRangeSetReconciler ¶
func NewRangeSetReconciler(os OrderedSet, opts ...RangeSetReconcilerOption) *RangeSetReconciler
NewRangeSetReconciler creates a new RangeSetReconciler.
func (*RangeSetReconciler) HandleProbeResponse ¶
func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)
HandleProbeResponse processes the probe response message and returns the probe result. info is the range info returned by InitiateProbe.
func (*RangeSetReconciler) Initiate ¶
func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error
Initiate initiates the reconciliation process with the peer. If x and y are non-nil, [x, y) range is reconciled. If x and y are nil, the whole range is reconciled.
func (*RangeSetReconciler) InitiateProbe ¶
func (rsr *RangeSetReconciler) InitiateProbe(c Conduit, x, y KeyBytes) (RangeInfo, error)
InitiateProbe initiates a probe to retrieve the item count and Jaccard similarity coefficient from the peer.
func (*RangeSetReconciler) Run ¶
func (rsr *RangeSetReconciler) Run(c Conduit) error
Run performs a sync reconciliation run using the specified Conduit to send and receive messages.
type RangeSetReconcilerOption ¶
type RangeSetReconcilerOption func(r *RangeSetReconciler)
RangeSetReconcilerOption is a configuration option for RangeSetReconciler.
func WithClock ¶
func WithClock(c clockwork.Clock) RangeSetReconcilerOption
WithClock specifies the clock for RangeSetReconciler.
func WithItemChunkSize ¶
func WithItemChunkSize(n int) RangeSetReconcilerOption
WithItemChunkSize sets the size of the item chunk to use when sending the set items.
func WithLogger ¶
func WithLogger(log *zap.Logger) RangeSetReconcilerOption
WithLogger specifies the logger for RangeSetReconciler.
func WithMaxDiff ¶
func WithMaxDiff(d float64) RangeSetReconcilerOption
WithMaxDiff sets the maximum set difference metric (0..1) allowed for recursive reconciliation, with a value of 0 meaning equal sets and 1 meaning completely disjoint sets. If the difference metric exceeds the MaxDiff value, the whole set is transmitted instead of applying the recursive algorithm.
func WithMaxSendRange ¶
func WithMaxSendRange(n int) RangeSetReconcilerOption
WithMaxSendRange sets the maximum range size to send instead of further subdividing the input range.
func WithRecentTimeSpan ¶
func WithRecentTimeSpan(d time.Duration) RangeSetReconcilerOption
WithRecentTimeSpan specifies the time span for recent items.
func WithSampleSize ¶
func WithSampleSize(s int) RangeSetReconcilerOption
WithSampleSize sets the size of the MinHash sample to be sent to the peer.
func WithTracer ¶
func WithTracer(t Tracer) RangeSetReconcilerOption
WithTracer specifies a tracer for RangeSetReconciler.
type RecentMessage ¶
type RecentMessage struct {
SinceTime uint64
}
RecentMessage is a SyncMessage that denotes a set of items that have been added to the peer's set since a specific point in time.
func (*RecentMessage) Count ¶
func (m *RecentMessage) Count() int
func (*RecentMessage) DecodeScale ¶
func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RecentMessage) EncodeScale ¶
func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*RecentMessage) Fingerprint ¶
func (m *RecentMessage) Fingerprint() Fingerprint
func (*RecentMessage) Keys ¶
func (m *RecentMessage) Keys() []KeyBytes
func (*RecentMessage) Sample ¶
func (m *RecentMessage) Sample() []MinhashSampleItem
func (*RecentMessage) Since ¶
func (m *RecentMessage) Since() time.Time
func (*RecentMessage) Type ¶
func (m *RecentMessage) Type() MessageType
func (*RecentMessage) X ¶
func (m *RecentMessage) X() KeyBytes
func (*RecentMessage) Y ¶
func (m *RecentMessage) Y() KeyBytes
type SampleMessage ¶
type SampleMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
NumItems uint32
// NOTE: max must be in sync with maxSampleSize in hashsync/rangesync.go
SampleItems []MinhashSampleItem `scale:"max=1000"`
}
SampleMessage is a sample of set items.
func (*SampleMessage) Count ¶
func (m *SampleMessage) Count() int
func (*SampleMessage) DecodeScale ¶
func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*SampleMessage) EncodeScale ¶
func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*SampleMessage) Fingerprint ¶
func (m *SampleMessage) Fingerprint() Fingerprint
func (*SampleMessage) Keys ¶
func (m *SampleMessage) Keys() []KeyBytes
func (*SampleMessage) Sample ¶
func (m *SampleMessage) Sample() []MinhashSampleItem
func (*SampleMessage) Since ¶
func (m *SampleMessage) Since() time.Time
func (*SampleMessage) Type ¶
func (m *SampleMessage) Type() MessageType
func (*SampleMessage) X ¶
func (m *SampleMessage) X() KeyBytes
func (*SampleMessage) Y ¶
func (m *SampleMessage) Y() KeyBytes
type Seq ¶
Seq represents an ordered sequence of elements. Unless the sequence is empty or an error occurs while iterating, it yields elements endlessly, wrapping around to the first element after the last one.
func (Seq) Collect ¶
Collect returns all elements in the sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.
func (Seq) First ¶
First returns the first element from the sequence, if any. If the sequence is empty, it returns nil.
func (Seq) MarshalLogArray ¶
func (s Seq) MarshalLogArray(enc zapcore.ArrayEncoder) error
MarshalLogArray implements zapcore.ArrayMarshaler.
type SeqErrorFunc ¶
type SeqErrorFunc func() error
SeqErrorFunc is a function that returns an error that happened during iteration, if any.
var NoSeqError SeqErrorFunc = func() error { return nil }
NoSeqError is a SeqErrorFunc that always returns nil (no error).
func SeqError ¶
func SeqError(err error) SeqErrorFunc
SeqError returns a SeqErrorFunc that always returns the given error.
type SeqResult ¶
type SeqResult struct {
Seq Seq
Error SeqErrorFunc
}
SeqResult represents the result of a function that returns a sequence. Error must be called after processing the sequence to check whether an error occurred. Error is reset at the beginning of each Seq call (that is, each iteration over the sequence).
func EmptySeqResult ¶
func EmptySeqResult() SeqResult
EmptySeqResult returns an empty sequence result.
func ErrorSeqResult ¶
ErrorSeqResult returns a sequence result with an empty sequence and an error.
func (SeqResult) Collect ¶
Collect returns all elements in the result's sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.
func (SeqResult) First ¶
First returns the first element from the result's sequence, if any. If the sequence is empty, it returns nil.
func (SeqResult) MarshalLogArray ¶
func (s SeqResult) MarshalLogArray(enc zapcore.ArrayEncoder) error
MarshalLogArray implements zapcore.ArrayMarshaler.
type SplitInfo ¶
type SplitInfo struct {
// 2 parts of the range
Parts [2]RangeInfo
// Middle point between the ranges
Middle KeyBytes
}
SplitInfo contains information about a range split into two parts.
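A sketch of splitting a range's keys in two, assuming the keys are listed in range order (for a wrapped range, the tail of the set followed by its head), the split happens at the middle index, and Middle is the first key of the second part. The simplified `RangeInfo` (fingerprint and count only) and all helper names are hypothetical. Fingerprints are XORs of keys, as described in the package overview, so the two parts' fingerprints XOR back to the whole range's fingerprint.

```go
package main

import "bytes"

type KeyBytes []byte

// RangeInfo is simplified here to a fingerprint and an item count.
type RangeInfo struct {
	Fingerprint KeyBytes
	Count       int
}

type SplitInfo struct {
	Parts  [2]RangeInfo
	Middle KeyBytes
}

// fingerprint XORs all keys together; all keys must have length keyLen.
func fingerprint(keys []KeyBytes, keyLen int) KeyBytes {
	fp := make(KeyBytes, keyLen)
	for _, k := range keys {
		for i := range fp {
			fp[i] ^= k[i]
		}
	}
	return fp
}

// splitSorted splits the keys of a range (at least two) into
// [x, middle) and [middle, y). It reports false if there is nothing to split.
func splitSorted(keys []KeyBytes, keyLen int) (SplitInfo, bool) {
	if len(keys) < 2 {
		return SplitInfo{}, false
	}
	mid := len(keys) / 2
	return SplitInfo{
		Parts: [2]RangeInfo{
			{Fingerprint: fingerprint(keys[:mid], keyLen), Count: mid},
			{Fingerprint: fingerprint(keys[mid:], keyLen), Count: len(keys) - mid},
		},
		Middle: keys[mid],
	}, true
}

// partsMatchWhole checks that the part fingerprints XOR to the whole one.
func partsMatchWhole(si SplitInfo, whole KeyBytes) bool {
	a, b := si.Parts[0].Fingerprint, si.Parts[1].Fingerprint
	x := make(KeyBytes, len(a))
	for i := range x {
		x[i] = a[i] ^ b[i]
	}
	return bytes.Equal(x, whole)
}
```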
type SyncMessage ¶
type SyncMessage interface {
// Type returns the type of the message.
Type() MessageType
// X returns the beginning of the range.
X() KeyBytes
// Y returns the end of the range.
Y() KeyBytes
// Fingerprint returns the fingerprint of the range.
Fingerprint() Fingerprint
// Count returns the number of items in the range.
Count() int
// Keys returns the keys of the items in the range.
Keys() []KeyBytes
// Since returns the time since when the recent items are being sent.
Since() time.Time
// Sample returns the minhash sample of the items in the range.
Sample() []MinhashSampleItem
}
SyncMessage is a message that is part of the sync protocol.
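A sketch of how a receiver might use the X, Y, Fingerprint and Count accessors to decide whether a range is already in sync. It follows the range semantics from the package overview (x == y is the whole set, x < y is a normal range, x > y wraps around) and the XOR fingerprint definition. The `rangeMessage` interface is a hypothetical subset of SyncMessage that uses `KeyBytes` in place of the real `Fingerprint` type, and `fpMessage` is a toy implementation.

```go
package main

import "bytes"

type KeyBytes []byte

// rangeMessage is a hypothetical subset of SyncMessage.
type rangeMessage interface {
	X() KeyBytes
	Y() KeyBytes
	Fingerprint() KeyBytes
	Count() int
}

// fpMessage is a toy message carrying a peer's summary of [x, y).
type fpMessage struct {
	x, y, fp KeyBytes
	count    int
}

func (m fpMessage) X() KeyBytes           { return m.x }
func (m fpMessage) Y() KeyBytes           { return m.y }
func (m fpMessage) Fingerprint() KeyBytes { return m.fp }
func (m fpMessage) Count() int            { return m.count }

// inRange reports whether k lies in the possibly wrapping range [x, y).
func inRange(k, x, y KeyBytes) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0: // x == y: the whole set
		return true
	case c < 0: // x < y: a normal range
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default: // x > y: from x to the end, then from the start up to y
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

// rangeInSync XORs the local keys in the message's range and compares the
// result and the item count with what the peer sent.
func rangeInSync(m rangeMessage, local []KeyBytes, keyLen int) bool {
	fp := make(KeyBytes, keyLen)
	n := 0
	for _, k := range local {
		if inRange(k, m.X(), m.Y()) {
			for i := range fp {
				fp[i] ^= k[i]
			}
			n++
		}
	}
	return n == m.Count() && bytes.Equal(fp, m.Fingerprint())
}
```

For local keys 0x10, 0x20, 0x30, 0x40, the wrapped range [0x30, 0x20) contains 0x30, 0x40 and 0x10, whose XOR is 0x60. A peer reporting that fingerprint and a count of 3 is in sync for this range.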