Documentation ¶
Overview ¶
Package consulutil contains common routines for setting up a live Consul server for use in unit tests. The server runs within the test process and uses an isolated in-memory data store. This functionality is not currently recommended for use due to data races present in the github.com/hashicorp/consul/testutil package.
Index ¶
- Variables
- func Get(ctx context.Context, clientKV ConsulGetter, key string, ...) (*api.KVPair, *api.QueryMeta, error)
- func List(clientKV ConsulLister, done <-chan struct{}, prefix string, ...) (api.KVPairs, *api.QueryMeta, error)
- func SafeKeys(clientKV ConsulKeyser, done <-chan struct{}, prefix string, ...) ([]string, *api.QueryMeta, error)
- func SessionManager(config api.SessionEntry, client ConsulClient, output chan<- string, ...)
- func WatchDiff(prefix string, clientKV ConsulLister, quitCh <-chan struct{}) (<-chan *WatchedChanges, <-chan error)
- func WatchKeys(prefix string, clientKV ConsulKeyser, done <-chan struct{}, ...) chan WatchedKeys
- func WatchNewKeys(pairsChan <-chan api.KVPairs, onNewKey NewKeyHandler, done <-chan struct{})
- func WatchPrefix(prefix string, clientKV ConsulLister, outPairs chan<- api.KVPairs, ...)
- func WatchSingle(key string, clientKV ConsulGetter, outKVP chan<- *api.KVPair, ...)
- func WithSession(done <-chan struct{}, sessions <-chan string, ...)
- type ConsulClient
- type ConsulGetter
- type ConsulKVClient
- type ConsulKeyser
- type ConsulLister
- type ConsulSessionClient
- type FakeConsulClient
- type FakeKV
- func (f *FakeKV) Acquire(pair *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
- func (f *FakeKV) CAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
- func (f *FakeKV) Delete(key string, w *api.WriteOptions) (*api.WriteMeta, error)
- func (f *FakeKV) DeleteCAS(pair *api.KVPair, opts *api.WriteOptions) (bool, *api.WriteMeta, error)
- func (f *FakeKV) DeleteTree(prefix string, w *api.WriteOptions) (*api.WriteMeta, error)
- func (f *FakeKV) Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)
- func (f *FakeKV) Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)
- func (f *FakeKV) List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
- func (f *FakeKV) Put(pair *api.KVPair, q *api.WriteOptions) (*api.WriteMeta, error)
- func (f *FakeKV) Release(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
- func (f *FakeKV) Txn(txn api.KVTxnOps, q *api.QueryOptions) (bool, *api.KVTxnResponse, *api.QueryMeta, error)
- type Fixture
- type KVError
- type NewKeyHandler
- type WatchedChanges
- type WatchedKeys
Constants ¶
This section is empty.
Variables ¶
var CanceledError = errors.New("Consul operation canceled")
CanceledError signifies that the Consul operation was explicitly canceled.
var SessionMaxRetrySeconds = param.Int("session_max_retry_seconds", 300)
SessionMaxRetrySeconds is the maximum number of seconds to wait between failed attempts to acquire a session.
var SessionRetrySeconds = param.Int("session_retry_seconds", 10)
SessionRetrySeconds specifies the base time to wait between retries when establishing a session. In the presence of errors, the effective time is derived from this base following a strategy of exponential backoff with jitter.
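The relationship between the base time, the maximum, and the jittered wait can be sketched as follows. This is a minimal illustration, not the package's actual code: the function names ceiling and backoff, the doubling schedule, and the "full jitter" choice (a uniform draw between zero and the current ceiling) are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// ceiling returns the upper bound on the wait for a zero-based attempt
// number: base doubled once per attempt, capped at limit.
// (Hypothetical helper; the doubling schedule is an assumption.)
func ceiling(base, limit time.Duration, attempt int) time.Duration {
	c := base
	for i := 0; i < attempt && c < limit; i++ {
		c *= 2
	}
	if c > limit {
		c = limit
	}
	return c
}

// backoff draws the actual wait uniformly from [0, ceiling].
// (Hypothetical helper; "full jitter" is one of several common schemes.)
func backoff(base, limit time.Duration, attempt int) time.Duration {
	c := ceiling(base, limit, attempt)
	if c <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(c) + 1))
}

func main() {
	base := 10 * time.Second   // mirrors the session_retry_seconds default
	limit := 300 * time.Second // mirrors the session_max_retry_seconds default
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Printf("attempt %d: ceiling %v, wait %v\n",
			attempt, ceiling(base, limit, attempt), backoff(base, limit, attempt))
	}
}
```

With the default values, the ceiling in this sketch grows from 10s to 20s, 40s, and so on until it is capped at 300s, while the randomized wait spreads retries from many clients apart.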
Functions ¶
func Get ¶
func Get( ctx context.Context, clientKV ConsulGetter, key string, options *api.QueryOptions, ) (*api.KVPair, *api.QueryMeta, error)
Get is like List, but retrieves a single key instead of all pairs under a prefix.
func List ¶
func List( clientKV ConsulLister, done <-chan struct{}, prefix string, options *api.QueryOptions, ) (api.KVPairs, *api.QueryMeta, error)
List performs a KV List operation that can be canceled. When the "done" channel is closed, CanceledError will be immediately returned. (The HTTP RPC can't be canceled, but it will be ignored.) Errors from Consul will be wrapped in a KVError value.
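One common way to get this behavior is to run the blocking call in its own goroutine and race its result against the "done" channel. The sketch below shows that pattern with standard-library types only; listFunc, cancelableList, and errCanceled are hypothetical stand-ins for the Consul call, for List, and for CanceledError, and the package may implement cancellation differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errCanceled is a local stand-in for the package's CanceledError.
var errCanceled = errors.New("operation canceled")

// listFunc is a hypothetical stand-in for a blocking KV List call.
type listFunc func(prefix string) ([]string, error)

// cancelableList runs list in its own goroutine and returns errCanceled as
// soon as done is closed. The abandoned call keeps running in the
// background and its result is dropped.
func cancelableList(list listFunc, done <-chan struct{}, prefix string) ([]string, error) {
	type result struct {
		keys []string
		err  error
	}
	// Buffered so the goroutine can always deliver its result and exit,
	// even when nobody is receiving any more.
	resultCh := make(chan result, 1)
	go func() {
		keys, err := list(prefix)
		resultCh <- result{keys, err}
	}()
	select {
	case <-done:
		return nil, errCanceled
	case r := <-resultCh:
		return r.keys, r.err
	}
}

func main() {
	slow := func(prefix string) ([]string, error) {
		time.Sleep(time.Second) // simulates a long-running query
		return []string{prefix + "/a"}, nil
	}
	done := make(chan struct{})
	close(done) // cancel before the query can finish
	_, err := cancelableList(slow, done, "pods")
	fmt.Println(err) // prints "operation canceled"
}
```

The buffered result channel is the important design choice: without it, the background goroutine would block forever on its send after a cancellation.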
func SafeKeys ¶
func SafeKeys( clientKV ConsulKeyser, done <-chan struct{}, prefix string, options *api.QueryOptions, ) ([]string, *api.QueryMeta, error)
SafeKeys performs a KV Keys operation that can be canceled. When the "done" channel is closed, CanceledError will be immediately returned. (The HTTP RPC can't be canceled, but it will be ignored.) Errors from Consul will be wrapped in a KVError value.
func SessionManager ¶
func SessionManager( config api.SessionEntry, client ConsulClient, output chan<- string, done chan struct{}, logger logging.Logger, )
SessionManager continually creates and maintains Consul sessions. It is intended to be run in its own goroutine. If one session expires, a new one will be created. As sessions come and go, the session ID (or "" for an expired session) will be sent on the output channel.
Parameters:
- config: Configuration passed to Consul when creating a new session.
- client: The Consul client to use.
- output: The channel used for exposing Consul session IDs. This method takes ownership of this channel and will close it once no new IDs will be created.
- done: Close this channel to close the current session (if any) and stop creating new sessions.
- logger: Errors will be logged to this logger.
func WatchDiff ¶
func WatchDiff( prefix string, clientKV ConsulLister, quitCh <-chan struct{}, ) (<-chan *WatchedChanges, <-chan error)
WatchDiff watches a Consul prefix for changes and categorizes them into creates, updates, and deletes. Note that if a KV pair was created and modified before the watch starts, the watch will treat it as a create.
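The categorization can be pictured as a comparison between the previous snapshot of the prefix and the latest listing. The sketch below is an assumption about how such a diff might work, not the package's implementation: the pair and changes types are simplified stand-ins for api.KVPair and WatchedChanges, and using ModifyIndex to detect updates is a guess based on how Consul versions keys.

```go
package main

import "fmt"

// pair is a simplified stand-in for api.KVPair.
type pair struct {
	Key         string
	ModifyIndex uint64
}

// changes groups the differences between two snapshots of a prefix.
type changes struct {
	Created, Updated, Deleted []pair
}

// diff compares the previous snapshot (keyed by Key) with the latest listing.
func diff(prev map[string]pair, latest []pair) changes {
	var c changes
	seen := make(map[string]bool, len(latest))
	for _, p := range latest {
		seen[p.Key] = true
		old, ok := prev[p.Key]
		switch {
		case !ok:
			c.Created = append(c.Created, p)
		case old.ModifyIndex != p.ModifyIndex:
			c.Updated = append(c.Updated, p)
		}
	}
	for key, old := range prev {
		if !seen[key] {
			c.Deleted = append(c.Deleted, old)
		}
	}
	return c
}

func main() {
	prev := map[string]pair{"a": {"a", 1}, "b": {"b", 2}}
	latest := []pair{{"a", 5}, {"c", 6}}
	c := diff(prev, latest)
	// prints "created=[{c 6}] updated=[{a 5}] deleted=[{b 2}]"
	fmt.Printf("created=%v updated=%v deleted=%v\n", c.Created, c.Updated, c.Deleted)
}
```

Because the first comparison starts from an empty previous snapshot, every key that already exists is reported as created, which matches the caveat above about pairs modified before the watch starts.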
func WatchKeys ¶
func WatchKeys( prefix string, clientKV ConsulKeyser, done <-chan struct{}, pause time.Duration, ) chan WatchedKeys
WatchKeys executes Consul keys queries on a particular prefix and passes the set of keys on an output channel each time the query returns.
func WatchNewKeys ¶
func WatchNewKeys(pairsChan <-chan api.KVPairs, onNewKey NewKeyHandler, done <-chan struct{})
WatchNewKeys watches for changes to a list of Key/Value pairs and lets each key be handled individually through a subscription-like interface.
This function models a key's lifetime in the following way. When a key is first seen, the given NewKeyHandler function will be run, which may return a channel. When the key's value changes, new K/V updates are sent to the key's notification channel. When the key is deleted, `nil` is sent. After being deleted or if the watcher is asked to exit, a key's channel will be closed, to notify the receiver that no further updates are coming.
WatchNewKeys doesn't watch a prefix itself--the caller should arrange a suitable input stream of K/V pairs, probably from WatchPrefix(). This function runs until the input stream closes. Closing "done" will asynchronously cancel the watch and cause it to eventually exit.
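The key lifetime described above can be sketched as a fan-out loop over snapshots. Everything here is illustrative: pair stands in for api.KVPair, the newKeyHandler signature is hypothetical (the real NewKeyHandler type is not shown on this page), and the sketch assumes that the first value is delivered along with later changes and that changes are detected through ModifyIndex.

```go
package main

import "fmt"

// pair is a simplified stand-in for api.KVPair.
type pair struct {
	Key, Value  string
	ModifyIndex uint64
}

// newKeyHandler (hypothetical) runs when a key is first seen; it may return nil.
type newKeyHandler func(key string) chan<- *pair

type subscription struct {
	updates chan<- *pair
	index   uint64 // ModifyIndex of the last value delivered
}

// watchNewKeys fans snapshots out to per-key channels until snapshots or done closes.
func watchNewKeys(snapshots <-chan []pair, onNewKey newKeyHandler, done <-chan struct{}) {
	subs := make(map[string]*subscription)
	defer func() {
		for _, s := range subs {
			if s.updates != nil {
				close(s.updates)
			}
		}
	}()
	// send delivers v on ch and reports false if done was closed first.
	send := func(ch chan<- *pair, v *pair) bool {
		if ch == nil {
			return true
		}
		select {
		case ch <- v:
			return true
		case <-done:
			return false
		}
	}
	for {
		var snap []pair
		var ok bool
		select {
		case <-done:
			return
		case snap, ok = <-snapshots:
			if !ok {
				return
			}
		}
		present := make(map[string]bool, len(snap))
		for i := range snap {
			p := &snap[i]
			present[p.Key] = true
			s, known := subs[p.Key]
			if !known {
				s = &subscription{updates: onNewKey(p.Key)}
				subs[p.Key] = s
			} else if s.index == p.ModifyIndex {
				continue // unchanged since the last snapshot
			}
			s.index = p.ModifyIndex
			if !send(s.updates, p) {
				return
			}
		}
		for key, s := range subs {
			if present[key] {
				continue
			}
			delete(subs, key)
			delivered := send(s.updates, nil) // nil marks the deletion
			if s.updates != nil {
				close(s.updates)
			}
			if !delivered {
				return
			}
		}
	}
}

func main() {
	snapshots := make(chan []pair, 3)
	snapshots <- []pair{{"a", "1", 1}}
	snapshots <- []pair{{"a", "2", 2}}
	snapshots <- nil // "a" has been deleted
	close(snapshots)
	var updates chan *pair
	watchNewKeys(snapshots, func(key string) chan<- *pair {
		updates = make(chan *pair, 8) // buffered so this demo never blocks
		return updates
	}, make(chan struct{}))
	for p := range updates {
		fmt.Printf("%+v\n", p) // two updates for "a", then <nil> for the deletion
	}
}
```

In real use the handler would typically start a goroutine that reads from the channel it returns, and the snapshots would come from a prefix watch rather than a pre-filled channel.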
func WatchPrefix ¶
func WatchPrefix( prefix string, clientKV ConsulLister, outPairs chan<- api.KVPairs, done <-chan struct{}, outErrors chan<- error, pause time.Duration, jitterWindow time.Duration, )
WatchPrefix watches a Consul prefix for changes to any keys that have the prefix. When anything changes, all Key/Value pairs having that prefix will be written to the provided channel.
Errors will be sent on the given outErrors channel but do not otherwise affect execution. The given output stream will become owned by this function call, and this call will close it when the function ends. This function will run until explicitly canceled by closing the "done" channel. Data is written to the output channel synchronously, so readers must consume the data or this method will block.
jitterWindow is used to add randomized jitter between Consul requests, which can help mitigate a flood of requests to a server when it becomes available after a period of unavailability. A sleep time will be chosen between 0 and the jitterWindow setting with a uniform probability over that range.
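A uniform draw over the jitter window might look like the sketch below. The jitter helper is hypothetical, and adding its result to pause is only an assumption about how the two parameters combine; the page does not say.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter returns a duration drawn uniformly from [0, window).
// A non-positive window disables jitter. (Hypothetical helper.)
func jitter(window time.Duration) time.Duration {
	if window <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(window)))
}

func main() {
	pause := 100 * time.Millisecond
	window := 50 * time.Millisecond
	for i := 0; i < 3; i++ {
		// Assumption: the jittered sleep is added on top of the fixed pause.
		wait := pause + jitter(window)
		fmt.Println("next request in", wait)
	}
}
```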
func WatchSingle ¶
func WatchSingle( key string, clientKV ConsulGetter, outKVP chan<- *api.KVPair, done <-chan struct{}, outErrors chan<- error, )
WatchSingle has the same semantics as WatchPrefix, but for a single key in Consul. If the key is deleted, a nil will be sent on the output channel, but the watch will not be terminated. In addition, if updates happen in rapid succession, intervening updates may be missed. If these semantics are undesirable, consider WatchNewKeys instead.
func WithSession ¶
func WithSession( done <-chan struct{}, sessions <-chan string, f func(done <-chan struct{}, session string), )
WithSession executes the function f when there is an active session. When that session ends, f is signaled to exit. Once f finishes, a new execution will start when a new session begins.
This function runs until the input stream of sessions is closed. Closing the "done" argument provides a shortcut to exit this function when also tearing down the session producer. In either case, any running f will be terminated before returning. A panic in f will also propagate upwards, causing the function to exit.
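The control flow can be sketched with plain channels. This is an illustration of the described behavior under stated assumptions, not the package's code: withSession here is a local function, an empty ID is taken to mean "session expired" (as in the SessionManager description), and panic propagation from f is deliberately left out.

```go
package main

import "fmt"

// withSession runs f once per session ID received on sessions. An empty ID
// or a newer ID ends the current run: f's done channel is closed and the
// loop waits for f to return before moving on.
func withSession(done <-chan struct{}, sessions <-chan string, f func(done <-chan struct{}, session string)) {
	var stop, exited chan struct{}
	// halt signals the running f (if any) and waits for it to return.
	halt := func() {
		if stop == nil {
			return
		}
		close(stop)
		<-exited
		stop, exited = nil, nil
	}
	defer halt()
	for {
		select {
		case <-done:
			return
		case id, ok := <-sessions:
			if !ok {
				return
			}
			halt()
			if id == "" {
				continue // session expired; wait for the next one
			}
			stop, exited = make(chan struct{}), make(chan struct{})
			go func(stop <-chan struct{}, exited chan<- struct{}) {
				defer close(exited)
				f(stop, id)
			}(stop, exited)
		}
	}
}

func main() {
	sessions := make(chan string, 3)
	sessions <- "s1"
	sessions <- "" // s1 expired
	sessions <- "s2"
	close(sessions)
	withSession(make(chan struct{}), sessions, func(done <-chan struct{}, session string) {
		fmt.Println("start", session)
		<-done // a real f would do its work until signaled
		fmt.Println("stop", session)
	})
}
```

Because the loop waits for each run of f to return before starting the next, the program prints "start s1", "stop s1", "start s2", "stop s2" in that order.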
Types ¶
type ConsulClient ¶
type ConsulClient interface {
	KV() ConsulKVClient
	Session() ConsulSessionClient
}
Wrapper interface that allows retrieval of the underlying interfaces.
func ConsulClientFromRaw ¶
func ConsulClientFromRaw(client *api.Client) ConsulClient
Sadly, *api.Client does not implement the ConsulClient interface because the return types of KV() and Session() don't match exactly, e.g. KV() returns an *api.KV, not a ConsulKVClient, even though *api.KV implements ConsulKVClient. This function wraps an *api.Client into something that implements ConsulClient.
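The underlying language rule is that Go matches interface method signatures exactly, so a method returning a concrete type does not satisfy an interface method that returns an interface. The sketch below reproduces the situation with local stand-in types (kv, rawClient, kvClient, client, wrapped, and clientFromRaw are all hypothetical names) and shows the thin wrapper that resolves it.

```go
package main

import "fmt"

// kv and rawClient are stand-ins for *api.KV and *api.Client.
type kv struct{}

func (*kv) Get(key string) string { return "value-of-" + key }

type rawClient struct{}

// KV returns the concrete type, as (*api.Client).KV does.
func (*rawClient) KV() *kv { return &kv{} }

// kvClient and client play the roles of ConsulKVClient and ConsulClient.
type kvClient interface{ Get(key string) string }

type client interface{ KV() kvClient }

// The following would not compile, because KV() returns *kv, not kvClient:
//
//	var _ client = &rawClient{}

// wrapped adapts rawClient to client by widening the return type of KV.
type wrapped struct{ raw *rawClient }

func (w wrapped) KV() kvClient { return w.raw.KV() }

// clientFromRaw plays the role of ConsulClientFromRaw.
func clientFromRaw(raw *rawClient) client { return wrapped{raw} }

func main() {
	c := clientFromRaw(&rawClient{})
	fmt.Println(c.KV().Get("foo")) // prints "value-of-foo"
}
```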
type ConsulGetter ¶
type ConsulKVClient ¶
type ConsulKVClient interface {
	Acquire(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	CAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	Delete(key string, w *api.WriteOptions) (*api.WriteMeta, error)
	DeleteCAS(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	DeleteTree(prefix string, w *api.WriteOptions) (*api.WriteMeta, error)
	Get(key string, q *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)
	Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)
	List(prefix string, q *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
	Put(pair *api.KVPair, w *api.WriteOptions) (*api.WriteMeta, error)
	Release(p *api.KVPair, q *api.WriteOptions) (bool, *api.WriteMeta, error)
	Txn(txn api.KVTxnOps, q *api.QueryOptions) (bool, *api.KVTxnResponse, *api.QueryMeta, error)
}
Interface representing the functionality of the api.KV struct returned by calling KV() on an *api.Client. This is useful for swapping in alternative KV implementations, for example in tests.
type ConsulKeyser ¶
type ConsulLister ¶
type ConsulLister interface {
List(prefix string, opts *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
}
ConsulLister is a portion of the interface for api.KV
type ConsulSessionClient ¶
type ConsulSessionClient interface {
	Create(se *api.SessionEntry, q *api.WriteOptions) (string, *api.WriteMeta, error)
	CreateNoChecks(*api.SessionEntry, *api.WriteOptions) (string, *api.WriteMeta, error)
	Destroy(string, *api.WriteOptions) (*api.WriteMeta, error)
	Info(id string, q *api.QueryOptions) (*api.SessionEntry, *api.QueryMeta, error)
	List(q *api.QueryOptions) ([]*api.SessionEntry, *api.QueryMeta, error)
	Renew(id string, q *api.WriteOptions) (*api.SessionEntry, *api.WriteMeta, error)
	RenewPeriodic(initialTTL string, id string, q *api.WriteOptions, doneCh chan struct{}) error
}
Specifies the functionality provided by the *api.Session struct for managing Consul sessions. This is useful for swapping in session client implementations in tests.
type FakeConsulClient ¶
type FakeConsulClient struct {
KV_ ConsulKVClient
}
func NewFakeClient ¶
func NewFakeClient() *FakeConsulClient
func (FakeConsulClient) KV ¶
func (c FakeConsulClient) KV() ConsulKVClient
func (FakeConsulClient) Session ¶
func (c FakeConsulClient) Session() ConsulSessionClient
type FakeKV ¶
FakeKV provides a fake implementation of *api.KV, which is useful in tests.
func (*FakeKV) Acquire ¶
The fake implementation of Acquire is the same as writing a key. Callers are expected to use the /lock subtree, so real keys should never look like locks.
func (*FakeKV) DeleteTree ¶
type Fixture ¶
type Fixture struct {
	Agent    *agent.Agent
	Servers  []*agent.HTTPServer
	Client   ConsulClient
	T        *testing.T
	HTTPPort int
}
Fixture sets up a test Consul server and provides the client configuration for accessing it.
func NewFixture ¶
NewFixture creates a new testing instance of Consul.
type KVError ¶
type KVError struct {
	Op      string
	Key     string
	KVError error
	// contains filtered or unexported fields
}
KVError encapsulates a Consul error.
func NewKVError ¶
NewKVError constructs a new KVError to wrap errors from Consul.
func (KVError) LineNumber ¶
LineNumber implements the "pkg/util".CallsiteError interface.