Documentation
¶
Overview ¶
Package redis contains common parts for other packages.
- main interfaces visible to user (Sender, Scanner, ScanOpts)
- wrappers for synchronous interface over Sender (Sync, SyncCtx) and chan-based-future interface (ChanFutured)
- request writing,
- response parsing,
- root errorx namespace and common error types.
Usually you get Sender from redisconn.Connect or rediscluster.NewCluster, then wrap with Sync or SyncCtx, and use their sync methods without any locking:
sender, err := redisconn.Connect(ctx, "127.0.0.1:6379", redisconn.Opts{}) sync := redis.Sync{sender} go func() { res := sync.Do("GET", "x") if err := redis.AsError(res); err != nil { log.Println("failed", err) } log.Println("found x", res) }() go func() { results := sync.SendMany([]redis.Request{ redis.Req("GET", "k1"), redis.Req("Incr", "k2"), redis.Req("HMGET, "h1", "hk1", "hk2"), }) if err := redis.AsError(results[0]); err != nil { log.Println("failed", err) } if results[0] == nil { log.Println("not found") } else { log.Println("k1: ", results[0]) } }()
See more documentation in root redispipe package.
Index ¶
- Variables
- func AppendRequest(buf []byte, req Request) ([]byte, error)
- func ArgToString(arg interface{}) (string, bool)
- func AsError(v interface{}) error
- func AsErrorx(v interface{}) *errorx.Error
- func Blocking(name string) bool
- func CheckRequest(req Request, singleThreaded bool) error
- func Dangerous(name string) bool
- func ForbiddenCommand(name string, singleThreaded bool) error
- func ReadResponse(b *bufio.Reader) (interface{}, int)
- func ReplicaSafe(name string) bool
- func ScanResponse(res interface{}) ([]byte, []string, error)
- func TransactionResponse(res interface{}) ([]interface{}, error)
- type ChanFuture
- type ChanFutured
- type ChanFutures
- type ChanTransaction
- type FuncFuture
- type Future
- type Request
- type ScanOpts
- type Scanner
- type ScannerBase
- type Sender
- type Sync
- type SyncCtx
- func (s SyncCtx) Do(ctx context.Context, cmd string, args ...interface{}) interface{}
- func (s SyncCtx) Scanner(ctx context.Context, opts ScanOpts) SyncCtxIterator
- func (s SyncCtx) Send(ctx context.Context, r Request) interface{}
- func (s SyncCtx) SendMany(ctx context.Context, reqs []Request) []interface{}
- func (s SyncCtx) SendTransaction(ctx context.Context, reqs []Request) ([]interface{}, error)
- type SyncCtxIterator
- type SyncIterator
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // Errors is a root namespaces of all redispipe errors. Errors = errorx.NewNamespace("redispipe").ApplyModifiers(errorx.TypeModifierOmitStackTrace) // ErrOpts - options are wrong ErrOpts = Errors.NewSubNamespace("opts") // ErrContextIsNil - context is not passed to constructor ErrContextIsNil = ErrOpts.NewType("context_is_nil") // ErrNoAddressProvided - no address is given to constructor ErrNoAddressProvided = ErrOpts.NewType("no_address") // ErrTraitNotSent signals request were not written to wire ErrTraitNotSent = errorx.RegisterTrait("request_not_sent") // ErrContextClosed - context were explicitly closed (or connection / cluster were shut down) ErrContextClosed = Errors.NewType("connection_context_closed", ErrTraitNotSent) // ErrTraitConnectivity marks all networking and io errors ErrTraitConnectivity = errorx.RegisterTrait("network") // ErrIO - io error: read/write error, or timeout, or connection closed while reading/writing // It is not known if request were processed or not ErrIO = Errors.NewType("io error", ErrTraitConnectivity) // ErrRequest - request malformed. Can not serialize request, no reason to retry. ErrRequest = Errors.NewSubNamespace("request") // ErrArgumentType - argument is not serializable ErrArgumentType = ErrRequest.NewType("argument_type") // ErrBatchFormat - some other command in batch is malformed ErrBatchFormat = ErrRequest.NewType("batch_format") // ErrNoSlotKey - no key to determine cluster slot ErrNoSlotKey = ErrRequest.NewType("no_slot_key") // ErrRequestCancelled - request already cancelled ErrRequestCancelled = ErrRequest.NewType("request_cancelled") // ErrCommandForbidden - command is blocking or dangerous ErrCommandForbidden = ErrRequest.NewType("command_forbidden") // ErrResponse - response malformed. Redis returns unexpected response. ErrResponse = Errors.NewSubNamespace("response") // ErrResponseFormat - response is not valid Redis response ErrResponseFormat = ErrResponse.NewType("format") // ErrResponseUnexpected - response is valid redis response, but its structure/type unexpected ErrResponseUnexpected = ErrResponse.NewType("unexpected") // ErrHeaderlineTooLarge - header line too large ErrHeaderlineTooLarge = ErrResponse.NewType("headerline_too_large") // ErrHeaderlineEmpty - header line is empty ErrHeaderlineEmpty = ErrResponse.NewType("headerline_empty") // ErrIntegerParsing - integer malformed ErrIntegerParsing = ErrResponse.NewType("integer_parsiing") // ErrNoFinalRN - no final "\r\n" ErrNoFinalRN = ErrResponse.NewType("no_final_rn") // ErrUnknownHeaderType - unknown header type ErrUnknownHeaderType = ErrResponse.NewType("unknown_headerline_type") // ErrPing - ping receives wrong response ErrPing = ErrResponse.NewType("ping") // ErrTraitClusterMove signals that error happens due to cluster rebalancing. ErrTraitClusterMove = errorx.RegisterTrait("cluster_move") // ErrResult - just regular redis response. ErrResult = Errors.NewType("result") // ErrMoved - MOVED response ErrMoved = ErrResult.NewSubtype("moved", ErrTraitClusterMove) // ErrAsk - ASK response ErrAsk = ErrResult.NewSubtype("ask", ErrTraitClusterMove) // ErrLoading - redis didn't finish start ErrLoading = ErrResult.NewSubtype("loading", ErrTraitNotSent) // ErrExecEmpty - EXEC returns nil (WATCH failed) (it is strange, cause we don't support WATCH) ErrExecEmpty = ErrResult.NewSubtype("exec_empty") // ErrExecAbort - EXEC returns EXECABORT ErrExecAbort = ErrResult.NewSubtype("exec_abort") // ErrTryAgain - EXEC returns TryAgain ErrTryAgain = ErrResult.NewSubtype("exec_try_again") )
var ( // EKLine - set by response parser for unrecognized header lines. EKLine = errorx.RegisterProperty("line") // EKMovedTo - set by response parser for MOVED and ASK responses. EKMovedTo = errorx.RegisterProperty("movedto") // EKSlot - set by response parser for MOVED and ASK responses. EKSlot = errorx.RegisterPrintableProperty("slot") // EKVal - set by request writer and checker to argument value which could not be serialized. EKVal = errorx.RegisterPrintableProperty("val") // EKArgPos - set by request writer and checker to argument position which could not be serialized. EKArgPos = errorx.RegisterPrintableProperty("argpos") // EKRequest - request that triggered error. EKRequest = errorx.RegisterPrintableProperty("request") // EKRequests - batch requests that triggered error. EKRequests = errorx.RegisterPrintableProperty("requests") // EKResponse - unexpected response EKResponse = errorx.RegisterProperty("response") // EKAddress - address of redis that has a problems EKAddress = errorx.RegisterPrintableProperty("address") )
var ( // CollectTrace - should Sync and SyncCtx wrappers collect stack traces on a call side. CollectTrace = false )
var ScanEOF = errors.New("Iteration finished")
ScanEOF is error returned by Sync wrappers when iteration exhausted.
Functions ¶
func AppendRequest ¶
AppendRequest appends request to byte slice as RESP request (ie as array of strings).
It could fail if some request value is not nil, integer, float, string or byte slice. In case of error it still returns modified buffer, but truncated to original size, it could be used save reallocation.
Note: command could contain single space. In that case, it will be split and last part will be prepended to arguments.
Example ¶
package main import ( "fmt" "time" "github.com/joomcode/redispipe/redis" ) func main() { req, err := redis.AppendRequest(nil, redis.Req("GET", "one")) fmt.Printf("%q\n%v\n", req, err) req, err = redis.AppendRequest(req, redis.Req("INCRBY", "cnt", 5)) fmt.Printf("%q\n%v\n", req, err) req, err = redis.AppendRequest(req, redis.Req("SENDFOO", time.Second)) fmt.Printf("%q\n%v\n", req, err) }
Output: "*2\r\n$3\r\nGET\r\n$3\r\none\r\n" <nil> "*2\r\n$3\r\nGET\r\n$3\r\none\r\n*3\r\n$6\r\nINCRBY\r\n$3\r\ncnt\r\n$1\r\n5\r\n" <nil> "*2\r\n$3\r\nGET\r\n$3\r\none\r\n*3\r\n$6\r\nINCRBY\r\n$3\r\ncnt\r\n$1\r\n5\r\n" redispipe.request.argument_type: {request: Req("SENDFOO", ["1s"]), argpos: 0, val: 1s}
func ArgToString ¶
ArgToString returns string representataion of an argument. Used in cluster to determine cluster slot. Have to be in sync with AppendRequest
func AsError ¶
func AsError(v interface{}) error
AsError casts interface to error (if it is error)
Example ¶
package main import ( "errors" "fmt" "github.com/joomcode/redispipe/redis" ) func main() { vals := []interface{}{ nil, 1, "hello", errors.New("high"), redis.ErrResult.New("goodbye"), } for _, v := range vals { fmt.Printf("%T %v => %T %v\n", v, v, redis.AsError(v), redis.AsError(v)) } }
Output: <nil> <nil> => <nil> <nil> int 1 => <nil> <nil> string hello => <nil> <nil> *errors.errorString high => *errors.errorString high *errorx.Error redispipe.result: goodbye => *errorx.Error redispipe.result: goodbye
func AsErrorx ¶
AsErrorx casts interface to *errorx.Error. It panics if value is error but not *redis.Error.
func Blocking ¶
Blocking returns true if command is known to be blocking. Blocking commands could stall whole pipeline and therefore affect other commands sent through this connection. It is undesirable and prevented by default.
This commands are forbidden in default configuration, but could be enabled with `SingleThreaded` connection option.
`WATCH` command is also included here because while it is dangerous in concurrent environment, it is safe to be used in single threaded case.
func CheckRequest ¶
CheckRequest checks requests command and arguments to be compatible with connector.
func Dangerous ¶
Dangerous returns true if command is not safe to use with the connector. Currently it includes `SUBSCRIBE`, `PSUBSCRIBE` commands, because they changes connection protocol mode.
func ForbiddenCommand ¶
ForbiddenCommand returns true if command is not allowed to run.
func ReadResponse ¶
ReadResponse reads single RESP answer from bufio.Reader
func ReplicaSafe ¶
ReplicaSafe returns true if command is readonly and "safe to run on replica". Some commands like "scan" are not included, because their result could differ between master and replica.
func ScanResponse ¶
ScanResponse parses response of Scan command, returns iterator and array of keys.
func TransactionResponse ¶
func TransactionResponse(res interface{}) ([]interface{}, error)
TransactionResponse parses response of EXEC command, returns array of answers.
Types ¶
type ChanFuture ¶
type ChanFuture struct {
// contains filtered or unexported fields
}
ChanFuture - future implemented with channel as signal of fulfillment.
func (*ChanFuture) Cancelled ¶
func (f *ChanFuture) Cancelled() error
Cancelled - implementation of Future.Cancelled (always false).
func (*ChanFuture) Done ¶
func (f *ChanFuture) Done() <-chan struct{}
Done returns channel that will be closed on fulfillment.
func (*ChanFuture) Resolve ¶
func (f *ChanFuture) Resolve(res interface{}, _ uint64)
Resolve - implementation of Future.Resolve
func (*ChanFuture) Value ¶
func (f *ChanFuture) Value() interface{}
Value waits for result to be fulfilled and returns result.
type ChanFutured ¶
type ChanFutured struct {
S Sender
}
ChanFutured wraps Sender and provides asynchronous interface through future implemented with channel.
func (ChanFutured) Send ¶
func (s ChanFutured) Send(r Request) *ChanFuture
Send sends requests and returns ChanFuture for result.
func (ChanFutured) SendMany ¶
func (s ChanFutured) SendMany(reqs []Request) ChanFutures
SendMany sends several requests and returns slice of ChanFuture for results.
func (ChanFutured) SendTransaction ¶
func (s ChanFutured) SendTransaction(r []Request) *ChanTransaction
SendTransaction sends several requests as MULTI+EXEC transaction, returns ChanTransaction - wrapper around ChanFuture with additional method.
type ChanFutures ¶
type ChanFutures []*ChanFuture
ChanFutures - implementation of Future over slice of *ChanFuture
func (ChanFutures) Cancelled ¶
func (f ChanFutures) Cancelled() error
Cancelled - implementation of Future.Cancelled (always false).
func (ChanFutures) Resolve ¶
func (f ChanFutures) Resolve(res interface{}, i uint64)
Resolve - implementation of Future.Resolve. It resolves ChanFuture corresponding to index.
type ChanTransaction ¶
type ChanTransaction struct {
ChanFuture
}
ChanTransaction - wrapper over ChanFuture with additional convenient method.
func (*ChanTransaction) Results ¶
func (f *ChanTransaction) Results() ([]interface{}, error)
Results - parses result of transaction and returns it as an array of results.
type FuncFuture ¶
type FuncFuture func(res interface{}, n uint64)
FuncFuture simple wrapper that makes Future from function.
func (FuncFuture) Cancelled ¶
func (f FuncFuture) Cancelled() error
Cancelled implements Future.Cancelled (always false)
func (FuncFuture) Resolve ¶
func (f FuncFuture) Resolve(res interface{}, n uint64)
Resolve implements Future.Resolve (by calling wrapped function).
type Future ¶
type Future interface { // Resolve is called by sender to pass result (or error) for particular request. // Single future could be used for accepting multiple results. // n argument is used then to distinguish request this result is for. Resolve(res interface{}, n uint64) // Cancelled method could inform sender that request is abandoned. // It is called usually before sending request, and if Cancelled returns non-nil error, // then Sender calls Resolve with ErrRequestCancelled error wrapped around returned error. Cancelled() error }
Future is interface accepted by Sender to signal request completion.
type Request ¶
type Request struct { // Cmd is a redis command to be sent. // It could contain single space, then it will be split, and last part will be serialized as an argument. Cmd string Args []interface{} }
Request represents request to be passed to redis.
type ScanOpts ¶
type ScanOpts struct { // Cmd - command to be sent. Could be 'SCAN', 'SSCAN', 'HSCAN', 'ZSCAN' // default is 'SCAN' Cmd string // Key - key for SSCAN, HSCAN and ZSCAN command Key string // Match - pattern for filtering keys Match string // Count - soft-limit of single *SCAN answer Count int }
ScanOpts is options for scanning
type Scanner ¶
type Scanner interface { // Next will call cb.Resolve(result, 0) where `results` is keys part of result of SCAN/HSCAN/SSCAN/ZSCAN // (ie iterator part is handled internally). // When iteration completes, cb.Resolve(nil, 0) will be called. Next(cb Future) }
Scanner is an object used for scanning redis key space. It is returned by Sender.Scanner().
Example ¶
package main import ( "context" "fmt" "log" "github.com/joomcode/redispipe/redis" "github.com/joomcode/redispipe/redisconn" "github.com/joomcode/redispipe/testbed" ) func main() { defer runServer(46231)() ctx := context.Background() conn, _ := redisconn.Connect(ctx, "127.0.0.1:46231", redisconn.Opts{ Logger: redisconn.NoopLogger{}, }) sync := redis.Sync{conn} sync.Do("SET", "key1", "val1") sync.Do("SET", "key2", "val2") scan := sync.Scanner(redis.ScanOpts{Match: "key*"}) for { keys, err := scan.Next() if err != nil { if err != redis.ScanEOF { log.Fatal(err) } break } for _, key := range keys { fmt.Println(key) } } } func runServer(port int) func() { testbed.InitDir(".") s := testbed.Server{Port: uint16(port)} s.Start() return func() { s.Stop() testbed.RmDir() } }
Output: key1 key2
type ScannerBase ¶
type ScannerBase struct { // ScanOpts - options for this scanning ScanOpts // Iter - current iterator state Iter []byte // Err - error occurred. Implementation should stop iteration if Err is nil. Err error // contains filtered or unexported fields }
ScannerBase is internal "parent" object for scanner implementations
func (*ScannerBase) Cancelled ¶
func (s *ScannerBase) Cancelled() error
Cancelled - implements Future.Cancelled method
func (*ScannerBase) DoNext ¶
func (s *ScannerBase) DoNext(cb Future, snd Sender)
DoNext - perform next step of iteration - send corresponding *SCAN command
func (*ScannerBase) IterLast ¶
func (s *ScannerBase) IterLast() bool
IterLast - return true if iterator is at the end of this server/key keyspace.
func (*ScannerBase) Resolve ¶
func (s *ScannerBase) Resolve(res interface{}, _ uint64)
Resolve - implements Future.Resolve. Accepts result of *SCAN command, remembers error and iterator and calls Resolve on underlying future.
type Sender ¶
type Sender interface { // Send sends request to redis. When response will arrive, cb.Resolve(result, n) will be called. // Note: cb.Resolve could be called before Send returns. Send(r Request, cb Future, n uint64) // SendMany sends many requests at once. // When responses will arrive, cb.Resolve will be called with distinct n values: // - first request's response will be passed as cb.Resolve(response, n) // - second request's response will be passed as cb.Resolve(response, n+1) // - third ... cb.Resolve(response, n+2) // Note: responses could arrive in arbitrary order. SendMany(r []Request, cb Future, n uint64) // SendTransaction sends several requests as MULTI+EXEC redis transaction. // Response will be passed only once as an array of responses to commands (as EXEC does) // cb.Resolve([]interface{res1, res2, res3, ...}, n) SendTransaction(r []Request, cb Future, n uint64) // Scanner returns scanner object that scans keyspace sequentially. Scanner(opts ScanOpts) Scanner // EachShard synchronously calls callback for each shard. // Single-connection client will call it only once, but clustered will call for every master. // If callback is called with error, it will not be called again. // If callback returns false, iteration stops. EachShard(func(Sender, error) bool) // Close closes client. All following requests will be immediately resolved with error. Close() }
Sender is interface of client implementation. It provides interface in term of Future, and could be either single connection, connection to cluster, or whatever.
type Sync ¶
type Sync struct {
S Sender
}
Sync provides convenient synchronouse interface over asynchronouse Sender.
Example ¶
package main import ( "context" "fmt" "github.com/joomcode/redispipe/redis" "github.com/joomcode/redispipe/redisconn" "github.com/joomcode/redispipe/testbed" ) func main() { defer runServer(46231)() ctx := context.Background() conn, _ := redisconn.Connect(ctx, "127.0.0.1:46231", redisconn.Opts{ Logger: redisconn.NoopLogger{}, }) sync := redis.Sync{conn} res := sync.Do("SET", "key1", "1") fmt.Println(res) res = sync.Send(redis.Req("SET", "key2", "2")) fmt.Println(res) ress := sync.SendMany([]redis.Request{ redis.Req("GET", "key1"), redis.Req("GET", "key2"), }) fmt.Printf("%q\n", ress) res = sync.Do("HSET", "key1", "field1", "val1") fmt.Println(redis.AsError(res)) ress, err := sync.SendTransaction([]redis.Request{ redis.Req("INCR", "key1"), redis.Req("INCRBY", "key2", -1), redis.Req("GET", "key1"), redis.Req("GET", "key2"), }) fmt.Println(err) fmt.Printf("%q\n", ress) } func runServer(port int) func() { testbed.InitDir(".") s := testbed.Server{Port: uint16(port)} s.Start() return func() { s.Stop() testbed.RmDir() } }
Output: OK OK ["1" "2"] redispipe.result: WRONGTYPE Operation against a key holding the wrong kind of value {request: Req("HSET", ["key1" "field1" "val1"]), address: 127.0.0.1:46231} <nil> ['\x02' '\x01' "2" "1"]
func (Sync) Do ¶
Do is convenient method to construct and send request. Returns value that could be either result or error.
func (Sync) Scanner ¶
func (s Sync) Scanner(opts ScanOpts) SyncIterator
Scanner returns synchronous iterator over redis keyspace/key.
func (Sync) SendMany ¶
SendMany sends several requests in "parallel" and returns slice or results in a same order. Each result could be value or error.
func (Sync) SendTransaction ¶
SendTransaction sends several requests as a single MULTI+EXEC transaction. It returns array of responses and an error, if transaction fails. Since Redis transaction either fully executed or fully failed, all values are valid if err == nil.
type SyncCtx ¶
type SyncCtx struct {
S Sender
}
SyncCtx (like Sync) provides convenient synchronous interface over asynchronous Sender. Its methods accept context.Context to allow early request cancelling. Note that if context were cancelled after request were send, redis still will execute it, but you will have no way to know about that fact.
func (SyncCtx) Do ¶
Do is convenient method to construct and send request. Returns value that could be either result or error. When context is cancelled, Do returns ErrRequestCancelled error.
func (SyncCtx) Scanner ¶
func (s SyncCtx) Scanner(ctx context.Context, opts ScanOpts) SyncCtxIterator
Scanner returns synchronous iterator over redis keyspace/key. Scanner will stop iteration if context were cancelled.
func (SyncCtx) Send ¶
Send sends request to redis. Returns value that could be either result or error. When context is cancelled, Send returns ErrRequestCancelled error.
func (SyncCtx) SendMany ¶
SendMany sends several requests in "parallel" and returns slice or results in a same order. Each result could be value or error. When context is cancelled, SendMany returns slice of ErrRequestCancelled errors.
func (SyncCtx) SendTransaction ¶
SendTransaction sends several requests as a single MULTI+EXEC transaction. It returns array of responses and an error, if transaction fails. Since Redis transaction either fully executed or fully failed, all values are valid if err == nil. But some of them could be error on their own. When context is cancelled, SendTransaction returns ErrRequestCancelled error.
type SyncCtxIterator ¶
type SyncCtxIterator struct {
// contains filtered or unexported fields
}
SyncCtxIterator is synchronous iterator over repeating *SCAN command. It will stop iteration if context were cancelled.
func (SyncCtxIterator) Next ¶
func (s SyncCtxIterator) Next() ([]string, error)
Next returns next bunch of keys, or error. ScanEOF error signals for regular iteration completion. It will return ErrRequestCancelled error if context were cancelled.
type SyncIterator ¶
type SyncIterator struct {
// contains filtered or unexported fields
}
SyncIterator is synchronous iterator over repeating *SCAN command.
func (SyncIterator) Next ¶
func (s SyncIterator) Next() ([]string, error)
Next returns next bunch of keys, or error. ScanEOF error signals for regular iteration completion.