Documentation ¶
Overview ¶
Package clientv3 implements the official Go etcd client for v3.
Create client using `clientv3.New`:
// expect dial time-out on ipv4 blackhole
_, err := clientv3.New(clientv3.Config{
	Endpoints:   []string{"http://254.0.0.1:12345"},
	DialTimeout: 2 * time.Second,
})

// etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
if err == context.DeadlineExceeded {
	// handle errors
}

// etcd clientv3 <= v3.2.9, grpc/grpc-go <= v1.2.1
if err == grpc.ErrClientConnTimeout {
	// handle errors
}

cli, err := clientv3.New(clientv3.Config{
	Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
	DialTimeout: 5 * time.Second,
})
if err != nil {
	// handle error!
}
defer cli.Close()
Make sure to close the client after using it. If the client is not closed, its connection leaks goroutines.
To specify a client request timeout, wrap the context with context.WithTimeout:
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := kvc.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
	// handle error!
}
// use the response
The Client has internal state (watchers and leases), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.
etcd client returns 3 types of errors:
- context error: canceled or deadline exceeded.
- gRPC status error: e.g. when the server-side clock drifts and the server times out before the client's context deadline is exceeded.
- gRPC error: see https://go.etcd.io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
Here is the example code to handle client errors:
resp, err := kvc.Put(ctx, "", "")
if err != nil {
	if err == context.Canceled {
		// ctx is canceled by another routine
	} else if err == context.DeadlineExceeded {
		// ctx is attached with a deadline and it exceeded
	} else if err == rpctypes.ErrEmptyKey {
		// client-side error: key is not provided
	} else if ev, ok := status.FromError(err); ok {
		code := ev.Code()
		if code == codes.DeadlineExceeded {
			// server-side context might have timed-out first (due to clock skew)
			// while original client-side context is not timed-out yet
		}
	} else {
		// bad cluster endpoints, which are not etcd servers
	}
}

go func() { cli.Close() }()
_, err := kvc.Get(ctx, "a")
if err != nil {
	// with etcd clientv3 <= v3.3
	if err == context.Canceled {
		// grpc balancer calls 'Get' with an inflight client.Close
	} else if err == grpc.ErrClientConnClosing {
		// grpc balancer calls 'Get' after client.Close.
	}
	// with etcd clientv3 >= v3.4
	if clientv3.IsConnCanceled(err) {
		// gRPC client connection is closed
	}
}
The grpc load balancer is registered statically and is shared across etcd clients. To enable detailed load balancer logging, set the ETCD_CLIENT_DEBUG environment variable. E.g. "ETCD_CLIENT_DEBUG=1".
Example ¶
package main

import (
	"context"
	"log"
	"os"
	"time"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/grpclog"
)

var (
	dialTimeout = 5 * time.Second
	endpoints   = []string{"localhost:2379", "localhost:22379", "localhost:32379"}
)

func main() {
	clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close() // make sure to close the client

	_, err = cli.Put(context.TODO(), "foo", "bar")
	if err != nil {
		log.Fatal(err)
	}
}
Output:
Index ¶
- Constants
- Variables
- func GetLogger() logutil.Logger
- func GetPrefixRangeEnd(prefix string) string
- func IsConnCanceled(err error) bool
- func NewLogger(gl grpclog.LoggerV2) logutil.Logger
- func RetryAuthClient(c *Client) pb.AuthClient
- func RetryClusterClient(c *Client) pb.ClusterClient
- func RetryKVClient(c *Client) pb.KVClient
- func RetryLeaseClient(c *Client) pb.LeaseClient
- func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient
- func SetLogger(l grpclog.LoggerV2)
- func WithRequireLeader(ctx context.Context) context.Context
- type AlarmMember
- type AlarmResponse
- type Auth
- type AuthDisableResponse
- type AuthEnableResponse
- type AuthRoleAddResponse
- type AuthRoleDeleteResponse
- type AuthRoleGetResponse
- type AuthRoleGrantPermissionResponse
- type AuthRoleListResponse
- type AuthRoleRevokePermissionResponse
- type AuthUserAddResponse
- type AuthUserChangePasswordResponse
- type AuthUserDeleteResponse
- type AuthUserGetResponse
- type AuthUserGrantRoleResponse
- type AuthUserListResponse
- type AuthUserRevokeRoleResponse
- type AuthenticateResponse
- type Client
- func (c *Client) ActiveConnection() *grpc.ClientConn
- func (c *Client) Close() error
- func (c *Client) Ctx() context.Context
- func (c *Client) Dial(ep string) (*grpc.ClientConn, error)
- func (c *Client) Endpoints() (eps []string)
- func (c *Client) SetEndpoints(eps ...string)
- func (c *Client) Sync(ctx context.Context) error
- type Cluster
- type Cmp
- type CompactOp
- type CompactOption
- type CompactResponse
- type CompareResult
- type CompareTarget
- type Config
- type DefragmentResponse
- type DeleteResponse
- type ErrKeepAliveHalted
- type Event
- type GetResponse
- type HashKVResponse
- type KV
- type Lease
- type LeaseGrantResponse
- type LeaseID
- type LeaseKeepAliveResponse
- type LeaseLeasesResponse
- type LeaseOp
- type LeaseOption
- type LeaseRevokeResponse
- type LeaseStatus
- type LeaseTimeToLiveResponse
- type Maintenance
- type Member
- type MemberAddResponse
- type MemberListResponse
- type MemberRemoveResponse
- type MemberUpdateResponse
- type MoveLeaderResponse
- type Op
- func (op Op) IsCountOnly() bool
- func (op Op) IsDelete() bool
- func (op Op) IsGet() bool
- func (op Op) IsKeysOnly() bool
- func (op Op) IsPut() bool
- func (op Op) IsSerializable() bool
- func (op Op) IsTxn() bool
- func (op Op) KeyBytes() []byte
- func (op Op) MaxCreateRev() int64
- func (op Op) MaxModRev() int64
- func (op Op) MinCreateRev() int64
- func (op Op) MinModRev() int64
- func (op Op) RangeBytes() []byte
- func (op Op) Rev() int64
- func (op Op) Txn() ([]Cmp, []Op, []Op)
- func (op Op) ValueBytes() []byte
- func (op *Op) WithKeyBytes(key []byte)
- func (op *Op) WithRangeBytes(end []byte)
- func (op *Op) WithValueBytes(v []byte)
- type OpOption
- func WithCountOnly() OpOption
- func WithCreatedNotify() OpOption
- func WithFilterDelete() OpOption
- func WithFilterPut() OpOption
- func WithFirstCreate() []OpOption
- func WithFirstKey() []OpOption
- func WithFirstRev() []OpOption
- func WithFragment() OpOption
- func WithFromKey() OpOption
- func WithIgnoreLease() OpOption
- func WithIgnoreValue() OpOption
- func WithKeysOnly() OpOption
- func WithLastCreate() []OpOption
- func WithLastKey() []OpOption
- func WithLastRev() []OpOption
- func WithLease(leaseID LeaseID) OpOption
- func WithLimit(n int64) OpOption
- func WithMaxCreateRev(rev int64) OpOption
- func WithMaxModRev(rev int64) OpOption
- func WithMinCreateRev(rev int64) OpOption
- func WithMinModRev(rev int64) OpOption
- func WithPrefix() OpOption
- func WithPrevKV() OpOption
- func WithProgressNotify() OpOption
- func WithRange(endKey string) OpOption
- func WithRev(rev int64) OpOption
- func WithSerializable() OpOption
- func WithSort(target SortTarget, order SortOrder) OpOption
- type OpResponse
- type Permission
- type PermissionType
- type PutResponse
- type SortOption
- type SortOrder
- type SortTarget
- type StatusResponse
- type Txn
- type TxnResponse
- type WatchChan
- type WatchResponse
- type Watcher
Examples ¶
- Package
- Auth
- Client (Metrics)
- Cluster (MemberAdd)
- Cluster (MemberList)
- Cluster (MemberRemove)
- Cluster (MemberUpdate)
- Config (WithTLS)
- KV (Compact)
- KV (Delete)
- KV (Do)
- KV (Get)
- KV (GetSortedPrefix)
- KV (GetWithRev)
- KV (Put)
- KV (PutErrorHandling)
- KV (Txn)
- Lease (Grant)
- Lease (KeepAlive)
- Lease (KeepAliveOnce)
- Lease (Revoke)
- Maintenance (Defragment)
- Maintenance (Status)
- Watcher (Watch)
- Watcher (WatchWithPrefix)
- Watcher (WatchWithProgressNotify)
- Watcher (WatchWithRange)
Constants ¶
const (
	PermRead      = authpb.READ
	PermWrite     = authpb.WRITE
	PermReadWrite = authpb.READWRITE
)
const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT
)
const MaxLeaseTTL = 9000000000
MaxLeaseTTL is the maximum lease TTL value.
Variables ¶
var (
	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
	ErrOldCluster           = errors.New("etcdclient: old cluster version")
)
var DefaultLogConfig = zap.Config{
	Level:       zap.NewAtomicLevelAt(zap.WarnLevel),
	Development: false,
	Sampling: &zap.SamplingConfig{
		Initial:    100,
		Thereafter: 100,
	},
	Encoding:         "json",
	EncoderConfig:    zap.NewProductionEncoderConfig(),
	OutputPaths:      []string{"stderr"},
	ErrorOutputPaths: []string{"stderr"},
}
DefaultLogConfig is the default client logging configuration. Default log level is "Warn". Use "zap.InfoLevel" for debugging. Use "/dev/null" for output paths, to discard all logs.
var LeaseResponseChSize = 16
LeaseResponseChSize is the size of buffer to store unsent lease responses. WARNING: DO NOT UPDATE. Only for testing purposes.
Functions ¶
func GetPrefixRangeEnd ¶
func GetPrefixRangeEnd(prefix string) string
GetPrefixRangeEnd gets the range end of the prefix. 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo)))'.
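A range end for a prefix can be computed by incrementing the last byte that is below 0xff and dropping everything after it. The sketch below illustrates that idea with a hypothetical `prefixRangeEnd` helper; it is not the library's source, and the "\x00" fallback for an all-0xff prefix (meaning "no upper bound") is an assumption.

```go
package main

import "fmt"

// prefixRangeEnd returns the smallest string that is greater than every
// string starting with prefix. Hypothetical helper for illustration only.
func prefixRangeEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	// Assumption: every byte is 0xff (or the prefix is empty), so no finite
	// end exists; "\x00" is used here as a marker for "no upper bound".
	return "\x00"
}

// inRange reports whether key falls in the half-open range [start, end).
func inRange(key, start, end string) bool {
	return key >= start && key < end
}

func main() {
	end := prefixRangeEnd("foo")
	fmt.Printf("%q\n", end)                     // "fop"
	fmt.Println(inRange("foo/bar", "foo", end)) // true
	fmt.Println(inRange("fop", "foo", end))     // false
	fmt.Printf("%q\n", prefixRangeEnd("a\xff")) // "b"
}
```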
func IsConnCanceled ¶
func IsConnCanceled(err error) bool
IsConnCanceled returns true if the error is from a closed gRPC connection. ref. https://github.com/grpc/grpc-go/pull/1854
func RetryAuthClient ¶
func RetryAuthClient(c *Client) pb.AuthClient
RetryAuthClient implements an AuthClient.
func RetryClusterClient ¶
func RetryClusterClient(c *Client) pb.ClusterClient
RetryClusterClient implements a ClusterClient.
func RetryLeaseClient ¶
func RetryLeaseClient(c *Client) pb.LeaseClient
RetryLeaseClient implements a LeaseClient.
func RetryMaintenanceClient ¶
func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient
RetryMaintenanceClient implements a MaintenanceClient.
Types ¶
type AlarmMember ¶
type AlarmMember pb.AlarmMember
type AlarmResponse ¶
type AlarmResponse pb.AlarmResponse
type Auth ¶
type Auth interface {
	// AuthEnable enables auth of an etcd cluster.
	AuthEnable(ctx context.Context) (*AuthEnableResponse, error)

	// AuthDisable disables auth of an etcd cluster.
	AuthDisable(ctx context.Context) (*AuthDisableResponse, error)

	// UserAdd adds a new user to an etcd cluster.
	UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)

	// UserDelete deletes a user from an etcd cluster.
	UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error)

	// UserChangePassword changes the password of a user.
	UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error)

	// UserGrantRole grants a role to a user.
	UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error)

	// UserGet gets detailed information about a user.
	UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error)

	// UserList gets a list of all users.
	UserList(ctx context.Context) (*AuthUserListResponse, error)

	// UserRevokeRole revokes a role of a user.
	UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error)

	// RoleAdd adds a new role to an etcd cluster.
	RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error)

	// RoleGrantPermission grants a permission to a role.
	RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error)

	// RoleGet gets detailed information about a role.
	RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error)

	// RoleList gets a list of all roles.
	RoleList(ctx context.Context) (*AuthRoleListResponse, error)

	// RoleRevokePermission revokes a permission from a role.
	RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error)

	// RoleDelete deletes a role.
	RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
}
Example ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() if _, err = cli.RoleAdd(context.TODO(), "root"); err != nil { log.Fatal(err) } if _, err = cli.UserAdd(context.TODO(), "root", "123"); err != nil { log.Fatal(err) } if _, err = cli.UserGrantRole(context.TODO(), "root", "root"); err != nil { log.Fatal(err) } if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil { log.Fatal(err) } if _, err = cli.RoleGrantPermission( context.TODO(), "r", // role name "foo", // key "zoo", // range end clientv3.PermissionType(clientv3.PermReadWrite), ); err != nil { log.Fatal(err) } if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil { log.Fatal(err) } if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil { log.Fatal(err) } if _, err = cli.AuthEnable(context.TODO()); err != nil { log.Fatal(err) } cliAuth, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, Username: "u", Password: "123", }) if err != nil { log.Fatal(err) } defer cliAuth.Close() if _, err = cliAuth.Put(context.TODO(), "foo1", "bar"); err != nil { log.Fatal(err) } _, err = cliAuth.Txn(context.TODO()). If(clientv3.Compare(clientv3.Value("zoo1"), ">", "abc")). Then(clientv3.OpPut("zoo1", "XYZ")). Else(clientv3.OpPut("zoo1", "ABC")). Commit() fmt.Println(err) // now check the permission with the root account rootCli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, Username: "root", Password: "123", }) if err != nil { log.Fatal(err) } defer rootCli.Close() resp, err := rootCli.RoleGet(context.TODO(), "r") if err != nil { log.Fatal(err) } fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd) if _, err = rootCli.AuthDisable(context.TODO()); err != nil { log.Fatal(err) }
Output: etcdserver: permission denied user u permission: key "foo", range end "zoo"
type AuthDisableResponse ¶
type AuthDisableResponse pb.AuthDisableResponse
type AuthEnableResponse ¶
type AuthEnableResponse pb.AuthEnableResponse
type AuthRoleAddResponse ¶
type AuthRoleAddResponse pb.AuthRoleAddResponse
type AuthRoleDeleteResponse ¶
type AuthRoleDeleteResponse pb.AuthRoleDeleteResponse
type AuthRoleGetResponse ¶
type AuthRoleGetResponse pb.AuthRoleGetResponse
type AuthRoleGrantPermissionResponse ¶
type AuthRoleGrantPermissionResponse pb.AuthRoleGrantPermissionResponse
type AuthRoleListResponse ¶
type AuthRoleListResponse pb.AuthRoleListResponse
type AuthRoleRevokePermissionResponse ¶
type AuthRoleRevokePermissionResponse pb.AuthRoleRevokePermissionResponse
type AuthUserAddResponse ¶
type AuthUserAddResponse pb.AuthUserAddResponse
type AuthUserChangePasswordResponse ¶
type AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse
type AuthUserDeleteResponse ¶
type AuthUserDeleteResponse pb.AuthUserDeleteResponse
type AuthUserGetResponse ¶
type AuthUserGetResponse pb.AuthUserGetResponse
type AuthUserGrantRoleResponse ¶
type AuthUserGrantRoleResponse pb.AuthUserGrantRoleResponse
type AuthUserListResponse ¶
type AuthUserListResponse pb.AuthUserListResponse
type AuthUserRevokeRoleResponse ¶
type AuthUserRevokeRoleResponse pb.AuthUserRevokeRoleResponse
type AuthenticateResponse ¶
type AuthenticateResponse pb.AuthenticateResponse
type Client ¶
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password string
	// contains filtered or unexported fields
}
Client provides and manages an etcd v3 client session.
Example (Metrics) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialOptions: []grpc.DialOption{ grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor), }, }) if err != nil { log.Fatal(err) } defer cli.Close() // get a key so it shows up in the metrics as a range RPC cli.Get(context.TODO(), "test_key") // listen for all Prometheus metrics ln, err := net.Listen("tcp", ":0") if err != nil { log.Fatal(err) } donec := make(chan struct{}) go func() { defer close(donec) http.Serve(ln, promhttp.Handler()) }() defer func() { ln.Close() <-donec }() // make an http request to fetch all Prometheus metrics url := "http://" + ln.Addr().String() + "/metrics" resp, err := http.Get(url) if err != nil { log.Fatalf("fetch error: %v", err) } b, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { log.Fatalf("fetch error: reading %s: %v", url, err) } // confirm range request in metrics for _, l := range strings.Split(string(b), "\n") { if strings.Contains(l, `grpc_client_started_total{grpc_method="Range"`) { fmt.Println(l) break } }
Output: grpc_client_started_total{grpc_method="Range",grpc_service="etcdserverpb.KV",grpc_type="unary"} 1
func NewCtxClient ¶
NewCtxClient creates a client with a context but no underlying grpc connection. This is useful for embedded cases that override the service interface implementations and do not need connection management.
func NewFromURL ¶
NewFromURL creates a new etcdv3 client from a URL.
func NewFromURLs ¶
NewFromURLs creates a new etcdv3 client from URLs.
func (*Client) ActiveConnection ¶
func (c *Client) ActiveConnection() *grpc.ClientConn
ActiveConnection returns the current in-use connection.
func (*Client) Ctx ¶
func (c *Client) Ctx() context.Context
Ctx is a context for "out of band" messages (e.g., for sending a "clean up" message when another context is canceled). It is canceled on client Close().
func (*Client) Dial ¶
func (c *Client) Dial(ep string) (*grpc.ClientConn, error)
Dial connects to a single endpoint using the client's config.
func (*Client) SetEndpoints ¶
func (c *Client) SetEndpoints(eps ...string)
SetEndpoints updates the client's endpoints.
type Cluster ¶
type Cluster interface {
	// MemberList lists the current cluster membership.
	MemberList(ctx context.Context) (*MemberListResponse, error)

	// MemberAdd adds a new member into the cluster.
	MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)

	// MemberRemove removes an existing member from the cluster.
	MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)

	// MemberUpdate updates the peer addresses of the member.
	MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
}
Example (MemberAdd) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints[:2], DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() peerURLs := endpoints[2:] mresp, err := cli.MemberAdd(context.Background(), peerURLs) if err != nil { log.Fatal(err) } fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs) // added member.PeerURLs: [http://localhost:32380]
Output:
Example (MemberList) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } fmt.Println("members:", len(resp.Members))
Output: members: 3
Example (MemberRemove) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints[1:], DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } _, err = cli.MemberRemove(context.Background(), resp.Members[0].ID) if err != nil { log.Fatal(err) }
Output:
Example (MemberUpdate) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.MemberList(context.Background()) if err != nil { log.Fatal(err) } peerURLs := []string{"http://localhost:12380"} _, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs) if err != nil { log.Fatal(err) }
Output:
func NewCluster ¶
func NewClusterFromClusterClient ¶
func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster
type Cmp ¶
func CreateRevision ¶
func LeaseValue ¶
LeaseValue compares a key's LeaseID to a value of your choosing. The empty LeaseID is 0, otherwise known as `NoLease`.
func ModRevision ¶
func (*Cmp) ValueBytes ¶
ValueBytes returns the byte slice holding the comparison value, if any.
func (*Cmp) WithKeyBytes ¶
WithKeyBytes sets the byte slice for the comparison key.
func (Cmp) WithPrefix ¶
WithPrefix sets the comparison to scan all keys prefixed by the key.
func (*Cmp) WithValueBytes ¶
WithValueBytes sets the byte slice for the comparison's value.
type CompactOp ¶
type CompactOp struct {
// contains filtered or unexported fields
}
CompactOp represents a compact operation.
func OpCompact ¶
func OpCompact(rev int64, opts ...CompactOption) CompactOp
OpCompact wraps slice CompactOption to create a CompactOp.
type CompactOption ¶
type CompactOption func(*CompactOp)
CompactOption configures compact operation.
func WithCompactPhysical ¶
func WithCompactPhysical() CompactOption
WithCompactPhysical makes Compact wait until all compacted entries are removed from the etcd server's storage.
type CompactResponse ¶
type CompactResponse pb.CompactionResponse
type CompareResult ¶
type CompareResult int
type CompareTarget ¶
type CompareTarget int
const (
	CompareVersion CompareTarget = iota
	CompareCreated
	CompareModified
	CompareValue
)
type Config ¶
type Config struct {
	// Endpoints is a list of URLs.
	Endpoints []string `json:"endpoints"`

	// AutoSyncInterval is the interval to update endpoints with its latest members.
	// 0 disables auto-sync. By default auto-sync is disabled.
	AutoSyncInterval time.Duration `json:"auto-sync-interval"`

	// DialTimeout is the timeout for failing to establish a connection.
	DialTimeout time.Duration `json:"dial-timeout"`

	// DialKeepAliveTime is the time after which client pings the server to see if
	// transport is alive.
	DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

	// DialKeepAliveTimeout is the time that the client waits for a response for the
	// keep-alive probe. If the response is not received in this time, the connection is closed.
	DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`

	// MaxCallSendMsgSize is the client-side request send limit in bytes.
	// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
	// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
	// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
	MaxCallSendMsgSize int

	// MaxCallRecvMsgSize is the client-side response receive limit.
	// If 0, it defaults to "math.MaxInt32", because range response can
	// easily exceed request send limits.
	// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
	// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
	MaxCallRecvMsgSize int

	// TLS holds the client secure credentials, if any.
	TLS *tls.Config

	// Username is a user name for authentication.
	Username string `json:"username"`

	// Password is a password for authentication.
	Password string `json:"password"`

	// RejectOldCluster when set will refuse to create a client against an outdated cluster.
	RejectOldCluster bool `json:"reject-old-cluster"`

	// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
	DialOptions []grpc.DialOption

	// Context is the default client context; it can be used to cancel grpc dial out and
	// other operations that do not have an explicit context.
	Context context.Context

	// LogConfig configures client-side logger.
	// If nil, use the default logger.
	// TODO: configure gRPC logger
	LogConfig *zap.Config
}
Example (WithTLS) ¶
package main import ( "context" "log" "time" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/pkg/transport" ) var ( dialTimeout = 5 * time.Second endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} ) func main() { tlsInfo := transport.TLSInfo{ CertFile: "/tmp/test-certs/test-name-1.pem", KeyFile: "/tmp/test-certs/test-name-1-key.pem", TrustedCAFile: "/tmp/test-certs/trusted-ca.pem", } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { log.Fatal(err) } cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, TLS: tlsConfig, }) if err != nil { log.Fatal(err) } defer cli.Close() // make sure to close the client _, err = cli.Put(context.TODO(), "foo", "bar") if err != nil { log.Fatal(err) } }
Output:
type DefragmentResponse ¶
type DefragmentResponse pb.DefragmentResponse
type DeleteResponse ¶
type DeleteResponse pb.DeleteRangeResponse
func (*DeleteResponse) OpResponse ¶
func (resp *DeleteResponse) OpResponse() OpResponse
type ErrKeepAliveHalted ¶
type ErrKeepAliveHalted struct {
Reason error
}
ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
func (ErrKeepAliveHalted) Error ¶
func (e ErrKeepAliveHalted) Error() string
type Event ¶
type GetResponse ¶
type GetResponse pb.RangeResponse
func (*GetResponse) OpResponse ¶
func (resp *GetResponse) OpResponse() OpResponse
type HashKVResponse ¶
type HashKVResponse pb.HashKVResponse
type KV ¶
type KV interface {
	// Put puts a key-value pair into etcd.
	// Note that key and value can be plain byte arrays; a string is
	// an immutable representation of such a byte array.
	// To get a string of bytes, do string([]byte{0x10, 0x20}).
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

	// Get retrieves keys.
	// By default, Get will return the value for "key", if any.
	// When passed WithRange(end), Get will return the keys in the range [key, end).
	// When passed WithFromKey(), Get returns keys greater than or equal to key.
	// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
	// if the required revision is compacted, the request will fail with ErrCompacted.
	// When passed WithLimit(limit), the number of returned keys is bounded by limit.
	// When passed WithSort(), the keys will be sorted.
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

	// Delete deletes a key, or optionally using WithRange(end), [key, end).
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

	// Compact compacts etcd KV history before the given rev.
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

	// Do applies a single Op on KV without a transaction.
	// Do is useful when creating arbitrary operations to be issued at a
	// later time; the user can range over the operations, calling Do to
	// execute them. Get/Put/Delete, on the other hand, are best suited
	// for when the operation should be issued at the time of declaration.
	Do(ctx context.Context, op Op) (OpResponse, error)

	// Txn creates a transaction.
	Txn(ctx context.Context) Txn
}
Example (Compact) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := cli.Get(ctx, "foo") cancel() if err != nil { log.Fatal(err) } compRev := resp.Header.Revision // specify compact revision of your choice ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) _, err = cli.Compact(ctx, compRev) cancel() if err != nil { log.Fatal(err) }
Output:
Example (Delete) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() // count keys about to be deleted gresp, err := cli.Get(ctx, "key", clientv3.WithPrefix()) if err != nil { log.Fatal(err) } // delete the keys dresp, err := cli.Delete(ctx, "key", clientv3.WithPrefix()) if err != nil { log.Fatal(err) } fmt.Println("Deleted all keys:", int64(len(gresp.Kvs)) == dresp.Deleted)
Output: Deleted all keys: true
Example (Do) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() ops := []clientv3.Op{ clientv3.OpPut("put-key", "123"), clientv3.OpGet("put-key"), clientv3.OpPut("put-key", "456")} for _, op := range ops { if _, err := cli.Do(context.TODO(), op); err != nil { log.Fatal(err) } }
Output:
Example (Get) ¶
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   endpoints,
	DialTimeout: dialTimeout,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

_, err = cli.Put(context.TODO(), "foo", "bar")
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
	log.Fatal(err)
}
for _, ev := range resp.Kvs {
	fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
Output: foo : bar
Example (GetSortedPrefix) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() for i := range make([]int, 3) { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) _, err = cli.Put(ctx, fmt.Sprintf("key_%d", i), "value") cancel() if err != nil { log.Fatal(err) } } ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := cli.Get(ctx, "key", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %s\n", ev.Key, ev.Value) }
Output: key_2 : value key_1 : value key_0 : value
Example (GetWithRev) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() presp, err := cli.Put(context.TODO(), "foo", "bar1") if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar2") if err != nil { log.Fatal(err) } ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := cli.Get(ctx, "foo", clientv3.WithRev(presp.Header.Revision)) cancel() if err != nil { log.Fatal(err) } for _, ev := range resp.Kvs { fmt.Printf("%s : %s\n", ev.Key, ev.Value) }
Output: foo : bar1
Example (Put) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) _, err = cli.Put(ctx, "sample_key", "sample_value") cancel() if err != nil { log.Fatal(err) }
Output:
Example (PutErrorHandling) ¶
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   endpoints,
	DialTimeout: dialTimeout,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "", "sample_value")
cancel()
if err != nil {
	switch err {
	case context.Canceled:
		fmt.Printf("ctx is canceled by another routine: %v\n", err)
	case context.DeadlineExceeded:
		fmt.Printf("ctx is attached with a deadline and it exceeded: %v\n", err)
	case rpctypes.ErrEmptyKey:
		fmt.Printf("client-side error: %v\n", err)
	default:
		fmt.Printf("bad cluster endpoints, which are not etcd servers: %v\n", err)
	}
}
Output: client-side error: etcdserver: key is not provided
Example (Txn) ¶
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   endpoints,
	DialTimeout: dialTimeout,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

kvc := clientv3.NewKV(cli)

_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
	// txn value comparisons are lexical
	If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
	// the "Then" runs, since "xyz" > "abc"
	Then(clientv3.OpPut("key", "XYZ")).
	// the "Else" does not run
	Else(clientv3.OpPut("key", "ABC")).
	Commit()
cancel()
if err != nil {
	log.Fatal(err)
}

// this Get uses context.TODO(), so there is no second context to cancel
gresp, err := kvc.Get(context.TODO(), "key")
if err != nil {
	log.Fatal(err)
}
for _, ev := range gresp.Kvs {
	fmt.Printf("%s : %s\n", ev.Key, ev.Value)
}
Output: key : XYZ
type Lease ¶
type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive keeps the given lease alive forever. If the keepalive response
	// posted to the channel is not consumed immediately, the lease client will
	// continue sending keep alive requests to the etcd server at least every
	// second until the latest response is consumed.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
	// alive stream is interrupted in some way the client cannot handle itself, or
	// if the given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
	// from this closed channel is nil.
	//
	// If the client keep alive loop halts with an unexpected error (e.g. "etcdserver:
	// no leader") or is canceled by the caller (e.g. context.Canceled), the error
	// is returned. Otherwise, it retries.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see https://go.etcd.io/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	//
	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
Example (Grant) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() // minimum lease TTL is 5-second resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } // after 5 seconds, the key 'foo' will be removed _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) }
Output:
Example (KeepAlive) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // the key 'foo' will be kept forever ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID) if kaerr != nil { log.Fatal(kaerr) } ka := <-ch fmt.Println("ttl:", ka.TTL)
Output: ttl: 5
Example (KeepAliveOnce) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // to renew the lease only once ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID) if kaerr != nil { log.Fatal(kaerr) } fmt.Println("ttl:", ka.TTL)
Output: ttl: 5
Example (Revoke) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) } // revoking lease expires the key attached to its lease ID _, err = cli.Revoke(context.TODO(), resp.ID) if err != nil { log.Fatal(err) } gresp, err := cli.Get(context.TODO(), "foo") if err != nil { log.Fatal(err) } fmt.Println("number of keys:", len(gresp.Kvs))
Output: number of keys: 0
func NewLeaseFromLeaseClient ¶
type LeaseGrantResponse ¶
type LeaseGrantResponse struct { *pb.ResponseHeader ID LeaseID TTL int64 Error string }
LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseID ¶
type LeaseID int64
const ( // NoLease is a lease ID for the absence of a lease. NoLease LeaseID = 0 )
type LeaseKeepAliveResponse ¶
type LeaseKeepAliveResponse struct { *pb.ResponseHeader ID LeaseID TTL int64 }
LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseLeasesResponse ¶
type LeaseLeasesResponse struct { *pb.ResponseHeader Leases []LeaseStatus `json:"leases"` }
LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
type LeaseOp ¶
type LeaseOp struct {
// contains filtered or unexported fields
}
LeaseOp represents an Operation that lease can execute.
type LeaseOption ¶
type LeaseOption func(*LeaseOp)
LeaseOption configures lease operations.
func WithAttachedKeys ¶
func WithAttachedKeys() LeaseOption
WithAttachedKeys makes TimeToLive list the keys attached to the given lease ID.
type LeaseRevokeResponse ¶
type LeaseRevokeResponse pb.LeaseRevokeResponse
type LeaseStatus ¶
type LeaseStatus struct {
ID LeaseID `json:"id"`
}
LeaseStatus represents a lease status.
type LeaseTimeToLiveResponse ¶
type LeaseTimeToLiveResponse struct { *pb.ResponseHeader ID LeaseID `json:"id"` // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1. TTL int64 `json:"ttl"` // GrantedTTL is the initial granted time in seconds upon lease creation/renewal. GrantedTTL int64 `json:"granted-ttl"` // Keys is the list of keys attached to this lease. Keys [][]byte `json:"keys"` }
LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type Maintenance ¶
type Maintenance interface {
	// AlarmList gets all active alarms.
	AlarmList(ctx context.Context) (*AlarmResponse, error)

	// AlarmDisarm disarms a given alarm.
	AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)

	// Defragment releases wasted space from internal fragmentation on a given etcd member.
	// Defragment is only needed after deleting a large number of keys, when the
	// user wants to reclaim the resources.
	// Defragment is an expensive operation. Users should avoid defragmenting multiple members
	// at the same time.
	// To defragment multiple members in the cluster, users need to call Defragment multiple
	// times with different endpoints.
	Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)

	// Status gets the status of the endpoint.
	Status(ctx context.Context, endpoint string) (*StatusResponse, error)

	// HashKV returns a hash of the KV state at the time of the RPC.
	// If revision is zero, the hash is computed on all keys. If the revision
	// is non-zero, the hash is computed on all keys at or below the given revision.
	HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)

	// Snapshot provides a reader for a point-in-time snapshot of etcd.
	// If the context "ctx" is canceled or timed out, reading from the returned
	// "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
	Snapshot(ctx context.Context) (io.ReadCloser, error)

	// MoveLeader requests the current leader to transfer its leadership to the transferee.
	// The request must be made to the leader.
	MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
}
Example (Defragment) ¶
for _, ep := range endpoints { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{ep}, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() if _, err = cli.Defragment(context.TODO(), ep); err != nil { log.Fatal(err) } }
Output:
Example (Status) ¶
for _, ep := range endpoints { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{ep}, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() resp, err := cli.Status(context.Background(), ep) if err != nil { log.Fatal(err) } fmt.Printf("endpoint: %s / Leader: %v\n", ep, resp.Header.MemberId == resp.Leader) } // endpoint: localhost:2379 / Leader: false // endpoint: localhost:22379 / Leader: false // endpoint: localhost:32379 / Leader: true
Output:
func NewMaintenance ¶
func NewMaintenance(c *Client) Maintenance
func NewMaintenanceFromMaintenanceClient ¶
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance
type MemberAddResponse ¶
type MemberAddResponse pb.MemberAddResponse
type MemberListResponse ¶
type MemberListResponse pb.MemberListResponse
type MemberRemoveResponse ¶
type MemberRemoveResponse pb.MemberRemoveResponse
type MemberUpdateResponse ¶
type MemberUpdateResponse pb.MemberUpdateResponse
type MoveLeaderResponse ¶
type MoveLeaderResponse pb.MoveLeaderResponse
type Op ¶
type Op struct {
// contains filtered or unexported fields
}
Op represents an Operation that kv can execute.
func (Op) IsCountOnly ¶
IsCountOnly returns whether countOnly is set.
func (Op) IsSerializable ¶
IsSerializable returns true if the serializable field is true.
func (Op) MaxCreateRev ¶
MaxCreateRev returns the operation's maximum create revision.
func (Op) MinCreateRev ¶
MinCreateRev returns the operation's minimum create revision.
func (Op) RangeBytes ¶
RangeBytes returns the byte slice holding the Op's range end, if any.
func (Op) Txn ¶
Txn returns the comparison ("if") operations, "then" operations, and "else" operations.
func (Op) ValueBytes ¶
ValueBytes returns the byte slice holding the Op's value, if any.
func (*Op) WithKeyBytes ¶
WithKeyBytes sets the byte slice for the Op's key.
func (*Op) WithRangeBytes ¶
WithRangeBytes sets the byte slice for the Op's range end.
func (*Op) WithValueBytes ¶
WithValueBytes sets the byte slice for the Op's value.
type OpOption ¶
type OpOption func(*Op)
OpOption configures Operations like Get, Put, Delete.
func WithCountOnly ¶
func WithCountOnly() OpOption
WithCountOnly makes the 'Get' request return only the count of keys.
func WithCreatedNotify ¶
func WithCreatedNotify() OpOption
WithCreatedNotify makes the watch server send the created event.
func WithFilterDelete ¶
func WithFilterDelete() OpOption
WithFilterDelete discards DELETE events from the watcher.
func WithFilterPut ¶
func WithFilterPut() OpOption
WithFilterPut discards PUT events from the watcher.
func WithFirstCreate ¶
func WithFirstCreate() []OpOption
WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstKey ¶
func WithFirstKey() []OpOption
WithFirstKey gets the lexically first key in the request range.
func WithFirstRev ¶
func WithFirstRev() []OpOption
WithFirstRev gets the key with the oldest modification revision in the request range.
func WithFragment ¶
func WithFragment() OpOption
WithFragment makes the watcher receive raw watch responses with fragmentation. Fragmentation is disabled by default. If fragmentation is enabled, the etcd watch server splits a watch response before sending it to clients when the total size of its watch events exceeds the server-side request limit. The default server-side request limit is 1.5 MiB, which can be configured as the "--max-request-bytes" flag value plus 512 bytes of gRPC overhead. See "etcdserver/api/v3rpc/watch.go" for more details.
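To make the size arithmetic concrete, the sketch below computes the default limit from the figures quoted above (1.5 MiB plus 512 bytes) and greedily groups event sizes into fragments that stay under a limit. This is only an illustration: the function `splitBySize` is hypothetical, and the server's actual splitting logic in "etcdserver/api/v3rpc/watch.go" may differ.

```go
package main

import "fmt"

const (
	defaultMaxRequestBytes = 3 * 1024 * 1024 / 2 // 1.5 MiB
	grpcOverheadBytes      = 512
	fragmentLimit          = defaultMaxRequestBytes + grpcOverheadBytes
)

// splitBySize groups consecutive event sizes into fragments whose total size
// does not exceed limit. An event larger than limit gets a fragment of its own.
func splitBySize(sizes []int, limit int) [][]int {
	var frags [][]int
	var cur []int
	total := 0
	for _, sz := range sizes {
		// Start a new fragment when adding this event would overflow the limit.
		if len(cur) > 0 && total+sz > limit {
			frags = append(frags, cur)
			cur, total = nil, 0
		}
		cur = append(cur, sz)
		total += sz
	}
	if len(cur) > 0 {
		frags = append(frags, cur)
	}
	return frags
}

func main() {
	fmt.Println("default limit in bytes:", fragmentLimit)

	// Three events of 1 MiB each cannot share a single response.
	mib := 1024 * 1024
	frags := splitBySize([]int{mib, mib, mib}, fragmentLimit)
	fmt.Println("fragments:", len(frags))
}
```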
func WithFromKey ¶
func WithFromKey() OpOption
WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests to be all keys greater than or equal to the key in the argument.
func WithIgnoreLease ¶
func WithIgnoreLease() OpOption
WithIgnoreLease updates the key using its current lease. This option can not be combined with WithLease. Returns an error if the key does not exist.
func WithIgnoreValue ¶
func WithIgnoreValue() OpOption
WithIgnoreValue updates the key using its current value. This option can not be combined with non-empty values. Returns an error if the key does not exist.
func WithKeysOnly ¶
func WithKeysOnly() OpOption
WithKeysOnly makes the 'Get' request return only the keys and the corresponding values will be omitted.
func WithLastCreate ¶
func WithLastCreate() []OpOption
WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastKey ¶
func WithLastKey() []OpOption
WithLastKey gets the lexically last key in the request range.
func WithLastRev ¶
func WithLastRev() []OpOption
WithLastRev gets the key with the latest modification revision in the request range.
func WithLimit ¶
WithLimit limits the number of results to return from 'Get' request. If WithLimit is given a 0 limit, it is treated as no limit.
func WithMaxCreateRev ¶
WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxModRev ¶
WithMaxModRev filters out keys for Get with modification revisions greater than the given revision.
func WithMinCreateRev ¶
WithMinCreateRev filters out keys for Get with creation revisions less than the given revision.
func WithMinModRev ¶
WithMinModRev filters out keys for Get with modification revisions less than the given revision.
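The four revision filters above all narrow a result set by a lower or upper bound on a key's creation or modification revision. A small sketch of the modification-revision case follows. The `kv` type and `filterModRev` function are hypothetical, and treating a zero bound as "no bound" is an assumption of the sketch.

```go
package main

import "fmt"

// kv is a stand-in for a key-value entry with its revisions.
type kv struct {
	Key       string
	CreateRev int64
	ModRev    int64
}

// filterModRev keeps entries with minRev <= ModRev <= maxRev.
// A bound of 0 is treated as "not set" (assumption of this sketch).
func filterModRev(kvs []kv, minRev, maxRev int64) []kv {
	var out []kv
	for _, e := range kvs {
		if minRev != 0 && e.ModRev < minRev {
			continue
		}
		if maxRev != 0 && e.ModRev > maxRev {
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	kvs := []kv{
		{Key: "a", CreateRev: 1, ModRev: 2},
		{Key: "b", CreateRev: 3, ModRev: 5},
		{Key: "c", CreateRev: 4, ModRev: 9},
	}
	for _, e := range filterModRev(kvs, 3, 8) {
		fmt.Println(e.Key, e.ModRev)
	}
}
```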
func WithPrefix ¶
func WithPrefix() OpOption
WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate on the keys with matching prefix. For example, 'Get(foo, WithPrefix())' can return 'foo1', 'foo2', and so on.
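One way to reason about prefix matching is as a half-open key range: every key starting with "foo" sorts at or after "foo" and strictly before "fop". The sketch below computes such an upper bound by incrementing the last byte that is not 0xff. The helper names are hypothetical, and returning an empty string for "no upper bound" is a convention of this sketch only.

```go
package main

import "fmt"

// prefixRangeEnd returns the smallest key that sorts after every key
// beginning with prefix. It returns "" when no such bound exists
// (empty prefix, or a prefix made only of 0xff bytes).
func prefixRangeEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	return ""
}

// hasPrefixViaRange reports whether key lies in [prefix, prefixRangeEnd(prefix)).
func hasPrefixViaRange(key, prefix string) bool {
	end := prefixRangeEnd(prefix)
	return key >= prefix && (end == "" || key < end)
}

func main() {
	fmt.Printf("range end for %q is %q\n", "foo", prefixRangeEnd("foo"))
	for _, k := range []string{"fo", "foo", "foo1", "foo2", "fop"} {
		fmt.Println(k, hasPrefixViaRange(k, "foo"))
	}
}
```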
func WithPrevKV ¶
func WithPrevKV() OpOption
WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted, nothing will be returned.
func WithProgressNotify ¶
func WithProgressNotify() OpOption
WithProgressNotify makes the watch server send periodic progress updates every 10 minutes when there are no incoming events. Progress updates have zero events in WatchResponse.
func WithRange ¶
WithRange specifies the range of 'Get', 'Delete', 'Watch' requests. For example, 'Get' requests with 'WithRange(end)' returns the keys in the range [key, end). endKey must be lexicographically greater than start key.
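The half-open interval [key, end) described above includes the start key and excludes the end key. The following sketch applies that rule to a sorted slice of keys with two binary searches. It is a local illustration only, and `keysInRange` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// keysInRange returns every k in sorted with start <= k < end.
// sorted must be in ascending lexical order.
func keysInRange(sorted []string, start, end string) []string {
	lo := sort.SearchStrings(sorted, start) // first index with key >= start
	hi := sort.SearchStrings(sorted, end)   // first index with key >= end
	if hi < lo {
		return nil // end sorts before start: empty range
	}
	return sorted[lo:hi]
}

func main() {
	keys := []string{"foo0", "foo1", "foo2", "foo3", "foo4"}
	fmt.Println(keysInRange(keys, "foo1", "foo4"))
}
```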
func WithRev ¶
WithRev specifies the store revision for a 'Get' request, or the start revision for a 'Watch' request.
func WithSerializable ¶
func WithSerializable() OpOption
WithSerializable makes a 'Get' request serializable. By default, it's linearizable. Serializable requests are a better fit when lower latency is required.
func WithSort ¶
func WithSort(target SortTarget, order SortOrder) OpOption
WithSort specifies the ordering in 'Get' request. It requires 'WithRange' and/or 'WithPrefix' to be specified too. 'target' specifies the target to sort by: key, version, revisions, value. 'order' can be either 'SortNone', 'SortAscend', 'SortDescend'.
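As a rough picture of what a sort target and order mean for a result set, the sketch below sorts local entries by a chosen field in ascending or descending order. The types and the `sortKVs` function are hypothetical stand-ins and do not use the library's SortTarget or SortOrder values.

```go
package main

import (
	"fmt"
	"sort"
)

// sortTarget selects the field to order by (hypothetical, local to this sketch).
type sortTarget int

const (
	byKey sortTarget = iota
	byCreateRev
	byModRev
	byValue
)

type kv struct {
	Key, Value        string
	CreateRev, ModRev int64
}

// sortKVs orders kvs in place by target, ascending unless descending is set.
func sortKVs(kvs []kv, target sortTarget, descending bool) {
	less := func(a, b kv) bool {
		switch target {
		case byCreateRev:
			return a.CreateRev < b.CreateRev
		case byModRev:
			return a.ModRev < b.ModRev
		case byValue:
			return a.Value < b.Value
		default:
			return a.Key < b.Key
		}
	}
	sort.SliceStable(kvs, func(i, j int) bool {
		if descending {
			return less(kvs[j], kvs[i])
		}
		return less(kvs[i], kvs[j])
	})
}

func main() {
	kvs := []kv{
		{Key: "a", Value: "z", CreateRev: 1, ModRev: 3},
		{Key: "b", Value: "y", CreateRev: 2, ModRev: 1},
		{Key: "c", Value: "x", CreateRev: 3, ModRev: 2},
	}
	sortKVs(kvs, byModRev, true)
	for _, e := range kvs {
		fmt.Println(e.Key, e.ModRev)
	}
}
```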
type OpResponse ¶
type OpResponse struct {
// contains filtered or unexported fields
}
func (OpResponse) Del ¶
func (op OpResponse) Del() *DeleteResponse
func (OpResponse) Get ¶
func (op OpResponse) Get() *GetResponse
func (OpResponse) Put ¶
func (op OpResponse) Put() *PutResponse
func (OpResponse) Txn ¶
func (op OpResponse) Txn() *TxnResponse
type Permission ¶
type Permission authpb.Permission
type PermissionType ¶
type PermissionType authpb.Permission_Type
func StrToPermissionType ¶
func StrToPermissionType(s string) (PermissionType, error)
type PutResponse ¶
type PutResponse pb.PutResponse
func (*PutResponse) OpResponse ¶
func (resp *PutResponse) OpResponse() OpResponse
type SortOption ¶
type SortOption struct { Target SortTarget Order SortOrder }
type SortTarget ¶
type SortTarget int
const ( SortByKey SortTarget = iota SortByVersion SortByCreateRevision SortByModRevision SortByValue )
type StatusResponse ¶
type StatusResponse pb.StatusResponse
type Txn ¶
type Txn interface {
	// If takes a list of comparisons. If all comparisons passed in succeed,
	// the operations passed into Then() will be executed. Otherwise, the
	// operations passed into Else() will be executed.
	If(cs ...Cmp) Txn

	// Then takes a list of operations. The Ops list will be executed if the
	// comparisons passed in If() succeed.
	Then(ops ...Op) Txn

	// Else takes a list of operations. The Ops list will be executed if the
	// comparisons passed in If() fail.
	Else(ops ...Op) Txn

	// Commit tries to commit the transaction.
	Commit() (*TxnResponse, error)
}
Txn is the interface that wraps mini-transactions.
Txn(context.TODO()).If(
	Compare(Value(k1), ">", v1),
	Compare(Version(k1), "=", 2),
).Then(
	OpPut(k2, v2), OpPut(k3, v3),
).Else(
	OpPut(k4, v4), OpPut(k5, v5),
).Commit()
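The If/Then/Else flow can be modeled without a server. The sketch below evaluates value comparisons against an in-memory map and applies one of two lists of puts, with no locking or revisions. All names (`store`, `cmp`, `put`, `txn`) are hypothetical, and the model covers only value comparisons. It also shows that the comparison is lexical by bytes, so "XYZ" sorts before "abc".

```go
package main

import "fmt"

// store is a toy key-value map standing in for the cluster state.
type store map[string]string

// cmp compares the stored value of key against want using op.
type cmp struct{ key, op, want string }

// holds evaluates the comparison with byte-wise lexical string ordering.
func (c cmp) holds(s store) bool {
	got := s[c.key]
	switch c.op {
	case "=":
		return got == c.want
	case "!=":
		return got != c.want
	case ">":
		return got > c.want
	case "<":
		return got < c.want
	}
	return false
}

// put is a single write operation.
type put struct{ key, val string }

// txn applies thenOps when every comparison holds, elseOps otherwise,
// and reports which branch ran.
func txn(s store, cmps []cmp, thenOps, elseOps []put) bool {
	succeeded := true
	for _, c := range cmps {
		if !c.holds(s) {
			succeeded = false
			break
		}
	}
	ops := elseOps
	if succeeded {
		ops = thenOps
	}
	for _, p := range ops {
		s[p.key] = p.val
	}
	return succeeded
}

func main() {
	s := store{"key": "xyz"}
	cmps := []cmp{{key: "key", op: ">", want: "abc"}}
	thenOps := []put{{key: "key", val: "XYZ"}}
	elseOps := []put{{key: "key", val: "ABC"}}

	fmt.Println(txn(s, cmps, thenOps, elseOps), s["key"])
	// Uppercase letters sort before lowercase, so the same check now fails.
	fmt.Println(txn(s, cmps, thenOps, elseOps), s["key"])
}
```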
type TxnResponse ¶
type TxnResponse pb.TxnResponse
func (*TxnResponse) OpResponse ¶
func (resp *TxnResponse) OpResponse() OpResponse
type WatchChan ¶
type WatchChan <-chan WatchResponse
type WatchResponse ¶
type WatchResponse struct { Header pb.ResponseHeader Events []*Event // CompactRevision is the minimum revision the watcher may receive. CompactRevision int64 // Canceled is used to indicate watch failure. // If the watch failed and the stream was about to close, before the channel is closed, // the channel sends a final response that has Canceled set to true with a non-nil Err(). Canceled bool // Created is used to indicate the creation of the watcher. Created bool // contains filtered or unexported fields }
func (*WatchResponse) Err ¶
func (wr *WatchResponse) Err() error
Err is the error value if this WatchResponse holds an error.
func (*WatchResponse) IsProgressNotify ¶
func (wr *WatchResponse) IsProgressNotify() bool
IsProgressNotify returns true if the WatchResponse is a progress notification.
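A consumer of a watch channel usually has to tell apart creation notices, cancellations, progress notifications, and ordinary event batches. The sketch below does that on a local stand-in struct, assuming only that a progress notification carries zero events and is neither a creation nor a cancellation notice. The library's own check may include further conditions, and `watchResponse` and `classify` are hypothetical names.

```go
package main

import "fmt"

// watchResponse is a simplified stand-in for a watch response.
type watchResponse struct {
	Events   []string
	Created  bool
	Canceled bool
}

// classify labels a response so a consumer loop can branch on it.
func classify(wr watchResponse) string {
	switch {
	case wr.Canceled:
		return "canceled"
	case wr.Created:
		return "created"
	case len(wr.Events) == 0:
		return "progress"
	default:
		return "events"
	}
}

func main() {
	responses := []watchResponse{
		{Created: true},
		{Events: []string{`PUT "foo" : "bar"`}},
		{},
		{Canceled: true},
	}
	for _, wr := range responses {
		fmt.Println(classify(wr))
	}
}
```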
type Watcher ¶
type Watcher interface { // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. If revisions waiting to be sent over the // watch are compacted, then the watch will be canceled by the server, the // client will post a compacted error watch response, and the channel will close. // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed, // and "WatchResponse" from this closed channel has zero events and nil "Err()". // The context "ctx" MUST be canceled, as soon as watcher is no longer being used, // to release the associated resources. // // If the context is "context.Background/TODO", returned "WatchChan" will // not be closed and block until event is triggered, except when server // returns a non-recoverable error (e.g. ErrCompacted). // For example, when context passed with "WithRequireLeader" and the // connected server has no leader (e.g. due to network partition), // error "etcdserver: no leader" (ErrNoLeader) will be returned, // and then "WatchChan" is closed with non-nil "Err()". // In order to prevent a watch stream being stuck in a partitioned node, // make sure to wrap context with "WithRequireLeader". // // Otherwise, as long as the context has not been canceled or timed out, // watch will retry on other recoverable errors forever until reconnected. // // TODO: explicitly set context error in the last "WatchResponse" message and close channel? // Currently, client contexts are overwritten with "valCtx" that never closes. // TODO(v3.4): configure watch retry policy, limit maximum retry number // (see https://go.etcd.io/etcd/issues/8980) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // RequestProgress requests a progress notify response be sent in all watch channels. RequestProgress(ctx context.Context) error // Close closes the watcher and cancels all watch requests. Close() error }
Example (Watch) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() rch := cli.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } // PUT "foo" : "bar"
Output:
Example (WatchWithPrefix) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } // PUT "foo1" : "bar"
Output:
Example (WatchWithProgressNotify) ¶
cli, err := clientv3.New(clientv3.Config{
	Endpoints:   endpoints,
	DialTimeout: dialTimeout,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()

rch := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
wresp := <-rch
fmt.Printf("wresp.Header.Revision: %d\n", wresp.Header.Revision)
fmt.Println("wresp.IsProgressNotify:", wresp.IsProgressNotify())
// wresp.Header.Revision: 0
// wresp.IsProgressNotify: true
Output:
Example (WatchWithRange) ¶
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() // watches within ['foo1', 'foo4'), in lexicographical order rch := cli.Watch(context.Background(), "foo1", clientv3.WithRange("foo4")) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } // PUT "foo1" : "bar" // PUT "foo2" : "bar" // PUT "foo3" : "bar"
Output:
func NewWatchFromWatchClient ¶
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher
func NewWatcher ¶
Directories ¶
Path | Synopsis |
---|---|
balancer | Package balancer implements client balancer. |
picker | Package picker defines/implements client balancer picker policy. |
resolver/endpoint | Package endpoint resolves etcd endpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'. |
clientv3util | Package clientv3util contains utility functions derived from clientv3. |
concurrency | Package concurrency implements concurrency operations on top of etcd such as distributed locks, barriers, and elections. |
integration | Package integration implements tests built upon embedded etcd, and focuses on correctness of etcd client. |
leasing | Package leasing serves linearizable reads from a local cache by acquiring exclusive write access to keys through a client-side leasing protocol. |
mirror | Package mirror implements etcd mirroring operations. |
namespace | Package namespace is a clientv3 wrapper that translates all keys to begin with a given prefix. |
naming | Package naming provides an etcd-backed gRPC resolver for discovering gRPC services. |
ordering | Package ordering is a clientv3 wrapper that caches response header revisions to detect ordering violations from stale responses. |
snapshot | Package snapshot implements utilities around etcd snapshot. |
yaml | Package yaml handles yaml-formatted clientv3 configuration data. |