Documentation ¶
Index ¶
- Constants
- Variables
- func EditTLV(off uint64, rdt byte, tlv []byte) (edit []byte)
- func Exists(dirname string) (bool, error)
- func FieldOffset(fields []string, name string) rdx.ID
- func LastLit(recs protocol.Records) byte
- func OKey(id rdx.ID, rdt byte) (key []byte)
- func OKeyIdRdt(key []byte) (id rdx.ID, rdt byte)
- func ObjectKeyRange(oid rdx.ID) (fro, til []byte)
- func ParsePacket(pack []byte) (lit byte, id, ref rdx.ID, body []byte, err error)
- func VKey(id rdx.ID) (key []byte)
- func VKeyId(key []byte) rdx.ID
- type AtomicCounter
- type CallHook
- type Chotki
- func (cho *Chotki) AddHook(fid rdx.ID, hook Hook)
- func (cho *Chotki) AddToMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)
- func (cho *Chotki) AddToMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
- func (cho *Chotki) AddToNField(ctx context.Context, fid rdx.ID, count uint64) (id rdx.ID, err error)
- func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
- func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, except string)
- func (cho *Chotki) ClassFields(cid rdx.ID) (fields Fields, err error)
- func (cho *Chotki) Clock() rdx.Clock
- func (cho *Chotki) Close() error
- func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) Connect(addr string) error
- func (cho *Chotki) ConnectPool(name string, addrs []string) error
- func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter
- func (cho *Chotki) Database() *pebble.DB
- func (cho *Chotki) Directory() string
- func (cho *Chotki) Disconnect(addr string) error
- func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error)
- func (cho *Chotki) DumpAll(writer io.Writer)
- func (cho *Chotki) DumpObjects(writer io.Writer)
- func (cho *Chotki) DumpVV(writer io.Writer)
- func (cho *Chotki) EditFieldTLV(ctx context.Context, fid rdx.ID, delta []byte) (id rdx.ID, err error)
- func (cho *Chotki) EditObject(ctx context.Context, oid rdx.ID, fields ...string) (id rdx.ID, err error) (deprecated)
- func (cho *Chotki) EditObjectRDX(ctx context.Context, oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)
- func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte)
- func (cho *Chotki) IncNField(ctx context.Context, fid rdx.ID) (id rdx.ID, err error)
- func (cho *Chotki) KeepAlive() error
- func (cho *Chotki) KeepAliveLoop()
- func (cho *Chotki) Last() rdx.ID
- func (cho *Chotki) Listen(addr string) error
- func (cho *Chotki) MapSSField(fid rdx.ID) (themap rdx.MapSS, err error)
- func (cho *Chotki) MapTRField(fid rdx.ID) (themap rdx.MapTR, err error)
- func (cho *Chotki) Metrics() []prometheus.Collector
- func (cho *Chotki) NewClass(ctx context.Context, parent rdx.ID, fields ...Field) (id rdx.ID, err error)
- func (cho *Chotki) NewObject(ctx context.Context, tid rdx.ID, fields ...string) (id rdx.ID, err error)
- func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)
- func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl Fields, fact protocol.Records, err error)
- func (cho *Chotki) ObjectFieldsByClass(oid rdx.ID, form []string) (tid rdx.ID, tlvs protocol.Records, err error)
- func (cho *Chotki) ObjectFieldsTLV(oid rdx.ID) (tid rdx.ID, tlv protocol.Records, err error)
- func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator
- func (cho *Chotki) ObjectMapper() *ORM
- func (cho *Chotki) ObjectRDTFieldTLV(fid rdx.ID, rdt byte) (tlv []byte, err error)
- func (cho *Chotki) ObjectString(oid rdx.ID) (txt string, err error)
- func (cho *Chotki) ObjectVVField(fid rdx.ID) (vv rdx.VV, err error)
- func (cho *Chotki) RemoveAllHooks(fid rdx.ID)
- func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error)
- func (cho *Chotki) RestoreNet() error
- func (cho *Chotki) SetFieldTLV(ctx context.Context, fid rdx.ID, tlve []byte) (id rdx.ID, err error)
- func (cho *Chotki) SetMapSSField(ctx context.Context, fid rdx.ID, changes rdx.MapSS) (id rdx.ID, err error)
- func (cho *Chotki) SetMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
- func (cho *Chotki) Snapshot() pebble.Reader
- func (cho *Chotki) Source() uint64
- func (cho *Chotki) Unlisten(addr string) error
- func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error)
- func (cho *Chotki) VersionVector() (vv rdx.VV, err error)
- type Field
- type Fields
- type Hook
- type Merger
- type NativeObject
- type NetCollector
- type ORM
- func (orm *ORM) Clear() error
- func (orm *ORM) Close() error
- func (orm *ORM) Compile(name string, cid rdx.ID) (code string, err error)
- func (orm *ORM) FindID(obj NativeObject) rdx.ID
- func (orm *ORM) Load(id rdx.ID, blanc NativeObject) (obj NativeObject, err error)
- func (orm *ORM) New(ctx context.Context, cid rdx.ID, objs ...NativeObject) (err error)
- func (orm *ORM) Object(id rdx.ID) (obj NativeObject)
- func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
- func (orm *ORM) SaveAll(ctx context.Context) (err error)
- func (orm *ORM) SyncAll(ctx context.Context) (err error)
- func (orm *ORM) UpdateAll() (err error)
- func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
- type Options
- type PebbleMergeAdaptor
- type PingState
- type SyncHost
- type SyncMode
- type SyncState
- type Syncer
- func (sync *Syncer) Close() error
- func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error)
- func (sync *Syncer) DrainHandshake(recs protocol.Records) (err error)
- func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
- func (sync *Syncer) FeedBlockDiff() (diff protocol.Records, err error)
- func (sync *Syncer) FeedDiffVV() (vv protocol.Records, err error)
- func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error)
- func (sync *Syncer) GetDrainState() SyncState
- func (sync *Syncer) GetFeedState() SyncState
- func (sync *Syncer) GetTraceId() string
- func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState)
- func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState)
- func (sync *Syncer) WaitDrainState(ctx context.Context, state SyncState) chan SyncState
Constants ¶
const ( IdNames = id2 + 1 IdNodes = id2 + 2 IdNodeInfo = id2 + 3 // ID from which we count user static objects IdLog1 = id2 + 4 )
const LidLKeyLen = 1 + 8 + 1
const PingVal = "ping"
const PongVal = "pong"
const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const TraceSize = 10
const YAckOff = uint64(2)
Variables ¶
var ( ErrDbClosed = errors.New("chotki: db is closed") ErrDirnameIsFile = errors.New("chotki: the dirname is file") ErrNotImplemented = errors.New("chotki: not implemented yet") ErrHookNotFound = errors.New("chotki: hook not found") ErrBadIRecord = errors.New("chotki: bad id-ref record") ErrBadORecord = errors.New("chotki: bad id-ref record") ErrBadHPacket = errors.New("chotki: bad handshake packet") ErrBadEPacket = errors.New("chotki: bad E packet") ErrBadVPacket = errors.New("chotki: bad V packet") ErrBadYPacket = errors.New("chotki: bad Y packet") ErrBadLPacket = errors.New("chotki: bad L packet") ErrBadTPacket = errors.New("chotki: bad T packet") ErrBadOPacket = errors.New("chotki: bad O packet") ErrSrcUnknown = errors.New("chotki: source unknown") ErrSyncUnknown = errors.New("chotki: sync session unknown") ErrBadRRecord = errors.New("chotki: bad ref record") ErrClosed = errors.New("chotki: no replica open") ErrBadTypeDescription = errors.New("chotki: bad type description") ErrObjectUnknown = errors.New("chotki: unknown object") ErrTypeUnknown = errors.New("chotki: unknown object type") ErrUnknownFieldInAType = errors.New("chotki: unknown field for the type") ErrBadClass = errors.New("chotki: bad class description") ErrOutOfOrder = errors.New("chotki: order fail: sequence gap") ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op") )
var ClassTemplate = `` /* 1146-byte string literal not displayed */
todo RDX formula
var ETemplate = `
func (o *{{Name}}) Get{{- Name}}() {
}
func (o *{{Name}}) Put{{- Name}}() {
}
`
todo collection description
var ErrCounterNotLoaded error = fmt.Errorf("counter not loaded")
var ErrNotCounter error = fmt.Errorf("not a counter")
var ErrOffsetOpId = errors.New("op id is offset")
var ErrWrongFieldType = errors.New("wrong field type")
var EventsBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "chotki", Name: "batch_size", Buckets: []float64{0, 1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000}, })
var EventsMetric = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "chotki",
Name: "packet_count",
})
var EventsOutboundMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "chotki", Name: "outbound_packet_count", }, []string{"name"})
var FIRSTnatives = map[byte]string{
'F': "float64",
'I': "int64",
'R': "rdx.ID",
'S': "string",
'T': "string",
'N': "uint64",
'Z': "int64",
}
var Log0 = protocol.Records{ protocol.Record('Y', protocol.Record('I', rdx.ID0.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), ), protocol.Record('C', protocol.Record('I', id1.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "MNames", }), ), protocol.Record('O', protocol.Record('I', id2.ZipBytes()), protocol.Record('R', id1.ZipBytes()), protocol.Record('M', protocol.Record('T', rdx.Ttlv("0")), protocol.Record('R', rdx.Rtlv(rdx.ID0)), protocol.Record('T', rdx.Ttlv("Global")), protocol.Record('R', rdx.Rtlv(id2)), protocol.Record('T', rdx.Ttlv("Names")), protocol.Record('R', rdx.Rtlv(IdNames)), protocol.Record('T', rdx.Ttlv("Nodes")), protocol.Record('R', rdx.Rtlv(IdNodes)), ), ), protocol.Record('C', protocol.Record('I', id3.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "SName", "VAck", "SAddr", }), ), }
var VKey0 = []byte{'V', 0, 0, 0, 0, 0, 0, 0, 0, 'V'}
Functions ¶
func ObjectKeyRange ¶
Types ¶
type AtomicCounter ¶
type AtomicCounter struct {
// contains filtered or unexported fields
}
func NewAtomicCounter ¶
func NewAtomicCounter(db *Chotki, rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter
NewAtomicCounter creates a counter that has two properties:
- it is atomic as long as you use a single instance to do all increments; creating multiple instances will break this guarantee
- it can ease CPU load if updatePeriod > 0; in that case it will not read the current value of the counter from the db backend
Because we use an LSM backend, writes are cheap and reads are expensive. You can trade off an up-to-date counter value for fewer CPU cycles.
type Chotki ¶
type Chotki struct {
// contains filtered or unexported fields
}
TLV all the way down
func (*Chotki) AddToMapSSField ¶
func (*Chotki) AddToMapTRField ¶
func (cho *Chotki) AddToMapTRField(ctx context.Context, fid rdx.ID, changes rdx.MapTR) (id rdx.ID, err error)
Adds/removes elements to/from a map (removed should map to nil)
func (*Chotki) AddToNField ¶
func (*Chotki) ClassFields ¶
todo note that the class may change as the program runs; in such a case if the class fields are already cached, the current session will not understand the new fields!
func (*Chotki) CommitPacket ¶
func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
Here new packets are timestamped and queued for save
func (*Chotki) Disconnect ¶
func (*Chotki) DumpObjects ¶
func (*Chotki) EditFieldTLV ¶
func (*Chotki) EditObjectRDX ¶
func (*Chotki) KeepAliveLoop ¶
func (cho *Chotki) KeepAliveLoop()
func (*Chotki) Metrics ¶
func (cho *Chotki) Metrics() []prometheus.Collector
func (*Chotki) NewObjectTLV ¶
func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
Creates a new object from enveloped TLV fields; no class checks.
func (*Chotki) ObjectFieldTLV ¶
ObjectFieldTLV picks one field fast. No class checks, etc.
func (*Chotki) ObjectFields ¶
func (*Chotki) ObjectFieldsByClass ¶
func (*Chotki) ObjectFieldsTLV ¶
func (*Chotki) ObjectIterator ¶
returns nil for "not found"
func (*Chotki) ObjectMapper ¶
func (*Chotki) ObjectRDTFieldTLV ¶
ObjectRDTFieldTLV picks one field given its id and RDT.
func (*Chotki) RemoveAllHooks ¶
func (*Chotki) RestoreNet ¶
func (*Chotki) SetFieldTLV ¶
func (*Chotki) SetMapSSField ¶
func (*Chotki) SetMapTRField ¶
func (*Chotki) UpdateVTree ¶
type Field ¶
A class contains a number of fields. Each Field has some RDT type. A class can inherit another class. New fields can be appended to a class, but never removed. Max number of fields is 128, max inheritance depth 32. When stored, a class is an append-only sequence of Ts. The syntax for each T: "XName", where X is the RDT. For the map types, can use "MSS_Name" or similar. Each field has an Offset. The Offset+RdxType pair is the *actual key* for the field in the database. Entries having identical Offset+RdxType are considered *renames*!
type NativeObject ¶
type NetCollector ¶
type NetCollector struct {
// contains filtered or unexported fields
}
func NewNetCollector ¶
func NewNetCollector(net *protocol.Net) *NetCollector
func (*NetCollector) Collect ¶
func (n *NetCollector) Collect(m chan<- prometheus.Metric)
func (*NetCollector) Describe ¶
func (n *NetCollector) Describe(d chan<- *prometheus.Desc)
type ORM ¶
func (*ORM) FindID ¶
func (orm *ORM) FindID(obj NativeObject) rdx.ID
Find the ID of a registered object.
func (*ORM) Load ¶
func (orm *ORM) Load(id rdx.ID, blanc NativeObject) (obj NativeObject, err error)
Load the object's state from the db, register the object. If an object is already registered for that id, returns the old one. The new one is not used then.
func (*ORM) Object ¶
func (orm *ORM) Object(id rdx.ID) (obj NativeObject)
Find a registered object given its id. nil if none.
func (*ORM) Save ¶
func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
Save the registered objects' changes. Much faster than SaveAll(), especially if you loaded many objects and modified few.
func (*ORM) SaveAll ¶
SaveAll the changed fields; this will scan the objects and their database records.
func (*ORM) UpdateObject ¶
func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
type Options ¶
type Options struct { pebble.Options Src uint64 Name string Log1 protocol.Records MaxLogLen int64 RelaxedOrder bool Logger utils.Logger PingPeriod time.Duration PingWait time.Duration PebbleWriteOptions *pebble.WriteOptions BroadcastBatchSize int BroadcastTimeLimit time.Duration ReadAccumTimeLimit time.Duration ReadMaxBufferSize int ReadMinBufferSizeToProcess int TcpReadBufferSize int TcpWriteBufferSize int TlsConfig *tls.Config }
func (*Options) SetDefaults ¶
func (o *Options) SetDefaults()
type PebbleMergeAdaptor ¶
type PebbleMergeAdaptor struct {
// contains filtered or unexported fields
}
func (*PebbleMergeAdaptor) MergeNewer ¶
func (a *PebbleMergeAdaptor) MergeNewer(value []byte) error
func (*PebbleMergeAdaptor) MergeOlder ¶
func (a *PebbleMergeAdaptor) MergeOlder(value []byte) error
type Syncer ¶
type Syncer struct { Src uint64 Name string Host SyncHost Mode SyncMode PingPeriod time.Duration PingWait time.Duration WaitUntilNone time.Duration // contains filtered or unexported fields }