dcp

package module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 25 Imported by: 3

README

Go Dcp Go Reference Go Report Card

This repository contains go implementation of a Couchbase Database Change Protocol (DCP) client.

Contents
Why?
Example
package main

import (
  "github.com/Trendyol/go-dcp"
  "github.com/Trendyol/go-dcp/logger"
  "github.com/Trendyol/go-dcp/models"
)

func listener(ctx *models.ListenerContext) {
  switch event := ctx.Event.(type) {
  case models.DcpMutation:
    logger.Log.Info(
      "mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
      event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
    )
  case models.DcpDeletion:
    logger.Log.Info(
      "deleted(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  case models.DcpExpiration:
    logger.Log.Info(
      "expired(vb=%v,eventTime=%v) | id: %v",
      event.VbID, event.EventTime, string(event.Key),
    )
  }

  ctx.Ack()
}

func main() {
  connector, err := dcp.NewDcp("config.yml", listener)
  if err != nil {
    panic(err)
  }

  defer connector.Close()

  connector.Start()
}
Usage
$ go get github.com/Trendyol/go-dcp

Configuration
Variable Type Required Default Description
hosts []string yes - Couchbase host like localhost:8091.
username string yes - Couchbase username.
password string yes - Couchbase password.
bucketName string yes - Couchbase DCP bucket.
dcp.group.name string yes DCP group name for vbuckets.
scopeName string no _default Couchbase scope name.
collectionNames []string no _default Couchbase collection names.
connectionBufferSize uint, string no 20mb Source Bucket tcp connection buffer size (x Node Count). Check this if you get OOM Killed.
maxQueueSize int no 2048 The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full.
connectionTimeout time.Duration no 1m Couchbase connection timeout.
secureConnection bool no false Enable TLS connection of Couchbase.
rootCAPath string no *not set if secureConnection set true this field is required.
debug bool no false For debugging purpose.
dcp.bufferSize int no 16mb DCP internal queue buffer size (x Node Count). Check this if you get OOM Killed.
dcp.connectionBufferSize uint, string no 20mb DCP tcp connection buffer size (x Node Count). Check this if you get OOM Killed.
dcp.connectionTimeout time.Duration no 1m DCP connection timeout.
dcp.maxQueueSize int no 2048 The maximum number of requests that can be queued waiting to be sent to a node. Check this if you get queue overflowed or queue full.
dcp.listener.skipUntil time.Time no Set this if you want to skip events until certain time.
dcp.group.membership.type string no DCP membership types. couchbase, kubernetesHa, kubernetesStatefulSet, static or dynamic. Check examples for details.
dcp.group.membership.memberNumber int no 1 Set this if membership is static. Other methods will ignore this field.
dcp.group.membership.totalMembers int no 1 Set this if membership is static or kubernetesStatefulSet. Other methods will ignore this field.
dcp.group.membership.rebalanceDelay time.Duration no 30s Works for autonomous mode. If membership is dynamic, it is ignored and set to 0s.
dcp.group.membership.config map[string]string no *not set Set key-values of config. expirySeconds,heartbeatInterval,heartbeatToleranceDuration,monitorInterval,timeout for couchbase type
dcp.config.disableChangeStreams bool no false Set this to true if you did not want to get older versions of changes for Couchbase Server 7.2.0+ using Magma storage buckets
leaderElection.enabled bool no false Set this true for memberships kubernetesHa.
leaderElection.type string no kubernetes Leader Election types. kubernetes
leaderElection.config map[string]string no *not set Set key-values of config. leaseLockName,leaseLockNamespace, leaseDuration, renewDeadline, retryPeriod for kubernetes type.
leaderElection.rpc.port int no 8081 This field is usable for kubernetesStatefulSet membership.
checkpoint.type string no auto Set checkpoint type auto or manual.
checkpoint.autoReset string no earliest Set checkpoint start point to earliest or latest.
checkpoint.interval time.Duration no 1m Checkpoint checking interval.
checkpoint.timeout time.Duration no 1m Checkpoint checking timeout.
healthCheck.disabled bool no false Disable Couchbase connection health check.
healthCheck.interval time.Duration no 1m Couchbase connection health checking interval duration.
healthCheck.timeout time.Duration no 1m Couchbase connection health checking timeout duration.
rollbackMitigation.disabled bool no false Disable reprocessing for roll-backed Vbucket offsets.
rollbackMitigation.interval time.Duration no 1s Persisted sequence numbers polling interval.
rollbackMitigation.configWatchInterval time.Duration no 10s Cluster config changes listener interval.
metadata.type string no couchbase Metadata storing types. file or couchbase.
metadata.readOnly bool no false Set this for debugging state purposes.
metadata.config map[string]string no *not set Set key-values of config. hosts, username, password, bucket,scope,collection,maxQueueSize,connectionBufferSize 5mb is default (x Node Count),connectionTimeout, secureConnection, rootCAPath for couchbase type
api.disabled bool no false Disable metric endpoints
api.port int no 8080 Set API port
metric.path string no /metrics Set metric endpoint path.
logging.level string no info Set logging level.
Environment Variables

These environment variables will overwrite the corresponding configs.

Variable Type Corresponding Config Description
GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER int dcp.group.membership.memberNumber To be able to prevent making deployment to scale up or down.
GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS int dcp.group.membership.totalMembers To be able to prevent making deployment to scale up or down.
Monitoring

The client offers an API that handles different endpoints and expose several metrics.

API
Endpoint Description Debug Mode Body
GET /status Returns a 200 OK status if the client is able to ping the couchbase server successfully.
GET /rebalance Triggers a rebalance operation for the vBuckets.
GET /states/offset Returns the current offsets for each vBucket. x
GET /states/followers Returns the list of follower clients if service discovery enabled x
GET /debug/pprof/* Fiber Pprof x
PUT /membership/info Updates membership info and applies rebalance. {"memberNumber": 1,"totalMembers": 3 }

The Client collects relevant metrics and makes them available at /metrics endpoint. In case you haven't configured a metric.path, the metrics will be exposed at the /metrics.

Exposed metrics
Metric Name Description Labels Value Type
cbgo_mutation_total The total number of mutations on a specific vBucket vbId: ID of the vBucket Counter
cbgo_deletion_total The total number of deletions on a specific vBucket vbId: ID of the vBucket Counter
cbgo_expiration_total The total number of expirations on a specific vBucket vbId: ID of the vBucket Counter
cbgo_agent_queue_current The current number of agent queue address: Couchbase, is dcp: Is Dcp Agent Gauge
cbgo_agent_queue_max The max number of agent queue address: Couchbase, is dcp: Is Dcp Agent Gauge
cbgo_seq_no_current The current sequence number on a specific vBucket vbId: ID of the vBucket Gauge
cbgo_start_seq_no_current The starting sequence number on a specific vBucket vbId: ID of the vBucket Gauge
cbgo_end_seq_no_current The ending sequence number on a specific vBucket vbId: ID of the vBucket Gauge
cbgo_persist_seq_no_current The persist sequence number on a specific vBucket vbId: ID of the vBucket Gauge
cbgo_lag_current The current lag on a specific vBucket vbId: ID of the vBucket Gauge
cbgo_total_lag_current The current total lag N/A Gauge
cbgo_process_latency_ms_current The latest process latency in milliseconds N/A Gauge
cbgo_dcp_latency_ms_current The latest consumed dcp message latency in milliseconds N/A Counter
cbgo_rebalance_current The number of total rebalance N/A Counter
cbgo_active_stream_current The number of total active stream N/A Gauge
cbgo_total_members_current The total number of members in the cluster N/A Gauge
cbgo_member_number_current The number of the current member N/A Gauge
cbgo_membership_type_current The type of membership of the current member Membership type Gauge
cbgo_offset_write_current The latest number of the offset write N/A Gauge
cbgo_offset_write_latency_ms_current The latest offset write latency in milliseconds N/A Gauge
Compatibility
Go DCP Version Minimum Couchbase Server Version
x<1.1.16 6.5.x
1.1.16>=x 5.x.x

Breaking Changes

Date taking effect Version Change How to check
December 14, 2023 v1.1.19 dcp.config.[DisableExpiryOpcode,DisableStreamEndByClient, EnableChangeStreams] removed Review your configs
Examples

Grafana Metric Dashboard

Grafana & Prometheus Example

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dcp

type Dcp interface {
	WaitUntilReady() chan struct{}
	Start()
	Close()
	Commit()
	GetClient() couchbase.Client
	GetConfig() *config.Dcp
	GetVersion() *couchbase.Version
	SetMetadata(metadata metadata.Metadata)
	SetMetricCollectors(collectors ...prometheus.Collector)
	SetEventHandler(handler models.EventHandler)
}

func NewDcp

func NewDcp(cfg any, listener models.Listener) (Dcp, error)

NewDcp creates a new Dcp client

config: path to a configuration file or a configuration struct listener is a callback function that will be called when a mutation, deletion or expiration event occurs

func NewDcpWithLogger added in v1.1.0

func NewDcpWithLogger(cfg any, listener models.Listener, logrus *logrus.Logger) (Dcp, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL