couchbase

package module
v0.0.0-...-ee2ae26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2015 License: MIT Imports: 21 Imported by: 0

README

A smart client for couchbase in go

This is our previous, unofficial Go client library. If you are looking for our official, fully supported Couchbase Go SDK, please see: https://github.com/couchbaselabs/gocb.

This is an evolving package, but does provide a useful interface to a couchbase server including all of the pool/bucket discovery features, compatible key distribution with other clients, and vbucket motion awareness so application can continue to operate during rebalances.

It also supports view querying with source node randomization so you don't bang on all one node to do all the work.

Install

go get github.com/couchbase/go-couchbase

Example

c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
if err != nil {
	log.Fatalf("Error connecting:  %v", err)
}

pool, err := c.GetPool("default")
if err != nil {
	log.Fatalf("Error getting pool:  %v", err)
}

bucket, err := pool.GetBucket("default")
if err != nil {
	log.Fatalf("Error getting bucket:  %v", err)
}

bucket.Set("someKey", 0, []string{"an", "example", "list"})

Documentation

Overview

Package couchbase provides a smart client for go.

Usage:

client, err := couchbase.Connect("http://myserver:8091/")
handleError(err)
pool, err := client.GetPool("default")
handleError(err)
bucket, err := pool.GetBucket("MyAwesomeBucket")
handleError(err)
...

or a shortcut for the bucket directly

bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")

in any case, you can specify authentication credentials using standard URL userinfo syntax:

b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
        "default", "bucket")

Index

Constants

View Source
const (
	// Raw specifies that the value is raw []byte or nil; don't
	// JSON-encode it.
	Raw = WriteOptions(1 << iota)
	// AddOnly indicates an item should only be written if it
	// doesn't exist, otherwise ErrKeyExists is returned.
	AddOnly
	// Persist causes the operation to block until the server
	// confirms the item is persisted.
	Persist
	// Indexable causes the operation to block until it's availble via the index.
	Indexable
	// Append indicates the given value should be appended to the
	// existing value for the given key.
	Append
)
View Source
const ABS_MAX_RETRIES = 10
View Source
const ABS_MIN_RETRIES = 3
View Source
const DEFAULT_WINDOW_SIZE = 20 * 1024 * 1024 // 20 Mb
View Source
const HTTP_MAX_RETRY = 5

arbitary number, may need to be tuned #FIXME

View Source
const START_NODE_ID = -1
View Source
const UpdateCancel = memcached.CASQuit

Return this as the error from an UpdateFunc to cancel the Update operation.

Variables

View Source
var ClientOpCallback func(opname, k string, start time.Time, err error)

ClientOpCallback is called for each invocation of Do.

View Source
var ConnPoolAvailWaitTime = time.Millisecond

ConnPoolAvailWaitTime is the amount of time to wait for an existing connection from the pool before considering the creation of a new one.

View Source
var ConnPoolCallback func(host string, source string, start time.Time, err error)

ConnPoolTimeout is notified whenever connections are acquired from a pool.

View Source
var ConnPoolTimeout = time.Hour * 24 * 30

Default timeout for retrieving a connection from the pool.

View Source
var ErrKeyExists = errors.New("key exists")

Error returned from Write with AddOnly flag, when key already exists in the bucket.

View Source
var ErrOverwritten = errors.New("overwritten")

Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) if the value has been overwritten by another before being persisted.

View Source
var ErrTimeout = errors.New("timeout")

Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) if the value hasn't been persisted by the timeout interval

View Source
var HTTPClient = &http.Client{Transport: HTTPTransport}
View Source
var MaxBulkRetries = 1000

Maximum number of times to retry a chunk of a bulk get on error.

View Source
var MaxIdleConnsPerHost = 256

HTTPClient to use for REST and view operations.

View Source
var PoolOverflow = PoolSize

PoolOverflow is the number of overflow connections allowed in a pool.

View Source
var PoolSize = 4

PoolSize is the size of each connection pool (per host).

View Source
var SlowServerCallWarningThreshold time.Duration

If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call takes longer than that.

View Source
var TimeoutError = errors.New("timeout waiting to build connection")

Error raised when a connection can't be retrieved from a pool.

View Source
var ViewCallback func(ddoc, name string, start time.Time, err error)

ViewCallback is called for each view invocation.

Functions

func CleanupHost

func CleanupHost(h, commonSuffix string) string

