cbdatasource

package
v0.0.0-...-4905c10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2018 License: MIT Imports: 15 Imported by: 0

README

cbdatasource

golang library to stream data from a Couchbase cluster

The cbdatasource package is implemented using Couchbase DCP protocol and has auto-reconnecting and auto-restarting goroutines underneath the hood to provide a simple, high-level cluster-wide abstraction. By using cbdatasource, your application does not need to worry about connections or reconnections to individual server nodes or cluster topology changes, rebalance & failovers. The API starting point is NewBucketDataSource().

LICENSE: Apache 2.0

GoDoc

Documentation

Overview

Package cbdatasource streams data from a Couchbase cluster. It is implemented using Couchbase DCP protocol and has auto-reconnecting and auto-restarting goroutines underneath the hood to provide a simple, high-level cluster-wide abstraction. By using cbdatasource, your application does not need to worry about connections or reconnections to individual server nodes or cluster topology changes, rebalance & failovers. The API starting point is NewBucketDataSource().

Index

Constants

View Source
const FeatureEnabledDataType = uint16(0x01)
View Source
const FeatureEnabledXAttrs = uint16(0x06)
View Source
const FeatureEnabledXError = uint16(0x07)
View Source
const FlagOpenIncludeXattrs = uint32(4)
View Source
const FlagOpenProducer = uint32(1)

Variables

View Source
var DefaultBucketDataSourceOptions = &BucketDataSourceOptions{
	ClusterManagerBackoffFactor: 1.5,
	ClusterManagerSleepInitMS:   100,
	ClusterManagerSleepMaxMS:    1000,

	DataManagerBackoffFactor: 1.5,
	DataManagerSleepInitMS:   100,
	DataManagerSleepMaxMS:    1000,

	FeedBufferSizeBytes:    20000000,
	FeedBufferAckThreshold: 0.2,

	TraceCapacity: 200,

	PingTimeoutMS: 30000,

	IncludeXAttrs: false,
}

DefaultBucketDataSourceOptions defines the default options that will be used if nil is provided to NewBucketDataSource().

View Source
var ErrXAttrsNotSupported = fmt.Errorf("xattrs not supported by server")

Functions

func ExponentialBackoffLoop

func ExponentialBackoffLoop(name string,
	f func() int,
	startSleepMS int,
	backoffFactor float32,
	maxSleepMS int)

ExponentialBackoffLoop invokes f() in a loop, sleeping in an exponential number of milliseconds in between invocations if needed. The provided f() function should return < 0 to stop the loop; >= 0 to continue the loop, where > 0 means there was progress which allows an immediate retry of f() with no sleeping. A return of < 0 is useful when f() will never make any future progress. Repeated attempts with no progress will have exponential backoff sleep times.

func ParseFailOverLog

func ParseFailOverLog(body []byte) ([][]uint64, error)

ParseFailOverLog parses a byte array to an array of [vbucketUUID, seqNum] pairs. It is exposed for testability.

func UPROpen

func UPROpen(mc *memcached.Client, name string,
	option *BucketDataSourceOptions, openFlags uint32) error

UPROpen starts a UPR_OPEN stream on a memcached client connection. It is exposed for testability.

Types

type AllServerURLsConnectBucketError

type AllServerURLsConnectBucketError struct {
	ServerURLs []string
}

AllServerURLsConnectBucketError is the error type passed to Receiver.OnError() when the BucketDataSource failed to connect to all the serverURL's provided as a parameter to NewBucketDataSource(). The application, for example, may choose to BucketDataSource.Close() based on this error. Otherwise, the BucketDataSource will backoff and retry reconnecting to the serverURL's.

func (*AllServerURLsConnectBucketError) Error

type AuthFailError

type AuthFailError struct {
	ServerURL string
	User      string
}

AuthFailError is the error type passed to Receiver.OnError() when there is an auth request error to the Couchbase cluster or server node.

