datasync

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package datasync defines the datasync API, which abstracts the data transport between app plugins and backend data sources. Data sources may be data stores, or clients connected to a message bus, or remote clients connected to CN-Infra app. Transport may be, for example, HTTP or gRPC.

These events are processed asynchronously. With each event, the app plugin also receives a separate callback which is used to propagate any errors encountered during the event processing back to the user of the datasync package. Successfully finalized event processing is signaled by sending nil value (meaning no error) via the associated callback.

See the examples under the dedicated examples package.

Index

Constants

View Source
const DefaultNotifTimeout = 2 * time.Second

DefaultNotifTimeout defines the default timeout for datasync notification delivery.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatedRegistration

type AggregatedRegistration struct {
	Registrations []WatchRegistration
}

AggregatedRegistration is adapter that allows multiple registrations (WatchRegistration) to be aggregated in one. Close operation is applied collectively to all included registration.

func (*AggregatedRegistration) Close

func (wa *AggregatedRegistration) Close() error

Close every registration under the aggregator. This function implements WatchRegistration.Close().

func (*AggregatedRegistration) Register

func (wa *AggregatedRegistration) Register(resyncName, keyPrefix string) error

Register new key for all available aggregator objects. Call Register(keyPrefix) on specific registration to add the key from that registration only

func (*AggregatedRegistration) Unregister

func (wa *AggregatedRegistration) Unregister(keyPrefix string) error

Unregister closed registration of specific key under all available aggregator objects. Call Unregister(keyPrefix) on specific registration to remove the key from that registration only

type CallbackResult

type CallbackResult interface {
	// Done allows plugins that are processing data change/resync to send
	// feedback. If there was no error, Done(nil) needs to be called.
	Done(error)
}

CallbackResult can be used by an event receiver to indicate to the event producer whether an operation was successful (error is nil) or unsuccessful (error is not nil).

type ChangeEvent

type ChangeEvent interface {
	CallbackResult
	// GetContext returns the context associated with the event.
	GetContext() context.Context
	// GetChanges returns list of changes for this change event.
	GetChanges() []ProtoWatchResp
}

ChangeEvent is used to define the data type for the change channel (<changeChan> from KeyValProtoWatcher.Watch). A data change event contains a key identifying where the change happened and two values for data stored under that key: the value *before* the change (previous value) and the value *after* the change (current value).

type ChangeValue

type ChangeValue interface {
	WithChangeType
	KeyVal
}

ChangeValue represents a single propagated change.

type DelOption

type DelOption interface {
	// DelOptionMark is used to mark structures implementing DelOption
	// interface.
	DelOptionMark()
}

DelOption defines options for Del operation. The available options can be found below.

type DelOptionMarker

type DelOptionMarker struct{}

DelOptionMarker is meant for anonymous composition in With*Opt structs.

func (*DelOptionMarker) DelOptionMark

func (marker *DelOptionMarker) DelOptionMark()

DelOptionMark is used only to mark structures implementing DelOption interface.

type KVProtoWatchers

type KVProtoWatchers []KeyValProtoWatcher

KVProtoWatchers is an adapter that allows multiple watchers (KeyValProtoWatcher) to be aggregated in one. Watch request is delegated to all of them.

func (KVProtoWatchers) Watch

func (ta KVProtoWatchers) Watch(resyncName string, changeChan chan ChangeEvent,
	resyncChan chan ResyncEvent, keyPrefixes ...string) (WatchRegistration, error)

Watch subscribes to every transport available within transport aggregator. The function implements KeyValProtoWatcher.Watch().

type KVProtoWriters

type KVProtoWriters []KeyProtoValWriter

KVProtoWriters is an adapter that allows multiple writers (KeyProtoValWriter) in one. Put request is delegated to all of them.

func (KVProtoWriters) Put

func (ta KVProtoWriters) Put(key string, data proto.Message, opts ...PutOption) error

Put writes data to all aggregated transports. This function implements KeyProtoValWriter.Put().

type KeyProtoValWriter

type KeyProtoValWriter interface {
	// Put <data> to ETCD or to any other key-value based data transport
	// (from other Agent Plugins) under the key <key>.
	// See options.go for a list of available options.
	Put(key string, data proto.Message, opts ...PutOption) error
}

KeyProtoValWriter allows plugins to push their data changes to a data store.

type KeyVal

type KeyVal interface {
	WithKey
	LazyValue
	WithRevision
}

KeyVal represents a single key-value pair.

type KeyValIterator

type KeyValIterator interface {
	// GetNext retrieves the next value from the iterator context. The retrieved
	// value is unmarshaled and returned as <kv>. The allReceived flag is
	// set to true on the last KeyVal pair in the context.
	GetNext() (kv KeyVal, allReceived bool)
}

KeyValIterator is an iterator for KeyVal.

type KeyValProtoWatcher

type KeyValProtoWatcher interface {
	// Watch using ETCD or any other data transport.
	// <resyncName> is used for the name of the RESYNC subcription.
	// <changeChan> channel is used for delivery of data CHANGE events.
	// <resyncChan> channel is used for delivery of data RESYNC events.
	// <keyPrefix> is a variable list of keys to watch on.
	Watch(resyncName string, changeChan chan ChangeEvent, resyncChan chan ResyncEvent,
		keyPrefixes ...string) (WatchRegistration, error)
}

KeyValProtoWatcher is used by plugins to subscribe to both data change events and data resync events. Multiple keys can be specified and the caller will be subscribed to events on each key. See README.md for description of the Events.

type LazyValue

type LazyValue interface {
	// GetValue gets the current value in the data change event.
	// The caller must provide an address of a proto message buffer
	// as <value>.
	// returns:
	// - error if value argument can not be properly filled.
	GetValue(value proto.Message) error
}

LazyValue defines value that is unmarshaled into proto message on demand. The reason for defining interface with only one method is primarily to unify interfaces in this package.

type Op

type Op string

Op represents datasync operations.

const (
	// InvalidOp is default value
	// representing invalid operation.
	InvalidOp Op = ""
	// Put can represent Create or Update operation.
	Put Op = "Put"
	// Delete represents Delete operation
	Delete Op = "Delete"
)

type ProtoWatchResp

type ProtoWatchResp interface {
	ChangeValue
	WithPrevValue
}

ProtoWatchResp contains changed value.

type PutOption

type PutOption interface {
	// PutOptionMark is used only to mark structures implementing PutOption
	// interface.
	PutOptionMark()
}

PutOption defines options for Put operation. The available options can be found below.

type PutOptionMarker

type PutOptionMarker struct{}

PutOptionMarker is meant for anonymous composition in With*Opt structs.

func (*PutOptionMarker) PutOptionMark

func (marker *PutOptionMarker) PutOptionMark()

PutOptionMark is used only to mark structures implementing PutOption interface.

type ResyncEvent

type ResyncEvent interface {
	CallbackResult
	// GetContext returns the context associated with the event.
	GetContext() context.Context
	// GetValues returns key-value pairs sorted by key prefixes
	// (<keyPrefix> variable list from KeyValProtoWatcher.Watch).
	GetValues() map[string]KeyValIterator
}

ResyncEvent is used to define the data type for the resync channel (<resyncChan> from KeyValProtoWatcher.Watch).

type WatchRegistration

type WatchRegistration interface {
	// Add <keyPrefix> to adapter subscription under specific <resyncName>.
	// If called on registration returned by composite watcher, register
	// <keyPrefix> to all adapters. Returns error if there is no subscription
	// with provided resync name or prefix already exists
	Register(resyncName string, keyPrefix string) error

	// Unregister <keyPrefix> from adapter subscription. If called on registration
	// returned by composite watcher, unregister <keyPrefix> from all adapters
	Unregister(keyPrefix string) error

	io.Closer
}

WatchRegistration is a facade that avoids importing the io.Closer package into Agent plugin implementations.

type WithChangeType

type WithChangeType interface {
	GetChangeType() Op
}

WithChangeType is a simple helper interface embedded by all interfaces that require access to change type information. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithClientLifetimeTTLOpt

type WithClientLifetimeTTLOpt struct {
	PutOptionMarker
}

WithClientLifetimeTTLOpt defines option to Put a value for the lifetime of client. Once client is closed TTL is no longer renewed and value gets removed.

func WithClientLifetimeTTL

func WithClientLifetimeTTL() *WithClientLifetimeTTLOpt

WithClientLifetimeTTL creates a new instance of ClientLifetimeTTL option

type WithKey

type WithKey interface {
	// GetKey returns the key of the pair
	GetKey() string
}

WithKey is a simple helper interface embedded by all interfaces that require access to the key of the key-value pair. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithPrefixOpt

type WithPrefixOpt struct {
	DelOptionMarker
}

WithPrefixOpt applies an operation to all items with the specified prefix.

func WithPrefix

func WithPrefix() *WithPrefixOpt

WithPrefix creates a new instance of WithPrefixOpt.

type WithPrevValue

type WithPrevValue interface {
	// GetPrevValue gets the previous value in the data change event.
	// The caller must provide an address of a proto message buffer
	// as <prevValue>.
	// returns:
	// - <prevValueExist> flag is set to 'true' if previous value does exist
	// - error if <prevValue> can not be properly filled
	GetPrevValue(prevValue proto.Message) (prevValueExist bool, err error)
}

WithPrevValue is a simple helper interface embedded by all interfaces that require access to the previous value. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithRevision

type WithRevision interface {
	// GetRevision gets revision of current value
	GetRevision() int64
}

WithRevision is a simple helper interface embedded by all interfaces that require access to the value revision. The intent is to ensure that the same method declaration is used in different interfaces (composition of interfaces).

type WithTTLOpt

type WithTTLOpt struct {
	PutOptionMarker
	TTL time.Duration
}

WithTTLOpt defines TTL for Put operation. Once TTL elapses, the associated data are removed from data store.

func WithTTL

func WithTTL(TTL time.Duration) *WithTTLOpt

WithTTL creates a new instance of TTL option. Once TTL elapses, the associated data are removed. Beware: some implementation might be using TTL with lower precision.

Directories

Path Synopsis
Package grpcsync implements (in ALPHA VERSION) the gRPC client and server that satisfies the datasync API (see the definition of the service in ../syncbase/msg package).
Package grpcsync implements (in ALPHA VERSION) the gRPC client and server that satisfies the datasync API (see the definition of the service in ../syncbase/msg package).
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers.
Package kvdbsync defines a key-value data store client API for unified access among key-value datastore servers.
local
Package local implements DB Transactions for the local "in memory" transport.
Package local implements DB Transactions for the local "in memory" transport.
Package msgsync propagates protobuf messages to a particular topic.
Package msgsync propagates protobuf messages to a particular topic.
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport.
Package restsync implements (in ALPHA VERSION) the datasync API for the HTTP/REST transport.
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start.
Package resync implements the mechanism to notify previously registered plugins that the resync procedure needs to start.
Package syncbase defines common structures used in multiple datasync transports.
Package syncbase defines common structures used in multiple datasync transports.
msg
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go.
Package msg contains: - the definition of PROTOBUF structures and gRPC service, - helpers for mapping between PROTOBUF structures & the datasync_api.go.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL