Documentation ¶
Overview ¶
Package memcached provides a memcached binary protocol client.
Index ¶
- Constants
- Variables
- func GetDcpMemcachedTimeout() uint32
- func SetDcpMemcachedTimeout(val uint32)
- func UnwrapMemcachedError(rv *transport.MCResponse, err error) (*transport.MCResponse, error)
- type Average
- type CASState
- type CasFunc
- type CasOp
- type Client
- func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*transport.MCResponse, error)
- func (c *Client) Append(vb uint16, key string, data []byte) (*transport.MCResponse, error)
- func (c *Client) Auth(user, pass string) (*transport.MCResponse, error)
- func (c *Client) AuthList() (*transport.MCResponse, error)
- func (c *Client) CAS(vb uint16, k string, f CasFunc, initexp int) (*transport.MCResponse, error)
- func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool
- func (c *Client) Close() error
- func (c *Client) Del(vb uint16, key string) (*transport.MCResponse, error)
- func (c *Client) EnableCollections(clientName string) error
- func (c *Client) Get(vb uint16, key string) (*transport.MCResponse, error)
- func (c *Client) GetBulk(vb uint16, keys []string) (map[string]*transport.MCResponse, error)
- func (c *Client) Hijack() io.ReadWriteCloser
- func (c *Client) Incr(vb uint16, key string, amt, def uint64, exp int) (uint64, error)
- func (c *Client) IsCollectionsEnabled() bool
- func (c Client) IsHealthy() bool
- func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
- func (c *Client) Receive() (*transport.MCResponse, error)
- func (c *Client) ReceiveInBuf(res *transport.MCResponse, buf []byte) error
- func (c *Client) ResetMcdConnectionDeadline()
- func (c *Client) ResetMcdConnectionWriteDeadline()
- func (c *Client) ResetMcdMutationReadDeadline()
- func (c *Client) SelectBucket(bucket string) (*transport.MCResponse, error)
- func (c *Client) Send(req *transport.MCRequest) (rv *transport.MCResponse, err error)
- func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*transport.MCResponse, error)
- func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*transport.MCResponse, error)
- func (c *Client) SetCollectionsEnabled()
- func (c *Client) SetDeadline(t time.Time) error
- func (c *Client) SetMcdConnectionDeadline()
- func (c *Client) SetMcdConnectionWriteDeadline()
- func (c *Client) SetMcdMutationReadDeadline()
- func (c *Client) SetReadDeadline(t time.Time) error
- func (c *Client) SetWriteDeadline(t time.Time) error
- func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
- func (c *Client) Stats(key string) ([]StatValue, error)
- func (c *Client) StatsMap(key string) (map[string]string, error)
- func (c *Client) Transmit(req *transport.MCRequest) error
- func (c *Client) TransmitResponse(res *transport.MCResponse) error
- type DcpEvent
- type DcpFeed
- func (feed *DcpFeed) Close() error
- func (feed *DcpFeed) CloseStream(vbno, opaqueMSB uint16) error
- func (feed *DcpFeed) DcpGetFailoverLog(opaque uint16, vblist []uint16) (map[uint16]*FailoverLog, error)
- func (feed *DcpFeed) DcpGetSeqnos() (map[uint16]uint64, error)
- func (feed *DcpFeed) DcpOpen(name string, sequence, flags, bufsize uint32, opaque uint16) error
- func (feed *DcpFeed) DcpRequestStream(vbno, opaqueMSB uint16, flags uint32, ...) error
- func (feed *DcpFeed) Name() string
- type DcpStats
- type DcpStream
- type FailoverLog
- type ObserveResult
- type ObservedStatus
- type StatValue
- type StreamRequestValue
- type TapArguments
- type TapEvent
- type TapFeed
- type TapOpcode
- type XattrVal
Constants ¶
const ( ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted ObservedPersisted = ObservedStatus(0x01) // found, persisted ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete) ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet) )
Observation status values.
const ( // CASStore instructs the server to store the new value normally CASStore = CasOp(iota) // CASQuit instructs the client to stop attempting to CAS, leaving value untouched CASQuit // CASDelete instructs the server to delete the current value CASDelete )
const ( TapBeginBackfill = TapOpcode(iota) TapEndBackfill TapMutation TapDeletion TapCheckpointStart TapCheckpointEnd )
Tap opcode values.
const DcpMutationReadTimeout uint32 = 1800
30 minutes
const IncludeXATTRs = uint32(4)
const (
MaxFailoverLogEntries = 20
)
const TapNoBackfill = math.MaxUint64
Value for TapArguments.Backfill denoting that no past events at all should be sent.
Variables ¶
var DcpMemcachedTimeout uint32 = 120
Timeout for memcached communication where client is actively waiting. In Seconds.
var ErrorCollectionsNotEnabled = errors.New("dcp.ErrorCollectionsNotEnabled")
var ErrorConnection = errors.New("dcp.connection")
ErrorConnection
var ErrorEnableCollections = errors.New("dcp.EnableCollections")
ErrorEnableCollections
var ErrorInvalidFeed = errors.New("dcp.invalidFeed")
ErrorInvalidFeed
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
error codes
var ReceiveHook func(*transport.MCResponse, int, error)
ReceiveHook is called after every packet is received (or attempted to be)
var SyncGatewayAttachmentPrefix = []byte("_sync:att")
var SyncGatewayMutationPrefix = []byte("_sync:")
var TapRecvHook func(*transport.MCRequest, int, error)
TapRecvHook is called after every incoming tap packet is received.
var TransactionMutationPrefix = []byte("_txn:")
var TransmitHook func(*transport.MCRequest, int, error)
TransmitHook is called after each packet is transmitted.
Functions ¶
func GetDcpMemcachedTimeout ¶
func GetDcpMemcachedTimeout() uint32
func SetDcpMemcachedTimeout ¶
func SetDcpMemcachedTimeout(val uint32)
func UnwrapMemcachedError ¶
func UnwrapMemcachedError(rv *transport.MCResponse, err error) (*transport.MCResponse, error)
UnwrapMemcachedError converts memcached errors to normal responses.
If the error is a memcached response, declare the error to be nil so a client can handle the status without worrying about whether it indicates success or failure.
Types ¶
type Average ¶
type Average struct {
// contains filtered or unexported fields
}
Average maintains the average and variance of a stream of numbers in a space-efficient manner.
type CASState ¶
type CASState struct { Value []byte // Current value of key; update in place to new value Cas uint64 // Current CAS value of key Exists bool // Does a value exist for the key? (If not, Value will be nil) Err error // Error, if any, after CASNext returns false // contains filtered or unexported fields }
CASState tracks the state of CAS over several operations.
This is used directly by CASNext and indirectly by CAS
type CasFunc ¶
CasFunc is type type of function to perform a CAS transform.
Input is the current value, or nil if no value exists. The function should return the new value (if any) to set, and the store/quit/delete operation.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
The Client itself.
func ConnectTLS ¶
Connect to a memcached server using TLS.
func (*Client) Add ¶
func (c *Client) Add(vb uint16, key string, flags int, exp int, body []byte) (*transport.MCResponse, error)
Add a value for a key (store if not exists).
func (*Client) Auth ¶
func (c *Client) Auth(user, pass string) (*transport.MCResponse, error)
Auth performs SASL PLAIN authentication against the server.
func (*Client) AuthList ¶
func (c *Client) AuthList() (*transport.MCResponse, error)
AuthList lists SASL auth mechanisms.
func (*Client) CAS ¶
CAS performs a CAS transform with the given function.
If the value does not exist, a nil current value will be sent to f.
func (*Client) CASNext ¶
CASNext is a non-callback, loop-based version of CAS method.
Usage is like this:
var state memcached.CASState
for client.CASNext(vb, key, exp, &state) { state.Value = some_mutation(state.Value) }
if state.Err != nil { ... }
func (*Client) EnableCollections ¶
func (*Client) Hijack ¶
func (c *Client) Hijack() io.ReadWriteCloser
Hijack exposes the underlying connection from this client.
It also marks the connection as unhealthy since the client will have lost control over the connection and can't otherwise verify things are in good shape for connection pools.
func (*Client) IsCollectionsEnabled ¶
func (Client) IsHealthy ¶
IsHealthy returns true unless the client is belived to have difficulty communicating to its server.
This is useful for connection pools where we want to non-destructively determine that a connection may be reused.
func (*Client) Observe ¶
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)
Observe gets the persistence/replication/CAS state of a key
func (*Client) Receive ¶
func (c *Client) Receive() (*transport.MCResponse, error)
Receive a response
func (*Client) ReceiveInBuf ¶
func (c *Client) ReceiveInBuf(res *transport.MCResponse, buf []byte) error
func (*Client) ResetMcdConnectionDeadline ¶
func (c *Client) ResetMcdConnectionDeadline()
Reset Memcached Connection Deadline. Log the error in SetDeadline, if any. If SetMcdConnectionDeadline was successful, and ResetMcdConnectionDeadline fails, the connection may get closed during some other IO operation. So, report the error if any. No need to explicitly fail the product workflow.
func (*Client) ResetMcdConnectionWriteDeadline ¶
func (c *Client) ResetMcdConnectionWriteDeadline()
Reset Memcached Connection WriteDeadline.
func (*Client) ResetMcdMutationReadDeadline ¶
func (c *Client) ResetMcdMutationReadDeadline()
Reset Memcached Connection ReadDeadline.
func (*Client) SelectBucket ¶
func (c *Client) SelectBucket(bucket string) (*transport.MCResponse, error)
SelectBucket for this connection.
func (*Client) Set ¶
func (c *Client) Set(vb uint16, key string, flags int, exp int, body []byte) (*transport.MCResponse, error)
Set the value for a key.
func (*Client) SetCas ¶
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte) (*transport.MCResponse, error)
SetCas set the value for a key with cas
func (*Client) SetCollectionsEnabled ¶
func (c *Client) SetCollectionsEnabled()
func (*Client) SetMcdConnectionDeadline ¶
func (c *Client) SetMcdConnectionDeadline()
Set Memcached Connection Deadline. Ignore the error in SetDeadline, if any. There are no side effects in SetDeadline error codepaths.
func (*Client) SetMcdConnectionWriteDeadline ¶
func (c *Client) SetMcdConnectionWriteDeadline()
Set Memcached Connection WriteDeadline.
func (*Client) SetMcdMutationReadDeadline ¶
func (c *Client) SetMcdMutationReadDeadline()
Set Memcached Connection ReadDeadline.
func (*Client) StartTapFeed ¶
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)
StartTapFeed starts a TAP feed on a client connection.
The events can be read from the returned channel. The connection can no longer be used for other purposes; it's now reserved for receiving the TAP messages. To stop receiving events, close the client connection.
func (*Client) StatsMap ¶
StatsMap requests server-side stats similarly to Stats, but returns them as a map.
Use "" as the stat key for toplevel stats.
func (*Client) TransmitResponse ¶
func (c *Client) TransmitResponse(res *transport.MCResponse) error
TransmitResponse send a request, but does not wait for a response.
type DcpEvent ¶
type DcpEvent struct { Opcode transport.CommandCode // Type of event Status transport.Status // Response status Datatype uint8 // Datatype per binary protocol VBucket uint16 // VBucket this event applies to Opaque uint16 // 16 MSB of opaque VBuuid uint64 // This field is set by downstream Key, Value []byte // Item key/value OldValue []byte // TODO: TBD: old document value Cas uint64 // CAS value of the item CollectionID uint32 // Collection Id ScopeID uint32 // Scope Id SystemXattrs map[string]XattrVal // System Xattrs map UserXattrs map[string]XattrVal // User Xattrs map EventType transport.CollectionEvent // For DCP_SYSTEM_EVENT, DCP_OSO_SNAPSHOT types // meta fields Seqno uint64 // seqno. of the mutation, doubles as rollback-seqno // https://issues.couchbase.com/browse/MB-15333, RevSeqno uint64 Flags uint32 // Item flags Expiry uint32 // Item expiration time LockTime uint32 Nru byte // snapshots SnapstartSeq uint64 // start sequence number of this snapshot SnapendSeq uint64 // End sequence number of the snapshot SnapshotType uint32 // 0: disk 1: memory // collections ManifestUID []byte // For DCP_SYSTEM_EVENT // failoverlog FailoverLog *FailoverLog // Failover log containing vvuid and sequnce number Error error // Error value in case of a failure // stats Ctime int64 }
DcpEvent memcached events for DCP streams.
type DcpFeed ¶
type DcpFeed struct {
// contains filtered or unexported fields
}
DcpFeed represents an DCP feed. A feed contains a connection to a single host and multiple vBuckets
func NewDcpFeed ¶
func NewDcpFeed( mc *Client, name string, outch chan<- *DcpEvent, opaque uint16, config map[string]interface{}) (*DcpFeed, error)
NewDcpFeed creates a new DCP Feed.
func (*DcpFeed) CloseStream ¶
CloseStream for specified vbucket.
func (*DcpFeed) DcpGetFailoverLog ¶
func (feed *DcpFeed) DcpGetFailoverLog( opaque uint16, vblist []uint16) (map[uint16]*FailoverLog, error)
DcpGetFailoverLog for given list of vbuckets.
func (*DcpFeed) DcpGetSeqnos ¶
DcpGetSeqnos for vbuckets hosted by this node.
func (*DcpFeed) DcpOpen ¶
DcpOpen to connect with a DCP producer. Name: name of te DCP connection sequence: sequence number for the connection bufsize: max size of the application
type DcpStats ¶
type DcpStats struct { TotalBufferAckSent uint64 TotalBytes uint64 TotalCloseStream uint64 TotalMutation uint64 TotalSnapShot uint64 TotalStreamReq uint64 TotalStreamEnd uint64 LastAckTime int64 }
DcpStats on mutations/snapshots/buff-acks.
type DcpStream ¶
type DcpStream struct { AppOpaque uint16 CloseOpaque uint16 Vbucket uint16 // Vbucket id Vbuuid uint64 // vbucket uuid Seqno uint64 StartSeq uint64 // start sequence number EndSeq uint64 // end sequence number Snapstart uint64 Snapend uint64 LastSeen int64 // UnixNano value of last seen // contains filtered or unexported fields }
DcpStream is per stream data structure over an DCP Connection.
type FailoverLog ¶
type FailoverLog [][2]uint64
FailoverLog containing vvuid and sequnce number
func (*FailoverLog) Latest ¶
func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error)
Latest will return the recent vbuuid and its high-seqno.
func (*FailoverLog) PopAndGetLatest ¶
func (flogp *FailoverLog) PopAndGetLatest() (vbuuid, seqno uint64, err error)
func (*FailoverLog) PopTillSeqNo ¶
func (flogp *FailoverLog) PopTillSeqNo(desiredSeqNo uint64) (vbuuid, seqno uint64, err error)
type ObserveResult ¶
type ObserveResult struct { Status ObservedStatus // Whether the value has been persisted/deleted Cas uint64 // Current value's CAS PersistenceTime time.Duration // Node's average time to persist a value ReplicationTime time.Duration // Node's average time to replicate a value }
ObserveResult represents the data obtained by an Observe call
func (ObserveResult) CheckPersistence ¶
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool)
CheckPersistence checks whether a stored value has been persisted to disk yet.
type ObservedStatus ¶
type ObservedStatus uint8
ObservedStatus is the type reported by the Observe method
type StreamRequestValue ¶
type TapArguments ¶
type TapArguments struct { // Timestamp of oldest item to send. // // Use TapNoBackfill to suppress all past items. Backfill uint64 // If set, server will disconnect after sending existing items. Dump bool // The indices of the vbuckets to watch; empty/nil to watch all. VBuckets []uint16 // Transfers ownership of vbuckets during cluster rebalance. Takeover bool // If true, server will wait for client ACK after every notification. SupportAck bool // If true, client doesn't want values so server shouldn't send them. KeysOnly bool // If true, client wants the server to send checkpoint events. Checkpoint bool // Optional identifier to use for this client, to allow reconnects ClientName string // Registers this client (by name) till explicitly deregistered. RegisteredClient bool }
TapArguments are parameters for requesting a TAP feed.
Call DefaultTapArguments to get a default one.
func DefaultTapArguments ¶
func DefaultTapArguments() TapArguments
DefaultTapArguments returns a default set of parameter values to pass to StartTapFeed.
type TapEvent ¶
type TapEvent struct { Opcode TapOpcode // Type of event VBucket uint16 // VBucket this event applies to Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value Cas uint64 }
TapEvent is a TAP notification of an operation on the server.