Documentation
¶
Overview ¶
Package etcdv3 implements the key-value Data Broker client API for the etcdv3 key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.
The entity that provides access to the data store is called BytesConnectionEtcd.
+-----------------------+ crud/watch ______ | BytesConnectionEtcd | ----> | ETCD | +-----------------------+ []byte +------+
To create a BytesConnectionEtcd use the following function
import "github.com/ligato/cn-infra/db/keyval/etcd" db := etcd.NewEtcdConnectionWithBytes(config)
config is a path to a file with the following format:
key-file: <filepath> ca-file: <filepath> cert-file: <filepath> insecure-skip-tls-verify: <bool> insecure-transport: <bool> dial-timeout: <nanoseconds> operation-timeout: <nanoseconds> endpoints: - <address_1>:<port> - <address_2>:<port> - .. - <address_n>:<port>
Connection to etcd is established using the provided config behind the scenes.
Alternatively, you may connect to etcd by yourself and initialize the connection object with a given client.
db := etcd.NewEtcdConnectionUsingClient(client)
Created BytesConnectionEtcd implements Broker and KeyValProtoWatcher interfaces. The example of use can be seen below.
To insert single key-value pair into etcd run:
db.Put(key, data)
To remove a value identified by key:
datasync.Delete(key)
In addition to single key-value pair approach, the transaction API is provided. Transaction executes multiple operations in a more efficient way than one by one execution.
// create new transaction txn := db.NewTxn() // add put operation into the transaction txn.Put(key, value) // add delete operation into the transaction txn.Delete(key, value) // try to commit the transaction err := txn.Commit()
To retrieve a value identified by key:
data, found, rev, err := db.GetValue(key) if err == nil && found { ... }
To retrieve all values matching a key prefix:
itr, err := db.ListValues(key) if err != nil { for { data, allReceived, rev, err := itr.GetNext() if allReceived { break } if err != nil { return err } process data... } }
To retrieve values in specified key range:
itr, err := db.ListValues(key) if err != nil { for { data, rev, allReceived := itr.GetNext() if allReceived { break } process data... } }
To list keys without fetching the values
itr, err := db.ListKeys(prefix) if err != nil { for { key, rev, allReceived := itr.GetNext() if allReceived { break } process key... } }
To start watching changes in etcd:
respChan := make(chan keyval.BytesWatchResp, 0) err = dbw.Watch(respChan, key) if err != nil { os.Exit(1) } for { select { case resp := <-respChan: switch resp.GetChangeType() { case data.Put: key := resp.GetKey() value := resp.GetValue() rev := resp.GetRevision() case data.Delete: ... } } }
BytesConnectionEtcd also allows to create proxy instances (BytesBrokerWatcherEtcd) using NewBroker and NewWatcher methods. Both of them accept the prefix argument. The prefix will be automatically prepended to all keys in put/delete requests made from the proxy instances. I n case of get-like calls (GetValue, ListValues, ...) the prefix is trimmed from the key of the returned values. They contain only a part following the prefix in the key field. The created proxy instances share the connection of the BytesConnectionEtcd.
+-----------------------+ | BytesBrokerWatcherEtcd | +-----------------------+ | | -----------------> +-----------------------+ crud/watch ______ | BytesConnectionEtcd | ----> | ETCD | -----------------> +-----------------------+ ([]byte) +------+ | | +------------------------+ | BytesBrokerWatcherEtcd | +------------------------+
To create proxy instances, type:
prefixedBroker := db.NewBroker(prefix) prefixedWatcher := db.NewWatcher(prefix)
The usage is the same as shown above.
The package also provides a proto decorator that simplifies the manipulation of proto modelled data. The proto decorator accepts arguments of type proto.message and marshals them into []byte slices.
+-------------------+--------------------+ crud/watch ______ | ProtoWrapperEtcd | ProtoWrapperEtcd | ----> | ETCD | +-------------------+--------------------+ ([]byte) +------+ (proto.Message)
The ProtoWrapperEtcd API is very similar to the BytesConnectionEtcd API. The difference is that arguments of type []byte are replaced by arguments of type proto.Message, and in some case one of the return values is transformed into an output argument.
Example of the decorator initialization:
// conn is BytesConnectionEtcd initialized as shown at the top of the page protoBroker := etcd.NewProtoWrapperEtcd(conn)
The only difference in Put/Delete functions is the type of the argument, apart from that the usage is the same as described above.
Example of retrieving single key-value pair using proto decorator:
// if the value exists it is unmarshalled into the msg found, rev, err := protoBroker.GetValue(key, msg)
To retrieve all values matching the key prefix use
resp, err := protoDb.ListValues(path) if err != nil { os.Exit(1) } for { // phonebook.Contact is a proto modelled structure (implementing proto.Message interface) contact := &phonebook.Contact{} // the value is unmarshaled into the contact variable kv, allReceived := resp.GetNext() if allReceived { break } err = kv.GetValue(contact) if err != nil { os.Exit(1) } ... use contact }
The Etcd plugin
plugin := etcdv3.Plugin{} // initialization by agent core
Plugin allows to create a broker
broker := plugin.NewBroker(prefix)
and watcher
watcher := plugin.NewWatcher(prefix)
Index ¶
- type BytesBrokerWatcherEtcd
- func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
- func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
- func (pdb *BytesBrokerWatcherEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
- func (pdb *BytesBrokerWatcherEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
- func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn
- func (pdb *BytesBrokerWatcherEtcd) Put(key string, data []byte, opts ...datasync.PutOption) error
- func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
- type BytesConnectionEtcd
- func (db *BytesConnectionEtcd) Close() error
- func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
- func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
- func (db *BytesConnectionEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
- func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
- func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker
- func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn
- func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher
- func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error
- func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
- type BytesWatchDelResp
- type BytesWatchPutResp
- type ClientConfig
- type Config
- type Deps
- type Plugin
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BytesBrokerWatcherEtcd ¶
BytesBrokerWatcherEtcd uses BytesConnectionEtcd to access the datastore. The connection can be shared among multiple BytesBrokerWatcherEtcd. In case of accessing a particular subtree in etcd only, BytesBrokerWatcherEtcd allows to define a keyPrefix that is prepended to all keys in its methods in order to shorten keys used in arguments.
func (*BytesBrokerWatcherEtcd) Delete ¶
func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
Delete calls delete function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.
func (*BytesBrokerWatcherEtcd) GetValue ¶
func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue calls GetValue function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to key argument.
func (*BytesBrokerWatcherEtcd) ListKeys ¶
func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
ListKeys calls ListKeys function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the argument.
func (*BytesBrokerWatcherEtcd) ListValues ¶
func (pdb *BytesBrokerWatcherEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
ListValues calls ListValues function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to key argument. The prefix is removed from the keys of the returned values.
func (*BytesBrokerWatcherEtcd) ListValuesRange ¶
func (pdb *BytesBrokerWatcherEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
ListValuesRange calls ListValuesRange function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the arguments. The prefix is removed from the keys of the returned values.
func (*BytesBrokerWatcherEtcd) NewTxn ¶
func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn
NewTxn creates new transaction. KeyPrefix defined in constructor will be prepended to all key arguments in the transaction.
func (*BytesBrokerWatcherEtcd) Put ¶
Put calls Put function of BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to key argument.
func (*BytesBrokerWatcherEtcd) Watch ¶
func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
Watch starts subscription for changes associated with the selected keys. KeyPrefix defined in constructor is prepended to all keys in the argument list. The prefix is removed from the keys used in watch events. Watch events will be delivered to respChan.
type BytesConnectionEtcd ¶
BytesConnectionEtcd encapsulates the connection to etcd. It provides API to read/edit and watch values from etcd.
func NewEtcdConnectionUsingClient ¶
func NewEtcdConnectionUsingClient(etcdClient *clientv3.Client, log logging.Logger) (*BytesConnectionEtcd, error)
NewEtcdConnectionUsingClient creates a new instance of BytesConnectionEtcd using the provided etcdv3 client
func NewEtcdConnectionWithBytes ¶
func NewEtcdConnectionWithBytes(config ClientConfig, log logging.Logger) (*BytesConnectionEtcd, error)
NewEtcdConnectionWithBytes creates new connection to etcd based on the given config file.
func (*BytesConnectionEtcd) Close ¶
func (db *BytesConnectionEtcd) Close() error
Close closes the connection to ETCD.
func (*BytesConnectionEtcd) Delete ¶
func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
Delete removes data identified by the key.
func (*BytesConnectionEtcd) GetValue ¶
func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue retrieves one key-value item from the data store. The item is identified by the provided key.
func (*BytesConnectionEtcd) ListKeys ¶
func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
ListKeys is similar to the ListValues the difference is that values are not fetched
func (*BytesConnectionEtcd) ListValues ¶
func (db *BytesConnectionEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
ListValues returns an iterator that enables to traverse values stored under the provided key.
func (*BytesConnectionEtcd) ListValuesRange ¶
func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
ListValuesRange returns an iterator that enables to traverse values stored under the provided key.
func (*BytesConnectionEtcd) NewBroker ¶
func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker
NewBroker creates a new instance of a proxy that provides access to etcd. BytesConnectionEtcd is used to access the etcd Prefix will be prepend to key argument in all calls on created BytesBrokerWatcherEtcd. To avoid using a prefix pass keyval.Root constant as argument.
func (*BytesConnectionEtcd) NewTxn ¶
func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn
NewTxn creates a new transaction. A transaction can holds multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.
func (*BytesConnectionEtcd) NewWatcher ¶
func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher
NewWatcher creates a new instance of a proxy that provides access to etcd. BytesConnectionEtcd is used to access the etcd. Prefix will be prepend to key argument in all calls on created BytesBrokerWatcherEtcd. To avoid using a prefix pass keyval.Root constant as argument.
func (*BytesConnectionEtcd) Put ¶
Put writes the provided key-value item into the data store. Returns an error if the item could not be written, nil otherwise.
func (*BytesConnectionEtcd) Watch ¶
func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
Watch starts subscription for changes associated with the selected keys. Watch events will be delivered to respChan.
type BytesWatchDelResp ¶
type BytesWatchDelResp struct {
// contains filtered or unexported fields
}
BytesWatchDelResp is sent when a key-value pair has been removed
func NewBytesWatchDelResp ¶
func NewBytesWatchDelResp(key string, revision int64) *BytesWatchDelResp
NewBytesWatchDelResp creates an instance of BytesWatchDelResp
func (*BytesWatchDelResp) GetChangeType ¶
func (resp *BytesWatchDelResp) GetChangeType() datasync.PutDel
GetChangeType returns "Delete" for BytesWatchPutResp
func (*BytesWatchDelResp) GetKey ¶
func (resp *BytesWatchDelResp) GetKey() string
GetKey returns the key that has been deleted
func (*BytesWatchDelResp) GetRevision ¶
func (resp *BytesWatchDelResp) GetRevision() int64
GetRevision returns the revision associated with the delete operation
func (*BytesWatchDelResp) GetValue ¶
func (resp *BytesWatchDelResp) GetValue() []byte
GetValue returns nil for BytesWatchDelResp
type BytesWatchPutResp ¶
type BytesWatchPutResp struct {
// contains filtered or unexported fields
}
BytesWatchPutResp is sent when new key-value pair has been inserted or the value is updated
func NewBytesWatchPutResp ¶
func NewBytesWatchPutResp(key string, value []byte, revision int64) *BytesWatchPutResp
NewBytesWatchPutResp creates an instance of BytesWatchPutResp
func (*BytesWatchPutResp) GetChangeType ¶
func (resp *BytesWatchPutResp) GetChangeType() datasync.PutDel
GetChangeType returns "Put" for BytesWatchPutResp
func (*BytesWatchPutResp) GetKey ¶
func (resp *BytesWatchPutResp) GetKey() string
GetKey returns the key that has been inserted
func (*BytesWatchPutResp) GetRevision ¶
func (resp *BytesWatchPutResp) GetRevision() int64
GetRevision returns the revision associated with create action
func (*BytesWatchPutResp) GetValue ¶
func (resp *BytesWatchPutResp) GetValue() []byte
GetValue returns the value that has been inserted
type ClientConfig ¶
type ClientConfig struct { *clientv3.Config // OpTimeout is the maximum amount of time the client will wait on a pending operation before timing out. OpTimeout time.Duration }
ClientConfig extends clientv3.Config with configuration options introduced by this package.
func ConfigToClientv3 ¶
func ConfigToClientv3(yc *Config) (*ClientConfig, error)
ConfigToClientv3 transforms the configuration modelled by yaml structure into ClientConfig. If the endpoints are not specified the function tries to load endpoints ETCDV3_ENDPOINTS environment variable.
type Config ¶
type Config struct { Endpoints []string `json:"endpoints"` DialTimeout time.Duration `json:"dial-timeout"` OpTimeout time.Duration `json:"operation-timeout"` InsecureTransport bool `json:"insecure-transport"` InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify"` Certfile string `json:"cert-file"` Keyfile string `json:"key-file"` CAfile string `json:"ca-file"` }
Config represents a part of etcd configuration that can be loaded from a file. The Config might be afterwards transformed into ClientConfig using ConfigToClientv3 function.
type Deps ¶
type Deps struct {
localdeps.PluginInfraDeps // inject
}
Deps is here to group injected dependencies of plugin to not mix with other plugin fields.
type Plugin ¶
Plugin implements Plugin interface therefore can be loaded with other plugins
func FromExistingConnection ¶
func FromExistingConnection(connection keyval.CoreBrokerWatcher, sl servicelabel.ReaderAPI) *Plugin
FromExistingConnection is used mainly for testing
func (*Plugin) AfterInit ¶
AfterInit is called by the Agent Core after all plugins have been initialized.
func (*Plugin) Disabled ¶
Disabled if the plugin was not found
func (*Plugin) Init ¶
Init is called at plugin startup. The connection to etcd is established.