Documentation ¶
Overview ¶
Package concurrency implements concurrency operations on top of etcd such as distributed locks, barriers, and elections.
Index ¶
- Variables
- func NewLocker(s *Session, pfx string) sync.Locker
- func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error)
- func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
- func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
- func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
- func WithAbortContext(ctx context.Context) stmOption
- func WithIsolation(lvl Isolation) stmOption
- func WithPrefetch(keys ...string) stmOption
- type Election
- func (e *Election) Campaign(ctx context.Context, val string) error
- func (e *Election) Header() *pb.ResponseHeader
- func (e *Election) Key() string
- func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error)
- func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse
- func (e *Election) Proclaim(ctx context.Context, val string) error
- func (e *Election) Resign(ctx context.Context) (err error)
- func (e *Election) Rev() int64
- type Isolation
- type Mutex
- type STM
- type Session
- type SessionOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrElectionNotLeader = errors.New("election: not leader") ErrElectionNoLeader = errors.New("election: no leader") )
var ErrLocked = errors.New("mutex: Locked by another session")
ErrLocked is returned by TryLock when Mutex is already locked by another session.
var ErrSessionExpired = errors.New("mutex: session is expired")
Functions ¶
func NewSTM ¶
NewSTM initiates a new STM instance, using serializable snapshot isolation by default.
func NewSTMReadCommitted ¶
func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
NewSTMReadCommitted is deprecated.
func NewSTMRepeatable ¶
func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
NewSTMRepeatable is deprecated.
func NewSTMSerializable ¶
func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
NewSTMSerializable is deprecated.
func WithAbortContext ¶
WithAbortContext specifies the context for permanently aborting the transaction.
func WithIsolation ¶
func WithIsolation(lvl Isolation) stmOption
WithIsolation specifies the transaction isolation level.
func WithPrefetch ¶
func WithPrefetch(keys ...string) stmOption
WithPrefetch is a hint to prefetch a list of keys before trying to apply. If an STM transaction will unconditionally fetch a set of keys, prefetching those keys will save the round-trip cost from requesting each key one by one with Get().
Types ¶
type Election ¶
type Election struct {
// contains filtered or unexported fields
}
func NewElection ¶
NewElection returns a new election on a given key prefix.
func ResumeElection ¶
ResumeElection initializes an election with a known leader.
func (*Election) Campaign ¶
Campaign puts a value as eligible for the election on the prefix key. Multiple sessions can participate in the election for the same prefix, but only one can be the leader at a time.
If the context is 'context.TODO()/context.Background()', Campaign will block until the other keys are deleted, unless the server returns a non-recoverable error (e.g. ErrCompacted). Otherwise, Campaign will block until it becomes the leader or until the context is cancelled or times out.
Example ¶
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() // create two separate sessions for election competition s1, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s1.Close() e1 := concurrency.NewElection(s1, "/my-election/") s2, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s2.Close() e2 := concurrency.NewElection(s2, "/my-election/") // create competing candidates, with e1 initially losing to e2 var wg sync.WaitGroup wg.Add(2) electc := make(chan *concurrency.Election, 2) go func() { defer wg.Done() // delay candidacy so e2 wins first time.Sleep(3 * time.Second) if err := e1.Campaign(context.Background(), "e1"); err != nil { log.Fatal(err) } electc <- e1 }() go func() { defer wg.Done() if err := e2.Campaign(context.Background(), "e2"); err != nil { log.Fatal(err) } electc <- e2 }() cctx, cancel := context.WithCancel(context.TODO()) defer cancel() e := <-electc fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value)) // resign so next candidate can be elected if err := e.Resign(context.TODO()); err != nil { log.Fatal(err) } e = <-electc fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value)) wg.Wait()
Output: completed first election with e2 completed second election with e1
func (*Election) Header ¶
func (e *Election) Header() *pb.ResponseHeader
Header is the response header from the last successful election proposal.
func (*Election) Observe ¶
func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse
Observe returns a channel that reliably observes ordered leader proposals as GetResponse values on every current elected leader key. It will not necessarily fetch all historical leader updates, but will always post the most recent leader value.
The channel closes when the context is canceled or the underlying watcher is otherwise disrupted.
type Isolation ¶
type Isolation int
Isolation is an enumeration of transactional isolation levels which describes how transactions should interfere and conflict.
const ( // SerializableSnapshot provides serializable isolation and also checks // for write conflicts. SerializableSnapshot Isolation = iota // Serializable reads within the same transaction attempt return data // from the revision of the first read. Serializable // RepeatableReads reads within the same transaction attempt always // return the same data. RepeatableReads // ReadCommitted reads keys from any committed revision. ReadCommitted )
type Mutex ¶
type Mutex struct {
// contains filtered or unexported fields
}
Mutex implements the sync.Locker interface with etcd.
func (*Mutex) Header ¶
func (m *Mutex) Header() *pb.ResponseHeader
Header is the response header received from etcd on acquiring the lock.
func (*Mutex) Lock ¶
Lock locks the mutex with a cancelable context. If the context is canceled while trying to acquire the lock, the mutex tries to clean its stale lock entry.
Example ¶
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() // create two separate sessions for lock competition s1, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s1.Close() m1 := concurrency.NewMutex(s1, "/my-lock/") s2, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s2.Close() m2 := concurrency.NewMutex(s2, "/my-lock/") // acquire lock for s1 if err := m1.Lock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s1") m2Locked := make(chan struct{}) go func() { defer close(m2Locked) // wait until s1 is locks /my-lock/ if err := m2.Lock(context.TODO()); err != nil { log.Fatal(err) } }() if err := m1.Unlock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("released lock for s1") <-m2Locked fmt.Println("acquired lock for s2")
Output: acquired lock for s1 released lock for s1 acquired lock for s2
func (*Mutex) TryLock ¶
TryLock locks the mutex if it is not already locked by another session. If the lock is held by another session, it returns immediately after attempting the necessary cleanup. The ctx argument is used for sending/receiving the Txn RPC.
Example ¶
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() // create two separate sessions for lock competition s1, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s1.Close() m1 := concurrency.NewMutex(s1, "/my-lock") s2, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } defer s2.Close() m2 := concurrency.NewMutex(s2, "/my-lock") // acquire lock for s1 if err = m1.Lock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s1") if err = m2.TryLock(context.TODO()); err == nil { log.Fatal("should not acquire lock") } if err == concurrency.ErrLocked { fmt.Println("cannot acquire lock for s2, as already locked in another session") } if err = m1.Unlock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("released lock for s1") if err = m2.TryLock(context.TODO()); err != nil { log.Fatal(err) } fmt.Println("acquired lock for s2")
Output: acquired lock for s1 cannot acquire lock for s2, as already locked in another session released lock for s1 acquired lock for s2
type STM ¶
type STM interface { // Get returns the value for a key and inserts the key in the txn's read set. // If Get fails, it aborts the transaction with an error, never returning. Get(key ...string) string // Put adds a value for a key to the write set. Put(key, val string, opts ...v3.OpOption) // Rev returns the revision of a key in the read set. Rev(key string) int64 // Del deletes a key. Del(key string) // contains filtered or unexported methods }
STM is an interface for software transactional memory.
Example (Apply) ¶
ExampleSTM_apply shows how to use STM with a transactional transfer between balances.
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() // set up "accounts" totalAccounts := 5 for i := 0; i < totalAccounts; i++ { k := fmt.Sprintf("accts/%d", i) if _, err = cli.Put(context.TODO(), k, "100"); err != nil { log.Fatal(err) } } exchange := func(stm concurrency.STM) error { from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts) if from == to { // nothing to do return nil } // read values fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to) fromV, toV := stm.Get(fromK), stm.Get(toK) fromInt, toInt := 0, 0 fmt.Sscanf(fromV, "%d", &fromInt) fmt.Sscanf(toV, "%d", &toInt) // transfer amount xfer := fromInt / 2 fromInt, toInt = fromInt-xfer, toInt+xfer // write back stm.Put(fromK, fmt.Sprintf("%d", fromInt)) stm.Put(toK, fmt.Sprintf("%d", toInt)) return nil } // concurrently exchange values between accounts var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func() { defer wg.Done() if _, serr := concurrency.NewSTM(cli, exchange); serr != nil { log.Fatal(serr) } }() } wg.Wait() // confirm account sum matches sum from beginning. sum := 0 accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix()) if err != nil { log.Fatal(err) } for _, kv := range accts.Kvs { v := 0 fmt.Sscanf(string(kv.Value), "%d", &v) sum += v } fmt.Println("account sum is", sum)
Output: account sum is 500
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session represents a lease kept alive for the lifetime of a client. Fault-tolerant applications may use sessions to reason about liveness.
func NewSession ¶
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error)
NewSession gets the leased session for a client.
func (*Session) Done ¶
func (s *Session) Done() <-chan struct{}
Done returns a channel that closes when the lease is orphaned, expires, or is otherwise no longer being refreshed.
type SessionOption ¶
type SessionOption func(*sessionOptions)
SessionOption configures Session.
func WithContext ¶
func WithContext(ctx context.Context) SessionOption
WithContext assigns a context to the session instead of defaulting to using the client context. This is useful for canceling NewSession and Close operations immediately without having to close the client. If the context is canceled before Close() completes, the session's lease will be abandoned and left to expire instead of being revoked.
func WithLease ¶
func WithLease(leaseID v3.LeaseID) SessionOption
WithLease specifies the existing leaseID to be used for the session. This is useful in a process-restart scenario, for example, to reclaim leadership from an election held prior to the restart.
func WithTTL ¶
func WithTTL(ttl int) SessionOption
WithTTL configures the session's TTL in seconds. If TTL is <= 0, the default TTL of 60 seconds will be used.