etcd

package
v2.5.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2019 License: Apache-2.0 Imports: 22 Imported by: 37

Documentation

Overview

Package etcd implements the key-value Data Broker client API for the etcd key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.

The entity that provides access to the data store is called BytesConnectionEtcd.

+-----------------------+       crud/watch         ______
|  BytesConnectionEtcd  |          ---->          | ETCD |
+-----------------------+        []byte           +------+

To create a BytesConnectionEtcd, use the following function

import  "github.com/ligato/cn-infra/db/keyval/etcd"

db := etcd.NewEtcdConnectionWithBytes(config)

config is a path to a file with the following format:

key-file: <filepath>
ca-file: <filepath>
cert-file: <filepath>
insecure-skip-tls-verify: <bool>
insecure-transport: <bool>
dial-timeout: <nanoseconds>
operation-timeout: <nanoseconds>
endpoints:
  - <address_1>:<port>
  - <address_2>:<port>
  - ..
  - <address_n>:<port>

Connection to etcd is established using the provided config behind the scenes.

Alternatively, you may connect to etcd by yourself and initialize the connection object with a given client.

db := etcd.NewEtcdConnectionUsingClient(client)

Created BytesConnectionEtcd implements Broker and KeyValProtoWatcher interfaces. The example of use can be seen below.

To insert single key-value pair into etcd run:

db.Put(key, data)

To remove a value identified by key:

datasync.Delete(key)

In addition to single key-value pair approach, the transaction API is provided. Transaction executes multiple operations in a more efficient way than one by one execution.

// create new transaction
txn := db.NewTxn()

// add put operation into the transaction
txn.Put(key, value)

// add delete operation into the transaction
txn.Delete(key, value)

// try to commit the transaction
err := txn.Commit(context.Background())

To retrieve a value identified by key:

data, found, rev, err := db.GetValue(key)
if err == nil && found {
   ...
}

To retrieve all values matching a key prefix:

itr, err := db.ListValues(key)
if err != nil {
   for {
      data, allReceived, rev, err := itr.GetNext()
      if allReceived {
          break
      }
      if err != nil {
          return err
      }
      process data...
   }
}

To retrieve values in specified key range:

itr, err := db.ListValues(key)
if err != nil {
   for {
      data, rev, allReceived := itr.GetNext()
      if allReceived {
          break
      }
      process data...
   }
}

To list keys without fetching the values:

itr, err := db.ListKeys(prefix)
if err != nil {
   for {
      key, rev, allReceived := itr.GetNext()
      if allReceived {
          break
      }
      process key...
   }
}

To start watching changes in etcd:

respChan := make(chan keyval.BytesWatchResp, 0)
err = dbw.Watch(respChan, key)
if err != nil {
    os.Exit(1)
}
for {
     select {
         case resp := <-respChan:
            switch resp.GetChangeType() {
            case data.Put:
            key := resp.GetKey()
                value := resp.GetValue()
                rev := resp.GetRevision()
            case data.Delete:
                ...
            }
     }
}

BytesConnectionEtcd also allows to create proxy instances (BytesBrokerWatcherEtcd) using NewBroker and NewWatcher methods. Both of them accept the prefix argument. The prefix will be automatically prepended to all keys in put/delete requests made from the proxy instances. In case of get-like calls (GetValue, ListValues, ...) the prefix is trimmed from the key of the returned values. They contain only the part following the prefix in the key field. The created proxy instances share the connection of the BytesConnectionEtcd.

     +-----------------------+
     | BytesBrokerWatcherEtcd |
     +-----------------------+
             |
             |
              ----------------->   +-----------------------+       crud/watch         ______
			                          |  BytesConnectionEtcd  |       ---->             | ETCD |
              ----------------->   +-----------------------+        ([]byte)         +------+
             |
             |
     +------------------------+
     | BytesBrokerWatcherEtcd |
     +------------------------+

To create proxy instances, type:

prefixedBroker := db.NewBroker(prefix)
prefixedWatcher := db.NewWatcher(prefix)

The usage is the same as shown above.

The package also provides a proto decorator that simplifies the manipulation of proto modelled data. The proto decorator accepts arguments of type proto.message and marshals them into []byte slices.