CleanupHost returns the hostname with the given suffix removed.

func FindCommonSuffix

func FindCommonSuffix(input []string) string

FindCommonSuffix returns the longest common suffix from the given strings.

func IsKeyNoEntError

func IsKeyNoEntError(err error) bool

Return true if error is KEY_ENOENT. Required by cbq-engine

func ParseURL

func ParseURL(urlStr string) (result *url.URL, err error)

ParseURL is a wrapper around url.Parse with some sanity-checking

func SetViewUpdateParams

func SetViewUpdateParams(baseU string, params map[string]interface{}) (viewOpts map[string]interface{}, err error)

Set viewUpdateDaemonOptions

Types

type AuthHandler

type AuthHandler interface {
	GetCredentials() (string, string, string)
}

AuthHandler is a callback that gets the auth username and password for the given bucket.

type Bucket

type Bucket struct {
	AuthType            string             `json:"authType"`
	Capabilities        []string           `json:"bucketCapabilities"`
	CapabilitiesVersion string             `json:"bucketCapabilitiesVer"`
	Type                string             `json:"bucketType"`
	Name                string             `json:"name"`
	NodeLocator         string             `json:"nodeLocator"`
	Quota               map[string]float64 `json:"quota,omitempty"`
	Replicas            int                `json:"replicaNumber"`
	Password            string             `json:"saslPassword"`
	URI                 string             `json:"uri"`
	StreamingURI        string             `json:"streamingUri"`
	LocalRandomKeyURI   string             `json:"localRandomKeyUri,omitempty"`
	UUID                string             `json:"uuid"`
	DDocs               struct {
		URI string `json:"uri"`
	} `json:"ddocs,omitempty"`
	BasicStats  map[string]interface{} `json:"basicStats,omitempty"`
	Controllers map[string]interface{} `json:"controllers,omitempty"`

	// These are used for JSON IO, but isn't used for processing
	// since it needs to be swapped out safely.
	VBSMJson  VBucketServerMap `json:"vBucketServerMap"`
	NodesJSON []Node           `json:"nodes"`
	// contains filtered or unexported fields
}

Bucket is the primary entry point for most data operations.

func GetBucket

func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error)

GetBucket is a convenience function for getting a named bucket from a URL

func (*Bucket) Add

func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. The value will be serialized into a JSON document.

func (*Bucket) AddRaw

func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error)

AddRaw adds a value to this bucket; like SetRaw except that nothing happens if the key exists. The value will be stored as raw bytes.

func (*Bucket) Append

func (b *Bucket) Append(k string, data []byte) error

Append appends raw data to an existing item.

func (*Bucket) Cas

func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) error

Set a value in this bucket with Cas

func (*Bucket) CasRaw

func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) error

Set a value in this bucket with Cas without json encoding it

func (*Bucket) Close

func (b *Bucket) Close()

Close marks this bucket as no longer needed, closing connections it may have open.

func (Bucket) CommonAddressSuffix

func (b Bucket) CommonAddressSuffix() string

CommonAddressSuffix finds the longest common suffix of all host:port strings in the node list.

func (*Bucket) Delete

func (b *Bucket) Delete(k string) error

Delete a key from this bucket.

func (*Bucket) DeleteDDoc

func (b *Bucket) DeleteDDoc(docname string) error

DeleteDDoc removes a design document.

func (*Bucket) Do

func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error)

Do executes a function on a memcached connection to the node owning key "k"

Note that this automatically handles transient errors by replaying your function on a "not-my-vbucket" error, so don't assume your command will only be executed only once.

func (*Bucket) Get

func (b *Bucket) Get(k string, rv interface{}) error

Get a value from this bucket. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetBulk

func (b *Bucket) GetBulk(keys []string) (map[string]*gomemcached.MCResponse, error)

GetBulk fetches multiple keys concurrently.

Unlike more convenient GETs, the entire response is returned in the map for each key. Keys that were not found will not be included in the map.

func (*Bucket) GetDDoc

func (b *Bucket) GetDDoc(docname string, into interface{}) error

GetDDoc retrieves a specific a design doc.

func (*Bucket) GetDDocWithRetry

func (b *Bucket) GetDDocWithRetry(docname string, into interface{}) error

func (*Bucket) GetDDocs

func (b *Bucket) GetDDocs() (DDocsResult, error)

GetDDocs lists all design documents

func (*Bucket) GetDDocsWithRetry

func (b *Bucket) GetDDocsWithRetry() (DDocsResult, error)

func (*Bucket) GetFailoverLogs

func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error)

GetFailoverLogs, get the failover logs for a set of vbucket ids

func (*Bucket) GetPool

func (b *Bucket) GetPool() *Pool

GetPool gets the pool to which this bucket belongs.

func (*Bucket) GetRaw

func (b *Bucket) GetRaw(k string) ([]byte, error)

GetRaw gets a raw value from this bucket. No marshaling is performed.

func (*Bucket) GetStats

func (b *Bucket) GetStats(which string) map[string]map[string]string

GetStats gets a set of stats from all servers.

Returns a map of server ID -> map of stat key to map value.

func (*Bucket) GetVBmap

func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error)

func (*Bucket) Gets

func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error

Gets gets a value from this bucket, including its CAS counter. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetsRaw

func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
	cas uint64, err error)

GetsRaw gets a raw value from this bucket including its CAS counter and flags.

func (Bucket) HealthyNodes

func (b Bucket) HealthyNodes() []Node

return the list of healthy nodes

func (*Bucket) Incr

func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error)

Incr increments the value at a given key.

func (Bucket) NodeAddresses

func (b Bucket) NodeAddresses() []string

NodeAddresses gets the (sorted) list of memcached node addresses (hostname:port).

func (Bucket) Nodes

func (b Bucket) Nodes() []Node

Nodes returns teh current list of nodes servicing this bucket.

func (*Bucket) Observe

func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error)

Observe observes the current state of a document.

func (*Bucket) PutDDoc

func (b *Bucket) PutDDoc(docname string, value interface{}) error

PutDDoc installs a design document.

func (*Bucket) Refresh

func (b *Bucket) Refresh() error

func (*Bucket) Set

func (b *Bucket) Set(k string, exp int, v interface{}) error

Set a value in this bucket. The value will be serialized into a JSON document.

func (*Bucket) SetRaw

func (b *Bucket) SetRaw(k string, exp int, v []byte) error

SetRaw sets a value in this bucket without JSON encoding it.

func (*Bucket) StartTapFeed

func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error)

StartTapFeed creates and starts a new Tap feed

func (*Bucket) StartUprFeed

func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error)

StartUprFeed creates and starts a new Upr feed No data will be sent on the channel unless vbuckets streams are requested

func (*Bucket) Update

func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error

Update performs a Safe update of a document, avoiding conflicts by using CAS.

The callback function will be invoked with the current raw document contents (or nil if the document doesn't exist); it should return the updated raw contents (or nil to delete.) If it decides not to change anything it can return UpdateCancel as the error.

If another writer modifies the document between the get and the set, the callback will be invoked again with the newer value.

func (*Bucket) VBHash

func (b *Bucket) VBHash(key string) uint32

VBHash finds the vbucket for the given key.

func (*Bucket) VBServerMap

func (b *Bucket) VBServerMap() *VBucketServerMap

VBServerMap returns the current VBucketServerMap.

func (*Bucket) View

func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error)

View executes a view.

The ddoc parameter is just the bare name of your design doc without the "_design/" prefix.

Parameters are string keys with values that correspond to couchbase view parameters. Primitive should work fairly naturally (booleans, ints, strings, etc...) and other values will attempt to be JSON marshaled (useful for array indexing on on view keys, for example).

Example:

res, err := couchbase.View("myddoc", "myview", map[string]interface{}{
    "group_level": 2,
    "startkey_docid":    []interface{}{"thing"},
    "endkey_docid":      []interface{}{"thing", map[string]string{}},
    "stale": false,
    })

func (*Bucket) ViewCustom

func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
	vres interface{}) (err error)

ViewCustom performs a view request that can map row values to a custom type.

See the source to View for an example usage.

func (*Bucket) ViewURL

func (b *Bucket) ViewURL(ddoc, name string,
	params map[string]interface{}) (string, error)

ViewURL constructs a URL for a view with the given ddoc, view name, and parameters.

func (*Bucket) WaitForPersistence

func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error

WaitForPersistence waits for an item to be considered durable.

Besides transport errors, ErrOverwritten may be returned if the item is overwritten before it reaches durability. ErrTimeout may occur if the item isn't found durable in a reasonable amount of time.

func (*Bucket) Write

func (b *Bucket) Write(k string, flags, exp int, v interface{},
	opt WriteOptions) (err error)

General-purpose value setter.

The Set, Add and Delete methods are just wrappers around this. The interpretation of `v` depends on whether the `Raw` option is given. If it is, v must be a byte array or nil. (A nil value causes a delete.) If `Raw` is not given, `v` will be marshaled as JSON before being written. It must be JSON-marshalable and it must not be nil.

func (*Bucket) WriteCas

func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
	opt WriteOptions) (err error)

func (*Bucket) WriteUpdate

func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error

WriteUpdate performs a Safe update of a document, avoiding conflicts by using CAS. WriteUpdate is like Update, except that the callback can return a set of WriteOptions, of which Persist and Indexable are recognized: these cause the call to wait until the document update has been persisted to disk and/or become available to index.

type BucketAuth

type BucketAuth struct {
	// contains filtered or unexported fields
}

func (*BucketAuth) GetCredentials

func (ba *BucketAuth) GetCredentials() (string, string, string)

type BucketInfo

type BucketInfo struct {
	Name     string // name of bucket
	Password string // SASL password of bucket
}

func GetBucketList

func GetBucketList(baseU string) (bInfo []BucketInfo, err error)

Get SASL buckets

type Client

type Client struct {
	BaseURL *url.URL

	Info Pools
	// contains filtered or unexported fields
}

A Client is the starting point for all services across all buckets in a Couchbase cluster.

func Connect

func Connect(baseU string) (Client, error)

Connect to a couchbase cluster. An authentication handler will be created from the userinfo in the URL if provided.

func ConnectWithAuth

func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error)

ConnectWithAuth connects to a couchbase cluster with the given authentication handler.

func ConnectWithAuthCreds

func ConnectWithAuthCreds(baseU, username, password string) (c Client, err error)

ConnectWithAuthCreds connects to a couchbase cluster with the give authorization creds returned by cb_auth

func (*Client) GetPool

func (c *Client) GetPool(name string) (p Pool, err error)

GetPool gets a pool from within the couchbase cluster (usually "default").

func (*Client) GetPoolServices

func (c *Client) GetPoolServices(name string) (ps PoolServices, err error)

GetPoolServices returns all the bucket-independent services in a pool. (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)

type DDoc

type DDoc struct {
	Language string                    `json:"language,omitempty"`
	Views    map[string]ViewDefinition `json:"views"`
}

DDoc is the document body of a design document specifying a view.

type DDocsResult

type DDocsResult struct {
	Rows []struct {
		DDoc struct {
			Meta map[string]interface{}
			JSON DDoc
		} `json:"doc"`
	} `json:"rows"`
}

DDocsResult represents the result from listing the design documents.

type DocID

type DocID string

DocID is the document ID type for the startkey_docid parameter in views.

type FailoverLog

type FailoverLog map[uint16]memcached.FailoverLog

type FeedInfo

type FeedInfo struct {
	// contains filtered or unexported fields
}

UprFeed from a single connection

type GenericMcdAuthHandler

type GenericMcdAuthHandler interface {
	AuthHandler
	AuthenticateMemcachedConn(host string, conn *memcached.Client) error
}

GenericMcdAuthHandler is a kind of AuthHandler that performs special auth exchange (like non-standard auth, possibly followed by select-bucket).

type HTTPAuthHandler

type HTTPAuthHandler interface {
	AuthHandler
	SetCredsForRequest(req *http.Request) error
}

HTTPAuthHandler is kind of AuthHandler that performs more general for outgoing http requests than is possible via simple GetCredentials() call (i.e. digest auth or different auth per different destinations).

type MultiBucketAuthHandler

type MultiBucketAuthHandler interface {
	AuthHandler
	ForBucket(bucket string) AuthHandler
}

MultiBucketAuthHandler is kind of AuthHandler that may perform different auth for different buckets.

type Node

type Node struct {
	ClusterCompatibility int                `json:"clusterCompatibility"`
	ClusterMembership    string             `json:"clusterMembership"`
	CouchAPIBase         string             `json:"couchApiBase"`
	Hostname             string             `json:"hostname"`
	InterestingStats     map[string]float64 `json:"interestingStats,omitempty"`
	MCDMemoryAllocated   float64            `json:"mcdMemoryAllocated"`
	MCDMemoryReserved    float64            `json:"mcdMemoryReserved"`
	MemoryFree           float64            `json:"memoryFree"`
	MemoryTotal          float64            `json:"memoryTotal"`
	OS                   string             `json:"os"`
	Ports                map[string]int     `json:"ports"`
	Status               string             `json:"status"`
	Uptime               int                `json:"uptime,string"`
	Version              string             `json:"version"`
	ThisNode             bool               `json:"thisNode,omitempty"`
}

A Node is a computer in a cluster running the couchbase software.

type NodeServices

type NodeServices struct {
	Services map[string]int `json:"services,omitempty"`
	Hostname string         `json:"hostname"`
	ThisNode bool           `json:"thisNode"`
}

NodeServices is all the bucket-independent services running on a node (given by Hostname)

type Pool

type Pool struct {
	BucketMap map[string]Bucket
	Nodes     []Node

	BucketURL map[string]string `json:"buckets"`
	// contains filtered or unexported fields
}

A Pool of nodes and buckets.

func (*Pool) GetBucket

func (p *Pool) GetBucket(name string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetBucketWithAuth

func (p *Pool) GetBucketWithAuth(bucket, username, password string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetClient

func (p *Pool) GetClient() *Client

GetClient gets the client from which we got this pool.

type PoolServices

type PoolServices struct {
	Rev      int            `json:"rev"`
	NodesExt []NodeServices `json:"nodesExt"`
}

PoolServices is all the bucket-independent services in a pool

type Pools

type Pools struct {
	ComponentsVersion     map[string]string `json:"componentsVersion,omitempty"`
	ImplementationVersion string            `json:"implementationVersion"`
	IsAdmin               bool              `json:"isAdminCreds"`
	UUID                  string            `json:"uuid"`
	Pools                 []RestPool        `json:"pools"`
}

Pools represents the collection of pools as returned from the REST API.

type RestPool

type RestPool struct {
	Name         string `json:"name"`
	StreamingURI string `json:"streamingUri"`
	URI          string `json:"uri"`
}

RestPool represents a single pool returned from the pools REST API.

type TapFeed

type TapFeed struct {
	C <-chan memcached.TapEvent
	// contains filtered or unexported fields
}

A TapFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*TapFeed) Close

func (feed *TapFeed) Close() error

Close a Tap feed.

type UpdateFunc

type UpdateFunc func(current []byte) (updated []byte, err error)

An UpdateFunc is a callback function to update a document

type UprFeed

type UprFeed struct {
	C <-chan *memcached.UprEvent
	// contains filtered or unexported fields
}

A UprFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*UprFeed) Close

func (feed *UprFeed) Close() error

Close a Upr feed.

func (*UprFeed) UprCloseStream

func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error

UprCloseStream ends a vbucket stream.

func (*UprFeed) UprRequestStream

func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32,
	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error

UprRequestStream starts a stream for a vb on a feed

type VBucketServerMap

type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

VBucketServerMap is the a mapping of vbuckets to nodes.

type ViewDefinition

type ViewDefinition struct {
	Map    string `json:"map"`
	Reduce string `json:"reduce,omitempty"`
}

ViewDefinition represents a single view within a design document.

type ViewError

type ViewError struct {
	From   string
	Reason string
}

A ViewError is a node-specific error indicating a partial failure within a view result.

func (ViewError) Error

func (ve ViewError) Error() string

type ViewResult

type ViewResult struct {
	TotalRows int `json:"total_rows"`
	Rows      []ViewRow
	Errors    []ViewError
}

ViewResult holds the entire result set from a view request, including the rows and the errors.

type ViewRow

type ViewRow struct {
	ID    string
	Key   interface{}
	Value interface{}
	Doc   *interface{}
}

ViewRow represents a single result from a view.

Doc is present only if include_docs was set on the request.

type WriteOptions

type WriteOptions int

WriteOptions is the set of option flags availble for the Write method. They are ORed together to specify the desired request.

func (WriteOptions) String

func (w WriteOptions) String() string

String representation of WriteOptions

type WriteUpdateFunc

type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)

A WriteUpdateFunc is a callback function to update a document

Directories

Path Synopsis
Package cbdatasource streams data from a Couchbase cluster.
Package cbdatasource streams data from a Couchbase cluster.
examples
upr_bench
Tool receives raw events from go-couchbase UPR client.
Tool receives raw events from go-couchbase UPR client.
tools
Package couchbaseutil offers some convenience functions for apps that use couchbase.
Package couchbaseutil offers some convenience functions for apps that use couchbase.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL