Documentation ¶
Overview ¶
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers. The datasync API contains the Data Broker & KeyValProtoWatcher APIs, which are only facades in front of different key-value or SQL stores.
A key-value store is used as a transport channel between a remote client and an agent (server). It stores data/configuration for multiple agents (servers). Therefore, a client only needs to know the address of the key-value store but does not need to know the addresses of individual agents. The client can write data/configuration independently of the agent's (server's) lifecycle.
The Data KeyValProtoWatcher is used during regular operation to efficiently propagate data/configuration changes from the key-value store to the agents (servers). Upon receiving a data change event, the watcher makes an incremental update to its data. When data resynchronization (RESYNC) is triggered, the Data Broker is used to read all of the relevant keys & values from the key-value store. Reading all keys & values is a more reliable but less efficient method of data synchronization.
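The difference between the two synchronization paths can be illustrated with a minimal sketch. It does not use the kvdbsync or datasync APIs: `changeEvent`, `localState`, `applyChange`, and `resync` are hypothetical names invented for this example, and the store is modelled as a plain map.

```go
package main

import "fmt"

// changeEvent is a hypothetical stand-in for one watched change.
type changeEvent struct {
	key    string
	value  string
	delete bool
}

// localState is the agent-side copy of the configuration.
type localState map[string]string

// applyChange performs the cheap, incremental update for a single event.
func (s localState) applyChange(ev changeEvent) {
	if ev.delete {
		delete(s, ev.key)
		return
	}
	s[ev.key] = ev.value
}

// resync discards the local copy and rebuilds it from a full snapshot
// of the store. It reads every key, so it costs more than applyChange,
// but it cannot miss an update.
func resync(snapshot map[string]string) localState {
	fresh := make(localState, len(snapshot))
	for k, v := range snapshot {
		fresh[k] = v
	}
	return fresh
}

func main() {
	state := localState{}
	state.applyChange(changeEvent{key: "/if/eth0", value: "up"})
	state.applyChange(changeEvent{key: "/if/eth1", value: "down"})
	state.applyChange(changeEvent{key: "/if/eth1", delete: true})
	fmt.Println("after incremental updates:", len(state), "key(s)")

	state = resync(map[string]string{"/if/eth0": "up", "/if/eth2": "up"})
	fmt.Println("after resync:", len(state), "key(s)")
}
```

The incremental path touches one key per event, while the resync path replaces the whole local copy, which is the reliability versus efficiency trade-off described above.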
Index ¶
- Variables
- type ChangeWatchResp
- type Deps
- type Iterator
- type Option
- type Plugin
- func (p *Plugin) AfterInit() error
- func (p *Plugin) Close() error
- func (p *Plugin) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
- func (p *Plugin) Init() error
- func (p *Plugin) Put(key string, data proto.Message, opts ...datasync.PutOption) error
- func (p *Plugin) Watch(resyncName string, changeChan chan datasync.ChangeEvent, ...) (datasync.WatchRegistration, error)
Variables ¶
var (
	// ResyncAcceptTimeout defines timeout used for
	// sending resync event to registered watchers.
	ResyncAcceptTimeout = time.Second * 1
	// ResyncDoneTimeout defines timeout used during
	// resync after which resync will return an error.
	ResyncDoneTimeout = time.Second * 5
)
var (
	// ErrNotReady is an error returned when KVDBSync plugin is being used before the KVPlugin is ready.
	ErrNotReady = errors.New("transport adapter is not ready yet (probably called before AfterInit)")
)
Types ¶
type ChangeWatchResp ¶
type ChangeWatchResp struct {
	*syncbase.DoneChannel
	// contains filtered or unexported fields
}
ChangeWatchResp is a structure that adapts the BytesWatchResp to the datasync API.
func NewChangeWatchResp ¶
func NewChangeWatchResp(ctx context.Context, delegate datasync.ProtoWatchResp, prevVal datasync.LazyValue) *ChangeWatchResp
NewChangeWatchResp creates a new instance of ChangeWatchResp.
func (*ChangeWatchResp) GetChanges ¶
func (ev *ChangeWatchResp) GetChanges() []datasync.ProtoWatchResp
GetChanges returns the list of changes for the change event.
func (*ChangeWatchResp) GetContext ¶
func (ev *ChangeWatchResp) GetContext() context.Context
GetContext returns the context associated with the event.
type Deps ¶
type Deps struct {
	infra.PluginName
	Log          logging.PluginLogger
	KvPlugin     keyval.KvProtoPlugin // inject
	ResyncOrch   resync.Subscriber
	ServiceLabel servicelabel.ReaderAPI
}
Deps groups dependencies injected into the plugin so that they are logically separated from other plugin fields.
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
Iterator adapts the db_proto.KeyValIterator to the datasync.KeyValIterator.
func NewIterator ¶
func NewIterator(delegate keyval.ProtoKeyValIterator) *Iterator
NewIterator creates a new instance of Iterator.
type Option ¶
type Option func(*Plugin)
Option is a function that can be used in NewPlugin to customize Plugin.
func UseKV ¶
func UseKV(kv keyval.KvProtoPlugin) Option
UseKV returns an Option that sets the KvPlugin dependency.
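Option and UseKV follow the functional options pattern. The sketch below reproduces that shape with local types so that it compiles on its own: `kvStore`, `memStore`, `plugin`, `useKV`, and `newPlugin` are hypothetical stand-ins, not the package's real identifiers or constructor logic.

```go
package main

import "fmt"

// kvStore is a hypothetical stand-in for keyval.KvProtoPlugin.
type kvStore interface {
	Name() string
}

// memStore is a trivial kvStore used only in this example.
type memStore struct{}

func (memStore) Name() string { return "in-memory" }

// plugin and option mirror the shape of Plugin and Option.
type plugin struct {
	kv kvStore
}

type option func(*plugin)

// useKV returns an option that sets the kv dependency.
func useKV(kv kvStore) option {
	return func(p *plugin) { p.kv = kv }
}

// newPlugin applies the given options, in order, to a fresh plugin.
func newPlugin(opts ...option) *plugin {
	p := &plugin{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPlugin(useKV(memStore{}))
	fmt.Println("kv backend:", p.kv.Name())
}
```

Because each option is just a function over the plugin, further dependencies can be made configurable without changing the constructor's signature.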
type Plugin ¶
type Plugin struct {
	Deps
	// contains filtered or unexported fields
}
Plugin dbsync implements synchronization between local memory and db. Other plugins can be notified when DB changes occur or resync is needed. This plugin reads/pulls the data from db when resync is needed.
func (*Plugin) AfterInit ¶
func (p *Plugin) AfterInit() error
AfterInit uses the provided connection to build a new transport watcher.
Plugin.registry subscriptions (registered by the Watch method) are used for resync. Resync is called only if ResyncOrch was injected (i.e. is not nil). The order of plugins in the flavor is not important to resync, since Watch() is called in Plugin.Init() and Resync.Register() is called in Plugin.AfterInit().
If the provided connection is not ready (not connected), AfterInit starts a new goroutine to wait for the connection. Once the connection is ready, the new transport watcher is built as usual.
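The "build now or wait in a goroutine" behaviour can be sketched as follows. This is only a model of the idea: `connection`, its `ready` channel, and the `build` callback are hypothetical, and the real plugin may detect readiness differently.

```go
package main

import "fmt"

// connection is a hypothetical stand-in; ready is closed once connected.
type connection struct {
	ready chan struct{}
}

// afterInit calls build right away if the connection is already up.
// Otherwise it starts a goroutine that waits for the connection and
// then calls build. It reports whether build ran synchronously.
func afterInit(c *connection, build func()) bool {
	select {
	case <-c.ready:
		build()
		return true
	default:
		go func() {
			<-c.ready
			build()
		}()
		return false
	}
}

func main() {
	c := &connection{ready: make(chan struct{})}
	built := make(chan struct{})

	deferred := !afterInit(c, func() { close(built) })
	close(c.ready) // the connection comes up later
	<-built        // the watcher is built by the waiting goroutine

	fmt.Println("build was deferred:", deferred)
}
```

With this shape, initialization never blocks on an unavailable store, at the cost of the watcher appearing some time after afterInit returns.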
func (*Plugin) Delete ¶
func (p *Plugin) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
Delete propagates this call to a particular kvdb.Plugin unless the kvdb.Plugin is Disabled().
This method is supposed to be called in Plugin.AfterInit() or later (even from a different goroutine).
func (*Plugin) Put ¶
func (p *Plugin) Put(key string, data proto.Message, opts ...datasync.PutOption) error
Put propagates this call to a particular kvdb.Plugin unless the kvdb.Plugin is Disabled().
This method is supposed to be called in Plugin.AfterInit() or later (even from a different goroutine).
func (*Plugin) Watch ¶
func (p *Plugin) Watch(resyncName string, changeChan chan datasync.ChangeEvent, resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)
Watch adds an entry to the plugin.registry. By doing this, other plugins will receive notifications about data changes and data resynchronization.
This method is supposed to be called in Plugin.Init(). Calling this method later than kvdbsync.Plugin.AfterInit() will have no effect (no notifications will be received).
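A plugin that has registered a change channel and a resync channel usually services both in one loop. The sketch below shows such a loop with simplified, hypothetical event types (`changeEvent`, `resyncEvent`) instead of datasync.ChangeEvent and datasync.ResyncEvent; the `done` channel used to acknowledge each event is likewise an assumption of this example.

```go
package main

import "fmt"

// changeEvent and resyncEvent are simplified, hypothetical event types.
type changeEvent struct {
	key, value string
	done       chan<- error
}

type resyncEvent struct {
	snapshot map[string]string
	done     chan<- error
}

// consume services both channels until stop is closed and returns the
// final local state.
func consume(changes <-chan changeEvent, resyncs <-chan resyncEvent, stop <-chan struct{}) map[string]string {
	state := map[string]string{}
	for {
		select {
		case ev := <-changes:
			state[ev.key] = ev.value // incremental update
			ev.done <- nil
		case ev := <-resyncs:
			state = make(map[string]string, len(ev.snapshot))
			for k, v := range ev.snapshot { // full reload
				state[k] = v
			}
			ev.done <- nil
		case <-stop:
			return state
		}
	}
}

func main() {
	changes := make(chan changeEvent)
	resyncs := make(chan resyncEvent)
	stop := make(chan struct{})
	result := make(chan map[string]string)
	go func() { result <- consume(changes, resyncs, stop) }()

	done := make(chan error, 1)
	resyncs <- resyncEvent{snapshot: map[string]string{"a": "1"}, done: done}
	<-done
	changes <- changeEvent{key: "b", value: "2", done: done}
	<-done
	close(stop)

	fmt.Println("keys held after resync and one change:", len(<-result))
}
```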