func (*AuthFailError) Error

func (e *AuthFailError) Error() string

type Bucket

type Bucket interface {
	Close()
	GetUUID() string
	VBServerMap() *couchbase.VBucketServerMap
}

A Bucket interface defines the set of methods that cbdatasource needs from an abstract couchbase.Bucket. This separate interface allows for easier testability.

func ConnectBucket

func ConnectBucket(serverURL, poolName, bucketName string,
	auth couchbase.AuthHandler) (Bucket, error)

ConnectBucket is the default function used by BucketDataSource to connect to a Couchbase cluster to retrieve Bucket information. It is exposed for testability and to allow applications to override or wrap via BucketDataSourceOptions.

type BucketDataSource

type BucketDataSource interface {
	// Use Start() to kickoff connectivity to a Couchbase cluster,
	// after which calls will be made to the Receiver's methods.
	Start() error

	// Asynchronously request a cluster map refresh.  A reason string
	// of "" is valid.
	Kick(reason string) error

	// Returns an immutable snapshot of stats.
	Stats(dest *BucketDataSourceStats) error

	// Stops the underlying goroutines.
	Close() error
}

BucketDataSource is the main control interface returned by NewBucketDataSource().

func NewBucketDataSource

func NewBucketDataSource(
	serverURLs []string,
	poolName string,
	bucketName string,
	bucketUUID string,
	vbucketIDs []uint16,
	auth couchbase.AuthHandler,
	receiver Receiver,
	options *BucketDataSourceOptions) (BucketDataSource, error)

NewBucketDataSource is the main starting point for using the cbdatasource API. The application must supply an array of 1 or more serverURLs (or "seed" URL's) to Couchbase Server cluster-manager REST URL endpoints, like "http://localhost:8091". The BucketDataSource (after Start()'ing) will try each serverURL, in turn, until it can get a successful cluster map. Additionally, the application must supply a poolName & bucketName from where the BucketDataSource will retrieve data. The optional bucketUUID is double-checked by the BucketDataSource to ensure we have the correct bucket, and a bucketUUID of "" means skip the bucketUUID validation. An optional array of vbucketID numbers allows the application to specify which vbuckets to retrieve; and the vbucketIDs array can be nil which means all vbuckets are retrieved by the BucketDataSource. The optional auth parameter can be nil. The application must supply its own implementation of the Receiver interface (see the example program as a sample). The optional options parameter (which may be nil) allows the application to specify advanced parameters like backoff and retry-sleep values.

type BucketDataSourceOptions

type BucketDataSourceOptions struct {
	// Optional - used during UPR_OPEN stream start.  If empty a
	// random name will be automatically generated.
	Name string

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a cluster manager node.
	ClusterManagerBackoffFactor float32

	// Initial sleep time (millisecs) before first retry to cluster manager.
	ClusterManagerSleepInitMS int

	// Maximum sleep time (millisecs) between retries to cluster manager.
	ClusterManagerSleepMaxMS int

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a data manager node.
	DataManagerBackoffFactor float32

	// Initial sleep time (millisecs) before first retry to data manager.
	DataManagerSleepInitMS int

	// Maximum sleep time (millisecs) between retries to data manager.
	DataManagerSleepMaxMS int

	// Buffer size in bytes provided for UPR flow control.
	FeedBufferSizeBytes uint32

	// Used for UPR flow control and buffer-ack messages when this
	// percentage of FeedBufferSizeBytes is reached.
	FeedBufferAckThreshold float32

	// Used for applications like backup which wish to control the
	// last sequence number provided.  Key is vbucketID, value is seqEnd.
	SeqEnd map[uint16]uint64

	// Optional function to connect to a couchbase cluster manager bucket.
	// Defaults to ConnectBucket() function in this package.
	ConnectBucket func(serverURL, poolName, bucketName string,
		auth couchbase.AuthHandler) (Bucket, error)

	// Optional function to connect to a couchbase data manager node.
	// Defaults to memcached.Connect().
	Connect func(protocol, dest string) (*memcached.Client, error)

	// Optional function for logging diagnostic messages.
	Logf func(fmt string, v ...interface{})

	// When true, message trace information will be captured and
	// reported via the Logf() callback.
	TraceCapacity int `json:"-"`

	// When there's been no send/receive activity for this many
	// milliseconds, then transmit a NOOP to the DCP source.  When 0,
	// the DefaultBucketDataSourceOptions.PingTimeoutMS is used.  Of
	// note, the NOOP itself counts as send/receive activity.
	PingTimeoutMS int

	// IncludeXAttrs is an optional flag which specifies whether
	// the clients are interested in the X Attributes values
	// during DCP connection set up.
	// Defaulted to false to keep it backward compatible.
	IncludeXAttrs bool
}

