Documentation ¶
Overview ¶
Package cbdatasource streams data from a Couchbase cluster. It is implemented using Couchbase DCP protocol and has auto-reconnecting and auto-restarting goroutines underneath the hood to provide a simple, high-level cluster-wide abstraction. By using cbdatasource, your application does not need to worry about connections or reconnections to individual server nodes or cluster topology changes, rebalance & failovers. The API starting point is NewBucketDataSource().
Index ¶
- Constants
- Variables
- func ExponentialBackoffLoop(name string, f func() int, startSleepMS int, backoffFactor float32, ...)
- func ParseFailOverLog(body []byte) ([][]uint64, error)
- func UPROpen(mc *memcached.Client, name string, option *BucketDataSourceOptions, ...) error
- type AllServerURLsConnectBucketError
- type AuthFailError
- type Bucket
- type BucketDataSource
- type BucketDataSourceOptions
- type BucketDataSourceStats
- type Receiver
- type ReceiverEx
- type VBucketMetaData
- type VBucketState
Constants ¶
const FeatureEnabledDataType = uint16(0x01)
const FeatureEnabledXAttrs = uint16(0x06)
const FeatureEnabledXError = uint16(0x07)
const FlagOpenIncludeXattrs = uint32(4)
const FlagOpenProducer = uint32(1)
Variables ¶
var DefaultBucketDataSourceOptions = &BucketDataSourceOptions{ ClusterManagerBackoffFactor: 1.5, ClusterManagerSleepInitMS: 100, ClusterManagerSleepMaxMS: 1000, DataManagerBackoffFactor: 1.5, DataManagerSleepInitMS: 100, DataManagerSleepMaxMS: 1000, FeedBufferSizeBytes: 20000000, FeedBufferAckThreshold: 0.2, TraceCapacity: 200, PingTimeoutMS: 30000, IncludeXAttrs: false, }
DefaultBucketDataSourceOptions defines the default options that will be used if nil is provided to NewBucketDataSource().
var ErrXAttrsNotSupported = fmt.Errorf("xattrs not supported by server")
Functions ¶
func ExponentialBackoffLoop ¶
func ExponentialBackoffLoop(name string, f func() int, startSleepMS int, backoffFactor float32, maxSleepMS int)
ExponentialBackoffLoop invokes f() in a loop, sleeping in an exponential number of milliseconds in between invocations if needed. The provided f() function should return < 0 to stop the loop; >= 0 to continue the loop, where > 0 means there was progress which allows an immediate retry of f() with no sleeping. A return of < 0 is useful when f() will never make any future progress. Repeated attempts with no progress will have exponential backoff sleep times.
func ParseFailOverLog ¶
ParseFailOverLog parses a byte array to an array of [vbucketUUID, seqNum] pairs. It is exposed for testability.
Types ¶
type AllServerURLsConnectBucketError ¶
type AllServerURLsConnectBucketError struct {
ServerURLs []string
}
AllServerURLsConnectBucketError is the error type passed to Receiver.OnError() when the BucketDataSource failed to connect to all the serverURL's provided as a parameter to NewBucketDataSource(). The application, for example, may choose to BucketDataSource.Close() based on this error. Otherwise, the BucketDataSource will backoff and retry reconnecting to the serverURL's.
func (*AllServerURLsConnectBucketError) Error ¶
func (e *AllServerURLsConnectBucketError) Error() string
type AuthFailError ¶
AuthFailError is the error type passed to Receiver.OnError() when there is an auth request error to the Couchbase cluster or server node.
func (*AuthFailError) Error ¶
func (e *AuthFailError) Error() string
type Bucket ¶
type Bucket interface { Close() GetUUID() string VBServerMap() *couchbase.VBucketServerMap }
A Bucket interface defines the set of methods that cbdatasource needs from an abstract couchbase.Bucket. This separate interface allows for easier testability.
func ConnectBucket ¶
func ConnectBucket(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (Bucket, error)
ConnectBucket is the default function used by BucketDataSource to connect to a Couchbase cluster to retrieve Bucket information. It is exposed for testability and to allow applications to override or wrap via BucketDataSourceOptions.
type BucketDataSource ¶
type BucketDataSource interface { // Use Start() to kickoff connectivity to a Couchbase cluster, // after which calls will be made to the Receiver's methods. Start() error // Asynchronously request a cluster map refresh. A reason string // of "" is valid. Kick(reason string) error // Returns an immutable snapshot of stats. Stats(dest *BucketDataSourceStats) error // Stops the underlying goroutines. Close() error }
BucketDataSource is the main control interface returned by NewBucketDataSource().
func NewBucketDataSource ¶
func NewBucketDataSource( serverURLs []string, poolName string, bucketName string, bucketUUID string, vbucketIDs []uint16, auth couchbase.AuthHandler, receiver Receiver, options *BucketDataSourceOptions) (BucketDataSource, error)
NewBucketDataSource is the main starting point for using the cbdatasource API. The application must supply an array of 1 or more serverURLs (or "seed" URL's) to Couchbase Server cluster-manager REST URL endpoints, like "http://localhost:8091". The BucketDataSource (after Start()'ing) will try each serverURL, in turn, until it can get a successful cluster map. Additionally, the application must supply a poolName & bucketName from where the BucketDataSource will retrieve data. The optional bucketUUID is double-checked by the BucketDataSource to ensure we have the correct bucket, and a bucketUUID of "" means skip the bucketUUID validation. An optional array of vbucketID numbers allows the application to specify which vbuckets to retrieve; and the vbucketIDs array can be nil which means all vbuckets are retrieved by the BucketDataSource. The optional auth parameter can be nil. The application must supply its own implementation of the Receiver interface (see the example program as a sample). The optional options parameter (which may be nil) allows the application to specify advanced parameters like backoff and retry-sleep values.
type BucketDataSourceOptions ¶
type BucketDataSourceOptions struct { // Optional - used during UPR_OPEN stream start. If empty a // random name will be automatically generated. Name string // Factor (like 1.5) to increase sleep time between retries // in connecting to a cluster manager node. ClusterManagerBackoffFactor float32 // Initial sleep time (millisecs) before first retry to cluster manager. ClusterManagerSleepInitMS int // Maximum sleep time (millisecs) between retries to cluster manager. ClusterManagerSleepMaxMS int // Factor (like 1.5) to increase sleep time between retries // in connecting to a data manager node. DataManagerBackoffFactor float32 // Initial sleep time (millisecs) before first retry to data manager. DataManagerSleepInitMS int // Maximum sleep time (millisecs) between retries to data manager. DataManagerSleepMaxMS int // Buffer size in bytes provided for UPR flow control. FeedBufferSizeBytes uint32 // Used for UPR flow control and buffer-ack messages when this // percentage of FeedBufferSizeBytes is reached. FeedBufferAckThreshold float32 // Used for applications like backup which wish to control the // last sequence number provided. Key is vbucketID, value is seqEnd. SeqEnd map[uint16]uint64 // Optional function to connect to a couchbase cluster manager bucket. // Defaults to ConnectBucket() function in this package. ConnectBucket func(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (Bucket, error) // Optional function to connect to a couchbase data manager node. // Defaults to memcached.Connect(). Connect func(protocol, dest string) (*memcached.Client, error) // Optional function for logging diagnostic messages. Logf func(fmt string, v ...interface{}) // When true, message trace information will be captured and // reported via the Logf() callback. TraceCapacity int `json:"-"` // When there's been no send/receive activity for this many // milliseconds, then transmit a NOOP to the DCP source. When 0, // the DefaultBucketDataSourceOptions.PingTimeoutMS is used. Of // note, the NOOP itself counts as send/receive activity. PingTimeoutMS int // IncludeXAttrs is an optional flag which specifies whether // the clients are interested in the X Attributes values // during DCP connection set up. // Defaulted to false to keep it backward compatible. IncludeXAttrs bool }
BucketDataSourceOptions allows the application to provide configuration settings to NewBucketDataSource().
type BucketDataSourceStats ¶
type BucketDataSourceStats struct { TotStart uint64 TotKick uint64 TotKickDeduped uint64 TotKickOk uint64 TotRefreshCluster uint64 TotRefreshClusterConnectBucket uint64 TotRefreshClusterConnectBucketErr uint64 TotRefreshClusterConnectBucketOk uint64 TotRefreshClusterBucketUUIDErr uint64 TotRefreshClusterVBMNilErr uint64 TotRefreshClusterKickWorkers uint64 TotRefreshClusterKickWorkersClosed uint64 TotRefreshClusterKickWorkersStopped uint64 TotRefreshClusterKickWorkersOk uint64 TotRefreshClusterStopped uint64 TotRefreshClusterAwokenClosed uint64 TotRefreshClusterAwokenStopped uint64 TotRefreshClusterAwokenRestart uint64 TotRefreshClusterAwoken uint64 TotRefreshClusterAllServerURLsConnectBucketErr uint64 TotRefreshClusterDone uint64 TotRefreshWorkers uint64 TotRefreshWorkersVBMNilErr uint64 TotRefreshWorkersVBucketIDErr uint64 TotRefreshWorkersServerIdxsErr uint64 TotRefreshWorkersMasterIdxErr uint64 TotRefreshWorkersMasterServerErr uint64 TotRefreshWorkersRemoveWorker uint64 TotRefreshWorkersAddWorker uint64 TotRefreshWorkersKickWorker uint64 TotRefreshWorkersCloseWorker uint64 TotRefreshWorkersLoop uint64 TotRefreshWorkersLoopDone uint64 TotRefreshWorkersDone uint64 TotWorkerStart uint64 TotWorkerDone uint64 TotWorkerBody uint64 TotWorkerBodyKick uint64 TotWorkerConnect uint64 TotWorkerConnectErr uint64 TotWorkerConnectOk uint64 TotWorkerAuth uint64 TotWorkerAuthErr uint64 TotWorkerAuthFail uint64 TotWorkerAuthOk uint64 TotWorkerUPROpenErr uint64 TotWorkerUPROpenOk uint64 TotWorkerAuthenticateMemcachedConn uint64 TotWorkerAuthenticateMemcachedConnErr uint64 TotWorkerAuthenticateMemcachedConnOk uint64 TotWorkerClientClose uint64 TotWorkerClientCloseDone uint64 TotWorkerTransmitStart uint64 TotWorkerTransmit uint64 TotWorkerTransmitErr uint64 TotWorkerTransmitOk uint64 TotWorkerTransmitDone uint64 TotWorkerReceiveStart uint64 TotWorkerReceive uint64 TotWorkerReceiveErr uint64 TotWorkerReceiveOk uint64 TotWorkerReceiveDone uint64 TotWorkerSendEndCh uint64 TotWorkerRecvEndCh uint64 TotWorkerHandleRecv uint64 TotWorkerHandleRecvErr uint64 TotWorkerHandleRecvOk uint64 TotWorkerCleanup uint64 TotWorkerCleanupDone uint64 TotRefreshWorker uint64 TotRefreshWorkerDone uint64 TotRefreshWorkerOk uint64 TotUPRDataChange uint64 TotUPRDataChangeStateErr uint64 TotUPRDataChangeMutation uint64 TotUPRDataChangeDeletion uint64 TotUPRDataChangeExpiration uint64 TotUPRDataChangeErr uint64 TotUPRDataChangeOk uint64 TotUPRCloseStream uint64 TotUPRCloseStreamRes uint64 TotUPRCloseStreamResStateErr uint64 TotUPRCloseStreamResErr uint64 TotUPRCloseStreamResOk uint64 TotUPRStreamReq uint64 TotUPRStreamReqWant uint64 TotUPRStreamReqRes uint64 TotUPRStreamReqResStateErr uint64 TotUPRStreamReqResFail uint64 TotUPRStreamReqResFailNotMyVBucket uint64 TotUPRStreamReqResFailERange uint64 TotUPRStreamReqResFailENoMem uint64 TotUPRStreamReqResRollback uint64 TotUPRStreamReqResRollbackStart uint64 TotUPRStreamReqResRollbackErr uint64 TotUPRStreamReqResWantAfterRollbackErr uint64 TotUPRStreamReqResKick uint64 TotUPRStreamReqResSuccess uint64 TotUPRStreamReqResSuccessOk uint64 TotUPRStreamReqResFLogErr uint64 TotUPRStreamEnd uint64 TotUPRStreamEndStateErr uint64 TotUPRStreamEndKick uint64 TotUPRSnapshot uint64 TotUPRSnapshotStateErr uint64 TotUPRSnapshotStart uint64 TotUPRSnapshotStartErr uint64 TotUPRSnapshotOk uint64 TotUPRNoop uint64 TotUPRControl uint64 TotUPRControlErr uint64 TotUPRBufferAck uint64 TotWantCloseRequestedVBucketErr uint64 TotWantClosingVBucketErr uint64 TotSelectBucketErr uint64 TotHandShakeErr uint64 TotGetVBucketMetaData uint64 TotGetVBucketMetaDataUnmarshalErr uint64 TotGetVBucketMetaDataErr uint64 TotGetVBucketMetaDataOk uint64 TotSetVBucketMetaData uint64 TotSetVBucketMetaDataMarshalErr uint64 TotSetVBucketMetaDataErr uint64 TotSetVBucketMetaDataOk uint64 TotPingTimeout uint64 TotPingReq uint64 TotPingReqDone uint64 }
BucketDataSourceStats is filled by the BucketDataSource.Stats() method. All the metrics here prefixed with "Tot" are monotonic counters: they only increase.
func (*BucketDataSourceStats) AtomicCopyTo ¶
func (s *BucketDataSourceStats) AtomicCopyTo(r *BucketDataSourceStats, fn func(sv uint64, rv uint64) uint64)
AtomicCopyTo copies metrics from s to r (or, from source to result), and also applies an optional fn function. The fn is invoked with metrics from s and r, and can be used to compute additions, subtractions, negations, etc. When fn is nil, AtomicCopyTo behaves as a straight copier.
type Receiver ¶
type Receiver interface { // Invoked in advisory fashion by the BucketDataSource when it // encounters an error. The BucketDataSource will continue to try // to "heal" and restart connections, etc, as necessary. The // Receiver has a recourse during these error notifications of // simply Close()'ing the BucketDataSource. OnError(error) // Invoked by the BucketDataSource when it has received a mutation // from the data source. Receiver implementation is responsible // for making its own copies of the key and request. DataUpdate(vbucketID uint16, key []byte, seq uint64, r *gomemcached.MCRequest) error // Invoked by the BucketDataSource when it has received a deletion // or expiration from the data source. Receiver implementation is // responsible for making its own copies of the key and request. DataDelete(vbucketID uint16, key []byte, seq uint64, r *gomemcached.MCRequest) error // An callback invoked by the BucketDataSource when it has // received a start snapshot message from the data source. The // Receiver implementation, for example, might choose to optimize // persistence perhaps by preparing a batch write to // application-specific storage. SnapshotStart(vbucketID uint16, snapStart, snapEnd uint64, snapType uint32) error // The Receiver should persist the value parameter of // SetMetaData() for retrieval during some future call to // GetMetaData() by the BucketDataSource. The metadata value // should be considered "in-stream", or as part of the sequence // history of mutations. That is, a later Rollback() to some // previous sequence number for a particular vbucketID should // rollback both persisted metadata and regular data. SetMetaData(vbucketID uint16, value []byte) error // GetMetaData() should return the opaque value previously // provided by an earlier call to SetMetaData(). If there was no // previous call to SetMetaData(), such as in the case of a brand // new instance of a Receiver (as opposed to a restarted or // reloaded Receiver), the Receiver should return (nil, 0, nil) // for (value, lastSeq, err), respectively. The lastSeq should be // the last sequence number received and persisted during calls to // the Receiver's DataUpdate() & DataDelete() methods. GetMetaData(vbucketID uint16) (value []byte, lastSeq uint64, err error) // Invoked by the BucketDataSource when the datasource signals a // rollback during stream initialization. Note that both data and // metadata should be rolled back. Rollback(vbucketID uint16, rollbackSeq uint64) error }
A Receiver interface is implemented by the application, or the receiver of data. Calls to methods on this interface will be made by the BucketDataSource using multiple, concurrent goroutines, so the application should implement its own Receiver-side synchronizations if needed.
type ReceiverEx ¶
type ReceiverEx interface { Receiver // Invoked by the BucketDataSource when the datasource signals a // rollback during stream initialization. Note that both data and // metadata should be rolled back. RollbackEx(vbucketID uint16, vbucketUUID uint64, rollbackSeq uint64) error }
A ReceiverEx interface is implemented by the application, or the receiver of data. Calls to methods on this interface will be made by the BucketDataSource using multiple, concurrent goroutines, so the application should implement its own Receiver-side synchronizations if needed.
type VBucketMetaData ¶
type VBucketMetaData struct { SeqStart uint64 `json:"seqStart"` SeqEnd uint64 `json:"seqEnd"` SnapStart uint64 `json:"snapStart"` SnapEnd uint64 `json:"snapEnd"` FailOverLog [][]uint64 `json:"failOverLog"` }
VBucketMetaData is an internal struct is exposed to enable json marshaling.