etcd

package
v0.0.0-...-2561dba
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2024 License: GPL-3.0 Imports: 33 Imported by: 15

Documentation

Overview

Package etcd implements the distributed key value store and fs integration. It also manages and clusters the embedded etcd server. The automatic clustering is considered experimental. If you require a more robust, battle-tested etcd cluster, then manage your own, and point each mgmt agent at it with --seeds and --no-server.

Algorithm

The elastic etcd algorithm works in the following way:

* When you start up mgmt, you can pass it a list of seeds.

* If no seeds are given, then assume you are the first server and start up.

* If a seed is given, connect as a client, and volunteer to be a server.

* All volunteering clients should listen for a nomination message.

* If a client has been nominated, it should start up a server.

* A server should shutdown if its nomination is removed.

* The elected leader should decide who to nominate/unnominate as needed.
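The rules above can be modelled as a small, self-contained Go sketch. It is not the mgmt implementation: the names `initialRole` and `nominate`, and the policy of nominating volunteers in sorted hostname order up to an ideal cluster size, are illustrative assumptions. In this package the actual nomination decision is delegated to a pluggable `chooser.Chooser`.

```go
package main

import (
	"fmt"
	"sort"
)

// role is a hypothetical description of what a member does at startup.
type role string

const (
	roleServer    role = "server"    // no seeds: start a server immediately
	roleVolunteer role = "volunteer" // seeds given: connect as a client and volunteer
)

// initialRole applies the first two rules: with no seeds we assume we are the
// first server, otherwise we connect as a client and volunteer.
func initialRole(seeds []string) role {
	if len(seeds) == 0 {
		return roleServer
	}
	return roleVolunteer
}

// nominate is a hypothetical leader policy: sort the volunteers by hostname
// and nominate the first idealSize of them. Any member not returned here
// should have its nomination removed and shut down its server.
func nominate(volunteers []string, idealSize int) []string {
	sorted := append([]string(nil), volunteers...) // copy; don't reorder the caller's slice
	sort.Strings(sorted)
	if idealSize < 0 {
		idealSize = 0
	}
	if idealSize > len(sorted) {
		idealSize = len(sorted)
	}
	return sorted[:idealSize]
}

func main() {
	fmt.Println(initialRole(nil))                               // server
	fmt.Println(initialRole([]string{"http://127.0.0.1:2379"})) // volunteer
	fmt.Println(nominate([]string{"h3", "h1", "h2", "h4"}, 3))  // [h1 h2 h3]
}
```

A deterministic ordering is used only so that the example output is predictable; a real chooser may weigh other factors.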

Notes

If you attempt to add a new member to the cluster with a duplicate hostname, then the behaviour is undefined, and you could bork your cluster. This is not recommended or supported. Please ensure that your hostnames are unique.

A single ^C requests an orderly shutdown; a third ^C will ask etcd to shut down forcefully. Using this is not recommended: it exists as a way to make exiting easier if something has deadlocked the cluster. If this was due to user error (e.g. duplicate hostnames), then it was your fault, but if the member did not shut down from a single ^C under normal circumstances, then please file a bug.

There are currently some races in this implementation. In practice, this should not cause any adverse effects unless you simultaneously add or remove members at a high rate. Fixing these races will probably require some internal changes to etcd. Help is welcome if you're interested in working on this.

Smoke testing

Here is a simple way to test the basics of etcd clustering:

./mgmt run --tmp-prefix --no-pgp --hostname h1 empty
./mgmt run --tmp-prefix --no-pgp --hostname h2 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2381 --server-urls=http://127.0.0.1:2382 empty
./mgmt run --tmp-prefix --no-pgp --hostname h3 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2383 --server-urls=http://127.0.0.1:2384 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 put /_mgmt/chooser/dynamicsize/idealclustersize 3
./mgmt run --tmp-prefix --no-pgp --hostname h4 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2385 --server-urls=http://127.0.0.1:2386 empty
./mgmt run --tmp-prefix --no-pgp --hostname h5 --seeds=http://127.0.0.1:2379 --client-urls=http://127.0.0.1:2387 --server-urls=http://127.0.0.1:2388 empty
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2379 member list
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 put /_mgmt/chooser/dynamicsize/idealclustersize 5
ETCDCTL_API=3 etcdctl --endpoints 127.0.0.1:2381 member list
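Because every member in this smoke test runs on the same machine, each one needs its own client and server (peer) ports. The numbering in the commands above follows a simple pattern, which the hypothetical `ports` helper below reproduces. It only prints command lines and does not run anything; the flags are copied verbatim from the commands above.

```go
package main

import "fmt"

// ports returns the client and server (peer) ports used for host hN in the
// smoke test. h1 uses the defaults (2379 and 2380), and every following host
// shifts both ports up by two.
func ports(n int) (client, server int) {
	return 2379 + 2*(n-1), 2380 + 2*(n-1)
}

func main() {
	const seed = "http://127.0.0.1:2379" // h1's client URL acts as the seed
	for n := 2; n <= 5; n++ {
		c, s := ports(n)
		fmt.Printf("./mgmt run --tmp-prefix --no-pgp --hostname h%d --seeds=%s --client-urls=http://127.0.0.1:%d --server-urls=http://127.0.0.1:%d empty\n", n, seed, c, s)
	}
}
```

Running it prints the h2 to h5 command lines used above, which makes it easy to extend the test to more hosts without port collisions.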

Bugs

A member might occasionally think that an endpoint still exists after it has already shut down. This isn't a major issue, since if that endpoint doesn't respond, the member will automatically choose the next available one. To see this issue, turn on debugging, start h1, h2, and h3, then stop h2: you might see that h3 still knows about h2.

Shutting down a cluster by setting the idealclustersize to zero is currently buggy and not supported. Try this at your own risk.

If a member is nominated but doesn't respond to the nomination event and start up, and adding it caused us to lose quorum, then we could end up in a blocked state. This could be improved by calling memberRemove after a timeout.

Adding new cluster members very quickly might trigger a `runtime error: error validating peerURLs ... member count is unequal` error. See https://github.com/etcd-io/etcd/issues/10626 for more information.

If you use the dynamic size feature to start and stop the server process, then once it has started and stopped, it can't be restarted, because of a bug in etcd that doesn't free the port. Instead you'll get a `bind: address already in use` error. See https://github.com/etcd-io/etcd/issues/6042 for more information.

Index

Constants

const (

	// NominatedPath is the unprefixed path under which nominated hosts are
	// stored. This is public so that other consumers can know to avoid this
	// key prefix.
	NominatedPath = "/nominated/"

	// VolunteerPath is the unprefixed path under which volunteering hosts
	// are stored. This is public so that other consumers can know to avoid
	// this key prefix.
	VolunteerPath = "/volunteer/"

	// EndpointsPath is the unprefixed path under which the advertised host
	// endpoints are stored. This is public so that other consumers can know
	// to avoid this key prefix.
	EndpointsPath = "/endpoints/"

	// ChooserPath is the unprefixed path under which the chooser algorithm
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ChooserPath = "/chooser" // all hosts share the same namespace

	// ConvergedPath is the unprefixed path under which the converger
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	ConvergedPath = "/converged/"

	// SchedulerPath is the unprefixed path under which the scheduler
	// may store data. This is public so that other consumers can know to
	// avoid this key prefix.
	SchedulerPath = "/scheduler/"

	// DefaultClientURL is the default value that is used for client URLs.
	// It is pulled from the upstream etcd package.
	DefaultClientURL = embed.DefaultListenClientURLs // 127.0.0.1:2379

	// DefaultServerURL is the default value that is used for server URLs.
	// It is pulled from the upstream etcd package.
	DefaultServerURL = embed.DefaultListenPeerURLs // 127.0.0.1:2380

	// DefaultMaxTxnOps is the maximum number of operations to run in a
	// single etcd transaction. If you exceed this limit, it is possible
	// that you have either an extremely large code base, or that you have
	// some code which is possibly not as efficient as it could be. Let us
	// know so that we can analyze the situation, and increase this if
	// necessary.
	DefaultMaxTxnOps = 512

	// RunStartupTimeout is the amount of time we will wait for regular run
	// startup before cancelling it all.
	RunStartupTimeout = 30 * time.Second

	// ClientDialTimeout is the DialTimeout option in the client config.
	ClientDialTimeout = 5 * time.Second

	// ClientDialKeepAliveTime is the DialKeepAliveTime config value for the
	// etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTime = 2 * time.Second // from etcdctl

	// ClientDialKeepAliveTimeout is the DialKeepAliveTimeout config value
	// for the etcd client. It is recommended that you use this so that dead
	// endpoints don't block any cluster operations.
	ClientDialKeepAliveTimeout = 6 * time.Second // from etcdctl

	// MemberChangeInterval is the polling interval to use when watching for
	// member changes during add or remove.
	MemberChangeInterval = 500 * time.Millisecond

	// SelfRemoveTimeout gives unnominated members a chance to self exit.
	SelfRemoveTimeout = 10 * time.Second

	// ForceExitTimeout is the amount of time we will wait for a force exit
	// to occur before cancelling it all.
	ForceExitTimeout = 15 * time.Second

	// SessionTTL is the number of seconds to wait before a dead or
	// unresponsive host has their volunteer keys removed from the cluster.
	// This should be an integer multiple of seconds, since one second is
	// the TTL precision used in etcd.
	SessionTTL = 10 * time.Second // seconds

	// RequireLeaderCtx specifies whether the volunteer loop should use the
	// WithRequireLeader ctx wrapper. It is unknown at this time if this
	// would cause occasional events to be lost, more extensive testing is
	// needed.
	RequireLeaderCtx = false

	// ConvergerHostnameNamespace is a unique key used in the converger.
	ConvergerHostnameNamespace = "etcd-hostname"
)
const (
	// MaxServerStartTimeout is the amount of time to wait for the server
	// to start before considering it a failure. If you hit this timeout,
	// let us know so that we can analyze the situation, and increase this
	// if necessary.
	MaxServerStartTimeout = 60 * time.Second

	// MaxServerCloseTimeout is the maximum amount of time we'll wait for
	// the server to close down. If it exceeds this, it's probably a bug.
	MaxServerCloseTimeout = 15 * time.Second

	// MaxServerRetries is the maximum number of times we can try to restart
	// the server if it fails on startup. This can help workaround some
	// timing bugs in etcd.
	MaxServerRetries = 5

	// ServerRetryWait is the amount of time to wait between retries.
	ServerRetryWait = 500 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type EmbdEtcd

type EmbdEtcd struct {
	Hostname string

	// Seeds is the list of servers that this client could connect to.
	Seeds etcdtypes.URLs

	// ClientURLs are the locations to listen for clients if I am a server.
	ClientURLs etcdtypes.URLs
	// ServerURLs are the locations to listen for servers (peers) if I am a
	// server (peer).
	ServerURLs etcdtypes.URLs
	// AClientURLs are the client URLs to advertise.
	AClientURLs etcdtypes.URLs
	// AServerURLs are the server (peer) URLs to advertise.
	AServerURLs etcdtypes.URLs

	// NoServer disables all server peering for this host.
	// TODO: allow changing this at runtime with some function call?
	NoServer bool
	// NoNetwork causes this to use unix:// sockets instead of TCP for
	// connections.
	NoNetwork bool

	// Chooser is the implementation of the algorithm that decides which
	// hosts to add or remove to grow and shrink the cluster.
	Chooser chooser.Chooser

	// Converger is a converged coordinator object that can be used to
	// track the converged state.
	Converger *converger.Coordinator

	// NS is a string namespace that we prefix to every key operation.
	NS string

	// Prefix is the directory where any etcd related state is stored. It
	// must be an absolute directory path.
	Prefix string

	Debug bool
	Logf  func(format string, v ...interface{})
	// contains filtered or unexported fields
}

EmbdEtcd provides the embedded server and client etcd functionality.

func (*EmbdEtcd) Close

func (obj *EmbdEtcd) Close() error

Close cleans up after you are done using the struct.

func (*EmbdEtcd) ConnectBlock

func (obj *EmbdEtcd) ConnectBlock(ctx context.Context, fn func(context.Context) error) <-chan error

ConnectBlock runs a function as soon as the client is connected. Once this has happened, it closes the output channel. If any error occurs, it is sent on that channel.

func (*EmbdEtcd) Destroy

func (obj *EmbdEtcd) Destroy() error

Destroy cleans up the entire embedded etcd system. Use DestroyServer if you only want to shut down the embedded server portion.

func (*EmbdEtcd) Exited

func (obj *EmbdEtcd) Exited() <-chan struct{}

Exited returns a channel that closes when we've been destroyed. This process happens after Run exits. If Run is never called, this will never happen.

func (*EmbdEtcd) Init

func (obj *EmbdEtcd) Init() error

Init initializes the struct after it has been populated as desired. You must not use the struct if this returns an error.

func (*EmbdEtcd) Interrupt

func (obj *EmbdEtcd) Interrupt() error

Interrupt causes this member to force shutdown. It does not safely wait for an ordered shutdown. It is not recommended to use this unless you're borked.

func (*EmbdEtcd) MakeClient

func (obj *EmbdEtcd) MakeClient() (interfaces.Client, error)

MakeClient returns an etcd Client interface that is suitable for basic tasks. Don't run this until the Ready method has acknowledged.

func (*EmbdEtcd) MakeClientFromNamespace

func (obj *EmbdEtcd) MakeClientFromNamespace(ns string) (interfaces.Client, error)

MakeClientFromNamespace returns an etcd Client interface that is suitable for basic tasks and that has a key namespace prefix. Don't run this until the Ready method has acknowledged.

func (*EmbdEtcd) Ready

func (obj *EmbdEtcd) Ready() <-chan struct{}

Ready returns a channel that closes when we're up and running. This process happens when calling Run. If Run is never called, this will never happen. Our main startup must be running, and our client must be connected to get here.

func (*EmbdEtcd) Run

func (obj *EmbdEtcd) Run() error

Run is the main entry point to kick off the embedded etcd client and server. It blocks until we've exited for shutdown. The shutdown can be triggered by calling Destroy.

func (*EmbdEtcd) ServerExited

func (obj *EmbdEtcd) ServerExited() (<-chan struct{}, func())

ServerExited returns a channel that closes when the server is destroyed. This process happens after runServer exits. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.

func (*EmbdEtcd) ServerReady

func (obj *EmbdEtcd) ServerReady() (<-chan struct{}, func())

ServerReady returns a channel that closes when we're up and running. This process happens when calling runServer. If runServer is never called, this will never happen. It also returns a cancel/ack function which must be called once the signal is received or we are done watching it. This is needed because the signal is cyclical: it fires, and then gets reset as the server starts up, shuts down, and repeats the cycle. The cancel/ack function ensures that we only watch a signal when it's ready to be read, and only reset it when we are done watching it.
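The cancel/ack contract shared by ServerReady and ServerExited can be modelled with a resettable signal guarded by a read/write lock: each watcher holds a read lock until it acks, and a reset takes the write lock, so it cannot replace the channel while anyone is still watching it. This is a hypothetical sketch of the idea (`cyclicSignal` and its methods are not part of this package's API), not the package's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// cyclicSignal is a hypothetical resettable "ready" or "exited" signal.
type cyclicSignal struct {
	rw    sync.RWMutex // read-held by active watchers, write-held by Reset
	mu    sync.Mutex   // guards ch and fired
	ch    chan struct{}
	fired bool
}

func newCyclicSignal() *cyclicSignal {
	return &cyclicSignal{ch: make(chan struct{})}
}

// Watch returns the current channel and an ack function. The caller must call
// ack once it has seen the signal or stops watching; until then, Reset blocks.
func (s *cyclicSignal) Watch() (<-chan struct{}, func()) {
	s.rw.RLock()
	s.mu.Lock()
	ch := s.ch
	s.mu.Unlock()
	var once sync.Once
	return ch, func() { once.Do(s.rw.RUnlock) } // acking twice is harmless
}

// Fire closes the current channel, waking every watcher. Extra calls are no-ops.
func (s *cyclicSignal) Fire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.fired {
		close(s.ch)
		s.fired = true
	}
}

// Reset waits for all outstanding acks, then arms a fresh channel for the
// next cycle (for example, after the server shuts down and before it restarts).
func (s *cyclicSignal) Reset() {
	s.rw.Lock()
	defer s.rw.Unlock()
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ch = make(chan struct{})
	s.fired = false
}

func main() {
	s := newCyclicSignal()
	ch, ack := s.Watch()
	go s.Fire() // e.g. the server became ready
	<-ch
	ack()     // done watching; allow the next reset
	s.Reset() // arm the signal for the next cycle
	ch2, ack2 := s.Watch()
	defer ack2()
	select {
	case <-ch2:
		fmt.Println("unexpected: already fired")
	default:
		fmt.Println("waiting for the next cycle")
	}
}
```

The design choice is that forgetting to ack is a liveness bug (Reset blocks forever) rather than a correctness bug (a watcher missing a signal because the channel was swapped underneath it).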

func (*EmbdEtcd) Validate

func (obj *EmbdEtcd) Validate() error

Validate checks the initial struct. This is called from Init, but can also be used if you would like to check that your configuration is correct.

type World

type World struct {
	Hostname       string // uuid for the consumer of these
	Client         interfaces.Client
	MetadataPrefix string    // expected metadata prefix
	StoragePrefix  string    // storage prefix for etcdfs storage
	StandaloneFs   engine.Fs // store an fs here for local usage
	GetURI         func() string
	Debug          bool
	Logf           func(format string, v ...interface{})
}

World is an etcd backed implementation of the World interface.

func (*World) Fs

func (obj *World) Fs(uri string) (engine.Fs, error)

Fs returns a distributed file system from a unique URI. For single-host execution, this file system might actually be a local or memory-backed file system, and so is only distributed within the boredom that is a single-host cluster.

func (*World) IdealClusterSizeGet

func (obj *World) IdealClusterSizeGet(ctx context.Context) (uint16, error)

IdealClusterSizeGet gets the cluster-wide dynamic cluster size setpoint.

func (*World) IdealClusterSizeSet

func (obj *World) IdealClusterSizeSet(ctx context.Context, size uint16) (bool, error)

IdealClusterSizeSet sets the cluster-wide dynamic cluster size setpoint.

func (*World) IdealClusterSizeWatch

func (obj *World) IdealClusterSizeWatch(ctx context.Context) (chan error, error)

IdealClusterSizeWatch returns a stream of errors any time the cluster-wide dynamic cluster size setpoint changes.

func (*World) ResCollect

func (obj *World) ResCollect(ctx context.Context, hostnameFilter, kindFilter []string) ([]engine.Res, error)

ResCollect gets the collection of exported resources which match the filter. It does this atomically so that a call always returns a complete collection.

func (*World) ResExport

func (obj *World) ResExport(ctx context.Context, resourceList []engine.Res) error

ResExport exports a list of resources under our hostname namespace. Subsequent calls replace the previously set collection atomically.

func (*World) ResWatch

func (obj *World) ResWatch(ctx context.Context) (chan error, error)

ResWatch returns a channel which spits out events on possible exported resource changes.

func (*World) Scheduler

func (obj *World) Scheduler(namespace string, opts ...scheduler.Option) (*scheduler.Result, error)

Scheduler returns a scheduling result of hosts in a particular namespace. XXX: Add a context.Context here

func (*World) StrDel

func (obj *World) StrDel(ctx context.Context, namespace string) error

StrDel deletes the value in a particular namespace.

func (*World) StrGet

func (obj *World) StrGet(ctx context.Context, namespace string) (string, error)

StrGet returns the value for the given namespace.

func (*World) StrIsNotExist

func (obj *World) StrIsNotExist(err error) bool

StrIsNotExist returns whether the error from StrGet is a key missing error.

func (*World) StrMapDel

func (obj *World) StrMapDel(ctx context.Context, namespace string) error

StrMapDel deletes the value in a particular namespace.

func (*World) StrMapGet

func (obj *World) StrMapGet(ctx context.Context, namespace string) (map[string]string, error)

StrMapGet returns a map of hostnames to values in the given namespace.

func (*World) StrMapSet

func (obj *World) StrMapSet(ctx context.Context, namespace, value string) error

StrMapSet sets the namespace value to a particular string under the identity of its own hostname.
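The per-host semantics of StrMapSet and StrMapGet can be pictured with an in-memory model: each host writes only under its own hostname, so a read returns one value per host. The sketch below is a hypothetical, non-distributed stand-in with simplified signatures (no context or errors). The `memWorld` type and its `namespace/hostname` key layout are assumptions for illustration, not the etcd key layout this package uses.

```go
package main

import (
	"fmt"
	"strings"
)

// memStore is a hypothetical shared key/value store standing in for etcd.
type memStore map[string]string

// memWorld is a per-host view of the shared store.
type memWorld struct {
	Hostname string
	store    memStore
}

// StrMapSet stores value under "<namespace>/<hostname>", so each host owns
// exactly one entry per namespace.
func (w *memWorld) StrMapSet(namespace, value string) {
	w.store[namespace+"/"+w.Hostname] = value
}

// StrMapGet collects every host's entry in namespace into a hostname->value map.
func (w *memWorld) StrMapGet(namespace string) map[string]string {
	out := make(map[string]string)
	prefix := namespace + "/"
	for k, v := range w.store {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		host := k[len(prefix):]
		if !strings.Contains(host, "/") { // skip keys from deeper namespaces
			out[host] = v
		}
	}
	return out
}

func main() {
	shared := memStore{}
	h1 := &memWorld{Hostname: "h1", store: shared}
	h2 := &memWorld{Hostname: "h2", store: shared}
	h1.StrMapSet("color", "red")
	h2.StrMapSet("color", "blue")
	fmt.Println(h1.StrMapGet("color")) // map[h1:red h2:blue]
}
```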

func (*World) StrMapWatch

func (obj *World) StrMapWatch(ctx context.Context, namespace string) (chan error, error)

StrMapWatch returns a channel which spits out events on possible string changes.

func (*World) StrSet

func (obj *World) StrSet(ctx context.Context, namespace, value string) error

StrSet sets the namespace value to a particular string. XXX: This can overwrite another host's value that was set with StrMapSet. Add possible cryptographic signing or special namespacing to prevent such things.

func (*World) StrWatch

func (obj *World) StrWatch(ctx context.Context, namespace string) (chan error, error)

StrWatch returns a channel which spits out events on possible string changes.

func (*World) URI

func (obj *World) URI() string

URI returns the current FS URI. TODO: Can we improve this API or deprecate it entirely?

func (*World) WatchMembers

func (obj *World) WatchMembers(ctx context.Context) (<-chan *interfaces.MembersResult, error)

WatchMembers returns a channel of changing members in the cluster.

Directories

Path Synopsis
str
Package fs implements a very simple and limited file system on top of etcd.
Package scheduler implements a distributed consensus scheduler with etcd.
