Documentation
¶
Index ¶
- Constants
- Variables
- func EditTLV(off uint64, rdt byte, tlv []byte) (edit []byte)
- func Exists(dirname string) (bool, error)
- func FieldOffset(fields []string, name string) rdx.ID
- func LastLit(recs protocol.Records) byte
- func OKey(id rdx.ID, rdt byte) (key []byte)
- func OKeyIdRdt(key []byte) (id rdx.ID, rdt byte)
- func ObjectKeyRange(oid rdx.ID) (fro, til []byte)
- func ParsePacket(pack []byte) (lit byte, id, ref rdx.ID, body []byte, err error)
- func VKey(id rdx.ID) (key []byte)
- func VKeyId(key []byte) rdx.ID
- type AtomicCounter
- type CallHook
- type Chotki
- func (cho *Chotki) AddHook(fid rdx.ID, hook Hook)
- func (cho *Chotki) AddToMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)
- func (cho *Chotki) AddToMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
- func (cho *Chotki) AddToNField(ctx context.Context, fid rdx.ID, count uint64) (id rdx.ID, err error)
- func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
- func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, except string)
- func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error)
- func (cho *Chotki) Clock() rdx.Clock
- func (cho *Chotki) Close() error
- func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) Connect(addr string) error
- func (cho *Chotki) ConnectPool(name string, addrs []string) error
- func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter
- func (cho *Chotki) Database() *pebble.DB
- func (cho *Chotki) Directory() string
- func (cho *Chotki) Disconnect(addr string) error
- func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error)
- func (cho *Chotki) DumpAll(writer io.Writer)
- func (cho *Chotki) DumpObjects(writer io.Writer)
- func (cho *Chotki) DumpVV(writer io.Writer)
- func (cho *Chotki) EditFieldTLV(ctx context.Context, fid rdx.ID, delta []byte) (id rdx.ID, err error)
- func (cho *Chotki) EditObject(ctx context.Context, oid rdx.ID, fields ...string) (id rdx.ID, err error) (deprecated)
- func (cho *Chotki) EditObjectRDX(ctx context.Context, oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)
- func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte)
- func (cho *Chotki) IncNField(ctx context.Context, fid rdx.ID) (id rdx.ID, err error)
- func (cho *Chotki) KeepAlive() error
- func (cho *Chotki) KeepAliveLoop()
- func (cho *Chotki) Last() rdx.ID
- func (cho *Chotki) Listen(addr string) error
- func (cho *Chotki) MapSSField(fid rdx.ID) (themap rdx.MapSS, err error)
- func (cho *Chotki) MapTRField(fid rdx.ID) (themap rdx.MapTR, err error)
- func (cho *Chotki) Metrics() []prometheus.Collector
- func (cho *Chotki) NewClass(ctx context.Context, parent rdx.ID, fields ...Field) (id rdx.ID, err error)
- func (cho *Chotki) NewObject(ctx context.Context, tid rdx.ID, fields ...string) (id rdx.ID, err error)
- func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)
- func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact protocol.Records, err error)
- func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, tlvs protocol.Records, err error)
- func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv protocol.Records, err error)
- func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator
- func (cho *Chotki) ObjectMapper() *ORM
- func (cho *Chotki) ObjectRDTFieldTLV(fid rdx.ID, rdt byte) (tlv []byte, err error)
- func (cho *Chotki) ObjectString(oid rdx.ID) (txt string, err error)
- func (cho *Chotki) ObjectVVField(fid rdx.ID) (vv rdx.VV, err error)
- func (cho *Chotki) RemoveAllHooks(fid rdx.ID)
- func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error)
- func (cho *Chotki) RestoreNet() error
- func (cho *Chotki) SetFieldTLV(ctx context.Context, fid rdx.ID, tlve []byte) (id rdx.ID, err error)
- func (cho *Chotki) SetMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)
- func (cho *Chotki) SetMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
- func (cho *Chotki) Snapshot() pebble.Reader
- func (cho *Chotki) Source() uint64
- func (cho *Chotki) Unlisten(addr string) error
- func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error)
- func (cho *Chotki) VersionVector() (vv rdx.VV, err error)
- type ChotkiCollector
- type Field
- type Fields
- type Hook
- type Merger
- type NativeObject
- type NetCollector
- type ORM
- func (orm *ORM) Clear() error
- func (orm *ORM) Close() error
- func (orm *ORM) Compile(name string, cid rdx.ID) (code string, err error)
- func (orm *ORM) FindID(obj NativeObject) rdx.ID
- func (orm *ORM) Load(id rdx.ID, blanc NativeObject, skipFields ...uint64) (obj NativeObject, err error)
- func (orm *ORM) New(ctx context.Context, cid rdx.ID, objs ...NativeObject) (err error)
- func (orm *ORM) Object(id rdx.ID) NativeObject
- func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
- func (orm *ORM) SaveAll(ctx context.Context) (err error)
- func (orm *ORM) SyncAll(ctx context.Context) (err error)
- func (orm *ORM) UpdateAll() (err error)
- func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
- type Options
- type PebbleCollector
- type PebbleMergeAdaptor
- type PingState
- type SyncHost
- type SyncMode
- type SyncState
- type Syncer
- func (sync *Syncer) Close() error
- func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error)
- func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error)
- func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
- func (sync *Syncer) FeedBlockDiff(ctx context.Context) (diff protocol.Records, err error)
- func (sync *Syncer) FeedDiffVV(ctx context.Context) (vv protocol.Records, err error)
- func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error)
- func (sync *Syncer) GetDrainState() SyncState
- func (sync *Syncer) GetFeedState() SyncState
- func (sync *Syncer) GetTraceId() string
- func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState)
- func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState)
- func (sync *Syncer) WaitDrainState(ctx context.Context, state SyncState) chan SyncState
Constants ¶
const ( IdNames = id2 + 1 IdNodes = id2 + 2 IdNodeInfo = id2 + 3 // ID from which we count user static objects IdLog1 = id2 + 4 )
const LidLKeyLen = 1 + 8 + 1
const MaxParcelSize = 100_000_000
const PingVal = "ping"
const PongVal = "pong"
const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const TraceSize = 10
const YAckOff = uint64(2)
Variables ¶
var ( ErrDbClosed = errors.New("chotki: db is closed") ErrDirnameIsFile = errors.New("chotki: the dirname is file") ErrNotImplemented = errors.New("chotki: not implemented yet") ErrHookNotFound = errors.New("chotki: hook not found") ErrBadIRecord = errors.New("chotki: bad id-ref record") ErrBadORecord = errors.New("chotki: bad id-ref record") ErrBadHPacket = errors.New("chotki: bad handshake packet") ErrBadEPacket = errors.New("chotki: bad E packet") ErrBadVPacket = errors.New("chotki: bad V packet") ErrBadYPacket = errors.New("chotki: bad Y packet") ErrBadLPacket = errors.New("chotki: bad L packet") ErrBadTPacket = errors.New("chotki: bad T packet") ErrBadOPacket = errors.New("chotki: bad O packet") ErrSrcUnknown = errors.New("chotki: source unknown") ErrSyncUnknown = errors.New("chotki: sync session unknown") ErrBadRRecord = errors.New("chotki: bad ref record") ErrClosed = errors.New("chotki: no replica open") ErrBadTypeDescription = errors.New("chotki: bad type description") ErrObjectUnknown = errors.New("chotki: unknown object") ErrTypeUnknown = errors.New("chotki: unknown object type") ErrUnknownFieldInAType = errors.New("chotki: unknown field for the type") ErrBadClass = errors.New("chotki: bad class description") ErrOutOfOrder = errors.New("chotki: order fail: sequence gap") ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op") )
var ClassTemplate = `` /* 1146-byte string literal not displayed */
todo RDX formula
var DrainTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "chotki", Name: "drain_time", Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 100, 500, 1000, 5000, 10000}, }, []string{"type"})
var ETemplate = `
func (o *{{Name}}) Get{{- Name}}() {
}
func (o *{{Name}}) Put{{- Name}}() {
}
`
todo collection description
var ErrCounterNotLoaded error = fmt.Errorf("counter not loaded")
var ErrDecrementN error = fmt.Errorf("decrementing natural counter")
var ErrNotCounter error = fmt.Errorf("not a counter")
var ErrOffsetOpId = errors.New("op id is offset")
var ErrWrongFieldType = errors.New("wrong field type")
var EventsBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "chotki", Name: "batch_size", Buckets: []float64{0, 1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000}, })
var EventsMetric = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "chotki",
Name: "packet_count",
})
var EventsOutboundMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "chotki", Name: "outbound_packet_count", }, []string{"name"})
var FIRSTnatives = map[byte]string{
'F': "float64",
'I': "int64",
'R': "rdx.ID",
'S': "string",
'T': "string",
'N': "uint64",
'Z': "int64",
}
var Log0 = protocol.Records{ protocol.Record('Y', protocol.Record('I', rdx.ID0.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), ), protocol.Record('C', protocol.Record('I', id1.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "MNames", }), ), protocol.Record('O', protocol.Record('I', id2.ZipBytes()), protocol.Record('R', id1.ZipBytes()), protocol.Record('M', protocol.Record('T', rdx.Ttlv("0")), protocol.Record('R', rdx.Rtlv(rdx.ID0)), protocol.Record('T', rdx.Ttlv("Global")), protocol.Record('R', rdx.Rtlv(id2)), protocol.Record('T', rdx.Ttlv("Names")), protocol.Record('R', rdx.Rtlv(IdNames)), protocol.Record('T', rdx.Ttlv("Nodes")), protocol.Record('R', rdx.Rtlv(IdNodes)), ), ), protocol.Record('C', protocol.Record('I', id3.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "SName", "VAck", "SAddr", }), ), }
var OpenedIterators = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "chotki", Subsystem: "sync", Name: "opened_iterators", }, []string{"id", "version"})
var OpenedSnapshots = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "chotki", Subsystem: "sync", Name: "opened_snapshots", }, []string{"id", "version"})
var SessionsStates = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "chotki", Subsystem: "sync", Name: "sessions", }, []string{"id", "kind", "version"})
var VKey0 = []byte{'V', 0, 0, 0, 0, 0, 0, 0, 0, 'V'}
Functions ¶
func ObjectKeyRange ¶
Types ¶
type AtomicCounter ¶
type AtomicCounter struct {
// contains filtered or unexported fields
}
func NewAtomicCounter ¶
func NewAtomicCounter(db *Chotki, rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter
NewAtomicCounter creates a counter that has two properties:
- it is atomic as long as you use a single instance to do all increments; creating multiple instances will break this guarantee
- it can ease CPU load if updatePeriod > 0; in that case it will not read the current value of the counter from the db backend
Because we use an LSM backend, writes are cheap and reads are expensive. You can trade off an up-to-date counter value for fewer CPU cycles.
type Chotki ¶
type Chotki struct {
// contains filtered or unexported fields
}
TLV all the way down
func (*Chotki) AddToMapSSField ¶
func (*Chotki) AddToMapTRField ¶
func (cho *Chotki) AddToMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
Adds/removes elements to/from a map (removed should map to nil)
func (*Chotki) AddToNField ¶
func (*Chotki) ClassFields ¶
todo note that the class may change as the program runs; in such a case if the class fields are already cached, the current session will not understand the new fields!
func (*Chotki) CommitPacket ¶
func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
Here new packets are timestamped and queued for save
func (*Chotki) Disconnect ¶
func (*Chotki) DumpObjects ¶
func (*Chotki) EditFieldTLV ¶
func (*Chotki) EditObjectRDX ¶
func (*Chotki) KeepAliveLoop ¶
func (cho *Chotki) KeepAliveLoop()
func (*Chotki) Metrics ¶
func (cho *Chotki) Metrics() []prometheus.Collector
func (*Chotki) NewObjectTLV ¶
func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
Creates a new object from enveloped TLV fields; no class checks.
func (*Chotki) ObjectFieldTLV ¶
ObjectFieldTLV picks one field fast. No class checks, etc.
func (*Chotki) ObjectFields ¶
func (*Chotki) ObjectFieldsByClass ¶
func (*Chotki) ObjectFieldsTLV ¶
func (*Chotki) ObjectIterator ¶
returns nil for "not found"
func (*Chotki) ObjectMapper ¶
func (*Chotki) ObjectRDTFieldTLV ¶
ObjectRDTFieldTLV picks one field given its id and RDT.
func (*Chotki) RemoveAllHooks ¶
func (*Chotki) RestoreNet ¶
func (*Chotki) SetFieldTLV ¶
func (*Chotki) SetMapSSField ¶
func (*Chotki) SetMapTRField ¶
func (*Chotki) UpdateVTree ¶
type ChotkiCollector ¶
type ChotkiCollector struct {
// contains filtered or unexported fields
}
func NewChotkiCollector ¶
func NewChotkiCollector(chotki *Chotki) *ChotkiCollector
func (*ChotkiCollector) Collect ¶
func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric)
func (*ChotkiCollector) Describe ¶
func (c *ChotkiCollector) Describe(d chan<- *prometheus.Desc)
type Field ¶
A class contains a number of fields. Each Field has some RDT type. A class can inherit another class. New fields can be appended to a class, but never removed. Max number of fields is 128, max inheritance depth 32. When stored, a class is an append-only sequence of Ts. The syntax for each T: "XName", where X is the RDT. For the map types, one can use "MSS_Name" or similar. Each field has an Offset. The Offset+RdxType pair is the *actual key* for the field in the database. Entries having identical Offset+RdxType are considered *renames*!
type NativeObject ¶
type NetCollector ¶
type NetCollector struct {
// contains filtered or unexported fields
}
func NewNetCollector ¶
func NewNetCollector(net *protocol.Net) *NetCollector
func (*NetCollector) Collect ¶
func (n *NetCollector) Collect(m chan<- prometheus.Metric)
func (*NetCollector) Describe ¶
func (n *NetCollector) Describe(d chan<- *prometheus.Desc)
type ORM ¶
func (*ORM) FindID ¶
func (orm *ORM) FindID(obj NativeObject) rdx.ID
Find the ID of a registered object.
func (*ORM) Load ¶
func (orm *ORM) Load(id rdx.ID, blanc NativeObject, skipFields ...uint64) (obj NativeObject, err error)
Load the object's state from the db, register the object. If an object is already registered for that id, returns the old one. The new one is not used then.
func (*ORM) Object ¶
func (orm *ORM) Object(id rdx.ID) NativeObject
Find a registered object given its id. nil if none.
func (*ORM) Save ¶
func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
Save the registered object's changes. Much faster than SaveAll(), especially if you loaded many objects and modified few.
func (*ORM) SaveAll ¶
SaveAll saves all the changed fields; this will scan the objects and their database records.
func (*ORM) UpdateObject ¶
func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
type Options ¶
type Options struct { pebble.Options Src uint64 Name string Log1 protocol.Records Logger utils.Logger PingPeriod time.Duration // how often should we ping neighbour replicae if its silent PingWait time.Duration // how much time we wait until pong received PebbleWriteOptions *pebble.WriteOptions BroadcastQueueMaxSize int // size in bytes, after reaching it all writes will block BroadcastQueueMinBatchSize int // reads will wait until they have enough data or timelimit expires // if this limit expires before read has enough data (BroadcastQueueMinBatchSize) it will return whatever it has, // writes will cause overflow error which will result in queue shutdown and session end BroadcastQueueTimeLimit time.Duration ReadAccumTimeLimit time.Duration // ReadMaxBufferSize int ReadMinBufferSizeToProcess int TcpReadBufferSize int TcpWriteBufferSize int WriteTimeout time.Duration TlsConfig *tls.Config }
func (*Options) SetDefaults ¶
func (o *Options) SetDefaults()
type PebbleCollector ¶
type PebbleCollector struct {
// contains filtered or unexported fields
}
func NewPebbleCollector ¶
func NewPebbleCollector(db *pebble.DB) *PebbleCollector
func (*PebbleCollector) Collect ¶
func (pc *PebbleCollector) Collect(ch chan<- prometheus.Metric)
func (*PebbleCollector) Describe ¶
func (pc *PebbleCollector) Describe(ch chan<- *prometheus.Desc)
type PebbleMergeAdaptor ¶
type PebbleMergeAdaptor struct {
// contains filtered or unexported fields
}
func (*PebbleMergeAdaptor) MergeNewer ¶
func (a *PebbleMergeAdaptor) MergeNewer(value []byte) error
func (*PebbleMergeAdaptor) MergeOlder ¶
func (a *PebbleMergeAdaptor) MergeOlder(value []byte) error
type Syncer ¶
type Syncer struct { Src uint64 Name string Host SyncHost Mode SyncMode PingPeriod time.Duration PingWait time.Duration WaitUntilNone time.Duration // contains filtered or unexported fields }