Documentation ¶
Overview ¶
Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:
- KeyValProtoWatcher maintains watch registrations/subscriptions.
- Registry of latest revisions of values per each key synchronized by datasync.
- Default implementation of Events & Iterators interfaces defined in data_api.go.
- Events & Iterators in this package are reused by some but all datasync transports.
Index ¶
- func AggregateDone(events []func(chan error), done chan error)
- type Adapter
- type Change
- type ChangeEvent
- func (ev *ChangeEvent) Done(err error)
- func (ev *ChangeEvent) GetChangeType() datasync.PutDel
- func (ev *ChangeEvent) GetKey() string
- func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
- func (ev *ChangeEvent) GetRevision() int64
- func (ev *ChangeEvent) GetValue(val proto.Message) (err error)
- type ChangeIterator
- type DoneCallback
- type DoneChannel
- type KVIterator
- type KeyVal
- type KeyValBytes
- type PrevRevisions
- func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
- func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
- func (r *PrevRevisions) ListKeys() []string
- func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (found bool, prev datasync.LazyValueWithRev, currRev int64)
- func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) (found bool, prev datasync.LazyValueWithRev)
- type ResyncEventDB
- type Subscription
- type WatchDataReg
- type Watcher
- func (adapter *Watcher) LastRev() *PrevRevisions
- func (adapter *Watcher) PropagateChanges(txData map[string]datasync.ChangeValue) error
- func (adapter *Watcher) PropagateResync(txData map[string]datasync.ChangeValue) error
- func (adapter *Watcher) Subscriptions() map[string]*Subscription
- func (adapter *Watcher) Watch(resyncName string, changeChan chan datasync.ChangeEvent, ...) (datasync.WatchRegistration, error)
- func (adapter *Watcher) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent, ...) (*WatchDataReg, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AggregateDone ¶
AggregateDone can be reused to avoid repetitive code that triggers slice of events and waits until it is finished
Types ¶
type Adapter ¶
type Adapter struct { Watcher datasync.KeyValProtoWatcher Publisher datasync.KeyProtoValWriter }
Adapter implements datasync.TransportAdapter but allows optionally implement these Watch / Put
func (*Adapter) Watch ¶
func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch using Kafka KeyValProtoWatcher Topic KeyValProtoWatcher
type Change ¶
Change represents a single Key-value pair plus changeType
func NewChangeBytes ¶
NewChangeBytes creates a new instance of NewChangeBytes.
func (*Change) GetChangeType ¶
GetChangeType returns type of the change.
type ChangeEvent ¶
type ChangeEvent struct { Key string ChangeType datasync.PutDel CurrVal datasync.LazyValue CurrRev int64 PrevVal datasync.LazyValue // contains filtered or unexported fields }
ChangeEvent is a simple structure that implements interface datasync.ChangeEvent
func (*ChangeEvent) Done ¶
func (ev *ChangeEvent) Done(err error)
Done val the call or logs err if there is no val & error occurred
func (*ChangeEvent) GetChangeType ¶
func (ev *ChangeEvent) GetChangeType() datasync.PutDel
GetChangeType returns type of the event.
func (*ChangeEvent) GetKey ¶
func (ev *ChangeEvent) GetKey() string
GetKey returns the Key associated with the change
func (*ChangeEvent) GetPrevValue ¶
func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
GetPrevValue returns the value before change
func (*ChangeEvent) GetRevision ¶
func (ev *ChangeEvent) GetRevision() int64
GetRevision - see the comments in the interface datasync.ChangeEvent
type ChangeIterator ¶
type ChangeIterator struct {
// contains filtered or unexported fields
}
ChangeIterator is a simple in-memory implementation of data.Iterator
func NewChangeIterator ¶
func NewChangeIterator(data []*Change) *ChangeIterator
NewChangeIterator creates a new instance of ChangeIterator.
type DoneCallback ¶
type DoneCallback struct {
Callback func(error)
}
DoneCallback is small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func (*DoneCallback) Done ¶
func (ev *DoneCallback) Done(err error)
Done propagates error to the callback
type DoneChannel ¶
type DoneChannel struct {
DoneChan chan error
}
DoneChannel is small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func NewDoneChannel ¶
func NewDoneChannel(doneChan chan error) *DoneChannel
NewDoneChannel creates a new instance of DoneChannel.
func (*DoneChannel) Done ¶
func (ev *DoneChannel) Done(err error)
Done propagates error to the channel.
type KVIterator ¶
type KVIterator struct {
// contains filtered or unexported fields
}
KVIterator is a simple in memory implementation of data.Iterator
func NewKVIterator ¶
func NewKVIterator(data []datasync.KeyVal) *KVIterator
NewKVIterator creates a new instance of KVIterator.
type KeyVal ¶
KeyVal represents a single Key-value pair
func (*KeyVal) GetRevision ¶
GetRevision returns revision associated with the latest change in the Key-value pair
type KeyValBytes ¶
type KeyValBytes struct {
// contains filtered or unexported fields
}
KeyValBytes represents a single Key-value pair
func NewKeyValBytes ¶
func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes
NewKeyValBytes creates a new instance of KeyValBytes.
func (*KeyValBytes) GetKey ¶
func (kv *KeyValBytes) GetKey() string
GetKey returns the Key of the pair
func (*KeyValBytes) GetRevision ¶
func (kv *KeyValBytes) GetRevision() int64
GetRevision returns revision associated with the latest change in the Key-value pair
type PrevRevisions ¶
type PrevRevisions struct {
// contains filtered or unexported fields
}
PrevRevisions maintains the map of keys & values with revision
func (*PrevRevisions) Del ¶
func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
Del deletes entry from revisions and returns previous value
func (*PrevRevisions) Get ¶
func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
Get gets last proto.Message with it's revision
func (*PrevRevisions) ListKeys ¶
func (r *PrevRevisions) ListKeys() []string
ListKeys returns all stored keys
func (*PrevRevisions) Put ¶
func (r *PrevRevisions) Put(key string, val datasync.LazyValue) ( found bool, prev datasync.LazyValueWithRev, currRev int64)
Put updates entry in the revisions and returns previous value
func (*PrevRevisions) PutWithRevision ¶
func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) ( found bool, prev datasync.LazyValueWithRev)
PutWithRevision updates entry in the revisions and returns previous value
type ResyncEventDB ¶
type ResyncEventDB struct { *DoneChannel // contains filtered or unexported fields }
ResyncEventDB implements interface datasync.ResyncEvent (see comments in there)
func NewResyncEvent ¶
func NewResyncEvent(m map[string][]datasync.KeyVal) *ResyncEventDB
NewResyncEvent creates a new instance of ResyncEventDB using the give map of slice of KeyVal.
func NewResyncEventDB ¶
func NewResyncEventDB(its map[string]datasync.KeyValIterator) *ResyncEventDB
NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.
func (*ResyncEventDB) GetValues ¶
func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator
GetValues returns values of the event.
type Subscription ¶
type Subscription struct { ResyncName string ChangeChan chan datasync.ChangeEvent ResyncChan chan datasync.ResyncEvent KeyPrefixes []string }
Subscription TODO
type WatchDataReg ¶
type WatchDataReg struct { ResyncName string CloseChan chan interface{} // contains filtered or unexported fields }
WatchDataReg implements interface datasync.WatchDataRegistration
func (*WatchDataReg) Close ¶
func (reg *WatchDataReg) Close() error
Close stops watching of particular KeyPrefixes.
type Watcher ¶
type Watcher struct {
// contains filtered or unexported fields
}
Watcher propagates events using channels.
func NewWatcher ¶
func NewWatcher() *Watcher
NewWatcher creates a new instance of KeyValProtoWatcher.
func (*Watcher) PropagateChanges ¶
func (adapter *Watcher) PropagateChanges(txData map[string]datasync.ChangeValue) error
PropagateChanges fills registered channels with the data
func (*Watcher) PropagateResync ¶
func (adapter *Watcher) PropagateResync(txData map[string]datasync.ChangeValue) error
PropagateResync fills registered channels with the data
func (*Watcher) Subscriptions ¶
func (adapter *Watcher) Subscriptions() map[string]*Subscription
Subscriptions returns the current subscriptions.
func (*Watcher) Watch ¶
func (adapter *Watcher) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch just appends channels
func (*Watcher) WatchDataBase ¶
func (adapter *Watcher) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (*WatchDataReg, error)
WatchDataBase just appends channels