client

package
v0.0.0-...-02ea31e Latest Latest
Published: May 30, 2015 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package client provides clients for accessing the various externally-facing Cockroach database endpoints.

KV Client

The KV client is a fully-featured client of Cockroach's key-value database. It provides a simple, synchronous interface well-suited to parallel updates and queries.

The simplest way to use the client is through the Run method. Run synchronously invokes the call, fills in the reply, and returns an error. The example below shows a get and a put.

kv := client.NewKV(nil, client.NewHTTPSender("localhost:8080", httpClient))

getCall := client.Get(proto.Key("a"))
getResp := getCall.Reply.(*proto.GetResponse)
if err := kv.Run(getCall); err != nil {
  log.Fatal(err)
}
if err := kv.Run(client.Put(proto.Key("b"), getResp.Value.Bytes)); err != nil {
  log.Fatal(err)
}

The API is synchronous, but accommodates efficient parallel updates and queries using the variadic Run method. An arbitrary number of calls may be passed to Run; they are sent to Cockroach as part of a single batch. Note, however, that the individual API calls within a batch are not guaranteed to have atomic semantics. A transaction must be used to guarantee atomicity. The following example does two scans in parallel and then sends a sequence of puts in parallel:

kv := client.NewKV(nil, client.NewHTTPSender("localhost:8080", httpClient))

acScan := client.Scan(proto.Key("a"), proto.Key("c\x00"), 1000)
xzScan := client.Scan(proto.Key("x"), proto.Key("z\x00"), 1000)

// Run sends both scans in parallel and returns first error or nil.
if err := kv.Run(acScan, xzScan); err != nil {
  log.Fatal(err)
}

acResp := acScan.Reply.(*proto.ScanResponse)
xzResp := xzScan.Reply.(*proto.ScanResponse)

// Append maximum value from "a"-"c" to all values from "x"-"z".
max := []byte(nil)
for _, keyVal := range acResp.Rows {
  if bytes.Compare(max, keyVal.Value.Bytes) < 0 {
    max = keyVal.Value.Bytes
  }
}
var calls []client.Call
for _, keyVal := range xzResp.Rows {
  putCall := client.Put(keyVal.Key, bytes.Join([][]byte{keyVal.Value.Bytes, max}, []byte(nil)))
  calls = append(calls, putCall)
}

// Run all puts for parallel execution.
if err := kv.Run(calls...); err != nil {
  log.Fatal(err)
}

Transactions are supported through the RunTransaction() method, which takes a retryable function, itself composed of the same simple mix of API calls typical of a non-transactional operation. Within the context of the RunTransaction call, all method invocations are transparently given necessary transactional details, and conflicts are handled with backoff/retry loops and transaction restarts as necessary. An example of using transactions with parallel writes:

kv := client.NewKV(nil, client.NewHTTPSender("localhost:8080", httpClient))

opts := &client.TransactionOptions{Name: "test", Isolation: proto.SERIALIZABLE}
err := kv.RunTransaction(opts, func(txn *client.Txn) error {
  for i := 0; i < 100; i++ {
    key := proto.Key(fmt.Sprintf("testkey-%02d", i))
    txn.Prepare(client.Put(key, []byte("test value")))
  }

  // Note that the Txn client is flushed automatically on transaction
  // commit. Invoking Flush after individual API methods is only
  // required if the result needs to be received to take conditional
  // action.
  return nil
})
if err != nil {
  log.Fatal(err)
}

Note that with Cockroach's lock-free transactions, clients should expect retries as a matter of course. This is why the transaction functionality is exposed through a retryable function. The retryable function should have no side effects which are not idempotent.

Transactions should endeavor to write using KV.Prepare calls. This allows writes to the same range to be batched together. In cases where the entire transaction affects only a single range, transactions can commit in a single round trip.

Constants

const (
	// KVDBEndpoint is the URL path prefix which accepts incoming
	// HTTP requests for the KV API.
	KVDBEndpoint = "/kv/db/"
	// StatusTooManyRequests indicates client should retry due to
	// server having too many requests.
	StatusTooManyRequests = 429
)

Variables

var (
	// DefaultTxnRetryOptions are the standard retry options used
	// for transactions.
	// This is exported for testing purposes only.
	DefaultTxnRetryOptions = retry.Options{
		Backoff:     50 * time.Millisecond,
		MaxBackoff:  5 * time.Second,
		Constant:    2,
		MaxAttempts: 0,
		UseV1Info:   true,
	}
	DefaultClock = systemClock{}
)

Context defaults.

var HTTPRetryOptions = retry.Options{
	Backoff:     50 * time.Millisecond,
	MaxBackoff:  5 * time.Second,
	Constant:    2,
	MaxAttempts: 0,
	UseV1Info:   true,
}

HTTPRetryOptions sets the retry options for handling retryable HTTP errors and connection I/O errors.

Functions

func RegisterSender

func RegisterSender(scheme string, f NewSenderFunc)

RegisterSender registers the specified function to be used for creation of a new sender when the specified scheme is encountered.

Types

type Batch

type Batch struct {
	// Results contains an entry for each operation added to the batch. The order
	// of the results matches the order the operations were added to the
	// batch. For example:
	//
	//   b := client.B.Put("a", "1").Put("b", "2")
	//   _ = db.Run(b)
	//   // string(b.Results[0].Rows[0].Key) == "a"
	//   // string(b.Results[1].Rows[0].Key) == "b"
	Results []Result
	// contains filtered or unexported fields
}

Batch provides for the parallel execution of a number of database operations. Operations are added to the Batch and then the Batch is executed via either DB.Run, Tx.Run or Tx.Commit.

TODO(pmattis): Allow a timestamp to be specified which is applied to all operations within the batch.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	b := db.B.Get("aa").Put("bb", "2")
	if err := db.Run(b); err != nil {
		panic(err)
	}
	for _, result := range b.Results {
		for _, row := range result.Rows {
			fmt.Printf("%s=%s\n", row.Key, row.ValueBytes())
		}
	}

}
Output:

aa=
bb=2

func (*Batch) CPut

func (b *Batch) CPut(key, value, expValue interface{}) *Batch

CPut conditionally sets the value for a key if the existing value is equal to expValue. To conditionally set a value only if there is no existing entry pass nil for expValue.

A new result will be appended to the batch which will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.
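
The conditional-put rule can be modeled with an in-memory map. This is a sketch of the semantics only, assuming byte-wise equality for the comparison; store, cput and errConditionFailed are hypothetical names, and the map is not safe for concurrent use.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

type store map[string][]byte

var errConditionFailed = errors.New("condition failed")

// cput writes value only if the current entry equals expValue.
// A nil expValue means "the key must not exist yet".
func (s store) cput(key string, value, expValue []byte) error {
	cur, exists := s[key]
	if expValue == nil {
		if exists {
			return errConditionFailed
		}
	} else if !exists || !bytes.Equal(cur, expValue) {
		return errConditionFailed
	}
	s[key] = value
	return nil
}

func main() {
	s := store{"aa": []byte("1")}
	fmt.Println(s.cput("aa", []byte("2"), []byte("1"))) // matches: written
	fmt.Println(s.cput("aa", []byte("3"), []byte("1"))) // stale expectation
	fmt.Println(s.cput("bb", []byte("4"), []byte("1"))) // key is missing
	fmt.Println(s.cput("bb", []byte("4"), nil))         // create if absent
	fmt.Printf("aa=%s bb=%s\n", s["aa"], s["bb"])
}
```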

func (*Batch) Del

func (b *Batch) Del(keys ...interface{}) *Batch

Del deletes one or more keys.

A new result will be appended to the batch and each key will have a corresponding row in the returned Result.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
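
One plausible way to accept all four key types is a type switch that normalizes them to a byte slice. The sketch below is an assumption about the approach, not the package's code: toKey, userID and rawKey are hypothetical, and the order in which the cases are checked (BinaryMarshaler before Stringer) is a choice made for this example.

```go
package main

import (
	"encoding"
	"fmt"
)

// toKey converts any supported key type to raw bytes.
func toKey(k interface{}) ([]byte, error) {
	switch t := k.(type) {
	case []byte:
		return t, nil
	case string:
		return []byte(t), nil
	case encoding.BinaryMarshaler:
		return t.MarshalBinary()
	case fmt.Stringer:
		return []byte(t.String()), nil
	}
	return nil, fmt.Errorf("unsupported key type %T", k)
}

// userID is a sample fmt.Stringer.
type userID int

func (u userID) String() string { return fmt.Sprintf("user/%d", int(u)) }

// rawKey is a sample encoding.BinaryMarshaler.
type rawKey struct{ prefix, id byte }

func (r rawKey) MarshalBinary() ([]byte, error) { return []byte{r.prefix, r.id}, nil }

func main() {
	for _, k := range []interface{}{[]byte("a"), "b", userID(7), rawKey{'k', '1'}, 3.14} {
		b, err := toKey(k)
		fmt.Printf("%T -> %q, err=%v\n", k, b, err)
	}
}
```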

func (*Batch) DelRange

func (b *Batch) DelRange(s, e interface{}) *Batch

DelRange deletes the rows between begin (inclusive) and end (exclusive).

A new result will be appended to the batch which will contain 0 rows and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*Batch) Get

func (b *Batch) Get(keys ...interface{}) *Batch

Get retrieves one or more keys. A new result will be appended to the batch and each requested key will have a corresponding row in the Result.

r, err := db.Get("a", "b", "c")
// string(r.Rows[0].Key) == "a"
// string(r.Rows[1].Key) == "b"
// string(r.Rows[2].Key) == "c"

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*Batch) Inc

func (b *Batch) Inc(key interface{}, value int64) *Batch

Inc increments the integer value at key. If the key does not exist it will be created with an initial value of 0 which will then be incremented. If the key exists but was set using Put or CPut an error will be returned.

A new result will be appended to the batch which will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
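
The three cases described for Inc (missing key, existing counter, value written by Put) can be modeled in memory as follows. This is a sketch of the documented behavior only; entry, store, put and inc are hypothetical, and how the real server distinguishes integer values from raw bytes is not covered here.

```go
package main

import "fmt"

// entry is a hypothetical stored value: an integer counter or raw bytes.
type entry struct {
	isInt bool
	n     int64
	raw   []byte
}

type store map[string]entry

// put stores raw bytes, as Put or CPut would.
func (s store) put(key string, raw []byte) { s[key] = entry{raw: raw} }

// inc adds delta to the counter at key. A missing key starts at 0; a key
// holding raw bytes is rejected.
func (s store) inc(key string, delta int64) (int64, error) {
	e, ok := s[key]
	if ok && !e.isInt {
		return 0, fmt.Errorf("key %q does not hold an integer", key)
	}
	e.isInt = true
	e.n += delta
	s[key] = e
	return e.n, nil
}

func main() {
	s := store{}
	n, err := s.inc("aa", 100) // key created at 0, then incremented
	fmt.Println(n, err)
	n, err = s.inc("aa", 5)
	fmt.Println(n, err)
	s.put("bb", []byte("1"))
	_, err = s.inc("bb", 1) // value was set with put
	fmt.Println(err)
}
```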

func (*Batch) InternalAddCall

func (b *Batch) InternalAddCall(call Call)

InternalAddCall adds the specified call to the batch. It is intended for internal use only. It is an error to use InternalAddCall to execute operations that are available via the Batch methods (e.g. Get).

func (*Batch) Put

func (b *Batch) Put(key, value interface{}) *Batch

Put sets the value for a key.

A new result will be appended to the batch which will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.

func (*Batch) Scan

func (b *Batch) Scan(s, e interface{}, maxRows int64) *Batch

Scan retrieves the rows between begin (inclusive) and end (exclusive).

A new result will be appended to the batch which will contain up to maxRows rows and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
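
The range bounds and the maxRows cap can be illustrated with an in-memory scan. This sketch assumes plain lexicographic ordering of string keys; row and scan are hypothetical names and do not reflect how the server stores or iterates data.

```go
package main

import (
	"fmt"
	"sort"
)

type row struct{ key, value string }

// scan returns up to maxRows rows with begin <= key < end, in key order.
func scan(data map[string]string, begin, end string, maxRows int) []row {
	keys := make([]string, 0, len(data))
	for k := range data {
		if k >= begin && k < end {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	if maxRows >= 0 && len(keys) > maxRows {
		keys = keys[:maxRows]
	}
	rows := make([]row, 0, len(keys))
	for _, k := range keys {
		rows = append(rows, row{k, data[k]})
	}
	return rows
}

func main() {
	data := map[string]string{"aa": "1", "ab": "2", "b": "9", "bb": "3"}
	// "b" itself is excluded because the end key is exclusive.
	for i, r := range scan(data, "a", "b", 100) {
		fmt.Printf("%d: %s=%s\n", i, r.key, r.value)
	}
}
```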

type Call

type Call struct {
	Args  proto.Request  // The argument to the command
	Reply proto.Response // The reply from the command
	Err   error          // Error during call creation
	Post  func() error   // Function to be called after successful completion
}

A Call is a pending database API call.

func ConditionalPut

func ConditionalPut(key proto.Key, valueBytes, expValueBytes []byte) Call

ConditionalPut returns a Call object initialized to put value as a byte slice at key if the existing value at key equals expValueBytes.

func Delete

func Delete(key proto.Key) Call

Delete returns a Call object initialized to delete the value at key.

func DeleteRange

func DeleteRange(startKey, endKey proto.Key) Call

DeleteRange returns a Call object initialized to delete the values in the given key range (excluding the endpoint).

func Get

func Get(key proto.Key) Call

Get returns a Call object initialized to get the value at key.

func GetProto

func GetProto(key proto.Key, msg gogoproto.Message) Call

GetProto returns a Call object initialized to get the value at key and then to decode it as a protobuf message.

func Increment

func Increment(key proto.Key, increment int64) Call

Increment returns a Call object initialized to increment the value at key by increment.

func Put

func Put(key proto.Key, valueBytes []byte) Call

Put returns a Call object initialized to put value as a byte slice at key.

func PutProto

func PutProto(key proto.Key, msg gogoproto.Message) Call

PutProto returns a Call object initialized to put the proto message as a byte slice at key.

func Scan

func Scan(key, endKey proto.Key, maxResults int64) Call

Scan returns a Call object initialized to scan from start to end keys with max results.

func (*Call) Method

func (c *Call) Method() proto.Method

Method returns the method of the database command for the call.

type Clock

type Clock interface {
	// Now returns nanoseconds since Jan 1, 1970 GMT.
	Now() int64
}

A Clock is an interface which provides the current time.

type Context

type Context struct {
	User            string
	UserPriority    int32
	TxnRetryOptions retry.Options
	Clock           Clock
}

A Context stores configuration to be used when creating a KV object.

func NewContext

func NewContext() *Context

NewContext creates a new context with default values.

type DB

type DB struct {
	// B is a helper to make creating a new batch and performing an
	// operation on it easier:
	//
	//   err := db.Run(db.B.Put("a", "1").Put("b", "2"))
	B batcher
	// contains filtered or unexported fields
}

DB is a database handle to a single cockroach cluster. A DB is safe for concurrent use by multiple goroutines.

func Open

func Open(addr string) (*DB, error)

Open creates a new database handle to the cockroach cluster specified by addr. The cluster is identified by a URL with the format:

<sender>://[<user>@]<host>:<port>[?certs=<dir>]

The URL scheme (<sender>) specifies which transport to use for talking to the cockroach cluster. Currently allowable values are: http, https, rpc, rpcs. The rpc and rpcs senders use a variant of Go's builtin rpc library for communication with the cluster. This protocol is lower overhead and more efficient than http. The decision between the encrypted (https, rpcs) and unencrypted senders (http, rpc) depends on the settings of the cluster. A given cluster supports either encrypted or unencrypted traffic, but not both.

If not specified, the <user> field defaults to "root".

The certs parameter can be used to override the default directory to use for client certificates. In tests, the directory "test_certs" uses the embedded test certificates.

func (*DB) AdminMerge

func (db *DB) AdminMerge(key interface{}) (Result, error)

AdminMerge merges the range containing key and the subsequent range. After the merge operation is complete, the range containing key will contain all of the key/value pairs of the subsequent range and the subsequent range will no longer exist.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*DB) AdminSplit

func (db *DB) AdminSplit(key, splitKey interface{}) (Result, error)

AdminSplit splits the range containing key. If splitKey is non-nil it specifies the key to split the range at, otherwise an appropriate key is chosen automatically.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*DB) CPut

func (db *DB) CPut(key, value, expValue interface{}) (Result, error)

CPut conditionally sets the value for a key if the existing value is equal to expValue. To conditionally set a value only if there is no existing entry pass nil for expValue.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	if _, err := db.Put("aa", "1"); err != nil {
		panic(err)
	}
	if _, err := db.CPut("aa", "2", "1"); err != nil {
		panic(err)
	}
	result, err := db.Get("aa")
	if err != nil {
		panic(err)
	}
	fmt.Printf("aa=%s\n", result.Rows[0].ValueBytes())

	if _, err = db.CPut("aa", "3", "1"); err == nil {
		panic("expected error from conditional put")
	}
	result, err = db.Get("aa")
	if err != nil {
		panic(err)
	}
	fmt.Printf("aa=%s\n", result.Rows[0].ValueBytes())

	if _, err = db.CPut("bb", "4", "1"); err == nil {
		panic("expected error from conditional put")
	}
	result, err = db.Get("bb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("bb=%s\n", result.Rows[0].ValueBytes())
	if _, err = db.CPut("bb", "4", nil); err != nil {
		panic(err)
	}
	result, err = db.Get("bb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("bb=%s\n", result.Rows[0].ValueBytes())

}
Output:

aa=2
aa=2
bb=
bb=4

func (*DB) Del

func (db *DB) Del(keys ...interface{}) (Result, error)

Del deletes one or more keys.

Each key will have a corresponding row in the returned Result.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	if err := db.Run(db.B.Put("aa", "1").Put("ab", "2").Put("ac", "3")); err != nil {
		panic(err)
	}
	if _, err := db.Del("ab"); err != nil {
		panic(err)
	}
	result, err := db.Scan("a", "b", 100)
	if err != nil {
		panic(err)
	}
	for i, row := range result.Rows {
		fmt.Printf("%d: %s=%s\n", i, row.Key, row.ValueBytes())
	}

}
Output:

0: aa=1
1: ac=3

func (*DB) DelRange

func (db *DB) DelRange(begin, end interface{}) (Result, error)

DelRange deletes the rows between begin (inclusive) and end (exclusive).

The returned Result will contain 0 rows and Result.Err will indicate success or failure.

TODO(pmattis): Perhaps the result should return which rows were deleted.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*DB) Get

func (db *DB) Get(keys ...interface{}) (Result, error)

Get retrieves one or more keys. Each requested key will have a corresponding row in the returned Result.

r, err := db.Get("a", "b", "c")
// string(r.Rows[0].Key) == "a"
// string(r.Rows[1].Key) == "b"
// string(r.Rows[2].Key) == "c"

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	result, err := db.Get("aa")
	if err != nil {
		panic(err)
	}
	fmt.Printf("aa=%s\n", result.Rows[0].ValueBytes())

}
Output:

aa=

func (*DB) Inc

func (db *DB) Inc(key interface{}, value int64) (Result, error)

Inc increments the integer value at key. If the key does not exist it will be created with an initial value of 0 which will then be incremented. If the key exists but was set using Put or CPut an error will be returned.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	if _, err := db.Inc("aa", 100); err != nil {
		panic(err)
	}
	result, err := db.Get("aa")
	if err != nil {
		panic(err)
	}
	fmt.Printf("aa=%d\n", result.Rows[0].ValueInt())

}
Output:

aa=100

func (*DB) Put

func (db *DB) Put(key, value interface{}) (Result, error)

Put sets the value for a key.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	if _, err := db.Put("aa", "1"); err != nil {
		panic(err)
	}
	result, err := db.Get("aa")
	if err != nil {
		panic(err)
	}
	fmt.Printf("aa=%s\n", result.Rows[0].ValueBytes())

}
Output:

aa=1

func (*DB) Run

func (db *DB) Run(b *Batch) error

Run executes the operations queued up within a batch. Before executing any of the operations the batch is first checked to see if there were any errors during its construction (e.g. failure to marshal a proto message).

The operations within a batch are run in parallel and their execution order is non-deterministic. The behavior of modifying and retrieving the same key within a single batch is unspecified.

Upon completion, Batch.Results will contain the results for each operation. The order of the results matches the order the operations were added to the batch.

func (*DB) Scan

func (db *DB) Scan(begin, end interface{}, maxRows int64) (Result, error)

Scan retrieves the rows between begin (inclusive) and end (exclusive).

The returned Result will contain up to maxRows rows and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	b := db.B.Put("aa", "1").Put("ab", "2").Put("bb", "3")
	if err := db.Run(b); err != nil {
		panic(err)
	}
	result, err := db.Scan("a", "b", 100)
	if err != nil {
		panic(err)
	}
	for i, row := range result.Rows {
		fmt.Printf("%d: %s=%s\n", i, row.Key, row.ValueBytes())
	}

}
Output:

0: aa=1
1: ab=2

func (*DB) Tx

func (db *DB) Tx(retryable func(tx *Tx) error) error

Tx executes retryable in the context of a distributed transaction. The transaction is automatically aborted if retryable returns any error aside from recoverable internal errors, and is automatically committed otherwise. The retryable function should have no side effects which could cause problems in the event it must be run more than once.

TODO(pmattis): Allow transaction options to be specified.

type HTTPSender

type HTTPSender struct {
	// contains filtered or unexported fields
}

HTTPSender is an implementation of KVSender which exposes the Key-Value database provided by a Cockroach cluster by connecting via HTTP to a Cockroach node. Overly-busy nodes will redirect this client to other nodes.

func NewHTTPSender

func NewHTTPSender(server string, ctx *base.Context) (*HTTPSender, error)

NewHTTPSender returns a new instance of HTTPSender.

func (*HTTPSender) Send

func (s *HTTPSender) Send(_ context.Context, call Call)

Send sends call to Cockroach via an HTTP POST. Retryable HTTP response codes are retried with backoff in a loop using the default retry options. Other errors encountered while sending the HTTP request are retried indefinitely using the same client command ID, to avoid reporting failure when the command may in fact have gone through and executed successfully. Retrying with the same client command ID means the request eventually gets through and receives the cached response.
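
The role of the client command ID can be sketched with a toy server that caches replies by ID. This is a conceptual model under the assumption that the server keeps a response cache keyed by command ID; server, handle and the ID format are hypothetical.

```go
package main

import "fmt"

// server applies each command at most once and remembers its reply.
type server struct {
	cache   map[string]string
	applied int
}

// handle returns the cached reply for a known command ID; otherwise it
// applies the operation and caches the reply.
func (s *server) handle(cmdID, op string) string {
	if reply, ok := s.cache[cmdID]; ok {
		return reply
	}
	s.applied++
	reply := fmt.Sprintf("applied %q", op)
	s.cache[cmdID] = reply
	return reply
}

func main() {
	srv := &server{cache: map[string]string{}}
	const cmdID = "client-1/cmd-42" // hypothetical ID format

	// First attempt: the server executes the command, but pretend the
	// response is lost on the way back to the client.
	_ = srv.handle(cmdID, "Inc(a, 1)")

	// Retry with the same ID: the cached reply is returned and the
	// increment is not applied a second time.
	reply := srv.handle(cmdID, "Inc(a, 1)")
	fmt.Println(reply, srv.applied)
}
```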

type KV

type KV struct {
	// User is the default user to set on API calls. If User is set to
	// non-empty in call arguments, this value is ignored.
	User string
	// UserPriority is the default user priority to set on API calls. If
	// UserPriority is set non-zero in call arguments, this value is
	// ignored.
	UserPriority    int32
	TxnRetryOptions retry.Options
	Sender          KVSender
	// contains filtered or unexported fields
}

KV provides access to a KV store. A KV instance is safe for concurrent use by multiple goroutines.

func NewKV

func NewKV(ctx *Context, sender KVSender) *KV

NewKV creates a new instance of KV using the specified sender. To create a transactional client, the KV struct should be manually initialized in order to utilize a txnSender. Clock is used to formulate client command IDs, which provide idempotency on API calls, and defaults to the system clock implementation.

func (*KV) NewDB

func (kv *KV) NewDB() *DB

NewDB returns a new database handle using KV for the underlying communication.

TODO(pmattis): Remove once we plumb usage of DB everywhere.

func (*KV) Run

func (kv *KV) Run(calls ...Call) (err error)

Run runs the specified calls synchronously in a single batch and returns any errors.

func (*KV) RunTransaction

func (kv *KV) RunTransaction(opts *TransactionOptions, retryable func(txn *Txn) error) error

RunTransaction executes retryable in the context of a distributed transaction. The transaction is automatically aborted if retryable returns any error aside from recoverable internal errors, and is automatically committed otherwise. retryable should have no side effects which could cause problems in the event it must be run more than once. The opts struct contains transaction settings.

Example

This is an example for using the RunTransaction() method to submit multiple Key Value API operations inside a transaction.

package main

import (
	"bytes"
	"fmt"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/testutils"
	"github.com/cockroachdb/cockroach/util/log"
)

func main() {
	// Using built-in test server for this example code.
	serv := server.StartTestServer(nil)
	defer serv.Stop()

	// Key Value Client initialization.
	sender, err := client.NewHTTPSender(serv.ServingAddr(), testutils.NewTestBaseContext())
	if err != nil {
		log.Fatal(err)
	}
	kvClient := client.NewKV(nil, sender)
	kvClient.User = storage.UserRoot

	// Create test data.
	numKVPairs := 10
	keys := make([]string, numKVPairs)
	values := make([][]byte, numKVPairs)
	for i := 0; i < numKVPairs; i++ {
		keys[i] = fmt.Sprintf("testkey-%03d", i)
		values[i] = []byte(fmt.Sprintf("testvalue-%03d", i))
	}

	// Insert all KV pairs inside a transaction.
	putOpts := client.TransactionOptions{Name: "example put"}
	err = kvClient.RunTransaction(&putOpts, func(txn *client.Txn) error {
		for i := 0; i < numKVPairs; i++ {
			txn.Prepare(client.Put(proto.Key(keys[i]), values[i]))
		}
		// Note that the KV client is flushed automatically on transaction
		// commit. Invoking Flush after individual API methods is only
		// required if the result needs to be received to take conditional
		// action.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Read back KV pairs inside a transaction.
	getResponses := make([]*proto.GetResponse, numKVPairs)
	getOpts := client.TransactionOptions{Name: "example get"}
	err = kvClient.RunTransaction(&getOpts, func(txn *client.Txn) error {
		for i := 0; i < numKVPairs; i++ {
			call := client.Get(proto.Key(keys[i]))
			getResponses[i] = call.Reply.(*proto.GetResponse)
			txn.Prepare(call)
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Check results.
	for i, getResp := range getResponses {
		if getResp.Value == nil {
			log.Fatal("No value returned for ", keys[i])
		} else {
			if !bytes.Equal(values[i], getResp.Value.Bytes) {
				log.Fatal("Data mismatch for ", keys[i], ", got: ", getResp.Value.Bytes)
			}
		}
	}

	fmt.Println("Transaction example done.")
}
Output:

Transaction example done.

type KVSender

type KVSender interface {
	// Send invokes the Call.Method with Call.Args and sets the result
	// in Call.Reply.
	Send(context.Context, Call)
}

KVSender is an interface for sending a request to a Key-Value database backend.

type KVSenderFunc

type KVSenderFunc func(context.Context, Call)

KVSenderFunc is an adapter to allow the use of ordinary functions as KVSenders.

func (KVSenderFunc) Send

func (f KVSenderFunc) Send(ctx context.Context, c Call)

Send calls f(ctx, c).

type KeyValue

type KeyValue struct {
	Key       []byte
	Value     interface{}
	Timestamp time.Time
}

KeyValue represents a single key/value pair and corresponding timestamp.

func (*KeyValue) Exists

func (kv *KeyValue) Exists() bool

Exists returns true iff the value exists.

func (*KeyValue) String

func (kv *KeyValue) String() string

func (*KeyValue) ValueBytes

func (kv *KeyValue) ValueBytes() []byte

ValueBytes returns the value as a byte slice. This method will panic if the value's type is not a byte slice.

func (*KeyValue) ValueInt

func (kv *KeyValue) ValueInt() int64

ValueInt returns the value as an int64. This method will panic if the value's type is not an int64.
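
Because the typed accessors panic on a mismatch, callers need to know which operation wrote a value before reading it. The sketch below models that with plain type assertions. It is an assumption-laden illustration: keyValue, valueBytes, valueInt and tryInt are hypothetical, and the nil check in valueBytes is inferred from the Get example above, which prints an empty value for a missing key.

```go
package main

import "fmt"

type keyValue struct {
	Key   []byte
	Value interface{}
}

// valueBytes returns nil for a missing value and panics if the value is
// present but not a byte slice.
func (kv keyValue) valueBytes() []byte {
	if kv.Value == nil {
		return nil
	}
	return kv.Value.([]byte)
}

// valueInt panics if the value is not an int64.
func (kv keyValue) valueInt() int64 { return kv.Value.(int64) }

// tryInt is a non-panicking variant using the two-result assertion form.
func (kv keyValue) tryInt() (int64, bool) {
	n, ok := kv.Value.(int64)
	return n, ok
}

func main() {
	counter := keyValue{Key: []byte("aa"), Value: int64(100)}
	fmt.Println(counter.valueInt())

	text := keyValue{Key: []byte("bb"), Value: []byte("1")}
	if _, ok := text.tryInt(); !ok {
		fmt.Println("bb does not hold an int64")
	}

	defer func() { fmt.Println("panicked:", recover() != nil) }()
	_ = counter.valueBytes() // wrong accessor for an int64 value
}
```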

func (*KeyValue) ValueProto

func (kv *KeyValue) ValueProto(msg gogoproto.Message) error

ValueProto parses the byte slice value as a proto message.

type NewSenderFunc

type NewSenderFunc func(u *url.URL, ctx *base.Context) (KVSender, error)

NewSenderFunc creates a new sender for the registered scheme.

type Result

type Result struct {

	// Err contains any error encountered when performing the operation.
	Err error
	// Rows contains the key/value pairs for the operation. The number of rows
	// returned varies by operation. For Get, Put, CPut, Inc and Del the number
	// of rows returned is the number of keys operated on. For Scan the number of
	// rows returned is the number of rows matching the scan capped by the
	// maxRows parameter. For DelRange Rows is nil.
	Rows []KeyValue
	// contains filtered or unexported fields
}

Result holds the result for a single DB or Tx operation (e.g. Get, Put, etc).

func (Result) String

func (r Result) String() string

type Runner

type Runner interface {
	Run(b *Batch) error
}

Runner only exports the Run method on a batch of operations.

type TransactionOptions

type TransactionOptions struct {
	Name         string // Concise desc of txn for debugging
	Isolation    proto.IsolationType
	UserPriority int32
}

TransactionOptions are parameters for use with KV.RunTransaction.

type Tx

type Tx struct {
	// B is a helper to make creating a new batch and performing an
	// operation on it easier:
	//
	//   err := db.Tx(func(tx *Tx) error {
	//     return tx.Commit(tx.B.Put("a", "1").Put("b", "2"))
	//   })
	B batcher
	// contains filtered or unexported fields
}

Tx is an in-progress distributed database transaction. A Tx is not safe for concurrent use by multiple goroutines.

func (*Tx) CPut

func (tx *Tx) CPut(key, value, expValue interface{}) (Result, error)

CPut conditionally sets the value for a key if the existing value is equal to expValue. To conditionally set a value only if there is no existing entry pass nil for expValue.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.

func (*Tx) Commit

func (tx *Tx) Commit(b *Batch) error

Commit executes the operations queued up within a batch and commits the transaction. Explicitly committing a transaction is optional, but more efficient than relying on the implicit commit performed when the transaction function returns without error.

Example
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/server"
)

func setup() (*server.TestServer, *client.DB) {
	s := server.StartTestServer(nil)
	db, err := client.Open("https://root@" + s.ServingAddr() + "?certs=test_certs")
	if err != nil {
		log.Fatal(err)
	}
	return s, db
}

func main() {
	s, db := setup()
	defer s.Stop()

	err := db.Tx(func(tx *client.Tx) error {
		return tx.Commit(tx.B.Put("aa", "1").Put("ab", "2"))
	})
	if err != nil {
		panic(err)
	}

	result, err := db.Get("aa", "ab")
	if err != nil {
		panic(err)
	}
	for i, row := range result.Rows {
		fmt.Printf("%d: %s=%s\n", i, row.Key, row.ValueBytes())
	}

}
Output:

0: aa=1
1: ab=2

func (*Tx) Del

func (tx *Tx) Del(keys ...interface{}) (Result, error)

Del deletes one or more keys.

Each key will have a corresponding row in the returned Result.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
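As a rough illustration of the accepted key types, the sketch below converts each of them to a byte slice with a type switch. marshalKey and userID are hypothetical names, and the precedence given to encoding.BinaryMarshaler over fmt.Stringer for a type that implements both is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"encoding"
	"fmt"
)

// userID is a hypothetical key type that implements fmt.Stringer.
type userID int

func (u userID) String() string { return fmt.Sprintf("user/%d", int(u)) }

// marshalKey is a hypothetical helper, not the client's own code. It turns
// each documented key type into a byte slice and rejects anything else.
func marshalKey(k interface{}) ([]byte, error) {
	switch t := k.(type) {
	case []byte:
		return t, nil
	case string:
		return []byte(t), nil
	case encoding.BinaryMarshaler:
		// Assumed to take precedence over fmt.Stringer in this sketch.
		return t.MarshalBinary()
	case fmt.Stringer:
		return []byte(t.String()), nil
	}
	return nil, fmt.Errorf("unsupported key type %T", k)
}

func main() {
	for _, k := range []interface{}{[]byte("a"), "b", userID(7), 42} {
		b, err := marshalKey(k)
		fmt.Printf("%q %v\n", b, err)
	}
}
```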

func (*Tx) DelRange

func (tx *Tx) DelRange(begin, end interface{}) (Result, error)

DelRange deletes the rows between begin (inclusive) and end (exclusive).

The returned Result will contain 0 rows and Result.Err will indicate success or failure.

begin and end are keys; a key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*Tx) Get

func (tx *Tx) Get(keys ...interface{}) (Result, error)

Get retrieves one or more keys. Each requested key will have a corresponding row in the returned Result.

r, err := tx.Get("a", "b", "c")
// string(r.Rows[0].Key) == "a"
// string(r.Rows[1].Key) == "b"
// string(r.Rows[2].Key) == "c"

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.

func (*Tx) Inc

func (tx *Tx) Inc(key interface{}, value int64) (Result, error)

Inc increments the integer value at key. If the key does not exist, it will be created with an initial value of 0, which will then be incremented. If the key exists but was set using Put or CPut, an error will be returned.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
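The increment rules above can be modeled with a small in-memory sketch. The entry and store types and the put and inc methods are hypothetical; in particular, returning the new counter value from inc is a convenience of this sketch and not a statement about what Result contains.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical stored value that remembers how it was written.
type entry struct {
	isInt bool
	n     int64
	raw   []byte
}

// store is a hypothetical in-memory stand-in for the database.
type store map[string]*entry

// put stores raw bytes, marking the entry as a non-integer value.
func (s store) put(key string, value []byte) { s[key] = &entry{raw: value} }

// inc adds delta to the integer at key. A missing key starts at 0; a key
// written by put is rejected.
func (s store) inc(key string, delta int64) (int64, error) {
	e, ok := s[key]
	if !ok {
		e = &entry{isInt: true}
		s[key] = e
	}
	if !e.isInt {
		return 0, errors.New("key holds a non-integer value")
	}
	e.n += delta
	return e.n, nil
}

func main() {
	s := store{}
	fmt.Println(s.inc("hits", 5)) // created at 0, then incremented
	fmt.Println(s.inc("hits", 2))
	s.put("name", []byte("x"))
	fmt.Println(s.inc("name", 1)) // rejected: value was set with put
}
```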

func (*Tx) Put

func (tx *Tx) Put(key, value interface{}) (Result, error)

Put sets the value for a key.

The returned Result will contain a single row and Result.Err will indicate success or failure.

key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler. value can be any key type or a proto.Message.

func (*Tx) Run

func (tx *Tx) Run(b *Batch) error

Run executes the operations queued up within a batch. Before executing any of the operations the batch is first checked to see if there were any errors during its construction (e.g. failure to marshal a proto message).

The operations within a batch are run in parallel and their execution order is non-deterministic. Modifying and retrieving the same key within a single batch results in unspecified behavior.

Upon completion, Batch.Results will contain the results for each operation. The order of the results matches the order the operations were added to the batch.
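One way to reconcile parallel execution with ordered results is to give every operation a fixed slot in the results slice. The sketch below shows that idea with plain goroutines; runBatch and the string results are hypothetical and say nothing about how Batch is actually implemented.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// runBatch executes ops concurrently and stores each result at the index
// where its operation was queued, so completion order does not matter.
func runBatch(ops []func() string) []string {
	results := make([]string, len(ops))
	var wg sync.WaitGroup
	for i, op := range ops {
		wg.Add(1)
		go func(i int, op func() string) {
			defer wg.Done()
			results[i] = op() // each goroutine writes only its own slot
		}(i, op)
	}
	wg.Wait()
	return results
}

func main() {
	ops := []func() string{
		func() string { return "put a" },
		func() string { return "get b" },
		func() string { return "del c" },
	}
	fmt.Println(strings.Join(runBatch(ops), ", "))
}
```

Because each goroutine writes to a distinct index, no locking is needed beyond the WaitGroup.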

func (*Tx) Scan

func (tx *Tx) Scan(begin, end interface{}, maxRows int64) (Result, error)

Scan retrieves the rows between begin (inclusive) and end (exclusive).

The returned Result will contain up to maxRows rows and Result.Err will indicate success or failure.

begin and end are keys; a key can be either a byte slice, a string, a fmt.Stringer or an encoding.BinaryMarshaler.
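The half-open range and the maxRows cap can be sketched over a sorted slice of keys. The scan function here is a hypothetical in-memory model of the range semantics only. Appending "\x00" to the end key, as the package overview does, is the usual way to make the end key itself part of the range.

```go
package main

import (
	"fmt"
	"sort"
)

// scan returns up to maxRows keys k from sorted such that begin <= k < end.
// sorted must be in ascending byte order.
func scan(sorted []string, begin, end string, maxRows int64) []string {
	// First index whose key is >= begin (inclusive lower bound).
	i := sort.SearchStrings(sorted, begin)
	var rows []string
	for ; i < len(sorted) && sorted[i] < end && int64(len(rows)) < maxRows; i++ {
		rows = append(rows, sorted[i])
	}
	return rows
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	fmt.Println(scan(keys, "a", "c", 10))     // "c" is excluded
	fmt.Println(scan(keys, "a", "c\x00", 10)) // "c" is included
	fmt.Println(scan(keys, "a", "d", 2))      // capped by maxRows
}
```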

func (*Tx) SetSnapshotIsolation

func (tx *Tx) SetSnapshotIsolation()

SetSnapshotIsolation sets the transaction's isolation type to snapshot. Transactions default to serializable isolation. The isolation must be set before any operations are performed on the transaction.

TODO(pmattis): This isn't tested yet but will be as part of the conversion of client_test.go.

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn provides serial access to a KV store via Run and parallel access via Prepare and Flush. On receipt of a TransactionRestartError, the transaction epoch is incremented and the error is passed to the caller. On receipt of a TransactionAbortedError, the transaction is re-created and the error is passed to the caller.

A Txn instance is not thread safe.

func (*Txn) Flush

func (t *Txn) Flush() error

Flush sends all previously prepared calls, buffered by invocations of Prepare(). The calls are organized into a single batch command and sent together. Flush returns nil if all prepared calls are executed successfully. Otherwise, Flush returns the first error, where calls are executed in the order in which they were prepared. After Flush returns, all prepared reply structs will be valid.

func (*Txn) Prepare

func (t *Txn) Prepare(calls ...Call)

Prepare accepts one or more KV API calls, each specified by arguments and a reply struct. The calls are buffered locally until Flush() is called, at which time they are sent for execution as part of a batch call. Using Prepare/Flush parallelizes queries and updates and should be used where possible for efficiency.

For clients using an HTTP sender, Prepare/Flush allows multiple commands to be sent over the same connection. Prepare/Flush can dramatically improve efficiency by compressing multiple writes into a single atomic update in the event that the writes are to keys within a single range.

TODO(pmattis): Can Prepare/Flush be replaced with a Batch struct? Doing so could potentially make the Txn interface more symmetric with the KV interface, but potentially removes the optimization to send the EndTransaction in the same batch as the final set of prepared calls.

func (*Txn) Run

func (t *Txn) Run(calls ...Call) error

Run runs the specified calls synchronously in a single batch and returns any errors.