+-------------------+--------------------+       crud/watch         ______
|  ProtoWrapperEtcd |  ProtoWrapperEtcd  |       ---->             | ETCD |
+-------------------+--------------------+        ([]byte)         +------+
  (proto.Message)

The ProtoWrapperEtcd API is very similar to the BytesConnectionEtcd API. The difference is that arguments of type []byte are replaced by arguments of type proto.Message, and in some case one of the return values is transformed into an output argument.

Example of the decorator initialization:

// conn is BytesConnectionEtcd initialized as shown at the top of the page
protoBroker := etcd.NewProtoWrapperEtcd(conn)

The only difference in Put/Delete functions is the type of the argument; apart from that the usage is the same as described above.

Example of retrieving single key-value pair using proto decorator:

// if the value exists it is unmarshalled into the msg
found, rev, err := protoBroker.GetValue(key, msg)

To retrieve all values matching the key prefix use

 resp, err := protoDb.ListValues(path)
 if err != nil {
    os.Exit(1)
 }

 for {
    // phonebook.Contact is a proto modelled structure (implementing proto.Message interface)
    contact := &phonebook.Contact{}
    // the value is unmarshaled into the contact variable
    kv, allReceived  := resp.GetNext()
    if allReceived {
       break
    }
    err = kv.GetValue(contact)
    if err != nil {
        os.Exit(1)
    }
    ... use contact
}

The Etcd plugin

plugin := etcd.Plugin{}
// initialization by agent core

Plugin allows to create a broker

broker := plugin.NewBroker(prefix)

and watcher

watcher := plugin.NewWatcher(prefix)

Index

Constants

This section is empty.

Variables

View Source
var DefaultPlugin = *NewPlugin()

DefaultPlugin is a default instance of Plugin.

Functions

This section is empty.

Types

type BytesBrokerWatcherEtcd

type BytesBrokerWatcherEtcd struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesBrokerWatcherEtcd uses BytesConnectionEtcd to access the datastore. The connection can be shared among multiple BytesBrokerWatcherEtcd. In case of accessing a particular subtree in etcd only, BytesBrokerWatcherEtcd allows defining a keyPrefix that is prepended to all keys in its methods in order to shorten keys used in arguments.

func (*BytesBrokerWatcherEtcd) CompareAndDelete

func (pdb *BytesBrokerWatcherEtcd) CompareAndDelete(key string, data []byte) (succeeded bool, err error)

CompareAndDelete compares the value currently stored under the given key with the expected <data>, and only if the expected and actual data match, the value is then removed from the datastore. The comparison and the value removal are executed together in a single transaction and cannot be interleaved with another operation for that key.

func (*BytesBrokerWatcherEtcd) CompareAndSwap

func (pdb *BytesBrokerWatcherEtcd) CompareAndSwap(key string, oldData, newData []byte) (swapped bool, err error)

CompareAndSwap compares the value currently stored under the given key with the expected <oldData>, and only if the expected and actual data match, the value is then changed to <newData>. The comparison and the value change are executed together in a single transaction and cannot be interleaved with another operation for that key.

func (*BytesBrokerWatcherEtcd) Delete

func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)

Delete calls 'Delete' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) GetValue

func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue calls 'GetValue' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) ListKeys

func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)

ListKeys calls 'ListKeys' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the argument.

func (*BytesBrokerWatcherEtcd) ListValues

ListValues calls 'ListValues' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument. The prefix is removed from the keys of the returned values.

func (*BytesBrokerWatcherEtcd) NewTxn

func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. KeyPrefix defined in constructor will be prepended to all key arguments in the transaction.

func (*BytesBrokerWatcherEtcd) Put

func (pdb *BytesBrokerWatcherEtcd) Put(key string, data []byte, opts ...datasync.PutOption) error

Put calls 'Put' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) PutIfNotExists

func (pdb *BytesBrokerWatcherEtcd) PutIfNotExists(key string, data []byte) (succeeded bool, err error)

PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.

func (*BytesBrokerWatcherEtcd) Watch

func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts subscription for changes associated with the selected <keys>. KeyPrefix defined in constructor is prepended to all <keys> in the argument list. The prefix is removed from the keys returned in watch events. Watch events will be delivered to <resp> callback.

type BytesConnectionEtcd

type BytesConnectionEtcd struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesConnectionEtcd encapsulates the connection to etcd. It provides API to read/edit and watch values from etcd.

