etcd

package
v2.0.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2019 License: Apache-2.0 Imports: 20 Imported by: 123

README

Etcd plugin

The Etcd plugin provides access to an etcd key-value data store.

Configuration

  • Location of the Etcd configuration file can be defined either by the command line flag etcd-config or set via the ETCD_CONFIG environment variable.

Status Check

  • If injected, Etcd plugin will use StatusCheck plugin to periodically issue a minimalistic GET request to check for the status of the connection. The etcd connection state affects the global status of the agent. If agent cannot establish connection with etcd, both the readiness and the liveness probe from the probe plugin will return a negative result (accessible only via REST API in such case).

Compacting

You can compact Etcd using two ways.

  • using API by calling plugin.Compact() which will compact the database to the current revision.
  • using config file by setting auto-compact option to the duration of period that you want the Etcd to be compacted.

Reconnect resynchronization

  • If connection to the ETCD is interrupted, resync can be automatically called after re-connection. This option is disabled by default and has to be allowed in the etcd.conf file.

    Set resync-after-reconnect to true to enable the feature.

Documentation

Overview

Package etcd implements the key-value Data Broker client API for the etcd key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.

The entity that provides access to the data store is called BytesConnectionEtcd.

+-----------------------+       crud/watch         ______
|  BytesConnectionEtcd  |          ---->          | ETCD |
+-----------------------+        []byte           +------+

To create a BytesConnectionEtcd, use the following function

import  "github.com/ligato/cn-infra/db/keyval/etcd"

db := etcd.NewEtcdConnectionWithBytes(config)

config is a path to a file with the following format:

key-file: <filepath>
ca-file: <filepath>
cert-file: <filepath>
insecure-skip-tls-verify: <bool>
insecure-transport: <bool>
dial-timeout: <nanoseconds>
operation-timeout: <nanoseconds>
endpoints:
  - <address_1>:<port>
  - <address_2>:<port>
  - ..
  - <address_n>:<port>

Connection to etcd is established using the provided config behind the scenes.

Alternatively, you may connect to etcd by yourself and initialize the connection object with a given client.

db := etcd.NewEtcdConnectionUsingClient(client)

Created BytesConnectionEtcd implements Broker and KeyValProtoWatcher interfaces. The example of use can be seen below.

To insert single key-value pair into etcd run:

db.Put(key, data)

To remove a value identified by key:

datasync.Delete(key)

In addition to single key-value pair approach, the transaction API is provided. Transaction executes multiple operations in a more efficient way than one by one execution.

// create new transaction
txn := db.NewTxn()

// add put operation into the transaction
txn.Put(key, value)

// add delete operation into the transaction
txn.Delete(key, value)

// try to commit the transaction
err := txn.Commit(context.Background())

To retrieve a value identified by key:

data, found, rev, err := db.GetValue(key)
if err == nil && found {
   ...
}

To retrieve all values matching a key prefix:

itr, err := db.ListValues(key)
if err != nil {
   for {
      data, allReceived, rev, err := itr.GetNext()
      if allReceived {
          break
      }
      if err != nil {
          return err
      }
      process data...
   }
}

To retrieve values in specified key range:

itr, err := db.ListValues(key)
if err != nil {
   for {
      data, rev, allReceived := itr.GetNext()
      if allReceived {
          break
      }
      process data...
   }
}

To list keys without fetching the values:

itr, err := db.ListKeys(prefix)
if err != nil {
   for {
      key, rev, allReceived := itr.GetNext()
      if allReceived {
          break
      }
      process key...
   }
}

To start watching changes in etcd:

respChan := make(chan keyval.BytesWatchResp, 0)
err = dbw.Watch(respChan, key)
if err != nil {
    os.Exit(1)
}
for {
     select {
         case resp := <-respChan:
            switch resp.GetChangeType() {
            case data.Put:
            key := resp.GetKey()
                value := resp.GetValue()
                rev := resp.GetRevision()
            case data.Delete:
                ...
            }
     }
}

BytesConnectionEtcd also allows to create proxy instances (BytesBrokerWatcherEtcd) using NewBroker and NewWatcher methods. Both of them accept the prefix argument. The prefix will be automatically prepended to all keys in put/delete requests made from the proxy instances. In case of get-like calls (GetValue, ListValues, ...) the prefix is trimmed from the key of the returned values. They contain only the part following the prefix in the key field. The created proxy instances share the connection of the BytesConnectionEtcd.

     +-----------------------+
     | BytesBrokerWatcherEtcd |
     +-----------------------+
             |
             |
              ----------------->   +-----------------------+       crud/watch         ______
			                          |  BytesConnectionEtcd  |       ---->             | ETCD |
              ----------------->   +-----------------------+        ([]byte)         +------+
             |
             |
     +------------------------+
     | BytesBrokerWatcherEtcd |
     +------------------------+

To create proxy instances, type:

prefixedBroker := db.NewBroker(prefix)
prefixedWatcher := db.NewWatcher(prefix)

The usage is the same as shown above.

The package also provides a proto decorator that simplifies the manipulation of proto modelled data. The proto decorator accepts arguments of type proto.message and marshals them into []byte slices.

+-------------------+--------------------+       crud/watch         ______
|  ProtoWrapperEtcd |  ProtoWrapperEtcd  |       ---->             | ETCD |
+-------------------+--------------------+        ([]byte)         +------+
  (proto.Message)

The ProtoWrapperEtcd API is very similar to the BytesConnectionEtcd API. The difference is that arguments of type []byte are replaced by arguments of type proto.Message, and in some case one of the return values is transformed into an output argument.

Example of the decorator initialization:

// conn is BytesConnectionEtcd initialized as shown at the top of the page
protoBroker := etcd.NewProtoWrapperEtcd(conn)

The only difference in Put/Delete functions is the type of the argument; apart from that the usage is the same as described above.

Example of retrieving single key-value pair using proto decorator:

// if the value exists it is unmarshalled into the msg
found, rev, err := protoBroker.GetValue(key, msg)

To retrieve all values matching the key prefix use

 resp, err := protoDb.ListValues(path)
 if err != nil {
    os.Exit(1)
 }

 for {
    // phonebook.Contact is a proto modelled structure (implementing proto.Message interface)
    contact := &phonebook.Contact{}
    // the value is unmarshaled into the contact variable
    kv, allReceived  := resp.GetNext()
    if allReceived {
       break
    }
    err = kv.GetValue(contact)
    if err != nil {
        os.Exit(1)
    }
    ... use contact
}

The Etcd plugin

plugin := etcd.Plugin{}
// initialization by agent core

Plugin allows to create a broker

broker := plugin.NewBroker(prefix)

and watcher

watcher := plugin.NewWatcher(prefix)

Index

Constants

This section is empty.

Variables

View Source
var DefaultPlugin = *NewPlugin()

DefaultPlugin is a default instance of Plugin.

Functions

This section is empty.

Types

type BytesBrokerWatcherEtcd

type BytesBrokerWatcherEtcd struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesBrokerWatcherEtcd uses BytesConnectionEtcd to access the datastore. The connection can be shared among multiple BytesBrokerWatcherEtcd. In case of accessing a particular subtree in etcd only, BytesBrokerWatcherEtcd allows defining a keyPrefix that is prepended to all keys in its methods in order to shorten keys used in arguments.

func (*BytesBrokerWatcherEtcd) Delete

func (pdb *BytesBrokerWatcherEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)

Delete calls 'Delete' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) GetValue

func (pdb *BytesBrokerWatcherEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue calls 'GetValue' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) ListKeys

func (pdb *BytesBrokerWatcherEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)

ListKeys calls 'ListKeys' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the argument.

func (*BytesBrokerWatcherEtcd) ListValues

ListValues calls 'ListValues' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument. The prefix is removed from the keys of the returned values.

func (*BytesBrokerWatcherEtcd) NewTxn

func (pdb *BytesBrokerWatcherEtcd) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. KeyPrefix defined in constructor will be prepended to all key arguments in the transaction.

func (*BytesBrokerWatcherEtcd) Put

func (pdb *BytesBrokerWatcherEtcd) Put(key string, data []byte, opts ...datasync.PutOption) error

Put calls 'Put' function of the underlying BytesConnectionEtcd. KeyPrefix defined in constructor is prepended to the key argument.

func (*BytesBrokerWatcherEtcd) Watch

func (pdb *BytesBrokerWatcherEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts subscription for changes associated with the selected <keys>. KeyPrefix defined in constructor is prepended to all <keys> in the argument list. The prefix is removed from the keys returned in watch events. Watch events will be delivered to <resp> callback.

type BytesConnectionEtcd

type BytesConnectionEtcd struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesConnectionEtcd encapsulates the connection to etcd. It provides API to read/edit and watch values from etcd.

func NewEtcdConnectionUsingClient

func NewEtcdConnectionUsingClient(etcdClient *clientv3.Client, log logging.Logger) (*BytesConnectionEtcd, error)

NewEtcdConnectionUsingClient creates a new instance of BytesConnectionEtcd using the provided etcd client. This constructor is used primarily for testing.

func NewEtcdConnectionWithBytes

func NewEtcdConnectionWithBytes(config ClientConfig, log logging.Logger) (*BytesConnectionEtcd, error)

NewEtcdConnectionWithBytes creates new connection to etcd based on the given config file.

func (*BytesConnectionEtcd) Close

func (db *BytesConnectionEtcd) Close() error

Close closes the connection to ETCD.

func (*BytesConnectionEtcd) Compact

func (db *BytesConnectionEtcd) Compact(rev ...int64) (int64, error)

Compact compacts the ETCD database to specific revision

func (*BytesConnectionEtcd) Delete

func (db *BytesConnectionEtcd) Delete(key string, opts ...datasync.DelOption) (existed bool, err error)

Delete removes data identified by the <key>.

func (*BytesConnectionEtcd) GetRevision

func (db *BytesConnectionEtcd) GetRevision() (revision int64, err error)

GetRevision returns current revision of ETCD database

func (*BytesConnectionEtcd) GetValue

func (db *BytesConnectionEtcd) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue retrieves one key-value item from the data store. The item is identified by the provided <key>.

func (*BytesConnectionEtcd) GetValueRev

func (db *BytesConnectionEtcd) GetValueRev(key string, rev int64) (data []byte, found bool, revision int64, err error)

GetValueRev retrieves one key-value item from the data store. The item is identified by the provided <key>.

func (*BytesConnectionEtcd) ListKeys

func (db *BytesConnectionEtcd) ListKeys(prefix string) (keyval.BytesKeyIterator, error)

ListKeys returns an iterator that allows traversing all keys from data store that share the given <prefix>.

func (*BytesConnectionEtcd) ListValues

ListValues returns an iterator that enables traversing values stored under the provided <key>.

func (*BytesConnectionEtcd) ListValuesRange

func (db *BytesConnectionEtcd) ListValuesRange(fromPrefix string, toPrefix string) (keyval.BytesKeyValIterator, error)

ListValuesRange returns an iterator that enables traversing values stored under the keys from a given range.

func (*BytesConnectionEtcd) NewBroker

func (db *BytesConnectionEtcd) NewBroker(prefix string) keyval.BytesBroker

NewBroker creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls from the created BytesBrokerWatcherEtcd. To avoid using a prefix, pass keyval. Root constant as an argument.

func (*BytesConnectionEtcd) NewTxn

func (db *BytesConnectionEtcd) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. A transaction can hold multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.

func (*BytesConnectionEtcd) NewWatcher

func (db *BytesConnectionEtcd) NewWatcher(prefix string) keyval.BytesWatcher

NewWatcher creates a new instance of a proxy that provides access to etcd. The proxy will reuse the connection from BytesConnectionEtcd. <prefix> will be prepended to the key argument in all calls on created BytesBrokerWatcherEtcd. To avoid using a prefix, pass keyval. Root constant as an argument.

func (*BytesConnectionEtcd) Put

func (db *BytesConnectionEtcd) Put(key string, binData []byte, opts ...datasync.PutOption) error

Put writes the provided key-value item into the data store. Returns an error if the item could not be written, nil otherwise.

func (*BytesConnectionEtcd) PutIfNotExists

func (db *BytesConnectionEtcd) PutIfNotExists(key string, binData []byte) (succeeded bool, err error)

PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.

func (*BytesConnectionEtcd) Watch

func (db *BytesConnectionEtcd) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts subscription for changes associated with the selected keys. Watch events will be delivered to <resp> callback. closeCh is a channel closed when Close method is called.It is leveraged to stop go routines from specific subscription, or only goroutine with provided key prefix

type BytesWatchDelResp

type BytesWatchDelResp struct {
	// contains filtered or unexported fields
}

BytesWatchDelResp is sent when a key-value pair has been removed.

func NewBytesWatchDelResp

func NewBytesWatchDelResp(key string, prevValue []byte, revision int64) *BytesWatchDelResp

NewBytesWatchDelResp creates an instance of BytesWatchDelResp.

func (*BytesWatchDelResp) GetChangeType

func (resp *BytesWatchDelResp) GetChangeType() datasync.Op

GetChangeType returns "Delete" for BytesWatchPutResp.

func (*BytesWatchDelResp) GetKey

func (resp *BytesWatchDelResp) GetKey() string

GetKey returns the key that a value has been deleted from.

func (*BytesWatchDelResp) GetPrevValue

func (resp *BytesWatchDelResp) GetPrevValue() []byte

GetPrevValue returns previous value for BytesWatchDelResp.

func (*BytesWatchDelResp) GetRevision

func (resp *BytesWatchDelResp) GetRevision() int64

GetRevision returns the revision associated with the 'delete' operation.

func (*BytesWatchDelResp) GetValue

func (resp *BytesWatchDelResp) GetValue() []byte

GetValue returns nil for BytesWatchDelResp.

type BytesWatchPutResp

type BytesWatchPutResp struct {
	// contains filtered or unexported fields
}

BytesWatchPutResp is sent when new key-value pair has been inserted or the value has been updated.

func NewBytesWatchPutResp

func NewBytesWatchPutResp(key string, value []byte, prevValue []byte, revision int64) *BytesWatchPutResp

NewBytesWatchPutResp creates an instance of BytesWatchPutResp.

func (*BytesWatchPutResp) GetChangeType

func (resp *BytesWatchPutResp) GetChangeType() datasync.Op

GetChangeType returns "Put" for BytesWatchPutResp.

func (*BytesWatchPutResp) GetKey

func (resp *BytesWatchPutResp) GetKey() string

GetKey returns the key that the value has been inserted under.

func (*BytesWatchPutResp) GetPrevValue

func (resp *BytesWatchPutResp) GetPrevValue() []byte

GetPrevValue returns the previous value that has been inserted.

func (*BytesWatchPutResp) GetRevision

func (resp *BytesWatchPutResp) GetRevision() int64

GetRevision returns the revision associated with the 'put' operation.

func (*BytesWatchPutResp) GetValue

func (resp *BytesWatchPutResp) GetValue() []byte

GetValue returns the value that has been inserted.

type ClientConfig

type ClientConfig struct {
	*clientv3.Config

	// OpTimeout is the maximum amount of time the client will wait for a pending
	// operation before timing out.
	OpTimeout time.Duration
}

ClientConfig extends clientv3.Config with configuration options introduced by this package.

func ConfigToClient

func ConfigToClient(yc *Config) (*ClientConfig, error)

ConfigToClient transforms yaml configuration <yc> modelled by Config into ClientConfig, which is ready for use with the underlying coreos/etcd package. If the etcd endpoint addresses are not specified in the configuration, the function will query the ETCD_ENDPOINTS environment variable for a non-empty value. If neither the config nor the environment specify the endpoint location, a default address "127.0.0.1:2379" is assumed. The function may return error only if TLS connection is selected and the CA or client certificate is not accessible/valid.

type Config

type Config struct {
	Endpoints             []string      `json:"endpoints"`
	DialTimeout           time.Duration `json:"dial-timeout"`
	OpTimeout             time.Duration `json:"operation-timeout"`
	InsecureTransport     bool          `json:"insecure-transport"`
	InsecureSkipTLSVerify bool          `json:"insecure-skip-tls-verify"`
	Certfile              string        `json:"cert-file"`
	Keyfile               string        `json:"key-file"`
	CAfile                string        `json:"ca-file"`
	AutoCompact           time.Duration `json:"auto-compact"`
	ReconnectResync       bool          `json:"resync-after-reconnect"`
	AllowDelayedStart     bool          `json:"allow-delayed-start"`
	ReconnectInterval     time.Duration `json:"reconnect-interval"`
}

Config represents a part of the etcd configuration that can be loaded from a file. Usually, the Config is next transformed into ClientConfig using ConfigToClient() function for use with the coreos/etcd package.

type Deps

type Deps struct {
	infra.PluginDeps
	StatusCheck statuscheck.PluginStatusWriter // inject
	Resync      *resync.Plugin
}

Deps lists dependencies of the etcd plugin. If injected, etcd plugin will use StatusCheck to signal the connection status.

type Option added in v1.5.0

type Option func(*Plugin)

Option is a function that can be used in NewPlugin to customize Plugin.

func UseDeps added in v1.5.0

func UseDeps(cb func(*Deps)) Option

UseDeps returns Option that can inject custom dependencies.

type Plugin

type Plugin struct {
	Deps

	sync.Mutex
	// contains filtered or unexported fields
}

Plugin implements etcd plugin.

func NewPlugin added in v1.5.0

func NewPlugin(opts ...Option) *Plugin

NewPlugin creates a new Plugin with the provided Options.

func (*Plugin) AfterInit

func (p *Plugin) AfterInit() error

AfterInit registers ETCD plugin to status check if needed

func (*Plugin) Close

func (p *Plugin) Close() error

Close shutdowns the connection.

func (*Plugin) Compact

func (p *Plugin) Compact(rev ...int64) (toRev int64, err error)

Compact compatcs the ETCD database to the specific revision

func (*Plugin) Disabled

func (p *Plugin) Disabled() (disabled bool)

Disabled returns *true* if the plugin is not in use due to missing etcd configuration.

func (*Plugin) GetPluginName added in v1.5.0

func (p *Plugin) GetPluginName() infra.PluginName

GetPluginName returns name of the plugin

func (*Plugin) Init

func (p *Plugin) Init() (err error)

Init retrieves ETCD configuration and establishes a new connection with the etcd data store. If the configuration file doesn't exist or cannot be read, the returned error will be of os.PathError type. An untyped error is returned in case the file doesn't contain a valid YAML configuration. The function may also return error if TLS connection is selected and the CA or client certificate is not accessible(os.PathError)/valid(untyped). Check clientv3.New from coreos/etcd for possible errors returned in case the connection cannot be established.

func (*Plugin) NewBroker

func (p *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker

NewBroker creates new instance of prefixed broker that provides API with arguments of type proto.Message.

func (*Plugin) NewWatcher

func (p *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher

NewWatcher creates new instance of prefixed broker that provides API with arguments of type proto.Message.

func (*Plugin) OnConnect added in v1.5.0

func (p *Plugin) OnConnect(callback func() error)

OnConnect executes callback if plugin is connected, or gathers functions from all plugin with ETCD as dependency

func (*Plugin) PutIfNotExists

func (p *Plugin) PutIfNotExists(key string, value []byte) (succeeded bool, err error)

PutIfNotExists puts given key-value pair into etcd if there is no value set for the key. If the put was successful succeeded is true. If the key already exists succeeded is false and the value for the key is untouched.

Directories

Path Synopsis
Package mocks implements an embedded etcd mock used in unit & integration tests.
Package mocks implements an embedded etcd mock used in unit & integration tests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL