couchbase

package
v0.0.0-...-8836d2b Latest
Warning

This package is not in the latest version of its module.

Published: Sep 27, 2018 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package couchbase provides a smart client for Go.

Usage:

client, err := couchbase.Connect("http://myserver:8091/")
handleError(err)
pool, err := client.GetPool("default")
handleError(err)
bucket, err := pool.GetBucket("MyAwesomeBucket")
handleError(err)
...

Or use a shortcut to get the bucket directly:

bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")

In either case, you can specify authentication credentials using standard URL userinfo syntax:

b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
        "default", "bucket")

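The userinfo part of such a URL can be inspected with the standard net/url package. The sketch below is not taken from this package; it only illustrates how a bucket name and password embedded in the endpoint URL separate from the host. The helper name splitCredentials is hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitCredentials is a hypothetical helper: it parses an endpoint URL and
// returns the user name, password, and host:port it contains.
func splitCredentials(endpoint string) (user, pass, host string, err error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", "", err
	}
	if u.User != nil {
		user = u.User.Username()
		// The second result reports whether a password was present.
		pass, _ = u.User.Password()
	}
	return user, pass, u.Host, nil
}

func main() {
	user, pass, host, err := splitCredentials("http://bucketname:bucketpass@myserver:8091/")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(user, pass, host)
}
```
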

Constants

const (
	// Raw specifies that the value is raw []byte or nil; don't
	// JSON-encode it.
	Raw = WriteOptions(1 << iota)
	// AddOnly indicates an item should only be written if it
	// doesn't exist, otherwise ErrKeyExists is returned.
	AddOnly
	// Persist causes the operation to block until the server
	// confirms the item is persisted.
	Persist
	// Indexable causes the operation to block until it's available via the index.
	Indexable
	// Append indicates the given value should be appended to the
	// existing value for the given key.
	Append
)
const DCP_ADD_STREAM_ACTIVE_VB_ONLY = uint32(0x10) // 16
const DEFAULT_WINDOW_SIZE = uint32(20 * 1024 * 1024) // 20 MiB
const UpdateCancel = memcached.CASQuit

Return this as the error from an UpdateFunc to cancel the Update operation.

Variables

var ClientOpCallback func(opname, k string, start time.Time, err error)

ClientOpCallback is called for each invocation of Do.

var ConnPoolAvailWaitTime = time.Millisecond

ConnPoolAvailWaitTime is the amount of time to wait for an existing connection from the pool before considering the creation of a new one.
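
The wait-then-create behaviour described here can be sketched with channels. This is an illustrative model only, not the package's code: the pool and conn types, the get method, and errPoolTimeout are all hypothetical, and the two duration parameters stand in for ConnPoolAvailWaitTime and ConnPoolTimeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// conn is a placeholder connection; fresh marks connections created on demand.
type conn struct{ fresh bool }

// pool is hypothetical: idle holds reusable connections, and overflow is a
// semaphore limiting how many extra connections may be created.
type pool struct {
	idle     chan *conn
	overflow chan struct{}
}

var errPoolTimeout = errors.New("timed out waiting for a connection")

func (p *pool) get(availWait, timeout time.Duration) (*conn, error) {
	// Fast path: an idle connection is ready right now.
	select {
	case c := <-p.idle:
		return c, nil
	default:
	}
	// Wait briefly for an existing connection before considering a new one.
	select {
	case c := <-p.idle:
		return c, nil
	case <-time.After(availWait):
	}
	// Take whichever comes first: a returned connection, a free overflow
	// slot (create a new connection), or the overall timeout.
	select {
	case c := <-p.idle:
		return c, nil
	case p.overflow <- struct{}{}:
		return &conn{fresh: true}, nil
	case <-time.After(timeout):
		return nil, errPoolTimeout
	}
}

func main() {
	p := &pool{idle: make(chan *conn, 1), overflow: make(chan struct{}, 1)}
	p.idle <- &conn{}
	c1, _ := p.get(time.Millisecond, 50*time.Millisecond)  // reuses the idle connection
	c2, _ := p.get(time.Millisecond, 50*time.Millisecond)  // creates an overflow connection
	_, err := p.get(time.Millisecond, 50*time.Millisecond) // nothing left, so it times out
	fmt.Println(c1.fresh, c2.fresh, err)
}
```

A short wait before creating a connection favours reuse when the pool is only momentarily empty, at the cost of a small added latency on a cold pool.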

var ConnPoolCallback func(host string, source string, start time.Time, err error)

ConnPoolCallback is notified whenever connections are acquired from a pool.

var ConnPoolTimeout = time.Hour * 24 * 30

Default timeout for retrieving a connection from the pool.

var ErrKeyExists = errors.New("key exists")

Error returned from Write with AddOnly flag, when key already exists in the bucket.

var ErrOverwritten = errors.New("overwritten")

Returned from WaitForPersistence (or Write, if the Persist or Indexable flag is used) if the value has been overwritten by another write before being persisted.

var ErrTimeout = errors.New("timeout")

Returned from WaitForPersistence (or Write, if the Persist or Indexable flag is used) if the value hasn't been persisted within the timeout interval.

var ErrorClosed = errors.New("dcp.closed")

ErrorClosed

var ErrorFailoverLog = errors.New("dcp.failoverLog")

ErrorFailoverLog

var ErrorInconsistentDcpStats = errors.New("dcp.insconsistentDcpStats")

ErrorInconsistentDcpStats

var ErrorInvalidBucket = errors.New("dcp.invalidBucket")

ErrorInvalidBucket

var ErrorInvalidVbucket = errors.New("dcp.invalidVbucket")

ErrorInvalidVbucket

var ErrorTimeoutDcpStats = errors.New("dcp.timeoutDcpStats")

ErrorTimeoutDcpStats

var HTTPClient = &http.Client{Transport: HTTPTransport}
var MaxBulkRetries = 1000

Maximum number of times to retry a chunk of a bulk get on error.

var MaxIdleConnsPerHost = 256

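MaxIdleConnsPerHost is the maximum number of idle connections per host kept by the transport behind HTTPClient, which is used for REST and view operations.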

var PoolOverflow = PoolSize

PoolOverflow is the number of overflow connections allowed in a pool.

var PoolSize = 64

PoolSize is the size of each connection pool (per host).

var SlowServerCallWarningThreshold time.Duration

If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call takes longer than that.

Functions

func CleanupHost

func CleanupHost(h, commonSuffix string) string

CleanupHost returns the hostname with the given suffix removed.

func FindCommonSuffix

func FindCommonSuffix(input []string) string

FindCommonSuffix returns the longest common suffix from the given strings.
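
As an illustration of what "longest common suffix" means for node addresses, here is a minimal standalone sketch. It is not the package's implementation; commonSuffix is a hypothetical name, the host names are invented, and the comparison is byte-based, which is adequate for ASCII host:port strings.

```go
package main

import (
	"fmt"
	"strings"
)

// commonSuffix returns the longest suffix shared by every string in input.
// It returns "" for an empty input.
func commonSuffix(input []string) string {
	if len(input) == 0 {
		return ""
	}
	suffix := input[0]
	for _, s := range input[1:] {
		// Shorten the candidate from the left until s ends with it.
		// The loop stops at "" at the latest, which every string ends with.
		for !strings.HasSuffix(s, suffix) {
			suffix = suffix[1:]
		}
	}
	return suffix
}

func main() {
	hosts := []string{"node1.example.com:11210", "node2.example.com:11210"}
	suffix := commonSuffix(hosts)
	fmt.Println(suffix)
	// Removing the shared suffix leaves a short, distinguishing name.
	fmt.Println(strings.TrimSuffix(hosts[0], suffix))
}
```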

func GetSeqs

func GetSeqs(mc *memcached.Client, seqnos []uint64, buf []byte) error

func ParseURL

func ParseURL(urlStr string) (result *url.URL, err error)

ParseURL is a wrapper around url.Parse with some sanity-checking

Types

type AuthHandler

type AuthHandler interface {
	GetCredentials() (string, string)
}

AuthHandler is a callback that gets the auth username and password for the given bucket.

type Bucket

type Bucket struct {
	AuthType            string                 `json:"authType"`
	Capabilities        []string               `json:"bucketCapabilities"`
	CapabilitiesVersion string                 `json:"bucketCapabilitiesVer"`
	Type                string                 `json:"bucketType"`
	Name                string                 `json:"name"`
	NodeLocator         string                 `json:"nodeLocator"`
	Quota               map[string]float64     `json:"quota,omitempty"`
	Replicas            int                    `json:"replicaNumber"`
	Password            string                 `json:"saslPassword"`
	URI                 string                 `json:"uri"`
	StreamingURI        string                 `json:"streamingUri"`
	LocalRandomKeyURI   string                 `json:"localRandomKeyUri,omitempty"`
	UUID                string                 `json:"uuid"`
	BasicStats          map[string]interface{} `json:"basicStats,omitempty"`
	Controllers         map[string]interface{} `json:"controllers,omitempty"`

	// These are used for JSON IO, but aren't used for processing
	// since they need to be swapped out safely.
	VBSMJson  VBucketServerMap `json:"vBucketServerMap"`
	NodesJSON []Node           `json:"nodes"`
	// contains filtered or unexported fields
}

Bucket is the primary entry point for most data operations.

func GetBucket

func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error)

GetBucket is a convenience function for getting a named bucket from a URL

func (*Bucket) Add

func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. The value will be serialized into a JSON document.

func (*Bucket) AddRaw

func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error)

AddRaw adds a value to this bucket; like SetRaw except that nothing happens if the key exists. The value will be stored as raw bytes.

func (*Bucket) Append

func (b *Bucket) Append(k string, data []byte) error

Append appends raw data to an existing item.

func (*Bucket) Cas

func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) error

Cas sets a value in this bucket using CAS. The value will be serialized into a JSON document.

func (*Bucket) CasRaw

func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) error

CasRaw sets a value in this bucket using CAS, without JSON-encoding it.

func (*Bucket) Close

func (b *Bucket) Close()

Close marks this bucket as no longer needed, closing connections it may have open.

func (Bucket) CommonAddressSuffix

func (b Bucket) CommonAddressSuffix() string

CommonAddressSuffix finds the longest common suffix of all host:port strings in the node list.

func (*Bucket) Delete

func (b *Bucket) Delete(k string) error

Delete a key from this bucket.

func (*Bucket) Do

func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error)

Do executes a function on a memcached connection to the node owning key "k"

Note that this automatically handles transient errors by replaying your function on a "not-my-vbucket" error, so don't assume your command will be executed only once.

func (*Bucket) Get

func (b *Bucket) Get(k string, rv interface{}) error

Get a value from this bucket. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetBulk

func (b *Bucket) GetBulk(keys []string) (map[string]*transport.MCResponse, error)

GetBulk fetches multiple keys concurrently.

Unlike more convenient GETs, the entire response is returned in the map for each key. Keys that were not found will not be included in the map.

func (*Bucket) GetDcpConn

func (b *Bucket) GetDcpConn(name DcpFeedName, host string) (*memcached.Client, error)

func (*Bucket) GetFailoverLogs

func (b *Bucket) GetFailoverLogs(
	opaque uint16,
	vBuckets []uint16, config map[string]interface{}) (FailoverLog, error)

GetFailoverLogs gets the failover logs for a set of vbucket ids.

func (*Bucket) GetPool

func (b *Bucket) GetPool() *Pool

GetPool gets the pool to which this bucket belongs.

func (*Bucket) GetRaw

func (b *Bucket) GetRaw(k string) ([]byte, error)

GetRaw gets a raw value from this bucket. No marshaling is performed.

func (*Bucket) GetStats

func (b *Bucket) GetStats(which string) (map[string]map[string]string, error)

GetStats gets a set of stats from all servers.

Returns a map of server ID -> map of stat key to stat value.

func (*Bucket) GetVBmap

func (b *Bucket) GetVBmap(addrs []string) (map[string][]uint16, error)

func (*Bucket) Gets

func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error

Gets gets a value from this bucket, including its CAS counter. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetsRaw

func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
	cas uint64, err error)

GetsRaw gets a raw value from this bucket including its CAS counter and flags.

func (*Bucket) Incr

func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error)

Incr increments the value at a given key.

func (Bucket) NodeAddresses

func (b Bucket) NodeAddresses() []string

NodeAddresses gets the (sorted) list of memcached node addresses (hostname:port).

func (Bucket) Nodes

func (b Bucket) Nodes() []Node

Nodes returns the current list of nodes servicing this bucket.

func (*Bucket) Observe

func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error)

Observe observes the current state of a document.

func (*Bucket) Refresh

func (b *Bucket) Refresh() error

func (*Bucket) Set

func (b *Bucket) Set(k string, exp int, v interface{}) error

Set a value in this bucket. The value will be serialized into a JSON document.

func (*Bucket) SetRaw

func (b *Bucket) SetRaw(k string, exp int, v []byte) error

SetRaw sets a value in this bucket without JSON encoding it.

func (*Bucket) StartDcpFeed

func (b *Bucket) StartDcpFeed(
	name DcpFeedName, sequence, flags uint32,
	opaque uint16,
	config map[string]interface{}) (*DcpFeed, error)

StartDcpFeed creates and starts a new Dcp feed. No data will be sent on the channel unless vbucket streams are requested.

func (*Bucket) StartDcpFeedOver

func (b *Bucket) StartDcpFeedOver(
	name DcpFeedName,
	sequence, flags uint32,
	kvaddrs []string,
	opaque uint16,
	config map[string]interface{}) (*DcpFeed, error)

StartDcpFeedOver creates and starts a new Dcp feed. No data will be sent on the channel unless vbucket streams are requested. Connections will be made only to the kvnodes specified in `kvaddrs`; to connect with all kvnodes hosting the bucket, pass `kvaddrs` as nil.

Configuration parameters:

"genChanSize", buffer channel size for control path.
"dataChanSize", buffer channel size for data path.
"numConnections", number of connections with DCP for local vbuckets.

func (*Bucket) StartTapFeed

func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error)

StartTapFeed creates and starts a new Tap feed

func (*Bucket) Update

func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error

Update performs a safe update of a document, avoiding conflicts by using CAS.

The callback function will be invoked with the current raw document contents (or nil if the document doesn't exist); it should return the updated raw contents (or nil to delete.) If it decides not to change anything it can return UpdateCancel as the error.

If another writer modifies the document between the get and the set, the callback will be invoked again with the newer value.
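
The retry behaviour can be modelled with a small in-memory store. The sketch below is only an illustration of a CAS loop under stated assumptions, not this package's implementation: the store and entry types, compareAndSet, and errCancel (standing in for UpdateCancel) are hypothetical, and the sketch treats a cancelled update as success.

```go
package main

import (
	"errors"
	"fmt"
)

// UpdateFunc has the same shape as the callback type documented on this page.
type UpdateFunc func(current []byte) (updated []byte, err error)

// errCancel stands in for UpdateCancel in this sketch.
var errCancel = errors.New("update cancelled")

// entry and store are hypothetical: a single-process stand-in for a bucket.
type entry struct {
	value []byte
	cas   uint64
}

type store map[string]entry

// compareAndSet writes value only if the stored CAS still equals cas.
// A nil value deletes the key.
func (s store) compareAndSet(k string, cas uint64, value []byte) bool {
	if s[k].cas != cas {
		return false
	}
	if value == nil {
		delete(s, k)
		return true
	}
	s[k] = entry{value: value, cas: cas + 1}
	return true
}

// update calls the callback until its result is stored without a conflict.
func (s store) update(k string, callback UpdateFunc) error {
	for {
		cur := s[k] // zero entry (nil value, CAS 0) if the key is absent
		updated, err := callback(cur.value)
		if errors.Is(err, errCancel) {
			return nil // the callback chose to leave the document unchanged
		}
		if err != nil {
			return err
		}
		if s.compareAndSet(k, cur.cas, updated) {
			return nil
		}
		// Another writer got in first: loop and call back with the newer value.
	}
}

func main() {
	s := store{}
	appendX := func(current []byte) ([]byte, error) {
		return append(append([]byte{}, current...), 'x'), nil
	}
	_ = s.update("k", appendX)
	_ = s.update("k", appendX)
	fmt.Printf("%s %d\n", s["k"].value, s["k"].cas)
}
```

Because the callback may run more than once, it should be free of side effects other than computing the new value.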

func (*Bucket) VBHash

func (b *Bucket) VBHash(key string) uint32

VBHash finds the vbucket for the given key.

func (*Bucket) VBServerMap

func (b *Bucket) VBServerMap() *VBucketServerMap

VBServerMap returns the current VBucketServerMap.

func (*Bucket) WaitForPersistence

func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error

WaitForPersistence waits for an item to be considered durable.

Besides transport errors, ErrOverwritten may be returned if the item is overwritten before it reaches durability. ErrTimeout may occur if the item isn't found durable in a reasonable amount of time.

func (*Bucket) Write

func (b *Bucket) Write(k string, flags, exp int, v interface{},
	opt WriteOptions) (err error)

General-purpose value setter.

The Set, Add and Delete methods are just wrappers around this. The interpretation of `v` depends on whether the `Raw` option is given. If it is, v must be a byte array or nil. (A nil value causes a delete.) If `Raw` is not given, `v` will be marshaled as JSON before being written. It must be JSON-marshalable and it must not be nil.

func (*Bucket) WriteCas

func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
	opt WriteOptions) (err error)

func (*Bucket) WriteUpdate

func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error

WriteUpdate performs a safe update of a document, avoiding conflicts by using CAS. WriteUpdate is like Update, except that the callback can return a set of WriteOptions, of which Persist and Indexable are recognized: these cause the call to wait until the document update has been persisted to disk and/or become available to index.

type BucketInfo

type BucketInfo struct {
	Name     string // name of bucket
	Password string // SASL password of bucket
}

BucketInfo holds the name and SASL password of a bucket.

func GetBucketList

func GetBucketList(baseU string) (bInfo []BucketInfo, err error)

type Client

type Client struct {
	BaseURL *url.URL

	Info Pools
	// contains filtered or unexported fields
}

A Client is the starting point for all services across all buckets in a Couchbase cluster.

func Connect

func Connect(baseU string) (Client, error)

Connect to a couchbase cluster. An authentication handler will be created from the userinfo in the URL if provided.

func ConnectWithAuth

func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error)

ConnectWithAuth connects to a couchbase cluster with the given authentication handler.

func (*Client) GetPool

func (c *Client) GetPool(name string) (p Pool, err error)

GetPool gets a pool from within the couchbase cluster (usually "default").

func (*Client) GetPoolServices

func (c *Client) GetPoolServices(name string) (ps PoolServices, err error)

GetPoolServices returns all the bucket-independent services in a pool. (See "Exposing services outside of bucket context" in http://goo.gl/uuXRkV)

func (*Client) RunObserveNodeServices

func (c *Client) RunObserveNodeServices(pool string, callb func(interface{}) error, cancel chan bool) error

NodeServices streaming API based observe-callback wrapper

func (*Client) RunObservePool

func (c *Client) RunObservePool(pool string, callb func(interface{}) error, cancel chan bool) error

Pool streaming API based observe-callback wrapper

type DcpFeed

type DcpFeed struct {
	C <-chan *memcached.DcpEvent
	// contains filtered or unexported fields
}

DcpFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*DcpFeed) Close

func (feed *DcpFeed) Close() error

Close DcpFeed. Synchronous call.

func (*DcpFeed) DcpCloseStream

func (feed *DcpFeed) DcpCloseStream(vb, opaqueMSB uint16) error

DcpCloseStream closes a stream for a vb on a feed and returns immediately; it is up to the channel listener to detect StreamEnd.

func (*DcpFeed) DcpGetSeqnos

func (feed *DcpFeed) DcpGetSeqnos() (map[uint16]uint64, error)

DcpGetSeqnos returns the list of seqnos for vbuckets. Synchronous call.

func (*DcpFeed) DcpRequestStream

func (feed *DcpFeed) DcpRequestStream(
	vb uint16, opaque uint16, flags uint32,
	vbuuid, startSequence, endSequence, snapStart, snapEnd uint64) error

DcpRequestStream starts a stream for a vb on a feed and returns immediately; it is up to the channel listener to detect StreamBegin. Synchronous call.

func (*DcpFeed) GetName

func (feed *DcpFeed) GetName() DcpFeedName

GetName returns the feed name.

type DcpFeedName

type DcpFeedName struct {
	// contains filtered or unexported fields
}

func NewDcpFeedName

func NewDcpFeedName(name string) DcpFeedName

Make a valid DCP feed name. These always begin with eventing:

func (*DcpFeedName) Raw

func (f *DcpFeedName) Raw() string

type FailoverLog

type FailoverLog map[uint16]memcached.FailoverLog

FailoverLog for list of vbuckets.

type FeedInfo

type FeedInfo struct {
	// contains filtered or unexported fields
}

FeedInfo is dcp-feed from a single connection.

type GenericMcdAuthHandler

type GenericMcdAuthHandler interface {
	AuthHandler
	AuthenticateMemcachedConn(string, *memcached.Client) error
}

GenericMcdAuthHandler is a kind of AuthHandler that performs special auth exchange (like non-standard auth, possibly followed by select-bucket).

type Node

type Node struct {
	ClusterCompatibility int                `json:"clusterCompatibility"`
	ClusterMembership    string             `json:"clusterMembership"`
	CouchAPIBase         string             `json:"couchApiBase"`
	Hostname             string             `json:"hostname"`
	InterestingStats     map[string]float64 `json:"interestingStats,omitempty"`
	MCDMemoryAllocated   float64            `json:"mcdMemoryAllocated"`
	MCDMemoryReserved    float64            `json:"mcdMemoryReserved"`
	MemoryFree           float64            `json:"memoryFree"`
	MemoryTotal          float64            `json:"memoryTotal"`
	OS                   string             `json:"os"`
	Ports                map[string]int     `json:"ports"`
	Status               string             `json:"status"`
	Uptime               int                `json:"uptime,string"`
	Version              string             `json:"version"`
	ThisNode             bool               `json:"thisNode,omitempty"`
	Services             []string           `json:"services,omitempty"`
}

A Node is a computer in a cluster running the couchbase software.

type NodeServices

type NodeServices struct {
	Services map[string]int `json:"services,omitempty"`
	Hostname string         `json:"hostname"`
	ThisNode bool           `json:"thisNode"`
}

NodeServices is all the bucket-independent services running on a node (given by Hostname)

type Pool

type Pool struct {
	BucketMap map[string]Bucket
	Nodes     []Node

	BucketURL       map[string]string `json:"buckets"`
	ServerGroupsUri string            `json:"serverGroupsUri"`
	// contains filtered or unexported fields
}

A Pool of nodes and buckets.

func (*Pool) GetBucket

func (p *Pool) GetBucket(name string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetClient

func (p *Pool) GetClient() *Client

GetClient gets the client from which we got this pool.

func (*Pool) GetServerGroups

func (p *Pool) GetServerGroups() (groups ServerGroups, err error)

type PoolServices

type PoolServices struct {
	Rev      int            `json:"rev"`
	NodesExt []NodeServices `json:"nodesExt"`
}

PoolServices is all the bucket-independent services in a pool

type Pools

type Pools struct {
	ComponentsVersion     map[string]string `json:"componentsVersion,omitempty"`
	ImplementationVersion string            `json:"implementationVersion"`
	IsAdmin               bool              `json:"isAdminCreds"`
	UUID                  string            `json:"uuid"`
	Pools                 []RestPool        `json:"pools"`
}

Pools represents the collection of pools as returned from the REST API.

type RestPool

type RestPool struct {
	Name         string `json:"name"`
	StreamingURI string `json:"streamingUri"`
	URI          string `json:"uri"`
}

RestPool represents a single pool returned from the pools REST API.

type ServerGroup

type ServerGroup struct {
	Name  string `json:"name"`
	Nodes []Node `json:"nodes"`
}

type ServerGroups

type ServerGroups struct {
	Groups []ServerGroup `json:"groups"`
}

type TapFeed

type TapFeed struct {
	C <-chan memcached.TapEvent
	// contains filtered or unexported fields
}

A TapFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*TapFeed) Close

func (feed *TapFeed) Close() error

Close a Tap feed.

type UpdateFunc

type UpdateFunc func(current []byte) (updated []byte, err error)

An UpdateFunc is a callback function to update a document

type VBucketServerMap

type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

VBucketServerMap is a mapping of vbuckets to nodes.
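
To make the mapping concrete, the sketch below decodes a made-up server map and looks up the node for each vbucket. The struct is copied from the definition above; masterFor is a hypothetical helper, and it assumes that the first entry of each VBucketMap row is the ServerList index of the active node and that a negative index means no node is assigned.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// VBucketServerMap is redeclared from the type documented above so the
// example compiles on its own.
type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

// masterFor is a hypothetical helper. It returns the ServerList entry named
// by the first index in the vbucket's row, or false if there is none.
func masterFor(m *VBucketServerMap, vb int) (string, bool) {
	if vb < 0 || vb >= len(m.VBucketMap) || len(m.VBucketMap[vb]) == 0 {
		return "", false
	}
	idx := m.VBucketMap[vb][0]
	if idx < 0 || idx >= len(m.ServerList) {
		return "", false
	}
	return m.ServerList[idx], true
}

func main() {
	// Invented payload with two nodes and four vbuckets.
	raw := `{"hashAlgorithm":"CRC","numReplicas":1,
	  "serverList":["a:11210","b:11210"],
	  "vBucketMap":[[0,1],[1,0],[0,1],[-1,0]]}`
	var m VBucketServerMap
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for vb := range m.VBucketMap {
		host, ok := masterFor(&m, vb)
		fmt.Println(vb, host, ok)
	}
}
```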

type WriteOptions

type WriteOptions int

WriteOptions is the set of option flags available for the Write method. They are ORed together to specify the desired request.
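
A short sketch of combining and testing these flags follows. The type and constants are redeclared from the Constants section so the example compiles without the package; the has helper is hypothetical.

```go
package main

import "fmt"

// WriteOptions and its constants mirror the declarations in the Constants
// section; they are repeated here only to keep the example standalone.
type WriteOptions int

const (
	Raw = WriteOptions(1 << iota)
	AddOnly
	Persist
	Indexable
	Append
)

// has reports whether every flag in want is set in opts.
func has(opts, want WriteOptions) bool {
	return opts&want == want
}

func main() {
	// Each constant occupies its own bit, so flags combine with bitwise OR.
	opts := Raw | Persist
	fmt.Println(has(opts, Raw), has(opts, Persist), has(opts, AddOnly))
}
```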

func (WriteOptions) String

func (w WriteOptions) String() string

String representation of WriteOptions

type WriteUpdateFunc

type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)

A WriteUpdateFunc is a callback function to update a document

Directories

Path Synopsis
Package transport is binary protocol packet formats and constants.
client
Package memcached provides a memcached binary protocol client.
