datasync

package
v1.0.3
Warning

This package is not in the latest version of its module.

Published: Sep 8, 2017 License: Apache-2.0 Imports: 5 Imported by: 0

README

Concept

Package datasync defines the interfaces (see datasync_api.go) that abstract data synchronization between app plugins and different backend data sources (such as data stores, message buses, or RPC-connected clients).

In this context, data synchronization is about multiple data sets that need to be synchronized whenever a particular event is published. The event can be published by:

  • a database (when particular data changes);
  • a message bus (for example, when messages are consumed from Kafka topics);
  • or RPC clients (when a gRPC or REST service is called).

The data synchronization APIs are centered around watching and publishing data change events. These events are processed asynchronously.

The data handled by one plugin can have references to the data of another plugin. Therefore, the proper time and order of data resynchronization between plugins needs to be maintained. The datasync plugin initiates a full data resync in the same order in which the other plugins were registered in Init().
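
The ordering rule above can be illustrated with a small registry that remembers registration order. This is a minimal sketch under stated assumptions, not the datasync or resync implementation: `resyncRegistry`, `Register`, `ResyncAll` and the plugin names are hypothetical, and stopping at the first error is a design choice made only for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// resyncRegistry is a hypothetical registry that remembers the order in
// which plugins registered their resync handlers.
type resyncRegistry struct {
	names    []string
	handlers map[string]func() error
}

func newResyncRegistry() *resyncRegistry {
	return &resyncRegistry{handlers: make(map[string]func() error)}
}

// Register records a resync handler. Registering the same name again
// replaces the handler but keeps the original position.
func (r *resyncRegistry) Register(name string, handler func() error) {
	if _, exists := r.handlers[name]; !exists {
		r.names = append(r.names, name)
	}
	r.handlers[name] = handler
}

// ResyncAll runs the handlers in registration order and stops at the
// first error (an assumption of this sketch).
func (r *resyncRegistry) ResyncAll() error {
	for _, name := range r.names {
		if err := r.handlers[name](); err != nil {
			return fmt.Errorf("resync of %s failed: %w", name, err)
		}
	}
	return nil
}

// demoOrder registers three hypothetical plugins and returns the order
// in which their handlers were called.
func demoOrder() string {
	var order []string
	reg := newResyncRegistry()
	for _, name := range []string{"pluginA", "pluginB", "pluginC"} {
		name := name // capture a per-iteration copy for the closure
		reg.Register(name, func() error {
			order = append(order, name)
			return nil
		})
	}
	if err := reg.ResyncAll(); err != nil {
		return err.Error()
	}
	return strings.Join(order, ",")
}

func main() {
	fmt.Println(demoOrder())
}
```

A plugin whose data references another plugin's data would register after that plugin, so its resync runs later.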

Watch data API

The Watch data API is used by an app plugin (see the following diagram and the example) to:

  1. Subscribe channels to particular data changes via Watch(), while being abstracted from the particular message source (data store, message bus, or RPC).
  2. Process a Full Data RESYNC event (at startup or during certain fault recovery), which reprocesses the whole data set. Feedback (e.g. successful configuration or an error) is given to the user of this API by callback.
  3. Process Incremental Data CHANGE events. This is an optimized mode that works only with a minimal set of changes (deltas). Again, feedback to the API's user (e.g. successful configuration or an error) is given by callback.

[Diagram: datasync]

This API defines two types of events that a plugin must be able to process:

  1. The Full Data RESYNC (resynchronization) event triggers resynchronization of the whole configuration. This event is used after agent start/restart, or for fault recovery (when the agent's connectivity to an external data source is lost and restored).
  2. The Incremental Data CHANGE event triggers incremental processing of configuration changes. Data change events are sent after the data resync has completed. Each data change event contains both the previous and the new/current values for the data. Data synchronization switches to the optimized mode after a successful Full Data RESYNC.
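
The two event types above could be handled by a plugin in a single loop. The following is a minimal sketch, not the package's implementation: `changeEvent` and `resyncEvent` are hypothetical stand-ins for the `ChangeEvent` and `ResyncEvent` interfaces so that the example runs without the cn-infra module, plain strings replace proto messages, and the `store` map stands in for the plugin's configuration.

```go
package main

import "fmt"

// changeEvent is a hypothetical stand-in for datasync.ChangeEvent.
type changeEvent struct {
	key     string
	value   string
	deleted bool
	done    func(error) // plays the role of CallbackResult.Done
}

// resyncEvent is a hypothetical stand-in for datasync.ResyncEvent.
type resyncEvent struct {
	values map[string]string
	done   func(error)
}

// processEvents applies resync and change events to store until stop is closed.
func processEvents(store map[string]string, changes <-chan changeEvent,
	resyncs <-chan resyncEvent, stop <-chan struct{}) {
	for {
		select {
		case ev := <-resyncs:
			// Full resync: replace the whole data set.
			for k := range store {
				delete(store, k)
			}
			for k, v := range ev.values {
				store[k] = v
			}
			ev.done(nil)
		case ev := <-changes:
			// Incremental change: apply only the delta.
			if ev.deleted {
				delete(store, ev.key)
			} else {
				store[ev.key] = ev.value
			}
			ev.done(nil)
		case <-stop:
			return
		}
	}
}

// runDemo sends one resync followed by two changes and returns the final state.
func runDemo() map[string]string {
	store := map[string]string{"stale": "x"}
	changes := make(chan changeEvent)
	resyncs := make(chan resyncEvent)
	stop := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		processEvents(store, changes, resyncs, stop)
		close(finished)
	}()

	ack := make(chan error, 1)
	done := func(err error) { ack <- err }

	resyncs <- resyncEvent{values: map[string]string{"if1": "up", "if2": "down"}, done: done}
	<-ack
	changes <- changeEvent{key: "if2", value: "up", done: done}
	<-ack
	changes <- changeEvent{key: "if1", deleted: true, done: done}
	<-ack

	close(stop)
	<-finished // the store is read only after the loop has exited
	return store
}

func main() {
	fmt.Println(runDemo())
}
```

The sender waits for each acknowledgement before sending the next event, which mirrors the callback feedback described in this section.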

Publish data API

The Publish data API is used by app plugins to asynchronously publish events with particular data change values, while remaining abstracted from the data store, message bus, or local/RPC client.

[Diagram: datasync publish]

Documentation

Overview

Package datasync defines the datasync API, which abstracts the data transport between app plugins and backend data sources. Data sources can be data stores, clients connected to a message bus, or remote clients connected to a CN-Infra app. Transport can be, for example, HTTP or gRPC.

These events are processed asynchronously. The app plugin that watches data changes provides feedback through a callback for each event (e.g. successful configuration or an error).

See the examples under the dedicated examples package.

Index

Constants

const DefaultNotifTimeout = 2 * time.Second

DefaultNotifTimeout is the default timeout for the delivery of a notification.
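
The documentation does not show where this timeout is applied. As an illustration only, a sender could bound its wait for a receiver's acknowledgement as sketched below. `waitForDone`, `errTimeout` and the local `notifTimeout` constant (which copies the value of DefaultNotifTimeout) are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// notifTimeout copies the value of DefaultNotifTimeout so the example
// runs without importing the datasync package.
const notifTimeout = 2 * time.Second

// errTimeout is a hypothetical error for an unacknowledged notification.
var errTimeout = errors.New("notification not acknowledged in time")

// waitForDone waits for the receiver's result on done, for at most timeout.
func waitForDone(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errTimeout
	}
}

// demoAcked simulates a receiver that has already acknowledged the event.
func demoAcked() error {
	done := make(chan error, 1)
	done <- nil
	return waitForDone(done, notifTimeout)
}

// demoTimedOut simulates a receiver that never answers. A short timeout
// keeps the example fast.
func demoTimedOut() error {
	return waitForDone(make(chan error), 10*time.Millisecond)
}

func main() {
	fmt.Println(demoAcked())
	fmt.Println(demoTimedOut())
}
```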

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatedRegistration

type AggregatedRegistration struct {
	Registrations []WatchRegistration
}

AggregatedRegistration is a cumulative adapter that contains all available transport types.

func (*AggregatedRegistration) Close

func (wa *AggregatedRegistration) Close() error

Close closes every registration under the watch aggregator.

type CallbackResult

type CallbackResult interface {
	// Done allows plugins that are processing data change/resync to send feedback
	// If there was no error the Done(nil) needs to be called. Use the noError=nil
	// definition for better readability, for example:
	//     Done(noError).
	Done(error)
}

CallbackResult can be used by an event receiver to indicate to the event producer whether an operation was successful (error is nil) or unsuccessful (error is not nil)

DoneMethod is reused later. There are at least two implementations: DoneChannel and DoneCallback.
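
To make the feedback contract concrete, here is a minimal sketch of a channel-backed and a function-backed implementation of `CallbackResult`. The types `doneChannel` and `doneCallback` are hypothetical and written for this example; they are not the DoneChannel and DoneCallback implementations mentioned above, whose definitions are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// CallbackResult mirrors the interface declared in this package.
type CallbackResult interface {
	Done(error)
}

// doneChannel is a hypothetical channel-backed CallbackResult.
type doneChannel struct {
	ch chan error
}

func newDoneChannel() *doneChannel {
	// Buffer of one so that Done does not block the event receiver.
	return &doneChannel{ch: make(chan error, 1)}
}

func (d *doneChannel) Done(err error) { d.ch <- err }

// doneCallback is a hypothetical function-backed CallbackResult.
type doneCallback func(error)

func (f doneCallback) Done(err error) { f(err) }

// applyConfig plays the event receiver: it reports success or failure.
func applyConfig(value string, result CallbackResult) {
	if value == "" {
		result.Done(errors.New("empty configuration"))
		return
	}
	var noError error // nil, named for readability as the doc comment suggests
	result.Done(noError)
}

// demoChannel returns the feedback delivered through a doneChannel.
func demoChannel(value string) error {
	d := newDoneChannel()
	applyConfig(value, d)
	return <-d.ch
}

// demoCallback returns the feedback delivered through a doneCallback.
func demoCallback(value string) error {
	var got error
	applyConfig(value, doneCallback(func(err error) { got = err }))
	return got
}

func main() {
	fmt.Println(demoChannel("mtu=1500"))
	fmt.Println(demoCallback(""))
}
```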

type ChangeEvent

type ChangeEvent interface {
	CallbackResult

	ProtoWatchResp
}

ChangeEvent is used as the data type for the change channel (see the VPP Standard Plugins API). A data change event contains a key identifying where the change happened and two values for data stored under that key: the value *before* the change (previous value) and the value *after* the change (current value).

type ChangeValue

type ChangeValue interface {
	LazyValueWithRev
	WithChangeType
}

ChangeValue represents a single propagated change.

type CompositeKVProtoWatcher

type CompositeKVProtoWatcher struct {
	Adapters []KeyValProtoWatcher
}

CompositeKVProtoWatcher is a slice of watchers

func (*CompositeKVProtoWatcher) Watch

func (ta *CompositeKVProtoWatcher) Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent,
	keyPrefixes ...string) (WatchRegistration, error)

Watch subscribes to every transport available within the transport aggregator.

type CompositeKVProtoWriter

type CompositeKVProtoWriter struct {
	Adapters []KeyProtoValWriter
}

CompositeKVProtoWriter is a cumulative adapter that contains all available transport types.

func (*CompositeKVProtoWriter) Put

func (ta *CompositeKVProtoWriter) Put(key string, data proto.Message, opts ...PutOption) error

Put writes to every available transport.
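
The fan-out behaviour of a composite writer can be sketched as follows. This is an illustration, not the package's code: the `writer` interface is a simplified stand-in for `KeyProtoValWriter` (a string replaces `proto.Message` and options are omitted), `memWriter` is a hypothetical in-memory transport, and returning the first error after trying every adapter is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// writer is a simplified stand-in for KeyProtoValWriter.
type writer interface {
	Put(key string, data string) error
}

// memWriter is a hypothetical in-memory transport that can be made to fail.
type memWriter struct {
	data map[string]string
	fail bool
}

func (w *memWriter) Put(key, data string) error {
	if w.fail {
		return errors.New("transport unavailable")
	}
	w.data[key] = data
	return nil
}

// compositeWriter forwards each Put to all of its adapters.
type compositeWriter struct {
	adapters []writer
}

// Put tries every adapter and returns the first error encountered, if any.
func (c *compositeWriter) Put(key, data string) error {
	var firstErr error
	for _, a := range c.adapters {
		if err := a.Put(key, data); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// demoPut writes one key through three transports and reports how many
// of them stored it, together with the composite's error.
func demoPut(failSecond bool) (int, error) {
	a := &memWriter{data: map[string]string{}}
	b := &memWriter{data: map[string]string{}, fail: failSecond}
	c := &memWriter{data: map[string]string{}}
	composite := &compositeWriter{adapters: []writer{a, b, c}}

	err := composite.Put("/config/if1", "mtu=1500")

	stored := 0
	for _, w := range []*memWriter{a, b, c} {
		if _, ok := w.data["/config/if1"]; ok {
			stored++
		}
	}
	return stored, err
}

func main() {
	fmt.Println(demoPut(false))
	fmt.Println(demoPut(true))
}
```

Continuing after a failed adapter keeps the remaining transports in sync with each other; a caller that needs all-or-nothing semantics would need a different strategy.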

type DelOption

type DelOption interface {
	//DelOptionMark is just for marking implementation that it implements this interface
	DelOptionMark()
}

DelOption defines options for Del operation. The particular options can be found below.

type DelOptionMarker

type DelOptionMarker struct{}

DelOptionMarker is meant for anonymous composition in With*Opt structs

func (*DelOptionMarker) DelOptionMark

func (marker *DelOptionMarker) DelOptionMark()

DelOptionMark only marks that the implementation satisfies this interface.

type KeyProtoValWriter

type KeyProtoValWriter interface {
	// Put to ETCD or any other data transport (from other Agent Plugins)
	Put(key string, data proto.Message, opts ...PutOption) error
}

KeyProtoValWriter allows plugins to push their data changes to a data store.

type KeyVal

type KeyVal interface {
	WithKey
	LazyValueWithRev
}

KeyVal represents a single key-value pair

type KeyValIterator

type KeyValIterator interface {
	// GetNext retrieves the next key-value pair from the iterator context. The
	// value can then be unmarshaled with the pair's GetValue method. The
	// allReceived flag is set to true on the last KeyVal pair in the context.
	GetNext() (kv KeyVal, allReceived bool)
}

KeyValIterator is an iterator for KeyVals

type KeyValProtoWatcher

type KeyValProtoWatcher interface {
	// Watch using ETCD or any other data transport
	Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent,
		keyPrefixes ...string) (WatchRegistration, error)
}

KeyValProtoWatcher is used by a plugin to subscribe to both data change events and data resync events. Multiple key prefixes can be specified; the caller will be subscribed to events on each of them. See README.md for a description of the events.

type LazyValue

type LazyValue interface {
	// GetValue gets the current value in the data change event.
	// The caller must provide an address of a proto message buffer
	// for the value.
	// returns:
	// - error if value argument can not be properly filled
	GetValue(value proto.Message) error
}

LazyValue defines a value that is unmarshalled into a proto message on demand. The reason for defining an interface with only one method is primarily to unify interfaces in this package.

type LazyValueWithRev

type LazyValueWithRev interface {
	LazyValue
	WithRevision
}

LazyValueWithRev defines a value that is unmarshalled into a proto message on demand and that carries a revision. The interface exists primarily to unify interfaces in this package.

type ProtoWatchResp

type ProtoWatchResp interface {
	ChangeValue
	WithKey
	WithPrevValue
}

ProtoWatchResp contains the changed value.

type PutDel

type PutDel string

PutDel enumerates 'Put' (meaning 'Create' or 'Update') and 'Delete' operations

const (
	// Put represents a Create or Update operation
	Put PutDel = "Put"
	// Delete operation
	Delete PutDel = "Delete"
)
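
A receiver typically branches on the change type. The sketch below redeclares `PutDel` and its constants locally so that it runs on its own; the `apply` helper and the string-valued store are hypothetical and serve only to illustrate the switch.

```go
package main

import "fmt"

// PutDel and its constants mirror the declarations in this package.
type PutDel string

const (
	Put    PutDel = "Put"
	Delete PutDel = "Delete"
)

// apply is a hypothetical helper that applies one change to an in-memory store.
func apply(store map[string]string, op PutDel, key, value string) error {
	switch op {
	case Put:
		store[key] = value // create or update
	case Delete:
		delete(store, key)
	default:
		return fmt.Errorf("unknown change type %q", op)
	}
	return nil
}

func main() {
	store := map[string]string{}
	fmt.Println(apply(store, Put, "if1", "up"), store)
	fmt.Println(apply(store, Delete, "if1", ""), store)
	fmt.Println(apply(store, PutDel("Patch"), "if1", "up"))
}
```

Because `PutDel` is a string type, any string can be converted to it, so the default branch guards against values outside the two declared constants.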

type PutOption

type PutOption interface {
	//PutOptionMark is just for marking implementation that it implements this interface
	PutOptionMark()
}

PutOption defines options for Put operation. The particular options can be found below.

type PutOptionMarker

type PutOptionMarker struct{}

PutOptionMarker is meant for anonymous composition in With*Opt structs

func (*PutOptionMarker) PutOptionMark

func (marker *PutOptionMarker) PutOptionMark()

PutOptionMark only marks that the implementation satisfies this interface.

type ResyncEvent

type ResyncEvent interface {
	CallbackResult

	GetValues() map[string]KeyValIterator
}

ResyncEvent is used as the data type for the resync channel (see the ifplugin API)

type WatchRegistration

type WatchRegistration interface {
	io.Closer
}

WatchRegistration is a facade that avoids importing the io package (for io.Closer) into Agent plugin implementations.

type WithChangeType

type WithChangeType interface {
	GetChangeType() PutDel
}

WithChangeType is a helper interface whose intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithKey

type WithKey interface {
	// GetKey returns the key of the pair
	GetKey() string
}

WithKey is a helper interface whose intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithPrefixOpt

type WithPrefixOpt struct {
	DelOptionMarker
}

WithPrefixOpt applies an operation to all items with the specified prefix.

func WithPrefix

func WithPrefix() *WithPrefixOpt

WithPrefix creates a new instance of WithPrefixOpt.

type WithPrevValue

type WithPrevValue interface {
	// GetPrevValue gets previous value in the data change event.
	// The caller must provide an address of a proto message buffer
	// for each value.
	// returns:
	// - prevValueExist flag is set to 'true' if prevValue was filled
	// - error if value argument can not be properly filled
	GetPrevValue(prevValue proto.Message) (prevValueExist bool, err error)
}

WithPrevValue is a helper interface whose intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithRevision

type WithRevision interface {
	// GetRevision gets revision of current value
	GetRevision() (rev int64)
}

WithRevision is a helper interface whose intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithTTLOpt

type WithTTLOpt struct {
	PutOptionMarker
	TTL time.Duration
}

WithTTLOpt defines a TTL for data being put. Once the TTL elapses, the data is removed from the data store.

func WithTTL

func WithTTL(TTL time.Duration) *WithTTLOpt

WithTTL creates a new instance of the TTL option. Once the TTL elapses, the data is removed. Beware: some implementations might use the TTL with lower precision.
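
The marker-based option pattern can be seen end to end in the sketch below. The option types are redeclared locally, mirroring the declarations on this page, so that the example runs without the cn-infra module. The `ttlFromOptions` helper is hypothetical: it shows one way a transport might extract the TTL from the variadic options, not how any particular transport does it.

```go
package main

import (
	"fmt"
	"time"
)

// PutOption, PutOptionMarker, WithTTLOpt and WithTTL mirror the
// declarations in this package.
type PutOption interface {
	PutOptionMark()
}

type PutOptionMarker struct{}

func (marker *PutOptionMarker) PutOptionMark() {}

type WithTTLOpt struct {
	PutOptionMarker
	TTL time.Duration
}

func WithTTL(TTL time.Duration) *WithTTLOpt {
	return &WithTTLOpt{TTL: TTL}
}

// ttlFromOptions is a hypothetical helper: it returns the TTL of the
// first WithTTLOpt found among opts, and false if there is none.
func ttlFromOptions(opts ...PutOption) (time.Duration, bool) {
	for _, o := range opts {
		if t, ok := o.(*WithTTLOpt); ok {
			return t.TTL, true
		}
	}
	return 0, false
}

// demoTTLSeconds builds a TTL option and reads it back as whole seconds.
func demoTTLSeconds() (int64, bool) {
	ttl, ok := ttlFromOptions(WithTTL(30 * time.Second))
	return int64(ttl / time.Second), ok
}

func main() {
	fmt.Println(demoTTLSeconds())
	fmt.Println(ttlFromOptions())
}
```

Embedding `PutOptionMarker` is what lets `*WithTTLOpt` satisfy `PutOption` without declaring the marker method itself.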

Directories

Path Synopsis
Package grpcsync implements (in ALPHA VERSION) the gRPC transport client and server that satisfy the datasync API.
Package kvdbsync implements the key-value data store client and server that satisfy the datasync API.
local
Package local implements DB Transactions for the local "in memory" transport.
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport.
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start.
Package syncbase defines common structures used in multiple datasync transports.
msg
Package msg is a generated protocol buffer package.
