Documentation ¶
Overview ¶
Package client provides clients for accessing the various externally-facing Cockroach database endpoints.
KV Client ¶
The KV client is a fully-featured client of Cockroach's key-value database. It provides a simple, synchronous interface well-suited to parallel updates and queries.
The simplest way to use the client is through the Call method. Call synchronously invokes the method, fills in the supplied reply struct, and returns an error if the call failed. The example below shows a get followed by a put.
// tlsConfig is assumed to be a *tls.Config set up by the caller.
sender := client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig})
kv := client.NewKV(sender, nil)

// Read the value stored at key "a".
getResp := &proto.GetResponse{}
if err := kv.Call(proto.Get, proto.GetArgs(proto.Key("a")), getResp); err != nil {
	log.Fatal(err)
}

// Write that value to key "b".
putResp := &proto.PutResponse{}
if err := kv.Call(proto.Put, proto.PutArgs(proto.Key("b"), getResp.Value.Bytes), putResp); err != nil {
	log.Fatal(err)
}
The API is synchronous, but accommodates efficient parallel updates and queries using the Prepare method. An arbitrary number of Prepare invocations are followed up with a call to Flush. Until the Flush, requests are buffered locally in anticipation of being sent to Cockroach as part of a batch. The Flush batches prepared calls and sends them together. Note however that API calls which are buffered and sent together are not guaranteed to have atomic semantics. A transaction must be used to guarantee atomicity. A simple example of using the API which does two scans in parallel and then sends a sequence of puts in parallel:
// tlsConfig is assumed to be a *tls.Config set up by the caller.
sender := client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig})
kv := client.NewKV(sender, nil)

acResp, xzResp := &proto.ScanResponse{}, &proto.ScanResponse{}
kv.Prepare(proto.Scan, proto.ScanArgs(proto.Key("a"), proto.Key("c").Next()), acResp)
kv.Prepare(proto.Scan, proto.ScanArgs(proto.Key("x"), proto.Key("z").Next()), xzResp)

// Flush sends both scans in parallel and returns first error or nil.
if err := kv.Flush(); err != nil {
	log.Fatal(err)
}

// Append maximum value from "a"-"c" to all values from "x"-"z".
maxVal := []byte(nil)
for _, keyVal := range acResp.Rows {
	if bytes.Compare(maxVal, keyVal.Value.Bytes) < 0 {
		maxVal = keyVal.Value.Bytes
	}
}
for _, keyVal := range xzResp.Rows {
	putReq := proto.PutArgs(keyVal.Key, bytes.Join([][]byte{keyVal.Value.Bytes, maxVal}, []byte(nil)))
	kv.Prepare(proto.Put, putReq, &proto.PutResponse{})
}

// Flush all puts for parallel execution.
if err := kv.Flush(); err != nil {
	log.Fatal(err)
}
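The buffering behavior described above can be modeled without the client package at all. The following is a minimal sketch, not the package's implementation: `batcher`, `put`, and the in-memory map are hypothetical stand-ins, and the choice to keep executing after a failed call is an assumption made to illustrate why a flushed batch is not atomic.

```go
package main

import (
	"errors"
	"fmt"
)

// put is a hypothetical stand-in for a buffered API call.
type put struct {
	key, value string
}

// batcher is a hypothetical model of a client that buffers calls locally.
type batcher struct {
	store   map[string]string // stands in for the remote database
	pending []put             // calls prepared but not yet flushed
}

// prepare buffers the call; nothing reaches the store yet.
func (b *batcher) prepare(p put) {
	b.pending = append(b.pending, p)
}

// flush applies every buffered call in the order prepared and returns the
// first error. Calls that succeed stay applied even when another call in the
// same batch fails, so the batch as a whole is not atomic.
func (b *batcher) flush() error {
	var firstErr error
	for _, p := range b.pending {
		if p.key == "" {
			if firstErr == nil {
				firstErr = errors.New("empty key")
			}
			continue
		}
		b.store[p.key] = p.value
	}
	b.pending = nil
	return firstErr
}

func main() {
	b := &batcher{store: map[string]string{}}
	b.prepare(put{"x", "1"})
	b.prepare(put{"", "bad"}) // this call fails at flush time
	b.prepare(put{"y", "2"})
	fmt.Println("entries before flush:", len(b.store))

	err := b.flush()
	fmt.Println("entries after flush:", len(b.store), "first error:", err)
}
```

In this model the two valid puts remain visible even though the batch reported an error, which is the situation a transaction is meant to rule out.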
Transactions are supported through the RunTransaction() method, which takes a retryable function, itself composed of the same simple mix of API calls typical of a non-transactional operation. Within the context of the RunTransaction call, all method invocations are transparently given necessary transactional details, and conflicts are handled with backoff/retry loops and transaction restarts as necessary. An example of using transactions with parallel writes:
// tlsConfig is assumed to be a *tls.Config set up by the caller.
sender := client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig})
kv := client.NewKV(sender, nil)

opts := client.TransactionOptions{Name: "test", Isolation: proto.SERIALIZABLE}
// RunTransaction takes a pointer to the options struct.
err := kv.RunTransaction(&opts, func(txn *client.KV) error {
	for i := 0; i < 100; i++ {
		key := proto.Key(fmt.Sprintf("testkey-%02d", i))
		txn.Prepare(proto.Put, proto.PutArgs(key, []byte("test value")), &proto.PutResponse{})
	}

	// Note that the KV client is flushed automatically on transaction
	// commit. Invoking Flush after individual API methods is only
	// required if the result needs to be received to take conditional
	// action.
	return nil
})
if err != nil {
	log.Fatal(err)
}
Note that with Cockroach's lock-free transactions, clients should expect retries as a matter of course. This is why the transaction functionality is exposed through a retryable function. The retryable function should have no side effects which are not idempotent.
Transactions should endeavor to write using KV.Prepare calls. This allows writes to the same range to be batched together. In cases where the entire transaction affects only a single range, transactions can commit in a single round trip.
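To illustrate why the retryable function must be safe to run more than once, here is a small self-contained sketch of a retry loop. It does not use the client package: `runWithRetry` and `errConflict` are hypothetical names, and the fixed attempt limit is an assumption added so the sketch always terminates.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical marker for a retryable write conflict.
var errConflict = errors.New("write conflict")

// runWithRetry re-invokes fn until it succeeds, returns a non-retryable
// error, or maxAttempts is exhausted. It reports how many times fn ran.
func runWithRetry(maxAttempts int, fn func(attempt int) error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = fn(attempt)
		if err == nil || !errors.Is(err, errConflict) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

func main() {
	balance := map[string]int{"a": 10}

	attempts, err := runWithRetry(5, func(attempt int) error {
		// The new value is recomputed from the current state on every
		// run, so rerunning the function cannot apply the update twice.
		updated := balance["a"] + 5
		if attempt < 3 {
			return errConflict // simulate two conflicts before success
		}
		balance["a"] = updated
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err, "balance:", balance["a"])
}
```

Had the function incremented a counter outside the map before returning the conflict, each retry would have repeated that increment, which is the kind of non-idempotent side effect to avoid.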
Index ¶
- Constants
- Variables
- type Call
- type Clock
- type HTTPSender
- type KV
- func (kv *KV) Call(method string, args proto.Request, reply proto.Response) error
- func (kv *KV) Close()
- func (kv *KV) Flush() (err error)
- func (kv *KV) GetI(key proto.Key, iface interface{}) (bool, proto.Timestamp, error)
- func (kv *KV) GetProto(key proto.Key, msg gogoproto.Message) (bool, proto.Timestamp, error)
- func (kv *KV) Prepare(method string, args proto.Request, reply proto.Response)
- func (kv *KV) PreparePutProto(key proto.Key, msg gogoproto.Message) error
- func (kv *KV) PutI(key proto.Key, iface interface{}) error
- func (kv *KV) PutProto(key proto.Key, msg gogoproto.Message) error
- func (kv *KV) RunTransaction(opts *TransactionOptions, retryable func(txn *KV) error) error
- func (kv *KV) Sender() KVSender
- type KVSender
- type TransactionOptions
Constants ¶
const (
	// KVDBEndpoint is the URL path prefix which accepts incoming
	// HTTP requests for the KV API.
	KVDBEndpoint = "/kv/db/"
	// KVDBScheme is the scheme for connecting to the kvdb endpoint.
	// TODO(spencer): change this to CONSTANT https. We shouldn't be
	// supporting http here at all.
	KVDBScheme = "http"
	// StatusTooManyRequests indicates client should retry due to
	// server having too many requests.
	StatusTooManyRequests = 429
)
Variables ¶
var HTTPRetryOptions = util.RetryOptions{
	Backoff:     50 * time.Millisecond,
	MaxBackoff:  5 * time.Second,
	Constant:    2,
	MaxAttempts: 0,
}
HTTPRetryOptions sets the retry options for handling retryable HTTP errors and connection I/O errors.
var TxnRetryOptions = util.RetryOptions{
	Backoff:     50 * time.Millisecond,
	MaxBackoff:  5 * time.Second,
	Constant:    2,
	MaxAttempts: 0,
}
TxnRetryOptions sets the retry options for handling write conflicts.
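The semantics of util.RetryOptions are not documented on this page. As a rough sketch only, the code below assumes the common exponential-backoff reading of these fields: the wait starts at Backoff, is multiplied by Constant after each retry, and is capped at MaxBackoff. The `retryOptions` type and `backoffFor` function are hypothetical and merely mirror the field names shown above.

```go
package main

import (
	"fmt"
	"time"
)

// retryOptions mirrors the field names shown above. The semantics
// implemented here are an assumption, not the util package's behavior.
type retryOptions struct {
	Backoff    time.Duration // initial wait
	MaxBackoff time.Duration // upper bound on any single wait
	Constant   int           // assumed multiplier applied after each retry
}

// exampleOpts uses the same values as HTTPRetryOptions and TxnRetryOptions.
var exampleOpts = retryOptions{
	Backoff:    50 * time.Millisecond,
	MaxBackoff: 5 * time.Second,
	Constant:   2,
}

// backoffFor returns the assumed wait before retry number n (0-based).
func backoffFor(opts retryOptions, n int) time.Duration {
	d := opts.Backoff
	// Stop multiplying once the cap is reached to avoid overflow.
	for i := 0; i < n && d < opts.MaxBackoff; i++ {
		d *= time.Duration(opts.Constant)
	}
	if d > opts.MaxBackoff {
		d = opts.MaxBackoff
	}
	return d
}

func main() {
	for n := 0; n < 9; n++ {
		fmt.Printf("retry %d: wait %v\n", n, backoffFor(exampleOpts, n))
	}
}
```

Under these assumptions the wait doubles from 50ms until it reaches the 5s cap on the eighth retry and stays there.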
Functions ¶
This section is empty.
Types ¶
type Call ¶
type Call struct {
	Method string         // The name of the database command (see api.proto)
	Args   proto.Request  // The argument to the command
	Reply  proto.Response // The reply from the command
}
A Call is a pending database API call.
type Clock ¶
type Clock interface {
	// Now returns nanoseconds since Jan 1, 1970 GMT.
	Now() int64
}
A Clock is an interface which provides the current time.
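As an illustration, two small implementations of this interface might look as follows. The interface is redeclared locally so the sketch compiles on its own; `wallClock` and `manualClock` are hypothetical names and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is redeclared here so the sketch is self-contained.
type Clock interface {
	// Now returns nanoseconds since Jan 1, 1970 GMT.
	Now() int64
}

// wallClock reads the system clock.
type wallClock struct{}

func (wallClock) Now() int64 { return time.Now().UnixNano() }

// manualClock is a hypothetical clock for tests that only moves when
// Advance is called, which keeps time-dependent behavior deterministic.
type manualClock struct {
	nanos int64
}

func (m *manualClock) Now() int64 { return m.nanos }

// Advance moves the manual clock forward by d.
func (m *manualClock) Advance(d time.Duration) { m.nanos += d.Nanoseconds() }

func main() {
	var c Clock = wallClock{}
	fmt.Println("wall clock is positive:", c.Now() > 0)

	m := &manualClock{}
	c = m
	m.Advance(2 * time.Second)
	fmt.Println("manual clock nanos:", c.Now())
}
```

A manual clock of this kind could be passed wherever a Clock is accepted to make generated IDs reproducible in tests.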
type HTTPSender ¶
type HTTPSender struct {
// contains filtered or unexported fields
}
HTTPSender is an implementation of KVSender which exposes the Key-Value database provided by a Cockroach cluster by connecting via HTTP to a Cockroach node. Overly-busy nodes will redirect this client to other nodes.
func NewHTTPSender ¶
func NewHTTPSender(server string, transport *http.Transport) *HTTPSender
NewHTTPSender returns a new instance of HTTPSender.
func (*HTTPSender) Send ¶
func (s *HTTPSender) Send(call *Call)
Send sends call to Cockroach via an HTTP POST. Retryable HTTP response codes are retried with backoff in a loop using the default retry options. Other errors encountered while sending the HTTP request are retried indefinitely using the same client command ID, to avoid reporting failure when the command may in fact have gone through and executed successfully. Retrying with the same client command ID means the request eventually gets through and is given the cached response.
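The retry pattern described above can be sketched with the standard library alone. This is not HTTPSender's code: the `X-Command-ID` header name, `postWithRetry`, and `newBusyClient` are hypothetical, the transport is a fake that never touches the network, and the attempt limit is an assumption added so the sketch terminates (the text above describes indefinite retries).

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// roundTripFunc adapts a plain function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// newBusyClient returns a client whose fake transport answers 429 for the
// first busy requests and 200 afterwards, recording each command ID seen.
func newBusyClient(busy int, seen *[]string) *http.Client {
	return &http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
		*seen = append(*seen, r.Header.Get("X-Command-ID"))
		code := http.StatusOK
		if len(*seen) <= busy {
			code = http.StatusTooManyRequests
		}
		return &http.Response{StatusCode: code, Header: http.Header{}, Body: http.NoBody, Request: r}, nil
	})}
}

// postWithRetry posts to url, retrying with a doubling backoff while the
// server answers 429. Every attempt carries the same command ID so that a
// server could recognize a repeated command. It returns the attempt count.
func postWithRetry(c *http.Client, url, cmdID string, maxAttempts int) (int, error) {
	backoff := time.Millisecond
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		req, err := http.NewRequest(http.MethodPost, url, nil)
		if err != nil {
			return attempt, err
		}
		req.Header.Set("X-Command-ID", cmdID)
		resp, err := c.Do(req)
		if err != nil {
			return attempt, err
		}
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return attempt, nil
		}
		if resp.StatusCode != http.StatusTooManyRequests {
			return attempt, fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return maxAttempts, fmt.Errorf("still busy after %d attempts", maxAttempts)
}

func main() {
	var seen []string
	client := newBusyClient(2, &seen)
	attempts, err := postWithRetry(client, "http://example.invalid/kv/db/Put", "cmd-1", 5)
	fmt.Println("attempts:", attempts, "err:", err, "ids seen:", seen)
}
```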
type KV ¶
type KV struct {
	// User is the default user to set on API calls. If User is set to
	// non-empty in call arguments, this value is ignored.
	User string
	// UserPriority is the default user priority to set on API calls. If
	// UserPriority is set non-zero in call arguments, this value is
	// ignored.
	UserPriority int32
	// contains filtered or unexported fields
}
KV provides serial access to a KV store via Call and parallel access via Prepare and Flush. A KV instance is not thread safe.
func NewKV ¶
NewKV creates a new instance of KV using the specified sender. To create a transactional client, the KV struct should be manually initialized in order to utilize a txnSender. Clock is used to formulate client command IDs, which provide idempotency on API calls. If clock is nil, the current time in Unix nanoseconds (as returned by time.Now().UnixNano()) is used by default.
func (*KV) Call ¶
Call invokes the KV command synchronously, fills in the supplied reply, and returns an error if applicable. If preceding calls have been made to Prepare() without a call to Flush(), this call is prepared and then all prepared calls are flushed.
Example ¶
This is an example of using the Call() method to Put and then Get a value for a given key.
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	key := proto.Key("a")
	value := []byte{1, 2, 3, 4}

	// Store test value.
	putResp := &proto.PutResponse{}
	if err := kvClient.Call(proto.Put, proto.PutArgs(key, value), putResp); err != nil {
		log.Fatal(err)
	}

	// Retrieve test value using same key.
	getResp := &proto.GetResponse{}
	if err := kvClient.Call(proto.Get, proto.GetArgs(key), getResp); err != nil {
		log.Fatal(err)
	}

	// Data validation.
	if getResp.Value == nil {
		log.Fatal("No value returned.")
	}
	if !bytes.Equal(value, getResp.Value.Bytes) {
		log.Fatal("Data mismatch on retrieved value.")
	}

	fmt.Println("Client example done.")
}
Output: Client example done.
func (*KV) Flush ¶
Flush sends all previously prepared calls, buffered by invocations of Prepare(). The calls are organized into a single batch command and sent together. Flush returns nil if all prepared calls are executed successfully. Otherwise, Flush returns the first error, where calls are executed in the order in which they were prepared. After Flush returns, all prepared reply structs will be valid.
func (*KV) GetI ¶
GetI fetches the value at the specified key and gob-deserializes it into iface. The first return value is true if a value was found for the requested key and false otherwise. The second return value is the timestamp of the write. An error is returned if fetching from the underlying storage or deserializing the value fails.
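For readers unfamiliar with gob, the following self-contained sketch shows the encode/decode round trip and the found/not-found contract using the standard encoding/gob package. It is not the package's implementation: `account`, `encodeGob`, and `getI` are hypothetical, an in-memory map stands in for the database, and the timestamp return value is omitted.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// account is a hypothetical value type stored under a key.
type account struct {
	Owner   string
	Balance int64
}

// encodeGob serializes v with gob.
func encodeGob(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// getI models the (ok, error) contract: ok is false when the key is absent,
// and an error is returned when the stored bytes cannot be decoded.
// v must be a pointer to the destination value.
func getI(store map[string][]byte, key string, v interface{}) (bool, error) {
	data, found := store[key]
	if !found {
		return false, nil
	}
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(v); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	store := map[string][]byte{}
	data, err := encodeGob(account{Owner: "alice", Balance: 42})
	if err != nil {
		panic(err)
	}
	store["acct/alice"] = data

	var got account
	ok, err := getI(store, "acct/alice", &got)
	fmt.Println("found:", ok, "err:", err, "owner:", got.Owner, "balance:", got.Balance)

	ok, err = getI(store, "acct/bob", &got)
	fmt.Println("found:", ok, "err:", err)
}
```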
func (*KV) GetProto ¶
GetProto fetches the value at the specified key and unmarshals it using a protobuf decoder. See comments for GetI for details on return values.
func (*KV) Prepare ¶
Prepare accepts a KV API call, specified by method name, arguments and a reply struct. The call will be buffered locally until the first call to Flush(), at which time it will be sent for execution as part of a batch call. Using Prepare/Flush parallelizes queries and updates and should be used where possible for efficiency.
For clients using an HTTP sender, Prepare/Flush allows multiple commands to be sent over the same connection. For transactional clients, Prepare/Flush can dramatically improve efficiency by compressing multiple writes into a single atomic update in the event that the writes are to keys within a single range. However, using Prepare/Flush alone will not guarantee atomicity. Clients must use a transaction for that purpose.
The supplied reply struct will not be valid until after a call to Flush().
Example ¶
This is an example of using the Prepare() method to submit multiple Key Value API operations to be run in parallel. Flush() is then used to begin execution of all the prepared operations.
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	// Insert test data.
	batchSize := 12
	keys := make([]string, batchSize)
	values := make([][]byte, batchSize)
	for i := 0; i < batchSize; i++ {
		keys[i] = fmt.Sprintf("key-%03d", i)
		values[i] = []byte(fmt.Sprintf("value-%03d", i))

		putReq := proto.PutArgs(proto.Key(keys[i]), values[i])
		putResp := &proto.PutResponse{}
		kvClient.Prepare(proto.Put, putReq, putResp)
	}

	// Flush all puts for parallel execution.
	if err := kvClient.Flush(); err != nil {
		log.Fatal(err)
	}

	// Scan for the newly inserted rows in parallel.
	numScans := 3
	rowsPerScan := batchSize / numScans
	scanResponses := make([]proto.ScanResponse, numScans)
	for i := 0; i < numScans; i++ {
		firstKey := proto.Key(keys[i*rowsPerScan])
		lastKey := proto.Key(keys[((i+1)*rowsPerScan)-1])
		kvClient.Prepare(proto.Scan, proto.ScanArgs(firstKey, lastKey.Next(), int64(rowsPerScan)), &scanResponses[i])
	}

	// Flush all scans for parallel execution.
	if err := kvClient.Flush(); err != nil {
		log.Fatal(err)
	}

	// Check results which may be returned out-of-order from creation.
	var matchCount int
	for i := 0; i < numScans; i++ {
		for _, keyVal := range scanResponses[i].Rows {
			currKey := keyVal.Key
			currValue := keyVal.Value.Bytes
			for j, origKey := range keys {
				if bytes.Equal(currKey, proto.Key(origKey)) && bytes.Equal(currValue, values[j]) {
					matchCount++
				}
			}
		}
	}
	if matchCount != batchSize {
		log.Fatal("Data mismatch.")
	}

	fmt.Println("Prepare Flush example done.")
}
Output: Prepare Flush example done.
func (*KV) PreparePutProto ¶
PreparePutProto sets the given key to the protobuf-serialized byte string of msg. The resulting Put call is buffered and will not be sent until a subsequent call to Flush. Returns marshalling errors if encountered.
func (*KV) RunTransaction ¶
func (kv *KV) RunTransaction(opts *TransactionOptions, retryable func(txn *KV) error) error
RunTransaction executes retryable in the context of a distributed transaction. The transaction is automatically aborted if retryable returns any error aside from recoverable internal errors, and is automatically committed otherwise. retryable should have no side effects which could cause problems in the event it must be run more than once. The opts struct contains transaction settings.
Calling RunTransaction on the transactional KV client which is supplied to the retryable function is an error.
Example ¶
This is an example of using the RunTransaction() method to submit multiple Key Value API operations inside a transaction.
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	// Create test data.
	numKVPairs := 10
	keys := make([]string, numKVPairs)
	values := make([][]byte, numKVPairs)
	for i := 0; i < numKVPairs; i++ {
		keys[i] = fmt.Sprintf("testkey-%03d", i)
		values[i] = []byte(fmt.Sprintf("testvalue-%03d", i))
	}

	// Insert all KV pairs inside a transaction.
	putOpts := client.TransactionOptions{Name: "example put"}
	err := kvClient.RunTransaction(&putOpts, func(txn *client.KV) error {
		for i := 0; i < numKVPairs; i++ {
			txn.Prepare(proto.Put, proto.PutArgs(proto.Key(keys[i]), values[i]), &proto.PutResponse{})
		}
		// Note that the KV client is flushed automatically on transaction
		// commit. Invoking Flush after individual API methods is only
		// required if the result needs to be received to take conditional
		// action.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Read back KV pairs inside a transaction.
	getResponses := make([]proto.GetResponse, numKVPairs)
	getOpts := client.TransactionOptions{Name: "example get"}
	err = kvClient.RunTransaction(&getOpts, func(txn *client.KV) error {
		for i := 0; i < numKVPairs; i++ {
			txn.Prepare(proto.Get, proto.GetArgs(proto.Key(keys[i])), &getResponses[i])
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Check results.
	for i, getResp := range getResponses {
		if getResp.Value == nil {
			log.Fatal("No value returned for ", keys[i])
		} else {
			if !bytes.Equal(values[i], getResp.Value.Bytes) {
				log.Fatal("Data mismatch for ", keys[i], ", got: ", getResp.Value.Bytes)
			}
		}
	}

	fmt.Println("Transaction example done.")
}
Output: Transaction example done.
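The commit-or-abort rule for RunTransaction (commit when the function returns nil, abort on error) can be pictured with a toy model. The sketch below is an assumption-laden illustration, not how Cockroach implements transactions: `txn`, `runTxn`, and `errAbort` are hypothetical, writes are staged in a local map, and retries are left out entirely.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbort is a hypothetical application error that aborts a transaction.
var errAbort = errors.New("abort requested")

// txn is a hypothetical transaction handle that stages writes locally.
type txn struct {
	staged map[string]string
}

// put stages a write; the store is untouched until commit.
func (t *txn) put(key, value string) { t.staged[key] = value }

// runTxn runs fn against a fresh txn. Staged writes are applied to store
// only if fn returns nil; on error they are discarded and the error returned.
func runTxn(store map[string]string, fn func(t *txn) error) error {
	t := &txn{staged: map[string]string{}}
	if err := fn(t); err != nil {
		return err // abort: nothing staged reaches the store
	}
	for k, v := range t.staged { // commit: apply all staged writes
		store[k] = v
	}
	return nil
}

func main() {
	store := map[string]string{}

	err := runTxn(store, func(t *txn) error {
		t.put("a", "1")
		t.put("b", "2")
		return nil
	})
	fmt.Println("committed:", len(store), "err:", err)

	err = runTxn(store, func(t *txn) error {
		t.put("c", "3")
		return errAbort
	})
	fmt.Println("after abort:", len(store), "err:", err)
}
```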
type KVSender ¶
type KVSender interface {
	// Send invokes the Call.Method with Call.Args and sets the result
	// in Call.Reply.
	Send(*Call)
	// Close frees up resources in use by the sender.
	Close()
}
KVSender is an interface for sending a request to a Key-Value database backend.
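Because KVSender is a two-method interface, alternative backends are easy to imagine. The sketch below shows an in-memory sender of the kind that might be useful in tests. To stay self-contained it redeclares `Call` and `KVSender` with simplified types; `putArgs`, `getArgs`, `reply`, and `memSender` are hypothetical and do not correspond to the real proto.Request and proto.Response types.

```go
package main

import "fmt"

// Call and KVSender are simplified local redeclarations for this sketch.
type Call struct {
	Method string
	Args   interface{}
	Reply  interface{}
}

type KVSender interface {
	Send(*Call)
	Close()
}

// Hypothetical argument and reply types standing in for the proto package.
type putArgs struct {
	Key   string
	Value []byte
}

type getArgs struct {
	Key string
}

type reply struct {
	Value []byte
	Err   error
}

// memSender is a hypothetical KVSender backed by a map.
type memSender struct {
	data   map[string][]byte
	closed bool
}

// Send executes the call against the map and records the result in the reply.
func (m *memSender) Send(c *Call) {
	r, ok := c.Reply.(*reply)
	if !ok {
		return // nowhere to report a result
	}
	switch args := c.Args.(type) {
	case putArgs:
		m.data[args.Key] = args.Value
	case getArgs:
		r.Value = m.data[args.Key]
	default:
		r.Err = fmt.Errorf("unsupported method %q", c.Method)
	}
}

// Close marks the sender as closed; there are no real resources to free.
func (m *memSender) Close() { m.closed = true }

func main() {
	var s KVSender = &memSender{data: map[string][]byte{}}
	defer s.Close()

	s.Send(&Call{Method: "Put", Args: putArgs{Key: "a", Value: []byte("hello")}, Reply: &reply{}})

	r := &reply{}
	s.Send(&Call{Method: "Get", Args: getArgs{Key: "a"}, Reply: r})
	fmt.Printf("value: %s, err: %v\n", r.Value, r.Err)
}
```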
type TransactionOptions ¶
type TransactionOptions struct {
	Name      string // Concise desc of txn for debugging
	Isolation proto.IsolationType
}
TransactionOptions are parameters for use with KV.RunTransaction.