Documentation ¶
Overview ¶
Package redis is the implementation of the key-value Data Broker client API for the Redis key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.
The entity BytesConnectionRedis provides access to CRUD as well as event subscription APIs.

    +-----+    --> (Broker)               +------------------------+    --> CRUD     +-------+
    | app |                               |  BytesConnectionRedis  |                 | Redis |
    +-----+    <-- (KeyValProtoWatcher)   +------------------------+    <-- events   +-------+
The code snippets below provide examples to help you get started. For simplicity, error handling is omitted.
Imports
    import "github.com/ligato/cn-infra/db/keyval/kvproto"
    import "github.com/ligato/cn-infra/db/keyval/redis"
    import "github.com/ligato/cn-infra/utils/config"
    import "github.com/ligato/cn-infra/logging/logroot"
Define client configuration based on your Redis installation.
- Single Node
      var cfg redis.NodeConfig
- Sentinel Enabled Cluster
      var cfg redis.SentinelConfig
- Redis Cluster
      var cfg redis.ClusterConfig
See sample YAML configurations in
ligato/cn-infra/db/keyval/redis/examples/*.yaml
You can initialize any of the above configuration instances in memory, or load the settings from file using
err = config.ParseConfigFromYamlFile(configFile, &cfg)
You can also load any of the three configuration files using
    var cfg interface{}
    cfg, err := redis.LoadConfig(configFile)
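Because LoadConfig returns an interface{}, calling code that needs to know which kind of configuration was loaded can use a type switch. The following is a minimal sketch using the standard library only. The types nodeConfig, sentinelConfig and clusterConfig are hypothetical stand-ins for the real configuration types, and whether LoadConfig yields values or pointers is an assumption that should be checked against the package source.

```go
package main

import "fmt"

// Hypothetical stand-ins for redis.NodeConfig, redis.SentinelConfig and
// redis.ClusterConfig; only the fields needed for the illustration are kept.
type nodeConfig struct{ Endpoint string }

type sentinelConfig struct {
	Endpoints  []string
	MasterName string
}

type clusterConfig struct{ Endpoints []string }

// describe reports which kind of configuration it was given.
func describe(cfg interface{}) string {
	switch c := cfg.(type) {
	case nodeConfig:
		return "single node at " + c.Endpoint
	case sentinelConfig:
		return fmt.Sprintf("sentinel master %q via %d sentinels", c.MasterName, len(c.Endpoints))
	case clusterConfig:
		return fmt.Sprintf("cluster seeded with %d endpoints", len(c.Endpoints))
	default:
		return fmt.Sprintf("unsupported configuration type %T", cfg)
	}
}

func main() {
	var cfg interface{} = sentinelConfig{
		Endpoints:  []string{"172.17.0.7:26379", "172.17.0.8:26379"},
		MasterName: "mymaster",
	}
	fmt.Println(describe(cfg))
}
```

The default branch matters in practice: an unexpected type is reported rather than silently ignored.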
Create connection from configuration
    client, err := redis.CreateClient(cfg)
    db, err := redis.NewBytesConnection(client, logroot.StandardLogger())
Create Brokers / Watchers from connection
    // create broker/watcher that share the same connection pools.
    bytesBroker := db.NewBroker("some-prefix")
    bytesWatcher := db.NewWatcher("some-prefix")

    // create broker/watcher that share the same connection pools,
    // capable of processing protocol-buffer generated data.
    wrapper := kvproto.NewProtoWrapper(db)
    protoBroker := wrapper.NewBroker("some-prefix")
    protoWatcher := wrapper.NewWatcher("some-prefix")
CRUD
    // put
    err = db.Put("some-key", []byte("some-value"))
    err = db.Put("some-temp-key", []byte("valid for 20 seconds"),
        datasync.WithTTL(20*time.Second))

    // get
    value, found, revision, err := db.GetValue("some-key")
    if found {
        ...
    }

    // Note: flight.Info implements proto.Message.
    f := flight.Info{
        Airline:  "UA",
        Number:   1573,
        Priority: 1,
    }
    err = protoBroker.Put("some-key-prefix", &f)
    f2 := flight.Info{}
    found, revision, err = protoBroker.GetValue("some-key-prefix", &f2)

    // list
    keyPrefix := "some"
    it, err := db.ListValues(keyPrefix)
    for {
        kv, done := it.GetNext()
        if done {
            break
        }
        key := kv.GetKey()
        value := kv.GetValue()
    }

    // delete
    found, err = db.Delete("some-key")
    // or, delete all keys matching the prefix "some-key".
    found, err = db.Delete("some-key", datasync.WithPrefix())

    // transaction
    var txn keyval.BytesTxn = db.NewTxn()
    txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102"))
    txn.Put("key103", []byte("val 103")).Put("key104", []byte("val 104"))
    err = txn.Commit()
Subscribe to key space events:
    watchChan := make(chan keyval.BytesWatchResp, 10)
    // Watch takes a callback; forward each response to the channel.
    err = db.Watch(func(r keyval.BytesWatchResp) { watchChan <- r }, "some-key")
    for {
        select {
        case r := <-watchChan:
            switch r.GetChangeType() {
            case datasync.Put:
                log.Infof("KeyValProtoWatcher received %v: %s=%s",
                    r.GetChangeType(), r.GetKey(), string(r.GetValue()))
            case datasync.Delete:
                ...
            }
        ...
        }
    }
NOTE: You must configure Redis for it to publish key space events. For example,
config SET notify-keyspace-events KA
See EVENT NOTIFICATION in https://raw.githubusercontent.com/antirez/redis/3.2/redis.conf
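With the K flag enabled, Redis publishes key space notifications on channels named "__keyspace@<db>__:<key>", and the message payload is the event name (for example "set", "del" or "expired"). The sketch below shows one way such a notification could be decoded. The helper names parseKeyspaceChannel and classify are hypothetical, and the mapping of event names to put/delete is an illustrative assumption rather than the mapping this package applies.

```go
package main

import (
	"fmt"
	"strings"
)

// parseKeyspaceChannel extracts the database index and the key from a
// channel name of the form "__keyspace@<db>__:<key>".
func parseKeyspaceChannel(channel string) (db, key string, ok bool) {
	const prefix = "__keyspace@"
	const sep = "__:"
	if !strings.HasPrefix(channel, prefix) {
		return "", "", false
	}
	rest := channel[len(prefix):]
	i := strings.Index(rest, sep)
	if i < 0 {
		return "", "", false
	}
	return rest[:i], rest[i+len(sep):], true
}

// classify maps a Redis event name to a coarse change type.
// The mapping is an assumption made for this illustration.
func classify(event string) string {
	switch event {
	case "set":
		return "put"
	case "del", "expired":
		return "delete"
	default:
		return "other"
	}
}

func main() {
	db, key, ok := parseKeyspaceChannel("__keyspace@0__:some-key")
	fmt.Println(db, key, ok, classify("set"))
}
```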
You can find detailed examples in
ligato/cn-infra/db/keyval/redis/examples/simple/
ligato/cn-infra/db/keyval/redis/examples/airport/
Resiliency ¶
Connection/read/write time-outs, failover, reconnection and recovery are validated by running the airport example against a Redis Sentinel Cluster. Redis nodes are paused selectively to simulate server down:
    $ docker-compose ps
    Name                             Command                           State    Ports
    ----------------------------------------------------------------------------------------------
    dockerredissentinel_master_1     docker-entrypoint.sh redis ...    Paused   6379/tcp
    dockerredissentinel_slave_1      docker-entrypoint.sh redis ...    Up       6379/tcp
    dockerredissentinel_slave_2      docker-entrypoint.sh redis ...    Up       6379/tcp
    dockerredissentinel_sentinel_1   sentinel-entrypoint.sh            Up       26379/tcp, 6379/tcp
    dockerredissentinel_sentinel_2   sentinel-entrypoint.sh            Up       26379/tcp, 6379/tcp
    dockerredissentinel_sentinel_3   sentinel-entrypoint.sh            Up       26379/tcp, 6379/tcp
Index ¶
- Constants
- func LoadConfig(configFile string) (cfg interface{}, err error)
- type BytesBrokerWatcherRedis
- func (pdb *BytesBrokerWatcherRedis) Delete(match string, opts ...datasync.DelOption) (found bool, err error)
- func (pdb *BytesBrokerWatcherRedis) GetPrefix() string
- func (pdb *BytesBrokerWatcherRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (pdb *BytesBrokerWatcherRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)
- func (pdb *BytesBrokerWatcherRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)
- func (pdb *BytesBrokerWatcherRedis) NewTxn() keyval.BytesTxn
- func (pdb *BytesBrokerWatcherRedis) Put(key string, data []byte, opts ...datasync.PutOption) error
- func (pdb *BytesBrokerWatcherRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
- type BytesConnectionRedis
- func (db *BytesConnectionRedis) Close() error
- func (db *BytesConnectionRedis) Delete(key string, opts ...datasync.DelOption) (found bool, err error)
- func (db *BytesConnectionRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)
- func (db *BytesConnectionRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)
- func (db *BytesConnectionRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)
- func (db *BytesConnectionRedis) NewBroker(prefix string) keyval.BytesBroker
- func (db *BytesConnectionRedis) NewBrokerWatcher(prefix string) *BytesBrokerWatcherRedis
- func (db *BytesConnectionRedis) NewTxn() keyval.BytesTxn
- func (db *BytesConnectionRedis) NewWatcher(prefix string) keyval.BytesWatcher
- func (db *BytesConnectionRedis) Put(key string, data []byte, opts ...datasync.PutOption) error
- func (db *BytesConnectionRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
- type BytesWatchDelResp
- type BytesWatchPutResp
- type Client
- type ClientConfig
- type ClusterConfig
- type Deps
- type NodeConfig
- type Plugin
- type PoolConfig
- type SentinelConfig
- type TLS
- type Txn
Constants ¶
GoRedisNil is the error go-redis returns when Redis replies nil, e.g. when a key does not exist.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BytesBrokerWatcherRedis ¶
BytesBrokerWatcherRedis uses BytesConnectionRedis to access the datastore. The connection can be shared among multiple BytesBrokerWatcherRedis instances. BytesBrokerWatcherRedis lets you define a keyPrefix that is prepended to all keys in its methods, which shortens the keys passed as arguments.
func (*BytesBrokerWatcherRedis) Delete ¶
func (pdb *BytesBrokerWatcherRedis) Delete(match string, opts ...datasync.DelOption) (found bool, err error)
Delete calls Delete function of BytesConnectionRedis. Prefix will be prepended to key argument when searching.
func (*BytesBrokerWatcherRedis) GetPrefix ¶
func (pdb *BytesBrokerWatcherRedis) GetPrefix() string
GetPrefix returns the prefix associated with this BytesBrokerWatcherRedis.
func (*BytesBrokerWatcherRedis) GetValue ¶
func (pdb *BytesBrokerWatcherRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue calls the GetValue function of BytesConnectionRedis. Prefix will be prepended to key argument when searching.
func (*BytesBrokerWatcherRedis) ListKeys ¶
func (pdb *BytesBrokerWatcherRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)
ListKeys calls ListKeys function of BytesConnectionRedis. Prefix will be prepended to key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.
func (*BytesBrokerWatcherRedis) ListValues ¶
func (pdb *BytesBrokerWatcherRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)
ListValues calls ListValues function of BytesConnectionRedis. Prefix will be prepended to key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.
func (*BytesBrokerWatcherRedis) NewTxn ¶
func (pdb *BytesBrokerWatcherRedis) NewTxn() keyval.BytesTxn
NewTxn creates a new transaction. Prefix will be prepended to key argument.
func (*BytesBrokerWatcherRedis) Put ¶
Put calls Put function of BytesConnectionRedis. Prefix will be prepended to key argument.
func (*BytesBrokerWatcherRedis) Watch ¶
func (pdb *BytesBrokerWatcherRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback.
type BytesConnectionRedis ¶
BytesConnectionRedis allows storing, reading and watching values in Redis.
func NewBytesConnection ¶
func NewBytesConnection(client Client, log logging.Logger) (*BytesConnectionRedis, error)
NewBytesConnection creates a new instance of BytesConnectionRedis using the provided Client (be it node, cluster, or sentinel client)
func (*BytesConnectionRedis) Close ¶
func (db *BytesConnectionRedis) Close() error
Close closes the connection to redis.
func (*BytesConnectionRedis) Delete ¶
func (db *BytesConnectionRedis) Delete(key string, opts ...datasync.DelOption) (found bool, err error)
Delete deletes the given key. When called with datasync.WithPrefix(), it deletes all the keys that start with the given string.
func (*BytesConnectionRedis) GetValue ¶
func (db *BytesConnectionRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)
GetValue retrieves the value of the key from Redis.
func (*BytesConnectionRedis) ListKeys ¶
func (db *BytesConnectionRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)
ListKeys returns an iterator used to traverse keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.
func (*BytesConnectionRedis) ListValues ¶
func (db *BytesConnectionRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)
ListValues returns an iterator used to traverse key value pairs for all the keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.
func (*BytesConnectionRedis) NewBroker ¶
func (db *BytesConnectionRedis) NewBroker(prefix string) keyval.BytesBroker
NewBroker creates a new CRUD proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.
func (*BytesConnectionRedis) NewBrokerWatcher ¶
func (db *BytesConnectionRedis) NewBrokerWatcher(prefix string) *BytesBrokerWatcherRedis
NewBrokerWatcher creates a new CRUD + KeyValProtoWatcher proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.
func (*BytesConnectionRedis) NewTxn ¶
func (db *BytesConnectionRedis) NewTxn() keyval.BytesTxn
NewTxn creates new transaction.
func (*BytesConnectionRedis) NewWatcher ¶
func (db *BytesConnectionRedis) NewWatcher(prefix string) keyval.BytesWatcher
NewWatcher creates a new KeyValProtoWatcher proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.
func (*BytesConnectionRedis) Put ¶
Put sets the key/value in Redis data store. Replaces value if the key already exists.
func (*BytesConnectionRedis) Watch ¶
func (db *BytesConnectionRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error
Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback. Subscription can be canceled by StopWatch call.
type BytesWatchDelResp ¶
type BytesWatchDelResp struct {
// contains filtered or unexported fields
}
BytesWatchDelResp is sent when a key-value pair has been removed
func NewBytesWatchDelResp ¶
func NewBytesWatchDelResp(key string, revision int64) *BytesWatchDelResp
NewBytesWatchDelResp creates an instance of BytesWatchDelResp
func (*BytesWatchDelResp) GetChangeType ¶
func (resp *BytesWatchDelResp) GetChangeType() datasync.PutDel
GetChangeType returns "Delete" for BytesWatchDelResp
func (*BytesWatchDelResp) GetKey ¶
func (resp *BytesWatchDelResp) GetKey() string
GetKey returns the key that has been deleted
func (*BytesWatchDelResp) GetRevision ¶
func (resp *BytesWatchDelResp) GetRevision() int64
GetRevision returns the revision associated with the delete operation
func (*BytesWatchDelResp) GetValue ¶
func (resp *BytesWatchDelResp) GetValue() []byte
GetValue returns nil for BytesWatchDelResp
type BytesWatchPutResp ¶
type BytesWatchPutResp struct {
// contains filtered or unexported fields
}
BytesWatchPutResp is sent when a new key-value pair has been inserted or an existing value has been updated
func NewBytesWatchPutResp ¶
func NewBytesWatchPutResp(key string, value []byte, revision int64) *BytesWatchPutResp
NewBytesWatchPutResp creates an instance of BytesWatchPutResp
func (*BytesWatchPutResp) GetChangeType ¶
func (resp *BytesWatchPutResp) GetChangeType() datasync.PutDel
GetChangeType returns "Put" for BytesWatchPutResp
func (*BytesWatchPutResp) GetKey ¶
func (resp *BytesWatchPutResp) GetKey() string
GetKey returns the key that has been inserted
func (*BytesWatchPutResp) GetRevision ¶
func (resp *BytesWatchPutResp) GetRevision() int64
GetRevision returns the revision associated with create action
func (*BytesWatchPutResp) GetValue ¶
func (resp *BytesWatchPutResp) GetValue() []byte
GetValue returns the value that has been inserted
type Client ¶
    type Client interface {
        // The easiest way to adapt Cmdable interface is to just embed it.
        goredis.Cmdable

        // Declare these additional methods to enable access to them through this interface
        Close() error
        TxPipeline() goredis.Pipeliner
        PSubscribe(channels ...string) *goredis.PubSub
    }

Client is the common interface that adapts all types of Redis clients.
func CreateClient ¶
CreateClient creates an appropriate client according to the configuration parameter.
func CreateClusterClient ¶
func CreateClusterClient(config ClusterConfig) (Client, error)
CreateClusterClient creates a client that will connect to a Redis cluster.
func CreateNodeClient ¶
func CreateNodeClient(config NodeConfig) (Client, error)
CreateNodeClient creates a client that will connect to a Redis node, such as a master or a slave.
func CreateSentinelClient ¶
func CreateSentinelClient(config SentinelConfig) (Client, error)
CreateSentinelClient creates a failover client that will connect to Redis sentinels.
type ClientConfig ¶
    type ClientConfig struct {
        Password     string        `json:"password"`      // Password for authentication, if required
        DialTimeout  time.Duration `json:"dial-timeout"`  // Dial timeout for establishing new connections. Default is 5 seconds.
        ReadTimeout  time.Duration `json:"read-timeout"`  // Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking. Default is 3 seconds.
        WriteTimeout time.Duration `json:"write-timeout"` // Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking. Default is ReadTimeout.
        Pool         PoolConfig    `json:"pool"`          // Connection pool configuration
    }

ClientConfig is the configuration common to all types of Redis clients.
type ClusterConfig ¶
    type ClusterConfig struct {
        Endpoints              []string `json:"endpoints"`             // A seed list of host:port addresses of cluster nodes.
        EnableReadQueryOnSlave bool     `json:"enable-query-on-slave"` // Enables read only queries on slave nodes.

        // The maximum number of redirects before giving up.
        // Command is retried on network errors and MOVED/ASK redirects. Default is 16.
        MaxRedirects int `json:"max-rediects"`
        // Allows routing read-only commands to the closest master or slave node.
        RouteByLatency bool `json:"route-by-latency"`

        ClientConfig
    }

ClusterConfig is the cluster client configuration.
type Deps ¶
type Deps struct {
local.PluginInfraDeps //inject
}
Deps is here to group injected dependencies of plugin to not mix with other plugin fields.
type NodeConfig ¶
    type NodeConfig struct {
        Endpoint               string `json:"endpoint"`              // host:port address of a Redis node
        DB                     int    `json:"db"`                    // Database to be selected after connecting to the server.
        EnableReadQueryOnSlave bool   `json:"enable-query-on-slave"` // Enables read only queries on slave nodes.
        TLS                    TLS    `json:"tls"`                   // TLS configuration -- only applies to node client.
        ClientConfig
    }

NodeConfig is the node client configuration.
type Plugin ¶
Plugin implements the Plugin interface, so it can be loaded together with other plugins.
func (*Plugin) Disabled ¶
Disabled if the plugin was not found
type PoolConfig ¶
    type PoolConfig struct {
        // Maximum number of socket connections.
        // Default is 10 connections per every CPU as reported by runtime.NumCPU.
        PoolSize int `json:"max-connections"`
        // Amount of time, in seconds, client waits for connection if all connections
        // are busy before returning an error.
        // Default is ReadTimeout + 1 second.
        PoolTimeout time.Duration `json:"busy-timeout"`
        // Amount of time, in seconds, after which client closes idle connections.
        // Should be less than server's timeout.
        // Default is 5 minutes.
        IdleTimeout time.Duration `json:"idle-timeout"`
        // Frequency of idle checks.
        // Default is 1 minute.
        // When minus value is set, then idle check is disabled.
        IdleCheckFrequency time.Duration `json:"idle-check-frequency"`
    }

PoolConfig is the configuration of the go-redis connection pool.
type SentinelConfig ¶
    type SentinelConfig struct {
        Endpoints  []string `json:"endpoints"`   // A seed list of host:port addresses sentinel nodes.
        MasterName string   `json:"master-name"` // The sentinel master name.
        DB         int      `json:"db"`          // Database to be selected after connecting to the server.
        ClientConfig
    }

SentinelConfig is the sentinel client configuration.
type TLS ¶
    type TLS struct {
        Enabled    bool   `json:"enabled"`     // enable/disable TLS
        SkipVerify bool   `json:"skip-verify"` // whether to skip verification of server name & certificate
        Certfile   string `json:"cert-file"`   // client certificate
        Keyfile    string `json:"key-file"`    // client private key
        CAfile     string `json:"ca-file"`     // certificate authority
    }
TLS configures TLS properties
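To show how fields like these typically translate into a crypto/tls configuration, here is a minimal sketch using only the standard library. The type tlsSettings and the function buildTLSConfig are hypothetical; the way this package actually builds its tls.Config may differ.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsSettings is a hypothetical mirror of the TLS struct fields.
type tlsSettings struct {
	Enabled    bool
	SkipVerify bool
	Certfile   string
	Keyfile    string
	CAfile     string
}

// buildTLSConfig returns nil when TLS is disabled. Otherwise it loads the
// optional client key pair and CA bundle into a *tls.Config.
func buildTLSConfig(s tlsSettings) (*tls.Config, error) {
	if !s.Enabled {
		return nil, nil
	}
	cfg := &tls.Config{InsecureSkipVerify: s.SkipVerify}
	if s.Certfile != "" && s.Keyfile != "" {
		cert, err := tls.LoadX509KeyPair(s.Certfile, s.Keyfile)
		if err != nil {
			return nil, fmt.Errorf("loading client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	if s.CAfile != "" {
		pem, err := os.ReadFile(s.CAfile)
		if err != nil {
			return nil, fmt.Errorf("reading CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in CA file")
		}
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(tlsSettings{Enabled: true, SkipVerify: true})
	fmt.Println(cfg != nil, err)
}
```

Skipping verification is convenient for local testing only, because it disables the checks that protect against a man-in-the-middle.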
type Txn ¶
type Txn struct {
// contains filtered or unexported fields
}
Txn groups operations into a transaction. A transaction executes multiple operations more efficiently than executing them one by one.
func (*Txn) Commit ¶
Commit commits all operations in a transaction to the data store. Commit is atomic - either all operations in the transaction are committed to the data store, or none of them.
func (*Txn) Delete ¶
Delete adds a new 'delete' operation to a previously created transaction.
func (*Txn) Put ¶
Put adds a new 'put' operation to a previously created transaction. If the key does not exist in the data store, a new key-value item will be added to the data store. If key exists in the data store, the existing value will be overwritten with the value from this operation.
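The queue-then-commit pattern described for Put, Delete and Commit can be sketched with an in-memory map. The type memTxn is a hypothetical stand-in, not the package's Txn: it only illustrates that operations are chained and recorded first, and that Commit applies either all of them or none. Rejecting an empty key is an arbitrary failure condition chosen for the illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// op is one queued operation.
type op struct {
	key    string
	value  []byte
	remove bool
}

// memTxn is a hypothetical in-memory stand-in for a transaction.
type memTxn struct {
	store map[string][]byte
	ops   []op
}

// Put queues a 'put' operation and returns the transaction for chaining.
func (t *memTxn) Put(key string, value []byte) *memTxn {
	t.ops = append(t.ops, op{key: key, value: value})
	return t
}

// Delete queues a 'delete' operation and returns the transaction for chaining.
func (t *memTxn) Delete(key string) *memTxn {
	t.ops = append(t.ops, op{key: key, remove: true})
	return t
}

// Commit validates every queued operation before touching the store, so
// either all operations are applied, in order, or none of them is.
func (t *memTxn) Commit() error {
	for _, o := range t.ops {
		if o.key == "" {
			return errors.New("empty key: nothing was applied")
		}
	}
	for _, o := range t.ops {
		if o.remove {
			delete(t.store, o.key)
		} else {
			t.store[o.key] = o.value
		}
	}
	t.ops = nil
	return nil
}

func main() {
	store := map[string][]byte{"key100": []byte("old")}
	txn := &memTxn{store: store}
	txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102")).Delete("key100")
	fmt.Println(len(store)) // still only the original entry: nothing applied before Commit
	fmt.Println(txn.Commit(), len(store))
}
```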