dcpelasticsearch

v1.2.0-rc.3
Published: Nov 7, 2024 License: MIT Imports: 22 Imported by: 0

README

Go Dcp Elasticsearch

A Go implementation of Elasticsearch Connect Couchbase.

Go Dcp Elasticsearch streams documents from Couchbase through the Database Change Protocol (DCP) and writes them to an Elasticsearch index in near real time.

Features

  • Lower resource usage and higher throughput (see Benchmarks).
  • Custom routing support (see Example).
  • Updating multiple documents for a single DCP event (see Example).
  • Handling different DCP events such as expiration, deletion and mutation (see Example).
  • Support for compressing Elasticsearch request bodies.
  • Batch configuration options such as maximum batch size, batch bytes and batch ticker duration.
  • Scaling up and down with custom membership algorithms (Couchbase, KubernetesHa, Kubernetes StatefulSet or Static; see examples).
  • Easily manageable configuration.

Benchmarks

The benchmark was run with 1,001,006 Couchbase documents, because at this volume the difference in batch structure between the two packages can be observed more clearly. The default configuration of Java Elasticsearch Connect Couchbase was used for both connectors.

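| Package | Time to Process Events | Elasticsearch Indexing Rate (/s) | Average CPU Usage (Core) | Average Memory Usage |
| --- | --- | --- | --- | --- |
| Go Dcp Elasticsearch (Go 1.20) | 50s | (chart image, not reproduced) | 0.486 | 408MB |
| Java Elasticsearch Connect Couchbase (JDK15) | 80s | (chart image, not reproduced) | 0.31 | 1091MB |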

Example

Struct Config

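package main

// The import paths below assume the module is published at
// github.com/Trendyol/go-dcp-elasticsearch; adjust them if your module path differs.
import (
  dcpelasticsearch "github.com/Trendyol/go-dcp-elasticsearch"
  "github.com/Trendyol/go-dcp-elasticsearch/config"
  "github.com/Trendyol/go-dcp-elasticsearch/couchbase"
  "github.com/Trendyol/go-dcp-elasticsearch/elasticsearch/document"
  dcpConfig "github.com/Trendyol/go-dcp/config"
)

// mapper converts one DCP event into Elasticsearch actions:
// mutations are indexed, every other event deletes the document.
func mapper(event couchbase.Event) []document.ESActionDocument {
  if event.IsMutated {
    e := document.NewIndexAction(event.Key, event.Value, nil)
    return []document.ESActionDocument{e}
  }
  e := document.NewDeleteAction(event.Key, nil)
  return []document.ESActionDocument{e}
}

func main() {
  connector, err := dcpelasticsearch.NewConnectorBuilder(config.Config{
    Elasticsearch: config.Elasticsearch{
      // Events from the _default collection are written to "indexname".
      CollectionIndexMapping: map[string]string{
        "_default": "indexname",
      },
      Urls: []string{"http://localhost:9200"},
    },
    Dcp: dcpConfig.Dcp{
      Username:   "user",
      Password:   "password",
      BucketName: "dcp-test",
      Hosts:      []string{"localhost:8091"},
      Dcp: dcpConfig.ExternalDcp{
        Group: dcpConfig.DCPGroup{
          Name: "groupName",
          Membership: dcpConfig.DCPGroupMembership{
            Type: "static",
          },
        },
      },
      // Checkpoints are stored in a Couchbase bucket.
      Metadata: dcpConfig.Metadata{
        Config: map[string]string{
          "bucket":     "checkpoint-bucket-name",
          "scope":      "_default",
          "collection": "_default",
        },
        Type: "couchbase",
      },
    },
  }).
    SetMapper(mapper).
    Build()
  if err != nil {
    panic(err)
  }

  defer connector.Close()
  connector.Start()
}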
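
The mapper above only separates mutations from everything else. As a rough sketch of how a mapper might handle expiration and deletion explicitly, set a routing value, and emit more than one action for a single event, the following uses local stand-in types so that it runs on its own. The `event` and `action` types, the `mapEvent` function, the `:audit` document ID suffix, and the choice to route by document key are all assumptions made for illustration; they are not the library's types or behaviour.

```go
package main

import "fmt"

// event is a hypothetical stand-in for couchbase.Event; only the fields
// used in this sketch are modelled.
type event struct {
	Key       []byte
	Value     []byte
	IsMutated bool
	IsDeleted bool
	IsExpired bool
}

// action is a hypothetical stand-in for document.ESActionDocument.
type action struct {
	Type    string // "index" or "delete"
	ID      string
	Source  []byte
	Routing *string
}

// mapEvent turns one DCP event into zero or more Elasticsearch actions.
func mapEvent(e event) []action {
	id := string(e.Key)
	routing := id // assumption: route by document key

	switch {
	case e.IsMutated:
		// One mutation produces two index actions: the document itself
		// and an illustrative audit copy.
		return []action{
			{Type: "index", ID: id, Source: e.Value, Routing: &routing},
			{Type: "index", ID: id + ":audit", Source: e.Value, Routing: &routing},
		}
	case e.IsDeleted, e.IsExpired:
		// Deletion and expiration both remove the document.
		return []action{{Type: "delete", ID: id, Routing: &routing}}
	default:
		// Unknown event kinds produce no actions.
		return nil
	}
}

func main() {
	e := event{Key: []byte("user:1"), Value: []byte(`{"name":"a"}`), IsMutated: true}
	for _, a := range mapEvent(e) {
		fmt.Println(a.Type, a.ID, *a.Routing)
	}
}
```

In a real connector the same branching would sit inside a function of type Mapper and build its result with the document package's constructors, as in the example above.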

File Config

Default Mapper

Configuration

Dcp Configuration

See the go-dcp project for the DCP configuration options.

Elasticsearch Specific Configuration
| Variable | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| elasticsearch.collectionIndexMapping | map[string]string | yes | | Maps each Couchbase collection to the index its events are written to. |
| elasticsearch.urls | []string | yes | | Elasticsearch connection URLs. |
| elasticsearch.username | string | no | | Elasticsearch username. |
| elasticsearch.password | string | no | | Elasticsearch password. |
| elasticsearch.typeName | string | no | | Elasticsearch index type name. |
| elasticsearch.batchSizeLimit | int | no | 1000 | Maximum number of messages in a batch; a flush is triggered when it is exceeded. |
| elasticsearch.batchTickerDuration | time.Duration | no | 10s | Interval at which the batch is flushed automatically, so that messages do not wait in the batch for long. |
| elasticsearch.batchByteSizeLimit | int, string | no | 10mb | Maximum batch size in bytes; a flush is triggered when it is exceeded. |
| elasticsearch.maxConnsPerHost | int | no | 512 | Maximum number of connections that may be established per host. |
| elasticsearch.maxIdleConnDuration | time.Duration | no | 10s | Idle keep-alive connections are closed after this duration. |
| elasticsearch.compressionEnabled | boolean | no | false | Enables request compression, which helps when messages are large but may affect CPU usage. |
| elasticsearch.concurrentRequest | int | no | 1 | Number of concurrent bulk requests. |
| elasticsearch.disableDiscoverNodesOnStart | boolean | no | false | Disables node discovery when the client is initialized. |
| elasticsearch.discoverNodesInterval | time.Duration | no | 5m | Interval at which nodes are rediscovered. |
| elasticsearch.rejectionLog.index | string | no | cbes-rejects | Rejection log index name. |
| elasticsearch.rejectionLog.includeSource | boolean | no | false | Whether the rejection log includes source info. |
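
To make the interaction of the three batch settings concrete, here is a minimal sketch of a batch that flushes when the message count or byte size limit is reached, with a manual `flush` call standing in for the periodic ticker. The `batch` type and its fields are hypothetical and are not the connector's internal implementation; the sketch also assumes a flush happens as soon as a limit is reached, which may differ from the library's exact threshold.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for the connector's internal batch.
type batch struct {
	sizeLimit     int // plays the role of elasticsearch.batchSizeLimit
	byteSizeLimit int // plays the role of elasticsearch.batchByteSizeLimit, in bytes
	items         [][]byte
	bytes         int
	flushes       int
}

// add appends one message and flushes once either limit is reached.
func (b *batch) add(msg []byte) {
	b.items = append(b.items, msg)
	b.bytes += len(msg)
	if len(b.items) >= b.sizeLimit || b.bytes >= b.byteSizeLimit {
		b.flush()
	}
}

// flush empties the batch. A ticker firing every batchTickerDuration
// would call this too, so that a partially filled batch does not wait
// indefinitely. Here it only counts flushes instead of sending a bulk request.
func (b *batch) flush() {
	if len(b.items) == 0 {
		return
	}
	b.flushes++
	b.items = b.items[:0]
	b.bytes = 0
}

func main() {
	b := &batch{sizeLimit: 3, byteSizeLimit: 1024}
	for i := 0; i < 7; i++ {
		b.add([]byte("doc"))
	}
	b.flush() // simulate the ticker flushing the one remaining message
	fmt.Println("flushes:", b.flushes)
}
```

With a count limit of 3 and seven messages, two flushes come from the count limit and the third from the simulated ticker, which is the case batchTickerDuration exists to cover.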

Exposed metrics

| Metric Name | Description | Labels | Value Type |
| --- | --- | --- | --- |
| elasticsearch_connector_latency_ms | Time taken to add an event to the batch. | N/A | Gauge |
| elasticsearch_connector_bulk_request_process_latency_ms | Time taken to process a bulk request. | N/A | Gauge |
| elasticsearch_connector_action_total | Count of Elasticsearch actions. | action_type: type of action (e.g., delete, index); result: result of the action (e.g., success, error); index_name: name of the index the action is applied to | Counter |

You can also use all DCP-related metrics described in the go-dcp documentation. They are injected automatically, so no extra setup is needed.

Grafana Metric Dashboard

Grafana & Prometheus Example

Contributing

Go Dcp Elasticsearch is always open for direct contributions. For more information please check our Contribution Guideline document.

License

Released under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultMapper

func DefaultMapper(event couchbase.Event) []document.ESActionDocument

Types

type Connector

type Connector interface {
	Start()
	Close()
	GetDcpClient() dcpCouchbase.Client
}

type ConnectorBuilder

type ConnectorBuilder struct {
	// contains filtered or unexported fields
}

func NewConnectorBuilder

func NewConnectorBuilder(config any) *ConnectorBuilder

func (*ConnectorBuilder) Build

func (c *ConnectorBuilder) Build() (Connector, error)

func (*ConnectorBuilder) SetLogger

func (c *ConnectorBuilder) SetLogger(logrus *logrus.Logger) *ConnectorBuilder

func (*ConnectorBuilder) SetMapper

func (c *ConnectorBuilder) SetMapper(mapper Mapper) *ConnectorBuilder

func (*ConnectorBuilder) SetSinkResponseHandler added in v1.1.28

func (c *ConnectorBuilder) SetSinkResponseHandler(sinkResponseHandler dcpElasticsearch.SinkResponseHandler) *ConnectorBuilder

type DcpEventHandler added in v0.0.40

type DcpEventHandler struct {
	// contains filtered or unexported fields
}

func (*DcpEventHandler) AfterRebalanceEnd added in v0.0.40

func (h *DcpEventHandler) AfterRebalanceEnd()

func (*DcpEventHandler) AfterRebalanceStart added in v0.0.40

func (h *DcpEventHandler) AfterRebalanceStart()

func (*DcpEventHandler) AfterStreamStart added in v0.0.40

func (h *DcpEventHandler) AfterStreamStart()

func (*DcpEventHandler) AfterStreamStop added in v0.0.40

func (h *DcpEventHandler) AfterStreamStop()

func (*DcpEventHandler) BeforeRebalanceEnd added in v0.0.40

func (h *DcpEventHandler) BeforeRebalanceEnd()

func (*DcpEventHandler) BeforeRebalanceStart added in v0.0.40

func (h *DcpEventHandler) BeforeRebalanceStart()

func (*DcpEventHandler) BeforeStreamStart added in v0.0.40

func (h *DcpEventHandler) BeforeStreamStart()

func (*DcpEventHandler) BeforeStreamStop added in v0.0.40

func (h *DcpEventHandler) BeforeStreamStop()

type Mapper

type Mapper func(event couchbase.Event) []document.ESActionDocument
