Documentation ¶
Overview ¶
Package memcached provides a memcached binary protocol client.
Go implementation of a UPR client. See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md TODO: 1. Use a pool allocator to avoid garbage.
Index ¶
- Constants
- Variables
- func HMACHash(message []byte, secret []byte, hashFunc func() hash.Hash) []byte
- func SHAHash(message []byte, hashFunc func() hash.Hash) []byte
- func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
- type CASState
- type CasFunc
- type CasOp
- type Client
- func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
- func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthAdvanced(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthList() (*gomemcached.MCResponse, error)
- func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthSHA(user, pass string, mech string, hashFunc func() hash.Hash, hashSize int) (*gomemcached.MCResponse, error)
- func (c *Client) AuthSHA1(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthSHA256(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthSHA512(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
- func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool
- func (c *Client) Close() error
- func (c *Client) Decr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
- func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) DelCas(vb uint16, key string, cas uint64) (*gomemcached.MCResponse, error)
- func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
- func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
- func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse) error
- func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
- func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error)
- func (c *Client) Hijack() io.ReadWriteCloser
- func (c *Client) Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
- func (c Client) IsHealthy() bool
- func (mc *Client) NewUprFeed() (*UprFeed, error)
- func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
- func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
- func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
- func (c *Client) Receive() (*gomemcached.MCResponse, error)
- func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
- func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
- func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
- func (c *Client) SetKeepAliveOptions(interval time.Duration)
- func (c *Client) SetReadDeadline(t time.Time)
- func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
- func (c *Client) Stats(key string) ([]StatValue, error)
- func (c *Client) StatsMap(key string) (map[string]string, error)
- func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
- func (c *Client) Transmit(req *gomemcached.MCRequest) error
- func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
- type FailoverLog
- type ObserveResult
- type ObserveSeqResult
- type ObservedStatus
- type StatValue
- type TapArguments
- type TapEvent
- type TapFeed
- type TapOpcode
- type UprEvent
- type UprFeed
- func (feed *UprFeed) Close()
- func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error
- func (feed *UprFeed) Closed() bool
- func (feed *UprFeed) GetUprStats() *UprStats
- func (feed *UprFeed) IncrementAckBytes(bytes uint32) error
- func (feed *UprFeed) StartFeed() error
- func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error
- func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, ...) error
- type UprStats
- type UprStream
Constants ¶
const ( AuthMech_SHA512 = "SCRAM-SHA512" AuthMech_SHA256 = "SCRAM-SHA256" AuthMech_SHA1 = "SCRAM-SHA1" AuthMech_PLAIN = "PLAIN" )
authentication mechanisms
const ( ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted ObservedPersisted = ObservedStatus(0x01) // found, persisted ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) )
Observation status values.
const ( // CASStore instructs the server to store the new value normally CASStore = CasOp(iota) // CASQuit instructs the client to stop attempting to CAS, leaving value untouched CASQuit // CASDelete instructs the server to delete the current value CASDelete )
const ( TapBeginBackfill = TapOpcode(iota) TapEndBackfill TapMutation TapDeletion TapCheckpointStart TapCheckpointEnd )
Tap opcode values.
const TapNoBackfill = math.MaxUint64
Value for TapArguments.Backfill denoting that no past events at all should be sent.
Variables ¶
var (
DefaultDialTimeout = time.Duration(0) // No timeout
)
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
error codes
var Healthy uint32 = 1
var InvalidResponseError = errors.New("the server response is not according to the spec")
var InvalidStartResponseError = errors.New("the server initial response is not according to the spec")
var InvalidStepResponseError = errors.New("the server final response is not according to the spec")
var ReceiveHook func(*gomemcached.MCResponse, int, error)
ReceiveHook is called after every packet is received (or attempted to be)
var TapRecvHook func(*gomemcached.MCRequest, int, error)
TapRecvHook is called after every incoming tap packet is received.
var TransmitHook func(*gomemcached.MCRequest, int, error)
TransmitHook is called after each packet is transmitted.
var UnHealthy uint32 = 0
Functions ¶
func UnwrapMemcachedError ¶
func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
UnwrapMemcachedError converts memcached errors to normal responses.
If the error is a memcached response, declare the error to be nil so a client can handle the status without worrying about whether it indicates success or failure.
Types ¶
type CASState ¶
type CASState struct { Value []byte // Current value of key; update in place to new value Cas uint64 // Current CAS value of key Exists bool // Does a value exist for the key? (If not, Value will be nil) Err error // Error, if any, after CASNext returns false // contains filtered or unexported fields }
CASState tracks the state of CAS over several operations.
This is used directly by CASNext and indirectly by CAS
type CasFunc ¶
CasFunc is the type of function used to perform a CAS transform.
Input is the current value, or nil if no value exists. The function should return the new value (if any) to set, and the store/quit/delete operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The Client itself.
func (*Client) Add ¶
func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Add a value for a key (store if not exists).
func (*Client) Append ¶
func (c *Client) Append(vb uint16, key string, data []byte) (*gomemcached.MCResponse, error)
Append data to the value of a key.
func (*Client) Auth ¶
func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
Auth performs SASL PLAIN authentication against the server.
func (*Client) AuthAdvanced ¶
func (c *Client) AuthAdvanced(user, pass string) (*gomemcached.MCResponse, error)
AuthAdvanced uses SCRAM-SHA authentication if supported, and plain authentication otherwise.
func (*Client) AuthList ¶
func (c *Client) AuthList() (*gomemcached.MCResponse, error)
AuthList lists SASL auth mechanisms.
func (*Client) AuthPlain ¶
func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthSHA ¶
func (c *Client) AuthSHA(user, pass string, mech string, hashFunc func() hash.Hash, hashSize int) (*gomemcached.MCResponse, error)
func (*Client) AuthSHA1 ¶
func (c *Client) AuthSHA1(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthSHA256 ¶
func (c *Client) AuthSHA256(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthSHA512 ¶
func (c *Client) AuthSHA512(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthScramSha ¶
func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
AuthScramSha uses SCRAM-SHA authentication. It returns an error if SCRAM-SHA is not supported.
func (*Client) CAS ¶
func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
CAS performs a CAS transform with the given function.
If the value does not exist, a nil current value will be sent to f.
func (*Client) CASNext ¶
CASNext is a non-callback, loop-based version of CAS method.
Usage is like this:
var state memcached.CASState
for client.CASNext(vb, key, exp, &state) { state.Value = some_mutation(state.Value) }
if state.Err != nil { ... }
func (*Client) Del ¶
func (c *Client) Del(vb uint16, key string) (*gomemcached.MCResponse, error)
Del deletes a key.
func (*Client) DelCas ¶
func (c *Client) DelCas(vb uint16, key string, cas uint64) (*gomemcached.MCResponse, error)
DelCas attempts to delete a key with a CAS.
func (*Client) EnableMutationToken ¶
func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
Send a hello command to enable MutationTokens
func (*Client) Get ¶
func (c *Client) Get(vb uint16, key string) (*gomemcached.MCResponse, error)
Get the value for a key.
func (*Client) GetAndTouch ¶
func (c *Client) GetAndTouch(vb uint16, key string, exp int) (*gomemcached.MCResponse, error)
Get the value for a key, and update expiry
func (*Client) GetBulk ¶
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse) error
GetBulk gets keys in bulk
func (*Client) GetMeta ¶
func (c *Client) GetMeta(vb uint16, key string) (*gomemcached.MCResponse, error)
Get metadata for a key
func (*Client) GetRandomDoc ¶
func (c *Client) GetRandomDoc() (*gomemcached.MCResponse, error)
Get a random document
func (*Client) Hijack ¶
func (c *Client) Hijack() io.ReadWriteCloser
Hijack exposes the underlying connection from this client.
It also marks the connection as unhealthy since the client will have lost control over the connection and can't otherwise verify things are in good shape for connection pools.
func (Client) IsHealthy ¶
IsHealthy returns true unless the client is believed to have difficulty communicating with its server.
This is useful for connection pools where we want to non-destructively determine that a connection may be reused.
func (*Client) NewUprFeed ¶
NewUprFeed creates a new UPR Feed. TODO: Describe side-effects on bucket instance and its connection pool.
func (*Client) NewUprFeedWithConfig ¶
func (*Client) Observe ¶
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
Observe gets the persistence/replication/CAS state of a key
func (*Client) ObserveSeq ¶
func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
func (*Client) Receive ¶
func (c *Client) Receive() (*gomemcached.MCResponse, error)
Receive a response
func (*Client) SelectBucket ¶
func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
SelectBucket selects a bucket.
func (*Client) Send ¶
func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
Send a custom request and get the response.
func (*Client) Set ¶
func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*gomemcached.MCResponse, error)
Set the value for a key.
func (*Client) SetCas ¶
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*gomemcached.MCResponse, error)
SetCas sets the value for a key with a CAS.
func (*Client) SetKeepAliveOptions ¶
func (*Client) SetReadDeadline ¶
func (*Client) StartTapFeed ¶
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
StartTapFeed starts a TAP feed on a client connection.
The events can be read from the returned channel. The connection can no longer be used for other purposes; it's now reserved for receiving the TAP messages. To stop receiving events, close the client connection.
func (*Client) StatsMap ¶
StatsMap requests server-side stats similarly to Stats, but returns them as a map.
Use "" as the stat key for toplevel stats.
func (*Client) StatsMapForSpecifiedStats ¶
StatsMapForSpecifiedStats populates the passed-in statsMap instead of returning a new one. The statsMap contains all the keys for which stats need to be retrieved.
func (*Client) Transmit ¶
func (c *Client) Transmit(req *gomemcached.MCRequest) error
Transmit sends a request, but does not wait for a response.
func (*Client) UprGetFailoverLog ¶
func (mc *Client) UprGetFailoverLog( vb []uint16) (map[uint16]*FailoverLog, error)
UprGetFailoverLog for given list of vbuckets.
type FailoverLog ¶
type FailoverLog [][2]uint64
FailoverLog contains the vbuuid and sequence number.
func (*FailoverLog) Latest ¶
func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error)
type ObserveResult ¶
type ObserveResult struct { Status ObservedStatus // Whether the value has been persisted/deleted Cas uint64 // Current value's CAS PersistenceTime time.Duration // Node's average time to persist a value ReplicationTime time.Duration // Node's average time to replicate a value }
ObserveResult represents the data obtained by an Observe call
func (ObserveResult) CheckPersistence ¶
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool)
CheckPersistence checks whether a stored value has been persisted to disk yet.
type ObserveSeqResult ¶
type ObserveSeqResult struct { Failover uint8 // Set to 1 if a failover took place VbId uint16 // vbucket id Vbuuid uint64 // vucket uuid LastPersistedSeqNo uint64 // last persisted sequence number CurrentSeqNo uint64 // current sequence number OldVbuuid uint64 // Old bucket vbuuid LastSeqNo uint64 // last sequence number received before failover }
Sequence number based Observe Implementation
type ObservedStatus ¶
type ObservedStatus uint8
ObservedStatus is the type reported by the Observe method
type TapArguments ¶
type TapArguments struct { // Timestamp of oldest item to send. // // Use TapNoBackfill to suppress all past items. Backfill uint64 // If set, server will disconnect after sending existing items. Dump bool // The indices of the vbuckets to watch; empty/nil to watch all. VBuckets []uint16 // Transfers ownership of vbuckets during cluster rebalance. Takeover bool // If true, server will wait for client ACK after every notification. SupportAck bool // If true, client doesn't want values so server shouldn't send them. KeysOnly bool // If true, client wants the server to send checkpoint events. Checkpoint bool // Optional identifier to use for this client, to allow reconnects ClientName string // Registers this client (by name) till explicitly deregistered. RegisteredClient bool }
TapArguments are parameters for requesting a TAP feed.
Call DefaultTapArguments to get a default one.
func DefaultTapArguments ¶
func DefaultTapArguments() TapArguments
DefaultTapArguments returns a default set of parameter values to pass to StartTapFeed.
type TapEvent ¶
type TapEvent struct { Opcode TapOpcode // Type of event VBucket uint16 // VBucket this event applies to Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value Cas uint64 }
TapEvent is a TAP notification of an operation on the server.
type UprEvent ¶
type UprEvent struct { Opcode gomemcached.CommandCode // Type of event Status gomemcached.Status // Response status VBucket uint16 // VBucket this event applies to DataType uint8 // data type Opaque uint16 // 16 MSB of opaque VBuuid uint64 // This field is set by downstream Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value OldValue []byte // TODO: TBD: old document value Cas uint64 // CAS value of the item Seqno uint64 // sequence number of the mutation RevSeqno uint64 // rev sequence number : deletions LockTime uint32 // Lock time MetadataSize uint16 // Metadata size SnapstartSeq uint64 // start sequence number of this snapshot SnapendSeq uint64 // End sequence number of the snapshot SnapshotType uint32 // 0: disk 1: memory FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number Error error // Error value in case of a failure ExtMeta []byte AckSize uint32 // The number of bytes that can be Acked to DCP }
UprEvent represents memcached events for UPR streams.
type UprFeed ¶
type UprFeed struct { C <-chan *UprEvent // Exported channel for receiving UPR events Error error // error // contains filtered or unexported fields }
UprFeed represents a UPR feed. A feed contains a connection to a single host and multiple vBuckets.
func (*UprFeed) CloseStream ¶
CloseStream for specified vbucket.
func (*UprFeed) GetUprStats ¶
func (*UprFeed) IncrementAckBytes ¶
After setting the ackByClient flag to true in the NewUprFeedWithConfig() call, the client can call this API to notify gomemcached that it has completed processing of a number of bytes. This API is not thread safe. The caller should NOT have more than one goroutine calling this API.
func (*UprFeed) StartFeedWithConfig ¶
func (*UprFeed) UprOpen ¶
UprOpen connects with a UPR producer. name: name of the UPR connection; sequence: sequence number for the connection; bufSize: max size of the application.
func (*UprFeed) UprOpenWithXATTR ¶
UprOpenWithXATTR is UprOpen with XATTR enabled.