Documentation ¶
Overview ¶
Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:
- KeyValProtoWatcher maintains watch registrations/subscriptions.
- Registry of the latest revisions of values per each key, synchronized by datasync.
- Default implementation of Events & Iterators interfaces defined in data_api.go.
- Events & Iterators in this package are reused by some datasync transports.
Index ¶
- func AggregateDone(events []func(chan error), done chan error)
- type Adapter
- type Change
- type ChangeEvent
- func (ev *ChangeEvent) Done(err error)
- func (ev *ChangeEvent) GetChangeType() datasync.PutDel
- func (ev *ChangeEvent) GetKey() string
- func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
- func (ev *ChangeEvent) GetRevision() int64
- func (ev *ChangeEvent) GetValue(val proto.Message) (err error)
- type ChangeIterator
- type DoneCallback
- type DoneChannel
- type KVIterator
- type KeyVal
- type KeyValBytes
- type PrevRevisions
- func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
- func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
- func (r *PrevRevisions) ListKeys() []string
- func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (found bool, prev datasync.LazyValueWithRev, currRev int64)
- func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) (found bool, prev datasync.LazyValueWithRev)
- type Registry
- func (adapter *Registry) LastRev() *PrevRevisions
- func (adapter *Registry) PropagateChanges(txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) PropagateResync(txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) Subscriptions() map[string]*Subscription
- func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, ...) (datasync.WatchRegistration, error)
- func (adapter *Registry) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent, ...) (*WatchDataReg, error)
- type ResyncEventDB
- type Subscription
- type WatchDataReg
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AggregateDone ¶
AggregateDone can be reused to avoid repetitive code that triggers slice of events and waits until it is finished
Types ¶
type Adapter ¶
type Adapter struct { Watcher datasync.KeyValProtoWatcher Publisher datasync.KeyProtoValWriter }
Adapter implements datasync.TransportAdapter but allows optionally implement these Watch / Put
func (*Adapter) Watch ¶
func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch using Kafka KeyValProtoWatcher Topic KeyValProtoWatcher
type Change ¶
Change represents a single Key-value pair plus changeType
func NewChangeBytes ¶
NewChangeBytes creates a new instance of NewChangeBytes.
func (*Change) GetChangeType ¶
GetChangeType returns type of the change.
type ChangeEvent ¶
type ChangeEvent struct { Key string ChangeType datasync.PutDel CurrVal datasync.LazyValue CurrRev int64 PrevVal datasync.LazyValue // contains filtered or unexported fields }
ChangeEvent is a simple structure that implements interface datasync.ChangeEvent
func (*ChangeEvent) Done ¶
func (ev *ChangeEvent) Done(err error)
Done val the call or logs err if there is no val & error occurred
func (*ChangeEvent) GetChangeType ¶
func (ev *ChangeEvent) GetChangeType() datasync.PutDel
GetChangeType returns type of the event.
func (*ChangeEvent) GetKey ¶
func (ev *ChangeEvent) GetKey() string
GetKey returns the Key associated with the change
func (*ChangeEvent) GetPrevValue ¶
func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
GetPrevValue returns the value before change
func (*ChangeEvent) GetRevision ¶
func (ev *ChangeEvent) GetRevision() int64
GetRevision - see the comments in the interface datasync.ChangeEvent
type ChangeIterator ¶
type ChangeIterator struct {
// contains filtered or unexported fields
}
ChangeIterator is a simple in-memory implementation of data.Iterator
func NewChangeIterator ¶
func NewChangeIterator(data []*Change) *ChangeIterator
NewChangeIterator creates a new instance of ChangeIterator.
type DoneCallback ¶
type DoneCallback struct {
Callback func(error)
}
DoneCallback is small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func (*DoneCallback) Done ¶
func (ev *DoneCallback) Done(err error)
Done propagates error to the callback
type DoneChannel ¶
type DoneChannel struct {
DoneChan chan error
}
DoneChannel is small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func NewDoneChannel ¶
func NewDoneChannel(doneChan chan error) *DoneChannel
NewDoneChannel creates a new instance of DoneChannel.
func (*DoneChannel) Done ¶
func (ev *DoneChannel) Done(err error)
Done propagates error to the channel.
type KVIterator ¶
type KVIterator struct {
// contains filtered or unexported fields
}
KVIterator is a simple in memory implementation of data.Iterator
func NewKVIterator ¶
func NewKVIterator(data []datasync.KeyVal) *KVIterator
NewKVIterator creates a new instance of KVIterator.
type KeyVal ¶
KeyVal represents a single Key-value pair
func (*KeyVal) GetRevision ¶
GetRevision returns revision associated with the latest change in the Key-value pair
type KeyValBytes ¶
type KeyValBytes struct {
// contains filtered or unexported fields
}
KeyValBytes represents a single Key-value pair
func NewKeyValBytes ¶
func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes
NewKeyValBytes creates a new instance of KeyValBytes.
func (*KeyValBytes) GetKey ¶
func (kv *KeyValBytes) GetKey() string
GetKey returns the Key of the pair
func (*KeyValBytes) GetRevision ¶
func (kv *KeyValBytes) GetRevision() int64
GetRevision returns revision associated with the latest change in the Key-value pair
type PrevRevisions ¶
type PrevRevisions struct {
// contains filtered or unexported fields
}
PrevRevisions maintains the map of keys & values with revision
func (*PrevRevisions) Del ¶
func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)
Del deletes entry from revisions and returns previous value
func (*PrevRevisions) Get ¶
func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)
Get gets last proto.Message with it's revision
func (*PrevRevisions) ListKeys ¶
func (r *PrevRevisions) ListKeys() []string
ListKeys returns all stored keys
func (*PrevRevisions) Put ¶
func (r *PrevRevisions) Put(key string, val datasync.LazyValue) ( found bool, prev datasync.LazyValueWithRev, currRev int64)
Put updates entry in the revisions and returns previous value
func (*PrevRevisions) PutWithRevision ¶
func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) ( found bool, prev datasync.LazyValueWithRev)
PutWithRevision updates entry in the revisions and returns previous value
type Registry ¶ added in v1.0.3
type Registry struct {
// contains filtered or unexported fields
}
Registry of subscriptions and latest revisions. This structure contains extracted reusable code among various datasync implementations. By having this code datasync plugins do not need to repeat code related management of subscriptions.
func NewRegistry ¶ added in v1.0.3
func NewRegistry() *Registry
NewRegistry creates reusable registry of subscriptions for a particular datasync plugin.
func (*Registry) LastRev ¶ added in v1.0.3
func (adapter *Registry) LastRev() *PrevRevisions
LastRev is just getter
func (*Registry) PropagateChanges ¶ added in v1.0.3
func (adapter *Registry) PropagateChanges(txData map[string]datasync.ChangeValue) error
PropagateChanges fills registered channels with the data
func (*Registry) PropagateResync ¶ added in v1.0.3
func (adapter *Registry) PropagateResync(txData map[string]datasync.ChangeValue) error
PropagateResync fills registered channels with the data
func (*Registry) Subscriptions ¶ added in v1.0.3
func (adapter *Registry) Subscriptions() map[string]*Subscription
Subscriptions returns the current subscriptions.
func (*Registry) Watch ¶ added in v1.0.3
func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch just appends channels
func (*Registry) WatchDataBase ¶ added in v1.0.3
func (adapter *Registry) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (*WatchDataReg, error)
WatchDataBase just appends channels
type ResyncEventDB ¶
type ResyncEventDB struct { *DoneChannel // contains filtered or unexported fields }
ResyncEventDB implements interface datasync.ResyncEvent (see comments in there)
func NewResyncEvent ¶
func NewResyncEvent(m map[string][]datasync.KeyVal) *ResyncEventDB
NewResyncEvent creates a new instance of ResyncEventDB using the give map of slice of KeyVal.
func NewResyncEventDB ¶
func NewResyncEventDB(its map[string]datasync.KeyValIterator) *ResyncEventDB
NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.
func (*ResyncEventDB) GetValues ¶
func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator
GetValues returns values of the event.
type Subscription ¶
type Subscription struct { ResyncName string ChangeChan chan datasync.ChangeEvent ResyncChan chan datasync.ResyncEvent KeyPrefixes []string }
Subscription TODO
type WatchDataReg ¶
type WatchDataReg struct { ResyncName string CloseChan chan interface{} // contains filtered or unexported fields }
WatchDataReg implements interface datasync.WatchDataRegistration
func (*WatchDataReg) Close ¶
func (reg *WatchDataReg) Close() error
Close stops watching of particular KeyPrefixes.