Documentation ¶
Index ¶
- Variables
- type ConsulKV
- type ConsulLock
- type ConsulLockFactory
- type ConsulLocker
- type ConsulSession
- type ContextCancelledError
- type FakeConsulKV
- func (fck *FakeConsulKV) DeleteCAS(kv *consul.KVPair, wo *consul.WriteOptions) (bool, *consul.WriteMeta, error)
- func (fck *FakeConsulKV) Get(key string, qo *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
- func (fck *FakeConsulKV) Put(kvp *consul.KVPair, wo *consul.WriteOptions) (*consul.WriteMeta, error)
- type FakeConsulSession
- func (fcs *FakeConsulSession) Create(se *consul.SessionEntry, q *consul.WriteOptions) (string, *consul.WriteMeta, error)
- func (fcs *FakeConsulSession) Expired(id string) (bool, error)
- func (fcs *FakeConsulSession) Info(id string, q *consul.QueryOptions) (*consul.SessionEntry, *consul.QueryMeta, error)
- func (fcs *FakeConsulSession) RenewPeriodic(initialTTL string, id string, q *consul.WriteOptions, doneCh <-chan struct{}) error
- func (fcs *FakeConsulSession) SetExpires(id string, expires time.Time) error
- type FakeLockFactory
- type FakePreemptableLock
- type FakePreemptiveLockProvider
- type FakeRealLockFactory
- type FakeRealPreemptableLock
- type Lock
- type LockFactory
- type LockProvider
- type PreemptableLock
- type PreemptedLockError
- type PreemptiveLockProvider
- type PreemptiveLocker
- type PreemptiveLockerOpts
Constants ¶
This section is empty.
Variables ¶
var DefaultSessionCheckInterval time.Duration = 100 * time.Millisecond
Functions ¶
This section is empty.
Types ¶
type ConsulKV ¶
type ConsulKV interface { Put(*consul.KVPair, *consul.WriteOptions) (*consul.WriteMeta, error) Get(string, *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) DeleteCAS(*consul.KVPair, *consul.WriteOptions) (bool, *consul.WriteMeta, error) }
ConsulKV describes an object capable of using Consul as a KV store
type ConsulLock ¶
type ConsulLock struct {
// contains filtered or unexported fields
}
ConsulLock is an object representing a lock on a repo/PR combination using Consul
func (*ConsulLock) Release ¶
func (cl *ConsulLock) Release() error
Release frees and cleans up the lock
type ConsulLockFactory ¶
type ConsulLockFactory struct {
// contains filtered or unexported fields
}
ConsulLockFactory implements the LockFactory interface for a consul client
func NewConsulLockFactory ¶
func NewConsulLockFactory(consul *consul.Client) ConsulLockFactory
NewConsulLockFactory creates a new ConsulLockFactory
func (ConsulLockFactory) LockOpts ¶
func (f ConsulLockFactory) LockOpts(opt *consul.LockOptions) (PreemptableLock, error)
LockOpts returns a preemptable lock with the specified options or an error
type ConsulLocker ¶
type ConsulLocker struct {
// contains filtered or unexported fields
}
ConsulLocker uses Consul to provide a lock service
func NewConsulLocker ¶
func NewConsulLocker(caddr, pfx, datadogServiceNamePrefix string, enableTracing bool) (*ConsulLocker, error)
NewConsulLocker returns a ConsulLocker using the Consul agent at caddr and using pfx as the lock name prefix
func (*ConsulLocker) AcquireLock ¶
func (cl *ConsulLocker) AcquireLock(repo, pr string) (Lock, error)
AcquireLock attempts to acquire a lock for a specific repo/PR combination or returns nil if the lock could not be acquired. Client is responsible for holding on to the returned lock and releasing it when finished
func (*ConsulLocker) AcquireNamedLock ¶
func (cl *ConsulLocker) AcquireNamedLock(name string, ttl string) (Lock, error)
AcquireNamedLock attempts to acquire a lock with the specified name and TTL or returns nil if the lock could not be acquired. Client is responsible for holding on to the returned lock and releasing it when finished
func (*ConsulLocker) NewPreemptiveLocker ¶
func (cl *ConsulLocker) NewPreemptiveLocker(repo, pr string, opts PreemptiveLockerOpts) *PreemptiveLocker
NewPreemptiveLocker creates a new PreemptiveLocker for the specified repo and pr
type ConsulSession ¶
type ConsulSession interface { Create(se *consul.SessionEntry, q *consul.WriteOptions) (string, *consul.WriteMeta, error) RenewPeriodic(initialTTL string, id string, q *consul.WriteOptions, doneCh <-chan struct{}) error }
ConsulSession describes an object capable of interacting with Consul sessions
type ContextCancelledError ¶ added in v0.7.0
type ContextCancelledError struct {
ID string
}
ContextCancelledError is returned when a Lock operation is canceled via context.
func (*ContextCancelledError) Error ¶ added in v0.7.0
func (e *ContextCancelledError) Error() string
type FakeConsulKV ¶
type FakeConsulKV struct {
// contains filtered or unexported fields
}
FakeConsulKV is a faked out version of a Consul KV, for use with a real lock
func NewFakeConsulKV ¶
func NewFakeConsulKV() *FakeConsulKV
func (*FakeConsulKV) DeleteCAS ¶
func (fck *FakeConsulKV) DeleteCAS(kv *consul.KVPair, wo *consul.WriteOptions) (bool, *consul.WriteMeta, error)
func (*FakeConsulKV) Get ¶
func (fck *FakeConsulKV) Get(key string, qo *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error)
func (*FakeConsulKV) Put ¶
func (fck *FakeConsulKV) Put(kvp *consul.KVPair, wo *consul.WriteOptions) (*consul.WriteMeta, error)
type FakeConsulSession ¶
FakeConsulSession is a faked out version of a Consul Session store, for use with a real lock
func (*FakeConsulSession) Create ¶
func (fcs *FakeConsulSession) Create(se *consul.SessionEntry, q *consul.WriteOptions) (string, *consul.WriteMeta, error)
func (*FakeConsulSession) Expired ¶
func (fcs *FakeConsulSession) Expired(id string) (bool, error)
Expired checks if the session is expired (this is not part of the Consul interface)
func (*FakeConsulSession) Info ¶
func (fcs *FakeConsulSession) Info(id string, q *consul.QueryOptions) (*consul.SessionEntry, *consul.QueryMeta, error)
func (*FakeConsulSession) RenewPeriodic ¶
func (fcs *FakeConsulSession) RenewPeriodic(initialTTL string, id string, q *consul.WriteOptions, doneCh <-chan struct{}) error
func (*FakeConsulSession) SetExpires ¶
func (fcs *FakeConsulSession) SetExpires(id string, expires time.Time) error
SetExpires manually sets the expired timestamp for a session (this is not part of the Consul interface)
type FakeLockFactory ¶
type FakeLockFactory struct {
ChannelFactory func() chan struct{}
}
FakeLockFactory returns a FakePreemptableLock using ChannelFactory
func (*FakeLockFactory) LockOpts ¶
func (flf *FakeLockFactory) LockOpts(*consul.LockOptions) (PreemptableLock, error)
type FakePreemptableLock ¶
type FakePreemptableLock struct {
Preempt chan struct{}
}
FakePreemptableLock is a faked out version of a preemptable lock
func (*FakePreemptableLock) Destroy ¶
func (fpl *FakePreemptableLock) Destroy() error
func (*FakePreemptableLock) Lock ¶
func (fpl *FakePreemptableLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
func (*FakePreemptableLock) Unlock ¶
func (fpl *FakePreemptableLock) Unlock() error
type FakePreemptiveLockProvider ¶
type FakePreemptiveLockProvider struct {
ChannelFactory func() chan struct{}
}
FakePreemptiveLockProvider fakes out all aspects of a preemptive lock
func (*FakePreemptiveLockProvider) NewPreemptiveLocker ¶
func (fplp *FakePreemptiveLockProvider) NewPreemptiveLocker(repo, pr string, opts PreemptiveLockerOpts) *PreemptiveLocker
type FakeRealLockFactory ¶
type FakeRealLockFactory struct { CS *FakeConsulSession // SessionCheckInterval is the delay between checks to see if a lock's session has expired. If zero, DefaultSessionCheckInterval is used. SessionCheckInterval, LockWait time.Duration // contains filtered or unexported fields }
FakeRealLockFactory is a functional fake of a Consul lock that uses a mutex
func (*FakeRealLockFactory) LockOpts ¶
func (frlf *FakeRealLockFactory) LockOpts(lo *consul.LockOptions) (PreemptableLock, error)
LockOpts returns a new PreemptableLock for lo.Key or the existing one, if found
type FakeRealPreemptableLock ¶
type FakeRealPreemptableLock struct {
// contains filtered or unexported fields
}
FakeRealPreemptableLock is a fake PreemptableLock that functions as closely as possible to a Consul lock.
func (*FakeRealPreemptableLock) Destroy ¶
func (frpl *FakeRealPreemptableLock) Destroy() error
func (*FakeRealPreemptableLock) Lock ¶
func (frpl *FakeRealPreemptableLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error)
func (*FakeRealPreemptableLock) Unlock ¶
func (frpl *FakeRealPreemptableLock) Unlock() error
type Lock ¶
type Lock interface {
Release() error
}
Lock describes an object representing a held lock that can be released
type LockFactory ¶
type LockFactory interface {
LockOpts(*consul.LockOptions) (PreemptableLock, error)
}
LockFactory describes an object capable of creating a Consul lock
type LockProvider ¶
type LockProvider interface { AcquireLock(repo, pr string) (Lock, error) AcquireNamedLock(string, string) (Lock, error) }
LockProvider describes an object capable of creating and releasing locks
type PreemptableLock ¶
type PreemptableLock interface { Destroy() error Lock(stopCh <-chan struct{}) (<-chan struct{}, error) Unlock() error }
PreemptableLock describes an object that acts as a Lock
type PreemptedLockError ¶ added in v0.7.0
type PreemptedLockError struct {
ID string
}
PreemptedLockError is returned when a Lock() operation is preempted
func (*PreemptedLockError) Error ¶ added in v0.7.0
func (e *PreemptedLockError) Error() string
type PreemptiveLockProvider ¶
type PreemptiveLockProvider interface {
NewPreemptiveLocker(repo, pr string, opts PreemptiveLockerOpts) *PreemptiveLocker
}
PreemptiveLockProvider describes an object capable of creating PreemptiveLockers
type PreemptiveLocker ¶
type PreemptiveLocker struct {
// contains filtered or unexported fields
}
PreemptiveLocker represents a distributed lock where callers can be preempted while waiting for the lock to be released or while holding the lock. At a high level, the algorithm is as follows:
- Client A calls Lock() which returns immediately; Client A now has the lock. Client A periodically renews the underlying Consul session. If Client A dies, the session expires and the lock is automatically released.
- Client B calls Lock() which blocks since the lock is held by Client A.
- Client A receives a value on the channel returned from the Lock() call indicating Client A should release the lock ASAP
- Client C calls Lock() which blocks
- Client B's invocation of Lock() returns with an error indicating it was preempted while waiting for the lock to release
- Client A calls Unlock()
- Client C's invocation of Lock() returns successfully
func NewPreemptiveLocker ¶
func NewPreemptiveLocker(factory LockFactory, kv ConsulKV, cs ConsulSession, key string, opts PreemptiveLockerOpts, datadogServiceName string, enableTracing bool) *PreemptiveLocker
NewPreemptiveLocker returns a new preemptive locker
func (*PreemptiveLocker) Lock ¶
func (p *PreemptiveLocker) Lock(ctx context.Context) (_ <-chan interface{}, err error)
Lock locks the lock and returns a channel used to signal if the lock should be released ASAP. If the lock is currently in use, this method will block until the lock is released. If the caller is preempted while waiting for the lock to be released, an error is returned.
func (*PreemptiveLocker) Release ¶
func (p *PreemptiveLocker) Release(ctx context.Context) (err error)
Release releases the lock
func (*PreemptiveLocker) WasPreempted ¶
func (p *PreemptiveLocker) WasPreempted(err error) bool
WasPreempted returns true if the provided error was returned due to the caller being preempted while waiting for the lock to be released
type PreemptiveLockerOpts ¶
type PreemptiveLockerOpts struct {
// SessionTTL is how long the underlying Consul session will live between heartbeats
// LockWait is how long a preemptive lock will block waiting for the lock to be acquired
// LockDelay is how long to wait after a lock session has been forcefully invalidated before allowing a new client to acquire the lock
SessionTTL, LockWait, LockDelay time.Duration
}
PreemptiveLockerOpts contains options for the locks produced by PreemptiveLocker