redis

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 8, 2017 License: Apache-2.0 Imports: 16 Imported by: 35

Documentation

Overview

Package redis is the implementation of the key-value Data Broker client API for the Redis key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.

The entity BytesConnectionRedis provides access to the CRUD API as well as the event subscription API.

+-----+   --> (Broker)              +------------------------+  -->  CRUD      +-------+
| app |                             |  BytesConnectionRedis  |                 | Redis |
+-----+   <-- (KeyValProtoWatcher)  +------------------------+  <--  events    +-------+

The code snippets below provide examples to help you get started. For simplicity, error handling is omitted.

Imports

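import "time"

import "github.com/ligato/cn-infra/datasync"
import "github.com/ligato/cn-infra/db/keyval"
import "github.com/ligato/cn-infra/db/keyval/kvproto"
import "github.com/ligato/cn-infra/db/keyval/redis"
import "github.com/ligato/cn-infra/utils/config"
import "github.com/ligato/cn-infra/logging/logroot"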

Define client configuration based on your Redis installation.

  • Single Node var cfg redis.NodeConfig
  • Sentinel Enabled Cluster var cfg redis.SentinelConfig
  • Redis Cluster var cfg redis.ClusterConfig

See sample YAML configurations in

ligato/cn-infra/db/keyval/redis/examples/*.yaml

You can initialize any of the above configuration instances in memory, or load the settings from file using

err = config.ParseConfigFromYamlFile(configFile, &cfg)

You can also load any of the three configuration types from file using

// cfg is an interface{} holding the loaded configuration instance.
cfg, err := redis.LoadConfig(configFile)

Create connection from configuration

client, err := redis.CreateClient(cfg)
db, err := redis.NewBytesConnection(client, logroot.StandardLogger())

Create Brokers / Watchers from connection

// create broker/watcher that share the same connection pools.
bytesBroker := db.NewBroker("some-prefix")
bytesWatcher := db.NewWatcher("some-prefix")

// create broker/watcher that share the same connection pools,
// capable of processing protocol-buffer generated data.
wrapper := kvproto.NewProtoWrapper(db)
protoBroker := wrapper.NewBroker("some-prefix")
protoWatcher := wrapper.NewWatcher("some-prefix")

CRUD

// put
err = db.Put("some-key", []byte("some-value"))
err = db.Put("some-temp-key", []byte("valid for 20 seconds"),
             datasync.WithTTL(20*time.Second))

// get
value, found, revision, err := db.GetValue("some-key")
if found {
    ...
}

// Note: flight.Info implements proto.Message.
f := flight.Info{
        Airline:  "UA",
        Number:   1573,
        Priority: 1,
     }
err = protoBroker.Put("some-key-prefix", &f)
f2 := flight.Info{}
found, revision, err = protoBroker.GetValue("some-key-prefix", &f2)

// list
keyPrefix := "some"
it, err := db.ListValues(keyPrefix)
for {
    kv, done := it.GetNext()
    if done {
        break
    }
    key := kv.GetKey()
    value := kv.GetValue()
}
// the iterator must be closed when done traversing
it.Close()

// delete
found, err = db.Delete("some-key")
// or, delete all keys matching the prefix "some-key".
found, err = db.Delete("some-key", datasync.WithPrefix())

// transaction
var txn keyval.BytesTxn = db.NewTxn()
txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102"))
txn.Put("key103", []byte("val 103")).Put("key104", []byte("val 104"))
err = txn.Commit()

Subscribe to key space events:

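watchChan := make(chan keyval.BytesWatchResp, 10)
// Watch takes a callback; forward each event to the channel.
err = db.Watch(func(r keyval.BytesWatchResp) { watchChan <- r }, "some-key")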
for {
    select {
    case r := <-watchChan:
        switch r.GetChangeType() {
        case datasync.Put:
            log.Infof("KeyValProtoWatcher received %v: %s=%s", r.GetChangeType(),
                      r.GetKey(), string(r.GetValue()))
        case datasync.Delete:
            ...
        }
    ...
    }
}

NOTE: You must configure Redis for it to publish key space events. For example,

config SET notify-keyspace-events KA

See EVENT NOTIFICATION in https://raw.githubusercontent.com/antirez/redis/3.2/redis.conf
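
With key space events enabled, Redis publishes each change on a channel named __keyspace@<db>__:<key>, with the event name (for example set or del) as the message. The following is a minimal sketch of how such a channel name could be decoded; parseKeyspaceChannel is a hypothetical helper written for illustration and is not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseKeyspaceChannel splits a Redis keyspace notification channel name of
// the form "__keyspace@<db>__:<key>" into its database index and key.
// Hypothetical helper for illustration; not part of the redis package.
func parseKeyspaceChannel(channel string) (db int, key string, ok bool) {
	const prefix = "__keyspace@"
	const sepToken = "__:"
	if !strings.HasPrefix(channel, prefix) {
		return 0, "", false
	}
	rest := channel[len(prefix):]
	sep := strings.Index(rest, sepToken)
	if sep < 0 {
		return 0, "", false
	}
	db, err := strconv.Atoi(rest[:sep])
	if err != nil {
		return 0, "", false
	}
	return db, rest[sep+len(sepToken):], true
}

func main() {
	db, key, ok := parseKeyspaceChannel("__keyspace@0__:some-key")
	fmt.Println(db, key, ok)
}
```

A pattern subscription such as __keyspace@*__:some-key* would deliver channel names of this shape for every key starting with some-key.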

You can find detailed examples in

ligato/cn-infra/db/keyval/redis/examples/simple/
ligato/cn-infra/db/keyval/redis/examples/airport/

Resiliency

Connection/read/write time-outs, failover, reconnection and recovery are validated by running the airport example against a Redis Sentinel Cluster. Redis nodes are paused selectively to simulate server down:

$ docker-compose ps

Name                             Command                          State           Ports
----------------------------------------------------------------------------------------------
dockerredissentinel_master_1     docker-entrypoint.sh redis ...   Paused   6379/tcp
dockerredissentinel_slave_1      docker-entrypoint.sh redis ...   Up       6379/tcp
dockerredissentinel_slave_2      docker-entrypoint.sh redis ...   Up       6379/tcp
dockerredissentinel_sentinel_1   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_2   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_3   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp

Constants

const GoRedisNil = goredis.Nil

GoRedisNil is the error go-redis returns when Redis replies with nil, e.g. when a key does not exist.
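
A sentinel error like this is typically translated into a found flag, so that callers can tell a missing key apart from a genuine failure. The sketch below shows the pattern with stand-ins only: errNil plays the role of GoRedisNil and fakeGet is a hypothetical lookup, so this is an illustration of the idea rather than the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// errNil stands in for goredis.Nil (exposed by this package as GoRedisNil).
var errNil = errors.New("redis: nil")

// fakeGet is a hypothetical lookup that mimics a go-redis GET:
// it returns errNil when the key is absent.
func fakeGet(store map[string]string, key string) (string, error) {
	v, ok := store[key]
	if !ok {
		return "", errNil
	}
	return v, nil
}

// getValue translates the sentinel error into a found flag.
func getValue(store map[string]string, key string) (value string, found bool, err error) {
	value, err = fakeGet(store, key)
	if errors.Is(err, errNil) {
		return "", false, nil // missing key is not an error
	}
	if err != nil {
		return "", false, err
	}
	return value, true, nil
}

func main() {
	store := map[string]string{"some-key": "some-value"}
	fmt.Println(getValue(store, "some-key"))
	fmt.Println(getValue(store, "missing"))
}
```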

Variables

This section is empty.

Functions

func LoadConfig

func LoadConfig(configFile string) (cfg interface{}, err error)

LoadConfig loads the given configFile and returns the appropriate config instance.

Types

type BytesBrokerWatcherRedis

type BytesBrokerWatcherRedis struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesBrokerWatcherRedis uses BytesConnectionRedis to access the datastore. The connection can be shared among multiple BytesBrokerWatcherRedis instances. BytesBrokerWatcherRedis lets you define a keyPrefix that is prepended to all keys in its methods, which shortens the keys passed as arguments.
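
The prefix behavior can be sketched with an in-memory map in place of Redis. prefixedBroker below is a hypothetical stand-in, not the package's type: it prepends the prefix on writes and lookups, and trims it from listed keys, so two brokers can share one store without seeing each other's keys.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// prefixedBroker is a hypothetical stand-in for a broker: it prepends prefix
// to every key before touching the shared store.
type prefixedBroker struct {
	prefix string
	store  map[string][]byte // stands in for the shared Redis connection
}

// Put stores data under prefix+key.
func (b *prefixedBroker) Put(key string, data []byte) {
	b.store[b.prefix+key] = data
}

// GetValue looks up prefix+key.
func (b *prefixedBroker) GetValue(key string) (data []byte, found bool) {
	data, found = b.store[b.prefix+key]
	return data, found
}

// ListKeys returns the keys starting with prefix+match, with the prefix trimmed.
func (b *prefixedBroker) ListKeys(match string) []string {
	var keys []string
	for k := range b.store {
		if strings.HasPrefix(k, b.prefix+match) {
			keys = append(keys, strings.TrimPrefix(k, b.prefix))
		}
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	return keys
}

func main() {
	shared := map[string][]byte{}
	a := &prefixedBroker{prefix: "/app-a/", store: shared}
	b := &prefixedBroker{prefix: "/app-b/", store: shared}
	a.Put("config/1", []byte("x"))
	b.Put("config/1", []byte("y"))
	fmt.Println(a.ListKeys("config"), len(shared))
}
```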

func (*BytesBrokerWatcherRedis) Delete

func (pdb *BytesBrokerWatcherRedis) Delete(match string, opts ...datasync.DelOption) (found bool, err error)

Delete calls Delete function of BytesConnectionRedis. Prefix will be prepended to key argument when searching.

func (*BytesBrokerWatcherRedis) GetPrefix

func (pdb *BytesBrokerWatcherRedis) GetPrefix() string

GetPrefix returns the prefix associated with this BytesBrokerWatcherRedis.

func (*BytesBrokerWatcherRedis) GetValue

func (pdb *BytesBrokerWatcherRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue calls the GetValue function of BytesConnectionRedis. The prefix will be prepended to the key argument when searching.

func (*BytesBrokerWatcherRedis) ListKeys

ListKeys calls ListKeys function of BytesConnectionRedis. Prefix will be prepended to key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.

func (*BytesBrokerWatcherRedis) ListValues

ListValues calls ListValues function of BytesConnectionRedis. Prefix will be prepended to key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.

func (*BytesBrokerWatcherRedis) NewTxn

func (pdb *BytesBrokerWatcherRedis) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. The prefix will be prepended to the key arguments.

func (*BytesBrokerWatcherRedis) Put

func (pdb *BytesBrokerWatcherRedis) Put(key string, data []byte, opts ...datasync.PutOption) error

Put calls Put function of BytesConnectionRedis. Prefix will be prepended to key argument.

func (*BytesBrokerWatcherRedis) Watch

func (pdb *BytesBrokerWatcherRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error

Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback.

type BytesConnectionRedis

type BytesConnectionRedis struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesConnectionRedis stores, reads, and watches values in Redis.

func NewBytesConnection

func NewBytesConnection(client Client, log logging.Logger) (*BytesConnectionRedis, error)

NewBytesConnection creates a new instance of BytesConnectionRedis using the provided Client (be it a node, cluster, or sentinel client).

func (*BytesConnectionRedis) Close

func (db *BytesConnectionRedis) Close() error

Close closes the connection to redis.

func (*BytesConnectionRedis) Delete

func (db *BytesConnectionRedis) Delete(key string, opts ...datasync.DelOption) (found bool, err error)

Delete removes the given key. With the datasync.WithPrefix() option, as in the usage example in the overview, it removes all keys that start with the given string.

func (*BytesConnectionRedis) GetValue

func (db *BytesConnectionRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue retrieves the value of the key from Redis.

func (*BytesConnectionRedis) ListKeys

func (db *BytesConnectionRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)

ListKeys returns an iterator used to traverse keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.

func (*BytesConnectionRedis) ListValues

func (db *BytesConnectionRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)

ListValues returns an iterator used to traverse key value pairs for all the keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.
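
The traverse-then-close contract can be sketched as follows. sliceIterator is a hypothetical iterator over pre-fetched pairs that only mirrors the GetNext/Close shape described here; the real iterator types come from the keyval package. Wrapping the loop in a function with a deferred Close is one way to make sure the iterator is released on every return path.

```go
package main

import "fmt"

// keyVal is a hypothetical key-value pair.
type keyVal struct {
	key   string
	value []byte
}

// sliceIterator is a hypothetical iterator over pre-fetched pairs.
type sliceIterator struct {
	items  []keyVal
	pos    int
	closed bool
}

// GetNext returns the next pair, or done=true when exhausted or closed.
func (it *sliceIterator) GetNext() (kv keyVal, done bool) {
	if it.closed || it.pos >= len(it.items) {
		return keyVal{}, true
	}
	kv = it.items[it.pos]
	it.pos++
	return kv, false
}

// Close releases the iterator; later GetNext calls report done.
func (it *sliceIterator) Close() error {
	it.closed = true
	return nil
}

// collectKeys drains the iterator and always closes it.
func collectKeys(it *sliceIterator) []string {
	defer it.Close()
	var keys []string
	for {
		kv, done := it.GetNext()
		if done {
			return keys
		}
		keys = append(keys, kv.key)
	}
}

func main() {
	it := &sliceIterator{items: []keyVal{
		{"some-key", []byte("a")},
		{"some-temp-key", []byte("b")},
	}}
	fmt.Println(collectKeys(it))
}
```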

func (*BytesConnectionRedis) NewBroker

func (db *BytesConnectionRedis) NewBroker(prefix string) keyval.BytesBroker

NewBroker creates a new CRUD proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.

func (*BytesConnectionRedis) NewBrokerWatcher

func (db *BytesConnectionRedis) NewBrokerWatcher(prefix string) *BytesBrokerWatcherRedis

NewBrokerWatcher creates a new CRUD + KeyValProtoWatcher proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.

func (*BytesConnectionRedis) NewTxn

func (db *BytesConnectionRedis) NewTxn() keyval.BytesTxn

NewTxn creates new transaction.

func (*BytesConnectionRedis) NewWatcher

func (db *BytesConnectionRedis) NewWatcher(prefix string) keyval.BytesWatcher

NewWatcher creates a new KeyValProtoWatcher proxy instance to Redis through BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if you do not want to use a prefix.

func (*BytesConnectionRedis) Put

func (db *BytesConnectionRedis) Put(key string, data []byte, opts ...datasync.PutOption) error

Put sets the key/value in Redis data store. Replaces value if the key already exists.

func (*BytesConnectionRedis) Watch

func (db *BytesConnectionRedis) Watch(resp func(keyval.BytesWatchResp), keys ...string) error

Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback. Subscription can be canceled by StopWatch call.
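
Because Watch takes a callback rather than a channel, code that prefers to select on a channel needs a small adapter. The sketch below shows one way to write it; watchResp, toChan and fakeWatch are hypothetical stand-ins defined here for illustration, not names from this package.

```go
package main

import "fmt"

// watchResp is a hypothetical stand-in for keyval.BytesWatchResp.
type watchResp struct {
	key   string
	value []byte
}

// toChan adapts a channel to the callback form taken by Watch.
// The send blocks when the channel buffer is full, so size the buffer
// (or drain the channel) with the expected event rate in mind.
func toChan(ch chan<- watchResp) func(watchResp) {
	return func(r watchResp) { ch <- r }
}

// fakeWatch mimics a Watch call by invoking the callback once per event.
func fakeWatch(resp func(watchResp), events ...watchResp) {
	for _, e := range events {
		resp(e)
	}
}

func main() {
	ch := make(chan watchResp, 10)
	fakeWatch(toChan(ch),
		watchResp{"some-key", []byte("v1")},
		watchResp{"some-key", []byte("v2")},
	)
	close(ch)
	for r := range ch {
		fmt.Printf("%s=%s\n", r.key, r.value)
	}
}
```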

type BytesWatchDelResp

type BytesWatchDelResp struct {
	// contains filtered or unexported fields
}

BytesWatchDelResp is sent when a key-value pair has been removed

func NewBytesWatchDelResp

func NewBytesWatchDelResp(key string, revision int64) *BytesWatchDelResp

NewBytesWatchDelResp creates an instance of BytesWatchDelResp

func (*BytesWatchDelResp) GetChangeType

func (resp *BytesWatchDelResp) GetChangeType() datasync.PutDel

GetChangeType returns "Delete" for BytesWatchDelResp

func (*BytesWatchDelResp) GetKey

func (resp *BytesWatchDelResp) GetKey() string

GetKey returns the key that has been deleted

func (*BytesWatchDelResp) GetRevision

func (resp *BytesWatchDelResp) GetRevision() int64

GetRevision returns the revision associated with the delete operation

func (*BytesWatchDelResp) GetValue

func (resp *BytesWatchDelResp) GetValue() []byte

GetValue returns nil for BytesWatchDelResp

type BytesWatchPutResp

type BytesWatchPutResp struct {
	// contains filtered or unexported fields
}

BytesWatchPutResp is sent when a new key-value pair has been inserted or an existing value has been updated

func NewBytesWatchPutResp

func NewBytesWatchPutResp(key string, value []byte, revision int64) *BytesWatchPutResp

NewBytesWatchPutResp creates an instance of BytesWatchPutResp

func (*BytesWatchPutResp) GetChangeType

func (resp *BytesWatchPutResp) GetChangeType() datasync.PutDel

GetChangeType returns "Put" for BytesWatchPutResp

func (*BytesWatchPutResp) GetKey

func (resp *BytesWatchPutResp) GetKey() string

GetKey returns the key that has been inserted

func (*BytesWatchPutResp) GetRevision

func (resp *BytesWatchPutResp) GetRevision() int64

GetRevision returns the revision associated with the put operation

func (*BytesWatchPutResp) GetValue

func (resp *BytesWatchPutResp) GetValue() []byte

GetValue returns the value that has been inserted

type Client

type Client interface {
	// The easiest way to adapt Cmdable interface is to just embed it.
	goredis.Cmdable

	// Declare these additional methods to enable access to them through this interface
	Close() error
	TxPipeline() goredis.Pipeliner
	PSubscribe(channels ...string) *goredis.PubSub
}

Client is the common interface that adapts all types of Redis clients.

func CreateClient

func CreateClient(config interface{}) (Client, error)

CreateClient creates an appropriate client according to the type of the configuration parameter.
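
Since the parameter is an interface{}, the choice of client presumably depends on the dynamic type of the value passed in. A minimal sketch of that kind of dispatch, using trimmed-down hypothetical config types and returning a description instead of a real client:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for the package's config types.
type nodeConfig struct{ Endpoint string }
type sentinelConfig struct{ MasterName string }
type clusterConfig struct{ Endpoints []string }

// clientKind picks a client flavour from the dynamic type of config.
// It only describes the choice; a real factory would build the client.
func clientKind(config interface{}) (string, error) {
	switch c := config.(type) {
	case nodeConfig:
		return "node client for " + c.Endpoint, nil
	case sentinelConfig:
		return "sentinel client for master " + c.MasterName, nil
	case clusterConfig:
		return fmt.Sprintf("cluster client with %d seed nodes", len(c.Endpoints)), nil
	case nil:
		return "", errors.New("configuration cannot be nil")
	default:
		return "", fmt.Errorf("unknown configuration type %T", config)
	}
}

func main() {
	fmt.Println(clientKind(nodeConfig{Endpoint: "localhost:6379"}))
	fmt.Println(clientKind(clusterConfig{Endpoints: []string{"a:7000", "b:7001"}}))
	fmt.Println(clientKind("not a config"))
}
```

Note that a type switch distinguishes a value from a pointer to it, so a caller of such a factory has to pass the form the factory expects.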

func CreateClusterClient

func CreateClusterClient(config ClusterConfig) (Client, error)

CreateClusterClient Creates a client that will connect to a redis cluster.

func CreateNodeClient

func CreateNodeClient(config NodeConfig) (Client, error)

CreateNodeClient creates a client that will connect to a single Redis node, such as a master or a slave.

func CreateSentinelClient

func CreateSentinelClient(config SentinelConfig) (Client, error)

CreateSentinelClient Creates a failover client that will connect to redis sentinels.

type ClientConfig

type ClientConfig struct {
	Password     string        `json:"password"`      // Password for authentication, if required
	DialTimeout  time.Duration `json:"dial-timeout"`  // Dial timeout for establishing new connections. Default is 5 seconds.
	ReadTimeout  time.Duration `json:"read-timeout"`  // Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking. Default is 3 seconds.
	WriteTimeout time.Duration `json:"write-timeout"` // Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking. Default is ReadTimeout.
	Pool         PoolConfig    `json:"pool"`          // Connection pool configuration
}

ClientConfig Configuration common to all types of Redis clients

type ClusterConfig

type ClusterConfig struct {
	Endpoints              []string `json:"endpoints"`             // A seed list of host:port addresses of cluster nodes.
	EnableReadQueryOnSlave bool     `json:"enable-query-on-slave"` // Enables read only queries on slave nodes.

	// The maximum number of redirects before giving up.
	// Command is retried on network errors and MOVED/ASK redirects. Default is 16.
	MaxRedirects int `json:"max-rediects"`
	// Allows routing read-only commands to the closest master or slave node.
	RouteByLatency bool `json:"route-by-latency"`

	ClientConfig
}

ClusterConfig Cluster client configuration

type Deps

type Deps struct {
	local.PluginInfraDeps //inject
}

Deps groups the injected dependencies of the plugin so that they do not mix with other plugin fields.

type NodeConfig

type NodeConfig struct {
	Endpoint               string `json:"endpoint"`              // host:port address of a Redis node
	DB                     int    `json:"db"`                    // Database to be selected after connecting to the server.
	EnableReadQueryOnSlave bool   `json:"enable-query-on-slave"` // Enables read only queries on slave nodes.
	TLS                    TLS    `json:"tls"`                   // TLS configuration -- only applies to node client.
	ClientConfig
}

NodeConfig Node client configuration
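
The struct tags determine which configuration keys map to which fields, and the embedded ClientConfig contributes its keys at the same level as the node settings. The sketch below decodes a JSON document into local, trimmed mirrors of the two structs purely to show that mapping. It is an illustration under assumptions: the package itself loads YAML files, and how durations are written in those files is not shown in this page (here they are plain nanosecond integers, which is how encoding/json treats time.Duration).

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ClientConfig is a trimmed local mirror reusing the tags listed above.
type ClientConfig struct {
	Password    string        `json:"password"`
	DialTimeout time.Duration `json:"dial-timeout"`
}

// NodeConfig is a trimmed local mirror; ClientConfig is embedded, so its
// keys appear at the top level of the document.
type NodeConfig struct {
	Endpoint string `json:"endpoint"`
	DB       int    `json:"db"`
	ClientConfig
}

// decodeNodeConfig parses a JSON document into the mirror struct.
func decodeNodeConfig(doc []byte) (NodeConfig, error) {
	var cfg NodeConfig
	err := json.Unmarshal(doc, &cfg)
	return cfg, err
}

func main() {
	doc := []byte(`{"endpoint": "localhost:6379", "db": 0, "dial-timeout": 5000000000}`)
	cfg, err := decodeNodeConfig(doc)
	fmt.Println(cfg.Endpoint, cfg.DB, cfg.DialTimeout, err)
}
```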

type Plugin

type Plugin struct {
	Deps
	*plugin.Skeleton
	// contains filtered or unexported fields
}

Plugin implements the Plugin interface and can therefore be loaded with other plugins.

func (*Plugin) Close

func (p *Plugin) Close() error

Close resources

func (*Plugin) Disabled

func (p *Plugin) Disabled() (disabled bool)

Disabled if the plugin was not found

func (*Plugin) Init

func (p *Plugin) Init() error

Init is called on plugin startup. It establishes the connection to redis.

type PoolConfig

type PoolConfig struct {
	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int `json:"max-connections"`
	// Amount of time, in seconds, client waits for connection if all connections
	// are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	PoolTimeout time.Duration `json:"busy-timeout"`
	// Amount of time, in seconds, after which client closes idle connections.
	// Should be less than server's timeout.
	// Default is 5 minutes.
	IdleTimeout time.Duration `json:"idle-timeout"`
	// Frequency of idle checks.
	// Default is 1 minute.
	// When minus value is set, then idle check is disabled.
	IdleCheckFrequency time.Duration `json:"idle-check-frequency"`
}

PoolConfig Configuration of go-redis connection pool
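
The defaults listed in the field comments can be summarized in code. The sketch below fills zero-valued fields of a local mirror struct with those documented defaults; withDefaults is a hypothetical helper for illustration, and in practice the defaults are presumably applied by go-redis itself rather than by the caller.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// poolConfig is a local mirror of the PoolConfig fields.
type poolConfig struct {
	PoolSize           int
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}

// withDefaults replaces zero values with the defaults documented above.
// readTimeout is the client's ReadTimeout, on which PoolTimeout depends.
func withDefaults(p poolConfig, readTimeout time.Duration) poolConfig {
	if p.PoolSize == 0 {
		p.PoolSize = 10 * runtime.NumCPU() // 10 connections per CPU
	}
	if p.PoolTimeout == 0 {
		p.PoolTimeout = readTimeout + time.Second
	}
	if p.IdleTimeout == 0 {
		p.IdleTimeout = 5 * time.Minute
	}
	if p.IdleCheckFrequency == 0 {
		p.IdleCheckFrequency = time.Minute
	}
	return p
}

func main() {
	p := withDefaults(poolConfig{PoolSize: 4}, 3*time.Second)
	fmt.Println(p.PoolSize, p.PoolTimeout, p.IdleTimeout, p.IdleCheckFrequency)
}
```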

type SentinelConfig

type SentinelConfig struct {
	Endpoints  []string `json:"endpoints"`   // A seed list of host:port addresses sentinel nodes.
	MasterName string   `json:"master-name"` // The sentinel master name.
	DB         int      `json:"db"`          // Database to be selected after connecting to the server.

	ClientConfig
}

SentinelConfig Sentinel client configuration

type TLS

type TLS struct {
	Enabled    bool   `json:"enabled"`     // enable/disable TLS
	SkipVerify bool   `json:"skip-verify"` // whether to skip verification of server name & certificate
	Certfile   string `json:"cert-file"`   // client certificate
	Keyfile    string `json:"key-file"`    // client private key
	CAfile     string `json:"ca-file"`     // certificate authority
}

TLS configures TLS properties

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn groups operations into a transaction. A transaction executes multiple operations more efficiently than executing them one by one.

func (*Txn) Commit

func (tx *Txn) Commit() (err error)

Commit commits all operations in a transaction to the data store. Commit is atomic - either all operations in the transaction are committed to the data store, or none of them.

func (*Txn) Delete

func (tx *Txn) Delete(key string) keyval.BytesTxn

Delete adds a new 'delete' operation to a previously created transaction.

func (*Txn) Put

func (tx *Txn) Put(key string, value []byte) keyval.BytesTxn

Put adds a new 'put' operation to a previously created transaction. If the key does not exist in the data store, a new key-value item will be added to the data store. If key exists in the data store, the existing value will be overwritten with the value from this operation.
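
The queue-then-commit flow and the all-or-nothing property can be sketched with an in-memory map. The txn type below is a hypothetical stand-in, not the package's implementation: it validates every queued operation before applying any of them, which is one simple way to get atomic behavior in memory. Against a real Redis server the grouping would be delegated to the server (the Client interface exposes TxPipeline for that purpose).

```go
package main

import (
	"errors"
	"fmt"
)

// op is one queued operation.
type op struct {
	key   string
	value []byte
	del   bool
}

// txn is a hypothetical in-memory transaction: Put and Delete queue
// operations, Commit applies them together.
type txn struct {
	store map[string][]byte
	ops   []op
}

// Put queues a 'put' operation and returns the transaction for chaining.
func (t *txn) Put(key string, value []byte) *txn {
	t.ops = append(t.ops, op{key: key, value: value})
	return t
}

// Delete queues a 'delete' operation and returns the transaction for chaining.
func (t *txn) Delete(key string) *txn {
	t.ops = append(t.ops, op{key: key, del: true})
	return t
}

// Commit validates every queued operation first and only then applies them,
// so a rejected operation leaves the store untouched.
func (t *txn) Commit() error {
	for _, o := range t.ops {
		if o.key == "" {
			return errors.New("empty key in transaction; nothing committed")
		}
	}
	for _, o := range t.ops {
		if o.del {
			delete(t.store, o.key)
		} else {
			t.store[o.key] = o.value
		}
	}
	t.ops = nil
	return nil
}

func main() {
	store := map[string][]byte{}
	tx := &txn{store: store}
	err := tx.Put("key101", []byte("val 101")).Put("key102", []byte("val 102")).Commit()
	fmt.Println(len(store), err)
}
```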
