Documentation ¶
Overview ¶
Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:
- KeyValProtoWatcher maintains watch registrations/subscriptions.
- Registry of the latest revisions of values per each key, synchronized by datasync.
- Default implementation of Events & Iterators interfaces defined in data_api.go.
- Events & Iterators in this package are reused by some datasync transports.
Index ¶
- func AggregateDone(events []func(chan error), done chan error)
- type Adapter
- type Change
- type ChangeEvent
- func (ev *ChangeEvent) Done(err error)
- func (ev *ChangeEvent) GetChangeType() datasync.Op
- func (ev *ChangeEvent) GetKey() string
- func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
- func (ev *ChangeEvent) GetRevision() int64
- func (ev *ChangeEvent) GetValue(val proto.Message) (err error)
- type ChangeIterator
- type DoneCallback
- type DoneChannel
- type KVIterator
- type KeyVal
- type KeyValBytes
- type PrevRevisions
- func (r *PrevRevisions) Cleanup()
- func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
- func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
- func (r *PrevRevisions) ListKeys() []string
- func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (found bool, prev datasync.LazyValueWithRev, currRev int64)
- func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) (found bool, prev datasync.LazyValueWithRev)
- type Registry
- func (adapter *Registry) LastRev() *PrevRevisions
- func (adapter *Registry) PropagateChanges(txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) PropagateResync(txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) Subscriptions() map[string]*Subscription
- func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, ...) (datasync.WatchRegistration, error)
- type ResyncEventDB
- type Subscription
- type WatchDataReg
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AggregateDone ¶
AggregateDone can be reused to avoid repetitive code that triggers a slice of events and waits until it is finished.
Types ¶
type Adapter ¶
type Adapter struct { Watcher datasync.KeyValProtoWatcher Publisher datasync.KeyProtoValWriter }
Adapter implements datasync.TransportAdapter but allows the Watch/ Put functions to be optionally implemented.
func (*Adapter) Watch ¶
func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch uses Kafka KeyValProtoWatcher Topic KeyValProtoWatcher.
type Change ¶
Change represents a single Key-value pair plus changeType.
func NewChangeBytes ¶
NewChangeBytes creates a new instance of NewChangeBytes.
func (*Change) GetChangeType ¶
GetChangeType returns type of the change.
type ChangeEvent ¶
type ChangeEvent struct { Key string ChangeType datasync.Op CurrVal datasync.LazyValue CurrRev int64 PrevVal datasync.LazyValue // contains filtered or unexported fields }
ChangeEvent is a simple structure that implements interface datasync.ChangeEvent.
func (*ChangeEvent) Done ¶
func (ev *ChangeEvent) Done(err error)
Done propagates call to delegate. If the delegate is nil, then the error is logged (if occurred).
func (*ChangeEvent) GetChangeType ¶
func (ev *ChangeEvent) GetChangeType() datasync.Op
GetChangeType returns type of the event.
func (*ChangeEvent) GetKey ¶
func (ev *ChangeEvent) GetKey() string
GetKey returns the Key associated with the change.
func (*ChangeEvent) GetPrevValue ¶
func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
GetPrevValue returns the value before change.
func (*ChangeEvent) GetRevision ¶
func (ev *ChangeEvent) GetRevision() int64
GetRevision - see the comments in the interface datasync.ChangeEvent
type ChangeIterator ¶
type ChangeIterator struct {
// contains filtered or unexported fields
}
ChangeIterator is a simple in-memory implementation of data.Iterator.
func NewChangeIterator ¶
func NewChangeIterator(data []*Change) *ChangeIterator
NewChangeIterator creates a new instance of ChangeIterator.
type DoneCallback ¶
type DoneCallback struct {
Callback func(error)
}
DoneCallback is a small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func (*DoneCallback) Done ¶
func (ev *DoneCallback) Done(err error)
Done propagates error to the callback.
type DoneChannel ¶
type DoneChannel struct {
DoneChan chan error
}
DoneChannel is a small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func NewDoneChannel ¶
func NewDoneChannel(doneChan chan error) *DoneChannel
NewDoneChannel creates a new instance of DoneChannel.
func (*DoneChannel) Done ¶
func (ev *DoneChannel) Done(err error)
Done propagates error to the channel.
type KVIterator ¶
type KVIterator struct {
// contains filtered or unexported fields
}
KVIterator is a simple in memory implementation of data.Iterator.
func NewKVIterator ¶
func NewKVIterator(data []datasync.KeyVal) *KVIterator
NewKVIterator creates a new instance of KVIterator.
type KeyVal ¶
KeyVal represents a single key-value pair.
func (*KeyVal) GetRevision ¶
GetRevision returns revision associated with the latest change in the key-value pair.
type KeyValBytes ¶
type KeyValBytes struct {
// contains filtered or unexported fields
}
KeyValBytes represents a single key-value pair.
func NewKeyValBytes ¶
func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes
NewKeyValBytes creates a new instance of KeyValBytes.
func (*KeyValBytes) GetKey ¶
func (kv *KeyValBytes) GetKey() string
GetKey returns the key of the pair.
func (*KeyValBytes) GetRevision ¶
func (kv *KeyValBytes) GetRevision() int64
GetRevision returns revision associated with the latest change in the key-value pair.
type PrevRevisions ¶
PrevRevisions maintains the map of keys & values with revision.
func (*PrevRevisions) Cleanup ¶ added in v1.4.0
func (r *PrevRevisions) Cleanup()
Cleanup removes all data from the registry
func (*PrevRevisions) Del ¶
func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
Del deletes the entry from revisions and returns previous value.
func (*PrevRevisions) Get ¶
func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
Get gets the last proto.Message with it's revision.
func (*PrevRevisions) ListKeys ¶
func (r *PrevRevisions) ListKeys() []string
ListKeys returns all stored keys.
func (*PrevRevisions) Put ¶
func (r *PrevRevisions) Put(key string, val datasync.LazyValue) ( found bool, prev datasync.LazyValueWithRev, currRev int64)
Put updates the entry in the revisions and returns previous value.
func (*PrevRevisions) PutWithRevision ¶
func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) ( found bool, prev datasync.LazyValueWithRev)
PutWithRevision updates the entry in the revisions and returns previous value.
type Registry ¶ added in v1.0.3
type Registry struct {
// contains filtered or unexported fields
}
Registry of subscriptions and latest revisions. This structure contains extracted reusable code among various datasync implementations. Because of this code, datasync plugins does not need to repeat code related management of subscriptions.
func NewRegistry ¶ added in v1.0.3
func NewRegistry() *Registry
NewRegistry creates reusable registry of subscriptions for a particular datasync plugin.
func (*Registry) LastRev ¶ added in v1.0.3
func (adapter *Registry) LastRev() *PrevRevisions
LastRev is only a getter.
func (*Registry) PropagateChanges ¶ added in v1.0.3
func (adapter *Registry) PropagateChanges(txData map[string]datasync.ChangeValue) error
PropagateChanges fills registered channels with the data.
func (*Registry) PropagateResync ¶ added in v1.0.3
func (adapter *Registry) PropagateResync(txData map[string]datasync.ChangeValue) error
PropagateResync fills registered channels with the data.
func (*Registry) Subscriptions ¶ added in v1.0.3
func (adapter *Registry) Subscriptions() map[string]*Subscription
Subscriptions returns the current subscriptions.
func (*Registry) Watch ¶ added in v1.0.3
func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch only appends channels.
type ResyncEventDB ¶
type ResyncEventDB struct { *DoneChannel // contains filtered or unexported fields }
ResyncEventDB implements the interface datasync.ResyncEvent (see comments in there).
func NewResyncEvent ¶
func NewResyncEvent(m map[string][]datasync.KeyVal) *ResyncEventDB
NewResyncEvent creates a new instance of ResyncEventDB using the give map of slice of KeyVal.
func NewResyncEventDB ¶
func NewResyncEventDB(its map[string]datasync.KeyValIterator) *ResyncEventDB
NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.
func (*ResyncEventDB) GetValues ¶
func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator
GetValues returns values of the event.
type Subscription ¶
type Subscription struct { ResyncName string ChangeChan chan datasync.ChangeEvent ResyncChan chan datasync.ResyncEvent CloseChan chan string KeyPrefixes []string }
Subscription represents single subscription for Registry.
type WatchDataReg ¶
type WatchDataReg struct { ResyncName string // contains filtered or unexported fields }
WatchDataReg implements interface datasync.WatchDataRegistration.
func (*WatchDataReg) Close ¶
func (reg *WatchDataReg) Close() error
Close stops watching of particular KeyPrefixes.
func (*WatchDataReg) Register ¶ added in v1.3.0
func (reg *WatchDataReg) Register(resyncName, keyPrefix string) error
Register starts watching of particular key prefix. Method returns error if key which should be added already exists
func (*WatchDataReg) Unregister ¶ added in v1.0.6
func (reg *WatchDataReg) Unregister(keyPrefix string) error
Unregister stops watching of particular key prefix. Method returns error if key which should be removed does not exist or in case the channel to close goroutine is nil