func NewEtcdConnectionUsingClient

func NewEtcdConnectionUsingClient(etcdClient *clientv3.Client, log logging.Logger) (*BytesConnectionEtcd, error)

NewEtcdConnectionUsingClient creates a new instance of BytesConnectionEtcd using the provided etcd client. This constructor is used primarily for testing.

func NewEtcdConnectionWithBytes

func NewEtcdConnectionWithBytes(config ClientConfig, log logging.Logger) (*BytesConnectionEtcd, error)

NewEtcdConnectionWithBytes creates new connection to etcd based on the given config file.

func (*BytesConnectionEtcd) CampaignInElection

func (db *BytesConnectionEtcd) CampaignInElection(ctx context.Context, prefix string) (func(c context.Context), error)

CampaignInElection starts campaign in leader election on a given prefix. Multiple instances can compete on a given prefix. Only one can be elected as leader at a time. The function call blocks until either context is canceled or the caller is elected as leader. Upon successful call a resign callback, that can be used to resign - trigger new election, is returned.

func (*BytesConnectionEtcd) Close

func (db *BytesConnectionEtcd) Close() error

Close closes the connection to ETCD.

func (*BytesConnectionEtcd) Compact

func (db *BytesConnectionEtcd) Compact(rev ...int64) (int64, error)

Compact compacts the ETCD database to specific revision

func (*BytesConnectionEtcd) CompareAndDelete

func (db *BytesConnectionEtcd) CompareAndDelete(key string, data []byte) (succeeded bool, err error)

CompareAndDelete compares the value currently stored under the given key with the expected <data>, and only if the expected and actual data match, the value is then removed from the datastore. The comparison and the value removal are executed together in a single transaction and cannot be interleaved with another operation for that key.

func (*BytesConnectionEtcd) CompareAndSwap

func (db *BytesConnectionEtcd) CompareAndSwap(key string, oldData, newData []byte) (succeeded bool, err error)

CompareAndSwap compares the value currently stored under the given key with the expected <oldData>, and only if the expected and actual data match, the value is then changed to <newData>. The comparison and the value change are executed together in a single transaction and cannot be interleaved with another operation for that key.

func (*BytesConnectionEtcd) Delete

func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)

Delete removes data identified by the <key>.

func (*BytesConnectionEtcd) GetRevision

func (db *BytesConnectionEtcd) GetRevision() (revision int64, err error)

GetRevision returns current revision of ETCD database

func (*BytesConnectionEtcd) GetValue

func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue retrieves one key-value item from the data store. The item is identified by the provided <key>.

func (*BytesConnectionEtcd) GetValueRev

func (db *BytesConnectionEtcd) GetValueRev(key string, rev int64) (data []byte, found bool, revision int64, err error)

GetValueRev retrieves one key-value item from the data store. The item is identified by the provided <key>.

func (*BytesConnectionEtcd) ListKeys

func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)

ListKeys returns an iterator that allows traversing all keys from data store that share the given <prefix>.

func (*BytesConnectionEtcd) ListValues

ListValues returns an iterator that enables traversing values stored under the provided <key>.

func (*BytesConnectionEtcd) ListValuesRange

func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)

ListValuesRange returns an iterator that enables traversing values stored under the keys from a given range.

func (*BytesConnectionEtcd) NewBroker

func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker

NewBroker creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls from the created BytesBrokerWatcherEtcd. To avoid using a prefix, pass keyval. Root constant as an argument.

func (*BytesConnectionEtcd) NewTxn

func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. A transaction can hold multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.

func (*BytesConnectionEtcd) NewWatcher

func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher

NewWatcher creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls on created BytesBrokerWatcherEtcd. To avoid using a prefix, pass keyval. Root constant as an argument.

func (*BytesConnectionEtcd) Put

func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error

Put writes the provided key-value item into the data store. Returns an error if the item could not be written, nil otherwise.

func (*BytesConnectionEtcd) PutIfNotExists

func (db *BytesConnectionEtcd) PutIfNotExists(key string, data []byte) (succeeded bool, err error)

PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.

func (*BytesConnectionEtcd) Watch

func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts subscription for changes associated with the selected keys. Watch events will be delivered to <resp> callback. closeCh is a channel closed when Close method is called.It is leveraged to stop go routines from specific subscription, or only goroutine with provided key prefix

