Documentation ¶
Overview ¶
Package etcd implements the distributed key value store and fs integration. It also takes care of managing and clustering the embedded etcd server. The automatic clustering is considered experimental. If you require a more robust, battle-tested etcd cluster, then manage your own, and point each mgmt agent at it with --seeds and --no-server.
Algorithm ¶
The elastic etcd algorithm works in the following way:
* When you start up mgmt, you can pass it a list of seeds.
* If no seeds are given, then assume you are the first server and start up.
* If a seed is given, connect as a client, and volunteer to be a server.
* All volunteering clients should listen for a nomination message.
* If a client has been nominated, it should start up a server.
* A server should shut down if its nomination is removed.
* The elected leader should decide who to nominate/unnominate as needed.
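The nomination step that the elected leader performs can be sketched in Go as below. This is a minimal, hypothetical illustration and not the package's Chooser implementation: the `reconcile` function, its sorted tie-breaking, and the idea of comparing the nominee count against an ideal size (similar in spirit to the `idealclustersize` key used in the smoke test) are all assumptions. Leader election and the etcd watches that carry volunteer and nomination messages are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile is a hypothetical helper, not this package's Chooser. Given the
// volunteering hosts, the currently nominated hosts and an ideal cluster
// size, it returns which volunteers to nominate and which nominees to
// unnominate. Sorting makes the choice deterministic.
func reconcile(volunteers, nominated []string, ideal int) (nominate, unnominate []string) {
	if len(nominated) > ideal {
		// Shrink: unnominate the surplus hosts at the end of the sorted list.
		sorted := append([]string(nil), nominated...)
		sort.Strings(sorted)
		return nil, sorted[ideal:]
	}
	isNominated := make(map[string]bool, len(nominated))
	for _, n := range nominated {
		isNominated[n] = true
	}
	// Grow: nominate volunteers that are not yet nominated until ideal is met.
	sorted := append([]string(nil), volunteers...)
	sort.Strings(sorted)
	for _, v := range sorted {
		if len(nominated)+len(nominate) >= ideal {
			break
		}
		if !isNominated[v] {
			nominate = append(nominate, v)
		}
	}
	return nominate, nil
}

func main() {
	nom, unnom := reconcile([]string{"h1", "h2", "h3", "h4", "h5"}, []string{"h1"}, 3)
	fmt.Println(nom, unnom) // [h2 h3] []
	nom, unnom = reconcile(nil, []string{"h1", "h2", "h3"}, 1)
	fmt.Println(nom, unnom) // [] [h2 h3]
}
```

A real chooser also has to cope with nominees that stop responding, which this sketch ignores.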
Notes ¶
If you attempt to add a new member to the cluster with a duplicate hostname, then the behaviour is undefined, and you could bork your cluster. This is not recommended or supported. Please ensure that your hostnames are unique.
A single ^C requests an orderly shutdown, whereas a third ^C asks etcd to shut down forcefully. Using the forced shutdown is not recommended; it exists to make exiting easier if something has deadlocked the cluster. If this was due to user error (e.g., duplicate hostnames), then it was your fault, but if the member did not shut down from a single ^C under normal circumstances, please file a bug.
There are currently some races in this implementation. In practice, this should not cause any adverse effects unless you simultaneously add or remove members at a high rate. Fixing these races will probably require some internal changes to etcd. Help is welcome if you're interested in working on this.
Smoke testing ¶
Here is a simple way to test the basics of etcd clustering:
./mgmt run --tmp-prefix --no-pgp --hostname h1 empty
./mgmt run --tmp-prefix --no-pgp --hostname h2 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2381 --server-urls=http://127.0.0.1:2382 empty
./mgmt run --tmp-prefix --no-pgp --hostname h3 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2383 --server-urls=http://127.0.0.1:2384 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/chooser/dynamicsize/idealclustersize 3
./mgmt run --tmp-prefix --no-pgp --hostname h4 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2385 --server-urls=http://127.0.0.1:2386 empty
./mgmt run --tmp-prefix --no-pgp --hostname h5 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2387 --server-urls=http://127.0.0.1:2388 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/chooser/dynamicsize/idealclustersize 5
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
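The `mgmt run` commands in the smoke test follow a simple port pattern, which this hypothetical Go helper makes explicit: host hN listens for clients on port 2379+2(N-1) and for peers on the next port up, and every host except h1 uses h1's default client URL as its seed. The `smokeCmd` name is illustrative only.

```go
package main

import "fmt"

// smokeCmd is a hypothetical helper that rebuilds the "mgmt run" lines of the
// smoke test. Host n (1-based) listens for clients on 2379+2(n-1) and for
// peers on the port after that; h1 uses the defaults and is everyone's seed.
func smokeCmd(n int) string {
	base := fmt.Sprintf("./mgmt run --tmp-prefix --no-pgp --hostname h%d", n)
	if n == 1 {
		return base + " empty"
	}
	client := 2379 + 2*(n-1)
	return fmt.Sprintf("%s --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:%d --server-urls=http://127.0.0.1:%d empty",
		base, client, client+1)
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Println(smokeCmd(n))
	}
}
```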
Bugs ¶
A member might occasionally think that an endpoint still exists after it has already shut down. This isn't a major issue, since if that endpoint doesn't respond, the member automatically chooses the next available one. To see this issue, turn on debugging, start H1, H2, and H3, then stop H2; you might see that H3 still knows about H2.
Shutting down a cluster by setting the idealclustersize to zero is currently buggy and not supported. Try this at your own risk.
If a member is nominated but doesn't respond to the nomination event and start up, and we have lost the quorum needed to add it, then we could end up in a blocked state. This could be improved by calling memberRemove after a timeout.
Adding new cluster members very quickly might trigger a `runtime error: error validating peerURLs ... member count is unequal` error. See: https://github.com/etcd-io/etcd/issues/10626 for more information.
If you use the dynamic size feature to start and stop the server process, then once it has started and stopped, it can't be restarted because of a bug in etcd that doesn't free the port. Instead you'll get a `bind: address already in use` error. See: https://github.com/etcd-io/etcd/issues/6042 for more information.
Index ¶
- Constants
- type EmbdEtcd
- func (obj *EmbdEtcd) Close() error
- func (obj *EmbdEtcd) ConnectBlock(ctx context.Context, fn func(context.Context) error) <-chan error
- func (obj *EmbdEtcd) Destroy() error
- func (obj *EmbdEtcd) Exited() <-chan struct{}
- func (obj *EmbdEtcd) Init() error
- func (obj *EmbdEtcd) Interrupt() error
- func (obj *EmbdEtcd) MakeClient() (interfaces.Client, error)
- func (obj *EmbdEtcd) MakeClientFromNamespace(ns string) (interfaces.Client, error)
- func (obj *EmbdEtcd) Ready() <-chan struct{}
- func (obj *EmbdEtcd) Run() error
- func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())
- func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())
- func (obj *EmbdEtcd) Validate() error
- type World
- func (obj *World) Fs(uri string) (engine.Fs, error)
- func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error)
- func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error)
- func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error)
- func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error)
- func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error
- func (obj *World) ResWatch(ctx context.Context) (chan error, error)
- func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)
- func (obj *World) StrDel(ctx context.Context, namespace string) error
- func (obj *World) StrGet(ctx context.Context, namespace string) (string, error)
- func (obj *World) StrIsNotExist(err error) bool
- func (obj *World) StrMapDel(ctx context.Context, namespace string) error
- func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error)
- func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error
- func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error)
- func (obj *World) StrSet(ctx context.Context, namespace, value string) error
- func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error)
- func (obj *World) URI() string
- func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)
Constants ¶
const (
	// NominatedPath is the unprefixed path under which nominated hosts are
	// stored. This is public so that other consumers can know to avoid this
	// key prefix.
	NominatedPath = "/nominated/"
	// VolunteerPath is the unprefixed path under which volunteering hosts
	// are stored. This is public so that other consumers can know to avoid
	// this key prefix.
	VolunteerPath = "/volunteer/"
	// EndpointsPath is the unprefixed path under which the advertised host
	// endpoints are stored. This is public so that other consumers can know
	// to avoid this key prefix.
	EndpointsPath = "/endpoints/"
	// ChooserPath is the unprefixed path under which the chooser algorithm
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ChooserPath = "/chooser" // all hosts share the same namespace
	// ConvergedPath is the unprefixed path under which the converger
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ConvergedPath = "/converged/"
	// SchedulerPath is the unprefixed path under which the scheduler
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	SchedulerPath = "/scheduler/"
	// DefaultClientURL is the default value that is used for client URLs.
	// It is pulled from the upstream etcd package.
	DefaultClientURL = embed.DefaultListenClientURLs // 127.0.0.1:2379
	// DefaultServerURL is the default value that is used for server URLs.
	// It is pulled from the upstream etcd package.
	DefaultServerURL = embed.DefaultListenPeerURLs // 127.0.0.1:2380
	// DefaultMaxTxnOps is the maximum number of operations to run in a
	// single etcd transaction. If you exceed this limit, it is possible
	// that you have either an extremely large code base, or that you have
	// some code which is possibly not as efficient as it could be. Let us
	// know so that we can analyze the situation, and increase this if
	// necessary.
	DefaultMaxTxnOps = 512
	// RunStartupTimeout is the amount of time we will wait for regular run
	// startup before cancelling it all.
	RunStartupTimeout = 30 * time.Second
	// ClientDialTimeout is the DialTimeout option in the client config.
	ClientDialTimeout = 5 * time.Second
	// ClientDialKeepAliveTime is the DialKeepAliveTime config value for the
	// etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTime = 2 * time.Second // from etcdctl
	// ClientDialKeepAliveTimeout is the DialKeepAliveTimeout config value
	// for the etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTimeout = 6 * time.Second // from etcdctl
	// MemberChangeInterval is the polling interval to use when watching for
	// member changes during add or remove.
	MemberChangeInterval = 500 * time.Millisecond
	// SelfRemoveTimeout gives unnominated members a chance to self exit.
	SelfRemoveTimeout = 10 * time.Second
	// ForceExitTimeout is the amount of time we will wait for a force exit
	// to occur before cancelling it all.
	ForceExitTimeout = 15 * time.Second
	// SessionTTL is the number of seconds to wait before a dead or
	// unresponsive host has their volunteer keys removed from the cluster.
	// This should be an integer multiple of seconds, since one second is
	// the TTL precision used in etcd.
	SessionTTL = 10 * time.Second // seconds
	// RequireLeaderCtx specifies whether the volunteer loop should use the
	// WithRequireLeader ctx wrapper. It is unknown at this time if this
	// would cause occasional events to be lost, more extensive testing is
	// needed.
	RequireLeaderCtx = false
	// ConvergerHostnameNamespace is a unique key used in the converger.
	ConvergerHostnameNamespace = "etcd-hostname"
)
const (
	// MaxServerStartTimeout is the amount of time to wait for the server
	// to start before considering it a failure. If you hit this timeout,
	// let us know so that we can analyze the situation, and increase this
	// if necessary.
	MaxServerStartTimeout = 60 * time.Second
	// MaxServerCloseTimeout is the maximum amount of time we'll wait for
	// the server to close down. If it exceeds this, it's probably a bug.
	MaxServerCloseTimeout = 15 * time.Second
	// MaxServerRetries is the maximum number of times we can try to restart
	// the server if it fails on startup. This can help workaround some
	// timing bugs in etcd.
	MaxServerRetries = 5
	// ServerRetryWait is the amount of time to wait between retries.
	ServerRetryWait = 500 * time.Millisecond
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EmbdEtcd ¶
type EmbdEtcd struct {
	Hostname string
	// Seeds is the list of servers that this client could connect to.
	Seeds etcdtypes.URLs
	// ClientURLs are the locations to listen for clients if I am a server.
	ClientURLs etcdtypes.URLs
	// ServerURLs are the locations to listen for servers (peers) if I am a
	// server (peer).
	ServerURLs etcdtypes.URLs
	// AClientURLs are the client urls to advertise.
	AClientURLs etcdtypes.URLs
	// AServerURLs are the server (peer) urls to advertise.
	AServerURLs etcdtypes.URLs
	// NoServer disables all server peering for this host.
	// TODO: allow changing this at runtime with some function call?
	NoServer bool
	// NoNetwork causes this to use unix:// sockets instead of TCP for
	// connections.
	NoNetwork bool
	// Chooser is the implementation of the algorithm that decides which
	// hosts to add or remove to grow and shrink the cluster.
	Chooser chooser.Chooser
	// Converger is a converged coordinator object that can be used to
	// track the converged state.
	Converger *converger.Coordinator
	// NS is a string namespace that we prefix to every key operation.
	NS string
	// Prefix is the directory where any etcd related state is stored. It
	// must be an absolute directory path.
	Prefix string
	Debug  bool
	Logf   func(format string, v ...interface{})
	// contains filtered or unexported fields
}
EmbdEtcd provides the embedded server and client etcd functionality.
func (*EmbdEtcd) ConnectBlock ¶
ConnectBlock runs fn as soon as the client is connected. When this happens, it closes the output channel. If any error occurs, it sends that error on the channel.
func (*EmbdEtcd) Destroy ¶
Destroy cleans up the entire embedded etcd system. Use DestroyServer if you only want to shut down the embedded server portion.
func (*EmbdEtcd) Exited ¶
func (obj *EmbdEtcd) Exited() <-chan struct{}
Exited returns a channel that closes when we've been destroyed. This happens after Run exits. If Run is never called, this will never happen.
func (*EmbdEtcd) Init ¶
Init initializes the struct after it has been populated as desired. You must not use the struct if this returns an error.
func (*EmbdEtcd) Interrupt ¶
Interrupt causes this member to force a shutdown. It does not safely wait for an orderly shutdown. It is not recommended to use this unless things are already borked.
func (*EmbdEtcd) MakeClient ¶
func (obj *EmbdEtcd) MakeClient() (interfaces.Client, error)
MakeClient returns an etcd Client interface that is suitable for basic tasks. Don't call this until the channel returned by Ready has closed.
func (*EmbdEtcd) MakeClientFromNamespace ¶
func (obj *EmbdEtcd) MakeClientFromNamespace(ns string) (interfaces.Client, error)
MakeClientFromNamespace returns an etcd Client interface that is suitable for basic tasks and that has a key namespace prefix. Don't call this until the channel returned by Ready has closed.
func (*EmbdEtcd) Ready ¶
func (obj *EmbdEtcd) Ready() <-chan struct{}
Ready returns a channel that closes when we're up and running. This process happens when calling Run. If Run is never called, this will never happen. Our main startup must be running, and our client must be connected to get here.
func (*EmbdEtcd) Run ¶
Run is the main entry point to kick off the embedded etcd client and server. It blocks until we've exited for shutdown. The shutdown can be triggered by calling Destroy.
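Taken together, Init, Run, Ready, Destroy, and Exited imply a calling order that can be sketched as follows. The `member` interface and the `fake` type are hypothetical, declared only so the example runs without etcd. According to the index, *EmbdEtcd has methods with these signatures and so should satisfy the interface, but that is not verified here.

```go
package main

import (
	"fmt"
	"sync"
)

// member is a hypothetical interface listing the EmbdEtcd lifecycle methods.
type member interface {
	Init() error
	Run() error
	Ready() <-chan struct{}
	Destroy() error
	Exited() <-chan struct{}
}

// fake is a toy member: Run closes Ready, blocks until Destroy is called,
// and Exited closes as Run returns.
type fake struct {
	ready, quit, exited chan struct{}
	once                sync.Once
}

func (f *fake) Init() error {
	f.ready, f.quit, f.exited = make(chan struct{}), make(chan struct{}), make(chan struct{})
	return nil
}

func (f *fake) Run() error {
	defer close(f.exited)
	close(f.ready)
	<-f.quit
	return nil
}

func (f *fake) Ready() <-chan struct{} { return f.ready }

func (f *fake) Destroy() error {
	f.once.Do(func() { close(f.quit) })
	return nil
}

func (f *fake) Exited() <-chan struct{} { return f.exited }

// lifecycle shows one plausible calling order: Init, Run in a goroutine,
// wait for Ready before use, then Destroy and wait for Exited.
func lifecycle(m member, use func()) error {
	if err := m.Init(); err != nil {
		return err // the struct must not be used after a failed Init
	}
	runErr := make(chan error, 1)
	go func() { runErr <- m.Run() }()
	select {
	case <-m.Ready(): // e.g. MakeClient is safe to call from here on
	case err := <-runErr:
		return err // Run exited before becoming ready
	}
	use()
	if err := m.Destroy(); err != nil {
		return err
	}
	<-m.Exited()
	return <-runErr
}

func main() {
	err := lifecycle(&fake{}, func() { fmt.Println("member is ready") })
	fmt.Println("exited:", err)
}
```

Selecting on both Ready and Run's result avoids blocking forever if Run fails before the member becomes ready.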
func (*EmbdEtcd) ServerExited ¶
func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())
ServerExited returns a channel that closes when the server is destroyed. This happens after runServer exits. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.
func (*EmbdEtcd) ServerReady ¶
func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())
ServerReady returns a channel that closes when we're up and running. This happens when calling runServer. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.
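The cancel/ack contract shared by ServerReady and ServerExited can be modelled with a resettable signal. This is a hypothetical sketch, not the package's implementation: Watch holds a read lock until its ack function is called, so Reset (which takes the write lock) waits for every watcher to finish, and new watchers block while a reset is pending. Fire closes the current channel without waiting. A goroutine should call ack before calling Watch again, because Go's sync.RWMutex does not allow recursive read locking once a writer is waiting.

```go
package main

import (
	"fmt"
	"sync"
)

// cyclicSignal is a hypothetical signal that fires, gets reset, and fires
// again, with Watch returning an ack func like ServerReady/ServerExited do.
type cyclicSignal struct {
	rw    sync.RWMutex // read-held by active watchers, write-held by Reset
	mu    sync.Mutex   // guards ch and fired
	ch    chan struct{}
	fired bool
}

func newCyclicSignal() *cyclicSignal {
	return &cyclicSignal{ch: make(chan struct{})}
}

// Watch returns the current channel and an ack func that must be called once
// the signal is received or the caller stops watching.
func (s *cyclicSignal) Watch() (<-chan struct{}, func()) {
	s.rw.RLock()
	s.mu.Lock()
	ch := s.ch
	s.mu.Unlock()
	var once sync.Once
	return ch, func() { once.Do(s.rw.RUnlock) }
}

// Fire closes the current channel, waking all watchers. It is idempotent.
func (s *cyclicSignal) Fire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.fired {
		close(s.ch)
		s.fired = true
	}
}

// Reset waits for all outstanding acks, then arms a fresh channel.
func (s *cyclicSignal) Reset() {
	s.rw.Lock()
	defer s.rw.Unlock()
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.fired {
		s.ch = make(chan struct{})
		s.fired = false
	}
}

func main() {
	ready := newCyclicSignal()
	ch, ack := ready.Watch()
	go ready.Fire() // e.g. the server finished starting up
	<-ch
	ack() // done watching, so a later Reset may proceed
	fmt.Println("server ready")
	ready.Reset() // the server shut down; arm the signal for the next start
}
```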
type World ¶
type World struct { Hostname string // uuid for the consumer of these Client interfaces.Client MetadataPrefix string // expected metadata prefix StoragePrefix string // storage prefix for etcdfs storage StandaloneFs engine.Fs // store an fs here for local usage GetURI func() string Debug bool Logf func(format string, v ...interface{}) }
World is an etcd backed implementation of the World interface.
func (*World) Fs ¶
Fs returns a distributed file system from a unique URI. For execution that doesn't span more than a single host, this file system might actually be a local or memory backed file system, so it is only distributed within the boredom that is a single host cluster.
func (*World) IdealClusterSizeGet ¶
IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.
func (*World) IdealClusterSizeSet ¶
IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.
func (*World) IdealClusterSizeWatch ¶
IdealClusterSizeWatch returns a stream of errors anytime the cluster-wide dynamic cluster size setpoint changes.
func (*World) ResCollect ¶
func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error)
ResCollect gets the collection of exported resources which match the filter. It does this atomically so that a call always returns a complete collection.
func (*World) ResExport ¶
ResExport exports a list of resources under our hostname namespace. Subsequent calls replace the previously set collection atomically.
func (*World) ResWatch ¶
ResWatch returns a channel which spits out events on possible exported resource changes.
func (*World) Scheduler ¶
Scheduler returns a scheduling result of hosts in a particular namespace. XXX: Add a context.Context here
func (*World) StrIsNotExist ¶
StrIsNotExist returns whether the error from StrGet is a key missing error.
func (*World) StrMapSet ¶
StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
func (*World) StrMapWatch ¶
StrMapWatch returns a channel which spits out events on possible string changes.
func (*World) StrSet ¶
StrSet sets the namespace value to a particular string. XXX: This can overwrite another host's value that was set with StrMapSet. Add possible cryptographic signing or special namespacing to prevent such things.
func (*World) StrWatch ¶
StrWatch returns a channel which spits out events on possible string changes.
func (*World) URI ¶
URI returns the current FS URI. TODO: Can we improve this API or deprecate it entirely?
func (*World) WatchMembers ¶
func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)
WatchMembers returns a channel of changing members in the cluster.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
fs | Package fs implements a very simple and limited file system on top of etcd. |
scheduler | Package scheduler implements a distributed consensus scheduler with etcd. |