Documentation ¶
Overview ¶
Package redisc implements a redis cluster client on top of the redigo client package. It supports all commands that can be executed on a redis cluster, including pub-sub, scripts and read-only connections to read data from replicas. See http://redis.io/topics/cluster-spec for details.
Design ¶
The package defines two main types: Cluster and Conn. Both are described in more details below, but the Cluster manages the mapping of keys (or more exactly, hash slots computed from keys) to a group of nodes that form a redis cluster, and a Conn manages a connection to this cluster.
The package is designed such that for simple uses, or when keys have been carefully named to play well with a redis cluster, a Cluster value can be used as a drop-in replacement for a redis.Pool from the redigo package.
Similarly, the Conn type implements redigo's redis.Conn interface (and the augmented redis.ConnWithTimeout one too), so the API to execute commands is the same - in fact the redisc package uses the redigo package as its only third-party dependency.
When more control is needed, the package offers some extra behaviour specific to working with a redis cluster:
Slot and SplitBySlot functions to compute the slot for a given key and to split a list of keys into groups of keys from the same slot, so that each group can safely be handled using the same connection.
*Conn.Bind (or the BindConn package-level helper function) to explicitly specify the keys that will be used with the connection so that the right node is selected, instead of relying on the automatic detection based on the first parameter of the command.
*Conn.ReadOnly (or the ReadOnlyConn package-level helper function) to mark a connection as read-only, allowing commands to be served by a replica instead of the master.
RetryConn to wrap a connection into one that automatically follows redirections when the cluster moves slots around.
Helper functions to deal with cluster-specific errors.
Cluster ¶
The Cluster type manages a redis cluster and offers an interface compatible with redigo's redis.Pool:
Get() redis.Conn Close() error
Along with some additional methods specific to a cluster:
Dial() (redis.Conn, error) EachNode(bool, func(string, redis.Conn) error) error Refresh() error Stats() map[string]redis.PoolStats
If the CreatePool function field is set, then a redis.Pool is created to manage connections to each of the cluster's nodes. A call to Get returns a connection from that pool.
The Dial method, on the other hand, guarantees that the returned connection will not be managed by a pool, even if CreatePool is set. It calls redigo's redis.Dial function to create the unpooled connection, passing along any DialOptions set on the cluster. If the cluster's CreatePool field is nil, Get behaves the same as Dial.
The Refresh method refreshes the cluster's internal mapping of hash slots to nodes. It should typically be called only once, after the cluster is created and before it is used, so that the first connections already benefit from smart routing. It is automatically kept up-to-date based on the redis MOVED responses afterwards.
The EachNode method visits each node in the cluster and calls the provided function with a connection to that node, which may be useful to run diagnostics commands on each node or to collect keys across the whole cluster.
The Stats method returns the pool statistics for each node, with the node's address as key of the map.
A cluster must be closed once it is no longer used to release its resources.
Connection ¶
The connection returned from Get or Dial is a redigo redis.Conn interface (that also implements redis.ConnWithTimeout), with a concrete type of *Conn. In addition to the interface's required methods, *Conn adds the following methods:
Bind(...string) error ReadOnly() error
The returned connection is not yet connected to any node; it is "bound" to a specific node only when a call to Do, Send, Receive or Bind is made. For Do, Send and Receive, the node selection is implicit, it uses the first parameter of the command, and computes the hash slot assuming that first parameter is a key. It then binds the connection to the node corresponding to that slot. If there are no parameters for the command, or if there is no command (e.g. in a call to Receive), a random node is selected.
Bind is explicit, it gives control to the caller over which node to select by specifying a list of keys that the caller wishes to handle with the connection. All keys must belong to the same slot, and the connection must not already be bound to a node, otherwise an error is returned. On success, the connection is bound to the node holding the slot of the specified key(s).
Because the connection is returned as a redis.Conn interface, a type assertion must be used to access the underlying *Conn and to be able to call Bind:
redisConn := cluster.Get() if conn, ok := redisConn.(*redisc.Conn); ok { if err := conn.Bind("my-key"); err != nil { // handle error } }
The BindConn package-level function is provided as a helper for this common use-case.
The ReadOnly method marks the connection as read-only, meaning that it will attempt to connect to a replica instead of the master node for its slot. Once bound to a node, the READONLY redis command is sent automatically, so it doesn't have to be sent explicitly before use. ReadOnly must be called before the connection is bound to a node, otherwise an error is returned.
For the same reason as for Bind, a type assertion must be used to call ReadOnly on a *Conn, so a package-level helper function is also provided, ReadOnlyConn.
There is no ReadWrite method, because it can be sent as a normal redis command and will essentially end that connection (all commands will now return MOVED errors). If the connection was wrapped in a RetryConn call, then it will automatically follow the redirection to the master node (see the Redirections section).
The connection must be closed after use, to release the underlying resources.
Redirections ¶
The redis cluster may return MOVED and ASK errors when the node that received the command doesn't currently hold the slot corresponding to the key. The package cannot reliably handle those redirections automatically because the redirection error may be returned for a pipeline of commands, some of which may have succeeded.
However, a connection can be wrapped by a call to RetryConn, which returns a redis.Conn interface where only calls to Do, Close and Err can succeed. That means pipelining is not supported, and only a single command can be executed at a time, but it will automatically handle MOVED and ASK replies, as well as TRYAGAIN errors.
Note that even if RetryConn is not used, the cluster always updates its mapping of slots to nodes automatically by keeping track of MOVED replies.
Concurrency ¶
The concurrency model is similar to that of the redigo package:
Cluster methods are safe to call concurrently (like redis.Pool).
Connections do not support concurrent calls to write methods (Send, Flush) or concurrent calls to the read method (Receive).
Connections do allow a concurrent reader and writer.
Because the Do method combines the functionality of Send, Flush and Receive, it cannot be called concurrently with other methods.
The Bind and ReadOnly methods are safe to call concurrently, but there is not much point in doing so for as both will fail if the connection is already bound.
Example ¶
Create and use a cluster.
package main import ( "log" "time" "github.com/gomodule/redigo/redis" "github.com/mna/redisc" ) func main() { // create the cluster cluster := redisc.Cluster{ StartupNodes: []string{":7000", ":7001", ":7002"}, DialOptions: []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}, CreatePool: createPool, } defer cluster.Close() // initialize its mapping if err := cluster.Refresh(); err != nil { log.Fatalf("Refresh failed: %v", err) } // grab a connection from the pool conn := cluster.Get() defer conn.Close() // call commands on it s, err := redis.String(conn.Do("GET", "some-key")) if err != nil { log.Fatalf("GET failed: %v", err) } log.Println(s) // grab a non-pooled connection conn2, err := cluster.Dial() if err != nil { log.Fatalf("Dial failed: %v", err) } defer conn2.Close() // make it handle redirections automatically rc, err := redisc.RetryConn(conn2, 3, 100*time.Millisecond) if err != nil { log.Fatalf("RetryConn failed: %v", err) } _, err = rc.Do("SET", "some-key", 2) if err != nil { log.Fatalf("SET failed: %v", err) } } func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) { return &redis.Pool{ MaxIdle: 5, MaxActive: 10, IdleTimeout: time.Minute, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr, opts...) }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, }, nil }
Output:
Index ¶
- Constants
- func BindConn(c redis.Conn, keys ...string) error
- func IsCrossSlot(err error) bool
- func IsTryAgain(err error) bool
- func ReadOnlyConn(c redis.Conn) error
- func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error)
- func Slot(key string) int
- func SplitBySlot(keys ...string) [][]string
- type BgErrorSrc
- type Cluster
- type Conn
- func (c *Conn) Bind(keys ...string) error
- func (c *Conn) Close() error
- func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error)
- func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error)
- func (c *Conn) Err() error
- func (c *Conn) Flush() error
- func (c *Conn) ReadOnly() error
- func (c *Conn) Receive() (interface{}, error)
- func (c *Conn) ReceiveWithTimeout(timeout time.Duration) (v interface{}, err error)
- func (c *Conn) Send(cmd string, args ...interface{}) error
- type RedirError
Examples ¶
Constants ¶
const HashSlots = 16384
HashSlots is the number of slots supported by redis cluster.
Variables ¶
This section is empty.
Functions ¶
func BindConn ¶
BindConn is a convenience function that checks if c implements a Bind method with the right signature such as the one for a *Conn, and calls that method. If c doesn't implement that method, it returns an error.
func IsCrossSlot ¶
IsCrossSlot returns true if the error is a redis cluster error of type CROSSSLOT, meaning that a command was sent with keys from different slots.
func IsTryAgain ¶
IsTryAgain returns true if the error is a redis cluster error of type TRYAGAIN, meaning that the command is valid, but the cluster is in an unstable state and it can't complete the request at the moment.
func ReadOnlyConn ¶
ReadOnlyConn is a convenience function that checks if c implements a ReadOnly method with the right signature such as the one for a *Conn, and calls that method. If c doesn't implement that method, it returns an error.
func RetryConn ¶
RetryConn wraps the connection c (which must be a *redisc.Conn) into a connection that automatically handles cluster redirections (MOVED and ASK replies) and retries for TRYAGAIN errors. Only Do, Close and Err can be called on that connection, all other methods return an error.
The maxAtt parameter indicates the maximum number of attempts to successfully execute the command. The tryAgainDelay is the duration to wait before retrying a TRYAGAIN error.
The only case where it returns a non-nil error is if c is not a *redisc.Conn.
Example ¶
Automatically retry in case of redirection errors.
package main import ( "fmt" "log" "time" "github.com/gomodule/redigo/redis" "github.com/mna/redisc" ) func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) { return &redis.Pool{ MaxIdle: 5, MaxActive: 10, IdleTimeout: time.Minute, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr, opts...) }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, }, nil } func main() { // create the cluster cluster := redisc.Cluster{ StartupNodes: []string{":7000", ":7001", ":7002"}, DialOptions: []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}, CreatePool: createPool, } defer cluster.Close() // initialize its mapping if err := cluster.Refresh(); err != nil { log.Fatalf("Refresh failed: %v", err) } // get a connection from the cluster conn := cluster.Get() defer conn.Close() // create the retry connection - only Do, Close and Err are // supported on that connection. It will make up to 3 attempts // to get a valid response, and will wait 100ms before a retry // in case of a TRYAGAIN redis error. retryConn, err := redisc.RetryConn(conn, 3, 100*time.Millisecond) if err != nil { log.Fatalf("RetryConn failed: %v", err) } // call commands v, err := retryConn.Do("GET", "key") if err != nil { log.Fatalf("GET failed: %v", err) } fmt.Println("GET returned ", v) }
Output:
func SplitBySlot ¶
SplitBySlot takes a list of keys and returns a list of list of keys, grouped by identical cluster slot. For example:
bySlot := SplitBySlot("k1", "k2", "k3") for _, keys := range bySlot { // keys is a list of keys that belong to the same slot }
Types ¶
type BgErrorSrc ¶ added in v1.3.3
type BgErrorSrc uint
BgErrorSrc identifies the origin of a background error as reported by calls to Cluster.BgError, when set.
const ( // ClusterRefresh indicates the error comes from a background refresh of // cluster slots mapping, e.g. following reception of a MOVED error. ClusterRefresh BgErrorSrc = iota // RetryCloseConn indicates the error comes from the call to Close for a // previous connection, before retrying a command with a new one. RetryCloseConn )
List of possible BgErrorSrc values.
type Cluster ¶
type Cluster struct { // StartupNodes is the list of initial nodes that make up the cluster. The // values are expected as "address:port" (e.g.: "127.0.0.1:6379"). StartupNodes []string // DialOptions is the list of options to set on each new connection. DialOptions []interface{} // CreatePool is the function to call to create a redis.Pool for the // specified TCP address, using the provided options as set in DialOptions. // If this field is not nil, a redis.Pool is created for each node in the // cluster and the pool is used to manage the connections returned by Get. CreatePool func(address string, options ...interface{}) (*redis.Pool, error) // PoolWaitTime is the time to wait when getting a connection from a pool // configured with MaxActive > 0 and Wait set to true, and MaxActive // connections are already in use. // // If <= 0 (or with Go < 1.7), there is no wait timeout, it will wait // indefinitely if Pool.Wait is true. PoolWaitTime time.Duration // BgError is an optional function to call when a background error occurs // that would otherwise go unnoticed. The source of the error is indicated // by the parameter of type BgErrorSrc, see the list of BgErrorSrc values // for possible error sources. The function may be called in a distinct // goroutine, it should not access shared values that are not meant to be // used concurrently. BgError func(BgErrorSrc, error) // LayoutRefresh is an optional function that is called each time a cluster // refresh is successfully executed, either by an explicit call to // Cluster.Refresh or e.g. as required following a MOVED error. Note that // even though it is unlikely, the old and new mappings could be identical. // The function may be called in a separate goroutine, it should not access // shared values that are not meant to be used concurrently. LayoutRefresh func(old, new [HashSlots][]string) // contains filtered or unexported fields }
A Cluster manages a redis cluster. If the CreatePool field is not nil, a redis.Pool is used for each node in the cluster to get connections via Get. If it is nil or if Dial is called, redis.Dial is used to get the connection.
All fields must be set prior to using the Cluster value, and must not be changed afterwards, as that could be a data race.
func (*Cluster) Close ¶
Close releases the resources used by the cluster. It closes all the pools that were created, if any.
func (*Cluster) Dial ¶
Dial returns a connection the same way as Get, but it guarantees that the connection will not be managed by the pool, even if CreatePool is set. The actual returned type is *Conn, see its documentation for details.
func (*Cluster) EachNode ¶ added in v1.3.3
EachNode calls fn for each node in the cluster, with a connection bound to that node. The connection is automatically closed (and potentially returned to the pool if Cluster.CreatePool is set) after the function executes. Note that conn is not a RetryConn and using one is inappropriate, as the goal of EachNode is to connect to specific nodes, not to target specific keys. The visited nodes are those that are known at the time of the call - it does not force a refresh of the cluster layout. If no nodes are known, it returns an error.
If fn returns an error, no more nodes are visited and that error is returned by EachNode. If replicas is true, it will visit each replica node instead, otherwise the primary nodes are visited. Keep in mind that if replicas is true, it will visit all known replicas - which is great e.g. to run diagnostics on each node, but can be surprising if the goal is e.g. to collect all keys, as it is possible that more than one node is acting as replica for the same primary, meaning that the same keys could be seen multiple times - you should be prepared to handle this scenario. The connection provided to fn is not a ReadOnly connection (conn.ReadOnly hasn't been called on it), it is up to fn to execute the READONLY redis command if required.
func (*Cluster) Get ¶
Get returns a redis.Conn interface that can be used to call redis commands on the cluster. The application must close the returned connection. The actual returned type is *Conn, see its documentation for details.
func (*Cluster) Refresh ¶
Refresh updates the cluster's internal mapping of hash slots to redis node. It calls CLUSTER SLOTS on each known node until one of them succeeds.
It should typically be called after creating the Cluster and before using it. The cluster automatically keeps its mapping up-to-date afterwards, based on the redis commands' MOVED responses.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn is a redis cluster connection. When returned by Get or Dial, it is not yet bound to any node in the cluster. Only when a call to Do, Send, Receive or Bind is made is a connection to a specific node established:
- if Do or Send is called first, the command's first parameter is assumed to be the key, and its slot is used to find the node
- if Receive is called first, or if Do or Send is called first but with no parameter for the command (or no command), a random node is selected in the cluster
- if Bind is called first, the node corresponding to the slot of the specified key(s) is selected
Because Get and Dial return a redis.Conn interface, a type assertion can be used to call Bind or ReadOnly on this concrete Conn type:
redisConn := cluster.Get() if conn, ok := redisConn.(*redisc.Conn); ok { if err := conn.Bind("my-key"); err != nil { // handle error } }
Alternatively, the package-level BindConn or ReadOnlyConn helper functions may be used.
Example ¶
Execute scripts.
package main import ( "fmt" "log" "time" "github.com/gomodule/redigo/redis" "github.com/mna/redisc" ) func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) { return &redis.Pool{ MaxIdle: 5, MaxActive: 10, IdleTimeout: time.Minute, Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr, opts...) }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, }, nil } func main() { // create the cluster cluster := redisc.Cluster{ StartupNodes: []string{":7000", ":7001", ":7002"}, DialOptions: []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}, CreatePool: createPool, } defer cluster.Close() // initialize its mapping if err := cluster.Refresh(); err != nil { log.Fatalf("Refresh failed: %v", err) } // create a script that takes 2 keys and 2 values, and returns 1 var script = redis.NewScript(2, ` redis.call("SET", KEYS[1], ARGV[1]) redis.call("SET", KEYS[2], ARGV[2]) return 1 `) // get a connection from the cluster conn := cluster.Get() defer conn.Close() // bind it to the right node for the required keys, ahead of time if err := redisc.BindConn(conn, "scr{a}1", "src{a}2"); err != nil { log.Fatalf("BindConn failed: %v", err) } // script.Do, sends the whole script on first use v, err := script.Do(conn, "scr{a}1", "scr{a}2", "x", "y") if err != nil { log.Fatalf("script.Do failed: %v", err) } fmt.Println("Do returned ", v) // it is also possible to send only the hash, once it has been // loaded on that node if err := script.SendHash(conn, "scr{a}1", "scr{a}2", "x", "y"); err != nil { log.Fatalf("script.SendHash failed: %v", err) } if err := conn.Flush(); err != nil { log.Fatalf("Flush failed: %v", err) } // and receive the script's result v, err = conn.Receive() if err != nil { log.Fatalf("Receive failed: %v", err) } fmt.Println("Receive returned ", v) }
Output:
func (*Conn) Bind ¶
Bind binds the connection to the cluster node corresponding to the slot of the provided keys. If the keys don't belong to the same slot, an error is returned and the connection is not bound. If the connection is already bound, an error is returned. If no key is provided, it binds to a random node.
func (*Conn) Do ¶
Do sends a command to the server and returns the received reply. If the connection is not yet bound to a cluster node, it will be after this call, based on the rules documented in the Conn type.
func (*Conn) DoWithTimeout ¶ added in v1.1.4
func (c *Conn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (v interface{}, err error)
DoWithTimeout sends a command to the server and returns the received reply. If the connection is not yet bound to a cluster node, it will be after this call, based on the rules documented in the Conn type.
The timeout overrides the read timeout set when dialing the connection (in the DialOptions of the Cluster).
func (*Conn) Err ¶
Err returns a non-nil value if the connection is broken. Applications should close broken connections.
func (*Conn) ReadOnly ¶
ReadOnly marks the connection as read-only, meaning that when it is bound to a cluster node, it will attempt to connect to a replica instead of the master and will automatically emit a READONLY command so that the replica agrees to serve read commands. Be aware that reading from a replica may return stale data. Sending write commands on a read-only connection will fail with a MOVED error. See http://redis.io/commands/readonly for more details.
If the connection is already bound to a node, either via a call to Do, Send, Receive or Bind, ReadOnly returns an error.
func (*Conn) Receive ¶
Receive receives a single reply from the server. If the connection is not yet bound to a cluster node, it will be after this call, based on the rules documented in the Conn type.
func (*Conn) ReceiveWithTimeout ¶ added in v1.1.4
ReceiveWithTimeout receives a single reply from the Redis server. If the connection is not yet bound to a cluster node, it will be after this call, based on the rules documented in the Conn type.
The timeout overrides the read timeout set when dialing the connection (in the DialOptions of the Cluster).
type RedirError ¶
type RedirError struct { // Type indicates if the redirection is a MOVED or an ASK. Type string // NewSlot is the slot number of the redirection. NewSlot int // Addr is the node address to redirect to. Addr string // contains filtered or unexported fields }
RedirError is a cluster redirection error. It indicates that the redis node returned either a MOVED or an ASK error, as specified by the Type field.
func ParseRedir ¶
func ParseRedir(err error) *RedirError
ParseRedir parses err into a RedirError. If err is not a MOVED or ASK error or if it is nil, it returns nil.
func (*RedirError) Error ¶
func (e *RedirError) Error() string
Error returns the error message of a RedirError. This is the message as received from redis.
Directories ¶
Path | Synopsis |
---|---|
Command ccheck implements the consistency checker redis cluster client as described in http://redis.io/topics/cluster-tutorial.
|
Command ccheck implements the consistency checker redis cluster client as described in http://redis.io/topics/cluster-tutorial. |
Package redistest provides test helpers to manage a redis server.
|
Package redistest provides test helpers to manage a redis server. |
resp
Package resp implements an efficient decoder for the Redis Serialization Protocol (RESP).
|
Package resp implements an efficient decoder for the Redis Serialization Protocol (RESP). |