Documentation
¶
Overview ¶
Package redispipe - high throughput Redis connector with implicit pipelining.
https://redis.io/topics/pipelining
Pipelining improves maximum throughput that redis can serve, and reduces CPU usage both on redis server and on client. Mostly it comes from saving system CPU consumption.
But it is not always possible to use pipelining explicitly: usually there are dozens of concurrent goroutines, each sends just one request at a time. To handle usual workload, pipelining has to be implicit.
All known Golang redis connectors use connection-per-request working model with a connection pool, and provide only explicit pipelining. This showed far from optimal performance under highly concurrent load.
This connector was created as implicitly pipelined from the ground up to achieve maximum performance in a highly concurrent environment. It writes all requests to a single connection to redis, and continuously reads answers from another goroutine.
Note that it trades a bit of latency for throughput, and therefore may be not optimal for non-concurrent usage.
Capabilities ¶
- fast,
- thread-safe: no need to lock around connection, no need to "return to pool", etc,
- pipelining is implicit,
- transactions supported (but without WATCH),
- hook for custom logging,
- hook for request timing reporting.
Limitations ¶
- by default, it is not allowed to send blocking calls, because it will block the whole pipeline: `BLPOP`, `BRPOP`, `BRPOPLPUSH`, `BZPOPMIN`, `BZPOPMAX`, `XREAD`, `XREADGROUP`, `SAVE`. However, you could set `ScriptMode: true` option to enable these commands. `ScriptMode: true` also turns default `WritePause` to -1 (meaning it almost disables forced batching).
- `WATCH` is also forbidden by default: it is useless and even harmful when concurrent goroutines use the same connection. It is also allowed with `ScriptMode: true`, but you should be sure you use connection only from single goroutine.
- `SUBSCRIBE` and `PSUBSCRIBE` commands are forbidden. They switch the connection work mode to a completely different mode of communication, therefore it could not be combined with regular commands. This connector doesn't implement subscribing mode.
Structure ¶
- root package is empty
- common functionality is in redis subpackage
- singe connection is in redisconn subpackage
- cluster support is in rediscluster subpackage
Usage ¶
Both redisconn.Connect and rediscluster.NewCluster creates implementations of redis.Sender. redis.Sender provides asynchronous api for sending request/requests/transactions. That api accepts redis.Future interface implementations as an argument and fulfills it asynchronously. Usually you don't need to provide your own redis.Future implementation, but rather use synchronous wrappers.
To use convenient synchronous api, one should wrap "sender" with one of wrappers:
- redis.Sync{sender} - provides simple synchronouse api,
- redis.SyncCtx{sender} - provides same api, but all methods accept context.Context, and methods return immediately if that context is closed,
- redis.ChanFutured{sender} - provides api with future through channel closing.
Types accepted as command arguments: nil, []byte, string, int (and all other integer types), float64, float32, bool. All arguments are converted to redis bulk strings as usual (ie string and bytes - as is; numbers - in decimal notation). bool converted as "0/1", nil converted to empty string.
In difference to other redis packages, no custom types are used for request results. Results are de-serialized into plain go types and are returned as interface{}:
redis | go -------------|------- plain string | string bulk string | []byte integer | int64 array | []interface{} error | error (*errorx.Error)
IO, connection, and other errors are not returned separately but as result (and has same *errorx.Error underlying type).
Example (Usage) ¶
package main import ( "context" "fmt" "log" "github.com/joomcode/redispipe/redis" "github.com/joomcode/redispipe/rediscluster" "github.com/joomcode/redispipe/redisconn" ) const databaseno = 0 const password = "" var myhandle interface{} = nil func main() { ctx := context.Background() cluster := false SingleRedis := func(ctx context.Context) (redis.Sender, error) { opts := redisconn.Opts{ DB: databaseno, Password: password, Logger: redisconn.NoopLogger{}, // shut up logging. Could be your custom implementation. Handle: myhandle, // custom data, useful for custom logging // Other parameters (usually, no need to change) // IOTimeout, DialTimeout, ReconnectTimeout, TCPKeepAlive, Concurrency, WritePause, AsyncDial } conn, err := redisconn.Connect(ctx, "127.0.0.1:6379", opts) return conn, err } ClusterRedis := func(ctx context.Context) (redis.Sender, error) { opts := rediscluster.Opts{ HostOpts: redisconn.Opts{ // No DB Password: password, // Usually, no need for special logger }, Name: "mycluster", // name of a cluster Logger: rediscluster.NoopLogger{}, // shut up logging. Could be your custom implementation. Handle: myhandle, // custom data, useful for custom logging // Other parameters (usually, no need to change): // ConnsPerHost, ConnHostPolicy, CheckInterval, MovedRetries, WaitToMigrate, RoundRobinSeed, } addresses := []string{"127.0.0.1:20001"} // one or more of cluster addresses cluster, err := rediscluster.NewCluster(ctx, addresses, opts) return cluster, err } var sender redis.Sender var err error if cluster { sender, err = ClusterRedis(ctx) } else { sender, err = SingleRedis(ctx) } if err != nil { log.Fatal(err) } defer sender.Close() sync := redis.SyncCtx{sender} // wrapper for synchronous api res := sync.Do(ctx, "SET", "key", "ho") if err := redis.AsError(res); err != nil { log.Fatal(err) } fmt.Printf("result: %q\n", res) res = sync.Do(ctx, "GET", "key") if err := redis.AsError(res); err != nil { log.Fatal(err) } fmt.Printf("result: %q\n", res) res = sync.Send(ctx, redis.Req("HMSET", "hashkey", "field1", "val1", "field2", "val2")) if err := redis.AsError(res); err != nil { log.Fatal(err) } res = sync.Send(ctx, redis.Req("HMGET", "hashkey", "field1", "field2", "field3")) if err := redis.AsError(res); err != nil { log.Fatal(err) } for i, v := range res.([]interface{}) { fmt.Printf("%d: %T %q\n", i, v, v) } res = sync.Send(ctx, redis.Req("HMGET", "key", "field1")) if err := redis.AsError(res); err != nil { if rerr := redis.AsErrorx(res); rerr != nil && rerr.IsOfType(redis.ErrResult) { fmt.Printf("expected error: %v\n", rerr) } else { fmt.Printf("unexpected error: %v\n", err) } } else { fmt.Printf("unexpected missed error\n") } results := sync.SendMany(ctx, []redis.Request{ redis.Req("GET", "key"), redis.Req("HMGET", "hashkey", "field1", "field3"), }) // results is []interface{}, each element is result for corresponding request for i, res := range results { fmt.Printf("result[%d]: %T %q\n", i, res, res) } results, err = sync.SendTransaction(ctx, []redis.Request{ redis.Req("SET", "a{x}", "b"), redis.Req("SET", "b{x}", 0), redis.Req("INCRBY", "b{x}", 3), }) if err != nil { log.Fatal(err) } for i, res := range results { fmt.Printf("tresult[%d]: %T %q\n", i, res, res) } scanner := sync.Scanner(ctx, redis.ScanOpts{Match: "*key*"}) for { keys, err := scanner.Next() if err != nil { if err != redis.ScanEOF { log.Fatal(err) } break } fmt.Printf("keys: %q", keys) } }
Output: result: "OK" result: "ho" 0: []uint8 "val1" 1: []uint8 "val2" 2: <nil> %!q(<nil>) expected error: redispipe.result: WRONGTYPE Operation against a key holding the wrong kind of value {request: Req("HMGET", ["key" "field1"]), address: 127.0.0.1:6379} result[0]: []uint8 "ho" result[1]: []interface {} ["val1" <nil>] tresult[0]: string "OK" tresult[1]: string "OK" tresult[2]: int64 '\x03' keys: ["key" "hashkey"]
Directories
¶
Path | Synopsis |
---|---|
bin
|
|
Package redis contains common parts for other packages.
|
Package redis contains common parts for other packages. |
Package rediscluster implements a connector for redis cluster.
|
Package rediscluster implements a connector for redis cluster. |
redisclusterutil
Package redisclusterutil implements some protocol level details of cluster specification.
|
Package redisclusterutil implements some protocol level details of cluster specification. |
bench
Module
|
|
Package redisconn implements connection to single redis server.
|
Package redisconn implements connection to single redis server. |
bench
Module
|
|
Package redisdumb contains dumbest implementation of redis.Sender
|
Package redisdumb contains dumbest implementation of redis.Sender |
Package testbed is a tool for running redis-server for tests.
|
Package testbed is a tool for running redis-server for tests. |