datasync

package
v1.4.0
Published: Jul 16, 2018 License: Apache-2.0 Imports: 4 Imported by: 333

README

Concept

Package datasync defines the interfaces (see datasync_api.go) that abstract data synchronization between app plugins and different backend data sources (such as data stores, message buses, or RPC-connected clients).

In this context, data synchronization is about multiple data sets that need to be synchronized whenever a particular event is published. The event can be published by:

  • a database (when particular data is changed);
  • a message bus (such as when consuming messages from Kafka topics);
  • or RPC clients (when a gRPC or REST service is called).

The data synchronization APIs are centered around watching and publishing data change events. These events are processed asynchronously.

The data handled by one plugin can have references to the data of another plugin. Therefore, the proper timing and order of data resynchronization between plugins must be maintained. The datasync plugin initiates a full data resync in the same order in which the other plugins were registered in Init().

Watch data API

The Watch data API is used by app plugins (see the following diagram and the example) to:

  1. Subscribe channels to data changes using Watch(), while remaining abstracted from the particular message source (data store, message bus, or RPC).
  2. Process a full Data RESYNC (startup and fault recovery scenarios). Feedback is given back to the user of this API (e.g. successful configuration or an error) via a callback.
  3. Process an incremental Data CHANGE. This is an optimized variant of RESYNC, in which only the minimal set of changes (deltas) needed to get in sync is propagated to plugins. Again, feedback to the user of the API (e.g. successful configuration or an error) is returned via a callback.
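The three steps above can be sketched as a single event loop. This is a minimal illustration only: changeEvent, resyncEvent, watchLoop, and runDemo are hypothetical stand-ins that use plain strings instead of proto messages, and no real datasync transport is involved. Only the Done(error) feedback call follows the API described here.

```go
package main

import (
	"errors"
	"fmt"
)

// changeEvent and resyncEvent are simplified, hypothetical stand-ins for
// datasync.ChangeEvent and datasync.ResyncEvent. Plain strings replace
// proto messages; only the Done(error) callback mirrors the documented API.
type changeEvent struct {
	key, value string
	result     chan error
}

func (e changeEvent) Done(err error) { e.result <- err }

type resyncEvent struct {
	values map[string]string
	result chan error
}

func (e resyncEvent) Done(err error) { e.result <- err }

// watchLoop consumes both channels until quit is closed and returns the
// state it has built up.
func watchLoop(changes <-chan changeEvent, resyncs <-chan resyncEvent, quit <-chan struct{}) map[string]string {
	state := map[string]string{}
	for {
		select {
		case ev := <-resyncs:
			// Full RESYNC: replace the whole local state with the snapshot.
			state = map[string]string{}
			for k, v := range ev.values {
				state[k] = v
			}
			ev.Done(nil)
		case ev := <-changes:
			// Incremental CHANGE: apply a single delta.
			if ev.key == "" {
				ev.Done(errors.New("change event without a key"))
				continue
			}
			state[ev.key] = ev.value
			ev.Done(nil)
		case <-quit:
			return state
		}
	}
}

// runDemo plays the role of the event producer: one resync, one valid
// change, and one invalid change whose error is returned to the caller.
func runDemo() (map[string]string, error) {
	changes := make(chan changeEvent)
	resyncs := make(chan resyncEvent)
	quit := make(chan struct{})
	final := make(chan map[string]string)
	go func() { final <- watchLoop(changes, resyncs, quit) }()

	res := make(chan error, 1) // buffered so Done never blocks the loop
	resyncs <- resyncEvent{values: map[string]string{"/if/eth0": "up"}, result: res}
	<-res
	changes <- changeEvent{key: "/if/eth1", value: "down", result: res}
	<-res
	changes <- changeEvent{result: res}
	err := <-res

	close(quit)
	return <-final, err
}

func main() {
	state, err := runDemo()
	fmt.Println(len(state), err)
}
```

The producer waits for each callback before sending the next event, which keeps the demo deterministic; a real plugin would typically not control the producer side.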

[Figure: datasync]

This API defines two types of events that a plugin must be able to process:

  1. A Full Data RESYNC (resynchronization) event triggers resynchronization of the whole configuration. This event is used after agent start/restart, or in a fault recovery scenario (e.g. when the agent's connectivity to an external data source is lost and restored).
  2. An Incremental Data CHANGE event triggers incremental processing of configuration changes. Each data change event contains both the previous and the new/current value. Data synchronization switches to this optimized mode only after a successful Full Data RESYNC.

Publish data API

The Publish data API is used by app plugins to asynchronously publish events with particular data change values while remaining abstracted from the target data store, message bus, or local/RPC client.
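As a rough sketch of that abstraction, the plugin-side code below depends only on a small writer interface and never learns what the target is. The writer interface, memWriter, publishState, and the key layout are all hypothetical; the real KeyProtoValWriter takes a proto.Message and options, and this sketch publishes synchronously for simplicity.

```go
package main

import (
	"errors"
	"fmt"
)

// writer is a hypothetical stand-in for datasync.KeyProtoValWriter. The
// value is a plain string instead of a proto.Message and options are omitted.
type writer interface {
	Put(key string, data string) error
}

// memWriter is an illustrative in-memory target; a real target could be a
// data store, a message bus, or an RPC client.
type memWriter struct{ data map[string]string }

func (w *memWriter) Put(key, data string) error {
	if key == "" {
		return errors.New("empty key")
	}
	w.data[key] = data
	return nil
}

// publishState is what a plugin would call. It depends only on the writer
// interface. The key layout is made up for this example.
func publishState(w writer, ifName, state string) error {
	return w.Put("/status/interface/"+ifName, state)
}

func main() {
	w := &memWriter{data: map[string]string{}}
	if err := publishState(w, "eth0", "up"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(w.data["/status/interface/eth0"])
}
```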

[Figure: datasync publish]

Documentation

Overview

Package datasync defines the datasync API, which abstracts the data transport between app plugins and backend data sources. Data sources may be data stores, clients connected to a message bus, or remote clients connected to a CN-Infra app. Transport may be, for example, HTTP or gRPC.

Data change and resync events are processed asynchronously. With each event, the app plugin also receives a separate callback which is used to propagate any errors encountered during the event processing back to the user of the datasync package. Successfully finalized event processing is signaled by sending a nil value (meaning no error) via the associated callback.

See the examples under the dedicated examples package.

Constants

const DefaultNotifTimeout = 2 * time.Second

DefaultNotifTimeout defines the default timeout for datasync notification delivery.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatedRegistration

type AggregatedRegistration struct {
	Registrations []WatchRegistration
}

AggregatedRegistration is an adapter that allows multiple registrations (WatchRegistration) to be aggregated into one. The Close operation is applied collectively to all included registrations.

func (*AggregatedRegistration) Close

func (wa *AggregatedRegistration) Close() error

Close closes every registration under the aggregator. This function implements WatchRegistration.Close().

func (*AggregatedRegistration) Register added in v1.3.0

func (wa *AggregatedRegistration) Register(resyncName, keyPrefix string) error

Register adds a new key prefix to all registrations under the aggregator. Call Register(resyncName, keyPrefix) on a specific registration to add the key to that registration only.

func (*AggregatedRegistration) Unregister added in v1.0.6

func (wa *AggregatedRegistration) Unregister(keyPrefix string) error

Unregister removes the registration of a specific key prefix from all registrations under the aggregator. Call Unregister(keyPrefix) on a specific registration to remove the key from that registration only.

type CallbackResult

type CallbackResult interface {
	// Done allows plugins that are processing data change/resync to send
	// feedback. If there was no error, Done(nil) needs to be called.
	// Use the noError=nil definition for better readability, for example:
	//     Done(noError).
	Done(error)
}

CallbackResult can be used by an event receiver to indicate to the event producer whether an operation was successful (error is nil) or unsuccessful (error is not nil).

type ChangeEvent

type ChangeEvent interface {
	CallbackResult

	ProtoWatchResp
}

ChangeEvent is used to define the data type for the change channel (<changeChan> from KeyValProtoWatcher.Watch). A data change event contains a key identifying where the change happened and two values for data stored under that key: the value *before* the change (previous value) and the value *after* the change (current value).

type ChangeValue

type ChangeValue interface {
	LazyValueWithRev
	WithChangeType
}

ChangeValue represents a single propagated change.

type CompositeKVProtoWatcher

type CompositeKVProtoWatcher struct {
	Adapters []KeyValProtoWatcher
}

CompositeKVProtoWatcher is an adapter that allows multiple watchers (KeyValProtoWatcher) to be aggregated in one. Watch request is delegated to all of them.

func (*CompositeKVProtoWatcher) Watch

func (ta *CompositeKVProtoWatcher) Watch(resyncName string, changeChan chan ChangeEvent,
	resyncChan chan ResyncEvent, keyPrefixes ...string) (WatchRegistration, error)

Watch subscribes to every transport available within the transport aggregator. The function implements KeyValProtoWatcher.Watch().

type CompositeKVProtoWriter

type CompositeKVProtoWriter struct {
	Adapters []KeyProtoValWriter
}

CompositeKVProtoWriter is an adapter that allows multiple writers (KeyProtoValWriter) to be aggregated into one. A Put request is delegated to all of them.

func (*CompositeKVProtoWriter) Put

func (ta *CompositeKVProtoWriter) Put(key string, data proto.Message, opts ...PutOption) error

Put writes data to all aggregated transports. This function implements KeyProtoValWriter.Put().
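The delegation can be illustrated with a small composite. This is a sketch under stated assumptions: writer, compositeWriter, memWriter, and failingWriter are hypothetical types using strings instead of proto messages, and the error policy (try every adapter, return the first error) is chosen for the example and may differ from what the package does.

```go
package main

import (
	"errors"
	"fmt"
)

// writer is a hypothetical, string-based stand-in for KeyProtoValWriter.
type writer interface {
	Put(key, data string) error
}

// compositeWriter mirrors the shape of CompositeKVProtoWriter.
type compositeWriter struct{ adapters []writer }

// Put forwards the request to every adapter. Assumption of this sketch:
// all adapters are tried and the first error encountered is returned.
func (c *compositeWriter) Put(key, data string) error {
	if len(c.adapters) == 0 {
		return errors.New("no writer available")
	}
	var firstErr error
	for _, w := range c.adapters {
		if err := w.Put(key, data); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

type memWriter struct{ data map[string]string }

func (m *memWriter) Put(key, data string) error {
	m.data[key] = data
	return nil
}

type failingWriter struct{}

func (failingWriter) Put(key, data string) error { return errors.New("transport down") }

func main() {
	a := &memWriter{data: map[string]string{}}
	b := &memWriter{data: map[string]string{}}
	c := &compositeWriter{adapters: []writer{a, failingWriter{}, b}}

	err := c.Put("/cfg/bd1", "bridge-domain")
	// Both healthy writers received the value even though one adapter failed.
	fmt.Println(a.data["/cfg/bd1"], b.data["/cfg/bd1"], err)
}
```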

type DelOption

type DelOption interface {
	// DelOptionMark is used to mark structures implementing DelOption
	// interface.
	DelOptionMark()
}

DelOption defines options for Del operation. The available options can be found below.

type DelOptionMarker

type DelOptionMarker struct{}

DelOptionMarker is meant for anonymous composition in With*Opt structs.

func (*DelOptionMarker) DelOptionMark

func (marker *DelOptionMarker) DelOptionMark()

DelOptionMark is used only to mark structures implementing DelOption interface.
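The marker pattern can be seen end to end in the sketch below. The DelOption, DelOptionMarker, WithPrefixOpt, and WithPrefix declarations are copied from this page so the example runs standalone; the del function and the map-based store are hypothetical and only show how an implementation might detect the option with a type assertion.

```go
package main

import (
	"fmt"
	"strings"
)

// The declarations below are redeclared locally from this page.
type DelOption interface{ DelOptionMark() }

type DelOptionMarker struct{}

func (marker *DelOptionMarker) DelOptionMark() {}

// WithPrefixOpt embeds the marker, so *WithPrefixOpt satisfies DelOption.
type WithPrefixOpt struct{ DelOptionMarker }

func WithPrefix() *WithPrefixOpt { return &WithPrefixOpt{} }

// del is a hypothetical delete operation on an in-memory store. It removes
// the exact key, or every key with that prefix when WithPrefix() is passed,
// and returns the number of removed entries.
func del(store map[string]string, key string, opts ...DelOption) int {
	byPrefix := false
	for _, o := range opts {
		if _, ok := o.(*WithPrefixOpt); ok {
			byPrefix = true
		}
	}
	removed := 0
	for k := range store {
		if k == key || (byPrefix && strings.HasPrefix(k, key)) {
			delete(store, k)
			removed++
		}
	}
	return removed
}

func main() {
	store := map[string]string{"/vnf/if/a": "1", "/vnf/if/b": "2", "/vnf/bd/x": "3"}
	fmt.Println(del(store, "/vnf/if/a"))         // exact key only
	fmt.Println(del(store, "/vnf/", WithPrefix())) // everything under the prefix
	fmt.Println(len(store))
}
```

Because the marker method is unexported in purpose (it does nothing), the interface exists only so the compiler rejects unrelated values passed as options.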

type KeyProtoValWriter

type KeyProtoValWriter interface {
	// Put <data> to ETCD or to any other key-value based data transport
	// (from other Agent Plugins) under the key <key>.
	// See options.go for a list of available options.
	Put(key string, data proto.Message, opts ...PutOption) error
}

KeyProtoValWriter allows plugins to push their data changes to a data store.

type KeyVal

type KeyVal interface {
	WithKey
	LazyValueWithRev
}

KeyVal represents a single key-value pair.

type KeyValIterator

type KeyValIterator interface {
	// GetNext retrieves the next value from the iterator context. The retrieved
	// value is unmarshaled and returned as <kv>. The allReceived flag is
	// set to true on the last KeyVal pair in the context.
	GetNext() (kv KeyVal, allReceived bool)
}

KeyValIterator is an iterator for KeyVal.
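A typical consumption loop might look like the sketch below. It assumes, as one reading of the GetNext comment that this page does not confirm, that allReceived is true only once the iterator is exhausted and that the returned kv carries no item in that case. keyVal, sliceIterator, and drain are hypothetical names.

```go
package main

import "fmt"

// keyVal is a simplified stand-in for datasync.KeyVal.
type keyVal struct{ key, value string }

// sliceIterator is a hypothetical slice-backed iterator with the same
// method shape as KeyValIterator.
type sliceIterator struct {
	items []keyVal
	index int
}

// GetNext returns the next pair. Assumption of this sketch: allReceived is
// true only when nothing is left, and kv is the zero value in that case.
func (it *sliceIterator) GetNext() (kv keyVal, allReceived bool) {
	if it.index >= len(it.items) {
		return keyVal{}, true
	}
	kv = it.items[it.index]
	it.index++
	return kv, false
}

// drain reads the iterator to the end and collects the keys it saw.
func drain(it *sliceIterator) []string {
	var keys []string
	for {
		kv, allReceived := it.GetNext()
		if allReceived {
			break
		}
		keys = append(keys, kv.key)
	}
	return keys
}

func main() {
	it := &sliceIterator{items: []keyVal{{"/if/eth0", "up"}, {"/if/eth1", "down"}}}
	fmt.Println(drain(it))
}
```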

type KeyValProtoWatcher

type KeyValProtoWatcher interface {
	// Watch using ETCD or any other data transport.
	// <resyncName> is used for the name of the RESYNC subscription.
	// <changeChan> channel is used for delivery of data CHANGE events.
	// <resyncChan> channel is used for delivery of data RESYNC events.
	// <keyPrefixes> is a variable list of keys to watch on.
	Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent,
		keyPrefixes ...string) (WatchRegistration, error)
}

KeyValProtoWatcher is used by plugins to subscribe to both data change events and data resync events. Multiple keys can be specified and the caller will be subscribed to events on each key. See README.md for description of the Events.

type LazyValue

type LazyValue interface {
	// GetValue gets the current value in the data change event.
	// The caller must provide an address of a proto message buffer
	// as <value>.
	// returns:
	// - error if value argument can not be properly filled.
	GetValue(value proto.Message) error
}

LazyValue defines a value that is unmarshaled into a proto message on demand. The reason for defining an interface with only one method is primarily to unify interfaces in this package.
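The idea of on-demand unmarshaling can be shown with a small stand-in. To stay within the standard library, this sketch swaps protobuf for JSON: lazyJSON and iface are hypothetical, and GetValue takes an interface{} instead of a proto.Message. The raw bytes are decoded only when the caller asks for the value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lazyJSON keeps the raw payload and decodes it only when GetValue is
// called. JSON replaces protobuf here purely for illustration.
type lazyJSON struct {
	raw []byte
	rev int64
}

// GetValue fills the buffer supplied by the caller, who must pass a pointer.
func (v *lazyJSON) GetValue(value interface{}) error {
	return json.Unmarshal(v.raw, value)
}

// GetRevision mirrors WithRevision.
func (v *lazyJSON) GetRevision() int64 { return v.rev }

// iface is a made-up configuration message.
type iface struct {
	Name string `json:"name"`
	MTU  int    `json:"mtu"`
}

func main() {
	v := &lazyJSON{raw: []byte(`{"name":"eth0","mtu":1500}`), rev: 7}

	var cfg iface
	if err := v.GetValue(&cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.Name, cfg.MTU, v.GetRevision())
}
```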

type LazyValueWithRev

type LazyValueWithRev interface {
	LazyValue
	WithRevision
}

LazyValueWithRev defines a value, together with its revision, that is unmarshaled into a proto message on demand.

type ProtoWatchResp

type ProtoWatchResp interface {
	ChangeValue
	WithKey
	WithPrevValue
}

ProtoWatchResp contains the changed value.

type PutDel

type PutDel string

PutDel enumerates 'Put' (meaning 'Create' or 'Update') and 'Delete' operations.

const (
	// Put represents Create or Update operation.
	Put PutDel = "Put"
	// Delete operation
	Delete PutDel = "Delete"
)

type PutOption

type PutOption interface {
	// PutOptionMark is used only to mark structures implementing PutOption
	// interface.
	PutOptionMark()
}

PutOption defines options for Put operation. The available options can be found below.

type PutOptionMarker

type PutOptionMarker struct{}

PutOptionMarker is meant for anonymous composition in With*Opt structs.

func (*PutOptionMarker) PutOptionMark

func (marker *PutOptionMarker) PutOptionMark()

PutOptionMark is used only to mark structures implementing PutOption interface.

type ResyncEvent

type ResyncEvent interface {
	CallbackResult

	// GetValues returns key-value pairs grouped by key prefix
	// (<keyPrefix> variable list from KeyValProtoWatcher.Watch).
	GetValues() map[string]KeyValIterator
}

ResyncEvent is used to define the data type for the resync channel (<resyncChan> from KeyValProtoWatcher.Watch).

type WatchRegistration

type WatchRegistration interface {
	// Add <keyPrefix> to adapter subscription under specific <resyncName>.
	// If called on registration returned by composite watcher, register
	// <keyPrefix> to all adapters. Returns error if there is no subscription
	// with provided resync name or prefix already exists
	Register(resyncName string, keyPrefix string) error

	// Unregister <keyPrefix> from adapter subscription. If called on registration
	// returned by composite watcher, unregister <keyPrefix> from all adapters
	Unregister(keyPrefix string) error

	io.Closer
}

WatchRegistration is a facade that avoids importing the io package (for io.Closer) into Agent plugin implementations.

type WithChangeType

type WithChangeType interface {
	GetChangeType() PutDel
}

WithChangeType is a simple helper interface embedded by all interfaces that require access to change type information. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithKey

type WithKey interface {
	// GetKey returns the key of the pair
	GetKey() string
}

WithKey is a simple helper interface embedded by all interfaces that require access to the key of the key-value pair. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithPrefixOpt

type WithPrefixOpt struct {
	DelOptionMarker
}

WithPrefixOpt applies an operation to all items with the specified prefix.

func WithPrefix

func WithPrefix() *WithPrefixOpt

WithPrefix creates a new instance of WithPrefixOpt.

type WithPrevValue

type WithPrevValue interface {
	// GetPrevValue gets the previous value in the data change event.
	// The caller must provide an address of a proto message buffer
	// as <prevValue>.
	// returns:
	// - <prevValueExist> flag is set to 'true' if previous value does exist
	// - error if <prevValue> can not be properly filled
	GetPrevValue(prevValue proto.Message) (prevValueExist bool, err error)
}

WithPrevValue is a simple helper interface embedded by all interfaces that require access to the previous value. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).
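A receiver could combine the change type with the previous value to tell creates, updates, and deletes apart, as sketched below. PutDel and its constants are redeclared from this page; the change type, its string-based GetPrevValue, and describe are hypothetical simplifications of the proto-based interfaces.

```go
package main

import (
	"errors"
	"fmt"
)

// PutDel and its constants are redeclared locally from this page.
type PutDel string

const (
	Put    PutDel = "Put"
	Delete PutDel = "Delete"
)

// change is a hypothetical, string-based stand-in for ProtoWatchResp.
// A nil prev means there was no previous value.
type change struct {
	key  string
	typ  PutDel
	prev *string
}

func (c change) GetKey() string        { return c.key }
func (c change) GetChangeType() PutDel { return c.typ }

// GetPrevValue copies the previous value into the caller's buffer and
// reports whether a previous value existed.
func (c change) GetPrevValue(out *string) (prevValueExist bool, err error) {
	if out == nil {
		return false, errors.New("nil buffer")
	}
	if c.prev == nil {
		return false, nil
	}
	*out = *c.prev
	return true, nil
}

// describe classifies a change as a create, update, or delete.
func describe(c change) string {
	var prev string
	exists, err := c.GetPrevValue(&prev)
	if err != nil {
		return "error: " + err.Error()
	}
	switch {
	case c.GetChangeType() == Delete:
		return "delete " + c.GetKey()
	case exists:
		return "update " + c.GetKey() + " (was " + prev + ")"
	default:
		return "create " + c.GetKey()
	}
}

func main() {
	old := "down"
	fmt.Println(describe(change{key: "/if/eth0", typ: Put}))
	fmt.Println(describe(change{key: "/if/eth0", typ: Put, prev: &old}))
	fmt.Println(describe(change{key: "/if/eth0", typ: Delete, prev: &old}))
}
```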

type WithRevision

type WithRevision interface {
	// GetRevision gets revision of current value
	GetRevision() (rev int64)
}

WithRevision is a simple helper interface embedded by all interfaces that require access to the value revision. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithTTLOpt

type WithTTLOpt struct {
	PutOptionMarker
	TTL time.Duration
}

WithTTLOpt defines the TTL for a Put operation. Once the TTL elapses, the associated data are removed from the data store.

func WithTTL

func WithTTL(TTL time.Duration) *WithTTLOpt

WithTTL creates a new instance of the TTL option. Once the TTL elapses, the associated data are removed. Beware: some implementations might use TTL with lower precision.
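How a store might honor this option is sketched below. PutOption, PutOptionMarker, WithTTLOpt, and WithTTL are redeclared from this page; ttlStore, its explicit now parameter, and the lazy expiry on read are assumptions made to keep the example deterministic and are not how any particular backend works.

```go
package main

import (
	"fmt"
	"time"
)

// The option declarations below are redeclared locally from this page.
type PutOption interface{ PutOptionMark() }

type PutOptionMarker struct{}

func (marker *PutOptionMarker) PutOptionMark() {}

type WithTTLOpt struct {
	PutOptionMarker
	TTL time.Duration
}

func WithTTL(TTL time.Duration) *WithTTLOpt { return &WithTTLOpt{TTL: TTL} }

// entry and ttlStore are hypothetical. A zero expires time means "no TTL".
type entry struct {
	value   string
	expires time.Time
}

type ttlStore struct{ entries map[string]entry }

// Put stores the value and records an expiry time if WithTTL was passed.
// The current time is a parameter so the example does not depend on sleeping.
func (s *ttlStore) Put(now time.Time, key, value string, opts ...PutOption) {
	e := entry{value: value}
	for _, o := range opts {
		if ttl, ok := o.(*WithTTLOpt); ok && ttl.TTL > 0 {
			e.expires = now.Add(ttl.TTL)
		}
	}
	s.entries[key] = e
}

// Get returns the value unless its TTL has elapsed; expired entries are
// removed lazily on read.
func (s *ttlStore) Get(now time.Time, key string) (string, bool) {
	e, ok := s.entries[key]
	if !ok {
		return "", false
	}
	if !e.expires.IsZero() && !now.Before(e.expires) {
		delete(s.entries, key)
		return "", false
	}
	return e.value, true
}

// demo reports whether the key is visible before and after the TTL elapses.
func demo() (beforeExpiry, afterExpiry bool) {
	s := &ttlStore{entries: map[string]entry{}}
	start := time.Unix(0, 0)
	s.Put(start, "/status/agent1", "alive", WithTTL(5*time.Second))
	_, beforeExpiry = s.Get(start.Add(4*time.Second), "/status/agent1")
	_, afterExpiry = s.Get(start.Add(5*time.Second), "/status/agent1")
	return beforeExpiry, afterExpiry
}

func main() {
	fmt.Println(demo())
}
```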

Directories

Path Synopsis
Package grpcsync implements (in ALPHA VERSION) the gRPC client and server that satisfies the datasync API (see the definition of the service in ../syncbase/msg package).
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers.
local
Package local implements DB Transactions for the local "in memory" transport.
Package msgsync propagates protobuf messages to a particular topic.
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport.
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start.
Package syncbase defines common structures used in multiple datasync transports.
msg
Package msg is a generated protocol buffer package.
