Documentation ¶
Overview ¶
Package etcd implements the distributed key value store integration. This also takes care of managing and clustering the embedded etcd server. The elastic etcd algorithm works in the following way:
- When you start up mgmt, you can pass it a list of seeds.
- If no seeds are given, then assume you are the first server and start up.
- If a seed is given, connect as a client, and optionally volunteer to be a server.
- All volunteering clients should listen for a message from the master for nomination.
- If a client has been nominated, it should start up a server.
- All servers should listen for their nomination to be removed and shut down if so.
- The elected leader should decide who to nominate/unnominate to keep the right number of servers.
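The leader's nominate/unnominate step can be pictured with a small standalone sketch. This is not the package's implementation: `decide` is a hypothetical helper, hostnames stand in for the URL maps that the real functions return, and picking hosts in sorted order is an assumption made only to keep the result deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// decide is a hypothetical helper (not part of this package). Given the
// hostnames that volunteered and the hostnames already nominated, it returns
// which hosts to nominate and which to unnominate so that the number of
// servers moves toward idealSize.
func decide(volunteers, nominated []string, idealSize int) (add, remove []string) {
	if idealSize < 0 {
		idealSize = 0
	}
	current := append([]string(nil), nominated...)
	sort.Strings(current)

	// Too many servers: unnominate from the end of the sorted list.
	if len(current) > idealSize {
		return nil, current[idealSize:]
	}

	// Too few servers: nominate volunteers that are not servers yet.
	isNominated := make(map[string]bool, len(current))
	for _, h := range current {
		isNominated[h] = true
	}
	candidates := []string{}
	for _, h := range volunteers {
		if !isNominated[h] {
			candidates = append(candidates, h)
		}
	}
	sort.Strings(candidates)

	want := idealSize - len(current)
	if want > len(candidates) {
		want = len(candidates)
	}
	return candidates[:want], nil
}

func main() {
	volunteers := []string{"h1", "h2", "h3", "h4", "h5"}

	add, remove := decide(volunteers, []string{"h1"}, 3)
	fmt.Println(add, remove) // [h2 h3] []

	add, remove = decide(volunteers, volunteers, 3)
	fmt.Println(add, remove) // [] [h4 h5]
}
```

In the real package the equivalent decision would be driven by the values behind Volunteers, Nominated and GetClusterSize, and applied with Nominate, MemberAdd and MemberRemove.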
Smoke testing:
mkdir /tmp/mgmt{A..E}
./mgmt run --yaml examples/etcd1a.yaml --hostname h1 --tmp-prefix --no-pgp
./mgmt run --yaml examples/etcd1b.yaml --hostname h2 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2381 --server-urls http://127.0.0.1:2382
./mgmt run --yaml examples/etcd1c.yaml --hostname h3 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2383 --server-urls http://127.0.0.1:2384
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/idealClusterSize 3
./mgmt run --yaml examples/etcd1d.yaml --hostname h4 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2385 --server-urls http://127.0.0.1:2386
./mgmt run --yaml examples/etcd1e.yaml --hostname h5 --tmp-prefix --no-pgp --seeds http://127.0.0.1:2379 --client-urls http://127.0.0.1:2387 --server-urls http://127.0.0.1:2388
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/idealClusterSize 5
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
Index ¶
- Constants
- Variables
- func AddDeploy(obj Client, id uint64, hash, pHash string, data *string) error
- func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error)
- func AdvertiseEndpoints(obj *EmbdEtcd, urls etcdtypes.URLs) error
- func ApplyDeltaEvents(re *RE, urlsmap etcdtypes.URLsMap) (etcdtypes.URLsMap, error)
- func Endpoints(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func GetClusterSize(obj *EmbdEtcd) (uint16, error)
- func GetDeploy(obj Client, id uint64) (string, error)
- func GetDeploys(obj Client) (map[uint64]string, error)
- func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]resources.Res, error)
- func GetStr(obj *EmbdEtcd, key string) (string, error)
- func GetStrMap(obj *EmbdEtcd, hostnameFilter []string, key string) (map[string]string, error)
- func HostnameConverged(obj *EmbdEtcd) (map[string]bool, error)
- func Leader(obj *EmbdEtcd) (string, error)
- func MemberAdd(obj *EmbdEtcd, peerURLs etcdtypes.URLs) (*etcd.MemberAddResponse, error)
- func MemberRemove(obj *EmbdEtcd, mID uint64) (bool, error)
- func Members(obj *EmbdEtcd) (map[uint64]string, error)
- func Nominate(obj *EmbdEtcd, hostname string, urls etcdtypes.URLs) error
- func Nominated(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func SetClusterSize(obj *EmbdEtcd, value uint16) error
- func SetHostnameConverged(obj *EmbdEtcd, hostname string, isConverged bool) error
- func SetResources(obj *EmbdEtcd, hostname string, resourceList []resources.Res) error
- func SetStr(obj *EmbdEtcd, key string, data *string) error
- func SetStrMap(obj *EmbdEtcd, hostname, key string, data *string) error
- func Volunteer(obj *EmbdEtcd, urls etcdtypes.URLs) error
- func Volunteers(obj *EmbdEtcd) (etcdtypes.URLsMap, error)
- func WatchDeploy(obj *EmbdEtcd) chan error
- func WatchResources(obj *EmbdEtcd) chan error
- func WatchStr(obj *EmbdEtcd, key string) chan error
- func WatchStrMap(obj *EmbdEtcd, key string) chan error
- type AW
- type Client
- type ClientEtcd
- func (obj *ClientEtcd) Connect() error
- func (obj *ClientEtcd) Destroy() error
- func (obj *ClientEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error)
- func (obj *ClientEtcd) GetClient() *etcd.Client
- func (obj *ClientEtcd) GetConfig() etcd.Config
- func (obj *ClientEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)
- type CtxDelayErr
- type CtxPermanentErr
- type CtxReconnectErr
- type CtxRetriesErr
- type DL
- type EmbdEtcd
- func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, ...) (func(), error)
- func (obj *EmbdEtcd) CancelCtx(ctx context.Context) (context.Context, func())
- func (obj *EmbdEtcd) CbLoop()
- func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error)
- func (obj *EmbdEtcd) Connect(reconnect bool) error
- func (obj *EmbdEtcd) CtxError(ctx context.Context, err error) (context.Context, error)
- func (obj *EmbdEtcd) Delete(path string, opts ...etcd.OpOption) (int64, error)
- func (obj *EmbdEtcd) Destroy() error
- func (obj *EmbdEtcd) DestroyServer() error
- func (obj *EmbdEtcd) Get(path string, opts ...etcd.OpOption) (map[string]string, error)
- func (obj *EmbdEtcd) GetClient() *etcd.Client
- func (obj *EmbdEtcd) GetConfig() etcd.Config
- func (obj *EmbdEtcd) LocalhostClientURLs() etcdtypes.URLs
- func (obj *EmbdEtcd) Loop()
- func (obj *EmbdEtcd) ServerReady() <-chan struct{}
- func (obj *EmbdEtcd) Set(key, value string, opts ...etcd.OpOption) error
- func (obj *EmbdEtcd) StartServer(newCluster bool, peerURLsMap etcdtypes.URLsMap) error
- func (obj *EmbdEtcd) Startup() error
- func (obj *EmbdEtcd) TimeoutCtx(ctx context.Context, t time.Duration) (context.Context, func())
- func (obj *EmbdEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)
- type Flags
- type GQ
- type KV
- type RE
- type TN
- type World
- func (obj *World) Fs(uri string) (resources.Fs, error)
- func (obj *World) ResCollect(hostnameFilter, kindFilter []string) ([]resources.Res, error)
- func (obj *World) ResExport(resourceList []resources.Res) error
- func (obj *World) ResWatch() chan error
- func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)
- func (obj *World) StrDel(namespace string) error
- func (obj *World) StrGet(namespace string) (string, error)
- func (obj *World) StrIsNotExist(err error) bool
- func (obj *World) StrMapDel(namespace string) error
- func (obj *World) StrMapGet(namespace string) (map[string]string, error)
- func (obj *World) StrMapSet(namespace, value string) error
- func (obj *World) StrMapWatch(namespace string) chan error
- func (obj *World) StrSet(namespace, value string) error
- func (obj *World) StrWatch(namespace string) chan error
Constants ¶
const (
	NS = "/_mgmt" // root namespace for mgmt operations
	MaxStartServerTimeout = 60 // max number of seconds to wait for server to start
	MaxStartServerRetries = 3 // number of times to retry starting the etcd server
	DefaultIdealClusterSize = 5 // default ideal cluster size target for initial seed
	DefaultClientURL = "127.0.0.1:2379"
	DefaultServerURL = "127.0.0.1:2380"

	// DefaultMaxTxnOps is the maximum number of operations to run in a
	// single etcd transaction. If you exceed this limit, it is possible
	// that you have either an extremely large code base, or that you have
	// some code which is possibly not as efficient as it could be. Let us
	// know so that we can analyze the situation, and increase this if
	// necessary.
	DefaultMaxTxnOps = 512
)
Constant parameters which may need to be tweaked or customized.
Variables ¶
var ErrNotExist = errors.New("errNotExist")
ErrNotExist is returned when GetStr cannot find the requested key. TODO: https://dave.cheney.net/2016/04/07/constant-errors
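A sentinel error like this is usually checked by comparison rather than by inspecting its message. The following is a minimal standalone sketch of that pattern, assuming an in-memory map in place of etcd; `errNotExist`, `getStr` and `isNotExist` are hypothetical local stand-ins for ErrNotExist, GetStr and World.StrIsNotExist, and the use of `errors.Is` is an assumption about how such a check could be written.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotExist is a local stand-in for the package's ErrNotExist sentinel.
var errNotExist = errors.New("errNotExist")

// getStr is a hypothetical in-memory analogue of GetStr.
func getStr(kv map[string]string, key string) (string, error) {
	value, exists := kv[key]
	if !exists {
		return "", errNotExist
	}
	return value, nil
}

// isNotExist reports whether err is (or wraps) the sentinel.
func isNotExist(err error) bool {
	return errors.Is(err, errNotExist)
}

func main() {
	kv := map[string]string{"greeting": "hello"}

	_, err := getStr(kv, "missing")
	fmt.Println(isNotExist(err)) // true

	_, err = getStr(kv, "greeting")
	fmt.Println(isNotExist(err)) // false
}
```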
Functions ¶
func AddDeploy ¶
AddDeploy adds a new deploy. It takes an id and ensures it's sequential. If hash is not empty, then it will check that the pHash matches what the previous hash was, and also adds this new hash alongside the id. This is useful to make sure you get a linear chain of git patches, and to avoid two contributors pushing conflicting deploys. This isn't git specific, and so any arbitrary string hash can be used. FIXME: prune old deploys from the store when they aren't needed anymore...
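The two checks described above (sequential id, matching previous hash) can be sketched against an in-memory store. This is only an illustration: `store`, `deploy` and `add` are hypothetical names, the real function works against etcd rather than a map, and treating the previous hash of an empty store as the empty string is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// deploy and store are hypothetical in-memory stand-ins for what the real
// package keeps in etcd.
type deploy struct {
	hash string
	data string
}

type store struct {
	deploys map[uint64]deploy
}

// latest returns the highest stored id, or 0 if the store is empty.
func (s *store) latest() uint64 {
	var max uint64
	for id := range s.deploys {
		if id > max {
			max = id
		}
	}
	return max
}

// add accepts a deploy only if id is the next one in sequence and, when a
// hash is given, pHash equals the hash of the previous deploy.
func (s *store) add(id uint64, hash, pHash, data string) error {
	cur := s.latest()
	if id != cur+1 {
		return fmt.Errorf("id %d is not sequential, expected %d", id, cur+1)
	}
	// Assumption: an empty store has the empty string as its previous hash.
	if hash != "" && s.deploys[cur].hash != pHash {
		return errors.New("previous hash does not match")
	}
	s.deploys[id] = deploy{hash: hash, data: data}
	return nil
}

func main() {
	s := &store{deploys: map[uint64]deploy{}}
	fmt.Println(s.add(1, "aaa", "", "first"))     // <nil>
	fmt.Println(s.add(2, "bbb", "aaa", "second")) // <nil>
	fmt.Println(s.add(2, "ccc", "bbb", "dup"))    // id 2 is not sequential, expected 3
	fmt.Println(s.add(3, "ccc", "zzz", "fork"))   // previous hash does not match
}
```

With a shared store, the check and the write have to happen atomically, otherwise two contributors could both pass the check; that is the kind of guarantee an etcd transaction (see Txn) provides.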
func AddHostnameConvergedWatcher ¶
func AddHostnameConvergedWatcher(obj *EmbdEtcd, callbackFn func(map[string]bool) error) (func(), error)
AddHostnameConvergedWatcher adds a watcher with a callback that runs on hostname state changes.
func AdvertiseEndpoints ¶
AdvertiseEndpoints advertises the list of available client endpoints.
func ApplyDeltaEvents ¶
ApplyDeltaEvents modifies a URLsMap with the deltas from a WatchResponse.
func GetClusterSize ¶
GetClusterSize gets the ideal target cluster size of etcd peers.
func GetDeploy ¶
GetDeploy gets the latest deploy if id == 0, otherwise it returns the deploy with the specified id if it exists. FIXME: implement this more efficiently so that it doesn't have to download *all* the old deploys from etcd!
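The "id == 0 means latest" convention can be illustrated over a plain map, such as the one GetDeploys returns. `getDeploy` below is a hypothetical standalone helper, not the package function; returning an error for an empty store or an unknown id is an assumption, since the text above does not say what happens in those cases.

```go
package main

import (
	"errors"
	"fmt"
)

// getDeploy is a hypothetical helper over an in-memory map of id to data.
func getDeploy(deploys map[uint64]string, id uint64) (string, error) {
	if id == 0 {
		// Resolve "latest" to the highest id present.
		for k := range deploys {
			if k > id {
				id = k
			}
		}
		if id == 0 {
			// Assumption: an empty store is reported as an error.
			return "", errors.New("no deploys available")
		}
	}
	data, exists := deploys[id]
	if !exists {
		// Assumption: an unknown id is reported as an error.
		return "", fmt.Errorf("deploy %d does not exist", id)
	}
	return data, nil
}

func main() {
	deploys := map[uint64]string{1: "first", 2: "second", 3: "third"}

	latest, _ := getDeploy(deploys, 0)
	fmt.Println(latest) // third

	second, _ := getDeploy(deploys, 2)
	fmt.Println(second) // second
}
```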
func GetDeploys ¶
GetDeploys gets all the available deploys.
func GetResources ¶
GetResources collects all of the resources which match a filter from etcd. If the kindFilter or hostnameFilter is empty, then it assumes no filtering... TODO: Expand this with a more powerful filter based on what we eventually support in our collect DSL. Ideally a server side filter like WithFilter(). We could do this if the pattern was $NS/exported/$kind/$hostname/$uid = $data.
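The "empty filter means no filtering" rule can be shown with a client-side sketch. The `res` struct, `matches` and `collect` are hypothetical names used only for this illustration; the real function works with resources.Res values fetched from etcd.

```go
package main

import "fmt"

// res is a hypothetical, minimal stand-in for an exported resource.
type res struct {
	Kind     string
	Hostname string
	Name     string
}

// matches reports whether value passes the filter. An empty filter matches
// everything.
func matches(filter []string, value string) bool {
	if len(filter) == 0 {
		return true
	}
	for _, f := range filter {
		if f == value {
			return true
		}
	}
	return false
}

// collect keeps the resources that pass both filters.
func collect(all []res, hostnameFilter, kindFilter []string) []res {
	out := []res{}
	for _, r := range all {
		if matches(hostnameFilter, r.Hostname) && matches(kindFilter, r.Kind) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	all := []res{
		{Kind: "file", Hostname: "h1", Name: "/tmp/a"},
		{Kind: "file", Hostname: "h2", Name: "/tmp/b"},
		{Kind: "svc", Hostname: "h1", Name: "sshd"},
	}
	fmt.Println(len(collect(all, nil, nil)))                          // 3
	fmt.Println(len(collect(all, []string{"h1"}, nil)))               // 2
	fmt.Println(len(collect(all, []string{"h1"}, []string{"file"}))) // 1
}
```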
func HostnameConverged ¶
HostnameConverged returns a map of every hostname's converged state.
func MemberRemove ¶
MemberRemove removes a member by mID and returns if it worked, and also if there was an error. This is because it might have run without error, but the member wasn't found, for example.
func Members ¶
Members returns information on cluster membership. The member IDs are the keys, because an empty name means unstarted! TODO: consider queueing this through the main loop with CtxError(ctx, err)
func Nominated ¶
Nominated returns a urls map of nominated etcd server volunteers. NOTE: I know 'nominees' might be more correct, but is less consistent here
func SetClusterSize ¶
SetClusterSize sets the ideal target cluster size of etcd peers.
func SetHostnameConverged ¶
SetHostnameConverged sets whether a specific hostname is converged.
func SetResources ¶
SetResources exports all of the resources which we pass in to etcd.
func SetStr ¶
SetStr sets a key to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?
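Taking a `*string` lets one call express both "set" and "delete", and keeps the empty string available as a real value. A minimal sketch of that convention, with a map standing in for etcd and `setStr` as a hypothetical local helper:

```go
package main

import "fmt"

// setStr is a hypothetical in-memory analogue: a nil pointer deletes the key,
// anything else stores the string it points to.
func setStr(kv map[string]string, key string, data *string) {
	if data == nil {
		delete(kv, key)
		return
	}
	kv[key] = *data
}

func main() {
	kv := map[string]string{}

	empty := ""
	setStr(kv, "motd", &empty) // an empty string is still a stored value
	_, exists := kv["motd"]
	fmt.Println(exists) // true

	setStr(kv, "motd", nil) // nil removes the key
	_, exists = kv["motd"]
	fmt.Println(exists) // false
}
```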
func SetStrMap ¶
SetStrMap sets a key and hostname pair to a certain value. If the value is nil, then it deletes the key. Otherwise the value should point to a string. TODO: TTL or delete disconnect?
func Volunteers ¶
Volunteers returns a urls map of available etcd server volunteers.
func WatchDeploy ¶
WatchDeploy returns a channel which spits out events on new deploy activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.
func WatchResources ¶
WatchResources returns a channel that outputs events when exported resources change. TODO: Filter our watch (on the server side if possible) based on the collection prefixes and filters that we care about...
func WatchStr ¶
WatchStr returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong. XXX: since the caller of this (via the World API) has no way to tell it it's done, does that mean we leak go-routines since it might still be running, but perhaps even blocked??? Could this cause a dead-lock? Should we instead return some sort of struct which has a close method with it to ask for a shutdown?
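One shape the "struct which has a close method" idea could take is sketched below. It is not how this package works today: `watcher`, `newWatcher`, `Events` and `Close` are hypothetical names, and a plain channel stands in for the etcd watch. Close asks the goroutine to stop, waits for it, and leaves the events channel closed, so nothing is left blocked.

```go
package main

import (
	"fmt"
	"sync"
)

// watcher is a hypothetical wrapper around a watch goroutine.
type watcher struct {
	events chan error
	done   chan struct{}
	once   sync.Once
	wg     sync.WaitGroup
}

// newWatcher forwards one nil event for every item received on source until
// source is closed or Close is called.
func newWatcher(source <-chan struct{}) *watcher {
	w := &watcher{
		events: make(chan error),
		done:   make(chan struct{}),
	}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		defer close(w.events) // tell the consumer that we are finished
		for {
			select {
			case _, ok := <-source:
				if !ok {
					return
				}
				select {
				case w.events <- nil:
				case <-w.done: // do not block on a consumer that left
					return
				}
			case <-w.done:
				return
			}
		}
	}()
	return w
}

// Events returns the channel that the consumer reads from.
func (w *watcher) Events() <-chan error { return w.events }

// Close requests a shutdown and waits for the goroutine to exit. It is safe
// to call more than once.
func (w *watcher) Close() {
	w.once.Do(func() { close(w.done) })
	w.wg.Wait()
}

func main() {
	source := make(chan struct{}, 1)
	w := newWatcher(source)

	source <- struct{}{}
	fmt.Println(<-w.Events()) // <nil>

	w.Close()
	_, open := <-w.Events()
	fmt.Println(open) // false
}
```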
func WatchStrMap ¶
WatchStrMap returns a channel which spits out events on key activity. FIXME: It should close the channel when it's done, and spit out errors when something goes wrong.
Types ¶
type AW ¶
type AW struct {
// contains filtered or unexported fields
}
AW is a struct for the AddWatcher queue.
type Client ¶
type Client interface {
	// TODO: add more method signatures
	Get(path string, opts ...etcd.OpOption) (map[string]string, error)
	Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)
}
Client provides a simple interface specification for client requests. Both EmbdEtcd and ClientEtcd implement this.
type ClientEtcd ¶
type ClientEtcd struct {
	Seeds []string // list of endpoints to try to connect
	// contains filtered or unexported fields
}
ClientEtcd provides a simple etcd client for deploy and status operations.
func (*ClientEtcd) Connect ¶
func (obj *ClientEtcd) Connect() error
Connect connects the client to a server, and then builds the *API structs.
func (*ClientEtcd) Destroy ¶
func (obj *ClientEtcd) Destroy() error
Destroy cleans up the entire etcd client connection.
func (*ClientEtcd) Get ¶
Get runs a get on the client connection. This has the same signature as our EmbdEtcd Get function.
func (*ClientEtcd) GetClient ¶
func (obj *ClientEtcd) GetClient() *etcd.Client
GetClient returns a handle to the raw etcd client object.
func (*ClientEtcd) GetConfig ¶
func (obj *ClientEtcd) GetConfig() etcd.Config
GetConfig returns the config struct to be used for the etcd client connect.
func (*ClientEtcd) Txn ¶
func (obj *ClientEtcd) Txn(ifcmps []etcd.Cmp, thenops, elseops []etcd.Op) (*etcd.TxnResponse, error)
Txn runs a transaction on the client connection. This has the same signature as our EmbdEtcd Txn function.
type CtxDelayErr ¶
CtxDelayErr requests a retry in Delta duration.
func (*CtxDelayErr) Error ¶
func (obj *CtxDelayErr) Error() string
type CtxPermanentErr ¶
type CtxPermanentErr struct {
Message string
}
CtxPermanentErr is a permanent failure error to notify about borkage.
func (*CtxPermanentErr) Error ¶
func (obj *CtxPermanentErr) Error() string
type CtxReconnectErr ¶
type CtxReconnectErr struct {
Message string
}
CtxReconnectErr requests a client reconnect to the new endpoint list.
func (*CtxReconnectErr) Error ¶
func (obj *CtxReconnectErr) Error() string
type CtxRetriesErr ¶
CtxRetriesErr lets you retry as long as you have retries available. TODO: consider combining this with CtxDelayErr
func (*CtxRetriesErr) Error ¶
func (obj *CtxRetriesErr) Error() string
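The four Ctx*Err types let a caller signal how a failed operation should be handled. The sketch below shows one way such errors could be told apart with `errors.As`. Everything in it is hypothetical: the local types only mirror the idea, the `Delta` field name is taken from the CtxDelayErr description and the `Message` field from the structs shown above, and `classify` is not a function in this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// delayErr, reconnectErr and permanentErr are hypothetical local mirrors of
// the error types described above.
type delayErr struct {
	Delta   time.Duration
	Message string
}

func (e *delayErr) Error() string { return "delay: " + e.Message }

type reconnectErr struct{ Message string }

func (e *reconnectErr) Error() string { return "reconnect: " + e.Message }

type permanentErr struct{ Message string }

func (e *permanentErr) Error() string { return "permanent: " + e.Message }

// classify turns an error into the action a retry loop might take.
func classify(err error) string {
	var d *delayErr
	var r *reconnectErr
	var p *permanentErr
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &d):
		return "retry after " + d.Delta.String()
	case errors.As(err, &r):
		return "reconnect"
	case errors.As(err, &p):
		return "give up"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify(&delayErr{Delta: 2 * time.Second, Message: "busy"})) // retry after 2s

	// errors.As also sees through wrapping done with %w.
	wrapped := fmt.Errorf("get failed: %w", &reconnectErr{Message: "endpoints changed"})
	fmt.Println(classify(wrapped)) // reconnect

	fmt.Println(classify(&permanentErr{Message: "bad config"})) // give up
}
```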
type DL ¶
type DL struct {
// contains filtered or unexported fields
}
DL is a struct for the delete queue.
type EmbdEtcd ¶
type EmbdEtcd struct {
// contains filtered or unexported fields
}
EmbdEtcd provides the embedded server and client etcd functionality.
func NewEmbdEtcd ¶
func NewEmbdEtcd(hostname string, seeds, clientURLs, serverURLs, advertiseClientURLs, advertiseServerURLs etcdtypes.URLs, noServer bool, idealClusterSize uint16, flags Flags, prefix string, converger converger.Converger) *EmbdEtcd
NewEmbdEtcd creates the top level embedded etcd struct client and server obj.
func (*EmbdEtcd) AddWatcher ¶
func (obj *EmbdEtcd) AddWatcher(path string, callback func(re *RE) error, errCheck bool, skipConv bool, opts ...etcd.OpOption) (func(), error)
AddWatcher queues up an add watcher request and returns a cancel function. Remember to add the etcd.WithPrefix() option if you want to watch recursively.
func (*EmbdEtcd) CbLoop ¶
func (obj *EmbdEtcd) CbLoop()
CbLoop is the loop where callback execution is serialized.
func (*EmbdEtcd) ComplexGet ¶
func (obj *EmbdEtcd) ComplexGet(path string, skipConv bool, opts ...etcd.OpOption) (map[string]string, error)
ComplexGet performs a get operation and waits for an ACK to continue. It can accept more arguments that are useful for the less common operations. TODO: perhaps a get should never cause an un-converge ?
func (*EmbdEtcd) Connect ¶
Connect connects the client to a server, and then builds the *API structs. If reconnect is true, it will force a reconnect with new config endpoints.
func (*EmbdEtcd) CtxError ¶
CtxError is called whenever there is a connection or other client problem that needs to be resolved before we can continue, eg: connection disconnected, change of server to connect to, etc... It modifies the context if needed.
func (*EmbdEtcd) Destroy ¶
Destroy cleans up the entire embedded etcd system. Use DestroyServer if you only want to shutdown the embedded server portion.
func (*EmbdEtcd) DestroyServer ¶
DestroyServer shuts down the embedded etcd server portion.
func (*EmbdEtcd) GetClient ¶
GetClient returns a handle to the raw etcd client object for those scenarios.
func (*EmbdEtcd) GetConfig ¶
GetConfig returns the config struct to be used for the etcd client connect.
func (*EmbdEtcd) LocalhostClientURLs ¶
LocalhostClientURLs returns the most localhost like URLs for direct connection. This gets clients to talk to the local servers first before searching remotely.
func (*EmbdEtcd) Loop ¶
func (obj *EmbdEtcd) Loop()
Loop is the main loop where everything is serialized.
func (*EmbdEtcd) ServerReady ¶
func (obj *EmbdEtcd) ServerReady() <-chan struct{}
ServerReady returns on a channel when the server has started successfully.
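A caller that should not wait forever can combine a ready channel like this with a timeout. The sketch is standalone: `waitReady` is a hypothetical helper, and a locally created channel stands in for the one ServerReady returns.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitReady blocks until ready delivers a value or is closed, or until the
// timeout expires, whichever comes first.
func waitReady(ready <-chan struct{}, timeout time.Duration) error {
	select {
	case <-ready:
		return nil
	case <-time.After(timeout):
		return errors.New("timed out waiting for server to start")
	}
}

func main() {
	ready := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend the server is starting
		close(ready)
	}()
	fmt.Println(waitReady(ready, time.Second)) // <nil>

	never := make(chan struct{})
	fmt.Println(waitReady(never, 20*time.Millisecond)) // timed out waiting for server to start
}
```

The MaxStartServerTimeout constant (60 seconds) looks like a natural bound for this kind of wait, though whether the package uses it this way is not stated here.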
func (*EmbdEtcd) StartServer ¶
StartServer kicks off a new embedded etcd server.
func (*EmbdEtcd) Startup ¶
Startup is the main entry point to kick off the embedded etcd client & server.
func (*EmbdEtcd) TimeoutCtx ¶
TimeoutCtx adds a tracked cancel function with timeout around an existing context.
type Flags ¶
type Flags struct {
	Debug bool // add additional log messages
	Trace bool // add execution flow log messages
	Verbose bool // add extra log message output
}
Flags are some constant flags which are used throughout the program.
type GQ ¶
type GQ struct {
// contains filtered or unexported fields
}
GQ is a struct for the get queue.
type KV ¶
type KV struct {
// contains filtered or unexported fields
}
KV is a key + value struct to hold the two items together.
type RE ¶
type RE struct {
// contains filtered or unexported fields
}
RE is a response + error struct since these two values often occur together. This is now called an event with the move to the etcd v3 API.
type TN ¶
type TN struct {
// contains filtered or unexported fields
}
TN is a struct for the txn queue.
type World ¶
type World struct {
	Hostname string // uuid for the consumer of these
	EmbdEtcd *EmbdEtcd
	MetadataPrefix string // expected metadata prefix
	StoragePrefix string // storage prefix for etcdfs storage
	StandaloneFs resources.Fs // store an fs here for local usage
	Debug bool
	Logf func(format string, v ...interface{})
}
World is an etcd backed implementation of the World interface.
func (*World) Fs ¶
Fs returns a distributed file system from a unique URI. For single host execution that doesn't span more than a single host, this file system might actually be a local or memory backed file system, so actually only distributed within the boredom that is a single host cluster.
func (*World) ResCollect ¶
ResCollect gets the collection of exported resources which match the filter. It does this atomically so that a call always returns a complete collection.
func (*World) ResExport ¶
ResExport exports a list of resources under our hostname namespace. Subsequent calls replace the previously set collection atomically.
func (*World) ResWatch ¶
ResWatch returns a channel which spits out events on possible exported resource changes.
func (*World) StrIsNotExist ¶
StrIsNotExist returns whether the error from StrGet is a key missing error.
func (*World) StrMapSet ¶
StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
func (*World) StrMapWatch ¶
StrMapWatch returns a channel which spits out events on possible string changes.
Directories ¶
Path | Synopsis |
---|---|
fs | Package fs implements a very simple and limited file system on top of etcd. |
scheduler | Package scheduler implements a distributed consensus scheduler with etcd. |