chotki

package module
v0.0.0-...-116b653
Published: Nov 22, 2024 License: MIT Imports: 28 Imported by: 0

README

◌ Chotki: fast syncable store

Chotki is a syncable store with really fast counters. Internally, it is pebble db running CRDTs natively, using the Replicated Data Interchange format (RDX). Chotki is sync-centric and causally consistent. That means Chotki replicas can sync master-master, either incrementally in real time or by staying offline and diff-syncing periodically. The Chotki API is REST/object based; there is no SQL yet.

Use

Chotki's data model has three key entities:

  1. a class is a list of named fields,
  2. an object is an instance of a class,
  3. a field is a value of a Replicated Data Type (RDT).

Available RDTs include basic "last-write-wins" FIRST types:

  1. Float,
  2. Integer,
  3. Reference,
  4. String and
  5. Term.
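As an illustration of what "last-write-wins" means, here is a minimal sketch of an LWW merge. The lww type, its fields and the tie-break by source number are assumptions made for this example; RDX's actual encoding and tie-breaking rules are described in the RDX doc, not here.

```go
package main

import "fmt"

// lww is a hypothetical last-write-wins register: a value, the logical
// time of the write, and the replica (source) that made it.
type lww struct {
	Value string
	Time  uint64
	Src   uint64
}

// merge picks the winner of two writes. The later write wins; on equal
// times the higher source number wins (an assumed tie-break).
func merge(a, b lww) lww {
	if b.Time > a.Time || (b.Time == a.Time && b.Src > a.Src) {
		return b
	}
	return a
}

func main() {
	older := lww{Value: "Leo Messi", Time: 1, Src: 0xb0b}
	newer := lww{Value: "Lionel Messi", Time: 2, Src: 0xa1ece}
	// The result does not depend on the order in which writes arrive.
	fmt.Println(merge(older, newer).Value)
	fmt.Println(merge(newer, older).Value)
}
```

Because the merge is commutative, two replicas that see the same writes in a different order end up with the same value.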

There are also collections/ELM types:

  1. Eulerian (set) is an unordered collection of FIRST values,
  2. Linear (array) is an ordered collection of FIRST values, and
  3. Mapping is a collection of key-value pairs (both FIRST).

There are also counters/NZ types:

  1. N are "natural" increment-only uint64 counters and
  2. Z are signed int64 increment/decrement counters.
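To show why increment-only counters merge cleanly, here is a sketch of one common construction, a grow-only counter that keeps one contribution per replica. The nCounter type and its methods are hypothetical; whether RDX stores N counters in exactly this shape is not stated here.

```go
package main

import "fmt"

// nCounter maps a replica (source) number to that replica's own total.
type nCounter map[uint64]uint64

// inc adds n to the contribution of replica src.
func (c nCounter) inc(src, n uint64) { c[src] += n }

// merge folds other into c, keeping the larger contribution per replica.
func (c nCounter) merge(other nCounter) {
	for src, n := range other {
		if n > c[src] {
			c[src] = n
		}
	}
}

// value is the sum of all per-replica contributions.
func (c nCounter) value() (sum uint64) {
	for _, n := range c {
		sum += n
	}
	return sum
}

func main() {
	a, b := nCounter{}, nCounter{}
	a.inc(0xb0b, 3)
	b.inc(0xa1ece, 4)
	a.merge(b)
	b.merge(a)
	a.merge(b) // merging the same state again changes nothing
	fmt.Println(a.value(), b.value())
}
```

Each replica only ever raises its own entry, so taking the maximum per replica is safe to repeat and to apply in any order.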

Objects can Reference each other, thus forming object graphs. As an example, an object of class Team has a field Players which is an E-set of references to objects of class Player. Each object of class Player has an Integer field Number, a String field Name, and a counter NetWorth.

Chotki does not support any filter queries yet (of the select ... where kind). It can fetch an object by its ID or recursively by following references (a reference is an ID).
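The Team/Player example and the fetch-by-reference model can be pictured with plain Go types. This is only a sketch: id, player, team and store are hypothetical stand-ins (the real ID type is rdx.ID and the real store is the database), and the counter is shown as a plain number.

```go
package main

import "fmt"

// id stands in for an object ID in this sketch.
type id string

// player mirrors the Player class from the text.
type player struct {
	Number   int64  // Integer field
	Name     string // String field
	NetWorth int64  // counter field, simplified to a plain number
}

// team mirrors the Team class: an E-set of references to players.
type team struct {
	Players map[id]struct{}
}

// store is a toy in-memory stand-in for the database.
type store struct {
	teams   map[id]team
	players map[id]player
}

// roster fetches a team by ID, then every player it references.
func (s store) roster(tid id) []player {
	var out []player
	for pid := range s.teams[tid].Players {
		if p, ok := s.players[pid]; ok {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	s := store{
		teams: map[id]team{"t1": {Players: map[id]struct{}{"p1": {}, "p2": {}}}},
		players: map[id]player{
			"p1": {Number: 10, Name: "Lionel Messi"},
			"p2": {Number: 9, Name: "Luis Suarez"},
		},
	}
	fmt.Println(len(s.roster("t1")), "players fetched by reference")
}
```

There is no filtering step anywhere: every lookup starts from a known ID and only follows references.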

REPL

Chotki REPL supports a number of commands to manage databases, network activities and the data itself. Each command takes an argument in the text version of RDX, which is very close to JSON, e.g.

    ◌ connect "replica:1234"
    ◌ cat b0b-28f 
    ◌ new {_ref:Player, Name: "Lionel Messi", Number: 10}

Each command returns an ID and/or an error.

Text RDX is different from JSON in several aspects: it has the ID type, arbitrary literals apart from true, false and null, also set collections and some other minor differences. On the human readability side, it is pretty much the same thing.

HTTP

Chotki HTTP interface is your typical REST API that accepts and delivers RDX or JSON.

Native API

Chotki aims at implementing a protoc-like compilation step producing code for objects, including database store/load, RDX serialization/parsing, etc.

Replicas

The main superpower of Chotki is syncing. Replicas may work offline, reconnect and resync, or they may sync continuously in real time. Chotki so far only supports a spanning-tree overlay network topology. Each replica has a 20-bit number (aka source); to prevent loops, a replica can only connect to replicas with a lesser source number. E.g. a1ece can connect to b0b, but not the other way around (replica numbers are given in hex).
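The connection rule can be written down as a small predicate. This is a sketch; canConnect and srcBits are names invented for the example and are not part of the Chotki API.

```go
package main

import "fmt"

// srcBits is the width of a replica (source) number, per the text above.
const srcBits = 20

// canConnect reports whether replica `from` may dial replica `to`:
// both numbers must fit in 20 bits and `to` must be the lesser one.
func canConnect(from, to uint32) bool {
	const maxSrc = 1<<srcBits - 1
	return from <= maxSrc && to <= maxSrc && to < from
}

func main() {
	fmt.Println(canConnect(0xa1ece, 0xb0b)) // a1ece may dial b0b
	fmt.Println(canConnect(0xb0b, 0xa1ece)) // b0b may not dial a1ece
}
```

Since connections always point from a greater number to a lesser one, no chain of connections can lead back to where it started.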

Implementations of client replicas working on mobile devices or in a browser are planned.

Microbenchmarks

Chotki is based on pebble db, which is an LSM database. A superpower of LSM is "blind writes", i.e. writes that need no preceding read. On a Lenovo Yoga laptop, a Chotki replica can do about 1 million blind increments of a counter in about 3 seconds on a single thread, whether or not it is connected to other replicas:

    ◌ sinc {fid:b0b-6-2,count:10000000,ms:0}
    b0b-6-2
    inc storm: 10000000 incs complete for b0b-6-2, elapsed 1m4.506868865s, a1ece-5debd1..a1ece-f68251
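The count and the elapsed time printed by the run above can be turned into a rate with a few lines of Go. The rate helper is a name made up for this sketch; the two inputs are copied from the log line.

```go
package main

import (
	"fmt"
	"time"
)

// rate returns increments per second for count increments over elapsed.
func rate(count int, elapsed time.Duration) float64 {
	return float64(count) / elapsed.Seconds()
}

func main() {
	// Both figures come from the "inc storm" line in the log above.
	elapsed, err := time.ParseDuration("1m4.506868865s")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%.0f incs/s\n", rate(10_000_000, elapsed))
}
```

For this particular run the result is roughly 155,000 increments per second.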

Installation

    go get -u github.com/drpcorg/chotki

Inner workings

Internally, Chotki is pebble db using RDX merge operators. See the RDX doc for further details on its serialization format (type-length-value) and a very predictable choice of CRDTs.

Comparison to other projects

Overall, RDX is the final version of RON (Replicated Object Notation), a research project that lasted from 2017 till 2022. One may check the first 2017 RON/RDX talk manifesting the project's goals. In that regard, we may compare RON/RDX to Automerge, which is a project of exactly the same age. Both projects started with a columnar-like coding of operations, which Automerge is using to this day, while RDX followed Einstein's maxim: "Everything should be made as simple as possible, but not simpler". After spending quite some time trying to cram columnar-encoded CRDTs into existing databases, RDX was greatly simplified, and now all the RDX CRDT logic fits into a merge operator. That greatly improved performance. Effectively, that simplicity makes it possible to use a commodity LSM storage engine to natively store arbitrary CRDTs.

We can also compare Chotki to a number of JavaScript-centric CRDT databases, such as RxDB, TinyBase or SyncedStore. Historically, RON/RDX also has its roots in the JavaScript world. Swarm.js was likely the first CRDT sync lib in history (2013-2018), although it was distilled from the earlier Citrea project (2011-2012). Chotki/RDX has the objective of creating a production-ready scalable CRDT store, which JavaScript does not really allow. Still, we will be extremely happy if some of the JavaScript libs consider supporting RDX as a unifying format. (Ping us any time!)

The original Chotki project summary

The project is a collaboration of dRPC.org and Victor "gritzko" Grishchenko. RDX types are based on the RON/RDX research project. The following is the original project summary:

The mission of the system is to keep and update real-time statistics, such as quotas, counters, billing and suchlike. Update propagation time is expected to be close to the theoretic minimum: the one-way delay. We expect to be able to process data at bulk transfer speeds (i.e. as much as we can move by the network we can process in real time). It is preferred if the database is embedded into the application to minimize response times. The data should (preferably) fit in memory. Consistency guarantees are: causally consistent (strong EC). The data model is hierarchical, JSON-like, with an emphasis on numeric values. There are no aggregations or queries on the data, only point lookups and subtree scans. The inner workings of the database are a combination of a self-organizing overlay network and an LSM-like storage engine. A node is not connected to every other node: the topology is closer to a spanning tree. That is to minimize network traffic. The API is object-handle based; the entire object is not raised into memory; instead, once the user is reading a field, we do a lookup. That way we minimize reads, de-serializations and GC busywork. Updates are Lamport-timestamped; there is a re-sync protocol for newly joining and re-joining replicas. Overall, we take every shortcut to make the store lightweight and fast while focusing on our specific use case (distributed counters, mainly).

Documentation

Constants

const (
	IdNames    = id2 + 1
	IdNodes    = id2 + 2
	IdNodeInfo = id2 + 3

	// ID from which we count user static objects
	IdLog1 = id2 + 4
)
const LidLKeyLen = 1 + 8 + 1
const PingVal = "ping"
const PongVal = "pong"
const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const TraceSize = 10
const YAckOff = uint64(2)
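To make the SyncBlockBits/SyncBlockMask pair concrete, the sketch below rebuilds the same mask over a plain uint64 and uses it to split a value. Using uint64 in place of rdx.ID is an assumption of this example, and split is a hypothetical helper; how the library itself applies the mask is not shown in this listing.

```go
package main

import "fmt"

const (
	syncBlockBits = 28
	// Same expression as SyncBlockMask, over uint64 instead of rdx.ID.
	syncBlockMask = (uint64(1) << syncBlockBits) - 1
)

// split separates v into the bits above the low 28 and the low 28 bits.
func split(v uint64) (block, rest uint64) {
	return v &^ syncBlockMask, v & syncBlockMask
}

func main() {
	block, rest := split(0x123456789abcdef0)
	fmt.Printf("mask=%#x block=%#x rest=%#x\n", uint64(syncBlockMask), block, rest)
}
```

The mask has its 28 low bits set, so ANDing with it keeps the position inside a block and clearing it keeps the block itself.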

Variables

var (
	ErrDbClosed       = errors.New("chotki: db is closed")
	ErrDirnameIsFile  = errors.New("chotki: the dirname is file")
	ErrNotImplemented = errors.New("chotki: not implemented yet")
	ErrHookNotFound   = errors.New("chotki: hook not found")
	ErrBadIRecord     = errors.New("chotki: bad id-ref record")
	ErrBadORecord     = errors.New("chotki: bad id-ref record")
	ErrBadHPacket     = errors.New("chotki: bad handshake packet")
	ErrBadEPacket     = errors.New("chotki: bad E packet")
	ErrBadVPacket     = errors.New("chotki: bad V packet")
	ErrBadYPacket     = errors.New("chotki: bad Y packet")
	ErrBadLPacket     = errors.New("chotki: bad L packet")
	ErrBadTPacket     = errors.New("chotki: bad T packet")
	ErrBadOPacket     = errors.New("chotki: bad O packet")
	ErrSrcUnknown     = errors.New("chotki: source unknown")
	ErrSyncUnknown    = errors.New("chotki: sync session unknown")
	ErrBadRRecord     = errors.New("chotki: bad ref record")
	ErrClosed         = errors.New("chotki: no replica open")

	ErrBadTypeDescription  = errors.New("chotki: bad type description")
	ErrObjectUnknown       = errors.New("chotki: unknown object")
	ErrTypeUnknown         = errors.New("chotki: unknown object type")
	ErrUnknownFieldInAType = errors.New("chotki: unknown field for the type")
	ErrBadClass            = errors.New("chotki: bad class description")

	ErrOutOfOrder      = errors.New("chotki: order fail: sequence gap")
	ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op")
)
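Sentinel errors like the ones above are normally matched with errors.Is. The sketch below uses a locally declared stand-in, errDbClosed, with the same message as ErrDbClosed, and a hypothetical read function that wraps it; whether Chotki's own functions wrap these errors or return them directly is not stated in this listing.

```go
package main

import (
	"errors"
	"fmt"
)

// errDbClosed is a local stand-in for chotki.ErrDbClosed in this sketch.
var errDbClosed = errors.New("chotki: db is closed")

// read is a hypothetical caller that adds context to the sentinel.
func read(closed bool) error {
	if closed {
		return fmt.Errorf("read field: %w", errDbClosed)
	}
	return nil
}

// isClosed matches the sentinel even when it has been wrapped.
func isClosed(err error) bool {
	return errors.Is(err, errDbClosed)
}

func main() {
	fmt.Println(isClosed(read(true)), isClosed(read(false)))
}
```

Matching with errors.Is rather than comparing strings keeps the check valid if callers add context on the way up.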
var ClassTemplate = `` /* 1146-byte string literal not displayed */

todo RDX formula

var ETemplate = `
func (o *{{Name}}) Get{{- Name}}() {
}

func (o *{{Name}}) Put{{- Name}}() {
}
`

todo collection description

var ErrCounterNotLoaded error = fmt.Errorf("counter not loaded")
var ErrNotCounter error = fmt.Errorf("not a counter")
var ErrOffsetOpId = errors.New("op id is offset")
var ErrWrongFieldType = errors.New("wrong field type")
var EventsBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "chotki",
	Name:      "batch_size",
	Buckets:   []float64{0, 1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000},
})
var EventsMetric = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "chotki",
	Name:      "packet_count",
})
var EventsOutboundMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "chotki",
	Name:      "outbound_packet_count",
}, []string{"name"})
var FIRSTnatives = map[byte]string{
	'F': "float64",
	'I': "int64",
	'R': "rdx.ID",
	'S': "string",
	'T': "string",
	'N': "uint64",
	'Z': "int64",
}
var Log0 = protocol.Records{

	protocol.Record('Y',
		protocol.Record('I', rdx.ID0.ZipBytes()),
		protocol.Record('R', rdx.ID0.ZipBytes()),
	),

	protocol.Record('C',
		protocol.Record('I', id1.ZipBytes()),
		protocol.Record('R', rdx.ID0.ZipBytes()),
		rdx.Atlv(rdx.LogT{
			"MNames",
		}),
	),

	protocol.Record('O',
		protocol.Record('I', id2.ZipBytes()),
		protocol.Record('R', id1.ZipBytes()),

		protocol.Record('M',
			protocol.Record('T', rdx.Ttlv("0")),
			protocol.Record('R', rdx.Rtlv(rdx.ID0)),
			protocol.Record('T', rdx.Ttlv("Global")),
			protocol.Record('R', rdx.Rtlv(id2)),
			protocol.Record('T', rdx.Ttlv("Names")),
			protocol.Record('R', rdx.Rtlv(IdNames)),
			protocol.Record('T', rdx.Ttlv("Nodes")),
			protocol.Record('R', rdx.Rtlv(IdNodes)),
		),
	),

	protocol.Record('C',
		protocol.Record('I', id3.ZipBytes()),
		protocol.Record('R', rdx.ID0.ZipBytes()),
		rdx.Atlv(rdx.LogT{
			"SName",
			"VAck",
			"SAddr",
		}),
	),
}
var VKey0 = []byte{'V', 0, 0, 0, 0, 0, 0, 0, 0, 'V'}

Functions

func EditTLV

func EditTLV(off uint64, rdt byte, tlv []byte) (edit []byte)

func Exists

func Exists(dirname string) (bool, error)

func FieldOffset

func FieldOffset(fields []string, name string) rdx.ID

func LastLit

func LastLit(recs protocol.Records) byte

func OKey

func OKey(id rdx.ID, rdt byte) (key []byte)

func OKeyIdRdt

func OKeyIdRdt(key []byte) (id rdx.ID, rdt byte)

func ObjectKeyRange

func ObjectKeyRange(oid rdx.ID) (fro, til []byte)

func ParsePacket

func ParsePacket(pack []byte) (lit byte, id, ref rdx.ID, body []byte, err error)

func VKey

func VKey(id rdx.ID) (key []byte)

func VKeyId

func VKeyId(key []byte) rdx.ID

Types

type AtomicCounter

type AtomicCounter struct {
	// contains filtered or unexported fields
}

func NewAtomicCounter

func NewAtomicCounter(db *Chotki, rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter

NewAtomicCounter creates a counter that has two properties:

  • it is atomic as long as you use a single instance for all increments; creating multiple instances breaks this guarantee
  • it can ease CPU load if updatePeriod > 0; in that case it will not read the current value of the counter from the db backend

Because we use an LSM backend, writes are cheap and reads are expensive. You can trade an up-to-date counter value for fewer CPU cycles.
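The trade-off described above can be sketched without the database. Everything below is hypothetical: cachedCounter, its read/write callbacks and the policy of re-reading at most once per period are assumptions made to illustrate the idea, not the actual AtomicCounter implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedCounter keeps a local copy of a counter and refreshes it from
// the backend at most once per period (an assumed policy).
type cachedCounter struct {
	mu     sync.Mutex
	read   func() int64 // expensive backend read
	write  func(int64)  // cheap blind write of a delta
	period time.Duration
	value  int64
	loaded time.Time
}

func newCachedCounter(read func() int64, write func(int64), period time.Duration) *cachedCounter {
	return &cachedCounter{read: read, write: write, period: period}
}

// Increment loads the value if needed, applies delta and returns the
// locally known total. A single instance serializes all increments.
func (c *cachedCounter) Increment(delta int64) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.loaded.IsZero() || c.period == 0 || time.Since(c.loaded) >= c.period {
		c.value = c.read()
		c.loaded = time.Now()
	}
	c.write(delta)
	c.value += delta
	return c.value
}

func main() {
	var backend int64
	reads := 0
	c := newCachedCounter(
		func() int64 { reads++; return backend },
		func(d int64) { backend += d },
		time.Hour,
	)
	c.Increment(1)
	c.Increment(2)
	fmt.Println("backend:", backend, "reads:", reads)
}
```

With a long period the backend is read once and every later increment is a blind write; with a period of zero every increment pays for a read.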

func (*AtomicCounter) Increment

func (a *AtomicCounter) Increment(ctx context.Context, val int64) (int64, error)

Loads (if needed) and increments counter

type CallHook

type CallHook struct {
	// contains filtered or unexported fields
}

type Chotki

type Chotki struct {
	// contains filtered or unexported fields
}

TLV all the way down

func Open

func Open(dirname string, opts Options) (*Chotki, error)

func (*Chotki) AddHook

func (cho *Chotki) AddHook(fid rdx.ID, hook Hook)

func (*Chotki) AddToMapSSField

func (cho *Chotki) AddToMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)

func (*Chotki) AddToMapTRField

func (cho *Chotki) AddToMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)

Adds/removes elements to/from a map (removed should map to nil)

func (*Chotki) AddToNField

func (cho *Chotki) AddToNField(ctx context.Context, fid rdx.ID, count uint64) (id rdx.ID, err error)

func (*Chotki) ApplyC

func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyD

func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyE

func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)

func (*Chotki) ApplyH

func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyOY

func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) ApplyV

func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)

func (*Chotki) Broadcast

func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, except string)

func (*Chotki) ClassFields

func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error)

todo note that the class may change as the program runs; in such a case if the class fields are already cached, the current session will not understand the new fields!

func (*Chotki) Clock

func (cho *Chotki) Clock() rdx.Clock

func (*Chotki) Close

func (cho *Chotki) Close() error

func (*Chotki) CommitPacket

func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)

Here new packets are timestamped and queued for save

func (*Chotki) Connect

func (cho *Chotki) Connect(addr string) error

func (*Chotki) ConnectPool

func (cho *Chotki) ConnectPool(name string, addrs []string) error

func (*Chotki) Counter

func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter

func (*Chotki) Database

func (cho *Chotki) Database() *pebble.DB

func (*Chotki) Directory

func (cho *Chotki) Directory() string

func (*Chotki) Disconnect

func (cho *Chotki) Disconnect(addr string) error

func (*Chotki) Drain

func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error)

func (*Chotki) DumpAll

func (cho *Chotki) DumpAll(writer io.Writer)

func (*Chotki) DumpObjects

func (cho *Chotki) DumpObjects(writer io.Writer)

func (*Chotki) DumpVV

func (cho *Chotki) DumpVV(writer io.Writer)

func (*Chotki) EditFieldTLV

func (cho *Chotki) EditFieldTLV(ctx context.Context, fid rdx.ID, delta []byte) (id rdx.ID, err error)

func (*Chotki) EditObject deprecated

func (cho *Chotki) EditObject(ctx context.Context, oid rdx.ID, fields ...string) (id rdx.ID, err error)

Deprecated: does not handle non-trivial cases

func (*Chotki) EditObjectRDX

func (cho *Chotki) EditObjectRDX(ctx context.Context, oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)

func (*Chotki) GetFieldTLV

func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte)

func (*Chotki) IncNField

func (cho *Chotki) IncNField(ctx context.Context, fid rdx.ID) (id rdx.ID, err error)

func (*Chotki) KeepAlive

func (cho *Chotki) KeepAlive() error

func (*Chotki) KeepAliveLoop

func (cho *Chotki) KeepAliveLoop()

func (*Chotki) Last

func (cho *Chotki) Last() rdx.ID

func (*Chotki) Listen

func (cho *Chotki) Listen(addr string) error

func (*Chotki) MapSSField

func (cho *Chotki) MapSSField(fid rdx.ID) (themap rdx.MapSS, err error)

func (*Chotki) MapTRField

func (cho *Chotki) MapTRField(fid rdx.ID) (themap rdx.MapTR, err error)

func (*Chotki) Metrics

func (cho *Chotki) Metrics() []prometheus.Collector

func (*Chotki) NewClass

func (cho *Chotki) NewClass(ctx context.Context, parent rdx.ID, fields ...Field) (id rdx.ID, err error)

func (*Chotki) NewObject

func (cho *Chotki) NewObject(ctx context.Context, tid rdx.ID, fields ...string) (id rdx.ID, err error)

func (*Chotki) NewObjectTLV

func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)

Creates a new object from enveloped TLV fields; no class checks.

func (*Chotki) ObjectFieldTLV

func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)

ObjectFieldTLV picks one field fast. No class checks, etc.

func (*Chotki) ObjectFields

func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact protocol.Records, err error)

func (*Chotki) ObjectFieldsByClass

func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, tlvs protocol.Records, err error)

func (*Chotki) ObjectFieldsTLV

func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv protocol.Records, err error)

func (*Chotki) ObjectIterator

func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator

returns nil for "not found"

func (*Chotki) ObjectMapper

func (cho *Chotki) ObjectMapper() *ORM

func (*Chotki) ObjectRDTFieldTLV

func (cho *Chotki) ObjectRDTFieldTLV(fid rdx.ID, rdt byte) (tlv []byte, err error)

ObjectRDTFieldTLV picks one field given its id and RDT.

func (*Chotki) ObjectString

func (cho *Chotki) ObjectString(oid rdx.ID) (txt string, err error)

func (*Chotki) ObjectVVField

func (cho *Chotki) ObjectVVField(fid rdx.ID) (vv rdx.VV, err error)

func (*Chotki) RemoveAllHooks

func (cho *Chotki) RemoveAllHooks(fid rdx.ID)

func (*Chotki) RemoveHook

func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error)

func (*Chotki) RestoreNet

func (cho *Chotki) RestoreNet() error

func (*Chotki) SetFieldTLV

func (cho *Chotki) SetFieldTLV(ctx context.Context, fid rdx.ID, tlve []byte) (id rdx.ID, err error)

func (*Chotki) SetMapSSField

func (cho *Chotki) SetMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)

func (*Chotki) SetMapTRField

func (cho *Chotki) SetMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)

func (*Chotki) Snapshot

func (cho *Chotki) Snapshot() pebble.Reader

func (*Chotki) Source

func (cho *Chotki) Source() uint64

ToyKV convention key, lit O, then O00000-00000000-000 id

func (*Chotki) Unlisten

func (cho *Chotki) Unlisten(addr string) error

func (*Chotki) UpdateVTree

func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error)

func (*Chotki) VersionVector

func (cho *Chotki) VersionVector() (vv rdx.VV, err error)

type Field

type Field struct {
	Offset     int64
	Name       string
	RdxType    byte
	RdxTypeExt []byte
}

A class contains a number of fields. Each Field has some RDT type. A class can inherit another class. New fields can be appended to a class, but never removed. The maximum number of fields is 128; the maximum inheritance depth is 32. When stored, a class is an append-only sequence of Ts. The syntax for each T is "XName", where X is the RDT. For the map types, one can use "MSS_Name" or similar. Each field has an Offset. The Offset+RdxType pair is the *actual key* for the field in the database. Entries having identical Offset+RdxType are considered *renames*!
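The "XName" syntax can be illustrated with a tiny parser. This is a sketch: parseField is a hypothetical helper, the upper-case check is an assumption based on the RDT letters seen in this document, and the extended map form ("MSS_Name") is deliberately not interpreted.

```go
package main

import (
	"errors"
	"fmt"
)

// parseField splits a declaration such as "SName" into its RDT letter
// and the field name. Extended forms like "MSS_Name" are not decoded:
// everything after the first letter is returned as the name.
func parseField(decl string) (rdt byte, name string, err error) {
	if len(decl) < 2 {
		return 0, "", errors.New("field declaration too short")
	}
	if decl[0] < 'A' || decl[0] > 'Z' {
		return 0, "", errors.New("RDT must be an upper-case letter")
	}
	return decl[0], decl[1:], nil
}

func main() {
	// These declarations appear in Log0 above.
	for _, decl := range []string{"SName", "VAck", "SAddr"} {
		rdt, name, err := parseField(decl)
		fmt.Printf("%q -> rdt=%c name=%s err=%v\n", decl, rdt, name, err)
	}
}
```

Renaming a field changes only the text after the first letter, which fits the rule that the Offset+RdxType pair, not the name, is the key.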

func (Field) Valid

func (f Field) Valid() bool

type Fields

type Fields []Field

Fields

func (Fields) FindName

func (f Fields) FindName(name string) (ndx int)

func (Fields) FindRdtOff

func (f Fields) FindRdtOff(rdx byte, off int64) int

func (Fields) MaxOffset

func (fs Fields) MaxOffset() (off int64)

type Hook

type Hook func(cho *Chotki, id rdx.ID) error

type Merger

type Merger interface {
	// merges values, sorted old to new
	Merge(inputs [][]byte) []byte
}
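As an example of the shape of this interface, here is a toy implementation that merges by keeping the largest value. The Merger declaration is copied from above so the sketch compiles on its own; maxMerger, enc, dec and the 8-byte big-endian layout are assumptions of the example and have nothing to do with the real RDX encodings.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Merger is copied from the declaration above.
type Merger interface {
	// merges values, sorted old to new
	Merge(inputs [][]byte) []byte
}

// maxMerger treats each input as an 8-byte big-endian uint64 and keeps
// the largest; inputs of any other length are skipped.
type maxMerger struct{}

func (maxMerger) Merge(inputs [][]byte) []byte {
	var best uint64
	for _, in := range inputs {
		if len(in) != 8 {
			continue
		}
		if v := dec(in); v > best {
			best = v
		}
	}
	return enc(best)
}

// enc and dec are helpers for the toy 8-byte layout.
func enc(v uint64) []byte {
	out := make([]byte, 8)
	binary.BigEndian.PutUint64(out, v)
	return out
}

func dec(b []byte) uint64 { return binary.BigEndian.Uint64(b) }

func main() {
	var m Merger = maxMerger{}
	merged := m.Merge([][]byte{enc(3), enc(9), enc(5)})
	fmt.Println(dec(merged))
}
```

A maximum is a convenient toy because the result is the same no matter how the inputs are grouped or ordered, which is the property a merge operator needs.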

type NativeObject

type NativeObject interface {
	// Read data from an iterator
	Load(field uint64, rdt byte, tlv []byte) error
	// Compare to the stored state, serialize the changes
	Store(field uint64, rdt byte, old []byte, clock rdx.Clock) (changes []byte, err error)
}

type NetCollector

type NetCollector struct {
	// contains filtered or unexported fields
}

func NewNetCollector

func NewNetCollector(net *protocol.Net) *NetCollector

func (*NetCollector) Collect

func (n *NetCollector) Collect(m chan<- prometheus.Metric)

func (*NetCollector) Describe

func (n *NetCollector) Describe(d chan<- *prometheus.Desc)

type ORM

type ORM struct {
	Host *Chotki
	Snap *pebble.Snapshot
	// contains filtered or unexported fields
}

func NewORM

func NewORM(host *Chotki, snap *pebble.Snapshot) *ORM

func (*ORM) Clear

func (orm *ORM) Clear() error

Clear forgets all the loaded objects; all unsaved changes are discarded.

func (*ORM) Close

func (orm *ORM) Close() error

func (*ORM) Compile

func (orm *ORM) Compile(name string, cid rdx.ID) (code string, err error)

func (*ORM) FindID

func (orm *ORM) FindID(obj NativeObject) rdx.ID

Find the ID of a registered object.

func (*ORM) Load

func (orm *ORM) Load(id rdx.ID, blanc NativeObject) (obj NativeObject, err error)

Load the object's state from the db, register the object. If an object is already registered for that id, returns the old one. The new one is not used then.

func (*ORM) New

func (orm *ORM) New(ctx context.Context, cid rdx.ID, objs ...NativeObject) (err error)

New objects of the same type get persisted and registered with the ORM.

func (*ORM) Object

func (orm *ORM) Object(id rdx.ID) (obj NativeObject)

Find a registered object given its id. nil if none.

func (*ORM) Save

func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)

Save the registered object's changes. Much faster than SaveAll(), especially if you loaded many objects and modified few.

func (*ORM) SaveAll

func (orm *ORM) SaveAll(ctx context.Context) (err error)

SaveAll the changed fields; this will scan the objects and their database records.

func (*ORM) SyncAll

func (orm *ORM) SyncAll(ctx context.Context) (err error)

Saves all the changes, updates all the objects to the current db state.

func (*ORM) UpdateAll

func (orm *ORM) UpdateAll() (err error)

UpdateAll the registered objects to the new db state

func (*ORM) UpdateObject

func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error

type Options

type Options struct {
	pebble.Options

	Src                        uint64
	Name                       string
	Log1                       protocol.Records
	MaxLogLen                  int64
	RelaxedOrder               bool
	Logger                     utils.Logger
	PingPeriod                 time.Duration
	PingWait                   time.Duration
	PebbleWriteOptions         *pebble.WriteOptions
	BroadcastBatchSize         int
	BroadcastTimeLimit         time.Duration
	ReadAccumTimeLimit         time.Duration
	ReadMaxBufferSize          int
	ReadMinBufferSizeToProcess int
	TcpReadBufferSize          int
	TcpWriteBufferSize         int

	TlsConfig *tls.Config
}

func (*Options) SetDefaults

func (o *Options) SetDefaults()

type PebbleMergeAdaptor

type PebbleMergeAdaptor struct {
	// contains filtered or unexported fields
}

func (*PebbleMergeAdaptor) Finish

func (a *PebbleMergeAdaptor) Finish(includesBase bool) (res []byte, cl io.Closer, err error)

func (*PebbleMergeAdaptor) MergeNewer

func (a *PebbleMergeAdaptor) MergeNewer(value []byte) error

func (*PebbleMergeAdaptor) MergeOlder

func (a *PebbleMergeAdaptor) MergeOlder(value []byte) error

type PingState

type PingState int
const (
	Inactive PingState = iota
	Ping
	Pong
	PingBroken
	WaitingForPing
)

type SyncHost

type SyncHost interface {
	protocol.Drainer
	Snapshot() pebble.Reader
	Broadcast(ctx context.Context, records protocol.Records, except string)
}

type SyncMode

type SyncMode byte
const (
	SyncRead   SyncMode = 1
	SyncWrite  SyncMode = 2
	SyncLive   SyncMode = 4
	SyncRW     SyncMode = SyncRead | SyncWrite
	SyncRL     SyncMode = SyncRead | SyncLive
	SyncRWLive SyncMode = SyncRead | SyncWrite | SyncLive
)
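The SyncMode values are bit flags, so a mode can be tested with a bitwise AND. The constants below are copied from the declaration above so the sketch is self-contained; the has method is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// SyncMode and its values are copied from the declaration above.
type SyncMode byte

const (
	SyncRead   SyncMode = 1
	SyncWrite  SyncMode = 2
	SyncLive   SyncMode = 4
	SyncRW     SyncMode = SyncRead | SyncWrite
	SyncRL     SyncMode = SyncRead | SyncLive
	SyncRWLive SyncMode = SyncRead | SyncWrite | SyncLive
)

// has reports whether every bit of flag is set in m.
func (m SyncMode) has(flag SyncMode) bool { return m&flag == flag }

func main() {
	fmt.Println(SyncRWLive.has(SyncLive)) // live bit is set
	fmt.Println(SyncRW.has(SyncLive))     // live bit is not set
	fmt.Println(SyncRL.has(SyncRW))       // write bit is missing
}
```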

func ParseHandshake

func ParseHandshake(body []byte) (mode SyncMode, vv rdx.VV, trace_id []byte, err error)

func (*SyncMode) Unzip

func (m *SyncMode) Unzip(raw []byte) error

func (*SyncMode) Zip

func (m *SyncMode) Zip() []byte

type SyncState

type SyncState int
const (
	SendHandshake SyncState = iota
	SendDiff
	SendLive
	SendEOF
	SendNone
	SendPing
	SendPong
)

func (SyncState) String

func (s SyncState) String() string

type Syncer

type Syncer struct {
	Src           uint64
	Name          string
	Host          SyncHost
	Mode          SyncMode
	PingPeriod    time.Duration
	PingWait      time.Duration
	WaitUntilNone time.Duration
	// contains filtered or unexported fields
}

func (*Syncer) Close

func (sync *Syncer) Close() error

func (*Syncer) Drain

func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error)

func (*Syncer) DrainHandshake

func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error)

func (*Syncer) Feed

func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)

func (*Syncer) FeedBlockDiff

func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error)

func (*Syncer) FeedDiffVV

func (sync *Syncer) FeedDiffVV() (vv protocol.Records, err error)

func (*Syncer) FeedHandshake

func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error)

func (*Syncer) GetDrainState

func (sync *Syncer) GetDrainState() SyncState

func (*Syncer) GetFeedState

func (sync *Syncer) GetFeedState() SyncState

func (*Syncer) GetTraceId

func (sync *Syncer) GetTraceId() string

func (*Syncer) SetDrainState

func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState)

func (*Syncer) SetFeedState

func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState)

func (*Syncer) WaitDrainState

func (sync *Syncer) WaitDrainState(ctx context.Context, state SyncState) chan SyncState
