syncbase

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 28, 2017 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:

- KeyValProtoWatcher maintains watch registrations/subscriptions.

- Registry of the latest revisions of values for each key synchronized by datasync.

- Default implementation of Events & Iterators interfaces defined in data_api.go.

- Events & Iterators in this package are reused by some but not all datasync transports.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AggregateDone

func AggregateDone(events []func(chan error), done chan error)

AggregateDone can be reused to avoid repetitive code that triggers a slice of events and waits until all of them are finished.
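The pattern described above can be sketched with the standard library alone. The snippet below is a minimal illustration, not the syncbase source: `aggregateDone` and `runExample` are hypothetical names, and the choice to report the last non-nil error is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregateDone is a hypothetical stand-in written for this example; it is
// not the syncbase implementation. It triggers every event with a shared
// partial-result channel and reports one aggregated result on done.
func aggregateDone(events []func(chan error), done chan error) {
	// Buffered so that events reporting synchronously never block.
	partial := make(chan error, len(events))
	for _, trigger := range events {
		trigger(partial)
	}
	go func() {
		// Assumption: the last non-nil error wins.
		var lastErr error
		for range events {
			if err := <-partial; err != nil {
				lastErr = err
			}
		}
		done <- lastErr
	}()
}

// runExample fires two succeeding events and one failing event and returns
// the aggregated result.
func runExample() error {
	ok := func(c chan error) { c <- nil }
	failing := func(c chan error) {
		go func() { c <- errors.New("event failed") }()
	}

	done := make(chan error, 1)
	aggregateDone([]func(chan error){ok, failing, ok}, done)
	return <-done
}

func main() {
	fmt.Println("aggregated result:", runExample())
}
```

The caller waits on a single channel regardless of how many events were triggered, which is the repetition the helper is meant to remove.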

Types

type Adapter

type Adapter struct {
	Watcher   datasync.KeyValProtoWatcher
	Publisher datasync.KeyProtoValWriter
}

Adapter implements datasync.TransportAdapter but allows Watch and Put to be implemented optionally.

func (*Adapter) Put

func (adapter *Adapter) Put(key string, data proto.Message) error

Put writes the key-value pair using the adapter's Publisher (a datasync.KeyProtoValWriter).

func (*Adapter) Watch

func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch registers the watch using the adapter's Watcher (a datasync.KeyValProtoWatcher).
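An adapter whose halves are optional can be sketched as a struct that forwards a call only when the corresponding field is set. The snippet below is a simplified illustration with local, hypothetical types (`writer`, `memWriter`, `adapter`); string values stand in for proto.Message, and treating a missing Publisher as a silent no-op is an assumption, not documented behaviour of this package.

```go
package main

import "fmt"

// writer is a simplified stand-in for datasync.KeyProtoValWriter.
type writer interface {
	Put(key string, data string) error
}

// memWriter is a hypothetical in-memory writer used only by this example.
type memWriter struct {
	stored map[string]string
}

func (m *memWriter) Put(key string, data string) error {
	m.stored[key] = data
	return nil
}

// adapter forwards Put to an optional Publisher.
type adapter struct {
	Publisher writer
}

// put delegates to Publisher when one is configured.
// Assumption: without a Publisher the call does nothing and returns nil.
func (a *adapter) put(key string, data string) error {
	if a.Publisher == nil {
		return nil
	}
	return a.Publisher.Put(key, data)
}

func main() {
	mw := &memWriter{stored: map[string]string{}}
	withPublisher := &adapter{Publisher: mw}
	withoutPublisher := &adapter{}

	fmt.Println("with publisher:", withPublisher.put("/config/a", "1"), mw.stored)
	fmt.Println("without publisher:", withoutPublisher.put("/config/a", "1"))
}
```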

type Change

type Change struct {
	datasync.KeyVal
	// contains filtered or unexported fields
}

Change represents a single key-value pair together with its change type.

func NewChange

func NewChange(key string, value proto.Message, rev int64, changeType datasync.PutDel) *Change

NewChange creates a new instance of Change.

func NewChangeBytes

func NewChangeBytes(key string, value []byte, rev int64, changeType datasync.PutDel) *Change

NewChangeBytes creates a new instance of Change from a value given as bytes.

func (*Change) GetChangeType

func (kv *Change) GetChangeType() datasync.PutDel

GetChangeType returns type of the change.

type ChangeEvent

type ChangeEvent struct {
	Key        string
	ChangeType datasync.PutDel
	CurrVal    datasync.LazyValue
	CurrRev    int64
	PrevVal    datasync.LazyValue
	// contains filtered or unexported fields
}

ChangeEvent is a simple structure that implements interface datasync.ChangeEvent

func (*ChangeEvent) Done

func (ev *ChangeEvent) Done(err error)

Done delegates the call, or logs err if there is no delegate and an error occurred.

func (*ChangeEvent) GetChangeType

func (ev *ChangeEvent) GetChangeType() datasync.PutDel

GetChangeType returns type of the event.

func (*ChangeEvent) GetKey

func (ev *ChangeEvent) GetKey() string

GetKey returns the Key associated with the change

func (*ChangeEvent) GetPrevValue

func (ev *ChangeEvent) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)

GetPrevValue returns the value before change

func (*ChangeEvent) GetRevision

func (ev *ChangeEvent) GetRevision() int64

GetRevision - see the comments in the interface datasync.ChangeEvent

func (*ChangeEvent) GetValue

func (ev *ChangeEvent) GetValue(val proto.Message) (err error)

GetValue - see the comments in the interface datasync.ChangeEvent

type ChangeIterator

type ChangeIterator struct {
	// contains filtered or unexported fields
}

ChangeIterator is a simple in-memory implementation of data.Iterator

func NewChangeIterator

func NewChangeIterator(data []*Change) *ChangeIterator

NewChangeIterator creates a new instance of ChangeIterator.

func (*ChangeIterator) GetNext

func (it *ChangeIterator) GetNext() (kv datasync.KeyVal, changeType datasync.PutDel, allReceived bool)

GetNext TODO

type DoneCallback

type DoneCallback struct {
	Callback func(error)
}

DoneCallback is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func (*DoneCallback) Done

func (ev *DoneCallback) Done(err error)

Done propagates error to the callback

type DoneChannel

type DoneChannel struct {
	DoneChan chan error
}

DoneChannel is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func NewDoneChannel

func NewDoneChannel(doneChan chan error) *DoneChannel

NewDoneChannel creates a new instance of DoneChannel.

func (*DoneChannel) Done

func (ev *DoneChannel) Done(err error)

Done propagates error to the channel.
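The composition mentioned for DoneCallback and DoneChannel can be illustrated with Go struct embedding. The sketch below uses local, hypothetical types (`callbackResult`, `doneChannel`, `doneCallback`, `resyncEvent`) rather than the package's own, and the nil checks are an assumption about reasonable behaviour, not a statement of what syncbase does.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a sample error used by this example.
var errDemo = errors.New("demo failure")

// callbackResult mirrors the role of datasync.CallbackResult for this
// example only.
type callbackResult interface {
	Done(err error)
}

// doneChannel sketches the DoneChannel idea: Done forwards the error to a
// channel.
type doneChannel struct {
	DoneChan chan error
}

func (d *doneChannel) Done(err error) {
	// Assumption: a nil channel means nobody is waiting for the result.
	if d.DoneChan != nil {
		d.DoneChan <- err
	}
}

// doneCallback sketches the DoneCallback idea: Done forwards the error to a
// function.
type doneCallback struct {
	Callback func(error)
}

func (d *doneCallback) Done(err error) {
	if d.Callback != nil {
		d.Callback(err)
	}
}

// resyncEvent is a hypothetical event type that gains Done by embedding.
type resyncEvent struct {
	*doneChannel
	name string
}

func main() {
	ch := make(chan error, 1)
	var ev callbackResult = &resyncEvent{doneChannel: &doneChannel{DoneChan: ch}, name: "demo"}
	ev.Done(errDemo)
	fmt.Println("received on channel:", <-ch)

	cb := &doneCallback{Callback: func(err error) { fmt.Println("callback got:", err) }}
	cb.Done(nil)
}
```

Because Done is promoted from the embedded pointer, the event type satisfies the callback interface without declaring the method itself.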

type KVIterator

type KVIterator struct {
	// contains filtered or unexported fields
}

KVIterator is a simple in-memory implementation of data.Iterator

func NewKVIterator

func NewKVIterator(data []datasync.KeyVal) *KVIterator

NewKVIterator creates a new instance of KVIterator.

func (*KVIterator) GetNext

func (it *KVIterator) GetNext() (kv datasync.KeyVal, allReceived bool)

GetNext TODO
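Since GetNext is not yet documented, the following sketch shows one plausible reading of an in-memory iterator with this shape. It uses local, hypothetical types (`keyVal`, `kvIterator`), and the meaning given to `allReceived` (true once the data is exhausted) is an assumption inferred from the return value's name.

```go
package main

import "fmt"

// keyVal is a simplified stand-in for datasync.KeyVal.
type keyVal struct {
	Key   string
	Value string
	Rev   int64
}

// kvIterator walks over a fixed slice held in memory.
type kvIterator struct {
	data  []keyVal
	index int
}

func newKVIterator(data []keyVal) *kvIterator {
	return &kvIterator{data: data}
}

// getNext returns the next pair.
// Assumption: allReceived is true once the data is exhausted, in which case
// kv is the zero value.
func (it *kvIterator) getNext() (kv keyVal, allReceived bool) {
	if it.index >= len(it.data) {
		return keyVal{}, true
	}
	kv = it.data[it.index]
	it.index++
	return kv, false
}

// collectKeys drains the iterator and returns the keys in order.
func collectKeys(it *kvIterator) []string {
	var keys []string
	for {
		kv, allReceived := it.getNext()
		if allReceived {
			return keys
		}
		keys = append(keys, kv.Key)
	}
}

func main() {
	it := newKVIterator([]keyVal{
		{Key: "/a", Value: "1", Rev: 1},
		{Key: "/b", Value: "2", Rev: 2},
	})
	fmt.Println(collectKeys(it))
}
```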

type KeyVal

type KeyVal struct {
	datasync.LazyValue
	// contains filtered or unexported fields
}

KeyVal represents a single Key-value pair

func NewKeyVal

func NewKeyVal(key string, value datasync.LazyValue, rev int64) *KeyVal

NewKeyVal creates a new instance of KeyVal.

func (*KeyVal) GetKey

func (kv *KeyVal) GetKey() string

GetKey returns the Key of the pair

func (*KeyVal) GetRevision

func (kv *KeyVal) GetRevision() int64

GetRevision returns revision associated with the latest change in the Key-value pair

type KeyValBytes

type KeyValBytes struct {
	// contains filtered or unexported fields
}

KeyValBytes represents a single Key-value pair

func NewKeyValBytes

func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes

NewKeyValBytes creates a new instance of KeyValBytes.

func (*KeyValBytes) GetKey

func (kv *KeyValBytes) GetKey() string

GetKey returns the Key of the pair

func (*KeyValBytes) GetRevision

func (kv *KeyValBytes) GetRevision() int64

GetRevision returns revision associated with the latest change in the Key-value pair

func (*KeyValBytes) GetValue

func (kv *KeyValBytes) GetValue(message proto.Message) error

GetValue returns the value of the pair

type PrevRevisions

type PrevRevisions struct {
	// contains filtered or unexported fields
}

PrevRevisions maintains a map of keys to values with their revisions.

func NewLatestRev

func NewLatestRev() *PrevRevisions

NewLatestRev creates a new instance of PrevRevisions.

func (*PrevRevisions) Del

func (r *PrevRevisions) Del(key string) (found bool, prev datasync.LazyValueWithRev)

Del deletes the entry from the revisions and returns the previous value.

func (*PrevRevisions) Get

func (r *PrevRevisions) Get(key string) (found bool, value datasync.LazyValueWithRev)

Get returns the last proto.Message with its revision.

func (*PrevRevisions) ListKeys

func (r *PrevRevisions) ListKeys() []string

ListKeys returns all stored keys

func (*PrevRevisions) Put

func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (
	found bool, prev datasync.LazyValueWithRev, currRev int64)

Put updates the entry in the revisions and returns the previous value.

func (*PrevRevisions) PutWithRevision

func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.LazyValueWithRev) (
	found bool, prev datasync.LazyValueWithRev)

PutWithRevision updates the entry in the revisions and returns the previous value.
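A registry of latest revisions of this shape can be sketched as a mutex-protected map. The snippet below is an illustration with local, hypothetical types (`valueWithRev`, `revisions`), not the package's implementation. Two assumptions are made: revisions are counted per key, starting at 1, and listKeys returns sorted keys so the output is deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// valueWithRev is a simplified stand-in for datasync.LazyValueWithRev.
type valueWithRev struct {
	Value string
	Rev   int64
}

// revisions keeps the latest value and revision for each key.
type revisions struct {
	mu      sync.RWMutex
	entries map[string]valueWithRev
}

func newRevisions() *revisions {
	return &revisions{entries: map[string]valueWithRev{}}
}

// put stores val and returns the previous entry, if any.
// Assumption: the revision is per key and grows by one on every put.
func (r *revisions) put(key, val string) (found bool, prev valueWithRev, currRev int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.entries[key]
	currRev = prev.Rev + 1
	r.entries[key] = valueWithRev{Value: val, Rev: currRev}
	return found, prev, currRev
}

// get returns the latest entry for key.
func (r *revisions) get(key string) (found bool, value valueWithRev) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	value, found = r.entries[key]
	return found, value
}

// del removes the entry and returns what was stored before.
func (r *revisions) del(key string) (found bool, prev valueWithRev) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.entries[key]
	delete(r.entries, key)
	return found, prev
}

// listKeys returns all stored keys, sorted for stable output.
func (r *revisions) listKeys() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	keys := make([]string, 0, len(r.entries))
	for k := range r.entries {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	r := newRevisions()
	r.put("/a", "v1")
	found, prev, rev := r.put("/a", "v2")
	fmt.Println(found, prev, rev, r.listKeys())
}
```

Returning the previous entry from put and del lets a caller build a change event carrying both the old and the new value without a second lookup.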

type ResyncEventDB

type ResyncEventDB struct {
	*DoneChannel
	// contains filtered or unexported fields
}

ResyncEventDB implements interface datasync.ResyncEvent (see comments in there)

func NewResyncEvent

func NewResyncEvent(m map[string][]datasync.KeyVal) *ResyncEventDB

NewResyncEvent creates a new instance of ResyncEventDB using the given map of KeyVal slices.

func NewResyncEventDB

func NewResyncEventDB(its map[string]datasync.KeyValIterator) *ResyncEventDB

NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.

func (*ResyncEventDB) GetValues

func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator

GetValues returns values of the event.

type Subscription

type Subscription struct {
	ResyncName  string
	ChangeChan  chan datasync.ChangeEvent
	ResyncChan  chan datasync.ResyncEvent
	KeyPrefixes []string
}

Subscription TODO

type WatchDataReg

type WatchDataReg struct {
	ResyncName string

	CloseChan chan interface{}
	// contains filtered or unexported fields
}

WatchDataReg implements interface datasync.WatchDataRegistration

func (*WatchDataReg) Close

func (reg *WatchDataReg) Close() error

Close stops watching of particular KeyPrefixes.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher propagates events using channels.

func NewWatcher

func NewWatcher() *Watcher

NewWatcher creates a new instance of Watcher.

func (*Watcher) LastRev

func (adapter *Watcher) LastRev() *PrevRevisions

LastRev returns the PrevRevisions registry held by the Watcher.

func (*Watcher) PropagateChanges

func (adapter *Watcher) PropagateChanges(txData map[string]datasync.ChangeValue) error

PropagateChanges fills registered channels with the data

func (*Watcher) PropagateResync

func (adapter *Watcher) PropagateResync(txData map[string]datasync.ChangeValue) error

PropagateResync fills registered channels with the data

func (*Watcher) Subscriptions

func (adapter *Watcher) Subscriptions() map[string]*Subscription

Subscriptions returns the current subscriptions.

func (*Watcher) Watch

func (adapter *Watcher) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch just appends channels

func (*Watcher) WatchDataBase

func (adapter *Watcher) WatchDataBase(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (*WatchDataReg, error)

WatchDataBase just appends channels
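The registration and propagation steps of a channel-based watcher can be sketched as follows. This is a simplified illustration with local, hypothetical types (`changeEvent`, `subscription`, `watcher`), string values instead of datasync.ChangeValue, and no locking. Matching keys against subscriptions by string prefix is an assumption based on the `keyPrefixes` parameter name.

```go
package main

import (
	"fmt"
	"strings"
)

// changeEvent is a simplified stand-in for datasync.ChangeEvent.
type changeEvent struct {
	Key   string
	Value string
}

// subscription records who is watching which key prefixes.
type subscription struct {
	ResyncName  string
	ChangeChan  chan changeEvent
	KeyPrefixes []string
}

// watcher keeps subscriptions by name. Not safe for concurrent use; a real
// implementation would need synchronization.
type watcher struct {
	subs map[string]*subscription
}

func newWatcher() *watcher {
	return &watcher{subs: map[string]*subscription{}}
}

// watch stores the channel and prefixes under resyncName.
func (w *watcher) watch(resyncName string, changeChan chan changeEvent, keyPrefixes ...string) {
	w.subs[resyncName] = &subscription{
		ResyncName:  resyncName,
		ChangeChan:  changeChan,
		KeyPrefixes: keyPrefixes,
	}
}

// propagateChanges sends each changed key to every subscription that has a
// matching prefix and returns the number of events delivered.
// The send blocks, so channels must be buffered or actively read.
func (w *watcher) propagateChanges(txData map[string]string) int {
	delivered := 0
	for key, val := range txData {
		for _, sub := range w.subs {
			for _, prefix := range sub.KeyPrefixes {
				if strings.HasPrefix(key, prefix) {
					sub.ChangeChan <- changeEvent{Key: key, Value: val}
					delivered++
					break // at most one event per subscription and key
				}
			}
		}
	}
	return delivered
}

func main() {
	w := newWatcher()
	ch := make(chan changeEvent, 4)
	w.watch("demo", ch, "/vnf/")

	n := w.propagateChanges(map[string]string{"/vnf/if1": "up", "/other/x": "y"})
	fmt.Println("delivered:", n, "event:", <-ch)
}
```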

Directories

Path Synopsis
Package msg is a generated protocol buffer package.
