Documentation ¶
Index ¶
- Constants
- Variables
- func StartLocalEtcdCluster(name string, nodes, startPort int, opts ...process.StartOp) ([]*process.Etcd, string, error)
- func StartLocalEtcdServer(opts ...process.StartOp) (*process.Etcd, error)
- type EtcdClient
- func (e *EtcdClient) ApplySTM(ctx context.Context, apply func(concurrency.STM) error) (int64, error)
- func (e *EtcdClient) CheckConnected(tries int, retryTime time.Duration) error
- func (e *EtcdClient) Close()
- func (e *EtcdClient) Create(ctx context.Context, key, val string) (int64, error)
- func (e *EtcdClient) Delete(ctx context.Context, key string) (int64, error)
- func (e *EtcdClient) Get(key string, ops ...objstore.KVOp) ([]byte, int64, int64, error)
- func (e *EtcdClient) Grant(ctx context.Context, ttl int64) (int64, error)
- func (e *EtcdClient) KeepAlive(ctx context.Context, leaseID int64) error
- func (e *EtcdClient) List(key string, cb objstore.ListCb) error
- func (e *EtcdClient) Put(ctx context.Context, key, val string, ops ...objstore.KVOp) (int64, error)
- func (e *EtcdClient) Revoke(ctx context.Context, lease int64) error
- func (e *EtcdClient) Sync(ctx context.Context, key string, cb objstore.SyncCb) error
- func (e *EtcdClient) Update(ctx context.Context, key, val string, version int64) (int64, error)
- type InMemoryStore
- func (e *InMemoryStore) ApplySTM(ctx context.Context, apply func(concurrency.STM) error) (int64, error)
- func (e *InMemoryStore) Create(ctx context.Context, key, val string) (int64, error)
- func (e *InMemoryStore) Delete(ctx context.Context, key string) (int64, error)
- func (e *InMemoryStore) Get(key string, opts ...objstore.KVOp) ([]byte, int64, int64, error)
- func (e *InMemoryStore) Grant(ctx context.Context, ttl int64) (int64, error)
- func (e *InMemoryStore) KeepAlive(ctx context.Context, leaseID int64) error
- func (e *InMemoryStore) List(key string, cb objstore.ListCb) error
- func (e *InMemoryStore) Put(ctx context.Context, key, val string, ops ...objstore.KVOp) (int64, error)
- func (e *InMemoryStore) Rev(key string) int64
- func (e *InMemoryStore) Revoke(ctx context.Context, lease int64) error
- func (e *InMemoryStore) Start() error
- func (e *InMemoryStore) Stop()
- func (e *InMemoryStore) Sync(ctx context.Context, prefix string, cb objstore.SyncCb) error
- func (e *InMemoryStore) Update(ctx context.Context, key, val string, version int64) (int64, error)
- type KVStoreSyncFactory
- type KVStoreSyncReservations
- func (s *KVStoreSyncReservations) Get(ctx context.Context) (map[string]string, error)
- func (s *KVStoreSyncReservations) Release(ctx context.Context, keys ...string) error
- func (s *KVStoreSyncReservations) ReleaseForOwner(ctx context.Context, ownerID string) error
- func (s *KVStoreSyncReservations) ReserveValues(ctx context.Context, ...) error
- type Sync
- func (s *Sync) ApplySTMWait(ctx context.Context, apply func(concurrency.STM) error) error
- func (s *Sync) ApplySTMWaitRev(ctx context.Context, apply func(concurrency.STM) error) (int64, error)
- func (s *Sync) Done()
- func (s *Sync) GetCache(ctx context.Context, key []byte) (edgeproto.ObjCache, bool)
- func (s *Sync) GetKVStore() objstore.KVStore
- func (s *Sync) RegisterCache(cache edgeproto.ObjCache)
- func (s *Sync) Start()
- func (s *Sync) SyncWait(rev int64)
- func (s *Sync) UsesOrg(org string) []string
Constants ¶
const EtcdLocalData string = "etcdLocal_data"
const EtcdLocalLog string = "etcdLocal.log"
const SyncFactoryRootPrefix = "syncfactory"
Variables ¶
var ( WriteRequestTimeout = 10 * time.Second ReadRequestTimeout = 2 * time.Second )
Functions ¶
func StartLocalEtcdCluster ¶
Types ¶
type EtcdClient ¶
type EtcdClient struct {
// contains filtered or unexported fields
}
func GetEtcdClient ¶
func GetEtcdClient(cfg *clientv3.Config) (*EtcdClient, error)
func GetEtcdClientBasic ¶
func GetEtcdClientBasic(clientUrls string) (*EtcdClient, error)
func (*EtcdClient) ApplySTM ¶
func (e *EtcdClient) ApplySTM(ctx context.Context, apply func(concurrency.STM) error) (int64, error)
func (*EtcdClient) CheckConnected ¶
func (e *EtcdClient) CheckConnected(tries int, retryTime time.Duration) error
Do a member list call to see if we're connected
func (*EtcdClient) Close ¶
func (e *EtcdClient) Close()
func (*EtcdClient) KeepAlive ¶
func (e *EtcdClient) KeepAlive(ctx context.Context, leaseID int64) error
func (*EtcdClient) List ¶
func (e *EtcdClient) List(key string, cb objstore.ListCb) error
Get records that have the given key prefix
func (*EtcdClient) Sync ¶
Sync is used to sync a cache with the etcd database. This is needed by the controllers to sync with each other. This could also be used by DMEs and CRMs to sync with the etcd database directly, rather than via the controller.
The pros of syncing directly:
1. Etcd has built-in failover given multiple etcd db (or proxy) endpoints. It can fail over if one disappears or is manually removed.
2. Etcd has a history of changes, so that on reconnect, we do not have to resend all records (assuming the history has not been compacted). Given the possibly large distance between controllers/etcd and dme/crm instances, random disconnects may be somewhat common. Not having to send all records on reconnect will help a lot.
3. The controllers need to watch anyway to keep in sync with each other, so we already need to write this for the controller. Writing another algorithm to sync data from controller to dme/crm somewhat duplicates this code.
The cons of syncing directly:
1. The DMEs/CRMs can't actually connect to etcd directly, as it does not scale. We need to run an etcd grpc-proxy on each controller that somewhat duplicates what the controller is doing in terms of caching data and handling queries.
2. The grpc-proxy is not a true cache. It only caches some results.
3. The controller becomes dependent on etcd for communicating with DMEs and CRMs.
4. The grpc-proxy is a thin-ish client. If there is no cache hit, it forwards consolidated requests to the etcd server. So there is a tradeoff between latency vs # of requests to Etcd (if no cache hit). It appears that the cache defaults to 2048 entries but can be adjusted.
5. The grpc-proxy broadcasts replies to clients serially. So if 100 clients were watching on the proxy for the same data, when the upstream data changes, that data would be sent back to each one in turn. So it may not be as performant as our own cache.
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
func (*InMemoryStore) ApplySTM ¶
func (e *InMemoryStore) ApplySTM(ctx context.Context, apply func(concurrency.STM) error) (int64, error)
Based on clientv3/concurrency/stm.go
func (*InMemoryStore) KeepAlive ¶
func (e *InMemoryStore) KeepAlive(ctx context.Context, leaseID int64) error
func (*InMemoryStore) Rev ¶
func (e *InMemoryStore) Rev(key string) int64
func (*InMemoryStore) Revoke ¶
func (e *InMemoryStore) Revoke(ctx context.Context, lease int64) error
func (*InMemoryStore) Start ¶
func (e *InMemoryStore) Start() error
func (*InMemoryStore) Stop ¶
func (e *InMemoryStore) Stop()
type KVStoreSyncFactory ¶
type KVStoreSyncFactory struct {
// contains filtered or unexported fields
}
func NewKVStoreSyncFactory ¶
func NewKVStoreSyncFactory(kvstore objstore.KVStore, nodeType string, keyPrefix string) *KVStoreSyncFactory
func (*KVStoreSyncFactory) NewSyncReservations ¶
func (s *KVStoreSyncFactory) NewSyncReservations(name string) syncdata.SyncReservations
type KVStoreSyncReservations ¶
type KVStoreSyncReservations struct {
// contains filtered or unexported fields
}
func (*KVStoreSyncReservations) Release ¶
func (s *KVStoreSyncReservations) Release(ctx context.Context, keys ...string) error
func (*KVStoreSyncReservations) ReleaseForOwner ¶
func (s *KVStoreSyncReservations) ReleaseForOwner(ctx context.Context, ownerID string) error
func (*KVStoreSyncReservations) ReserveValues ¶
func (s *KVStoreSyncReservations) ReserveValues(ctx context.Context, updateFn func(ctx context.Context, reservations syncdata.Reservations) error) error
type Sync ¶
type Sync struct {
// contains filtered or unexported fields
}
func (*Sync) ApplySTMWait ¶
func (*Sync) ApplySTMWaitRev ¶
func (*Sync) GetKVStore ¶
func (*Sync) RegisterCache ¶
func (*Sync) Start ¶
func (s *Sync) Start()
Watch all key changes in a single thread. Compared to watching different objects in separate threads, this prevents race conditions when objects have dependencies on each other, i.e. when an update to one object must also update objects of other types.