etcd

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2022 License: MIT Imports: 13 Imported by: 0

README

go-pkg ETCD Package

Go Report Card GoDoc

go-pkg/etcd is an componentized etcd client package.

It providels:

  • An easy way to configre and manage etcd v3 client.
  • Lease proxy.
  • Lock handler.

Based on go.etcd.io/etcd/client/v3

Install

go get github.com/wwwangxc/go-pkg/etcd

Quick Start

Client Proxy
package main

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"

	// go-pkg/etcd will automatically read configuration
	// files (./app.yaml) when package loaded
	"github.com/wwwangxc/go-pkg/etcd"
)

func ExampleNewClientProxy() {
	_ = etcd.NewClientProxy("etcd1",
		etcd.WithTarget("127.0.0.1:2379,127.0.0.1:2380"),       // set target, target is a list of URLs, multiple URL split by ','.
		etcd.WithTimeout(1000),                                 // set timeout, unit millisecond, default 3000.
		etcd.WithUsername("username"),                          // set user name for authentication.
		etcd.WithPassword("password"),                          // set password for authentication.
		etcd.WithTLSKeyPath("/usr/local/etcd_conf/key.pem"),    // set tls key file path.
		etcd.WithTLSCertPath("/usr/local/etcd_conf/cert.pem"),  // set tls cert file path.
		etcd.WithCACertPath("/usr/local/etcd_conf/cacert.pem"), // set ca cert file path.
	)
}

func ExampleClientProxy_Put() {
	// do etcd put operation
	_, err := etcd.NewClientProxy("etcd1").Put(context.Background(), "key", "val")
	if err != nil {
		fmt.Printf("put operation fail. error:%v", err)
		return
	}

	// or

	// do etcd put operation and convert result to an error
	if err = etcd.PutResult(etcd.NewClientProxy("etcd").Put(context.Background(), "key", "val")); err != nil {
		fmt.Printf("put operation fail. error:%v", err)
		return
	}
}

func ExampleClientProxy_PutWithLease() {
	cli := etcd.NewClientProxy("etcd1")
	lease := cli.Lease()

	// create a lease
	id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10))
	if err != nil {
		fmt.Printf("lease grant fail. error:%v", err)
		return
	}

	// put with lease
	err = etcd.PutResult(cli.Put(context.Background(), "key", "val", clientv3.WithLease(id)))
	if err != nil {
		fmt.Printf("put operation fail. error:%v", err)
	}
}

func ExampleClientProxy_Get() {
	// do etcd get operation
	resp, err := etcd.NewClientProxy("etcd1").Get(context.Background(), "key")
	if err != nil {
		fmt.Printf("get operation fail. error:%v", err)
		return
	}

	for k, v := range resp.Kvs {
		fmt.Printf("key: %s\n", k)
		fmt.Printf("val: %s\n", v)
	}

	// or

	// do etcd get operation and convert result to map[string]string and an error
	m, err := etcd.GetResult(etcd.NewClientProxy("etcd").Get(context.Background(), "key"))
	if err != nil {
		fmt.Printf("get operation fail. error:%v", err)
		return
	}

	for k, v := range m {
		fmt.Printf("key: %s\n", k)
		fmt.Printf("val: %s\n", v)
	}
}

func ExampleClientProxy_Delete() {
	// do etcd delete operation
	resp, err := etcd.NewClientProxy("etcd1").Delete(context.Background(), "key")
	if err != nil {
		fmt.Printf("delete operation fail. error:%v", err)
		return
	}

	fmt.Printf("number of keys deleted: %d\n", resp.Deleted)

	// or

	// do etcd delte operation and convert result to map[string]string and an error
	deletedNumber, err := etcd.DeleteResult(etcd.NewClientProxy("etcd").Delete(context.Background(), "key"))
	if err != nil {
		fmt.Printf("get operation fail. error:%v", err)
		return
	}

	fmt.Printf("number of keys deleted: %d\n", deletedNumber)
}

func ExampleClientProxy_Watch() {
	// do etcd watch operation
	watchChan, err := etcd.NewClientProxy("etcd1").Watch(context.Background(), "key")
	if err != nil {
		fmt.Printf("watch operation fail. error:%v", err)
		return
	}

	for v := range watchChan {
		// do something...
	}
}

