concurrency

package
v0.0.0-...-3e944c0
Warning

This package is not in the latest version of its module.

Published: Nov 30, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package concurrency implements concurrency operations on top of etcd such as distributed locks, barriers, and elections.

Constants

This section is empty.

Variables

var (
	ErrElectionNotLeader = errors.New("election: not leader")
	ErrElectionNoLeader  = errors.New("election: no leader")
)
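
Both variables are exported sentinel errors, so callers can presumably match them with errors.Is even after wrapping. The sketch below redeclares them locally so that it compiles without etcd; describe is a hypothetical helper, and which Election methods return which error is not stated in this section.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two sentinels so the sketch builds on its own.
// Real code would refer to concurrency.ErrElectionNotLeader and
// concurrency.ErrElectionNoLeader instead.
var (
	ErrElectionNotLeader = errors.New("election: not leader")
	ErrElectionNoLeader  = errors.New("election: no leader")
)

// describe is a hypothetical helper that maps an election error to a
// short hint for the caller.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrElectionNotLeader):
		return "campaign first"
	case errors.Is(err, ErrElectionNoLeader):
		return "wait for a leader"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	// Wrapping with %w keeps the sentinel visible to errors.Is.
	wrapped := fmt.Errorf("proclaim failed: %w", ErrElectionNotLeader)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(ErrElectionNoLeader))
}
```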

Functions

func NewLocker

func NewLocker(s *Session, pfx string) sync.Locker

NewLocker creates a sync.Locker backed by an etcd mutex.
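
Because NewLocker returns a plain sync.Locker, code written against that interface does not need to know the lock is distributed. The sketch below uses a local sync.Mutex as a stand-in for the etcd-backed locker; withLock and run are hypothetical helpers. Note that sync.Locker methods return no error, so how the etcd-backed locker reports a failed acquisition is not covered here.

```go
package main

import (
	"fmt"
	"sync"
)

// withLock is a hypothetical helper: it runs fn while holding l.
// It accepts any sync.Locker, which is the type NewLocker returns.
func withLock(l sync.Locker, fn func()) {
	l.Lock()
	defer l.Unlock()
	fn()
}

// run increments a shared counter from n goroutines under one locker
// and returns the final count.
func run(n int) int {
	// Stand-in for concurrency.NewLocker(session, "/my-lock/").
	var l sync.Locker = &sync.Mutex{}

	var wg sync.WaitGroup
	total := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			withLock(l, func() { total++ })
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(run(8))
}
```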

func NewSTM

func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error)

NewSTM initiates a new STM instance, using serializable snapshot isolation by default.

func NewSTMReadCommitted

func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)

NewSTMReadCommitted is deprecated.

func NewSTMRepeatable

func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)

NewSTMRepeatable is deprecated.

func NewSTMSerializable

func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)

NewSTMSerializable is deprecated.

func WithAbortContext

func WithAbortContext(ctx context.Context) stmOption

WithAbortContext specifies the context for permanently aborting the transaction.

func WithIsolation

func WithIsolation(lvl Isolation) stmOption

WithIsolation specifies the transaction isolation level.

func WithPrefetch

func WithPrefetch(keys ...string) stmOption

WithPrefetch is a hint to prefetch a list of keys before trying to apply. If an STM transaction will unconditionally fetch a set of keys, prefetching those keys will save the round-trip cost from requesting each key one by one with Get().

Types

type Election

type Election struct {
	// contains filtered or unexported fields
}

func NewElection

func NewElection(s *Session, pfx string) *Election

NewElection returns a new election on a given key prefix.

func ResumeElection

func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election

ResumeElection initializes an election with a known leader.

func (*Election) Campaign

func (e *Election) Campaign(ctx context.Context, val string) error

Campaign puts a value as eligible for the election. It blocks until it is elected, an error occurs, or the context is canceled.

Example
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for election competition
s1, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s1.Close()
e1 := concurrency.NewElection(s1, "/my-election/")

s2, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s2.Close()
e2 := concurrency.NewElection(s2, "/my-election/")

// create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup
wg.Add(2)
electc := make(chan *concurrency.Election, 2)
go func() {
	defer wg.Done()
	// delay candidacy so e2 wins first
	time.Sleep(3 * time.Second)
	if err := e1.Campaign(context.Background(), "e1"); err != nil {
		log.Fatal(err)
	}
	electc <- e1
}()
go func() {
	defer wg.Done()
	if err := e2.Campaign(context.Background(), "e2"); err != nil {
		log.Fatal(err)
	}
	electc <- e2
}()

cctx, cancel := context.WithCancel(context.TODO())
defer cancel()

e := <-electc
fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))

// resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil {
	log.Fatal(err)
}

e = <-electc
fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))

wg.Wait()
Output:

completed first election with e2
completed second election with e1

func (*Election) Header

func (e *Election) Header() *pb.ResponseHeader

Header is the response header from the last successful election proposal.

func (*Election) Key

func (e *Election) Key() string

Key returns the leader key if elected, empty string otherwise.

func (*Election) Leader

func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error)

Leader returns the leader value for the current election.

func (*Election) Observe

func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse

Observe returns a channel that reliably observes ordered leader proposals as GetResponse values on every current elected leader key. It will not necessarily fetch all historical leader updates, but will always post the most recent leader value.

The channel closes when the context is canceled or the underlying watcher is otherwise disrupted.
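
Since the channel closes on cancellation or watcher disruption, a consumer is safer ranging over it than indexing into a single received value. The sketch below shows that consumption pattern with a channel of strings; fakeObserve is a hypothetical stand-in for Observe and does not talk to etcd.

```go
package main

import (
	"context"
	"fmt"
)

// fakeObserve is a hypothetical stand-in for Election.Observe. It emits
// the given leader values in order and closes the channel once ctx is
// canceled.
func fakeObserve(ctx context.Context, leaders []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, l := range leaders {
			select {
			case out <- l:
			case <-ctx.Done():
				return
			}
		}
		<-ctx.Done()
	}()
	return out
}

// watchLeaders ranges over the observation channel until it closes and
// returns every leader value seen, in order. It expects at least one
// leader value; with none, nothing would ever cancel the context.
func watchLeaders(leaders []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var seen []string
	for l := range fakeObserve(ctx, leaders) {
		seen = append(seen, l)
		if len(seen) == len(leaders) {
			cancel() // stop observing; the channel closes and the loop ends
		}
	}
	return seen
}

func main() {
	fmt.Println(watchLeaders([]string{"e2", "e1"}))
}
```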

func (*Election) Proclaim

func (e *Election) Proclaim(ctx context.Context, val string) error

Proclaim lets the leader announce a new value without another election.

func (*Election) Resign

func (e *Election) Resign(ctx context.Context) (err error)

Resign lets a leader start a new election.

func (*Election) Rev

func (e *Election) Rev() int64

Rev returns the leader key's creation revision, if elected.

type Isolation

type Isolation int

Isolation is an enumeration of transactional isolation levels which describes how transactions should interfere and conflict.

const (
	// SerializableSnapshot provides serializable isolation and also checks
	// for write conflicts.
	SerializableSnapshot Isolation = iota
	// Serializable reads within the same transaction attempt return data
	// from the revision of the first read.
	Serializable
	// RepeatableReads reads within the same transaction attempt always
	// return the same data.
	RepeatableReads
	// ReadCommitted reads keys from any committed revision.
	ReadCommitted
)
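
The constants are assigned with iota, so SerializableSnapshot is the zero value of Isolation, which is consistent with NewSTM using serializable snapshot isolation by default. The sketch below redeclares the type locally to print the numeric values; name is a hypothetical helper, since this section does not show a String method.

```go
package main

import "fmt"

// Isolation and its constants are redeclared here, in the same order as
// in the package, so the sketch builds without etcd.
type Isolation int

const (
	SerializableSnapshot Isolation = iota
	Serializable
	RepeatableReads
	ReadCommitted
)

// name is a hypothetical helper that returns the constant's identifier.
func name(lvl Isolation) string {
	switch lvl {
	case SerializableSnapshot:
		return "SerializableSnapshot"
	case Serializable:
		return "Serializable"
	case RepeatableReads:
		return "RepeatableReads"
	case ReadCommitted:
		return "ReadCommitted"
	default:
		return "unknown"
	}
}

func main() {
	var zero Isolation // the zero value is the first constant
	fmt.Println(int(zero), name(zero))
	fmt.Println(int(ReadCommitted), name(ReadCommitted))
}
```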

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

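Mutex is a distributed lock backed by etcd. Its Lock and Unlock methods take a context and return an error, so it does not satisfy sync.Locker directly; NewLocker returns a sync.Locker backed by an etcd mutex.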

func NewMutex

func NewMutex(s *Session, pfx string) *Mutex

func (*Mutex) Header

func (m *Mutex) Header() *pb.ResponseHeader

Header is the response header received from etcd on acquiring the lock.

func (*Mutex) IsOwner

func (m *Mutex) IsOwner() v3.Cmp

func (*Mutex) Key

func (m *Mutex) Key() string

func (*Mutex) Lock

func (m *Mutex) Lock(ctx context.Context) error

Lock locks the mutex with a cancelable context. If the context is canceled while trying to acquire the lock, the mutex tries to clean its stale lock entry.

Example
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
	log.Fatal(err)
}
fmt.Println("acquired lock for s1")

m2Locked := make(chan struct{})
go func() {
	defer close(m2Locked)
	// blocks until s1 unlocks /my-lock/
	if err := m2.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
}()

if err := m1.Unlock(context.TODO()); err != nil {
	log.Fatal(err)
}
fmt.Println("released lock for s1")

<-m2Locked
fmt.Println("acquired lock for s2")
Output:

acquired lock for s1
released lock for s1
acquired lock for s2
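
The example above passes context.TODO(), which blocks for as long as the lock is held elsewhere. A bounded wait can be sketched by passing a context with a timeout instead. The ctxLocker interface below copies the Lock and Unlock signatures documented for Mutex, so a *Mutex should satisfy it; fakeMutex, tryLock, and demo are hypothetical and run entirely in memory.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ctxLocker lists the two Mutex methods this sketch needs.
type ctxLocker interface {
	Lock(ctx context.Context) error
	Unlock(ctx context.Context) error
}

// fakeMutex is a hypothetical in-memory stand-in for an etcd-backed mutex.
type fakeMutex struct{ ch chan struct{} }

func newFakeMutex() *fakeMutex { return &fakeMutex{ch: make(chan struct{}, 1)} }

// Lock waits for the lock or for ctx to end, whichever comes first.
func (m *fakeMutex) Lock(ctx context.Context) error {
	select {
	case m.ch <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Unlock releases the lock and reports an error if it was not held.
func (m *fakeMutex) Unlock(ctx context.Context) error {
	select {
	case <-m.ch:
		return nil
	default:
		return errors.New("unlock of unlocked mutex")
	}
}

// tryLock gives up after d instead of blocking indefinitely.
func tryLock(m ctxLocker, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return m.Lock(ctx)
}

// demo acquires the lock, then shows a second attempt timing out while
// the lock is still held.
func demo() (first, second error) {
	m := newFakeMutex()
	first = tryLock(m, 50*time.Millisecond)
	second = tryLock(m, 50*time.Millisecond)
	_ = m.Unlock(context.Background())
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, errors.Is(second, context.DeadlineExceeded))
}
```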

func (*Mutex) Unlock

func (m *Mutex) Unlock(ctx context.Context) error

type STM

type STM interface {
	// Get returns the value for a key and inserts the key in the txn's read set.
	// If Get fails, it aborts the transaction with an error, never returning.
	Get(key ...string) string
	// Put adds a value for a key to the write set.
	Put(key, val string, opts ...v3.OpOption)
	// Rev returns the revision of a key in the read set.
	Rev(key string) int64
	// Del deletes a key.
	Del(key string)
	// contains filtered or unexported methods
}

STM is an interface for software transactional memory.

Example (Apply)

ExampleSTM_apply shows how to use STM with a transactional transfer between balances.

cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

// set up "accounts"
totalAccounts := 5
for i := 0; i < totalAccounts; i++ {
	k := fmt.Sprintf("accts/%d", i)
	if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
		log.Fatal(err)
	}
}

exchange := func(stm concurrency.STM) error {
	from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
	if from == to {
		// nothing to do
		return nil
	}
	// read values
	fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
	fromV, toV := stm.Get(fromK), stm.Get(toK)
	fromInt, toInt := 0, 0
	fmt.Sscanf(fromV, "%d", &fromInt)
	fmt.Sscanf(toV, "%d", &toInt)

	// transfer amount
	xfer := fromInt / 2
	fromInt, toInt = fromInt-xfer, toInt+xfer

	// write back
	stm.Put(fromK, fmt.Sprintf("%d", fromInt))
	stm.Put(toK, fmt.Sprintf("%d", toInt))
	return nil
}

// concurrently exchange values between accounts
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
	go func() {
		defer wg.Done()
		if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
			log.Fatal(serr)
		}
	}()
}
wg.Wait()

// confirm account sum matches sum from beginning.
sum := 0
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
if err != nil {
	log.Fatal(err)
}
for _, kv := range accts.Kvs {
	v := 0
	fmt.Sscanf(string(kv.Value), "%d", &v)
	sum += v
}

fmt.Println("account sum is", sum)
Output:

account sum is 500
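
The total stays at 500 because each exchange only moves an amount from one account to another, provided the transactions take effect one at a time. The sketch below replays that arithmetic sequentially on an in-memory slice, with no etcd and no concurrency; simulate and the fixed account pairs are hypothetical and replace the random choices in the example.

```go
package main

import "fmt"

// transfer mirrors the arithmetic in exchange: half of the source
// balance (rounded down) moves to the destination.
func transfer(from, to int) (int, int) {
	xfer := from / 2
	return from - xfer, to + xfer
}

// simulate starts five accounts at 100, applies the given
// (from, to) index pairs in order, and returns the sum of all balances.
func simulate(pairs [][2]int) int {
	accts := []int{100, 100, 100, 100, 100}
	for _, p := range pairs {
		from, to := p[0], p[1]
		if from == to {
			continue // nothing to do, as in the example
		}
		accts[from], accts[to] = transfer(accts[from], accts[to])
	}
	sum := 0
	for _, v := range accts {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(simulate([][2]int{{0, 1}, {1, 2}, {3, 3}, {4, 0}, {2, 4}}))
}
```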

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents a lease kept alive for the lifetime of a client. Fault-tolerant applications may use sessions to reason about liveness.

func NewSession

func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error)

NewSession gets the leased session for a client.

func (*Session) Client

func (s *Session) Client() *v3.Client

Client is the etcd client that is attached to the session.

func (*Session) Close

func (s *Session) Close() error

Close orphans the session and revokes the session lease.

func (*Session) Done

func (s *Session) Done() <-chan struct{}

Done returns a channel that closes when the lease is orphaned, expires, or is otherwise no longer being refreshed.

func (*Session) Lease

func (s *Session) Lease() v3.LeaseID

Lease is the lease ID for keys bound to the session.

func (*Session) Orphan

func (s *Session) Orphan()

Orphan ends the refresh for the session lease. This is useful in case the state of the client connection is indeterminate (revoke would fail) or when transferring lease ownership.

type SessionOption

type SessionOption func(*sessionOptions)

SessionOption configures Session.

func WithContext

func WithContext(ctx context.Context) SessionOption

WithContext assigns a context to the session instead of defaulting to using the client context. This is useful for canceling NewSession and Close operations immediately without having to close the client. If the context is canceled before Close() completes, the session's lease will be abandoned and left to expire instead of being revoked.

func WithLease

func WithLease(leaseID v3.LeaseID) SessionOption

WithLease specifies an existing leaseID to be used for the session. This is useful in a process-restart scenario, for example, to reclaim leadership from an election held before the restart.

func WithTTL

func WithTTL(ttl int) SessionOption

WithTTL configures the session's TTL in seconds. If TTL is <= 0, the default 60-second TTL will be used.
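
SessionOption is a function over an unexported options struct, the usual functional-options pattern. The sketch below shows how such options might be resolved, including the fallback to 60 seconds for a non-positive TTL. The sessionOptions field, defaultTTL, and resolveTTL are assumptions made for illustration and are not taken from the package source.

```go
package main

import "fmt"

// sessionOptions is a hypothetical stand-in for the unexported struct;
// the ttl field name is assumed.
type sessionOptions struct {
	ttl int
}

// SessionOption has the same shape as the documented type.
type SessionOption func(*sessionOptions)

// defaultTTL is the default described for WithTTL, in seconds.
const defaultTTL = 60

// WithTTL overrides the TTL only when the value is positive.
func WithTTL(ttl int) SessionOption {
	return func(so *sessionOptions) {
		if ttl > 0 {
			so.ttl = ttl
		}
	}
}

// resolveTTL is a hypothetical helper: it starts from the default and
// applies each option in order.
func resolveTTL(opts ...SessionOption) int {
	so := &sessionOptions{ttl: defaultTTL}
	for _, opt := range opts {
		opt(so)
	}
	return so.ttl
}

func main() {
	fmt.Println(resolveTTL(), resolveTTL(WithTTL(10)), resolveTTL(WithTTL(0)))
}
```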
