Documentation ¶
Overview ¶
Package etcd implements the distributed key value store integration. This also takes care of managing and clustering the embedded etcd server. The elastic etcd algorithm works in the following way: * When you start up mgmt, you can pass it a list of seeds. * If no seeds are given, then assume you are the first server and start up. * If a seed is given, connect as a client, and optionally volunteer to be a server. * All volunteering clients should listen for a message from the master for nomination. * If a client has been nominated, it should start up a server. * All servers should listen for their nomination to be removed and shut down if so. * The elected leader should decide who to nominate/unnominate to keep the right number of servers.
Smoke testing: mkdir /tmp/mgmt{A..E} ./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix --no-pgp ./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382 ./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384 ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3 ./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386 ./mgmt run --yaml examples/etcd1e.yaml --hostname h5 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2387 --server-urls http://127.0.0.1:2388 ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5 ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
Index ¶
- Constants
- Variables
- func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error)
- func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error
- func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error)
- func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func GetClusterSize(obj *EmbdEtcd) (uint16, error)
- func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error)
- func GetStr(obj *EmbdEtcd, key string) (string, error)
- func GetStrMap(obj *EmbdEtcd, hostnameFilter []string, key string) (map[string]string, error)
- func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error)
- func Leader(obj *EmbdEtcd) (string, error)
- func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error)
- func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error)
- func Members(obj *EmbdEtcd) (map[uint64]string, error)
- func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error
- func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func SetClusterSize(obj *EmbdEtcd, value uint16) error
- func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error
- func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error
- func SetStr(obj *EmbdEtcd, key string, data *string) error
- func SetStrMap(obj *EmbdEtcd, hostname, key string, data *string) error
- func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error
- func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func WatchResources(obj *EmbdEtcd) chan error
- func WatchStr(obj *EmbdEtcd, key string) chan error
- func WatchStrMap(obj *EmbdEtcd, key string) chan error
- type AW
- type CtxDelayErr
- type CtxPermanentErr
- type CtxReconnectErr
- type CtxRetriesErr
- type DL
- type EmbdEtcd
- func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, ...) (func(), error)
- func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func())
- func (obj *EmbdEtcd) CbLoop()
- func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error)
- func (obj *EmbdEtcd) Connect(reconnect bool) error
- func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error)
- func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error)
- func (obj *EmbdEtcd) Destroy() error
- func (obj *EmbdEtcd) DestroyServer() error
- func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error)
- func (obj *EmbdEtcd) GetConfig() etcd.Config
- func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs
- func (obj *EmbdEtcd) Loop()
- func (obj *EmbdEtcd) ServerReady() <-chan struct{}
- func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error
- func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) error
- func (obj *EmbdEtcd) Startup() error
- func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func())
- func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)
- type Flags
- type GQ
- type KV
- type RE
- type TN
- type World
- func (obj *World) ResCollect(hostnameFilter, kindFilter []string) ([]resources.Res, error)
- func (obj *World) ResExport(resourceList []resources.Res) error
- func (obj *World) ResWatch() chan error
- func (obj *World) StrDel(namespace string) error
- func (obj *World) StrGet(namespace string) (string, error)
- func (obj *World) StrIsNotExist(err error) bool
- func (obj *World) StrMapDel(namespace string) error
- func (obj *World) StrMapGet(namespace string) (map[string]string, error)
- func (obj *World) StrMapSet(namespace, value string) error
- func (obj *World) StrMapWatch(namespace string) chan error
- func (obj *World) StrSet(namespace, value string) error
- func (obj *World) StrWatch(namespace string) chan error
Constants ¶
const ( NS = "_mgmt" // root namespace for mgmt operations MaxStartServerTimeout = 60 // max number of seconds to wait for server to start MaxStartServerRetries = 3 // number of times to retry starting the etcd server DefaultIdealClusterSize = 5 // default ideal cluster size target for initial seed DefaultClientURL = "127.0.0.1:2379" DefaultServerURL = "127.0.0.1:2380" )
constant parameters which may need to be tweaked or customized
Variables ¶
var ErrNotExist = errors.New("errNotExist")
ErrNotExist is returned when GetStr cannot find the requested key. TODO: https://dave.cheney.net/2016/04/07/constant-errors
Functions ¶
func AddHostnameConvergedWatcher ¶
func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error)
AddHostnameConvergedWatcher adds a watcher with a callback that runs on hostname state changes.
func AdvertiseEndpoints ¶
AdvertiseEndpoints advertises the list of available client endpoints.
func ApplyDeltaEvents ¶
ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse.
func GetClusterSize ¶
GetClusterSize gets the ideal target cluster size of etcd peers.
func GetResources ¶
GetResources collects all of the resources which match a filter from etcd. If the kindFilter or hostnameFilter is empty, then it assumes no filtering... TODO: Expand this with a more powerful filter based on what we eventually support in our collect DSL. Ideally a server side filter like WithFilter(). We could do this if the pattern was /$NS/exported/$kind/$hostname/$uid = $data.
func HostnameConverged ¶
HostnameConverged returns a map of every hostname's converged state.
func MemberRemove ¶
MemberRemove removes a member by mID and returns whether it worked, and also whether there was an error. This is because it might have run without error, but the member wasn't found, for example.
func Members ¶
Members returns information on cluster membership. The member IDs are the keys, because an empty name means unstarted! TODO: consider queueing this through the main loop with CtxError(ctx, err)
func Nominated ¶
Nominated returns a urls map of nominated etcd server volunteers. NOTE: I know 'nominees' might be more correct, but it is less consistent here.
func SetClusterSize ¶
SetClusterSize sets the ideal target cluster size of etcd peers.
func SetHostnameConverged ¶
SetHostnameConverged sets whether a specific hostname is converged.
func SetResources ¶
SetResources exports all of the resources which we pass in to etcd.
func SetStr ¶
SetStr sets a key to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?
func SetStrMap ¶
SetStrMap sets a key and hostname pair to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?
func Volunteers ¶
Volunteers returns a urls map of available etcd server volunteers.
func WatchResources ¶
WatchResources returns a channel that outputs events when exported resources change. TODO: Filter our watch (on the server side if possible) based on the collection prefixes and filters that we care about...
func WatchStr ¶
WatchStr returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.
func WatchStrMap ¶
WatchStrMap returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.
Types ¶
type AW ¶
type AW struct {
// contains filtered or unexported fields
}
AW is a struct for the AddWatcher queue.
type CtxDelayErr ¶
CtxDelayErr requests a retry in Delta duration.
func (*CtxDelayErr) Error ¶
func (obj *CtxDelayErr) Error() string
type CtxPermanentErr ¶
type CtxPermanentErr struct {
Message string
}
CtxPermanentErr is a permanent failure error to notify about borkage.
func (*CtxPermanentErr) Error ¶
func (obj *CtxPermanentErr) Error() string
type CtxReconnectErr ¶
type CtxReconnectErr struct {
Message string
}
CtxReconnectErr requests a client reconnect to the new endpoint list.
func (*CtxReconnectErr) Error ¶
func (obj *CtxReconnectErr) Error() string
type CtxRetriesErr ¶
CtxRetriesErr lets you retry as long as you have retries available. TODO: consider combining this with CtxDelayErr
func (*CtxRetriesErr) Error ¶
func (obj *CtxRetriesErr) Error() string
type DL ¶
type DL struct {
// contains filtered or unexported fields
}
DL is a struct for the delete queue.
type EmbdEtcd ¶
type EmbdEtcd struct {
// contains filtered or unexported fields
}
EmbdEtcd provides the embedded server and client etcd functionality.
func NewEmbdEtcd ¶
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs, advertiseClientURLs, advertiseServerURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd
NewEmbdEtcd creates the top level embedded etcd struct client and server obj.
func (*EmbdEtcd) AddWatcher ¶
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error)
AddWatcher queues up an add watcher request and returns a cancel function. Remember to add the etcd.WithPrefix() option if you want to watch recursively.
func (*EmbdEtcd) CbLoop ¶
func (obj *EmbdEtcd) CbLoop()
CbLoop is the loop where callback execution is serialized.
func (*EmbdEtcd) ComplexGet ¶
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error)
ComplexGet performs a get operation and waits for an ACK to continue. It can accept more arguments that are useful for the less common operations. TODO: perhaps a get should never cause an un-converge ?
func (*EmbdEtcd) Connect ¶
Connect connects the client to a server, and then builds the *API structs. If reconnect is true, it will force a reconnect with new config endpoints.
func (*EmbdEtcd) CtxError ¶
CtxError is called whenever there is a connection or other client problem that needs to be resolved before we can continue, eg: connection disconnected, change of server to connect to, etc... It modifies the context if needed.
func (*EmbdEtcd) Destroy ¶
Destroy cleans up the entire embedded etcd system. Use DestroyServer if you only want to shut down the embedded server portion.
func (*EmbdEtcd) DestroyServer ¶
DestroyServer shuts down the embedded etcd server portion.
func (*EmbdEtcd) GetConfig ¶
GetConfig returns the config struct to be used for the etcd client connect.
func (*EmbdEtcd) LocalhostClientURLs ¶
LocalhostClientURLs returns the most localhost-like URLs for direct connection. This gets clients to talk to the local servers first before searching remotely.
func (*EmbdEtcd) Loop ¶
func (obj *EmbdEtcd) Loop()
Loop is the main loop where everything is serialized.
func (*EmbdEtcd) ServerReady ¶
func (obj *EmbdEtcd) ServerReady() <-chan struct{}
ServerReady returns on a channel when the server has started successfully.
func (*EmbdEtcd) StartServer ¶
StartServer kicks off a new embedded etcd server.
func (*EmbdEtcd) Startup ¶
Startup is the main entry point to kick off the embedded etcd client & server.
func (*EmbdEtcd) TimeoutCtx ¶
TimeoutCtx adds a tracked cancel function with timeout around an existing context.
type Flags ¶
type Flags struct { Debug bool // add additional log messages Trace bool // add execution flow log messages Verbose bool // add extra log message output }
Flags are some constant flags which are used throughout the program.
type GQ ¶
type GQ struct {
// contains filtered or unexported fields
}
GQ is a struct for the get queue.
type KV ¶
type KV struct {
// contains filtered or unexported fields
}
KV is a key + value struct to hold the two items together.
type RE ¶
type RE struct {
// contains filtered or unexported fields
}
RE is a response + error struct since these two values often occur together. This is now called an event with the move to the etcd v3 API.
type TN ¶
type TN struct {
// contains filtered or unexported fields
}
TN is a struct for the txn queue.
type World ¶
World is an etcd backed implementation of the World interface.
func (*World) ResCollect ¶
ResCollect gets the collection of exported resources which match the filter. It does this atomically so that a call always returns a complete collection.
func (*World) ResExport ¶
ResExport exports a list of resources under our hostname namespace. Subsequent calls replace the previously set collection atomically.
func (*World) ResWatch ¶
ResWatch returns a channel which spits out events on possible exported resource changes.
func (*World) StrIsNotExist ¶
StrIsNotExist returns whether the error from StrGet is a key missing error.
func (*World) StrMapSet ¶
StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
func (*World) StrMapWatch ¶
StrMapWatch returns a channel which spits out events on possible string changes.