Documentation ¶
Overview ¶
Package memcached provides a memcached binary protocol client.
Go implementation of a UPR client. See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md for the transport spec. TODO: 1. Use a pool allocator to avoid garbage.
Index ¶
- Constants
- Variables
- func GetSubDocVal(subPaths []string, context []*ClientContext) (extraBuf, valueBuf []byte)
- func IfResStatusError(response *gomemcached.MCResponse) bool
- func SetConnectionName(name string)
- func SetDefaultDialTimeout(dial time.Duration)
- func SetDefaultTimeouts(dial, read, write time.Duration)
- func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
- type CASState
- type CasFunc
- type CasOp
- type Client
- func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte, ...) (*gomemcached.MCResponse, error)
- func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthList() (*gomemcached.MCResponse, error)
- func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
- func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
- func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool
- func (c *Client) CancelRangeScan(vb uint16, uuid []byte, opaque uint32, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) Close() error
- func (c *Client) CollectionEnabled() bool
- func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
- func (c *Client) Conn() io.ReadWriteCloser
- func (c *Client) ContinueRangeScan(vb uint16, uuid []byte, opaque uint32, items uint32, timeout uint32, ...) error
- func (c *Client) CreateRandomScan(vb uint16, sampleSize int, withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) CreateRangeScan(vb uint16, start []byte, excludeStart bool, end []byte, excludeEnd bool, ...) (*gomemcached.MCResponse, error)
- func (c *Client) Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
- func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) EnableDataPool(getter func(uint64) ([]byte, error), doneCb func([]byte)) error
- func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error)
- func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
- func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
- func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, ...) error
- func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error)
- func (c *Client) GetConnName() string
- func (c *Client) GetErrorMap(errMapVersion gomemcached.ErrorMapVersion) (map[string]interface{}, error)
- func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
- func (c *Client) Hijack() MemcachedConnection
- func (c *Client) Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
- func (c *Client) IsFeatureEnabled(feature Feature) bool
- func (c Client) IsHealthy() bool
- func (c *Client) LastBucket() string
- func (mc *Client) NewUprFeed() (*UprFeed, error)
- func (mc *Client) NewUprFeedIface() (UprFeedIface, error)
- func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
- func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
- func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
- func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
- func (c *Client) Receive() (*gomemcached.MCResponse, error)
- func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
- func (c *Client) Replica() bool
- func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
- func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
- func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte, ...) (*gomemcached.MCResponse, error)
- func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, ...) (*gomemcached.MCResponse, error)
- func (c *Client) SetConnName(name string)
- func (c *Client) SetDeadline(t time.Time)
- func (c *Client) SetKeepAliveOptions(interval time.Duration)
- func (c *Client) SetReadDeadline(t time.Time)
- func (c *Client) SetReplica(r bool)
- func (c *Client) SetSubdoc(vb uint16, key string, ops []SubDocOp, addOnly bool, exp int, cas uint64, ...) (*gomemcached.MCResponse, error)
- func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
- func (c *Client) Stats(key string) ([]StatValue, error)
- func (c *Client) StatsFunc(key string, fn func(key, val []byte)) error
- func (c *Client) StatsMap(key string) (map[string]string, error)
- func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
- func (c *Client) Transmit(req *gomemcached.MCRequest) error
- func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error
- func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
- func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
- func (c *Client) ValidateKey(vb uint16, key string, context ...*ClientContext) (bool, error)
- type ClientContext
- type ClientIface
- type CollectionChangedEvent
- type CollectionCreateEvent
- type CollectionDropEvent
- type CollectionsFilter
- type DcpStreamType
- type FailoverLog
- type Feature
- type Features
- type FeedState
- type MemcachedConnection
- type ObserveResult
- type ObserveSeqResult
- type ObservedStatus
- type PriorityType
- type ScopeCreateEvent
- type ScopeDropEvent
- type StatValue
- type SubDocOp
- type SystemEventType
- type TapArguments
- type TapEvent
- type TapFeed
- type TapOpcode
- type Uleb128
- type UprEvent
- func (event *UprEvent) GetCollectionId() (uint32, error)
- func (event *UprEvent) GetManifestId() (uint64, error)
- func (event *UprEvent) GetMaxTTL() (uint32, error)
- func (event *UprEvent) GetOsoBegin() (bool, error)
- func (event *UprEvent) GetScopeId() (uint32, error)
- func (event *UprEvent) GetSystemEventName() (string, error)
- func (event *UprEvent) IsCollectionType() bool
- func (event *UprEvent) IsOsoSnapshot() bool
- func (event *UprEvent) IsSeqnoAdv() bool
- func (event *UprEvent) IsSnappyDataType() bool
- func (event *UprEvent) IsStreamEnd() bool
- func (event *UprEvent) IsSystemEvent() bool
- func (event *UprEvent) PopulateEvent(extras []byte)
- func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType)
- func (event *UprEvent) PopulateOso(extras []byte)
- func (event *UprEvent) PopulateSeqnoAdv(extras []byte)
- func (event *UprEvent) PopulateStreamEndFlags(extras []byte)
- func (event *UprEvent) String() string
- type UprFeatures
- type UprFeed
- func (feed *UprFeed) ClientAck(event *UprEvent) error
- func (feed *UprFeed) Close()
- func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error
- func (feed *UprFeed) Closed() bool
- func (feed *UprFeed) GetError() error
- func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent
- func (feed *UprFeed) GetUprStats() *UprStats
- func (feed *UprFeed) SetPriorityAsync(p PriorityType) error
- func (feed *UprFeed) StartFeed() error
- func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error
- func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
- func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
- func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, ...) error
- func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, ...) error
- type UprFeedIface
- type UprStats
- type UprStream
- type VBSeqnos
- type VbStateType
Constants ¶
const ( FeatureTcpNoDelay = Feature(0x03) FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations FeatureXattr = Feature(0x06) FeatureXerror = Feature(0x07) FeatureSnappyCompression = Feature(0x0a) FeatureDataType = Feature(0x0b) FeatureTracing = Feature(0x0f) FeatureSyncReplication = Feature(0x11) FeatureCollections = Feature(0x12) FeatureOpenTracing = Feature(0x13) FeaturePreserveExpiry = Feature(0x14) FeatureComputeUnits = Feature(0x1a) FeatureHandleThrottle = Feature(0x1b) )
const ( ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted ObservedPersisted = ObservedStatus(0x01) // found, persisted ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) )
Observation status values.
const ( // CASStore instructs the server to store the new value normally CASStore = CasOp(iota) // CASQuit instructs the client to stop attempting to CAS, leaving value untouched CASQuit // CASDelete instructs the server to delete the current value CASDelete )
const ( TapBeginBackfill = TapOpcode(iota) TapEndBackfill TapMutation TapDeletion TapCheckpointStart TapCheckpointEnd )
Tap opcode values.
const ( FeedStateInitial = iota FeedStateOpened = iota FeedStateClosed = iota )
const ( CompressionTypeStartMarker = iota // also means invalid CompressionTypeNone = iota CompressionTypeSnappy = iota CompressionTypeEndMarker = iota // also means invalid )
const ( JSONDataType uint8 = 1 SnappyDataType uint8 = 2 XattrDataType uint8 = 4 )
kv_engine/include/mcbp/protocol/datatype.h
const RandomScanSeed = 0x5eedbead
const StreamNotRequested = "has not been requested"
const TapNoBackfill = math.MaxUint64
Value for TapArguments.Backfill denoting that no past events at all should be sent.
const UPRDefaultNoopIntervalSeconds = 120
Variables ¶
var ( ErrUnSuccessfulHello = errors.New("Unsuccessful HELLO exchange") ErrInvalidHello = errors.New("Invalid HELLO response") ErrPreserveExpiryNotSupported = errors.New("PreserveExpiry is not supported") ErrDurabilityNotSupported = errors.New("Durability is not supported") )
var ( // ConnName is used if Client.connName is not set ConnName = "GoMemcached" DefaultDialTimeout = time.Duration(0) // No timeout DefaultWriteTimeout = time.Duration(0) // No timeout )
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
error codes
var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")
var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
var Healthy uint32 = 1
var ReceiveHook func(*gomemcached.MCResponse, int, error)
ReceiveHook is called after every packet is received (or attempted to be)
var TapRecvHook func(*gomemcached.MCRequest, int, error)
TapRecvHook is called after every incoming tap packet is received.
var TransmitHook func(*gomemcached.MCRequest, int, error)
TransmitHook is called after each packet is transmitted.
var UnHealthy uint32 = 0
Functions ¶
func GetSubDocVal ¶
func GetSubDocVal(subPaths []string, context []*ClientContext) (extraBuf, valueBuf []byte)
func IfResStatusError ¶
func IfResStatusError(response *gomemcached.MCResponse) bool
func SetConnectionName ¶ added in v0.3.0
func SetConnectionName(name string)
func SetDefaultDialTimeout ¶
func SetDefaultTimeouts ¶
func UnwrapMemcachedError ¶
func UnwrapMemcachedError(rv *gomemcached.MCResponse, err error) (*gomemcached.MCResponse, error)
UnwrapMemcachedError converts memcached errors to normal responses.
If the error is a memcached response, declare the error to be nil so a client can handle the status without worrying about whether it indicates success or failure.
Types ¶
type CASState ¶
type CASState struct { Value []byte // Current value of key; update in place to new value Cas uint64 // Current CAS value of key Exists bool // Does a value exist for the key? (If not, Value will be nil) Err error // Error, if any, after CASNext returns false // contains filtered or unexported fields }
CASState tracks the state of CAS over several operations.
This is used directly by CASNext and indirectly by CAS
type CasFunc ¶
CasFunc is the type of function used to perform a CAS transform.
Input is the current value, or nil if no value exists. The function should return the new value (if any) to set, and the store/quit/delete operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The Client itself.
func ConnectTLS ¶
Connect to a memcached server using TLS.
func (*Client) Add ¶
func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Add a value for a key (store if not exists).
func (*Client) Append ¶
func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Append data to the value of a key.
func (*Client) Auth ¶
func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)
Auth performs SASL PLAIN authentication against the server.
func (*Client) AuthList ¶
func (c *Client) AuthList() (*gomemcached.MCResponse, error)
AuthList lists SASL auth mechanisms.
func (*Client) AuthPlain ¶
func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
func (*Client) AuthScramSha ¶
func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
AuthScramSha performs SCRAM-SHA authentication against the server.
func (*Client) CAS ¶
func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
CAS performs a CAS transform with the given function.
If the value does not exist, a nil current value will be sent to f.
func (*Client) CASNext ¶
CASNext is a non-callback, loop-based version of the CAS method.
Usage is like this:
var state memcached.CASState
for client.CASNext(vb, key, exp, &state) { state.Value = some_mutation(state.Value) }
if state.Err != nil { ... }
func (*Client) CancelRangeScan ¶ added in v0.2.1
func (c *Client) CancelRangeScan(vb uint16, uuid []byte, opaque uint32, context ...*ClientContext) ( *gomemcached.MCResponse, error)
func (*Client) CollectionEnabled ¶
func (*Client) CollectionsGetCID ¶
func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
Retrieve the collection ID (CID) for the given scope and collection.
func (*Client) Conn ¶
func (c *Client) Conn() io.ReadWriteCloser
func (*Client) ContinueRangeScan ¶ added in v0.2.1
func (*Client) CreateRandomScan ¶ added in v0.2.1
func (c *Client) CreateRandomScan(vb uint16, sampleSize int, withDocs bool, context ...*ClientContext) ( *gomemcached.MCResponse, error)
func (*Client) CreateRangeScan ¶ added in v0.2.1
func (c *Client) CreateRangeScan(vb uint16, start []byte, excludeStart bool, end []byte, excludeEnd bool, withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error)
func (*Client) Decr ¶
func (c *Client) Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Decr decrements the value at the given key.
func (*Client) Del ¶
func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Del deletes a key.
func (*Client) EnableDataPool ¶ added in v0.3.1
func (*Client) EnableFeatures ¶
func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error)
Send a hello command to enable specific features
func (*Client) EnableMutationToken ¶
func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)
Send a hello command to enable MutationTokens
func (*Client) Get ¶
func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Get the value for a key.
func (*Client) GetAllVbSeqnos ¶
func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
Since the binary request supports only a single collection at a time, callers may invoke this multiple times in succession to get vbSeqnos for multiple collections. The caller can therefore pass in a non-nil map so that the gomemcached client does not need to allocate a new map for each call, which avoids excessive GC. NOTE: If collections are enabled and no context is given, KV will still return stats for the default collection.
func (*Client) GetAndTouch ¶
func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
Get the value for a key, and update expiry
func (*Client) GetBulk ¶
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error
GetBulk gets keys in bulk
func (*Client) GetCollectionsManifest ¶
func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error)
Retrieve the collections manifest.
func (*Client) GetConnName ¶ added in v0.3.0
func (*Client) GetErrorMap ¶ added in v0.3.0
func (c *Client) GetErrorMap(errMapVersion gomemcached.ErrorMapVersion) (map[string]interface{}, error)
func (*Client) GetMeta ¶
func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Get metadata for a key
func (*Client) GetRandomDoc ¶
func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
Get a random document
func (*Client) GetSubdoc ¶
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Get the xattrs, doc value for the input key
func (*Client) Hijack ¶
func (c *Client) Hijack() MemcachedConnection
Hijack exposes the underlying connection from this client.
It also marks the connection as unhealthy since the client will have lost control over the connection and can't otherwise verify things are in good shape for connection pools.
func (*Client) Incr ¶
func (c *Client) Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Incr increments the value at the given key.
func (*Client) IsFeatureEnabled ¶ added in v0.1.4
func (Client) IsHealthy ¶
IsHealthy returns true unless the client is believed to have difficulty communicating with its server.
This is useful for connection pools where we want to non-destructively determine that a connection may be reused.
func (*Client) LastBucket ¶ added in v0.1.2
func (*Client) NewUprFeed ¶
NewUprFeed creates a new UPR Feed. TODO: Describe side-effects on bucket instance and its connection pool.
func (*Client) NewUprFeedIface ¶
func (mc *Client) NewUprFeedIface() (UprFeedIface, error)
func (*Client) NewUprFeedWithConfig ¶
func (*Client) NewUprFeedWithConfigIface ¶
func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
func (*Client) Observe ¶
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
Observe gets the persistence/replication/CAS state of a key
func (*Client) ObserveSeq ¶
func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
func (*Client) Receive ¶
func (c *Client) Receive() (*gomemcached.MCResponse, error)
Receive a response
func (*Client) ReceiveWithDeadline ¶
func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
func (*Client) SelectBucket ¶
func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)
SelectBucket selects the given bucket.
func (*Client) Send ¶
func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
Send a custom request and get the response.
func (*Client) Set ¶
func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Set the value for a key.
func (*Client) SetCas ¶
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
SetCas sets the value for a key with the given CAS.
func (*Client) SetConnName ¶ added in v0.3.0
func (*Client) SetDeadline ¶
func (*Client) SetKeepAliveOptions ¶
func (*Client) SetReadDeadline ¶
func (*Client) SetReplica ¶ added in v0.1.4
Read from replica setting
func (*Client) SetSubdoc ¶ added in v0.3.0
func (c *Client) SetSubdoc(vb uint16, key string, ops []SubDocOp, addOnly bool, exp int, cas uint64, context ...*ClientContext) ( *gomemcached.MCResponse, error)
func (*Client) StartTapFeed ¶
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
StartTapFeed starts a TAP feed on a client connection.
The events can be read from the returned channel. The connection can no longer be used for other purposes; it's now reserved for receiving the TAP messages. To stop receiving events, close the client connection.
func (*Client) StatsFunc ¶ added in v0.1.2
Stats requests server-side stats.
Use "" as the stat key for toplevel stats.
func (*Client) StatsMap ¶
StatsMap requests server-side stats similarly to Stats, but returns them as a map.
Use "" as the stat key for toplevel stats.
func (*Client) StatsMapForSpecifiedStats ¶
Instead of returning a new statsMap, StatsMapForSpecifiedStats simply populates the passed-in statsMap, which contains all the keys for which stats need to be retrieved.
func (*Client) Transmit ¶
func (c *Client) Transmit(req *gomemcached.MCRequest) error
Transmit sends a request, but does not wait for a response.
func (*Client) TransmitResponse ¶
func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error
TransmitResponse sends a response and does not wait.
func (*Client) TransmitWithDeadline ¶
func (*Client) UprGetFailoverLog ¶
func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
UprGetFailoverLog for given list of vbuckets.
func (*Client) ValidateKey ¶ added in v0.2.1
type ClientContext ¶
type ClientContext struct { // Collection-based context CollId uint32 // Impersonate context User string // VB-state related context // nil means not used in this context VbState *VbStateType // Preserve Expiry PreserveExpiry bool // Durability Level DurabilityLevel gomemcached.DurabilityLvl // Durability Timeout DurabilityTimeout time.Duration // Data is JSON in snappy compressed format Compressed bool // Sub-doc paths are document fields (not XATTRs) DocumentSubDocPaths bool // Include XATTRs in random document retrieval IncludeXATTRs bool }
func (*ClientContext) Copy ¶ added in v0.3.0
func (this *ClientContext) Copy() *ClientContext
func (*ClientContext) InitExtras ¶
func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client)
type ClientIface ¶
type ClientIface interface { Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) Auth(user, pass string) (*gomemcached.MCResponse, error) AuthList() (*gomemcached.MCResponse, error) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) CASNext(vb uint16, k string, exp int, state *CASState) bool CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) CollectionEnabled() bool Close() error Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) EnableMutationToken() (*gomemcached.MCResponse, error) EnableFeatures(features Features) (*gomemcached.MCResponse, error) EnableDataPool(getter func(uint64) ([]byte, error), doneCb func([]byte)) error Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error GetCollectionsManifest() (*gomemcached.MCResponse, error) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) SetSubdoc(vb uint16, key string, ops []SubDocOp, addOnly bool, exp int, cas uint64, context ...*ClientContext) ( 
*gomemcached.MCResponse, error) Hijack() MemcachedConnection Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error) LastBucket() string Observe(vb uint16, key string) (result ObserveResult, err error) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) Receive() (*gomemcached.MCResponse, error) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) Replica() bool Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) SetKeepAliveOptions(interval time.Duration) SetReadDeadline(t time.Time) SetDeadline(t time.Time) SetReplica(r bool) SelectBucket(bucket string) (*gomemcached.MCResponse, error) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) ( *gomemcached.MCResponse, error) Stats(key string) ([]StatValue, error) StatsFunc(key string, fn func(key, val []byte)) error StatsMap(key string) (map[string]string, error) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error Transmit(req *gomemcached.MCRequest) error TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error TransmitResponse(res *gomemcached.MCResponse) error UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) GetConnName() string SetConnName(name string) // UprFeed Related NewUprFeed() (*UprFeed, error) NewUprFeedIface() (UprFeedIface, error) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) CreateRangeScan(vb uint16, start []byte, excludeStart bool, end []byte, excludeEnd bool, withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error) CreateRandomScan(vb uint16, sampleSize int, withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error) ContinueRangeScan(vb uint16, uuid []byte, opaque 
uint32, items uint32, maxSize uint32, timeout uint32, context ...*ClientContext) error CancelRangeScan(vb uint16, uuid []byte, opaque uint32, context ...*ClientContext) (*gomemcached.MCResponse, error) ValidateKey(vb uint16, key string, context ...*ClientContext) (bool, error) GetErrorMap(errMapVersion gomemcached.ErrorMapVersion) (map[string]interface{}, error) }
type CollectionChangedEvent ¶
type CollectionCreateEvent ¶
type CollectionDropEvent ¶
type CollectionsFilter ¶
type CollectionsFilter struct { ManifestUid uint64 UseManifestUid bool StreamId uint16 UseStreamId bool // Use either ScopeId OR CollectionsList, not both CollectionsList []uint32 ScopeId uint32 }
Collection based filter
func (*CollectionsFilter) IsValid ¶
func (c *CollectionsFilter) IsValid() error
func (*CollectionsFilter) ToStreamReqBody ¶
func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error)
type DcpStreamType ¶
type DcpStreamType int32
const ( NonCollectionStream DcpStreamType = 0 CollectionsNonStreamId DcpStreamType = iota CollectionsStreamId DcpStreamType = iota )
var UninitializedStream DcpStreamType = -1
func (DcpStreamType) String ¶
func (t DcpStreamType) String() string
type FailoverLog ¶
type FailoverLog [][2]uint64
FailoverLog contains the vbuuid and sequence number.
func (*FailoverLog) Latest ¶
func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error)
type MemcachedConnection ¶ added in v0.3.2
type ObserveResult ¶
type ObserveResult struct { Status ObservedStatus // Whether the value has been persisted/deleted Cas uint64 // Current value's CAS PersistenceTime time.Duration // Node's average time to persist a value ReplicationTime time.Duration // Node's average time to replicate a value }
ObserveResult represents the data obtained by an Observe call
func (ObserveResult) CheckPersistence ¶
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool)
CheckPersistence checks whether a stored value has been persisted to disk yet.
type ObserveSeqResult ¶
type ObserveSeqResult struct { Failover uint8 // Set to 1 if a failover took place VbId uint16 // vbucket id Vbuuid uint64 // vucket uuid LastPersistedSeqNo uint64 // last persisted sequence number CurrentSeqNo uint64 // current sequence number OldVbuuid uint64 // Old bucket vbuuid LastSeqNo uint64 // last sequence number received before failover }
Sequence number based Observe Implementation
type ObservedStatus ¶
type ObservedStatus uint8
ObservedStatus is the type reported by the Observe method
type PriorityType ¶
type PriorityType string
const ( PriorityDisabled PriorityType = "" PriorityLow PriorityType = "low" PriorityMed PriorityType = "medium" PriorityHigh PriorityType = "high" )
high > medium > disabled > low
type ScopeCreateEvent ¶
type ScopeDropEvent ¶
type SystemEventType ¶
type SystemEventType int
const ( CollectionCreate SystemEventType = 0 CollectionDrop SystemEventType = iota CollectionFlush SystemEventType = iota // KV did not implement ScopeCreate SystemEventType = iota ScopeDrop SystemEventType = iota CollectionChanged SystemEventType = iota )
const InvalidSysEvent SystemEventType = -1
type TapArguments ¶
type TapArguments struct { // Timestamp of oldest item to send. // // Use TapNoBackfill to suppress all past items. Backfill uint64 // If set, server will disconnect after sending existing items. Dump bool // The indices of the vbuckets to watch; empty/nil to watch all. VBuckets []uint16 // Transfers ownership of vbuckets during cluster rebalance. Takeover bool // If true, server will wait for client ACK after every notification. SupportAck bool // If true, client doesn't want values so server shouldn't send them. KeysOnly bool // If true, client wants the server to send checkpoint events. Checkpoint bool // Optional identifier to use for this client, to allow reconnects ClientName string // Registers this client (by name) till explicitly deregistered. RegisteredClient bool }
TapArguments are parameters for requesting a TAP feed.
Call DefaultTapArguments to get a default one.
func DefaultTapArguments ¶
func DefaultTapArguments() TapArguments
DefaultTapArguments returns a default set of parameter values to pass to StartTapFeed.
type TapEvent ¶
type TapEvent struct { Opcode TapOpcode // Type of event VBucket uint16 // VBucket this event applies to Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value Cas uint64 }
TapEvent is a TAP notification of an operation on the server.
type UprEvent ¶
type UprEvent struct { Opcode gomemcached.CommandCode // Type of event Status gomemcached.Status // Response status VBucket uint16 // VBucket this event applies to DataType uint8 // data type Opaque uint16 // 16 MSB of opaque VBuuid uint64 // This field is set by downstream Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value OldValue []byte // TODO: TBD: old document value Cas uint64 // CAS value of the item Seqno uint64 // sequence number of the mutation RevSeqno uint64 // rev sequence number : deletions LockTime uint32 // Lock time MetadataSize uint16 // Metadata size SnapstartSeq uint64 // start sequence number of this snapshot SnapendSeq uint64 // End sequence number of the snapshot SnapshotType uint32 // 0: disk 1: memory FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number Error error // Error value in case of a failure ExtMeta []byte // Extended Metadata AckSize uint32 // The number of bytes that can be Acked to DCP SystemEvent SystemEventType // Only valid if IsSystemEvent() is true SysEventVersion uint8 // Based on the version, the way Extra bytes is parsed is different ValueLen int // Cache it to avoid len() calls for performance CollectionId uint32 // Valid if Collection is in use StreamId *uint16 // Nil if not in use }
UprEvent represents a memcached event for UPR streams.
func (*UprEvent) GetCollectionId ¶
func (*UprEvent) GetManifestId ¶
func (*UprEvent) GetOsoBegin ¶ added in v0.1.1
Only if error is nil: returns true if the event states that OSO begins; returns false if the event states that OSO ends.
func (*UprEvent) GetScopeId ¶
func (*UprEvent) GetSystemEventName ¶
func (*UprEvent) IsCollectionType ¶
func (*UprEvent) IsOsoSnapshot ¶ added in v0.1.1
func (*UprEvent) IsSeqnoAdv ¶
func (*UprEvent) IsSnappyDataType ¶
func (*UprEvent) IsStreamEnd ¶ added in v0.3.0
func (*UprEvent) IsSystemEvent ¶
func (*UprEvent) PopulateEvent ¶
func (*UprEvent) PopulateFieldsBasedOnStreamType ¶
func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType)
func (*UprEvent) PopulateOso ¶ added in v0.1.1
func (*UprEvent) PopulateSeqnoAdv ¶
func (*UprEvent) PopulateStreamEndFlags ¶ added in v0.3.0
type UprFeatures ¶
type UprFeatures struct { Xattribute bool CompressionType int IncludeDeletionTime bool DcpPriority PriorityType EnableExpiry bool EnableStreamId bool EnableOso bool SendStreamEndOnClose bool // contains filtered or unexported fields }
func (*UprFeatures) EnableDeadConnDetection ¶ added in v0.3.2
func (f *UprFeatures) EnableDeadConnDetection(thresholdSeconds int) error
Enables client-side dead connection detection. `thresholdSeconds` should have a minimum value of (2*UPRDefaultNoopInterval).
type UprFeed ¶
type UprFeed struct { C <-chan *UprEvent // Exported channel for receiving UPR events Error error // error // contains filtered or unexported fields }
UprFeed represents a UPR feed. A feed contains a connection to a single host and multiple vBuckets.
func (*UprFeed) ClientAck ¶
After it finishes processing a UprEvent, the client needs to call this API to notify the UprFeed, so that the UprFeed can update its ack bytes stats and send an ack to DCP if needed. As a prerequisite for this call to work, the client needs to set the ackByClient flag to true in the NewUprFeedWithConfig() call. This API is not thread safe: the caller should NOT have more than one goroutine calling it.
func (*UprFeed) CloseStream ¶
CloseStream for specified vbucket.
func (*UprFeed) GetUprEventCh ¶
func (*UprFeed) GetUprStats ¶
func (*UprFeed) SetPriorityAsync ¶
func (feed *UprFeed) SetPriorityAsync(p PriorityType) error
func (*UprFeed) StartFeedWithConfig ¶
func (*UprFeed) UprOpen ¶
UprOpen connects to a UPR producer. name: name of the UPR connection; sequence: sequence number for the connection; bufSize: max size of the application.
func (*UprFeed) UprOpenWithFeatures ¶
func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
func (*UprFeed) UprOpenWithXATTR ¶
UprOpen with XATTR enabled.
func (*UprFeed) UprRequestCollectionsStream ¶
type UprFeedIface ¶
type UprFeedIface interface { Close() Closed() bool CloseStream(vbno, opaqueMSB uint16) error GetError() error GetUprStats() *UprStats ClientAck(event *UprEvent) error GetUprEventCh() <-chan *UprEvent StartFeed() error StartFeedWithConfig(datachan_len int) error UprOpen(name string, sequence uint32, bufSize uint32) error UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error // Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response SetPriorityAsync(p PriorityType) error // Various Collection-Type RequestStreams UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error }
Exported interface - to allow for mocking
type UprStream ¶
type UprStream struct { Vbucket uint16 // Vbucket id Vbuuid uint64 // vbucket uuid StartSeq uint64 // start sequence number EndSeq uint64 // end sequence number StreamType DcpStreamType // contains filtered or unexported fields }
UprStream is the per-stream data structure over a UPR connection.
type VbStateType ¶
type VbStateType uint8
const ( VbAlive VbStateType = 0x00 VbActive VbStateType = 0x01 VbReplica VbStateType = 0x02 VbPending VbStateType = 0x03 VbDead VbStateType = 0x04 )