memcached

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2024 License: MIT Imports: 22 Imported by: 215

Documentation

Overview

Package memcached provides a memcached binary protocol client.

Go implementation of a UPR client. See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md. TODO: 1. Use a pool allocator to avoid garbage.

Index

Constants

View Source
// Feature codes. Client.EnableFeatures sends a hello command to enable
// specific features.
//
// NOTE(review): the XATTR remark previously trailed FeatureMutationToken; it
// appears to describe FeatureXattr, so it is attached there — confirm.
const (
	FeatureTcpNoDelay        = Feature(0x03)
	FeatureMutationToken     = Feature(0x04)
	FeatureXattr             = Feature(0x06) // XATTR bit in data type field with dcp mutations
	FeatureXerror            = Feature(0x07)
	FeatureSnappyCompression = Feature(0x0a)
	FeatureDataType          = Feature(0x0b)
	FeatureTracing           = Feature(0x0f)
	FeatureSyncReplication   = Feature(0x11)
	FeatureCollections       = Feature(0x12)
	FeatureOpenTracing       = Feature(0x13)
	FeaturePreserveExpiry    = Feature(0x14)
	FeatureComputeUnits      = Feature(0x1a)
	FeatureHandleThrottle    = Feature(0x1b)
)
View Source
const (
	// ObservedNotPersisted: found, not persisted
	ObservedNotPersisted ObservedStatus = 0x00
	// ObservedPersisted: found, persisted
	ObservedPersisted ObservedStatus = 0x01
	// ObservedNotFound: not found (or a persisted delete)
	ObservedNotFound ObservedStatus = 0x80
	// ObservedLogicallyDeleted: pending deletion (not persisted yet)
	ObservedLogicallyDeleted ObservedStatus = 0x81
)

Observation status values.

View Source
const (
	// CASStore tells the server to store the new value normally.
	CASStore CasOp = iota
	// CASQuit tells the client to stop attempting to CAS, leaving the value untouched.
	CASQuit
	// CASDelete tells the server to delete the current value.
	CASDelete
)
View Source
const (
	TapBeginBackfill TapOpcode = iota
	TapEndBackfill
	TapMutation
	TapDeletion
	TapCheckpointStart
	TapCheckpointEnd
)

Tap opcode values.

View Source
// Feed states, numbered consecutively from zero.
const (
	FeedStateInitial = iota
	FeedStateOpened
	FeedStateClosed
)
View Source
// Compression types, numbered consecutively from zero.
const (
	// CompressionTypeStartMarker also means invalid.
	CompressionTypeStartMarker = iota
	CompressionTypeNone
	CompressionTypeSnappy
	// CompressionTypeEndMarker also means invalid.
	CompressionTypeEndMarker
)
View Source
// Data type flags; each constant occupies its own bit (1, 2, 4).
const (
	JSONDataType uint8 = 1 << iota
	SnappyDataType
	XattrDataType
)

kv_engine/include/mcbp/protocol/datatype.h

View Source
const RandomScanSeed = 0x5eedbead
View Source
const StreamNotRequested = "has not been requested"
View Source
const TapNoBackfill = math.MaxUint64

Value for TapArguments.Backfill denoting that no past events at all should be sent.

View Source
const UPRDefaultNoopIntervalSeconds = 120

Variables

View Source
var (
	ErrUnSuccessfulHello          = errors.New("Unsuccessful HELLO exchange")
	ErrInvalidHello               = errors.New("Invalid HELLO response")
	ErrPreserveExpiryNotSupported = errors.New("PreserveExpiry is not supported")
	ErrDurabilityNotSupported     = errors.New("Durability is not supported")
)
View Source
var (
	// ConnName is the connection name used when Client.connName is not set.
	ConnName = "GoMemcached"

	// DefaultDialTimeout starts at zero, i.e. no timeout.
	DefaultDialTimeout time.Duration

	// DefaultWriteTimeout starts at zero, i.e. no timeout.
	DefaultWriteTimeout time.Duration
)
View Source
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")

error codes

View Source
var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
View Source
var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
View Source
var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")
View Source
var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
View Source
var Healthy uint32 = 1
View Source
var ReceiveHook func(*gomemcached.MCResponse, int, error)

ReceiveHook is called after every packet is received (or attempted to be)

View Source
var TapRecvHook func(*gomemcached.MCRequest, int, error)

TapRecvHook is called after every incoming tap packet is received.

View Source
var TransmitHook func(*gomemcached.MCRequest, int, error)

TransmitHook is called after each packet is transmitted.

