syncbase

package
v1.0.5
Published: Oct 17, 2017 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:

- KeyValProtoWatcher maintains watch registrations/subscriptions.

- Registry of the latest revisions of values for each key, synchronized by datasync.

- Default implementation of Events & Iterators interfaces defined in data_api.go.

- Events & Iterators in this package are reused by some datasync transports.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AggregateDone

func AggregateDone(events []func(chan error), done chan error)

AggregateDone can be reused to avoid repetitive code that triggers a slice of events and waits until all of them have finished.

Types

type Adapter

type Adapter struct {
	Watcher   datasync.KeyValProtoWatcher
	Publisher datasync.KeyProtoValWriter
}

Adapter implements datasync.TransportAdapter while allowing the Watch and Put implementations to be supplied optionally.

func (*Adapter) Put

func (adapter *Adapter) Put(key string, data proto.Message) error

Put writes the key-value pair using the Publisher (a datasync.KeyProtoValWriter).

func (*Adapter) Watch

func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch registers the given channels using the Watcher (a datasync.KeyValProtoWatcher).
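The delegation idea behind Adapter can be illustrated with a small sketch. All types below (`writer`, `adapter`, `memWriter`) are hypothetical, simplified stand-ins that use strings instead of proto.Message, and silently ignoring Put when no Publisher is set is an assumption rather than documented behavior.

```go
package main

import "fmt"

// writer is a hypothetical, simplified stand-in for datasync.KeyProtoValWriter.
type writer interface {
	Put(key string, data string) error
}

// adapter mirrors the shape of Adapter, showing only the Publisher side.
type adapter struct {
	Publisher writer // may be nil when the transport does not publish
}

// Put delegates to Publisher when it is set. Ignoring the call otherwise is
// an assumption of this sketch.
func (a *adapter) Put(key string, data string) error {
	if a.Publisher == nil {
		return nil
	}
	return a.Publisher.Put(key, data)
}

// memWriter is a hypothetical in-memory writer used for the demonstration.
type memWriter map[string]string

func (m memWriter) Put(key, data string) error {
	m[key] = data
	return nil
}

func main() {
	store := memWriter{}
	withPublisher := &adapter{Publisher: store}
	_ = withPublisher.Put("/config/if1", "up")

	watchOnly := &adapter{} // no Publisher configured
	err := watchOnly.Put("/config/if2", "down")
	fmt.Println(store["/config/if1"], err)
}
```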

type Change

type Change struct {
	datasync.KeyVal
	// contains filtered or unexported fields
}

Change represents a single key-value pair plus its change type.

func NewChange

func NewChange(key string, value proto.Message, rev int64, changeType datasync.PutDel) *Change

NewChange creates a new instance of Change.

func NewChangeBytes

func NewChangeBytes(key string, value []byte, rev int64, changeType datasync.PutDel) *Change

NewChangeBytes creates a new instance of Change from a value given as a byte slice.

func (*Change) GetChangeType

func (kv *Change) GetChangeType() datasync.PutDel

GetChangeType returns type of the change.

type ChangeEvent

type ChangeEvent struct {
	Key        string
	ChangeType datasync.PutDel
	CurrVal    datasync.LazyValue
	CurrRev    int64
	PrevVal    datasync.LazyValue
	// contains filtered or unexported fields
}

ChangeEvent is a simple structure that implements interface datasync.ChangeEvent

func (*ChangeEvent) Done

func (ev *ChangeEvent) Done(err error)

Done passes err on to the registered completion handler, or logs err if there is no handler and an error occurred.

func (*ChangeEvent) GetChangeType

func (ev *ChangeEvent) GetChangeType() datasync.PutDel

GetChangeType returns type of the event.

func (*ChangeEvent) GetKey

func (ev *ChangeEvent) GetKey() string

GetKey returns the Key associated with the change

func (*ChangeEvent) GetPrevValue

func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)

GetPrevValue returns the value before change

func (*ChangeEvent) GetRevision

func (ev *ChangeEvent) GetRevision() int64

GetRevision - see the comments in the interface datasync.ChangeEvent

func (*ChangeEvent) GetValue

func (ev *ChangeEvent) GetValue(val proto.Message) (err error)

GetValue - see the comments in the interface datasync.ChangeEvent

type ChangeIterator

type ChangeIterator struct {
	// contains filtered or unexported fields
}

ChangeIterator is a simple in-memory implementation of data.Iterator

func NewChangeIterator

func NewChangeIterator(data []*Change) *ChangeIterator

NewChangeIterator creates a new instance of ChangeIterator.

func (*ChangeIterator) GetNext

func (it *ChangeIterator) GetNext() (kv datasync.KeyVal, changeType datasync.PutDel, allReceived bool)

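GetNext returns the next change together with its change type; allReceived reports that the iterator has no more items.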

type DoneCallback

type DoneCallback struct {
	Callback func(error)
}

DoneCallback is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func (*DoneCallback) Done

func (ev *DoneCallback) Done(err error)

Done propagates error to the callback

type DoneChannel

type DoneChannel struct {
	DoneChan chan error
}

DoneChannel is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func NewDoneChannel

func NewDoneChannel(doneChan chan error) *DoneChannel

NewDoneChannel creates a new instance of DoneChannel.

func (*DoneChannel) Done

func (ev *DoneChannel) Done(err error)

Done propagates error to the channel.

type KVIterator

type KVIterator struct {
	// contains filtered or unexported fields
}

KVIterator is a simple in-memory implementation of data.Iterator

func NewKVIterator

func NewKVIterator(data []datasync.KeyVal) *KVIterator

NewKVIterator creates a new instance of KVIterator.

func (*KVIterator) GetNext

func (it *KVIterator) GetNext() (kv datasync.KeyVal, allReceived bool)

GetNext returns the next key-value pair; allReceived reports that the iterator has no more items.
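The GetNext signature suggests the usual drain loop. The sketch below uses hypothetical local types (`keyVal`, `kvIterator`) that imitate an in-memory slice iterator; it is not the package's implementation.

```go
package main

import "fmt"

// keyVal is a hypothetical, simplified stand-in for datasync.KeyVal.
type keyVal struct {
	Key string
	Rev int64
}

// kvIterator walks a slice, in the spirit of KVIterator.
type kvIterator struct {
	data  []keyVal
	index int
}

// getNext returns the next pair, or allReceived == true once exhausted.
func (it *kvIterator) getNext() (kv keyVal, allReceived bool) {
	if it.index >= len(it.data) {
		return keyVal{}, true
	}
	kv = it.data[it.index]
	it.index++
	return kv, false
}

// collectKeys drains the iterator and gathers the keys it yields.
func collectKeys(it *kvIterator) []string {
	var keys []string
	for {
		kv, allReceived := it.getNext()
		if allReceived {
			break
		}
		keys = append(keys, kv.Key)
	}
	return keys
}

func main() {
	it := &kvIterator{data: []keyVal{{Key: "/a", Rev: 1}, {Key: "/b", Rev: 2}}}
	fmt.Println(collectKeys(it))
}
```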

type KeyVal

type KeyVal struct {
	datasync.LazyValue
	// contains filtered or unexported fields
}

KeyVal represents a single Key-value pair

func NewKeyVal

func NewKeyVal(key string, value datasync.LazyValue, rev int64) *KeyVal

NewKeyVal creates a new instance of KeyVal.

func (*KeyVal) GetKey

func (kv *KeyVal) GetKey() string

GetKey returns the Key of the pair

func (*KeyVal) GetRevision

func (kv *KeyVal) GetRevision() int64

GetRevision returns revision associated with the latest change in the Key-value pair

type KeyValBytes

type KeyValBytes struct {
	// contains filtered or unexported fields
}

KeyValBytes represents a single Key-value pair

func NewKeyValBytes

func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes

NewKeyValBytes creates a new instance of KeyValBytes.

func (*KeyValBytes) GetKey

func (kv *KeyValBytes) GetKey() string

GetKey returns the Key of the pair

func (*KeyValBytes) GetRevision

func (kv *KeyValBytes) GetRevision() int64

GetRevision returns revision associated with the latest change in the Key-value pair

func (*KeyValBytes) GetValue

func (kv *KeyValBytes) GetValue(message proto.Message) error

GetValue returns the value of the pair
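A byte-backed key-value pair can defer decoding until the value is requested. The sketch below shows that idea with a hypothetical `keyValBytes` type; JSON stands in for protobuf so the example needs only the standard library, and the key and the `iface` type are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// keyValBytes keeps the raw payload and decodes it only on request.
// Assumption: JSON replaces protobuf to keep the sketch stdlib-only.
type keyValBytes struct {
	key   string
	value []byte
	rev   int64
}

func (kv *keyValBytes) GetKey() string     { return kv.key }
func (kv *keyValBytes) GetRevision() int64 { return kv.rev }

// GetValue decodes the stored bytes into the caller-supplied message.
func (kv *keyValBytes) GetValue(message interface{}) error {
	return json.Unmarshal(kv.value, message)
}

// iface is a hypothetical message type.
type iface struct {
	Name string `json:"name"`
	MTU  int    `json:"mtu"`
}

func main() {
	kv := &keyValBytes{
		key:   "/if/eth0",
		value: []byte(`{"name":"eth0","mtu":1500}`),
		rev:   3,
	}
	var out iface
	if err := kv.GetValue(&out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(kv.GetKey(), kv.GetRevision(), out.Name, out.MTU)
}
```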

type PrevRevisions

type PrevRevisions struct {
	// contains filtered or unexported fields
}

PrevRevisions maintains the map of keys & values with revision

func NewLatestRev

func NewLatestRev() *PrevRevisions

NewLatestRev creates a new instance of PrevRevisions.

func (*PrevRevisions) Del

func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)

Del deletes entry from revisions and returns previous value

func (*PrevRevisions) Get

func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)

Get returns the last stored proto.Message together with its revision.

func (*PrevRevisions) ListKeys

func (r *PrevRevisions) ListKeys() []string

ListKeys returns all stored keys

func (*PrevRevisions) Put

func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (
	found bool, prev datasync.LazyValueWithRev, currRev int64)

Put updates entry in the revisions and returns previous value

func (*PrevRevisions) PutWithRevision

func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) (
	found bool, prev datasync.LazyValueWithRev)

PutWithRevision updates entry in the revisions and returns previous value
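The put, get, and delete semantics of a revision map can be sketched as follows. The types are hypothetical stand-ins that use plain strings as values; guarding the map with a mutex and numbering revisions from 1 in steps of one are assumptions of this sketch, not confirmed details of PrevRevisions.

```go
package main

import (
	"fmt"
	"sync"
)

// valueWithRev is a hypothetical stand-in for datasync.LazyValueWithRev.
type valueWithRev struct {
	Value string
	Rev   int64
}

// prevRevisions keeps the latest value and revision per key.
type prevRevisions struct {
	mu   sync.Mutex
	revs map[string]valueWithRev
}

func newPrevRevisions() *prevRevisions {
	return &prevRevisions{revs: map[string]valueWithRev{}}
}

// put stores val and returns the previous entry. Starting at revision 1 and
// incrementing by one per update is an assumption.
func (r *prevRevisions) put(key, val string) (found bool, prev valueWithRev, currRev int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.revs[key]
	currRev = prev.Rev + 1
	r.revs[key] = valueWithRev{Value: val, Rev: currRev}
	return found, prev, currRev
}

// del removes the entry and returns what was stored before.
func (r *prevRevisions) del(key string) (found bool, prev valueWithRev) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.revs[key]
	delete(r.revs, key)
	return found, prev
}

func main() {
	r := newPrevRevisions()
	r.put("/a", "v1")
	found, prev, rev := r.put("/a", "v2")
	fmt.Println(found, prev.Value, rev)
	found, prev = r.del("/a")
	fmt.Println(found, prev.Value, prev.Rev)
}
```

Returning the previous value from every mutation lets a transport build a change event with both the current and the previous value in one step.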

type Registry added in v1.0.3

type Registry struct {
	// contains filtered or unexported fields
}

Registry of subscriptions and latest revisions. This structure contains reusable code extracted from various datasync implementations, so that datasync plugins do not need to repeat the code that manages subscriptions.

func NewRegistry added in v1.0.3

func NewRegistry() *Registry

NewRegistry creates reusable registry of subscriptions for a particular datasync plugin.

func (*Registry) LastRev added in v1.0.3

func (adapter *Registry) LastRev() *PrevRevisions

LastRev is a getter that returns the PrevRevisions held by the registry.

func (*Registry) PropagateChanges added in v1.0.3

func (adapter *Registry) PropagateChanges(txData map[string]datasync.ChangeValue) error

PropagateChanges fills registered channels with the data

func (*Registry) PropagateResync added in v1.0.3

func (adapter *Registry) PropagateResync(txData map[string]datasync.ChangeValue) error

PropagateResync fills registered channels with the data

func (*Registry) Subscriptions added in v1.0.3

func (adapter *Registry) Subscriptions() map[string]*Subscription

Subscriptions returns the current subscriptions.

func (*Registry) Watch added in v1.0.3

func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch just appends channels

func (*Registry) WatchDataBase added in v1.0.3

func (adapter *Registry) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (*WatchDataReg, error)

WatchDataBase just appends channels
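How a registry of subscriptions might route changes by key prefix can be sketched as below. Everything here is hypothetical: the local types, the example keys, the rejection of duplicate resync names, and the prefix-matching rule are assumptions made for illustration and are not taken from the Registry implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// change is a hypothetical, simplified stand-in for datasync.ChangeEvent.
type change struct {
	Key   string
	Value string
}

// subscription mirrors the Subscription fields relevant to this sketch.
type subscription struct {
	ResyncName  string
	ChangeChan  chan change
	KeyPrefixes []string
}

// registry stores subscriptions by resync name.
type registry struct {
	subs map[string]*subscription
}

func newRegistry() *registry {
	return &registry{subs: map[string]*subscription{}}
}

// watch records the channel and prefixes. Rejecting a duplicate resync name
// is an assumption of this sketch.
func (r *registry) watch(resyncName string, ch chan change, keyPrefixes ...string) error {
	if _, exists := r.subs[resyncName]; exists {
		return fmt.Errorf("already watching %q", resyncName)
	}
	r.subs[resyncName] = &subscription{ResyncName: resyncName, ChangeChan: ch, KeyPrefixes: keyPrefixes}
	return nil
}

// propagateChanges sends each change once to every subscription that has at
// least one matching key prefix.
func (r *registry) propagateChanges(txData map[string]string) {
	for key, val := range txData {
		for _, sub := range r.subs {
			for _, prefix := range sub.KeyPrefixes {
				if strings.HasPrefix(key, prefix) {
					sub.ChangeChan <- change{Key: key, Value: val}
					break // deliver at most once per subscription
				}
			}
		}
	}
}

func main() {
	reg := newRegistry()
	ifaces := make(chan change, 4)
	_ = reg.watch("ifplugin", ifaces, "/vpp/interface/")
	reg.propagateChanges(map[string]string{
		"/vpp/interface/if1": "up",
		"/vpp/bd/bd1":        "created",
	})
	n := len(ifaces)
	ev := <-ifaces
	fmt.Println(n, ev.Key, ev.Value)
}
```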

type ResyncEventDB

type ResyncEventDB struct {
	*DoneChannel
	// contains filtered or unexported fields
}

ResyncEventDB implements interface datasync.ResyncEvent (see comments in there)

func NewResyncEvent

func NewResyncEvent(m map[string][]datasync.KeyVal) *ResyncEventDB

NewResyncEvent creates a new instance of ResyncEventDB using the given map of slices of KeyVal.

func NewResyncEventDB

func NewResyncEventDB(its map[string]datasync.KeyValIterator) *ResyncEventDB

NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.

func (*ResyncEventDB) GetValues

func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator

GetValues returns values of the event.

type Subscription

type Subscription struct {
	ResyncName  string
	ChangeChan  chan datasync.ChangeEvent
	ResyncChan  chan datasync.ResyncEvent
	KeyPrefixes []string
}

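Subscription groups the resync name, the change and resync channels, and the key prefixes of a single watch registration.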

type WatchDataReg

type WatchDataReg struct {
	ResyncName string

	CloseChan chan interface{}
	// contains filtered or unexported fields
}

WatchDataReg implements interface datasync.WatchDataRegistration

func (*WatchDataReg) Close

func (reg *WatchDataReg) Close() error

Close stops watching of particular KeyPrefixes.

Directories

Path Synopsis
Package msg is a generated protocol buffer package.
