Documentation ¶
Overview ¶
Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:
- KeyValProtoWatcher maintains watch registrations/subscriptions.
- Registry of the latest revisions of values per each key, synchronized by datasync.
- Default implementation of Events & Iterators interfaces defined in data_api.go.
- Events & Iterators in this package are reused by some datasync transports.
Index ¶
- Variables
- func AggregateDone(events []func(chan error), done chan error)
- type Adapter
- type Change
- type ChangeEvent
- type ChangeResp
- type DoneCallback
- type DoneChannel
- type KVIterator
- type KeyVal
- type KeyValBytes
- type PrevRevisions
- func (r *PrevRevisions) Cleanup()
- func (r *PrevRevisions) Del(key string) (found bool, prev datasync.KeyVal)
- func (r *PrevRevisions) Get(key string) (found bool, value datasync.KeyVal)
- func (r *PrevRevisions) ListKeys() (ret []string)
- func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (found bool, prev datasync.KeyVal, currRev int64)
- func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.KeyVal) (found bool, prev datasync.KeyVal)
- type Registry
- func (adapter *Registry) LastRev() *PrevRevisions
- func (adapter *Registry) PropagateChanges(ctx context.Context, txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) PropagateResync(ctx context.Context, txData map[string]datasync.ChangeValue) error
- func (adapter *Registry) Subscriptions() map[string]*Subscription
- func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, ...) (datasync.WatchRegistration, error)
- type ResyncEventDB
- type Subscription
- type WatchDataReg
Constants ¶
This section is empty.
Variables ¶
var ( // PropagateChangesTimeout defines timeout used during // change propagation after which it will return an error. PropagateChangesTimeout = time.Second * 20 )
Functions ¶
func AggregateDone ¶
AggregateDone can be reused to avoid repetitive code that triggers a slice of events and waits until it is finished.
Types ¶
type Adapter ¶
type Adapter struct { Watcher datasync.KeyValProtoWatcher Publisher datasync.KeyProtoValWriter }
Adapter implements datasync.TransportAdapter but allows the Watch/ Put functions to be optionally implemented.
func (*Adapter) Watch ¶
func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch uses Kafka KeyValProtoWatcher Topic KeyValProtoWatcher.
type Change ¶
Change represents a single Key-value pair plus changeType.
func NewChangeBytes ¶
NewChangeBytes creates a new instance of NewChangeBytes.
func (*Change) GetChangeType ¶
GetChangeType returns type of the change.
type ChangeEvent ¶
type ChangeEvent struct { Changes []datasync.ProtoWatchResp // contains filtered or unexported fields }
ChangeEvent is a simple structure that implements interface datasync.ChangeEvent.
func (*ChangeEvent) Done ¶
func (ev *ChangeEvent) Done(err error)
Done propagates call to delegate. If the delegate is nil, then the error is logged (if occurred).
func (*ChangeEvent) GetChanges ¶
func (ev *ChangeEvent) GetChanges() []datasync.ProtoWatchResp
GetChanges returns list of changes for the change event.
func (*ChangeEvent) GetContext ¶
func (ev *ChangeEvent) GetContext() context.Context
GetContext returns the context associated with the event.
type ChangeResp ¶
type ChangeResp struct { Key string ChangeType datasync.Op CurrVal datasync.LazyValue CurrRev int64 PrevVal datasync.LazyValue }
ChangeResp represents single change in the change event.
func (*ChangeResp) GetChangeType ¶
func (ev *ChangeResp) GetChangeType() datasync.Op
GetChangeType returns type of the event.
func (*ChangeResp) GetKey ¶
func (ev *ChangeResp) GetKey() string
GetKey returns the Key associated with the change.
func (*ChangeResp) GetPrevValue ¶
func (ev *ChangeResp) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)
GetPrevValue returns the value before change.
func (*ChangeResp) GetRevision ¶
func (ev *ChangeResp) GetRevision() int64
GetRevision - see the comments in the interface datasync.ChangeEvent
type DoneCallback ¶
type DoneCallback struct {
Callback func(error)
}
DoneCallback is a small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func (*DoneCallback) Done ¶
func (ev *DoneCallback) Done(err error)
Done propagates error to the callback.
type DoneChannel ¶
type DoneChannel struct {
DoneChan chan error
}
DoneChannel is a small reusable part that is embedded to other events using composition. It implements datasync.CallbackResult.
func NewDoneChannel ¶
func NewDoneChannel(doneChan chan error) *DoneChannel
NewDoneChannel creates a new instance of DoneChannel.
func (*DoneChannel) Done ¶
func (ev *DoneChannel) Done(err error)
Done propagates error to the channel.
type KVIterator ¶
type KVIterator struct {
// contains filtered or unexported fields
}
KVIterator is a simple in memory implementation of data.Iterator.
func NewKVIterator ¶
func NewKVIterator(data []datasync.KeyVal) *KVIterator
NewKVIterator creates a new instance of KVIterator.
type KeyVal ¶
KeyVal represents a single key-value pair.
func (*KeyVal) GetRevision ¶
GetRevision returns revision associated with the latest change in the key-value pair.
type KeyValBytes ¶
type KeyValBytes struct {
// contains filtered or unexported fields
}
KeyValBytes represents a single key-value pair.
func NewKeyValBytes ¶
func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes
NewKeyValBytes creates a new instance of KeyValBytes.
func (*KeyValBytes) GetKey ¶
func (kv *KeyValBytes) GetKey() string
GetKey returns the key of the pair.
func (*KeyValBytes) GetRevision ¶
func (kv *KeyValBytes) GetRevision() int64
GetRevision returns revision associated with the latest change in the key-value pair.
type PrevRevisions ¶
type PrevRevisions struct {
// contains filtered or unexported fields
}
PrevRevisions maintains the map of keys & values with revision.
func (*PrevRevisions) Cleanup ¶
func (r *PrevRevisions) Cleanup()
Cleanup removes all data from the registry
func (*PrevRevisions) Del ¶
func (r *PrevRevisions) Del(key string) (found bool, prev datasync.KeyVal)
Del deletes the entry from revisions and returns previous value.
func (*PrevRevisions) Get ¶
func (r *PrevRevisions) Get(key string) (found bool, value datasync.KeyVal)
Get gets the last proto.Message with it's revision.
func (*PrevRevisions) ListKeys ¶
func (r *PrevRevisions) ListKeys() (ret []string)
ListKeys returns all stored keys.
func (*PrevRevisions) Put ¶
func (r *PrevRevisions) Put(key string, val datasync.LazyValue) ( found bool, prev datasync.KeyVal, currRev int64)
Put updates the entry in the revisions and returns previous value.
func (*PrevRevisions) PutWithRevision ¶
func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.KeyVal) ( found bool, prev datasync.KeyVal)
PutWithRevision updates the entry in the revisions and returns previous value.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry of subscriptions and latest revisions. This structure contains extracted reusable code among various datasync implementations. Because of this code, datasync plugins does not need to repeat code related management of subscriptions.
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry creates reusable registry of subscriptions for a particular datasync plugin.
func (*Registry) LastRev ¶
func (adapter *Registry) LastRev() *PrevRevisions
LastRev is only a getter.
func (*Registry) PropagateChanges ¶
func (adapter *Registry) PropagateChanges(ctx context.Context, txData map[string]datasync.ChangeValue) error
PropagateChanges fills registered channels with the data.
func (*Registry) PropagateResync ¶
func (adapter *Registry) PropagateResync(ctx context.Context, txData map[string]datasync.ChangeValue) error
PropagateResync fills registered channels with the data.
func (*Registry) Subscriptions ¶
func (adapter *Registry) Subscriptions() map[string]*Subscription
Subscriptions returns the current subscriptions.
func (*Registry) Watch ¶
func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch only appends channels.
type ResyncEventDB ¶
type ResyncEventDB struct { *DoneChannel // contains filtered or unexported fields }
ResyncEventDB implements the interface datasync.ResyncEvent (see comments in there).
func NewResyncEventDB ¶
func NewResyncEventDB(ctx context.Context, its map[string]datasync.KeyValIterator) *ResyncEventDB
NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.
func (*ResyncEventDB) GetContext ¶
func (ev *ResyncEventDB) GetContext() context.Context
GetContext returns the context associated with the event.
func (*ResyncEventDB) GetValues ¶
func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator
GetValues returns values of the event.
type Subscription ¶
type Subscription struct { ResyncName string ChangeChan chan datasync.ChangeEvent ResyncChan chan datasync.ResyncEvent CloseChan chan string KeyPrefixes []string }
Subscription represents single subscription for Registry.
type WatchDataReg ¶
type WatchDataReg struct { ResyncName string // contains filtered or unexported fields }
WatchDataReg implements interface datasync.WatchDataRegistration.
func (*WatchDataReg) Close ¶
func (reg *WatchDataReg) Close() error
Close stops watching of particular KeyPrefixes.
func (*WatchDataReg) Register ¶
func (reg *WatchDataReg) Register(resyncName, keyPrefix string) error
Register starts watching of particular key prefix. Method returns error if key which should be added already exists
func (*WatchDataReg) Unregister ¶
func (reg *WatchDataReg) Unregister(keyPrefix string) error
Unregister stops watching of particular key prefix. Method returns error if key which should be removed does not exist or in case the channel to close goroutine is nil
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go.
|
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go. |