type BytesWatchDelResp

type BytesWatchDelResp struct {
	// contains filtered or unexported fields
}

BytesWatchDelResp is sent when a key-value pair has been removed.

func NewBytesWatchDelResp

func NewBytesWatchDelResp(key string, prevValue []byte, revision int64) *BytesWatchDelResp

NewBytesWatchDelResp creates an instance of BytesWatchDelResp.

func (*BytesWatchDelResp) GetChangeType

func (resp *BytesWatchDelResp) GetChangeType() datasync.Op

GetChangeType returns "Delete" for BytesWatchPutResp.

func (*BytesWatchDelResp) GetKey

func (resp *BytesWatchDelResp) GetKey() string

GetKey returns the key that a value has been deleted from.

func (*BytesWatchDelResp) GetPrevValue

func (resp *BytesWatchDelResp) GetPrevValue() []byte

GetPrevValue returns previous value for BytesWatchDelResp.

func (*BytesWatchDelResp) GetRevision

func (resp *BytesWatchDelResp) GetRevision() int64

GetRevision returns the revision associated with the 'delete' operation.

func (*BytesWatchDelResp) GetValue

func (resp *BytesWatchDelResp) GetValue() []byte

GetValue returns nil for BytesWatchDelResp.

type BytesWatchPutResp

type BytesWatchPutResp struct {
	// contains filtered or unexported fields
}

BytesWatchPutResp is sent when new key-value pair has been inserted or the value has been updated.

func NewBytesWatchPutResp

func NewBytesWatchPutResp(key string, value []byte, prevValue []byte, revision int64) *BytesWatchPutResp

NewBytesWatchPutResp creates an instance of BytesWatchPutResp.

func (*BytesWatchPutResp) GetChangeType

func (resp *BytesWatchPutResp) GetChangeType() datasync.Op

GetChangeType returns "Put" for BytesWatchPutResp.

func (*BytesWatchPutResp) GetKey

func (resp *BytesWatchPutResp) GetKey() string

GetKey returns the key that the value has been inserted under.

func (*BytesWatchPutResp) GetPrevValue

func (resp *BytesWatchPutResp) GetPrevValue() []byte

GetPrevValue returns the previous value that has been inserted.

func (*BytesWatchPutResp) GetRevision

func (resp *BytesWatchPutResp) GetRevision() int64

GetRevision returns the revision associated with the 'put' operation.

func (*BytesWatchPutResp) GetValue

func (resp *BytesWatchPutResp) GetValue() []byte

GetValue returns the value that has been inserted.

type ClientConfig

type ClientConfig struct {
	*clientv3.Config

	// OpTimeout is the maximum amount of time the client will wait for a pending
	// operation before timing out.
	OpTimeout time.Duration

	// SessionTTL is default TTL (in seconds) used by client in RunElection or WithClientLifetimeTTL put option.
	// Once given number of seconds elapses without value's lease renewal the value is removed.
	SessionTTL int

	// ExpandEnvVars, if enabled, will cause the JSON value serializer to replace ${var} or $var in received JSON
	// data according to the values of the current environment variables. References to undefined variables are replaced
	// by the empty string.
	ExpandEnvVars bool
}

ClientConfig extends clientv3.Config with configuration options introduced by this package.

func ConfigToClient

func ConfigToClient(yc *Config) (*ClientConfig, error)

ConfigToClient transforms yaml configuration <yc> modelled by Config into ClientConfig, which is ready for use with the underlying coreos/etcd package. If the etcd endpoint addresses are not specified in the configuration, the function will query the ETCD_ENDPOINTS environment variable for a non-empty value. If neither the config nor the environment specify the endpoint location, a default address "127.0.0.1:2379" is assumed. The function may return error only if TLS connection is selected and the CA or client certificate is not accessible/valid.

type Config

type Config struct {
	Endpoints             []string      `json:"endpoints"`
	DialTimeout           time.Duration `json:"dial-timeout"`
	OpTimeout             time.Duration `json:"operation-timeout"`
	InsecureTransport     bool          `json:"insecure-transport"`
	InsecureSkipTLSVerify bool          `json:"insecure-skip-tls-verify"`
	Certfile              string        `json:"cert-file"`
	Keyfile               string        `json:"key-file"`
	CAfile                string        `json:"ca-file"`
	AutoCompact           time.Duration `json:"auto-compact"`
	ReconnectResync       bool          `json:"resync-after-reconnect"`
	AllowDelayedStart     bool          `json:"allow-delayed-start"`
	ReconnectInterval     time.Duration `json:"reconnect-interval"`
	SessionTTL            int           `json:"session-ttl"`
	ExpandEnvVars         bool          `json:"expand-env-variables"`
}

Config represents a part of the etcd configuration that can be loaded from a file. Usually, the Config is next transformed into ClientConfig using ConfigToClient() function for use with the coreos/etcd package.

type Deps

type Deps struct {
	infra.PluginDeps
	StatusCheck statuscheck.PluginStatusWriter // inject
	Resync      *resync.Plugin
	Serializer  keyval.Serializer // optional, by default the JSON serializer is used
}

Deps lists dependencies of the etcd plugin. If injected, etcd plugin will use StatusCheck to signal the connection status.

type Option

type Option func(*Plugin)

Option is a function that can be used in NewPlugin to customize Plugin.

func UseDeps

func UseDeps(cb func(*Deps)) Option

UseDeps returns Option that can inject custom dependencies.

type Plugin

type Plugin struct {
	Deps

	sync.Mutex
	// contains filtered or unexported fields
}

Plugin implements etcd plugin.

func NewPlugin

func NewPlugin(opts ...Option) *Plugin

NewPlugin creates a new Plugin with the provided Options.

func (*Plugin) AfterInit

func (p *Plugin) AfterInit() error

AfterInit registers ETCD plugin to status check if needed

func (*Plugin) CampaignInElection

func (p *Plugin) CampaignInElection(ctx context.Context, prefix string) (func(context.Context), error)

CampaignInElection starts campaign in leader election on a given prefix. Multiple instances can compete on a given prefix. Only one can be elected as leader at a time. The function call blocks until either context is canceled or the caller is elected as leader. Upon successful call a resign callback that triggers new election is returned.

func (*Plugin) Close

func (p *Plugin) Close() error

Close shutdowns the connection.

func (*Plugin) Compact

func (p *Plugin) Compact(rev ...int64) (toRev int64, err error)

Compact compatcs the ETCD database to the specific revision

func (*Plugin) Disabled

func (p *Plugin) Disabled() (disabled bool)

Disabled returns *true* if the plugin is not in use due to missing etcd configuration.

func (*Plugin) GetPluginName

func (p *Plugin) GetPluginName() infra.PluginName

GetPluginName returns name of the plugin

func (*Plugin) Init

func (p *Plugin) Init() (err error)

Init retrieves ETCD configuration and establishes a new connection with the etcd data store. If the configuration file doesn't exist or cannot be read, the returned error will be of os.PathError type. An untyped error is returned in case the file doesn't contain a valid YAML configuration. The function may also return error if TLS connection is selected and the CA or client certificate is not accessible(os.PathError)/valid(untyped). Check clientv3.New from coreos/etcd for possible errors returned in case the connection cannot be established.

func (*Plugin) NewBroker

func (p *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker

NewBroker creates new instance of prefixed broker that provides API with arguments of type proto.Message.

func (*Plugin) NewBrokerWithAtomic

func (p *Plugin) NewBrokerWithAtomic(keyPrefix string) keyval.BytesBrokerWithAtomic

NewBrokerWithAtomic creates new instance of prefixed (byte-oriented) broker with atomic operations. It is equivalent to: RawAccess().NewBroker(keyPrefix).(keyval.BytesBrokerWithAtomic), but the presence of this method can be used as a compile-time check for the support of atomic operations (of an injected dependency).

func (*Plugin) NewWatcher

func (p *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher

NewWatcher creates new instance of prefixed broker that provides API with arguments of type proto.Message.

func (*Plugin) OnConnect

func (p *Plugin) OnConnect(callback func() error)

OnConnect executes callback if plugin is connected, or gathers functions from all plugin with ETCD as dependency

func (*Plugin) RawAccess

func (p *Plugin) RawAccess() keyval.KvBytesPlugin

RawAccess allows to access data in the database as raw bytes (i.e. not formatted by protobuf).

Directories

Path Synopsis
Package mocks implements an embedded etcd mock used in unit & integration tests.
Package mocks implements an embedded etcd mock used in unit & integration tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL