Documentation ¶
Overview ¶
Package etcd implements the key-value Data Broker client API for the etcd key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.
The entity that provides access to the data store is called BytesConnectionEtcd.
    +-----------------------+       crud/watch         ______
    |  BytesConnectionEtcd  |          ---->          | ETCD |
    +-----------------------+        []byte           +------+
To create a BytesConnectionEtcd, use the following function:
    import "github.com/ligato/cn-infra/db/keyval/etcd"

    db, err := etcd.NewEtcdConnectionWithBytes(config, log)
config is a ClientConfig, typically obtained by loading a file with the following format into Config and passing it to ConfigToClient:
    key-file: <filepath>
    ca-file: <filepath>
    cert-file: <filepath>
    insecure-skip-tls-verify: <bool>
    insecure-transport: <bool>
    dial-timeout: <nanoseconds>
    operation-timeout: <nanoseconds>
    endpoints:
      - <address_1>:<port>
      - <address_2>:<port>
      - ..
      - <address_n>:<port>
Connection to etcd is established using the provided config behind the scenes.
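Because dial-timeout and operation-timeout are expressed in nanoseconds, it may be easier to compute the values with Go's time package than by hand. The following is a minimal sketch; the helper name nanosFor and the sample durations are illustrative assumptions and are not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// nanosFor converts a duration string such as "3s" into the nanosecond
// count used by the dial-timeout and operation-timeout fields.
// Hypothetical helper, not part of the etcd package.
func nanosFor(s string) (int64, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, err
	}
	return d.Nanoseconds(), nil
}

func main() {
	dial, err := nanosFor("1s")
	if err != nil {
		panic(err)
	}
	op, err := nanosFor("3s")
	if err != nil {
		panic(err)
	}
	// Print the two values in the shape of the config file fields.
	fmt.Printf("dial-timeout: %d\noperation-timeout: %d\n", dial, op)
}
```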
Alternatively, you may connect to etcd yourself and initialize the connection object with a given client.
    db, err := etcd.NewEtcdConnectionUsingClient(client, log)
The created BytesConnectionEtcd implements the Broker and KeyValProtoWatcher interfaces. Examples of its use are shown below.
To insert a single key-value pair into etcd, run:
    db.Put(key, data)
To remove a value identified by key:
    existed, err := db.Delete(key)
In addition to the single key-value pair approach, a transaction API is provided. A transaction executes multiple operations more efficiently than running them one by one.
    // create new transaction
    txn := db.NewTxn()

    // add put operation into the transaction
    txn.Put(key, value)

    // add delete operation into the transaction
    txn.Delete(key)

    // try to commit the transaction
    err := txn.Commit(context.Background())
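To make the buffering idea behind a transaction concrete, here is a minimal in-memory sketch. It is not the package's implementation: memStore, memTxn and op are hypothetical stand-ins, and the real transaction is committed to etcd rather than to a map.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is an in-memory stand-in for the data store (assumption for
// illustration only; the real package talks to etcd).
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// op is one buffered operation; del marks a delete.
type op struct {
	key   string
	value []byte
	del   bool
}

// memTxn buffers operations and applies them together on Commit.
type memTxn struct {
	store *memStore
	ops   []op
}

// NewTxn starts an empty transaction bound to the store.
func (s *memStore) NewTxn() *memTxn { return &memTxn{store: s} }

// Put buffers a put; returning the txn allows chaining.
func (t *memTxn) Put(key string, value []byte) *memTxn {
	t.ops = append(t.ops, op{key: key, value: value})
	return t
}

// Delete buffers a delete of key.
func (t *memTxn) Delete(key string) *memTxn {
	t.ops = append(t.ops, op{key: key, del: true})
	return t
}

// Commit applies all buffered operations while holding a single lock.
func (t *memTxn) Commit() error {
	t.store.mu.Lock()
	defer t.store.mu.Unlock()
	for _, o := range t.ops {
		if o.del {
			delete(t.store.data, o.key)
		} else {
			t.store.data[o.key] = o.value
		}
	}
	return nil
}

func main() {
	s := &memStore{data: map[string][]byte{"/old": []byte("x")}}
	err := s.NewTxn().Put("/a", []byte("1")).Delete("/old").Commit()
	fmt.Println(len(s.data), err)
}
```

Nothing is written until Commit is called, which is what lets several operations travel to the store as one request.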
To retrieve a value identified by key:
    data, found, rev, err := db.GetValue(key)
    if err == nil && found {
        ...
    }
To retrieve all values matching a key prefix:
    itr, err := db.ListValues(key)
    if err == nil {
        for {
            kv, allReceived := itr.GetNext()
            if allReceived {
                break
            }
            // process kv.GetValue() ...
        }
    }
To retrieve values in a specified key range:
    itr, err := db.ListValuesRange(fromPrefix, toPrefix)
    if err == nil {
        for {
            kv, allReceived := itr.GetNext()
            if allReceived {
                break
            }
            // process kv.GetValue() ...
        }
    }
To list keys without fetching the values:
    itr, err := db.ListKeys(prefix)
    if err == nil {
        for {
            key, rev, allReceived := itr.GetNext()
            if allReceived {
                break
            }
            // process key ...
        }
    }
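The listing calls share one iteration pattern: call GetNext repeatedly until the stop flag is reported. The sketch below reproduces that pattern over an in-memory map. keyIterator, listKeys and collect are hypothetical names, and the revision value returned by the real key iterator is omitted for brevity.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// keyIterator walks a fixed, sorted list of keys.
type keyIterator struct {
	keys []string
	pos  int
}

// GetNext returns the next key, or stop == true once the keys are exhausted.
func (it *keyIterator) GetNext() (key string, stop bool) {
	if it.pos >= len(it.keys) {
		return "", true
	}
	key = it.keys[it.pos]
	it.pos++
	return key, false
}

// listKeys returns an iterator over the keys of data that share prefix.
func listKeys(data map[string][]byte, prefix string) *keyIterator {
	var keys []string
	for k := range data {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys) // map order is random, so sort for stable traversal
	return &keyIterator{keys: keys}
}

// collect drains an iterator using the loop shape from the examples above.
func collect(it *keyIterator) []string {
	var out []string
	for {
		key, stop := it.GetNext()
		if stop {
			break
		}
		out = append(out, key)
	}
	return out
}

func main() {
	data := map[string][]byte{"/app/a": nil, "/app/b": nil, "/other": nil}
	fmt.Println(collect(listKeys(data, "/app/")))
}
```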
To start watching changes in etcd:
    respChan := make(chan keyval.BytesWatchResp)
    closeChan := make(chan string)
    // Watch takes a callback; forward each event to respChan
    err = db.Watch(func(resp keyval.BytesWatchResp) { respChan <- resp }, closeChan, key)
    if err != nil {
        os.Exit(1)
    }
    for {
        select {
        case resp := <-respChan:
            switch resp.GetChangeType() {
            case datasync.Put:
                key := resp.GetKey()
                value := resp.GetValue()
                rev := resp.GetRevision()
            case datasync.Delete:
                ...
            }
        }
    }
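The Watch signature listed in the index takes a callback rather than a channel. When channel-based consumption is preferred, a small adapter can bridge the two. The sketch below shows the idea with stand-in types: watchResp, toChan and fakeWatch are hypothetical and only mimic the shape of the real API.

```go
package main

import "fmt"

// watchResp is a stand-in for keyval.BytesWatchResp (assumption for this sketch).
type watchResp struct {
	Key   string
	Value []byte
}

// toChan returns a callback that forwards every event to ch.
func toChan(ch chan<- watchResp) func(watchResp) {
	return func(r watchResp) { ch <- r }
}

// fakeWatch mimics a watcher by delivering the given events to the callback.
func fakeWatch(cb func(watchResp), events []watchResp) {
	for _, e := range events {
		cb(e)
	}
}

func main() {
	// The buffer is large enough for both events, so the callback never blocks.
	ch := make(chan watchResp, 2)
	fakeWatch(toChan(ch), []watchResp{
		{Key: "/a", Value: []byte("1")},
		{Key: "/b", Value: []byte("2")},
	})
	close(ch)
	for r := range ch {
		fmt.Printf("%s=%s\n", r.Key, r.Value)
	}
}
```

With an unbuffered channel the callback blocks until the consumer reads, so the consuming loop would need to run in its own goroutine.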
BytesConnectionEtcd also allows creating proxy instances (BytesBrokerWatcherEtcd) using the NewBroker and NewWatcher methods. Both accept a prefix argument. The prefix is automatically prepended to all keys in put/delete requests made from the proxy instances. For get-like calls (GetValue, ListValues, ...), the prefix is trimmed from the keys of the returned values, so the key field contains only the part following the prefix. The created proxy instances share the connection of the BytesConnectionEtcd.
    +------------------------+
    | BytesBrokerWatcherEtcd |
    +------------------------+
              |
              |
               ----------------->   +-----------------------+       crud/watch         ______
                                    |  BytesConnectionEtcd  |          ---->          | ETCD |
               ----------------->   +-----------------------+        ([]byte)         +------+
              |
              |
    +------------------------+
    | BytesBrokerWatcherEtcd |
    +------------------------+
To create proxy instances, type:
    prefixedBroker := db.NewBroker(prefix)
    prefixedWatcher := db.NewWatcher(prefix)
The usage is the same as shown above.
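The prefix handling described for the proxy instances comes down to two string operations: prepend on the way in, trim on the way out. The following sketch illustrates this under stated assumptions; the prefixed type, its methods and the sample prefix are hypothetical and do not reflect the package's internals.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixed holds the key prefix of a hypothetical proxy instance.
type prefixed struct{ prefix string }

// full builds the key that is sent to the store.
func (p prefixed) full(key string) string { return p.prefix + key }

// relative strips the prefix from a key returned by the store.
func (p prefixed) relative(fullKey string) string {
	return strings.TrimPrefix(fullKey, p.prefix)
}

func main() {
	// The prefix below is only a sample value.
	p := prefixed{prefix: "/vnf-agent/agent1/"}
	stored := p.full("interfaces/eth0")
	fmt.Println(stored)             // key as written to the store
	fmt.Println(p.relative(stored)) // key as seen by the proxy user
}
```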
The package also provides a proto decorator that simplifies the manipulation of proto-modelled data. The proto decorator accepts arguments of type proto.Message and marshals them into []byte slices.
    +-------------------+--------------------+       crud/watch         ______
    |  ProtoWrapperEtcd |  ProtoWrapperEtcd  |          ---->          | ETCD |
    +-------------------+--------------------+        ([]byte)         +------+
      (proto.Message)
The ProtoWrapperEtcd API is very similar to the BytesConnectionEtcd API. The difference is that arguments of type []byte are replaced by arguments of type proto.Message, and in some cases one of the return values is transformed into an output argument.
Example of the decorator initialization:
    // conn is BytesConnectionEtcd initialized as shown at the top of the page
    protoBroker := etcd.NewProtoWrapperEtcd(conn)
The only difference in Put/Delete functions is the type of the argument; apart from that the usage is the same as described above.
Example of retrieving a single key-value pair using the proto decorator:
    // if the value exists it is unmarshalled into the msg
    found, rev, err := protoBroker.GetValue(key, msg)
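The decorator idea, a typed API layered over a byte-oriented store with the result delivered through an output argument, can be sketched with the standard library alone. In this sketch encoding/json is swapped in for protobuf marshalling purely to keep the example dependency-free; bytesStore, wrapper and contact are hypothetical names, and the revision return value of the real GetValue is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bytesStore is a minimal byte-oriented store (stand-in for the bytes API).
type bytesStore map[string][]byte

// wrapper decorates bytesStore with typed Put and GetValue.
type wrapper struct{ store bytesStore }

// Put marshals msg (JSON here, protobuf in the real package) and stores the bytes.
func (w wrapper) Put(key string, msg interface{}) error {
	data, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	w.store[key] = data
	return nil
}

// GetValue unmarshals the stored bytes into out, which acts as an output argument.
func (w wrapper) GetValue(key string, out interface{}) (found bool, err error) {
	data, ok := w.store[key]
	if !ok {
		return false, nil
	}
	return true, json.Unmarshal(data, out)
}

// contact is a hypothetical message type used only in this sketch.
type contact struct {
	Name  string `json:"name"`
	Phone string `json:"phone"`
}

func main() {
	w := wrapper{store: bytesStore{}}
	if err := w.Put("/contacts/alice", &contact{Name: "Alice", Phone: "555-0100"}); err != nil {
		panic(err)
	}
	var c contact
	found, err := w.GetValue("/contacts/alice", &c)
	fmt.Println(found, err, c.Name)
}
```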
To retrieve all values matching the key prefix, use:
    resp, err := protoBroker.ListValues(path)
    if err != nil {
        os.Exit(1)
    }
    for {
        // phonebook.Contact is a proto-modelled structure (implementing the proto.Message interface)
        contact := &phonebook.Contact{}
        kv, allReceived := resp.GetNext()
        if allReceived {
            break
        }
        // the value is unmarshalled into the contact variable
        err = kv.GetValue(contact)
        if err != nil {
            os.Exit(1)
        }
        ... use contact
    }
The Etcd plugin
    plugin := etcd.Plugin{}
    // initialization by agent core
The plugin allows creating a broker:
    broker := plugin.NewBroker(prefix)
and a watcher:
watcher := plugin.NewWatcher(prefix)
Index ¶
- Variables
- type BytesBrokerWatcherEtcd
- func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
- func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
- func (pdb *BytesBrokerWatcherEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
- func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn
- func (pdb *BytesBrokerWatcherEtcd) Put(key string, data []byte, opts ...datasync.PutOption) error
- func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error
- type BytesConnectionEtcd
- func (db *BytesConnectionEtcd) Close() error
- func (db *BytesConnectionEtcd) Compact(rev ...int64) (int64, error)
- func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
- func (db *BytesConnectionEtcd) GetRevision() (revision int64, err error)
- func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (db *BytesConnectionEtcd) GetValueRev(key string, rev int64) (data []byte, found bool, revision int64, err error)
- func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
- func (db *BytesConnectionEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
- func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
- func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker
- func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn
- func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher
- func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error
- func (db *BytesConnectionEtcd) PutIfNotExists(key string, binData []byte) (succeeded bool, err error)
- func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error
- type BytesWatchDelResp
- type BytesWatchPutResp
- type ClientConfig
- type Config
- type Deps
- type Option
- type Plugin
- func (p *Plugin) AfterInit() error
- func (p *Plugin) Close() error
- func (p *Plugin) Compact(rev ...int64) (toRev int64, err error)
- func (p *Plugin) Disabled() (disabled bool)
- func (p *Plugin) GetPluginName() infra.PluginName
- func (p *Plugin) Init() (err error)
- func (p *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker
- func (p *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher
- func (p *Plugin) OnConnect(callback func() error)
- func (p *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error)
Variables ¶
var DefaultPlugin = *NewPlugin()
DefaultPlugin is a default instance of Plugin.
Types ¶
type BytesBrokerWatcherEtcd ¶
BytesBrokerWatcherEtcd uses BytesConnectionEtcd to access the datastore. The connection can be shared among multiple BytesBrokerWatcherEtcd. In case of accessing a particular subtree in etcd only, BytesBrokerWatcherEtcd allows defining a keyPrefix that is prepended to all keys in its methods in order to shorten keys used in arguments.
func (*BytesBrokerWatcherEtcd) Delete ¶
func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
Delete calls 'Delete' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.
func (*BytesBrokerWatcherEtcd) GetValue ¶
func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue calls 'GetValue' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.
func (*BytesBrokerWatcherEtcd) ListKeys ¶
func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
ListKeys calls 'ListKeys' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the argument.
func (*BytesBrokerWatcherEtcd) ListValues ¶
func (pdb *BytesBrokerWatcherEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
ListValues calls 'ListValues' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument. The prefix is removed from the keys of the returned values.
func (*BytesBrokerWatcherEtcd) NewTxn ¶
func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn
NewTxn creates a new transaction. KeyPrefix defined in constructor will be prepended to all key arguments in the transaction.
func (*BytesBrokerWatcherEtcd) Put ¶
func (pdb *BytesBrokerWatcherEtcd) Put(key string, data []byte, opts ...datasync.PutOption) error
Put calls 'Put' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.
func (*BytesBrokerWatcherEtcd) Watch ¶
func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error
Watch starts subscription for changes associated with the selected <keys>. KeyPrefix defined in constructor is prepended to all <keys> in the argument list. The prefix is removed from the keys returned in watch events. Watch events will be delivered to <resp> callback.
type BytesConnectionEtcd ¶
BytesConnectionEtcd encapsulates the connection to etcd. It provides API to read/edit and watch values from etcd.
func NewEtcdConnectionUsingClient ¶
func NewEtcdConnectionUsingClient(etcdClient *clientv3.Client, log logging.Logger) (*BytesConnectionEtcd, error)
NewEtcdConnectionUsingClient creates a new instance of BytesConnectionEtcd using the provided etcd client. This constructor is used primarily for testing.
func NewEtcdConnectionWithBytes ¶
func NewEtcdConnectionWithBytes(config ClientConfig, log logging.Logger) (*BytesConnectionEtcd, error)
NewEtcdConnectionWithBytes creates a new connection to etcd based on the given config.
func (*BytesConnectionEtcd) Close ¶
func (db *BytesConnectionEtcd) Close() error
Close closes the connection to ETCD.
func (*BytesConnectionEtcd) Compact ¶
func (db *BytesConnectionEtcd) Compact(rev ...int64) (int64, error)
Compact compacts the ETCD database to the specified revision.
func (*BytesConnectionEtcd) Delete ¶
func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)
Delete removes data identified by the <key>.
func (*BytesConnectionEtcd) GetRevision ¶
func (db *BytesConnectionEtcd) GetRevision() (revision int64, err error)
GetRevision returns the current revision of the ETCD database.
func (*BytesConnectionEtcd) GetValue ¶
func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue retrieves one key-value item from the data store. The item is identified by the provided <key>.
func (*BytesConnectionEtcd) GetValueRev ¶
func (db *BytesConnectionEtcd) GetValueRev(key string, rev int64) (data []byte, found bool, revision int64, err error)
GetValueRev retrieves one key-value item from the data store at the given revision <rev>. The item is identified by the provided <key>.
func (*BytesConnectionEtcd) ListKeys ¶
func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)
ListKeys returns an iterator that allows traversing all keys from data store that share the given <prefix>.
func (*BytesConnectionEtcd) ListValues ¶
func (db *BytesConnectionEtcd) ListValues(key string) (keyval.BytesKeyValIterator, error)
ListValues returns an iterator that enables traversing values stored under the provided <key>.
func (*BytesConnectionEtcd) ListValuesRange ¶
func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)
ListValuesRange returns an iterator that enables traversing values stored under the keys from a given range.
func (*BytesConnectionEtcd) NewBroker ¶
func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker
NewBroker creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls from the created BytesBrokerWatcherEtcd. To avoid using a prefix, pass the keyval.Root constant as an argument.
func (*BytesConnectionEtcd) NewTxn ¶
func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn
NewTxn creates a new transaction. A transaction can hold multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.
func (*BytesConnectionEtcd) NewWatcher ¶
func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher
NewWatcher creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls on the created BytesBrokerWatcherEtcd. To avoid using a prefix, pass the keyval.Root constant as an argument.
func (*BytesConnectionEtcd) Put ¶
func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error
Put writes the provided key-value item into the data store. Returns an error if the item could not be written, nil otherwise.
func (*BytesConnectionEtcd) PutIfNotExists ¶
func (db *BytesConnectionEtcd) PutIfNotExists(key string, binData []byte) (succeeded bool, err error)
PutIfNotExists puts the given key-value pair into etcd if there is no value set for the key. If the put was successful, succeeded is true. If the key already exists, succeeded is false and the value for the key is untouched.
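The put-if-absent semantics can be illustrated with a minimal in-memory sketch. This is not how the package talks to etcd; memStore is a hypothetical stand-in, and a mutex plays the role that an atomic check-and-write plays in a real data store.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is an in-memory stand-in for the data store (illustration only).
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// PutIfNotExists stores value under key only when the key is absent.
// The check and the write happen under one lock, so two callers cannot
// both succeed for the same key.
func (s *memStore) PutIfNotExists(key string, value []byte) (succeeded bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.data[key]; exists {
		return false // existing value is left untouched
	}
	s.data[key] = value
	return true
}

func main() {
	s := &memStore{data: map[string][]byte{}}
	fmt.Println(s.PutIfNotExists("/lock", []byte("owner-1")))
	fmt.Println(s.PutIfNotExists("/lock", []byte("owner-2")))
	fmt.Println(string(s.data["/lock"]))
}
```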
func (*BytesConnectionEtcd) Watch ¶
func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error
Watch starts a subscription for changes associated with the selected keys. Watch events are delivered to the <resp> callback. <closeChan> is a channel closed when the Close method is called. It is used to stop the goroutines of a specific subscription, or only the goroutine with the provided key prefix.
type BytesWatchDelResp ¶
type BytesWatchDelResp struct {
// contains filtered or unexported fields
}
BytesWatchDelResp is sent when a key-value pair has been removed.
func NewBytesWatchDelResp ¶
func NewBytesWatchDelResp(key string, prevValue []byte, revision int64) *BytesWatchDelResp
NewBytesWatchDelResp creates an instance of BytesWatchDelResp.
func (*BytesWatchDelResp) GetChangeType ¶
func (resp *BytesWatchDelResp) GetChangeType() datasync.Op
GetChangeType returns "Delete" for BytesWatchDelResp.
func (*BytesWatchDelResp) GetKey ¶
func (resp *BytesWatchDelResp) GetKey() string
GetKey returns the key that a value has been deleted from.
func (*BytesWatchDelResp) GetPrevValue ¶
func (resp *BytesWatchDelResp) GetPrevValue() []byte
GetPrevValue returns previous value for BytesWatchDelResp.
func (*BytesWatchDelResp) GetRevision ¶
func (resp *BytesWatchDelResp) GetRevision() int64
GetRevision returns the revision associated with the 'delete' operation.
func (*BytesWatchDelResp) GetValue ¶
func (resp *BytesWatchDelResp) GetValue() []byte
GetValue returns nil for BytesWatchDelResp.
type BytesWatchPutResp ¶
type BytesWatchPutResp struct {
// contains filtered or unexported fields
}
BytesWatchPutResp is sent when new key-value pair has been inserted or the value has been updated.
func NewBytesWatchPutResp ¶
func NewBytesWatchPutResp(key string, value []byte, prevValue []byte, revision int64) *BytesWatchPutResp
NewBytesWatchPutResp creates an instance of BytesWatchPutResp.
func (*BytesWatchPutResp) GetChangeType ¶
func (resp *BytesWatchPutResp) GetChangeType() datasync.Op
GetChangeType returns "Put" for BytesWatchPutResp.
func (*BytesWatchPutResp) GetKey ¶
func (resp *BytesWatchPutResp) GetKey() string
GetKey returns the key that the value has been inserted under.
func (*BytesWatchPutResp) GetPrevValue ¶
func (resp *BytesWatchPutResp) GetPrevValue() []byte
GetPrevValue returns the previous value that has been inserted.
func (*BytesWatchPutResp) GetRevision ¶
func (resp *BytesWatchPutResp) GetRevision() int64
GetRevision returns the revision associated with the 'put' operation.
func (*BytesWatchPutResp) GetValue ¶
func (resp *BytesWatchPutResp) GetValue() []byte
GetValue returns the value that has been inserted.
type ClientConfig ¶
type ClientConfig struct {
    *clientv3.Config

    // OpTimeout is the maximum amount of time the client will wait for a pending
    // operation before timing out.
    OpTimeout time.Duration
}
ClientConfig extends clientv3.Config with configuration options introduced by this package.
func ConfigToClient ¶
func ConfigToClient(yc *Config) (*ClientConfig, error)
ConfigToClient transforms yaml configuration <yc> modelled by Config into ClientConfig, which is ready for use with the underlying coreos/etcd package. If the etcd endpoint addresses are not specified in the configuration, the function will query the ETCD_ENDPOINTS environment variable for a non-empty value. If neither the config nor the environment specify the endpoint location, a default address "127.0.0.1:2379" is assumed. The function may return error only if TLS connection is selected and the CA or client certificate is not accessible/valid.
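The endpoint fallback order described above (configuration first, then ETCD_ENDPOINTS, then the default address) can be sketched as follows. resolveEndpoints is a hypothetical helper, and splitting the environment value on commas is an assumption of this sketch rather than something stated in this documentation.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// defaultEndpoint is the address assumed when nothing else is configured.
const defaultEndpoint = "127.0.0.1:2379"

// resolveEndpoints applies the fallback order: configured endpoints,
// then the environment value, then the default address.
// Assumption: the environment value is a comma-separated list.
func resolveEndpoints(configured []string, envValue string) []string {
	if len(configured) > 0 {
		return configured
	}
	if envValue != "" {
		return strings.Split(envValue, ",")
	}
	return []string{defaultEndpoint}
}

func main() {
	// No endpoints in the config: fall back to the environment or the default.
	fmt.Println(resolveEndpoints(nil, os.Getenv("ETCD_ENDPOINTS")))
}
```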
type Config ¶
type Config struct {
    Endpoints             []string      `json:"endpoints"`
    DialTimeout           time.Duration `json:"dial-timeout"`
    OpTimeout             time.Duration `json:"operation-timeout"`
    InsecureTransport     bool          `json:"insecure-transport"`
    InsecureSkipTLSVerify bool          `json:"insecure-skip-tls-verify"`
    Certfile              string        `json:"cert-file"`
    Keyfile               string        `json:"key-file"`
    CAfile                string        `json:"ca-file"`
    AutoCompact           time.Duration `json:"auto-compact"`
    ReconnectResync       bool          `json:"resync-after-reconnect"`
    AllowDelayedStart     bool          `json:"allow-delayed-start"`
    ReconnectInterval     time.Duration `json:"reconnect-interval"`
}
Config represents the part of the etcd configuration that can be loaded from a file. Usually, the Config is then transformed into ClientConfig using the ConfigToClient() function for use with the coreos/etcd package.
type Deps ¶
type Deps struct {
    infra.PluginDeps
    StatusCheck statuscheck.PluginStatusWriter // inject
    Resync      *resync.Plugin
}
Deps lists dependencies of the etcd plugin. If injected, etcd plugin will use StatusCheck to signal the connection status.
type Option ¶ added in v1.5.0
type Option func(*Plugin)
Option is a function that can be used in NewPlugin to customize Plugin.
type Plugin ¶
Plugin implements etcd plugin.
func NewPlugin ¶ added in v1.5.0
NewPlugin creates a new Plugin with the provided Options.
func (*Plugin) AfterInit ¶
func (p *Plugin) AfterInit() error
AfterInit registers the ETCD plugin with status check if needed.
func (*Plugin) Compact ¶
func (p *Plugin) Compact(rev ...int64) (toRev int64, err error)
Compact compacts the ETCD database to the specified revision.
func (*Plugin) Disabled ¶
func (p *Plugin) Disabled() (disabled bool)
Disabled returns *true* if the plugin is not in use due to missing etcd configuration.
func (*Plugin) GetPluginName ¶ added in v1.5.0
func (p *Plugin) GetPluginName() infra.PluginName
GetPluginName returns the name of the plugin.
func (*Plugin) Init ¶
func (p *Plugin) Init() (err error)
Init retrieves the ETCD configuration and establishes a new connection with the etcd data store. If the configuration file doesn't exist or cannot be read, the returned error will be of os.PathError type. An untyped error is returned if the file doesn't contain a valid YAML configuration. The function may also return an error if a TLS connection is selected and the CA or client certificate is not accessible (os.PathError) or not valid (untyped). See clientv3.New from coreos/etcd for the errors that may be returned if the connection cannot be established.
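A caller that wants to tell a missing or unreadable file apart from other failures can test the error with errors.As. The sketch below is a standalone illustration using the standard library only: classify and openMissing are hypothetical helpers, the file path is a made-up example, and whether Init wraps or returns the path error directly is not specified here.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// classify reports whether err is, or wraps, an *os.PathError.
func classify(err error) string {
	var pathErr *os.PathError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &pathErr):
		return "path error: " + pathErr.Path
	default:
		return "other error"
	}
}

// openMissing produces a genuine *os.PathError by opening a file that
// does not exist (the path is an arbitrary example).
func openMissing() error {
	_, err := os.Open("/nonexistent/etcd.conf")
	return err
}

func main() {
	fmt.Println(classify(openMissing()))
	fmt.Println(classify(errors.New("invalid YAML")))
}
```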
func (*Plugin) NewBroker ¶
func (p *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker
NewBroker creates a new instance of a prefixed broker that provides an API with arguments of type proto.Message.
func (*Plugin) NewWatcher ¶
func (p *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher
NewWatcher creates a new instance of a prefixed watcher that provides an API with arguments of type proto.Message.
func (*Plugin) OnConnect ¶ added in v1.5.0
func (p *Plugin) OnConnect(callback func() error)
OnConnect executes the callback if the plugin is connected; otherwise it gathers the functions from all plugins that have ETCD as a dependency.
func (*Plugin) PutIfNotExists ¶
func (p *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error)
PutIfNotExists puts the given key-value pair into etcd if there is no value set for the key. If the put was successful, succeeded is true. If the key already exists, succeeded is false and the value for the key is untouched.