View Source
var UnHealthy uint32 = 0

Functions

func GetSubDocVal

func GetSubDocVal(subPaths []string, context []*ClientContext) (extraBuf, valueBuf []byte)

func IfResStatusError

func IfResStatusError(response *gomemcached.MCResponse) bool

func SetConnectionName added in v0.3.0

func SetConnectionName(name string)

func SetDefaultDialTimeout

func SetDefaultDialTimeout(dial time.Duration)

func SetDefaultTimeouts

func SetDefaultTimeouts(dial, read, write time.Duration)

func UnwrapMemcachedError

func UnwrapMemcachedError(rv *gomemcached.MCResponse,
	err error) (*gomemcached.MCResponse, error)

UnwrapMemcachedError converts memcached errors to normal responses.

If the error is a memcached response, declare the error to be nil so a client can handle the status without worrying about whether it indicates success or failure.

Types

type CASState

type CASState struct {
	Value  []byte // Current value of key; update in place to new value
	Cas    uint64 // Current CAS value of key
	Exists bool   // Does a value exist for the key? (If not, Value will be nil)
	Err    error  // Error, if any, after CASNext returns false
	// contains filtered or unexported fields
}

CASState tracks the state of CAS over several operations.

This is used directly by CASNext and indirectly by CAS

type CasFunc

type CasFunc func(current []byte) ([]byte, CasOp)

CasFunc is the type of function used to perform a CAS transform.

Input is the current value, or nil if no value exists. The function should return the new value (if any) to set, and the store/quit/delete operation.

type CasOp

type CasOp uint8

CasOp is the type of operation to perform on this CAS loop.

func (CasOp) Error

func (c CasOp) Error() string

User specified termination is returned as an error.

type Client

type Client struct {
	// contains filtered or unexported fields
}

The Client itself.

func Connect

func Connect(prot, dest string) (rv *Client, err error)

Connect to a memcached server.

func ConnectTLS

func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error)

Connect to a memcached server using TLS.

func Wrap

func Wrap(conn MemcachedConnection) (rv *Client, err error)

Wrap an existing transport.

func (*Client) Add

func (c *Client) Add(vb uint16, key string, flags int, exp int,
	body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)

Add a value for a key (store if not exists).

func (*Client) Append

func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)

Append data to the value of a key.

func (*Client) Auth

func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error)

Auth performs SASL PLAIN authentication against the server.

func (*Client) AuthList

func (c *Client) AuthList() (*gomemcached.MCResponse, error)

AuthList lists SASL auth mechanisms.

func (*Client) AuthPlain

func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error)

func (*Client) AuthScramSha

func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)

AuthScramSha performs SCRAM-SHA authentication against the server.

func (*Client) CAS

func (c *Client) CAS(vb uint16, k string, f CasFunc,
	initexp int) (*gomemcached.MCResponse, error)

CAS performs a CAS transform with the given function.

If the value does not exist, a nil current value will be sent to f.

func (*Client) CASNext

func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool

CASNext is a non-callback, loop-based version of CAS method.

Usage is like this:

var state memcached.CASState

for client.CASNext(vb, key, exp, &state) {
    state.Value = some_mutation(state.Value)
}

if state.Err != nil { ... }

func (*Client) CancelRangeScan added in v0.2.1

func (c *Client) CancelRangeScan(vb uint16, uuid []byte, opaque uint32, context ...*ClientContext) (
	*gomemcached.MCResponse, error)

func (*Client) Close

func (c *Client) Close() error

Close the connection when you're done.

func (*Client) CollectionEnabled

func (c *Client) CollectionEnabled() bool

func (*Client) CollectionsGetCID

func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)

Retrieve the collection ID (CID) for the given scope and collection.

func (*Client) Conn

func (c *Client) Conn() io.ReadWriteCloser

func (*Client) ContinueRangeScan added in v0.2.1

func (c *Client) ContinueRangeScan(vb uint16, uuid []byte, opaque uint32, items uint32, timeout uint32, maxSize uint32,
	context ...*ClientContext) error

func (*Client) CreateRandomScan added in v0.2.1

func (c *Client) CreateRandomScan(vb uint16, sampleSize int, withDocs bool, context ...*ClientContext) (
	*gomemcached.MCResponse, error)

func (*Client) CreateRangeScan added in v0.2.1

func (c *Client) CreateRangeScan(vb uint16, start []byte, excludeStart bool, end []byte, excludeEnd bool,
	withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error)

func (*Client) Decr

func (c *Client) Decr(vb uint16, key string,
	amt, def uint64, exp int, context ...*ClientContext) (uint64, error)

Decr decrements the value at the given key.

func (*Client) Del

func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)

Del deletes a key.

func (*Client) EnableDataPool added in v0.3.1

func (c *Client) EnableDataPool(getter func(uint64) ([]byte, error), doneCb func([]byte)) error

func (*Client) EnableFeatures

func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error)

Send a hello command to enable specific features

func (*Client) EnableMutationToken

func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error)

Send a hello command to enable MutationTokens

func (*Client) Get

func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)

Get the value for a key.

func (*Client) GetAllVbSeqnos

func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)

Since the binary request supports only a single collection at a time, this may be called multiple times in succession by callers to get vbSeqnos for multiple collections. The caller can therefore pass in a non-nil map so that the gomemcached client does not need to allocate a new map for each call, which avoids excess GC. NOTE: If collections are enabled and no context is given, KV will still return stats for the default collection.

func (*Client) GetAndTouch

func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)

Get the value for a key, and update expiry

func (*Client) GetBulk

func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error

GetBulk gets keys in bulk

func (*Client) GetCollectionsManifest

func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error)

Retrieve the collections manifest.

func (*Client) GetConnName added in v0.3.0

func (c *Client) GetConnName() string

func (*Client) GetErrorMap added in v0.3.0

func (c *Client) GetErrorMap(errMapVersion gomemcached.ErrorMapVersion) (map[string]interface{}, error)

func (*Client) GetMeta

func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)

Get metadata for a key

func (*Client) GetRandomDoc

func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)

Get a random document

func (*Client) GetSubdoc

func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)

Get the xattrs, doc value for the input key

func (*Client) Hijack

func (c *Client) Hijack() MemcachedConnection

Hijack exposes the underlying connection from this client.

It also marks the connection as unhealthy since the client will have lost control over the connection and can't otherwise verify things are in good shape for connection pools.

func (*Client) Incr

func (c *Client) Incr(vb uint16, key string,
	amt, def uint64, exp int, context ...*ClientContext) (uint64, error)

Incr increments the value at the given key.

func (*Client) IsFeatureEnabled added in v0.1.4

func (c *Client) IsFeatureEnabled(feature Feature) bool

func (Client) IsHealthy

func (c Client) IsHealthy() bool

IsHealthy returns true unless the client is believed to have difficulty communicating with its server.

This is useful for connection pools where we want to non-destructively determine that a connection may be reused.

func (*Client) LastBucket added in v0.1.2

func (c *Client) LastBucket() string

func (*Client) NewUprFeed

func (mc *Client) NewUprFeed() (*UprFeed, error)

NewUprFeed creates a new UPR Feed. TODO: Describe side-effects on bucket instance and its connection pool.

func (*Client) NewUprFeedIface

func (mc *Client) NewUprFeedIface() (UprFeedIface, error)

func (*Client) NewUprFeedWithConfig

func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)

func (*Client) NewUprFeedWithConfigIface

func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)

func (*Client) Observe

func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error)

Observe gets the persistence/replication/CAS state of a key

func (*Client) ObserveSeq

func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)

func (*Client) Receive

func (c *Client) Receive() (*gomemcached.MCResponse, error)

Receive a response

func (*Client) ReceiveWithDeadline

func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)

func (*Client) Replica added in v0.3.0

func (c *Client) Replica() bool

func (*Client) SelectBucket

func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error)

select bucket

func (*Client) Send

func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)

Send a custom request and get the response.

func (*Client) Set

func (c *Client) Set(vb uint16, key string, flags int, exp int,
	body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)

Set the value for a key.

func (*Client) SetCas

func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
	body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)

SetCas sets the value for a key with cas.

func (*Client) SetConnName added in v0.3.0

func (c *Client) SetConnName(name string)

func (*Client) SetDeadline

func (c *Client) SetDeadline(t time.Time)

func (*Client) SetKeepAliveOptions

func (c *Client) SetKeepAliveOptions(interval time.Duration)

func (*Client) SetReadDeadline

func (c *Client) SetReadDeadline(t time.Time)

func (*Client) SetReplica added in v0.1.4

func (c *Client) SetReplica(r bool)

Read from replica setting

func (*Client) SetSubdoc added in v0.3.0

func (c *Client) SetSubdoc(vb uint16, key string, ops []SubDocOp, addOnly bool, exp int, cas uint64, context ...*ClientContext) (
	*gomemcached.MCResponse, error)

func (*Client) StartTapFeed

func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error)

StartTapFeed starts a TAP feed on a client connection.

The events can be read from the returned channel. The connection can no longer be used for other purposes; it's now reserved for receiving the TAP messages. To stop receiving events, close the client connection.

func (*Client) Stats

func (c *Client) Stats(key string) ([]StatValue, error)

Stats requests server-side stats.

Use "" as the stat key for toplevel stats.

func (*Client) StatsFunc added in v0.1.2

func (c *Client) StatsFunc(key string, fn func(key, val []byte)) error

StatsFunc requests server-side stats.

Use "" as the stat key for toplevel stats.

func (*Client) StatsMap

func (c *Client) StatsMap(key string) (map[string]string, error)

StatsMap requests server-side stats similarly to Stats, but returns them as a map.

Use "" as the stat key for toplevel stats.

func (*Client) StatsMapForSpecifiedStats

func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error

Instead of returning a new statsMap, StatsMapForSpecifiedStats populates the passed-in statsMap, which contains all the keys for which stats need to be retrieved.

func (*Client) Transmit

func (c *Client) Transmit(req *gomemcached.MCRequest) error

Transmit sends a request, but does not wait for a response.

func (*Client) TransmitResponse

func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error

TransmitResponse sends a response and does not wait.

func (*Client) TransmitWithDeadline

func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error

func (*Client) UprGetFailoverLog

func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)

UprGetFailoverLog for given list of vbuckets.

func (*Client) ValidateKey added in v0.2.1

func (c *Client) ValidateKey(vb uint16, key string, context ...*ClientContext) (bool, error)

type ClientContext

type ClientContext struct {
	// Collection-based context
	CollId uint32

	// Impersonate context
	User string

	// VB-state related context
	// nil means not used in this context
	VbState *VbStateType

	// Preserve Expiry
	PreserveExpiry bool

	// Durability Level
	DurabilityLevel gomemcached.DurabilityLvl

	// Durability Timeout
	DurabilityTimeout time.Duration

	// Data is JSON in snappy compressed format
	Compressed bool

	// Sub-doc paths are document fields (not XATTRs)
	DocumentSubDocPaths bool

	// Include XATTRs in random document retrieval
	IncludeXATTRs bool
}

func (*ClientContext) Copy added in v0.3.0

func (this *ClientContext) Copy() *ClientContext

func (*ClientContext) InitExtras

func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client)

type ClientIface

// ClientIface is the interface form of the operations listed here; each
// method has a counterpart with the same name on *Client.
type ClientIface interface {
	Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
	Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
	Auth(user, pass string) (*gomemcached.MCResponse, error)
	AuthList() (*gomemcached.MCResponse, error)
	AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
	AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
	CASNext(vb uint16, k string, exp int, state *CASState) bool
	CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
	CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
	CollectionEnabled() bool
	Close() error
	Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
	Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
	EnableMutationToken() (*gomemcached.MCResponse, error)
	EnableFeatures(features Features) (*gomemcached.MCResponse, error)
	EnableDataPool(getter func(uint64) ([]byte, error), doneCb func([]byte)) error
	Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
	GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
	GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
	GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error
	GetCollectionsManifest() (*gomemcached.MCResponse, error)
	GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
	GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
	GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
	SetSubdoc(vb uint16, key string, ops []SubDocOp, addOnly bool, exp int, cas uint64, context ...*ClientContext) (
		*gomemcached.MCResponse, error)
	Hijack() MemcachedConnection
	Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
	LastBucket() string
	Observe(vb uint16, key string) (result ObserveResult, err error)
	ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
	Receive() (*gomemcached.MCResponse, error)
	ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
	Replica() bool
	Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
	Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
	SetKeepAliveOptions(interval time.Duration)
	SetReadDeadline(t time.Time)
	SetDeadline(t time.Time)
	SetReplica(r bool)
	SelectBucket(bucket string) (*gomemcached.MCResponse, error)
	SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (
		*gomemcached.MCResponse, error)
	Stats(key string) ([]StatValue, error)
	StatsFunc(key string, fn func(key, val []byte)) error
	StatsMap(key string) (map[string]string, error)
	StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
	Transmit(req *gomemcached.MCRequest) error
	TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
	TransmitResponse(res *gomemcached.MCResponse) error
	UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
	GetConnName() string
	SetConnName(name string)

	// UprFeed Related
	NewUprFeed() (*UprFeed, error)
	NewUprFeedIface() (UprFeedIface, error)
	NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
	NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)

	// Range scan related
	CreateRangeScan(vb uint16, start []byte, excludeStart bool, end []byte, excludeEnd bool, withDocs bool,
		context ...*ClientContext) (*gomemcached.MCResponse, error)
	CreateRandomScan(vb uint16, sampleSize int, withDocs bool, context ...*ClientContext) (*gomemcached.MCResponse, error)
	// Parameter names follow the order declared by (*Client).ContinueRangeScan
	// (items, timeout, maxSize). All three are uint32, so a mismatch in the
	// names would not be caught by the compiler.
	ContinueRangeScan(vb uint16, uuid []byte, opaque uint32, items uint32, timeout uint32, maxSize uint32,
		context ...*ClientContext) error
	CancelRangeScan(vb uint16, uuid []byte, opaque uint32, context ...*ClientContext) (*gomemcached.MCResponse, error)

	ValidateKey(vb uint16, key string, context ...*ClientContext) (bool, error)

	GetErrorMap(errMapVersion gomemcached.ErrorMapVersion) (map[string]interface{}, error)
}

type CollectionChangedEvent

type CollectionChangedEvent interface {
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
	GetMaxTTL() (uint32, error)
}

type CollectionCreateEvent

type CollectionCreateEvent interface {
	GetSystemEventName() (string, error)
	GetScopeId() (uint32, error)
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
	GetMaxTTL() (uint32, error)
}

type CollectionDropEvent

type CollectionDropEvent interface {
	GetScopeId() (uint32, error)
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
}

type CollectionsFilter

type CollectionsFilter struct {
	ManifestUid    uint64
	UseManifestUid bool
	StreamId       uint16
	UseStreamId    bool

	// Use either ScopeId OR CollectionsList, not both
	CollectionsList []uint32
	ScopeId         uint32
}

Collection based filter

func (*CollectionsFilter) IsValid

func (c *CollectionsFilter) IsValid() error

func (*CollectionsFilter) ToStreamReqBody

func (c *CollectionsFilter) ToStreamReqBody() ([]byte, error)

type DcpStreamType

// DcpStreamType is the enumeration of stream types declared below.
type DcpStreamType int32

// Stream types, numbered consecutively from zero.
const (
	NonCollectionStream DcpStreamType = iota
	CollectionsNonStreamId
	CollectionsStreamId
)

// UninitializedStream is the -1 value that lies outside the declared stream types.
var UninitializedStream = DcpStreamType(-1)

func (DcpStreamType) String

func (t DcpStreamType) String() string

type FailoverLog

type FailoverLog [][2]uint64

FailoverLog contains pairs of vbuuid and sequence number.

func (*FailoverLog) Latest

func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error)

type Feature

type Feature uint16

type Features

type Features []Feature

type FeedState

type FeedState int

func (FeedState) String

func (fs FeedState) String() string

type MemcachedConnection added in v0.3.2

type MemcachedConnection interface {
	io.ReadWriteCloser

	SetReadDeadline(time.Time) error
	SetDeadline(time.Time) error
}

type ObserveResult

type ObserveResult struct {
	Status          ObservedStatus // Whether the value has been persisted/deleted
	Cas             uint64         // Current value's CAS
	PersistenceTime time.Duration  // Node's average time to persist a value
	ReplicationTime time.Duration  // Node's average time to replicate a value
}

ObserveResult represents the data obtained by an Observe call

func (ObserveResult) CheckPersistence

func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool)

CheckPersistence checks whether a stored value has been persisted to disk yet.

type ObserveSeqResult

// ObserveSeqResult is the result type returned by Client.ObserveSeq.
type ObserveSeqResult struct {
	Failover           uint8  // Set to 1 if a failover took place
	VbId               uint16 // vbucket id
	Vbuuid             uint64 // vbucket uuid
	LastPersistedSeqNo uint64 // last persisted sequence number
	CurrentSeqNo       uint64 // current sequence number
	OldVbuuid          uint64 // Old bucket vbuuid
	LastSeqNo          uint64 // last sequence number received before failover
}

Sequence number based Observe Implementation

type ObservedStatus

type ObservedStatus uint8

ObservedStatus is the type reported by the Observe method

type PriorityType

type PriorityType string
const (
	PriorityDisabled PriorityType = ""
	PriorityLow      PriorityType = "low"
	PriorityMed      PriorityType = "medium"
	PriorityHigh     PriorityType = "high"
)

high > medium > disabled > low

type ScopeCreateEvent

type ScopeCreateEvent interface {
	GetSystemEventName() (string, error)
	GetScopeId() (uint32, error)
	GetManifestId() (uint64, error)
}

type ScopeDropEvent

type ScopeDropEvent interface {
	GetScopeId() (uint32, error)
	GetManifestId() (uint64, error)
}

type StatValue

type StatValue struct {
	// The stat key
	Key string
	// The stat value
	Val string
}

StatValue is one of the stats returned from the Stats method.

type SubDocOp added in v0.3.0

type SubDocOp struct {
	Xattr   bool
	Path    string
	Value   []byte
	Counter bool
}

type SystemEventType

// SystemEventType is the type of the UprEvent.SystemEvent field.
type SystemEventType int

// System event types, numbered consecutively from zero.
const (
	CollectionCreate SystemEventType = iota
	CollectionDrop
	// CollectionFlush: KV did not implement
	CollectionFlush
	ScopeCreate
	ScopeDrop
	CollectionChanged
)

// InvalidSysEvent is the -1 value that lies outside the declared event types.
const InvalidSysEvent = SystemEventType(-1)

type TapArguments

type TapArguments struct {
	// Timestamp of oldest item to send.
	//
	// Use TapNoBackfill to suppress all past items.
	Backfill uint64
	// If set, server will disconnect after sending existing items.
	Dump bool
	// The indices of the vbuckets to watch; empty/nil to watch all.
	VBuckets []uint16
	// Transfers ownership of vbuckets during cluster rebalance.
	Takeover bool
	// If true, server will wait for client ACK after every notification.
	SupportAck bool
	// If true, client doesn't want values so server shouldn't send them.
	KeysOnly bool
	// If true, client wants the server to send checkpoint events.
	Checkpoint bool
	// Optional identifier to use for this client, to allow reconnects
	ClientName string
	// Registers this client (by name) till explicitly deregistered.
	RegisteredClient bool
}

TapArguments are parameters for requesting a TAP feed.

Call DefaultTapArguments to get a default one.

func DefaultTapArguments

func DefaultTapArguments() TapArguments

DefaultTapArguments returns a default set of parameter values to pass to StartTapFeed.

type TapEvent

type TapEvent struct {
	Opcode     TapOpcode // Type of event
	VBucket    uint16    // VBucket this event applies to
	Flags      uint32    // Item flags
	Expiry     uint32    // Item expiration time
	Key, Value []byte    // Item key/value
	Cas        uint64
}

TapEvent is a TAP notification of an operation on the server.

func (TapEvent) String

func (event TapEvent) String() string

type TapFeed

type TapFeed struct {
	C     <-chan TapEvent
	Error error
	// contains filtered or unexported fields
}

TapFeed represents a stream of events from a server.

func (*TapFeed) Close

func (feed *TapFeed) Close()

Close terminates a TapFeed.

Call this if you stop using a TapFeed before its channel ends.

type TapOpcode

type TapOpcode uint8

TapOpcode is the tap operation type (found in TapEvent)

func (TapOpcode) String

func (opcode TapOpcode) String() string

type Uleb128

type Uleb128 []byte

func (Uleb128) ToUint32 added in v0.1.1

func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int)

type UprEvent

type UprEvent struct {
	Opcode          gomemcached.CommandCode // Type of event
	Status          gomemcached.Status      // Response status
	VBucket         uint16                  // VBucket this event applies to
	DataType        uint8                   // data type
	Opaque          uint16                  // 16 MSB of opaque
	VBuuid          uint64                  // This field is set by downstream
	Flags           uint32                  // Item flags
	Expiry          uint32                  // Item expiration time
	Key, Value      []byte                  // Item key/value
	OldValue        []byte                  // TODO: TBD: old document value
	Cas             uint64                  // CAS value of the item
	Seqno           uint64                  // sequence number of the mutation
	RevSeqno        uint64                  // rev sequence number : deletions
	LockTime        uint32                  // Lock time
	MetadataSize    uint16                  // Metadata size
	SnapstartSeq    uint64                  // start sequence number of this snapshot
	SnapendSeq      uint64                  // End sequence number of the snapshot
	SnapshotType    uint32                  // 0: disk 1: memory
	FailoverLog     *FailoverLog            // Failover log containing vbuuid and sequence number
	Error           error                   // Error value in case of a failure
	ExtMeta         []byte                  // Extended Metadata
	AckSize         uint32                  // The number of bytes that can be Acked to DCP
	SystemEvent     SystemEventType         // Only valid if IsSystemEvent() is true
	SysEventVersion uint8                   // Based on the version, the way Extra bytes is parsed is different
	ValueLen        int                     // Cache it to avoid len() calls for performance
	CollectionId    uint32                  // Valid if Collection is in use
	StreamId        *uint16                 // Nil if not in use
}

UprEvent memcached events for UPR streams.

func (*UprEvent) GetCollectionId

func (event *UprEvent) GetCollectionId() (uint32, error)

func (*UprEvent) GetManifestId

func (event *UprEvent) GetManifestId() (uint64, error)

func (*UprEvent) GetMaxTTL

func (event *UprEvent) GetMaxTTL() (uint32, error)

func (*UprEvent) GetOsoBegin added in v0.1.1

func (event *UprEvent) GetOsoBegin() (bool, error)

Only valid if error is nil: returns true if the event states that OSO begins, and false if the event states that OSO ends.

func (*UprEvent) GetScopeId

func (event *UprEvent) GetScopeId() (uint32, error)

func (*UprEvent) GetSystemEventName

func (event *UprEvent) GetSystemEventName() (string, error)

func (*UprEvent) IsCollectionType

func (event *UprEvent) IsCollectionType() bool

func (*UprEvent) IsOsoSnapshot added in v0.1.1

func (event *UprEvent) IsOsoSnapshot() bool

func (*UprEvent) IsSeqnoAdv

func (event *UprEvent) IsSeqnoAdv() bool

func (*UprEvent) IsSnappyDataType

func (event *UprEvent) IsSnappyDataType() bool

func (*UprEvent) IsStreamEnd added in v0.3.0

func (event *UprEvent) IsStreamEnd() bool

func (*UprEvent) IsSystemEvent

func (event *UprEvent) IsSystemEvent() bool

func (*UprEvent) PopulateEvent

func (event *UprEvent) PopulateEvent(extras []byte)

func (*UprEvent) PopulateFieldsBasedOnStreamType

func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType)

func (*UprEvent) PopulateOso added in v0.1.1

func (event *UprEvent) PopulateOso(extras []byte)

func (*UprEvent) PopulateSeqnoAdv

func (event *UprEvent) PopulateSeqnoAdv(extras []byte)

func (*UprEvent) PopulateStreamEndFlags added in v0.3.0

func (event *UprEvent) PopulateStreamEndFlags(extras []byte)

func (*UprEvent) String

func (event *UprEvent) String() string

type UprFeatures

// UprFeatures is the set of feature options that is passed to, and returned
// by, (*UprFeed).UprOpenWithFeatures.
//
// NOTE(review): the per-field notes below are inferred from the field names
// and types only — confirm against the implementation.
type UprFeatures struct {
	// presumably requests XATTR support (compare UprOpenWithXATTR) — confirm
	Xattribute           bool
	// presumably one of the CompressionType* constants — confirm
	CompressionType      int
	IncludeDeletionTime  bool
	// presumably the DCP priority of the connection (compare SetPriorityAsync) — confirm
	DcpPriority          PriorityType
	EnableExpiry         bool
	// presumably enables use of UprEvent.StreamId — confirm
	EnableStreamId       bool
	// presumably enables OSO snapshots (compare UprEvent.IsOsoSnapshot) — confirm
	EnableOso            bool
	SendStreamEndOnClose bool
	// contains filtered or unexported fields
}

func (*UprFeatures) EnableDeadConnDetection added in v0.3.2

func (f *UprFeatures) EnableDeadConnDetection(thresholdSeconds int) error

EnableDeadConnDetection enables client-side dead connection detection. `thresholdSeconds` should have a minimum value of (2*UPRDefaultNoopInterval).

Refer to https://github.com/couchbase/kv_engine/blob/df1df5e3986dbca368834e6e32c98103deeeec1b/docs/dcp/documentation/dead-connections.md

type UprFeed

// UprFeed represents a UPR feed: a connection to a single host and
// multiple vBuckets.
type UprFeed struct {
	C <-chan *UprEvent // Exported channel for receiving UPR events

	Error error // error recorded on the feed (NOTE(review): presumably what GetError returns — confirm)
	// contains filtered or unexported fields
}

UprFeed represents a UPR feed. A feed contains a connection to a single host and multiple vBuckets.

func (*UprFeed) ClientAck

func (feed *UprFeed) ClientAck(event *UprEvent) error

After it finishes processing an UprEvent, the client needs to call this API to notify the UprFeed, so that the UprFeed can update its ack bytes stats and send an ack to DCP if needed. As a prerequisite for this call to work, the client needs to set the ackByClient flag to true in the NewUprFeedWithConfig() call. This API is not thread safe: the caller should NOT have more than one goroutine calling this API.

func (*UprFeed) Close

func (feed *UprFeed) Close()

Close this UprFeed.

func (*UprFeed) CloseStream

func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error

CloseStream for specified vbucket.

func (*UprFeed) Closed

func (feed *UprFeed) Closed() bool

Closed checks whether the UprFeed has been closed.

func (*UprFeed) GetError

func (feed *UprFeed) GetError() error

func (*UprFeed) GetUprEventCh

func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent

func (*UprFeed) GetUprStats

func (feed *UprFeed) GetUprStats() *UprStats

func (*UprFeed) SetPriorityAsync

func (feed *UprFeed) SetPriorityAsync(p PriorityType) error

func (*UprFeed) StartFeed

func (feed *UprFeed) StartFeed() error

StartFeed starts the UPR feed.

func (*UprFeed) StartFeedWithConfig

func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error

func (*UprFeed) UprOpen

func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error

UprOpen connects to a UPR producer. name: name of the UPR connection; sequence: sequence number for the connection; bufSize: max size of the application.

func (*UprFeed) UprOpenWithFeatures

func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)

func (*UprFeed) UprOpenWithXATTR

func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error

UprOpenWithXATTR is UprOpen with XATTR enabled.

func (*UprFeed) UprRequestCollectionsStream

func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32,
	vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error

func (*UprFeed) UprRequestStream

func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
	vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error

UprRequestStream for a single vbucket.

type UprFeedIface

// UprFeedIface is the exported interface of a UPR feed, provided to allow
// for mocking. Its method set matches the exported methods of *UprFeed; the
// per-method notes below restate the documentation of those methods.
type UprFeedIface interface {
	// Close closes the feed.
	Close()
	// Closed checks whether the feed has been closed.
	Closed() bool
	// CloseStream closes the stream for the specified vbucket.
	CloseStream(vbno, opaqueMSB uint16) error
	GetError() error
	GetUprStats() *UprStats
	// ClientAck is called by the client after it finishes processing an
	// UprEvent, so that the feed can update its ack bytes stats and send an
	// ack to DCP if needed. Not thread safe: call it from one goroutine only.
	ClientAck(event *UprEvent) error
	// GetUprEventCh returns a receive-only channel of UPR events.
	// NOTE(review): presumably the same channel as UprFeed.C — confirm.
	GetUprEventCh() <-chan *UprEvent
	// StartFeed starts the feed.
	StartFeed() error
	StartFeedWithConfig(datachan_len int) error
	// UprOpen connects to a UPR producer using the given connection name,
	// sequence number and buffer size.
	UprOpen(name string, sequence uint32, bufSize uint32) error
	// UprOpenWithXATTR is UprOpen with XATTR enabled.
	UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
	// UprOpenWithFeatures takes a UprFeatures value and returns one alongside
	// the error. Note the unusual result order: the error comes first.
	UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
	// UprRequestStream requests a stream for a single vbucket.
	UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
	// Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
	SetPriorityAsync(p PriorityType) error

	// Various Collection-Type RequestStreams
	UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error
}

UprFeedIface is the exported interface, provided to allow for mocking.

type UprStats

// UprStats holds the counters returned by (*UprFeed).GetUprStats.
//
// NOTE(review): the field notes below are inferred from the field names
// only — confirm against the code that updates them.
type UprStats struct {
	// presumably total bytes received on the feed — confirm
	TotalBytes         uint64
	// presumably number of mutation events seen — confirm
	TotalMutation      uint64
	// presumably number of buffer acks sent to DCP (compare ClientAck) — confirm
	TotalBufferAckSent uint64
	// presumably number of snapshot markers seen — confirm
	TotalSnapShot      uint64
}

type UprStream

type UprStream struct {
	Vbucket  uint16 // Vbucket id
	Vbuuid   uint64 // vbucket uuid
	StartSeq uint64 // start sequence number
	EndSeq   uint64 // end sequence number

	StreamType DcpStreamType
	// contains filtered or unexported fields
}

UprStream is the per-stream data structure over a UPR connection.

type VBSeqnos

type VBSeqnos [][2]uint64

VBSeqnos is a list of pairs, each containing a vbno and the high seqno.

type VbStateType

// VbStateType is a uint8-backed enumeration whose defined values are the
// Vb* constants below.
// NOTE(review): presumably the state of a vbucket — confirm.
type VbStateType uint8
// Defined VbStateType values, numbered 0x00 through 0x04.
const (
	VbAlive   VbStateType = 0x00
	VbActive  VbStateType = 0x01
	VbReplica VbStateType = 0x02
	VbPending VbStateType = 0x03
	VbDead    VbStateType = 0x04
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL