cassandra

package
v0.1.1
Published: Sep 10, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

README

Cassandra implementation of Broker interface

The API was tested with Cassandra 3 and supports:

  • UDT (User Defined Types) / embedded structs and honors gocql.Marshaler/gocql.Unmarshaler
  • handling all primitive types (such as int aliases and IP addresses):
    • net.IP can be stored as inet
    • net.IPNet can be stored with a MarshalCQL/UnmarshalCQL wrapper Go structure
  • dumping all rows except for Table
  • querying by secondary indexes
  • mocking of gocql behavior (using the gockle library) in automated unit tests

Cassandra Timeouts

The API allows the client to configure either a single-node or a multi-node cluster. The client can also configure the following timeouts:

  • DialTimeout
    • Initial connection timeout, used during initial dial to server
    • Default value is 600ms
  • OpTimeout
    • Connection timeout, used during executing query
    • Default value is 600ms
  • RedialInterval
    • If not zero, gocql attempts to reconnect to known DOWN nodes at this interval (gocql's ReconnectInterval)
    • Default value is 60s
  • Example
    config := &cassandra.Config{
        Endpoints:      []string{"127.0.0.1"},
        Port:           9042,
        DialTimeout:    600 * time.Millisecond,
        OpTimeout:      600 * time.Millisecond,
        RedialInterval: 60 * time.Second,
    }

    clientConfig, err := cassandra.ConfigToClientConfig(config)

The timeout parameters are defined in config.go.

Supported by underlying gocql structure ClusterConfig

Cassandra Data Consistency

The API will allow the client to configure consistency level for both

  • Session
  • Query (to be implemented)

Supported by underlying gocql structure Session

Factors to consider when aiming for a desired consistency level:

  • Replication strategy

    • A replication strategy determines the nodes where replicas are placed.
      • SimpleStrategy: Use for a single data center only.
      • NetworkTopologyStrategy: Used for more than one data center.
  • Replication factor

    • The total number of replicas across the cluster is referred to as the replication factor.
    • A replication factor of 1 means that there is only one copy of each row on one node.
    • A replication factor of 2 means that there are two copies of each row, and each copy is stored on a different node.
    • The replication factor should not exceed the number of nodes in the cluster.
  • To achieve Quorum

    • Quorum = (sum_of_replication_factors / 2) + 1, with the division rounded down to a whole number
    • (nodes_written + nodes_read) > replication_factor
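
The two quorum rules above can be checked with a few lines of arithmetic. This is a minimal sketch; the function names quorum and overlaps are chosen for this example only, and integer division is used for the rounding.

```go
package main

import "fmt"

// quorum returns the number of replicas that must respond, given the sum of
// replication factors. Go's integer division rounds down.
func quorum(sumOfReplicationFactors int) int {
	return sumOfReplicationFactors/2 + 1
}

// overlaps reports whether the replicas written and the replicas read are
// guaranteed to share at least one node: (nodes_written + nodes_read) > replication_factor.
func overlaps(nodesWritten, nodesRead, replicationFactor int) bool {
	return nodesWritten+nodesRead > replicationFactor
}

func main() {
	rf := 3
	q := quorum(rf)
	fmt.Println(q, overlaps(q, q, rf)) // 2 true
	fmt.Println(overlaps(1, 1, rf))    // false
}
```

With a replication factor of 3, writing and reading at quorum (2 + 2 > 3) satisfies the rule, while writing and reading a single replica each (1 + 1) does not.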

Documentation

Overview

Package cassandra is the implementation of the SQL Data Broker client API for the Cassandra data store. See cn-infra/db/sql for the definition of the key-value Data Broker client API.

The entity that provides access to the data store is called gocql.Session (wrapped by Broker for convenience).

+--------+             +----------+          crud           +-----------+
| Broker |    ---->    | Session  |          ---->          | Cassandra |
+--------+             +----------+                         +-----------+

To create a Session, use the following code:

import "github.com/gocql/gocql"

cluster := gocql.NewCluster("172.17.0.1")
cluster.Keyspace = "demo"
session, err := cluster.CreateSession()

Then create broker instance:

import (
      "github.com/ligato/cn-infra/db/sql/cassandra"
      "github.com/willfaught/gockle"
)
db := cassandra.NewBrokerUsingSession(gockle.NewSession(session))

To insert a single key-value pair into Cassandra, run the following (both values are pointers; JamesBond is an instance of the User struct):

db.Put(sql.PK(&JamesBond.ID), JamesBond)

To remove a value identified by key:

db.Delete(sql.FROM(JamesBond, sql.WHERE(sql.PK(&JamesBond.ID))))

To retrieve a value identified by key (both values are pointers):

user := &User{}
found, err := db.GetValue(sql.FROM(UserTable, sql.WHERE(sql.Field(&UserTable.ID, sql.EQ("James Bond")))), user)
if err == nil && found {
   ...
}

To retrieve all values matching a condition:

itr := db.ListValues(sql.FROM(UserTable, sql.WHERE(sql.Field(&UserTable.LastName, sql.EQ("Bond")))))
for {
   user := &User{}
   // GetNext fills user and reports stop == true once the result set is exhausted.
   if stop := itr.GetNext(user); stop {
       break
   }
   // process user...
}
// Close reports any error that occurred during iteration.
if err := itr.Close(); err != nil {
   return err
}

To retrieve values more conveniently, directly into a slice (without using an iterator):

users := &[]User{}
err := sql.SliceIt(users, db.ListValues(sql.FROM(UserTable,
              sql.WHERE(sql.Field(&UserTable.LastName, sql.EQ("Bond"))))))

Index

Constants

This section is empty.

Variables

var (
	// ErrMissingVisitorEntity is error returned when visitor is missing entity.
	ErrMissingVisitorEntity = errors.New("cassandra: visitor is missing entity")

	// ErrMissingEntityField is error returned when visitor entity is missing field.
	ErrMissingEntityField = errors.New("cassandra: visitor entity is missing field")

	// ErrUnexportedEntityField is error returned when visitor entity has unexported field.
	ErrUnexportedEntityField = errors.New("cassandra: visitor entity with unexported field")

	// ErrInvalidEndpointConfig is error returned when endpoint and port are not in valid format.
	ErrInvalidEndpointConfig = errors.New("cassandra: invalid configuration, endpoint and port not in valid format")
)
var DefaultPlugin = *NewPlugin()

DefaultPlugin is a default instance of Plugin.

Functions

func CreateSessionFromConfig

func CreateSessionFromConfig(config *ClientConfig) (*gocql.Session, error)

CreateSessionFromConfig creates and initializes the cluster based on the supplied config and returns a new session object that can be used to interact with the database. The function propagates errors returned from gocql.CreateSession().

func ExpToString

func ExpToString(exp sql.Expression) (sql string, bindings []interface{}, err error)

ExpToString converts an expression to a statement string and a slice of bindings.

func HostsAsString

func HostsAsString(hostArr []string) string

HostsAsString converts an array of host addresses into a comma-separated string.
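
A minimal sketch of the kind of conversion described above, written with the standard library only. The lowercase hostsAsString is a local illustration and not the package's implementation; in particular, the exact separator (with or without a space after the comma) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// hostsAsString joins host addresses with a comma. This is an illustrative
// stand-in, not the package's HostsAsString.
func hostsAsString(hosts []string) string {
	return strings.Join(hosts, ",")
}

func main() {
	fmt.Println(hostsAsString([]string{"10.0.0.1", "10.0.0.2"})) // 10.0.0.1,10.0.0.2
}
```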

func PutExpToString

func PutExpToString(whereCondition sql.Expression, entity interface{}) (sqlStr string, bindings []interface{},
	err error)

PutExpToString converts the where condition and the entity to a statement string and a slice of bindings.

func SelectExpToString

func SelectExpToString(fromWhere sql.Expression) (sqlStr string, bindings []interface{},
	err error)

SelectExpToString converts a from/where expression to a statement string and a slice of bindings.
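
To illustrate what "a statement string and a slice of bindings" can look like, here is a standalone sketch that renders a parameterized SELECT. The condition type and the selectToString function are hypothetical simplifications made up for this example; the statement text actually produced by SelectExpToString is not shown on this page and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// condition is a hypothetical, simplified stand-in for an sql.Expression
// that compares one column with one value.
type condition struct {
	Column string
	Value  interface{}
}

// selectToString renders a SELECT with "?" placeholders and returns the
// values in placeholder order, so the driver can bind them safely.
func selectToString(table string, conds []condition) (string, []interface{}) {
	var sb strings.Builder
	sb.WriteString("SELECT * FROM " + table)
	bindings := make([]interface{}, 0, len(conds))
	for i, c := range conds {
		if i == 0 {
			sb.WriteString(" WHERE ")
		} else {
			sb.WriteString(" AND ")
		}
		sb.WriteString(c.Column + " = ?")
		bindings = append(bindings, c.Value)
	}
	return sb.String(), bindings
}

func main() {
	stmt, bindings := selectToString("User", []condition{{"last_name", "Bond"}})
	fmt.Println(stmt, bindings) // SELECT * FROM User WHERE last_name = ? [Bond]
}
```

Keeping values out of the statement text and passing them as bindings avoids manual quoting and lets the same statement be reused with different values.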

func SliceOfFieldsWithValPtrs

func SliceOfFieldsWithValPtrs(val interface{}) (fieldNames []string, vals []interface{})

SliceOfFieldsWithValPtrs generates a slice of translated (cql tag) field names together with the field values. It is used for unit testing purposes only (the list_values test).

Types

type BrokerCassa

type BrokerCassa struct {
	// contains filtered or unexported fields
}

BrokerCassa implements interface db.Broker. This implementation simplifies work with gocql in that there is no need to write "SQL" queries by hand. The "SQL" is not really hidden, though; one can use it if needed. The "SQL" queries are generated from the Go structures (see more details in Put, Delete, Key, GetValue, ListValues).

func NewBrokerUsingSession

func NewBrokerUsingSession(gocqlSession gockle.Session) *BrokerCassa

NewBrokerUsingSession is a Broker constructor. Use it like this:

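cluster := gocql.NewCluster("172.17.0.1")
gocqlSession, err := cluster.CreateSession()
// handle err before continuing
session := gockle.NewSession(gocqlSession)
defer session.Close()
db := NewBrokerUsingSession(session)
db.ListValues(...)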

func (*BrokerCassa) Delete

func (pdb *BrokerCassa) Delete(fromWhere sql.Expression) error

Delete - see the description in interface sql.Broker.Delete(). Delete generates statement & binding for gocql Exec().

func (*BrokerCassa) Exec

func (pdb *BrokerCassa) Exec(statement string, binding ...interface{}) error

Exec - see the description in interface sql.Broker.Exec(). Exec runs the statement (AS-IS) using gocql.

func (*BrokerCassa) GetValue

func (pdb *BrokerCassa) GetValue(query sql.Expression, reqObj interface{}) (found bool, err error)

GetValue - see the description in interface sql.Broker.GetValue(). GetValue simply iterates once over the result of ListValues().

func (*BrokerCassa) ListValues

func (pdb *BrokerCassa) ListValues(query sql.Expression) sql.ValIterator

ListValues retrieves an iterator for elements stored under the provided key. ListValues runs query (AS-IS) using gocql Scan Iterator.

func (*BrokerCassa) NewTxn

func (pdb *BrokerCassa) NewTxn() sql.Txn

NewTxn creates a new Data Broker transaction. A transaction can hold multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.

func (*BrokerCassa) Put

func (pdb *BrokerCassa) Put(where sql.Expression, pointerToAStruct interface{}) error

Put - see the description in interface sql.Broker.Put(). Put generates statement & binding for gocql Exec(). Any error returned from gockle.Session.Exec is propagated upwards.

type ClientConfig

type ClientConfig struct {
	*gocql.ClusterConfig
}

ClientConfig wraps the gocql ClusterConfig.

func ConfigToClientConfig

func ConfigToClientConfig(ymlConfig *Config) (*ClientConfig, error)

ConfigToClientConfig transforms the yaml configuration into ClientConfig. If the configuration of endpoints is invalid, error ErrInvalidEndpointConfig is returned.

type Config

type Config struct {
	// A list of host addresses of cluster nodes.
	Endpoints []string `json:"endpoints"`

	// port for Cassandra (default: 9042)
	Port int `json:"port"`

	// session timeout (default: 600ms)
	OpTimeout time.Duration `json:"op_timeout"`

	// initial session timeout, used during initial dial to server (default: 600ms)
	DialTimeout time.Duration `json:"dial_timeout"`

	// If not zero, gocql attempts to reconnect to known DOWN nodes at this interval.
	RedialInterval time.Duration `json:"redial_interval"`

	// ProtoVersion sets the version of the native protocol to use, this will
	// enable features in the driver for specific protocol versions, generally this
	// should be set to a known version (2,3,4) for the cluster being connected to.
	//
	// If it is 0 or unset (the default) then the driver will attempt to discover the
	// highest supported protocol for the cluster. In clusters with nodes of different
	// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
	ProtocolVersion int `json:"protocol_version"`

	//TLS used to configure TLS
	TLS TLS `json:"tls"`
}

Config is the configuration for Cassandra clients, loaded from a configuration file.

type Deps

type Deps struct {
	infra.PluginDeps
	StatusCheck statuscheck.PluginStatusWriter // inject
}

Deps is here to group injected dependencies of plugin to not mix with other plugin fields.

type ErrIterator

type ErrIterator struct {
	LastError error
}

ErrIterator is an iterator that stops immediately and just returns last error on Close()

func (*ErrIterator) Close

func (it *ErrIterator) Close() error

Close closes the iterator. Note that the returned error is important (it may occur during marshalling/unmarshalling).

func (*ErrIterator) GetNext

func (it *ErrIterator) GetNext(outVal interface{}) (stop bool)

GetNext returns the next item from the result set. The returned stop is true when there is nothing more to read; because ErrIterator stops immediately, no data is returned. The argument "outVal" can be a pointer to a structure or a map.

type Option

type Option func(*Plugin)

Option is a function that can be used in NewPlugin to customize Plugin.

func UseDeps

func UseDeps(cb func(*Deps)) Option

UseDeps returns Option that can inject custom dependencies.
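
Option and UseDeps follow the functional options pattern. The sketch below reproduces that pattern with local lowercase types so it runs on its own; the deps fields and the default name are assumptions made for illustration and do not reflect the real Deps or Plugin.

```go
package main

import "fmt"

// deps, plugin and option are local stand-ins for Deps, Plugin and Option.
type deps struct{ Name string }

type plugin struct{ deps }

type option func(*plugin)

// useDeps returns an option that hands the plugin's dependencies to cb.
func useDeps(cb func(*deps)) option {
	return func(p *plugin) { cb(&p.deps) }
}

// newPlugin builds a plugin with assumed defaults, then applies each option in order.
func newPlugin(opts ...option) *plugin {
	p := &plugin{deps: deps{Name: "cassandra"}} // default name is an assumption
	for _, o := range opts {
		o(p)
	}
	return p
}

func main() {
	p := newPlugin(useDeps(func(d *deps) { d.Name = "cassandra-test" }))
	fmt.Println(p.Name) // cassandra-test
}
```

Because options are applied after the defaults are set, callers override only what they need and constructors stay backward compatible as new options are added.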

type Plugin

type Plugin struct {
	Deps
	// contains filtered or unexported fields
}

Plugin implements the Plugin interface and can therefore be loaded with other plugins.

func FromExistingSession

func FromExistingSession(session gockle.Session) *Plugin

FromExistingSession is used mainly for testing

func NewPlugin

func NewPlugin(opts ...Option) *Plugin

NewPlugin creates a new Plugin with the provided Options.

func (*Plugin) AfterInit

func (p *Plugin) AfterInit() error

AfterInit registers Cassandra to status check.

func (*Plugin) Close

func (p *Plugin) Close() error

Close releases resources.

func (*Plugin) Init

func (p *Plugin) Init() (err error)

Init is called at plugin startup. The session to Cassandra is established.

func (*Plugin) NewBroker

func (p *Plugin) NewBroker() sql.Broker

NewBroker returns a Broker instance to work with the Cassandra database.

type TLS

type TLS struct {
	Certfile               string `json:"cert_path"`                // client certificate
	Keyfile                string `json:"key_path"`                 // client private key
	CAfile                 string `json:"ca_path"`                  // certificate authority
	EnableHostVerification bool   `json:"enable_host_verification"` // whether to skip verification of server name & certificate
	Enabled                bool   `json:"enabled"`                  // enable/disable TLS
}

TLS used to configure TLS

type ValIterator

type ValIterator struct {
	Delegate gockle.Iterator
}

ValIterator is an iterator returned by ListValues call

func (*ValIterator) Close

func (it *ValIterator) Close() error

Close closes the iterator. Note that the returned error is important (it may occur during marshalling/unmarshalling).

func (*ValIterator) GetNext

func (it *ValIterator) GetNext(outVal interface{}) (stop bool)

GetNext returns the next item from the result set. The returned stop is true when there are no more items to read. The argument "outVal" can be a pointer to a structure or a map.
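
The GetNext/Close contract can be exercised without a database. In this sketch the valIterator interface mirrors the two method signatures documented above, while sliceIterator, user and collect are hypothetical helpers written only for this example.

```go
package main

import "fmt"

// valIterator mirrors the two iterator methods documented above.
type valIterator interface {
	GetNext(outVal interface{}) (stop bool)
	Close() error
}

type user struct{ FirstName, LastName string }

// sliceIterator is a hypothetical in-memory iterator used only for this sketch.
type sliceIterator struct {
	rows []user
	pos  int
}

// GetNext copies the next row into outVal, or reports stop when rows run out
// or outVal is not a *user.
func (it *sliceIterator) GetNext(outVal interface{}) (stop bool) {
	u, ok := outVal.(*user)
	if !ok || it.pos >= len(it.rows) {
		return true
	}
	*u = it.rows[it.pos]
	it.pos++
	return false
}

func (it *sliceIterator) Close() error { return nil }

// collect drains the iterator and always returns the error from Close.
func collect(it valIterator) ([]user, error) {
	var out []user
	for {
		var u user
		if stop := it.GetNext(&u); stop {
			break
		}
		out = append(out, u)
	}
	return out, it.Close()
}

func main() {
	it := &sliceIterator{rows: []user{{"James", "Bond"}, {"Jane", "Bond"}}}
	users, err := collect(it)
	fmt.Println(len(users), err) // 2 <nil>
}
```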