BucketDataSourceOptions allows the application to provide configuration settings to NewBucketDataSource().

type BucketDataSourceStats

type BucketDataSourceStats struct {
	TotStart uint64

	TotKick        uint64
	TotKickDeduped uint64
	TotKickOk      uint64

	TotRefreshCluster                              uint64
	TotRefreshClusterConnectBucket                 uint64
	TotRefreshClusterConnectBucketErr              uint64
	TotRefreshClusterConnectBucketOk               uint64
	TotRefreshClusterBucketUUIDErr                 uint64
	TotRefreshClusterVBMNilErr                     uint64
	TotRefreshClusterKickWorkers                   uint64
	TotRefreshClusterKickWorkersClosed             uint64
	TotRefreshClusterKickWorkersStopped            uint64
	TotRefreshClusterKickWorkersOk                 uint64
	TotRefreshClusterStopped                       uint64
	TotRefreshClusterAwokenClosed                  uint64
	TotRefreshClusterAwokenStopped                 uint64
	TotRefreshClusterAwokenRestart                 uint64
	TotRefreshClusterAwoken                        uint64
	TotRefreshClusterAllServerURLsConnectBucketErr uint64
	TotRefreshClusterDone                          uint64

	TotRefreshWorkers                uint64
	TotRefreshWorkersVBMNilErr       uint64
	TotRefreshWorkersVBucketIDErr    uint64
	TotRefreshWorkersServerIdxsErr   uint64
	TotRefreshWorkersMasterIdxErr    uint64
	TotRefreshWorkersMasterServerErr uint64
	TotRefreshWorkersRemoveWorker    uint64
	TotRefreshWorkersAddWorker       uint64
	TotRefreshWorkersKickWorker      uint64
	TotRefreshWorkersCloseWorker     uint64
	TotRefreshWorkersLoop            uint64
	TotRefreshWorkersLoopDone        uint64
	TotRefreshWorkersDone            uint64

	TotWorkerStart      uint64
	TotWorkerDone       uint64
	TotWorkerBody       uint64
	TotWorkerBodyKick   uint64
	TotWorkerConnect    uint64
	TotWorkerConnectErr uint64
	TotWorkerConnectOk  uint64
	TotWorkerAuth       uint64
	TotWorkerAuthErr    uint64
	TotWorkerAuthFail   uint64
	TotWorkerAuthOk     uint64
	TotWorkerUPROpenErr uint64
	TotWorkerUPROpenOk  uint64

	TotWorkerAuthenticateMemcachedConn    uint64
	TotWorkerAuthenticateMemcachedConnErr uint64
	TotWorkerAuthenticateMemcachedConnOk  uint64

	TotWorkerClientClose     uint64
	TotWorkerClientCloseDone uint64

	TotWorkerTransmitStart uint64
	TotWorkerTransmit      uint64
	TotWorkerTransmitErr   uint64
	TotWorkerTransmitOk    uint64
	TotWorkerTransmitDone  uint64

	TotWorkerReceiveStart uint64
	TotWorkerReceive      uint64
	TotWorkerReceiveErr   uint64
	TotWorkerReceiveOk    uint64
	TotWorkerReceiveDone  uint64

	TotWorkerSendEndCh uint64
	TotWorkerRecvEndCh uint64

	TotWorkerHandleRecv    uint64
	TotWorkerHandleRecvErr uint64
	TotWorkerHandleRecvOk  uint64

	TotWorkerCleanup     uint64
	TotWorkerCleanupDone uint64

	TotRefreshWorker     uint64
	TotRefreshWorkerDone uint64
	TotRefreshWorkerOk   uint64

	TotUPRDataChange                       uint64
	TotUPRDataChangeStateErr               uint64
	TotUPRDataChangeMutation               uint64
	TotUPRDataChangeDeletion               uint64
	TotUPRDataChangeExpiration             uint64
	TotUPRDataChangeErr                    uint64
	TotUPRDataChangeOk                     uint64
	TotUPRCloseStream                      uint64
	TotUPRCloseStreamRes                   uint64
	TotUPRCloseStreamResStateErr           uint64
	TotUPRCloseStreamResErr                uint64
	TotUPRCloseStreamResOk                 uint64
	TotUPRStreamReq                        uint64
	TotUPRStreamReqWant                    uint64
	TotUPRStreamReqRes                     uint64
	TotUPRStreamReqResStateErr             uint64
	TotUPRStreamReqResFail                 uint64
	TotUPRStreamReqResFailNotMyVBucket     uint64
	TotUPRStreamReqResFailERange           uint64
	TotUPRStreamReqResFailENoMem           uint64
	TotUPRStreamReqResRollback             uint64
	TotUPRStreamReqResRollbackStart        uint64
	TotUPRStreamReqResRollbackErr          uint64
	TotUPRStreamReqResWantAfterRollbackErr uint64
	TotUPRStreamReqResKick                 uint64
	TotUPRStreamReqResSuccess              uint64
	TotUPRStreamReqResSuccessOk            uint64
	TotUPRStreamReqResFLogErr              uint64
	TotUPRStreamEnd                        uint64
	TotUPRStreamEndStateErr                uint64
	TotUPRStreamEndKick                    uint64
	TotUPRSnapshot                         uint64
	TotUPRSnapshotStateErr                 uint64
	TotUPRSnapshotStart                    uint64
	TotUPRSnapshotStartErr                 uint64
	TotUPRSnapshotOk                       uint64
	TotUPRNoop                             uint64
	TotUPRControl                          uint64
	TotUPRControlErr                       uint64
	TotUPRBufferAck                        uint64

	TotWantCloseRequestedVBucketErr uint64
	TotWantClosingVBucketErr        uint64

	TotSelectBucketErr                uint64
	TotHandShakeErr                   uint64
	TotGetVBucketMetaData             uint64
	TotGetVBucketMetaDataUnmarshalErr uint64
	TotGetVBucketMetaDataErr          uint64
	TotGetVBucketMetaDataOk           uint64

	TotSetVBucketMetaData           uint64
	TotSetVBucketMetaDataMarshalErr uint64
	TotSetVBucketMetaDataErr        uint64
	TotSetVBucketMetaDataOk         uint64

	TotPingTimeout uint64
	TotPingReq     uint64
	TotPingReqDone uint64
}

BucketDataSourceStats is filled by the BucketDataSource.Stats() method. All the metrics here prefixed with "Tot" are monotonic counters: they only increase.

func (*BucketDataSourceStats) AtomicCopyTo

func (s *BucketDataSourceStats) AtomicCopyTo(r *BucketDataSourceStats,
	fn func(sv uint64, rv uint64) uint64)

AtomicCopyTo copies metrics from s to r (or, from source to result), and also applies an optional fn function. The fn is invoked with metrics from s and r, and can be used to compute additions, subtractions, negations, etc. When fn is nil, AtomicCopyTo behaves as a straight copier.

type Receiver

type Receiver interface {
	// Invoked in advisory fashion by the BucketDataSource when it
	// encounters an error.  The BucketDataSource will continue to try
	// to "heal" and restart connections, etc, as necessary.  The
	// Receiver has a recourse during these error notifications of
	// simply Close()'ing the BucketDataSource.
	OnError(error)

	// Invoked by the BucketDataSource when it has received a mutation
	// from the data source.  Receiver implementation is responsible
	// for making its own copies of the key and request.
	DataUpdate(vbucketID uint16, key []byte, seq uint64,
		r *gomemcached.MCRequest) error

	// Invoked by the BucketDataSource when it has received a deletion
	// or expiration from the data source.  Receiver implementation is
	// responsible for making its own copies of the key and request.
	DataDelete(vbucketID uint16, key []byte, seq uint64,
		r *gomemcached.MCRequest) error

	// An callback invoked by the BucketDataSource when it has
	// received a start snapshot message from the data source.  The
	// Receiver implementation, for example, might choose to optimize
	// persistence perhaps by preparing a batch write to
	// application-specific storage.
	SnapshotStart(vbucketID uint16, snapStart, snapEnd uint64, snapType uint32) error

	// The Receiver should persist the value parameter of
	// SetMetaData() for retrieval during some future call to
	// GetMetaData() by the BucketDataSource.  The metadata value
	// should be considered "in-stream", or as part of the sequence
	// history of mutations.  That is, a later Rollback() to some
	// previous sequence number for a particular vbucketID should
	// rollback both persisted metadata and regular data.
	SetMetaData(vbucketID uint16, value []byte) error

	// GetMetaData() should return the opaque value previously
	// provided by an earlier call to SetMetaData().  If there was no
	// previous call to SetMetaData(), such as in the case of a brand
	// new instance of a Receiver (as opposed to a restarted or
	// reloaded Receiver), the Receiver should return (nil, 0, nil)
	// for (value, lastSeq, err), respectively.  The lastSeq should be
	// the last sequence number received and persisted during calls to
	// the Receiver's DataUpdate() & DataDelete() methods.
	GetMetaData(vbucketID uint16) (value []byte, lastSeq uint64, err error)

	// Invoked by the BucketDataSource when the datasource signals a
	// rollback during stream initialization.  Note that both data and
	// metadata should be rolled back.
	Rollback(vbucketID uint16, rollbackSeq uint64) error
}

A Receiver interface is implemented by the application, or the receiver of data. Calls to methods on this interface will be made by the BucketDataSource using multiple, concurrent goroutines, so the application should implement its own Receiver-side synchronizations if needed.

type ReceiverEx

type ReceiverEx interface {
	Receiver
	// Invoked by the BucketDataSource when the datasource signals a
	// rollback during stream initialization.  Note that both data and
	// metadata should be rolled back.
	RollbackEx(vbucketID uint16, vbucketUUID uint64, rollbackSeq uint64) error
}

A ReceiverEx interface is implemented by the application, or the receiver of data. Calls to methods on this interface will be made by the BucketDataSource using multiple, concurrent goroutines, so the application should implement its own Receiver-side synchronizations if needed.

type VBucketMetaData

type VBucketMetaData struct {
	SeqStart    uint64     `json:"seqStart"`
	SeqEnd      uint64     `json:"seqEnd"`
	SnapStart   uint64     `json:"snapStart"`
	SnapEnd     uint64     `json:"snapEnd"`
	FailOverLog [][]uint64 `json:"failOverLog"`
}

VBucketMetaData is an internal struct is exposed to enable json marshaling.

type VBucketState

type VBucketState struct {
	// Valid values for state: "" (dead/closed/unknown), "requested",
	// "running", "closing".
	State     string
	SnapStart uint64
	SnapEnd   uint64
	SnapSaved bool // True when the snapStart/snapEnd have been persisted.
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL