README ¶
Cassandra implementation of Broker interface
The API was tested with Cassandra 3 and supports:
- UDT (User Defined Types) / embedded structs and honors gocql.Marshaler/gocql.Unmarshaler
- handling all primitive types (like int aliases , IP address);
- net.IP can be stored as ipnet
- net.IPNet can be stored with a MarshalCQL/UnmarshalCQL wrapper go structure
- dumping all rows except for Table
- quering by secondary indexes
- mocking of gocql behavior (using gockle library)in automated unit tests
Cassandra Timeouts
The API will allow the client to configure either single node or multi-node cluster. Also, the client can configure following timeouts:
- DialTimeout
- Initial connection timeout, used during initial dial to server
- Default value is 600ms
- OpTimeout
- Connection timeout, used during executing query
- Default value is 600ms
- RedialInterval
- If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectSleep (ReconnectInterval)
- Default value is 60s
- Example
config := &cassandra.Config{
Endpoints: 127.0.0.1,
Port: 9042,
DialTimeout: 600 * time.Millisecond,
OpTimeout: 600 * time.Millisecond,
RedialInterval: 60 * time.Second,
}
clientConfig, err := cassandra.ConfigToClientConfig(config)
The timeout parameters are defined here config.go
Supported by underlying gocql structure ClusterConfig
Cassandra Data Consistency
The API will allow the client to configure consistency level for both
- Session
- Query (to be implemented)
Supported by underlying gocql structure Session
Factors to be considered for achieving desired consistency level.
-
Replication strategy
- A replication strategy determines the nodes where replicas are placed.
- SimpleStrategy: Use for a single data center only.
- NetworkTopologyStrategy: Used for more than one data center.
- A replication strategy determines the nodes where replicas are placed.
-
Replication factor
- The total number of replicas across the cluster is referred to as the replication factor.
- A replication factor of 1 means that there is only one copy of each row on one node.
- A replication factor of 2 means that there are two copies of each row, and each copy is stored on a different node.
- The replication factor should not exceed the number of nodes in the cluster.
-
To achieve Quorum
- Quorum = (sum_of_replication_factors / 2) + 1
- (nodes_written + nodes_read) > replication_factor
-
References
Documentation ¶
Overview ¶
Package cassandra is the implementation of the SQL Data Broker client API for the Cassandra data store. See cn-infra/db/sql for the definition of the key-value Data Broker client API.
The entity that provides access to the data store is called gocql.Session (wrapped by Broker for convenience).
+--------+ +----------+ crud +-----------+ | Broker | ----> | Session | ----> | Cassandra | +--------+ +----------+ +-----------+
To create a Session use the following function
import "github.com/gocql/gocql" cluster := gocql.NewCluster("172.17.0.1") cluster.Keyspace = "demo" session, err := cluster.CreateSession()
Then create broker instance:
import ( "github.com/ligato/cn-infra/db/sql/cassandra" "github.com/willfaught/gockle" ) db := cassandra.NewBrokerUsingSession(gockle.NewSession(session)))
To insert single key-value pair into Cassandra run (both values are pointers, JamesBond is instance of User struct.):
db.Put(sql.PK(&JamesBond.ID), JamesBond)
To remove a value identified by key:
datasync.Delete(sql.FROM(JamesBond, sql.WHERE(sql.PK(&JamesBond.ID)))
To retrieve a value identified by key (both values are pointers):
data, found, rev, err := db.GetValue(sql.FROM(UserTable, sql.WHERE(sql.Field(&UserTable.ID, sql.EQ("James Bond")))) if err == nil && found { ... }
To retrieve all values matching a key prefix:
itr, err := db.ListValues(sql.FROM(UserTable, sql.WHERE(sql.Field(&UserTable.LastName, sql.EQ("Bond")))) if err != nil { for { data, allReceived, rev, err := itr.GetNext() if allReceived { break } if err != nil { return err } process data... } }
To retrieve values more conveniently directrly in slice (without using iterator):
users := &[]User{} err := sql.SliceIt(users, db.ListValues(sql.FROM(UserTable, sql.WHERE(sql.Field(&UserTable.LastName, sql.EQ("Bond"))))
Index ¶
- Variables
- func CreateSessionFromConfig(config *ClientConfig) (*gocql.Session, error)
- func ExpToString(exp sql.Expression) (sql string, bindings []interface{}, err error)
- func HostsAsString(hostArr []string) string
- func PutExpToString(whereCondition sql.Expression, entity interface{}) (sqlStr string, bindings []interface{}, err error)
- func SelectExpToString(fromWhere sql.Expression) (sqlStr string, bindings []interface{}, err error)
- func SliceOfFieldsWithValPtrs(val interface{}) (fieldNames []string, vals []interface{})
- type BrokerCassa
- func (pdb *BrokerCassa) Delete(fromWhere sql.Expression) error
- func (pdb *BrokerCassa) Exec(statement string, binding ...interface{}) error
- func (pdb *BrokerCassa) GetValue(query sql.Expression, reqObj interface{}) (found bool, err error)
- func (pdb *BrokerCassa) ListValues(query sql.Expression) sql.ValIterator
- func (pdb *BrokerCassa) NewTxn() sql.Txn
- func (pdb *BrokerCassa) Put(where sql.Expression, pointerToAStruct interface{}) error
- type ClientConfig
- type Config
- type Deps
- type ErrIterator
- type Option
- type Plugin
- type TLS
- type ValIterator
Constants ¶
This section is empty.
Variables ¶
var ( // ErrMissingVisitorEntity is error returned when visitor is missing entity. ErrMissingVisitorEntity = errors.New("cassandra: visitor is missing entity") // ErrMissingEntityField is error returned when visitor entity is missing field. ErrMissingEntityField = errors.New("cassandra: visitor entity is missing field") // ErrUnexportedEntityField is error returned when visitor entity has unexported field. ErrUnexportedEntityField = errors.New("cassandra: visitor entity with unexported field") // ErrInvalidEndpointConfig is error returned when endpoint and port are not in valid format. ErrInvalidEndpointConfig = errors.New("cassandra: invalid configuration, endpoint and port not in valid format") )
var DefaultPlugin = *NewPlugin()
DefaultPlugin is a default instance of Plugin.
Functions ¶
func CreateSessionFromConfig ¶
func CreateSessionFromConfig(config *ClientConfig) (*gocql.Session, error)
CreateSessionFromConfig creates and initializes the cluster based on the supplied config and returns a new session object that can be used to interact with the database. The function propagates errors returned from gocql.CreateSession().
func ExpToString ¶
func ExpToString(exp sql.Expression) (sql string, bindings []interface{}, err error)
ExpToString converts expression to string & slice of bindings
func HostsAsString ¶
HostsAsString converts an array of hosts addresses into a comma separated string
func PutExpToString ¶
func PutExpToString(whereCondition sql.Expression, entity interface{}) (sqlStr string, bindings []interface{}, err error)
PutExpToString converts expression to string & slice of bindings
func SelectExpToString ¶
func SelectExpToString(fromWhere sql.Expression) (sqlStr string, bindings []interface{}, err error)
SelectExpToString converts expression to string & slice of bindings
func SliceOfFieldsWithValPtrs ¶
func SliceOfFieldsWithValPtrs(val interface{}) (fieldNames []string, vals []interface{})
SliceOfFieldsWithValPtrs generates slice of translated (cql tag) field names with field values used for unit testing purposes only - list_values test
Types ¶
type BrokerCassa ¶
type BrokerCassa struct {
// contains filtered or unexported fields
}
BrokerCassa implements interface db.Broker. This implementation simplifies work with gocql in the way that it is not need to write "SQL" queries. But the "SQL" is not really hidden, one can use it if needed. The "SQL" queries are generated from the go structures (see more details in Put, Delete, Key, GetValue, ListValues).
func NewBrokerUsingSession ¶
func NewBrokerUsingSession(gocqlSession gockle.Session) *BrokerCassa
NewBrokerUsingSession is a Broker constructor. Use it like this:
session := gockle.NewSession(gocql.NewCluster("172.17.0.1")) defer db.Close() db := NewBrokerUsingSession(session) db.ListValues(...)
func (*BrokerCassa) Delete ¶
func (pdb *BrokerCassa) Delete(fromWhere sql.Expression) error
Delete - see the description in interface sql.Broker.ExecPut() Delete generates statement & binding for gocql Exec()
func (*BrokerCassa) Exec ¶
func (pdb *BrokerCassa) Exec(statement string, binding ...interface{}) error
Exec - see the description in interface sql.Broker.ExecPut() Exec runs statement (AS-IS) using gocql
func (*BrokerCassa) GetValue ¶
func (pdb *BrokerCassa) GetValue(query sql.Expression, reqObj interface{}) (found bool, err error)
GetValue - see the description in interface sql.Broker.GetValue() GetValue just iterate once for ListValues()
func (*BrokerCassa) ListValues ¶
func (pdb *BrokerCassa) ListValues(query sql.Expression) sql.ValIterator
ListValues retrieves an iterator for elements stored under the provided key. ListValues runs query (AS-IS) using gocql Scan Iterator.
func (*BrokerCassa) NewTxn ¶
func (pdb *BrokerCassa) NewTxn() sql.Txn
NewTxn creates a new Data Broker transaction. A transaction can hold multiple operations that are all committed to the data store together. After a transaction has been created, one or more operations (put or delete) can be added to the transaction before it is committed.
func (*BrokerCassa) Put ¶
func (pdb *BrokerCassa) Put(where sql.Expression, pointerToAStruct interface{}) error
Put - see the description in interface sql.Broker.Put(). Put generates statement & binding for gocql Exec(). Any error returned from gockle.Session.Exec is propagated upwards.
type ClientConfig ¶
type ClientConfig struct {
*gocql.ClusterConfig
}
ClientConfig wrapping gocql ClusterConfig
func ConfigToClientConfig ¶
func ConfigToClientConfig(ymlConfig *Config) (*ClientConfig, error)
ConfigToClientConfig transforms the yaml configuration into ClientConfig. If the configuration of endpoints is invalid, error ErrInvalidEndpointConfig is returned.
type Config ¶
type Config struct { // A list of host addresses of cluster nodes. Endpoints []string `json:"endpoints"` // port for Cassandra (default: 9042) Port int `json:"port"` // session timeout (default: 600ms) OpTimeout time.Duration `json:"op_timeout"` // initial session timeout, used during initial dial to server (default: 600ms) DialTimeout time.Duration `json:"dial_timeout"` // If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectSleep. RedialInterval time.Duration `json:"redial_interval"` // ProtoVersion sets the version of the native protocol to use, this will // enable features in the driver for specific protocol versions, generally this // should be set to a known version (2,3,4) for the cluster being connected to. // // If it is 0 or unset (the default) then the driver will attempt to discover the // highest supported protocol for the cluster. In clusters with nodes of different // versions the protocol selected is not defined (ie, it can be any of the supported in the cluster) ProtocolVersion int `json:"protocol_version"` //TLS used to configure TLS TLS TLS `json:"tls"` }
Config Configuration for Cassandra clients loaded from a configuration file
type Deps ¶
type Deps struct { infra.PluginDeps StatusCheck statuscheck.PluginStatusWriter // inject }
Deps is here to group injected dependencies of plugin to not mix with other plugin fields.
type ErrIterator ¶
type ErrIterator struct {
LastError error
}
ErrIterator is an iterator that stops immediately and just returns last error on Close()
func (*ErrIterator) Close ¶
func (it *ErrIterator) Close() error
Close the iterator. Note, the error is important (may occure during marshalling/un-marshalling)
func (*ErrIterator) GetNext ¶
func (it *ErrIterator) GetNext(outVal interface{}) (stop bool)
GetNext returns the following item from the result set. If data was returned, found is set to true. argument "outVal" can be: - pointer to structure - map
type Option ¶ added in v1.5.0
type Option func(*Plugin)
Option is a function that can be used in NewPlugin to customize Plugin.
type Plugin ¶
type Plugin struct { Deps // contains filtered or unexported fields }
Plugin implements Plugin interface therefore can be loaded with other plugins
func FromExistingSession ¶
FromExistingSession is used mainly for testing
type TLS ¶ added in v1.0.7
type TLS struct { Certfile string `json:"cert_path"` // client certificate Keyfile string `json:"key_path"` // client private key CAfile string `json:"ca_path"` // certificate authority EnableHostVerification bool `json:"enable_host_verification"` // whether to skip verification of server name & certificate Enabled bool `json:"enabled"` // enable/disable TLS }
TLS used to configure TLS
type ValIterator ¶
ValIterator is an iterator returned by ListValues call
func (*ValIterator) Close ¶
func (it *ValIterator) Close() error
Close the iterator. Note, the error is important (may occure during marshalling/un-marshalling)
func (*ValIterator) GetNext ¶
func (it *ValIterator) GetNext(outVal interface{}) (stop bool)
GetNext returns the following item from the result set. If data was returned, found is set to true. argument "outVal" can be: - pointer to structure - map