func ExampleClientProxy_Txn() {
	_, err := etcd.NewClientProxy("etcd1").Txn(context.Background(),
		[]clientv3.Cmp{clientv3.Compare(clientv3.Value("key"), "=", "val")}, // if key's value == val
		[]clientv3.Op{clientv3.OpPut("key", "val1")},                        // then put key's value = val1
		[]clientv3.Op{clientv3.OpPut("key", "val")})                         // else put key's value = val
	if err != nil {
		fmt.Printf("txn fail. error:%v", err)
		return
	}

	// or

	err = etcd.TxnResult(etcd.NewClientProxy("etcd1").Txn(context.Background(),
		[]clientv3.Cmp{clientv3.Compare(clientv3.Value("key"), "=", "val")}, // if key's value == val
		[]clientv3.Op{clientv3.OpPut("key", "val1")},                        // then put key's value = val1
		[]clientv3.Op{clientv3.OpPut("key", "val")}))                        // else put key's value = val
	if err != nil {
		fmt.Printf("txn fail. error:%v", err)
		return
	}

}
Lease Proxy
package main

import (
	"context"
	"fmt"
	"time"

	// go-pkg/etcd will automatically read configuration
	// files (./app.yaml) when package loaded
	"github.com/wwwangxc/go-pkg/etcd"
)

func ExampleLeaseProxy_Grant() {
	lease := etcd.NewClientProxy("etcd1").Lease()

	// create a lease
	id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10))
	if err != nil {
		fmt.Printf("lease grant fail. error:%v", err)
		return
	}

	fmt.Printf("lease:0x%x\n", id)
}

func ExampleLeaseProxy_RevokeResult() {
	lease := etcd.NewClientProxy("etcd1").Lease()

	// create a lease
	id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10))
	if err != nil {
		fmt.Printf("lease grant fail. error:%v", err)
		return
	}

	// revoke a lease
	err = etcd.LeaseRevokeResult(lease.Revoke(context.Background(), id))
	if err != nil {
		fmt.Printf("lease revoke fail. error:%v", err)
		return
	}
}

func ExampleLeaseProxy_LeaseTimeToLiveResult() {
	lease := etcd.NewClientProxy("etcd1").Lease()

	// create a lease
	id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10))
	if err != nil {
		fmt.Printf("lease grant fail. error:%v", err)
		return
	}

	for {
		// get lease ttl
		ttl, err := etcd.LeaseTimeToLiveResult(lease.TimeToLive(context.Background(), id))
		if err != nil {
			fmt.Printf("get lease ttl fail. error:%v", err)
			return
		}

		if ttl == -1 {
			fmt.Printf("lease:0x%x expired\n", id)
			break
		}

		time.Sleep(time.Second)
	}
}

func ExampleLeaseProxy_KeepAlive() {
	lease := etcd.NewClientProxy("etcd1").Lease()

	// create a lease
	id, err := etcd.LeaseGrantResult(lease.Grant(context.Background(), 10))
	if err != nil {
		fmt.Printf("lease grant fail. error:%v", err)
		return
	}

	ch, err := lease.KeepAlive(context.Background(), id)
	if err != nil {
		fmt.Printf("lease keep alive fail. error:%v", err)
		return
	}

	for {
		ka := <-ch
		if ka == nil {
			fmt.Println("lease timeout")
			return
		}
		fmt.Println("ttl:", ka.TTL)
	}
}
Locker Proxy
package main

import (
	"context"
	"fmt"
	"time"

	// go-pkg/etcd will automatically read configuration
	// files (./app.yaml) when package loaded
	"github.com/wwwangxc/go-pkg/etcd"
)

func ExampleLockerProxy_Lock() {
	lockKeyPrefix := "lock/example/lock"

	// gets the lock operation proxy for the key prefix.
	// It while create an leased session and keep the lease alive until client error
	// or invork close function.
	locker, err := etcd.NewClientProxy("etcd1").Locker(lockKeyPrefix, 3)
	if err != nil {
		fmt.Printf("get locker proxy fail:%v\n", err)
		return
	}

	defer func() {

		// Close orphans the session and revokes the session lease.
		if err := locker.Close(); err != nil {
			fmt.Printf("locker close fail:%v", err)
			return
		}
	}()

	// Will block the current goroutine until locked.
	// If the context is canceled while trying to acquire the lock, the mutex tries to clean its stale lock entry.
	if err := locker.Lock(context.Background()); err != nil {
		fmt.Printf("lock fail:%v\n", err)
		return
	}

	// lock success

	defer func() {
		if err := locker.Unlock(context.Background()); err != nil {
			fmt.Printf("unlock fail:%v", err)
		}
	}()

	// do something...
}

func ExampleLockerProxy_TryLock() {
	lockKeyPrefix := "lock/example/try_lock"

	// gets the lock operation proxy for the key prefix.
	// It while create an leased session and keep the lease alive until client error
	// or invork close function.
	locker, err := etcd.NewClientProxy("etcd1").Locker(lockKeyPrefix, 3)
	if err != nil {
		fmt.Printf("get locker proxy fail:%v\n", err)
		return
	}

	defer func() {

		// Close orphans the session and revokes the session lease.
		if err := locker.Close(); err != nil {
			fmt.Printf("locker close fail:%v", err)
			return
		}
	}()

	if err = locker.TryLock(context.Background()); err != nil {

		// return 'ErrLockNotAcquired' when lock not acquired.
		if etcd.IsErrLockNotAcquired(err) {
			fmt.Printf("lock not acquired\n")
			return
		}

		fmt.Printf("try lock fail:%v\n", err)
		return
	}

	// lock success

	defer func() {
		if err := locker.Unlock(context.Background()); err != nil {
			fmt.Printf("unlock fail:%v", err)
		}
	}()

	// do something...
}
config

app.yaml

client:
  timeout: 3000
  service:
    - name: etcd1
      target: 127.0.0.1:2379,127.0.0.1:2380
      timeout: 1000
      username: username
      password: password
      tls_key: /usr/local/etcd_conf/key.pem
      tls_cert: /usr/local/etcd_conf/cert.pem
      ca_cert: /usr/local/etcd_conf/cacert.pem

Hot To Mock

Client & Lease Proxy & Locker Proxy
package tests

import (
	"testing"

	"github.com/agiledragon/gomonkey"
	"github.com/golang/mock/gomock"
	clientv3 "go.etcd.io/etcd/client/v3"

	// go-pkg/etcd will automatically read configuration
	// files (./app.yaml) when package loaded
	"github.com/wwwangxc/go-pkg/etcd"
	"github.com/wwwangxc/go-pkg/etcd/mocketcd"
)

func TestMockClientProxy(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Mock Lease Proxy
	mockLease := mocketcd.NewMockLeaseProxy(ctrl)
	mockLease.EXPECT().Grant(gomock.Any(), gomock.Any()).Return(&clientv3.LeaseGrantResponse{}, nil).AnyTimes()
	mockLease.EXPECT().Revoke(gomock.Any(), gomock.Any()).Return(&clientv3.LeaseRevokeResponse{}, nil).AnyTimes()
	mockLease.EXPECT().TimeToLive(gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.LeaseTimeToLiveResponse{}, nil).AnyTimes()
	mockLease.EXPECT().Leases(gomock.Any()).Return(&clientv3.LeaseLeasesResponse{}, nil).AnyTimes()
	mockLease.EXPECT().KeepAlive(gomock.Any(), gomock.Any()).Return(make(chan *clientv3.LeaseKeepAliveResponse), nil).AnyTimes()

  // Mock Locker Proxy
	mockLocker := mocketcd.NewMockLockerProxy(ctrl)
	mockLocker.EXPECT().Lock(gomock.Any()).Return(nil).AnyTimes()
	mockLocker.EXPECT().TryLock(gomock.Any()).Return(nil).AnyTimes()
	mockLocker.EXPECT().Unlock(gomock.Any()).Return(nil).AnyTimes()
	mockLocker.EXPECT().Close().Return(nil).AnyTimes()

	// Mock Client Proxy
	mockCli := mocketcd.NewMockClientProxy(ctrl)
	mockCli.EXPECT().Put(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.PutResponse{}, nil).AnyTimes()
	mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.GetResponse{}, nil).AnyTimes()
	mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.DeleteResponse{}, nil).AnyTimes()
	mockCli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(make(chan clientv3.WatchChan), nil).AnyTimes()
	mockCli.EXPECT().Txn(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&clientv3.TxnResponse{}, nil).AnyTimes()
	mockCli.EXPECT().Lease().Return(mockLease).AnyTimes()
	mockCli.EXPECT().Locker(gomock.Any(), gomock.Any()).Return(mockLocker, nil).AnyTimes()

	patches := gomonkey.ApplyFunc(etcd.NewClientProxy,
		func(string, ...etcd.ClientOption) etcd.ClientProxy {
			return mockCli
		})
	defer patches.Reset()

	// do something...
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrLockNotAcquired lock not acquired
	ErrLockNotAcquired = errors.New("lock not acquired")
)
View Source
var (
	// ErrNil result nil error
	ErrNil = errors.New("result nil")
)

Functions

func DeleteResult

func DeleteResult(resp *clientv3.DeleteResponse, err error) (int64, error)

DeleteResult is a helper that return the deleted raw number of etcd delete operation

func GetResult

func GetResult(resp *clientv3.GetResponse, err error) (map[string]string, error)

GetResult is a helper that converts the response of an etcd get operation to a mapa[string]string

func GetResultCount

func GetResultCount(resp *clientv3.GetResponse, err error) (int64, error)

GetResultCount is a helper that return the count in the response of an etcd get operation

func GetResultInt

func GetResultInt(resp *clientv3.GetResponse, err error) (int, error)

GetResultInt is a helper that converts the first value in the response of an etcd get operation to a int

Return ErrNil when response count euqal 0.

func GetResultInts

func GetResultInts(resp *clientv3.GetResponse, err error) ([]int, error)

GetResultInts is a helper that converts the first value in the response of an etcd get operation to a []int

Return ErrNil when response count euqal 0.

func GetResultString

func GetResultString(resp *clientv3.GetResponse, err error) (string, error)

GetResultString is a helper that converts the first value in the response of an etcd get operation to a string

Return ErrNil when response count euqal 0.

func GetResultStrings

func GetResultStrings(resp *clientv3.GetResponse, err error) ([]string, error)

GetResultStrings is a helper that converts the first value in the response of an etcd get operation to a []string

Return ErrNil when response count euqal 0.

func IsErrLockNotAcquired added in v1.1.0

func IsErrLockNotAcquired(err error) bool

IsErrLockNotAcquired is lock not acquired error

func LeaseGrantResult

func LeaseGrantResult(resp *clientv3.LeaseGrantResponse, err error) (clientv3.LeaseID, error)

LeaseGrantResult is a helper that return lease id in the response of an etcd lease grant operation

func LeaseRevokeResult

func LeaseRevokeResult(_ *clientv3.LeaseRevokeResponse, err error) error

LeaseRevokeResult is a helper that converts etcd lease revoke operation response to an error

func LeaseTimeToLiveResult

func LeaseTimeToLiveResult(resp *clientv3.LeaseTimeToLiveResponse, err error) (int64, error)

LeaseTimeToLiveResult is a helper that return lease ttl in the response of an etcd lease grant operation

func PutResult

func PutResult(_ *clientv3.PutResponse, err error) error

PutResult is a helper that converts etcd put operation response to an error

func TxnResult

func TxnResult(_ *clientv3.TxnResponse, err error) error

TxnResult is a helper that converts etcd txn operation response to an error

Types

type ClientOption

type ClientOption func(*clientConfig)

ClientOption etcd client proxy option

func WithCACertPath

func WithCACertPath(caCertPath string) ClientOption

WithCACertPath set ca cert file path

func WithPassword

func WithPassword(password string) ClientOption

WithPassword set password for authentication

func WithTLSCertPath

func WithTLSCertPath(tlsCertPath string) ClientOption

WithTLSKeyPath set tls cert file path

func WithTLSKeyPath

func WithTLSKeyPath(tlsKeyPath string) ClientOption

WithTLSKeyPath set tls key file path

func WithTarget

func WithTarget(target string) ClientOption

WithTarget set target

Target is a list of URLs, multiple URL split by ','

func WithTimeout

func WithTimeout(timeout int) ClientOption

WithTimeout set timeout

Unit millisecond, default 3000

func WithUsername

func WithUsername(username string) ClientOption

WithUsername set user name for authentication

type ClientProxy

type ClientProxy interface {

	// Put puts a key-value pair into etcd.
	// Note that key,value can be plain bytes array and string is
	// an immutable representation of that bytes array.
	// To get a string of bytes, do string([]byte{0x10, 0x20}).
	Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)

	// Get retrieves keys.
	// By default, Get will return the value for "key", if any.
	// When passed WithRange(end), Get will return the keys in the range [key, end).
	// When passed WithFromKey(), Get returns keys greater than or equal to key.
	// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
	// if the required revision is compacted, the request will fail with ErrCompacted .
	// When passed WithLimit(limit), the number of returned keys is bounded by limit.
	// When passed WithSort(), the keys will be sorted.
	Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)

	// Delete deletes a key, or optionally using WithRange(end), [key, end).
	Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)

	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and nil "Err()".
	// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and block until event is triggered, except when server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when context passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	Watch(ctx context.Context, key string, opts ...clientv3.OpOption) (clientv3.WatchChan, error)

	// Txn creates a transaction.
	//
	// Step 1:
	// 	If takes a list of comparison. If all comparisons passed in succeed,
	// 	the operations passed into Then() will be executed. Or the operations
	// 	passed into Else() will be executed.
	//
	// Step 2:
	// 	Then takes a list of operations. The Ops list will be executed, if the
	// 	comparisons passed in If() succeed.
	//
	// 	Else takes a list of operations. The Ops list will be executed, if the
	// 	comparisons passed in If() fail.
	//
	// Step 3:
	// 	Commit tries to commit the transaction.
	Txn(ctx context.Context,
		cmps []clientv3.Cmp, thenOps []clientv3.Op, elseOps []clientv3.Op) (*clientv3.TxnResponse, error)

	// Lease get etcd lease proxy
	Lease() LeaseProxy

	// Locker gets the lock operation proxy for the key prefix.
	//
	// It while create an leased session and keep the lease alive until client error
	// or invork close function.
	Locker(prefix string, ttl int) (LockerProxy, error)
}

ClientProxy etcd client proxy

func NewClientProxy

func NewClientProxy(name string, opts ...ClientOption) ClientProxy

NewClientProxy new etcd client proxy

type LeaseProxy

type LeaseProxy interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context,
		id clientv3.LeaseID, opts ...clientv3.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*clientv3.LeaseLeasesResponse, error)

	// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
	// to the channel are not consumed promptly the channel may become full. When full, the lease
	// client will continue sending keep alive requests to the etcd server, but will drop responses
	// until there is capacity on the channel to send more responses.
	//
	// If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
	// canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error
	// containing the error reason.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
	// alive stream is interrupted in some way the client cannot handle itself;
	// given context "ctx" is canceled or timed out.
	KeepAlive(ctx context.Context, id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
}

LeaseProxy etcd lease proxy

type LockerProxy added in v1.1.0

type LockerProxy interface {

	// Lock locks the mutex with a cancelable context. If the context is canceled
	// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
	Lock(ctx context.Context) error

	// TryLock locks the mutex if not already locked by another session.
	// If lock is held by another session, return immediately after attempting necessary cleanup
	// The ctx argument is used for the sending/receiving Txn RPC.
	// Return 'ErrLockNotAcquired' when lock not acquired.
	TryLock(ctx context.Context) error

	// Unlock
	// Return error if the lock key delete fail.
	Unlock(ctx context.Context) error

	// Close orphans the session and revokes the session lease.
	Close() error
}

LockerProxy etcd locker proxy

Directories

Path Synopsis
Package mocketcd is a generated GoMock package.
Package mocketcd is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL