redis

Version: v2.0.0+incompatible
Published: Apr 2, 2019 License: Apache-2.0 Imports: 19 Imported by: 0

README

Redis is the implementation of the key-value Data Broker client API for the Redis key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.

BytesConnectionRedis provides access to both the CRUD and the event subscription APIs.

   +-----+   --> (Broker)              +------------------------+   -->  CRUD     +-------+
   | app |                             |  BytesConnectionRedis  |                 | Redis |
   +-----+   <-- (KeyValProtoWatcher)  +------------------------+   <--  events   +-------+

How to use Redis

The code snippets below provide examples to help you get started. For simplicity, error handling is omitted.

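Import the following dependencies (the snippets below also use the standard "time" package):
    import "github.com/ligato/cn-infra/datasync"
    import "github.com/ligato/cn-infra/db/keyval"
    import "github.com/ligato/cn-infra/db/keyval/kvproto"
    import "github.com/ligato/cn-infra/db/keyval/redis"
    import "github.com/ligato/cn-infra/utils/config"
    import "github.com/ligato/cn-infra/logging/logrus"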
Define client configuration based on your Redis installation.
  • Single Node var cfg redis.NodeConfig
  • Sentinel Enabled Cluster var cfg redis.SentinelConfig
  • Redis Cluster var cfg redis.ClusterConfig
  • See sample YAML configurations (*.yaml files)

You can initialize any of the above configuration instances in memory, or load the settings from a file using

   err = config.ParseConfigFromYamlFile(configFile, &cfg)

You can also load any of the three configuration files using

   var cfg interface{}
   cfg, err := redis.LoadConfig(configFile)
Create connection from configuration
   client, err := redis.ConfigToClient(cfg)
   db, err := redis.NewBytesConnection(client, logrus.DefaultLogger())
Create Brokers / Watchers from connection
   // create a broker and a watcher that share the same connection pools.
   bytesBroker := db.NewBroker("some-prefix")
   bytesWatcher := db.NewWatcher("some-prefix")

   // create broker/watcher that share the same connection pools,
   // capable of processing protocol-buffer generated data.
   wrapper := kvproto.NewProtoWrapper(db)
   protoBroker := wrapper.NewBroker("some-prefix")
   protoWatcher := wrapper.NewWatcher("some-prefix")
Perform CRUD operations
   // put
   err = db.Put("some-key", []byte("some-value"))
   err = db.Put("some-temp-key", []byte("valid for 20 seconds"),
                 datasync.WithTTL(20*time.Second))

   // get
   value, found, revision, err := db.GetValue("some-key")
   if found {
       ...
   }

   // Note: flight.Info implements proto.Message.
   f := flight.Info{
       Airline:  "UA",
       Number:   1573,
       Priority: 1,
   }
   err = protoBroker.Put("some-key-prefix", &f)
   f2 := flight.Info{}
   found, revision, err = protoBroker.GetValue("some-key-prefix", &f2)

   // list
   keyPrefix := "some"
   it, err := db.ListValues(keyPrefix)
   for {
       kv, done := it.GetNext()
       if done {
           break
       }
       key := kv.GetKey()
       value := kv.GetValue()
   }
   // the iterator must be closed once traversal is finished
   it.Close()

   // delete
   found, err := db.Delete("some-key")
   // or, delete all keys matching the prefix "some-key".
   found, err = db.Delete("some-key", datasync.WithPrefix())

   // transaction
   var txn keyval.BytesTxn = db.NewTxn()
   txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102"))
   txn.Put("key103", []byte("val 103")).Put("key104", []byte("val 104"))
   // Commit takes a context (see Txn.Commit); this needs import "context".
   err = txn.Commit(context.Background())
Subscribe to key space events
   watchChan := make(chan keyval.BytesWatchResp, 10)
   closeChan := make(chan string)
   // Watch takes a callback; forward each event to the channel.
   err = db.Watch(func(r keyval.BytesWatchResp) { watchChan <- r }, closeChan, "some-key")
   for {
       select {
       case r := <-watchChan:
           switch r.GetChangeType() {
           case datasync.Put:
               log.Infof("KeyValProtoWatcher received %v: %s=%s", r.GetChangeType(),
                         r.GetKey(), string(r.GetValue()))
           case datasync.Delete:
               ...
           }
       ...
       }
   }

NOTE: You must configure Redis for it to publish key space events.

   config SET notify-keyspace-events KA

See EVENT NOTIFICATION for more details.
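
When keyspace notifications are enabled, Redis publishes them on channels named `__keyspace@<db>__:<key>`, with the event name (for example `set`, `del` or `expired`) as the message. The following is a minimal sketch of how such a notification could be mapped to a put or delete change. The `parseKeyspaceEvent` helper and the `changeType` values are hypothetical names used for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// changeType is a hypothetical stand-in for a put/delete classification.
type changeType string

const (
	changePut    changeType = "Put"
	changeDelete changeType = "Delete"
	changeOther  changeType = "Other"
)

// parseKeyspaceEvent extracts the key from a keyspace notification channel
// of the form "__keyspace@<db>__:<key>" and classifies the event name.
func parseKeyspaceEvent(channel, event string) (key string, change changeType, ok bool) {
	const prefix = "__keyspace@"
	if !strings.HasPrefix(channel, prefix) {
		return "", changeOther, false
	}
	// The key follows the first "__:" separator after the database number.
	idx := strings.Index(channel, "__:")
	if idx < len(prefix) {
		return "", changeOther, false
	}
	key = channel[idx+len("__:"):]
	switch event {
	case "set":
		change = changePut
	case "del", "expired":
		change = changeDelete
	default:
		change = changeOther
	}
	return key, change, true
}

func main() {
	key, change, ok := parseKeyspaceEvent("__keyspace@0__:some-key", "set")
	fmt.Println(key, change, ok) // some-key Put true
}
```

Only three event names are classified here; a real watcher would likely need to handle more of the events that Redis can emit.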

You can find detailed examples in

Resiliency

Connection/read/write timeouts, failover, reconnection and recovery are validated by running the airport example against a Redis Sentinel Cluster. Redis nodes are paused selectively to simulate a server outage:

$ docker-compose ps
Name                             Command                          State    Ports
dockerredissentinel_master_1     docker-entrypoint.sh redis ...   Paused   6379/tcp
dockerredissentinel_slave_1      docker-entrypoint.sh redis ...   Up       6379/tcp
dockerredissentinel_slave_2      docker-entrypoint.sh redis ...   Up       6379/tcp
dockerredissentinel_sentinel_1   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_2   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_3   sentinel-entrypoint.sh           Up       26379/tcp, 6379/tcp

Documentation

Overview

Package redis is the implementation of the key-value Data Broker client API for the Redis key-value data store. See cn-infra/db/keyval for the definition of the key-value Data Broker client API.

Constants

const GoRedisNil = goredis.Nil

GoRedisNil is the error returned by go-redis when Redis replies with nil, e.g. when the key does not exist.

Variables

var DefaultPlugin = *NewPlugin()

DefaultPlugin is a default instance of Plugin.

Functions

func LoadConfig

func LoadConfig(configFile string) (cfg interface{}, err error)

LoadConfig loads the given configFile and returns the appropriate config instance.

Types

type BytesBrokerWatcherRedis

type BytesBrokerWatcherRedis struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesBrokerWatcherRedis uses BytesConnectionRedis to access the datastore. The connection can be shared among multiple BytesBrokerWatcherRedis instances. BytesBrokerWatcherRedis lets you define a keyPrefix that is prepended to all keys in its methods, which shortens the keys passed as arguments.

func (*BytesBrokerWatcherRedis) Delete

func (pdb *BytesBrokerWatcherRedis) Delete(match string, opts ...datasync.DelOption) (found bool, err error)

Delete calls the Delete function of BytesConnectionRedis. The prefix will be prepended to the key argument when searching.

func (*BytesBrokerWatcherRedis) GetPrefix

func (pdb *BytesBrokerWatcherRedis) GetPrefix() string

GetPrefix returns the prefix associated with this BytesBrokerWatcherRedis.

func (*BytesBrokerWatcherRedis) GetValue

func (pdb *BytesBrokerWatcherRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue calls GetValue function of BytesConnectionRedis. Prefix will be prepended to the key argument when searching.

func (*BytesBrokerWatcherRedis) ListKeys

ListKeys calls the ListKeys function of BytesConnectionRedis. The prefix will be prepended to the key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.

func (*BytesBrokerWatcherRedis) ListValues

ListValues calls the ListValues function of BytesConnectionRedis. The prefix will be prepended to the key argument when searching. The returned keys, however, will have the prefix trimmed. When done traversing, you must close the iterator by calling its Close() method.
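
The prefix handling described for the broker methods can be illustrated with a small in-memory sketch: the prefix is prepended on writes and lookups, and trimmed from keys that are returned. The `prefixedStore` type is a hypothetical stand-in backed by a map, not the package's implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// prefixedStore is a hypothetical in-memory stand-in for a prefixed broker.
type prefixedStore struct {
	prefix string
	data   map[string][]byte // shared backing store, keyed by the full key
}

// Put stores the value under prefix+key.
func (s *prefixedStore) Put(key string, value []byte) {
	s.data[s.prefix+key] = value
}

// ListKeys returns the keys that start with prefix+match, with the prefix trimmed.
func (s *prefixedStore) ListKeys(match string) []string {
	var keys []string
	for full := range s.data {
		if strings.HasPrefix(full, s.prefix+match) {
			keys = append(keys, strings.TrimPrefix(full, s.prefix))
		}
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	return keys
}

func main() {
	shared := map[string][]byte{}
	flights := &prefixedStore{prefix: "/airport/flights/", data: shared}
	flights.Put("UA1573", []byte("on time"))
	flights.Put("UA2001", []byte("delayed"))

	fmt.Println(flights.ListKeys("UA")) // [UA1573 UA2001]
	_, stored := shared["/airport/flights/UA1573"]
	fmt.Println(stored) // true
}
```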

func (*BytesBrokerWatcherRedis) NewTxn

func (pdb *BytesBrokerWatcherRedis) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction. The prefix will be prepended to the key argument.

func (*BytesBrokerWatcherRedis) Put

func (pdb *BytesBrokerWatcherRedis) Put(key string, data []byte, opts ...datasync.PutOption) error

Put calls Put function of BytesConnectionRedis. Prefix will be prepended to the key argument.

func (*BytesBrokerWatcherRedis) Watch

func (pdb *BytesBrokerWatcherRedis) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback.

type BytesConnectionRedis

type BytesConnectionRedis struct {
	logging.Logger
	// contains filtered or unexported fields
}

BytesConnectionRedis allows storing, reading and watching values in Redis.

func NewBytesConnection

func NewBytesConnection(client Client, log logging.Logger) (*BytesConnectionRedis, error)

NewBytesConnection creates a new instance of BytesConnectionRedis using the provided Client (be it node, or cluster, or sentinel client).

func (*BytesConnectionRedis) Close

func (db *BytesConnectionRedis) Close() error

Close closes the connection to redis.

func (*BytesConnectionRedis) Delete

func (db *BytesConnectionRedis) Delete(key string, opts ...datasync.DelOption) (found bool, err error)

Delete deletes the given key. When the datasync.WithPrefix() option is passed, it deletes all the keys that start with the given string.

func (*BytesConnectionRedis) GetValue

func (db *BytesConnectionRedis) GetValue(key string) (data []byte, found bool, revision int64, err error)

GetValue retrieves the value of the key from Redis.

func (*BytesConnectionRedis) ListKeys

func (db *BytesConnectionRedis) ListKeys(match string) (keyval.BytesKeyIterator, error)

ListKeys returns an iterator used to traverse keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.

func (*BytesConnectionRedis) ListValues

func (db *BytesConnectionRedis) ListValues(match string) (keyval.BytesKeyValIterator, error)

ListValues returns an iterator used to traverse key value pairs for all the keys that start with the given match string. When done traversing, you must close the iterator by calling its Close() method.
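
The traversal pattern described above (call GetNext until it reports the end, then Close) can be sketched as follows. The `keyVal` and `sliceIterator` types are hypothetical in-memory stand-ins for the package's iterator, and `collectKeys` is an illustrative helper.

```go
package main

import "fmt"

// keyVal is a hypothetical stand-in for a key-value pair returned by an iterator.
type keyVal struct {
	key   string
	value []byte
}

// sliceIterator is a hypothetical iterator over an in-memory slice.
type sliceIterator struct {
	items  []keyVal
	pos    int
	closed bool
}

// GetNext returns the next pair; stop is true once the items are exhausted
// or the iterator has been closed.
func (it *sliceIterator) GetNext() (kv keyVal, stop bool) {
	if it.closed || it.pos >= len(it.items) {
		return keyVal{}, true
	}
	kv = it.items[it.pos]
	it.pos++
	return kv, false
}

// Close marks the iterator as closed; later GetNext calls report stop.
func (it *sliceIterator) Close() error {
	it.closed = true
	return nil
}

// collectKeys drains the iterator and always closes it, even on early return.
func collectKeys(it *sliceIterator) []string {
	defer it.Close()
	var keys []string
	for {
		kv, stop := it.GetNext()
		if stop {
			break
		}
		keys = append(keys, kv.key)
	}
	return keys
}

func main() {
	it := &sliceIterator{items: []keyVal{
		{key: "some-key", value: []byte("a")},
		{key: "some-temp-key", value: []byte("b")},
	}}
	fmt.Println(collectKeys(it)) // [some-key some-temp-key]
}
```

Using defer for Close keeps the cleanup next to the point where the iterator is obtained, which makes it harder to forget.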

func (*BytesConnectionRedis) NewBroker

func (db *BytesConnectionRedis) NewBroker(prefix string) keyval.BytesBroker

NewBroker creates a new CRUD proxy instance to Redis using BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if no prefix is needed.

func (*BytesConnectionRedis) NewBrokerWatcher

func (db *BytesConnectionRedis) NewBrokerWatcher(prefix string) *BytesBrokerWatcherRedis

NewBrokerWatcher creates a new CRUD + KeyValProtoWatcher proxy instance to Redis using BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if no prefix is needed.

func (*BytesConnectionRedis) NewTxn

func (db *BytesConnectionRedis) NewTxn() keyval.BytesTxn

NewTxn creates a new transaction.

func (*BytesConnectionRedis) NewWatcher

func (db *BytesConnectionRedis) NewWatcher(prefix string) keyval.BytesWatcher

NewWatcher creates a new KeyValProtoWatcher proxy instance to Redis using BytesConnectionRedis. The given prefix will be prepended to the key argument in all calls. Specify an empty string ("") if no prefix is needed.

func (*BytesConnectionRedis) Put

func (db *BytesConnectionRedis) Put(key string, data []byte, opts ...datasync.PutOption) error

Put sets the key/value in Redis data store. Replaces value if the key already exists.

func (*BytesConnectionRedis) Watch

func (db *BytesConnectionRedis) Watch(resp func(keyval.BytesWatchResp), closeChan chan string, keys ...string) error

Watch starts a subscription for changes associated with the selected keys. Watch events will be delivered to the resp callback. Subscription can be canceled by StopWatch call.
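
Because Watch takes a callback rather than a channel, code that prefers to consume events from a channel needs a small adapter. The sketch below shows one way to write it. `watchResp` is a hypothetical stand-in for keyval.BytesWatchResp, and `toChan` is an illustrative helper defined here, not a function of this package.

```go
package main

import "fmt"

// watchResp is a hypothetical stand-in for keyval.BytesWatchResp.
type watchResp struct {
	key   string
	value []byte
}

// toChan returns a callback that forwards each event to ch.
// The send blocks when ch is full, so size the buffer accordingly.
func toChan(ch chan<- watchResp) func(watchResp) {
	return func(r watchResp) {
		ch <- r
	}
}

func main() {
	events := make(chan watchResp, 10)
	callback := toChan(events)

	// A real watcher would invoke the callback for every change it observes.
	callback(watchResp{key: "some-key", value: []byte("some-value")})

	r := <-events
	fmt.Printf("%s=%s\n", r.key, r.value) // some-key=some-value
}
```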

type BytesWatchDelResp

type BytesWatchDelResp struct {
	// contains filtered or unexported fields
}

BytesWatchDelResp is sent when a key-value pair has been removed.

func NewBytesWatchDelResp

func NewBytesWatchDelResp(key string, revision int64) *BytesWatchDelResp

NewBytesWatchDelResp creates an instance of BytesWatchDelResp.

func (*BytesWatchDelResp) GetChangeType

func (resp *BytesWatchDelResp) GetChangeType() datasync.Op

GetChangeType returns "Delete" for BytesWatchDelResp.

func (*BytesWatchDelResp) GetKey

func (resp *BytesWatchDelResp) GetKey() string

GetKey returns the key that has been deleted.

func (*BytesWatchDelResp) GetPrevValue added in v1.0.6

func (resp *BytesWatchDelResp) GetPrevValue() []byte

GetPrevValue returns nil for BytesWatchDelResp.

func (*BytesWatchDelResp) GetRevision

func (resp *BytesWatchDelResp) GetRevision() int64

GetRevision returns the revision associated with the delete operation.

func (*BytesWatchDelResp) GetValue

func (resp *BytesWatchDelResp) GetValue() []byte

GetValue returns nil for BytesWatchDelResp.

type BytesWatchPutResp

type BytesWatchPutResp struct {
	// contains filtered or unexported fields
}

BytesWatchPutResp is sent when a new key-value pair has been inserted or the value of an existing key has been updated.

func NewBytesWatchPutResp

func NewBytesWatchPutResp(key string, value []byte, prevValue []byte, revision int64) *BytesWatchPutResp

NewBytesWatchPutResp creates an instance of BytesWatchPutResp.

func (*BytesWatchPutResp) GetChangeType

func (resp *BytesWatchPutResp) GetChangeType() datasync.Op

GetChangeType returns "Put" for BytesWatchPutResp.

func (*BytesWatchPutResp) GetKey

func (resp *BytesWatchPutResp) GetKey() string

GetKey returns the key that has been inserted.

func (*BytesWatchPutResp) GetPrevValue added in v1.0.6

func (resp *BytesWatchPutResp) GetPrevValue() []byte

GetPrevValue returns the value that was stored under the key before this change.

func (*BytesWatchPutResp) GetRevision

func (resp *BytesWatchPutResp) GetRevision() int64

GetRevision returns the revision associated with the create action.

func (*BytesWatchPutResp) GetValue

func (resp *BytesWatchPutResp) GetValue() []byte

GetValue returns the value that has been inserted.

type Client

type Client interface {
	// The easiest way to adapt Cmdable interface is just to embed it.
	goredis.Cmdable

	// Declare these additional methods to enable access to them through this
	// interface.
	Close() error
	PSubscribe(channels ...string) *goredis.PubSub
}

Client is common interface used to adapt all types of Redis clients.

func ConfigToClient added in v1.3.0

func ConfigToClient(config interface{}) (Client, error)

ConfigToClient creates an appropriate client according to the configuration parameter.
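
Selecting a client from a configuration value of type interface{} is naturally expressed as a type switch. The sketch below illustrates the idea with trimmed-down, hypothetical mirrors of the three configuration types; it returns a description instead of a real client and makes no claim about how ConfigToClient is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed-down mirrors of the package's configuration types.
type nodeConfig struct{ Endpoint string }

type sentinelConfig struct {
	Endpoints  []string
	MasterName string
}

type clusterConfig struct{ Endpoints []string }

// clientKind reports which kind of client a configuration value calls for.
func clientKind(config interface{}) (string, error) {
	switch cfg := config.(type) {
	case nodeConfig:
		return "node client for " + cfg.Endpoint, nil
	case sentinelConfig:
		return "sentinel client for master " + cfg.MasterName, nil
	case clusterConfig:
		return fmt.Sprintf("cluster client with %d seed endpoints", len(cfg.Endpoints)), nil
	case nil:
		return "", errors.New("configuration cannot be nil")
	default:
		return "", fmt.Errorf("unknown configuration type %T", cfg)
	}
}

func main() {
	kind, err := clientKind(nodeConfig{Endpoint: "localhost:6379"})
	fmt.Println(kind, err) // node client for localhost:6379 <nil>
}
```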

func CreateClusterClient

func CreateClusterClient(config ClusterConfig) (Client, error)

CreateClusterClient creates a client that will connect to a Redis cluster.

func CreateNodeClient

func CreateNodeClient(config NodeConfig) (Client, error)

CreateNodeClient creates a client that will connect to a single Redis node, such as a master or a slave.

func CreateSentinelClient

func CreateSentinelClient(config SentinelConfig) (Client, error)

CreateSentinelClient creates a failover client that will connect to Redis sentinels.

type ClientConfig

type ClientConfig struct {
	// Password for authentication, if required.
	Password string `json:"password"`

	// Dial timeout for establishing new connections. Default is 5 seconds.
	DialTimeout time.Duration `json:"dial-timeout"`

	// Timeout for socket reads. If reached, commands will fail with a timeout
	// instead of blocking. Default is 3 seconds.
	ReadTimeout time.Duration `json:"read-timeout"`

	// Timeout for socket writes. If reached, commands will fail with a timeout
	// instead of blocking. Default is ReadTimeout.
	WriteTimeout time.Duration `json:"write-timeout"`

	// Connection pool configuration.
	Pool PoolConfig `json:"pool"`
}

ClientConfig is a configuration common to all types of Redis clients.

type ClusterConfig

type ClusterConfig struct {
	// A seed list of host:port addresses of cluster nodes.
	Endpoints []string `json:"endpoints"`

	// Enables read-only queries on slave nodes.
	EnableReadQueryOnSlave bool `json:"enable-query-on-slave"`

	// The maximum number of redirects before giving up.
	// Command is retried on network errors and MOVED/ASK redirects. Default is 16.
	MaxRedirects int `json:"max-rediects"`
	// Allows routing read-only commands to the closest master or slave node.
	RouteByLatency bool `json:"route-by-latency"`

	ClientConfig
}

ClusterConfig is the cluster client configuration.

type Deps

type Deps struct {
	infra.PluginDeps
	StatusCheck statuscheck.PluginStatusWriter
	Resync      *resync.Plugin // inject
}

Deps lists dependencies of the redis plugin.

type NodeConfig

type NodeConfig struct {
	// host:port address of a Redis node
	Endpoint string `json:"endpoint"`

	// Database to be selected after connecting to the server.
	DB int `json:"db"`

	// Enables read-only queries on slave nodes.
	EnableReadQueryOnSlave bool `json:"enable-query-on-slave"`

	// TLS configuration -- only applies to node client.
	TLS TLS `json:"tls"`

	// Embedded common client configuration.
	ClientConfig
}

NodeConfig is the node client configuration.

type Option added in v1.5.0

type Option func(*Plugin)

Option is a function that can be used in NewPlugin to customize Plugin.

func UseDeps added in v1.5.0

func UseDeps(cb func(*Deps)) Option

UseDeps returns Option that can inject custom dependencies.

type Plugin

type Plugin struct {
	Deps
	// contains filtered or unexported fields
}

Plugin implements redis plugin.

func NewPlugin added in v1.5.0

func NewPlugin(opts ...Option) *Plugin

NewPlugin creates a new Plugin with the provided Options.
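
Option, UseDeps and NewPlugin follow the functional options pattern. The sketch below shows the pattern with simplified, hypothetical stand-ins (`deps`, `plugin`, `option`, `useDeps`, `newPlugin`). It assumes defaults are set before the options run, which may differ from the actual constructor.

```go
package main

import "fmt"

// deps and plugin are hypothetical, simplified stand-ins for Deps and Plugin.
type deps struct {
	PluginName string
}

type plugin struct {
	deps
}

// option customizes a plugin during construction.
type option func(*plugin)

// useDeps returns an option that lets the caller adjust the dependencies.
func useDeps(cb func(*deps)) option {
	return func(p *plugin) {
		cb(&p.deps)
	}
}

// newPlugin sets a default name, then applies each option in order.
func newPlugin(opts ...option) *plugin {
	p := &plugin{}
	p.PluginName = "redis"
	for _, o := range opts {
		o(p)
	}
	return p
}

func main() {
	p := newPlugin(useDeps(func(d *deps) {
		d.PluginName = "redis-test"
	}))
	fmt.Println(p.PluginName) // redis-test
}
```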

func (*Plugin) AfterInit added in v1.0.4

func (p *Plugin) AfterInit() error

AfterInit registers redis to status check if required.

func (*Plugin) Close

func (p *Plugin) Close() error

Close does nothing for redis plugin.

func (*Plugin) Disabled

func (p *Plugin) Disabled() (disabled bool)

Disabled returns *true* if the plugin is not in use due to missing redis configuration.

func (*Plugin) Init

func (p *Plugin) Init() (err error)

Init retrieves the redis configuration and establishes a new connection with the redis data store. If the configuration file doesn't exist or cannot be read, the returned error will be of os.PathError type. An untyped error is returned in case the file doesn't contain a valid YAML configuration.

func (*Plugin) NewBroker added in v1.3.0

func (p *Plugin) NewBroker(keyPrefix string) keyval.ProtoBroker

NewBroker creates a new instance of a prefixed broker that provides an API with arguments of type proto.Message.

func (*Plugin) NewWatcher added in v1.3.0

func (p *Plugin) NewWatcher(keyPrefix string) keyval.ProtoWatcher

NewWatcher creates a new instance of a prefixed watcher that provides an API with arguments of type proto.Message.

func (*Plugin) OnConnect added in v1.5.0

func (p *Plugin) OnConnect(callback func() error)

OnConnect executes the callback from datasync.

type PoolConfig

type PoolConfig struct {
	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int `json:"max-connections"`
	// Amount of time, in seconds, a client waits for connection if all connections
	// are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	PoolTimeout time.Duration `json:"busy-timeout"`
	// Amount of time, in seconds, after which a client closes idle connections.
	// Should be less than server's timeout.
	// Default is 5 minutes.
	IdleTimeout time.Duration `json:"idle-timeout"`
	// Frequency of idle checks.
	// Default is 1 minute.
	// When negative value is set, then idle check is disabled.
	IdleCheckFrequency time.Duration `json:"idle-check-frequency"`
}

PoolConfig is a configuration of the go-redis connection pool.

type SentinelConfig

type SentinelConfig struct {
	// A seed list of host:port addresses sentinel nodes.
	Endpoints []string `json:"endpoints"`

	// The sentinel master name.
	MasterName string `json:"master-name"`

	// Database to be selected after connecting to the server.
	DB int `json:"db"`

	ClientConfig
}

SentinelConfig is the sentinel client configuration.

type TLS

type TLS struct {
	Enabled    bool   `json:"enabled"`     // enable/disable TLS
	SkipVerify bool   `json:"skip-verify"` // whether to skip verification of server name & certificate
	Certfile   string `json:"cert-file"`   // client certificate
	Keyfile    string `json:"key-file"`    // client private key
	CAfile     string `json:"ca-file"`     // certificate authority
}

TLS configures transport layer security properties.

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn allows grouping operations into a transaction. A transaction executes multiple operations more efficiently than executing them one by one.

func (*Txn) Commit

func (tx *Txn) Commit(ctx context.Context) (err error)

Commit commits all operations in a transaction to the data store. Commit is atomic - either all operations in the transaction are committed to the data store, or none of them.

func (*Txn) Delete

func (tx *Txn) Delete(key string) keyval.BytesTxn

Delete adds a new 'delete' operation to a previously created transaction.

func (*Txn) Put

func (tx *Txn) Put(key string, value []byte) keyval.BytesTxn

Put adds a new 'put' operation to a previously created transaction. If the key does not exist in the data store, a new key-value item will be added to the data store. If the key exists in the data store, the existing value will be overwritten with the value from this operation.
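
The transaction API is a builder: Put and Delete only queue operations and return the transaction so calls can be chained, and nothing reaches the data store until Commit. The sketch below shows that shape with a hypothetical in-memory `memTxn`. Its Commit cannot fail halfway, so it does not model rollback or the Redis-side mechanics.

```go
package main

import "fmt"

// op is a single queued operation.
type op struct {
	key    string
	value  []byte
	remove bool
}

// memTxn is a hypothetical in-memory transaction over a map-backed store.
type memTxn struct {
	store map[string][]byte
	ops   []op
}

// Put queues a put operation and returns the transaction for chaining.
func (t *memTxn) Put(key string, value []byte) *memTxn {
	t.ops = append(t.ops, op{key: key, value: value})
	return t
}

// Delete queues a delete operation and returns the transaction for chaining.
func (t *memTxn) Delete(key string) *memTxn {
	t.ops = append(t.ops, op{key: key, remove: true})
	return t
}

// Commit applies all queued operations in order and clears the queue.
func (t *memTxn) Commit() error {
	for _, o := range t.ops {
		if o.remove {
			delete(t.store, o.key)
		} else {
			t.store[o.key] = o.value
		}
	}
	t.ops = nil
	return nil
}

func main() {
	store := map[string][]byte{}
	txn := &memTxn{store: store}
	txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102")).Delete("key101")

	fmt.Println(len(store)) // 0, nothing is applied before Commit
	if err := txn.Commit(); err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	fmt.Println(len(store), string(store["key102"])) // 1 val 102
}
```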
