Documentation ¶
Overview ¶
Package memcached provides a memcached binary protocol client.
Go implementation of a UPR client. See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md TODO: 1. Use a pool allocator to avoid garbage.
Index ¶
- Constants
- Variables
- func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte)
- func IfResStatusError(response *gomemcached.MCResponse) bool
- func SetDefaultDialTimeout(dial time.Duration)
- func SetDefaultTimeouts(dial, read, write time.Duration)
- func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
- type CASState
- type CasFunc
- type CasOp
- type Client
- func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
- func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthList() (*gomemcached.MCResponse, error)
- func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
- func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool
- func (c *Client) Close() error
- func (c *Client) Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
- func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error)
- func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
- func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
- func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, ...) error
- func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error)
- func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
- func (c *Client) Hijack() io.ReadWriteCloser
- func (c *Client) Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
- func (c Client) IsHealthy() bool
- func (mc *Client) NewUprFeed() (*UprFeed, error)
- func (mc *Client) NewUprFeedIface() (UprFeedIface, error)
- func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
- func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
- func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
- func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
- func (c *Client) Receive() (*gomemcached.MCResponse, error)
- func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
- func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
- func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
- func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) SetDeadline(t time.Time)
- func (c *Client) SetKeepAliveOptions(interval time.Duration)
- func (c *Client) SetReadDeadline(t time.Time)
- func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
- func (c *Client) Stats(key string) ([]StatValue, error)
- func (c *Client) StatsMap(key string) (map[string]string, error)
- func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
- func (c *Client) Transmit(req *gomemcached.MCRequest) error
- func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error
- func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
- func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
- type ClientIface
- type FailoverLog
- type Feature
- type Features
- type ObserveResult
- type ObserveSeqResult
- type ObservedStatus
- type StatValue
- type TapArguments
- type TapEvent
- type TapFeed
- type TapOpcode
- type UprEvent
- type UprFeatures
- type UprFeed
- func (feed *UprFeed) ClientAck(event *UprEvent) error
- func (feed *UprFeed) Close()
- func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error
- func (feed *UprFeed) Closed() bool
- func (feed *UprFeed) GetError() error
- func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent
- func (feed *UprFeed) GetUprStats() *UprStats
- func (feed *UprFeed) StartFeed() error
- func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error
- func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
- func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, ...) error
- type UprFeedIface
- type UprStats
- type UprStream
Constants ¶
const ( ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted ObservedPersisted = ObservedStatus(0x01) // found, persisted ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) )
Observation status values.
const ( // CASStore instructs the server to store the new value normally CASStore = CasOp(iota) // CASQuit instructs the client to stop attempting to CAS, leaving value untouched CASQuit // CASDelete instructs the server to delete the current value CASDelete )
const ( TapBeginBackfill = TapOpcode(iota) TapEndBackfill TapMutation TapDeletion TapCheckpointStart TapCheckpointEnd )
Tap opcode values.
const ( CompressionTypeStartMarker = iota // also means invalid CompressionTypeNone = iota CompressionTypeSnappy = iota CompressionTypeEndMarker = iota // also means invalid )
const ( JSONDataType uint8 = 1 SnappyDataType uint8 = 2 XattrDataType uint8 = 4 )
kv_engine/include/mcbp/protocol/datatype.h
const FeatureDataType = Feature(0x0b)
const FeatureMutationToken = Feature(0x04)
const FeatureXattr = Feature(0x06)
const TapNoBackfill = math.MaxUint64
Value for TapArguments.Backfill denoting that no past events at all should be sent.
Variables ¶
var ( DefaultDialTimeout = time.Duration(0) // No timeout DefaultWriteTimeout = time.Duration(0) // No timeout )
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
error codes
var Healthy uint32 = 1
var ReceiveHook func(*gomemcached.MCResponse, int, error)
ReceiveHook is called after every packet is received (or attempted to be)
var TapRecvHook func(*gomemcached.MCRequest, int, error)
TapRecvHook is called after every incoming tap packet is received.
var TransmitHook func(*gomemcached.MCRequest, int, error)
TransmitHook is called after each packet is transmitted.
var UnHealthy uint32 = 0
Functions ¶
func GetSubDocVal ¶
func IfResStatusError ¶
func IfResStatusError(response *gomemcached.MCResponse) bool
func SetDefaultDialTimeout ¶
func SetDefaultTimeouts ¶
func UnwrapMemcachedError ¶
func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
UnwrapMemcachedError converts memcached errors to normal responses.
If the error is a memcached response, declare the error to be nil so a client can handle the status without worrying about whether it indicates success or failure.
Types ¶
type CASState ¶
type CASState struct { Value []byte // Current value of key; update in place to new value Cas uint64 // Current CAS value of key Exists bool // Does a value exist for the key? (If not, Value will be nil) Err error // Error, if any, after CASNext returns false // contains filtered or unexported fields }
CASState tracks the state of CAS over several operations.
This is used directly by CASNext and indirectly by CAS
type CasFunc ¶
CasFunc is the type of function used to perform a CAS transform.
Input is the current value, or nil if no value exists. The function should return the new value (if any) to set, and the store/quit/delete operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The Client itself.
func (*Client) Add ¶
func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Add a value for a key (store if not exists).
func (*Client) Append ¶
func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
Append data to the value of a key.
func (*Client) Auth ¶
func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
Auth performs SASL PLAIN authentication against the server.
func (*Client) AuthList ¶
func (c *Client) AuthList() (*gomemcached.MCResponse, error)
AuthList lists SASL auth mechanisms.
func (*Client) AuthPlain ¶
func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthScramSha ¶
func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
AuthScramSha performs SCRAM-SHA authentication against the server.
func (*Client) CAS ¶
func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
CAS performs a CAS transform with the given function.
If the value does not exist, a nil current value will be sent to f.
func (*Client) CASNext ¶
CASNext is a non-callback, loop-based version of the CAS method.
Usage is like this:
var state memcached.CASState
for client.CASNext(vb, key, exp, &state) { state.Value = some_mutation(state.Value) }
if state.Err != nil { ... }
func (*Client) Del ¶
func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error)
Del deletes a key.
func (*Client) EnableFeatures ¶
func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error)
Send a hello command to enable specific features
func (*Client) EnableMutationToken ¶
func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
Send a hello command to enable MutationTokens
func (*Client) Get ¶
func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error)
Get the value for a key.
func (*Client) GetAndTouch ¶
func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
Get the value for a key, and update expiry
func (*Client) GetBulk ¶
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error
GetBulk gets keys in bulk
func (*Client) GetMeta ¶
func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
Get metadata for a key
func (*Client) GetRandomDoc ¶
func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error)
Get a random document
func (*Client) GetSubdoc ¶
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error)
Get the xattrs and doc value for the input key.
func (*Client) Hijack ¶
func (c *Client) Hijack() io.ReadWriteCloser
Hijack exposes the underlying connection from this client.
It also marks the connection as unhealthy since the client will have lost control over the connection and can't otherwise verify things are in good shape for connection pools.
func (Client) IsHealthy ¶
IsHealthy returns true unless the client is believed to have difficulty communicating with its server.
This is useful for connection pools where we want to non-destructively determine that a connection may be reused.
func (*Client) NewUprFeed ¶
NewUprFeed creates a new UPR Feed. TODO: Describe side-effects on bucket instance and its connection pool.
func (*Client) NewUprFeedIface ¶
func (mc *Client) NewUprFeedIface() (UprFeedIface, error)
func (*Client) NewUprFeedWithConfig ¶
func (*Client) NewUprFeedWithConfigIface ¶
func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
func (*Client) Observe ¶
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
Observe gets the persistence/replication/CAS state of a key
func (*Client) ObserveSeq ¶
func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
func (*Client) Receive ¶
func (c *Client) Receive() (*gomemcached.MCResponse, error)
Receive a response
func (*Client) ReceiveWithDeadline ¶
func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
func (*Client) SelectBucket ¶
func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
select bucket
func (*Client) Send ¶
func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
Send a custom request and get the response.
func (*Client) Set ¶
func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Set the value for a key.
func (*Client) SetCas ¶
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
SetCas sets the value for a key with the given CAS.
func (*Client) SetDeadline ¶
func (*Client) SetKeepAliveOptions ¶
func (*Client) SetReadDeadline ¶
func (*Client) StartTapFeed ¶
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
StartTapFeed starts a TAP feed on a client connection.
The events can be read from the returned channel. The connection can no longer be used for other purposes; it's now reserved for receiving the TAP messages. To stop receiving events, close the client connection.
func (*Client) StatsMap ¶
StatsMap requests server-side stats similarly to Stats, but returns them as a map.
Use "" as the stat key for toplevel stats.
func (*Client) StatsMapForSpecifiedStats ¶
Instead of returning a new statsMap, StatsMapForSpecifiedStats populates the passed-in statsMap, which contains all the keys for which stats need to be retrieved.
func (*Client) Transmit ¶
func (c *Client) Transmit(req *gomemcached.MCRequest) error
Transmit sends a request, but does not wait for a response.
func (*Client) TransmitResponse ¶
func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error
TransmitResponse sends a response, but does not wait.
func (*Client) TransmitWithDeadline ¶
func (*Client) UprGetFailoverLog ¶
func (mc *Client) UprGetFailoverLog( vb []uint16) (map[uint16]*FailoverLog, error)
UprGetFailoverLog for given list of vbuckets.
type ClientIface ¶
type ClientIface interface { Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error) Auth(user, pass string) (*gomemcached.MCResponse, error) AuthList() (*gomemcached.MCResponse, error) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) CASNext(vb uint16, k string, exp int, state *CASState) bool CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) Close() error Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) Del(vb uint16, key string) (*gomemcached.MCResponse, error) EnableMutationToken() (*gomemcached.MCResponse, error) Get(vb uint16, key string) (*gomemcached.MCResponse, error) GetSubdoc(vb uint16, key string, subPaths []string) (*gomemcached.MCResponse, error) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string) error GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error) GetRandomDoc() (*gomemcached.MCResponse, error) Hijack() io.ReadWriteCloser Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error) Observe(vb uint16, key string) (result ObserveResult, err error) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) Receive() (*gomemcached.MCResponse, error) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error) SetKeepAliveOptions(interval time.Duration) SetReadDeadline(t time.Time) SetDeadline(t time.Time) SelectBucket(bucket string) (*gomemcached.MCResponse, error) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error) Stats(key 
string) ([]StatValue, error) StatsMap(key string) (map[string]string, error) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error Transmit(req *gomemcached.MCRequest) error TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error TransmitResponse(res *gomemcached.MCResponse) error // UprFeed Related NewUprFeed() (*UprFeed, error) NewUprFeedIface() (UprFeedIface, error) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) }
type FailoverLog ¶
type FailoverLog [][2]uint64
FailoverLog contains pairs of vbuuid and sequence number.
func (*FailoverLog) Latest ¶
func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error)
type ObserveResult ¶
type ObserveResult struct { Status ObservedStatus // Whether the value has been persisted/deleted Cas uint64 // Current value's CAS PersistenceTime time.Duration // Node's average time to persist a value ReplicationTime time.Duration // Node's average time to replicate a value }
ObserveResult represents the data obtained by an Observe call
func (ObserveResult) CheckPersistence ¶
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool)
CheckPersistence checks whether a stored value has been persisted to disk yet.
type ObserveSeqResult ¶
type ObserveSeqResult struct { Failover uint8 // Set to 1 if a failover took place VbId uint16 // vbucket id Vbuuid uint64 // vbucket uuid LastPersistedSeqNo uint64 // last persisted sequence number CurrentSeqNo uint64 // current sequence number OldVbuuid uint64 // Old bucket vbuuid LastSeqNo uint64 // last sequence number received before failover }
Sequence number based Observe Implementation
type ObservedStatus ¶
type ObservedStatus uint8
ObservedStatus is the type reported by the Observe method
type TapArguments ¶
type TapArguments struct { // Timestamp of oldest item to send. // // Use TapNoBackfill to suppress all past items. Backfill uint64 // If set, server will disconnect after sending existing items. Dump bool // The indices of the vbuckets to watch; empty/nil to watch all. VBuckets []uint16 // Transfers ownership of vbuckets during cluster rebalance. Takeover bool // If true, server will wait for client ACK after every notification. SupportAck bool // If true, client doesn't want values so server shouldn't send them. KeysOnly bool // If true, client wants the server to send checkpoint events. Checkpoint bool // Optional identifier to use for this client, to allow reconnects ClientName string // Registers this client (by name) till explicitly deregistered. RegisteredClient bool }
TapArguments are parameters for requesting a TAP feed.
Call DefaultTapArguments to get a default one.
func DefaultTapArguments ¶
func DefaultTapArguments() TapArguments
DefaultTapArguments returns a default set of parameter values to pass to StartTapFeed.
type TapEvent ¶
type TapEvent struct { Opcode TapOpcode // Type of event VBucket uint16 // VBucket this event applies to Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value Cas uint64 }
TapEvent is a TAP notification of an operation on the server.
type UprEvent ¶
type UprEvent struct { Opcode gomemcached.CommandCode // Type of event Status gomemcached.Status // Response status VBucket uint16 // VBucket this event applies to DataType uint8 // data type Opaque uint16 // 16 MSB of opaque VBuuid uint64 // This field is set by downstream Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value OldValue []byte // TODO: TBD: old document value Cas uint64 // CAS value of the item Seqno uint64 // sequence number of the mutation RevSeqno uint64 // rev sequence number : deletions LockTime uint32 // Lock time MetadataSize uint16 // Metadata size SnapstartSeq uint64 // start sequence number of this snapshot SnapendSeq uint64 // End sequence number of the snapshot SnapshotType uint32 // 0: disk 1: memory FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number Error error // Error value in case of a failure ExtMeta []byte AckSize uint32 // The number of bytes that can be Acked to DCP }
UprEvent memcached events for UPR streams.
func (*UprEvent) IsSnappyDataType ¶
type UprFeatures ¶
type UprFeed ¶
type UprFeed struct { C <-chan *UprEvent // Exported channel for receiving UPR events Error error // error // contains filtered or unexported fields }
UprFeed represents an UPR feed. A feed contains a connection to a single host and multiple vBuckets
func (*UprFeed) ClientAck ¶
After it finishes processing an UprEvent, the client needs to call this API to notify the UprFeed, so that the UprFeed can update its ack bytes stats and send an ack to DCP if needed. The client needs to set the ackByClient flag to true in the NewUprFeedWithConfig() call as a prerequisite for this call to work. This API is not thread safe: the caller should NOT have more than one goroutine calling this API.
func (*UprFeed) CloseStream ¶
CloseStream for specified vbucket.
func (*UprFeed) GetUprEventCh ¶
func (*UprFeed) GetUprStats ¶
func (*UprFeed) StartFeedWithConfig ¶
func (*UprFeed) UprOpen ¶
UprOpen connects to a UPR producer. name: name of the UPR connection; sequence: sequence number for the connection; bufSize: max size of the application.
func (*UprFeed) UprOpenWithFeatures ¶
func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
func (*UprFeed) UprOpenWithXATTR ¶
UprOpen with XATTR enabled.
type UprFeedIface ¶
type UprFeedIface interface { Close() Closed() bool CloseStream(vbno, opaqueMSB uint16) error GetError() error GetUprStats() *UprStats ClientAck(event *UprEvent) error GetUprEventCh() <-chan *UprEvent StartFeed() error StartFeedWithConfig(datachan_len int) error UprOpen(name string, sequence uint32, bufSize uint32) error UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error }
Exported interface - to allow for mocking