Documentation ¶
Overview ¶
Package datasync defines the datasync API, which abstracts the data transport between app plugins and backend data sources. Data sources may be data stores, or clients connected to a message bus, or remote clients connected to CN-Infra app. Transport may be, for example, HTTP or gRPC.
These events are processed asynchronously. With each event, the app plugin also receives a separate callback which is used to propagate any errors encountered during the event processing back to the user of the datasync package. Successfully finalized event processing is signaled by sending nil value (meaning no error) via the associated callback.
See the examples under the dedicated examples package.
Index ¶
- Constants
- type AggregatedRegistration
- type CallbackResult
- type ChangeEvent
- type ChangeValue
- type DelOption
- type DelOptionMarker
- type KVProtoWatchers
- type KVProtoWriters
- type KeyProtoValWriter
- type KeyVal
- type KeyValIterator
- type KeyValProtoWatcher
- type LazyValue
- type Op
- type ProtoWatchResp
- type PutOption
- type PutOptionMarker
- type ResyncEvent
- type WatchRegistration
- type WithChangeType
- type WithClientLifetimeTTLOpt
- type WithKey
- type WithPrefixOpt
- type WithPrevValue
- type WithRevision
- type WithTTLOpt
Constants ¶
const DefaultNotifTimeout = 2 * time.Second
DefaultNotifTimeout defines the default timeout for datasync notification delivery.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregatedRegistration ¶
type AggregatedRegistration struct {
Registrations []WatchRegistration
}
AggregatedRegistration is adapter that allows multiple registrations (WatchRegistration) to be aggregated in one. Close operation is applied collectively to all included registration.
func (*AggregatedRegistration) Close ¶
func (wa *AggregatedRegistration) Close() error
Close every registration under the aggregator. This function implements WatchRegistration.Close().
func (*AggregatedRegistration) Register ¶
func (wa *AggregatedRegistration) Register(resyncName, keyPrefix string) error
Register new key for all available aggregator objects. Call Register(keyPrefix) on specific registration to add the key from that registration only
func (*AggregatedRegistration) Unregister ¶
func (wa *AggregatedRegistration) Unregister(keyPrefix string) error
Unregister closed registration of specific key under all available aggregator objects. Call Unregister(keyPrefix) on specific registration to remove the key from that registration only
type CallbackResult ¶
type CallbackResult interface { // Done allows plugins that are processing data change/resync to send // feedback. If there was no error, Done(nil) needs to be called. Done(error) }
CallbackResult can be used by an event receiver to indicate to the event producer whether an operation was successful (error is nil) or unsuccessful (error is not nil).
type ChangeEvent ¶
type ChangeEvent interface { CallbackResult // GetContext returns the context associated with the event. GetContext() context.Context // GetChanges returns list of changes for this change event. GetChanges() []ProtoWatchResp }
ChangeEvent is used to define the data type for the change channel (<changeChan> from KeyValProtoWatcher.Watch). A data change event contains a key identifying where the change happened and two values for data stored under that key: the value *before* the change (previous value) and the value *after* the change (current value).
type ChangeValue ¶
type ChangeValue interface { WithChangeType KeyVal }
ChangeValue represents a single propagated change.
type DelOption ¶
type DelOption interface { // DelOptionMark is used to mark structures implementing DelOption // interface. DelOptionMark() }
DelOption defines options for Del operation. The available options can be found below.
type DelOptionMarker ¶
type DelOptionMarker struct{}
DelOptionMarker is meant for anonymous composition in With*Opt structs.
func (*DelOptionMarker) DelOptionMark ¶
func (marker *DelOptionMarker) DelOptionMark()
DelOptionMark is used only to mark structures implementing DelOption interface.
type KVProtoWatchers ¶
type KVProtoWatchers []KeyValProtoWatcher
KVProtoWatchers is an adapter that allows multiple watchers (KeyValProtoWatcher) to be aggregated in one. Watch request is delegated to all of them.
func (KVProtoWatchers) Watch ¶
func (ta KVProtoWatchers) Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent, keyPrefixes ...string) (WatchRegistration, error)
Watch subscribes to every transport available within transport aggregator. The function implements KeyValProtoWatcher.Watch().
type KVProtoWriters ¶
type KVProtoWriters []KeyProtoValWriter
KVProtoWriters is an adapter that allows multiple writers (KeyProtoValWriter) in one. Put request is delegated to all of them.
type KeyProtoValWriter ¶
type KeyProtoValWriter interface { // Put <data> to ETCD or to any other key-value based data transport // (from other Agent Plugins) under the key <key>. // See options.go for a list of available options. Put(key string, data proto.Message, opts ...PutOption) error }
KeyProtoValWriter allows plugins to push their data changes to a data store.
type KeyVal ¶
type KeyVal interface { WithKey LazyValue WithRevision }
KeyVal represents a single key-value pair.
type KeyValIterator ¶
type KeyValIterator interface { // GetNext retrieves the next value from the iterator context. The retrieved // value is unmarshaled and returned as <kv>. The allReceived flag is // set to true on the last KeyVal pair in the context. GetNext() (kv KeyVal, allReceived bool) }
KeyValIterator is an iterator for KeyVal.
type KeyValProtoWatcher ¶
type KeyValProtoWatcher interface { // Watch using ETCD or any other data transport. // <resyncName> is used for the name of the RESYNC subcription. // <changeChan> channel is used for delivery of data CHANGE events. // <resyncChan> channel is used for delivery of data RESYNC events. // <keyPrefix> is a variable list of keys to watch on. Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent, keyPrefixes ...string) (WatchRegistration, error) }
KeyValProtoWatcher is used by plugins to subscribe to both data change events and data resync events. Multiple keys can be specified and the caller will be subscribed to events on each key. See README.md for description of the Events.
type LazyValue ¶
type LazyValue interface { // GetValue gets the current value in the data change event. // The caller must provide an address of a proto message buffer // as <value>. // returns: // - error if value argument can not be properly filled. GetValue(value proto.Message) error }
LazyValue defines value that is unmarshaled into proto message on demand. The reason for defining interface with only one method is primarily to unify interfaces in this package.
type ProtoWatchResp ¶
type ProtoWatchResp interface { ChangeValue WithPrevValue }
ProtoWatchResp contains changed value.
type PutOption ¶
type PutOption interface { // PutOptionMark is used only to mark structures implementing PutOption // interface. PutOptionMark() }
PutOption defines options for Put operation. The available options can be found below.
type PutOptionMarker ¶
type PutOptionMarker struct{}
PutOptionMarker is meant for anonymous composition in With*Opt structs.
func (*PutOptionMarker) PutOptionMark ¶
func (marker *PutOptionMarker) PutOptionMark()
PutOptionMark is used only to mark structures implementing PutOption interface.
type ResyncEvent ¶
type ResyncEvent interface { CallbackResult // GetContext returns the context associated with the event. GetContext() context.Context // GetValues returns key-value pairs sorted by key prefixes // (<keyPrefix> variable list from KeyValProtoWatcher.Watch). GetValues() map[string]KeyValIterator }
ResyncEvent is used to define the data type for the resync channel (<resyncChan> from KeyValProtoWatcher.Watch).
type WatchRegistration ¶
type WatchRegistration interface { // Add <keyPrefix> to adapter subscription under specific <resyncName>. // If called on registration returned by composite watcher, register // <keyPrefix> to all adapters. Returns error if there is no subscription // with provided resync name or prefix already exists Register(resyncName string, keyPrefix string) error // Unregister <keyPrefix> from adapter subscription. If called on registration // returned by composite watcher, unregister <keyPrefix> from all adapters Unregister(keyPrefix string) error io.Closer }
WatchRegistration is a facade that avoids importing the io.Closer package into Agent plugin implementations.
type WithChangeType ¶
type WithChangeType interface {
GetChangeType() Op
}
WithChangeType is a simple helper interface embedded by all interfaces that require access to change type information. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).
type WithClientLifetimeTTLOpt ¶
type WithClientLifetimeTTLOpt struct {
PutOptionMarker
}
WithClientLifetimeTTLOpt defines option to Put a value for the lifetime of client. Once client is closed TTL is no longer renewed and value gets removed.
func WithClientLifetimeTTL ¶
func WithClientLifetimeTTL() *WithClientLifetimeTTLOpt
WithClientLifetimeTTL creates a new instance of ClientLifetimeTTL option
type WithKey ¶
type WithKey interface { // GetKey returns the key of the pair GetKey() string }
WithKey is a simple helper interface embedded by all interfaces that require access to the key of the key-value pair. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).
type WithPrefixOpt ¶
type WithPrefixOpt struct {
DelOptionMarker
}
WithPrefixOpt applies an operation to all items with the specified prefix.
func WithPrefix ¶
func WithPrefix() *WithPrefixOpt
WithPrefix creates a new instance of WithPrefixOpt.
type WithPrevValue ¶
type WithPrevValue interface { // GetPrevValue gets the previous value in the data change event. // The caller must provide an address of a proto message buffer // as <prevValue>. // returns: // - <prevValueExist> flag is set to 'true' if previous value does exist // - error if <prevValue> can not be properly filled GetPrevValue(prevValue proto.Message) (prevValueExist bool, err error) }
WithPrevValue is a simple helper interface embedded by all interfaces that require access to the previous value. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).
type WithRevision ¶
type WithRevision interface { // GetRevision gets revision of current value GetRevision() int64 }
WithRevision is a simple helper interface embedded by all interfaces that require access to the value revision. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).
type WithTTLOpt ¶
type WithTTLOpt struct { PutOptionMarker TTL time.Duration }
WithTTLOpt defines TTL for Put operation. Once TTL elapses, the associated data are removed from data store.
func WithTTL ¶
func WithTTL(TTL time.Duration) *WithTTLOpt
WithTTL creates a new instance of TTL option. Once TTL elapses, the associated data are removed. Beware: some implementation might be using TTL with lower precision.
Directories ¶
Path | Synopsis |
---|---|
Package grpcsync implements (in ALPHA VERSION) the gRPC client and server that satisfies the datasync API (see the definition of the service in ../syncbase/msg package).
|
Package grpcsync implements (in ALPHA VERSION) the gRPC client and server that satisfies the datasync API (see the definition of the service in ../syncbase/msg package). |
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers.
|
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers. |
local
Package local implements DB Transactions for the local "in memory" transport.
|
Package local implements DB Transactions for the local "in memory" transport. |
Package msgsync propagates protobuf messages to a particular topic.
|
Package msgsync propagates protobuf messages to a particular topic. |
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport.
|
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport. |
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start.
|
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start. |
Package syncbase defines common structures used in multiple datasync transports.
|
Package syncbase defines common structures used in multiple datasync transports. |
msg
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go.
|
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go. |