Documentation
¶
Index ¶
- Constants
- Variables
- func DoRedisCmd(conn redis.Conn, cmdName string, args ...interface{}) (reply interface{}, err error)
- func FindString(src []string, f string) int
- func GetHashedPartitionID(pk []byte, pnum int) int
- func IsConnectRefused(err error) bool
- func IsFailedOnClusterChanged(err error) bool
- func IsFailedOnNotWritable(err error) bool
- func IsSlowCmd(cmd string) bool
- func SetLogger(level int32, l Logger)
- type Cluster
- func (cluster *Cluster) ChangeMaxActive(active int)
- func (cluster *Cluster) Close()
- func (cluster *Cluster) GetAllHostsByPart(pid int) ([]*RedisHost, error)
- func (cluster *Cluster) GetConn(pk []byte, leader bool, tryLocalForRead bool, isSlowQuery bool) (redis.Conn, error)
- func (cluster *Cluster) GetConnForLarge(pk []byte, leader bool, tryLocalForRead bool, vsize int) (redis.Conn, error)
- func (cluster *Cluster) GetConnsByHosts(hosts []string, isSlowQuery bool) ([]redis.Conn, error)
- func (cluster *Cluster) GetConnsForAllParts(isSlowQuery bool) ([]redis.Conn, error)
- func (cluster *Cluster) GetHostAndConn(pk []byte, leader bool, tryLocalForRead bool, isSlowQuery bool) (*RedisHost, redis.Conn, error)
- func (cluster *Cluster) GetHostAndConnForLarge(pk []byte, leader bool, tryLocalForRead bool, vsize int) (*RedisHost, redis.Conn, error)
- func (cluster *Cluster) GetHostByPart(pid int, leader bool) (*RedisHost, error)
- func (cluster *Cluster) GetHostStats() map[string]HostStats
- func (cluster *Cluster) GetNodeHost(pk []byte, leader bool, tryLocalForRead bool) (*RedisHost, error)
- func (cluster *Cluster) GetPartitionNum() int
- func (cluster *Cluster) IsSameDCFirst() bool
- func (cluster *Cluster) MaybeTriggerCheckForError(err error, delay time.Duration) bool
- type Conf
- type HashElem
- type HostStats
- type LargeKeyConf
- type LevelLogger
- func (self *LevelLogger) Debugf(f string, args ...interface{})
- func (self *LevelLogger) Detailf(f string, args ...interface{})
- func (self *LevelLogger) Errorf(f string, args ...interface{})
- func (self *LevelLogger) Errorln(f string)
- func (self *LevelLogger) Flush()
- func (self *LevelLogger) Infof(f string, args ...interface{})
- func (self *LevelLogger) Infoln(f string)
- func (self *LevelLogger) Level() int32
- func (self *LevelLogger) Printf(f string, args ...interface{})
- func (self *LevelLogger) SetLevel(l int32)
- func (self *LevelLogger) Warningf(f string, args ...interface{})
- func (self *LevelLogger) Warningln(f string)
- type Logger
- type MultiClusterConf
- type NodeInfo
- type PKey
- type PartitionAddrInfo
- type PartitionAddrs
- type PartitionInfo
- type PartitionNodeInfo
- type Partitions
- type PipelineCmd
- type PipelineCmdList
- type PoolType
- type RedisHost
- func (rh *RedisHost) Addr() string
- func (rh *RedisHost) ChangeMaxActive(maxActive int, rangeRatio float64)
- func (rh *RedisHost) CloseConn()
- func (rh *RedisHost) ConnPool(poolType PoolType) *redis.QueuePool
- func (rh *RedisHost) GrpcAddr() string
- func (rh *RedisHost) IncSuccess()
- func (rh *RedisHost) InitConnPool(newFn func() (redis.Conn, error), newRangeFn func() (redis.Conn, error), ...)
- func (rh *RedisHost) MaybeIncFailed(err error)
- func (rh *RedisHost) Refresh()
- func (rh *RedisHost) Stats() HostStats
- type RemoteClusterConf
- type ScanKey
- type SimpleLogger
- type ZSetElem
- type ZanRedisClient
- func (client *ZanRedisClient) AdvScan(tp, set string, count int, cursor []byte) ([]byte, [][]byte, error)
- func (client *ZanRedisClient) AdvScanChannel(tp, set string, stopC chan struct{}) chan []byte
- func (client *ZanRedisClient) DoFullScan(cmd, tp, set string, count int, cursor []byte) ([]byte, []interface{}, error)
- func (client *ZanRedisClient) DoFullScanChannel(tp, set string, stopC chan struct{}) chan interface{}
- func (self *ZanRedisClient) DoRedis(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
- func (self *ZanRedisClient) DoRedisForException(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
- func (self *ZanRedisClient) DoRedisForRead(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
- func (self *ZanRedisClient) DoRedisForWrite(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
- func (self *ZanRedisClient) DoRedisTryLocalRead(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
- func (client *ZanRedisClient) DoScan(cmd, tp, set string, count int, cursor []byte) ([]byte, [][]byte, error)
- func (client *ZanRedisClient) DoScanChannel(cmd, tp, set string, stopC chan struct{}) chan []byte
- func (self *ZanRedisClient) FlushAndWaitPipelineCmd(cmds PipelineCmdList) ([]interface{}, []error)
- func (client *ZanRedisClient) FullScan(tp, set string, count int, cursor []byte) ([]byte, []interface{}, error)
- func (client *ZanRedisClient) FullScanChannel(tp, set string, stopC chan struct{}) chan interface{}
- func (client *ZanRedisClient) HScan(set string, key []byte, count int, cursor []byte) ([]byte, []HashElem, error)
- func (client *ZanRedisClient) HScanChannel(set string, key []byte, stopC chan struct{}) chan []byte
- func (self *ZanRedisClient) KVDel(set string, key []byte) (int, error)
- func (self *ZanRedisClient) KVGet(set string, key []byte) ([]byte, error)
- func (self *ZanRedisClient) KVMDel(readLeader bool, pKeys ...*PKey) (int64, error)
- func (self *ZanRedisClient) KVMExists(readLeader bool, pKeys ...*PKey) (int64, error)
- func (self *ZanRedisClient) KVMGet(readLeader bool, pKeys ...*PKey) ([][]byte, error)
- func (client *ZanRedisClient) KVScan(set string, count int, cursor []byte) ([]byte, [][]byte, error)
- func (client *ZanRedisClient) KVScanChannel(set string, stopC chan struct{}) chan []byte
- func (self *ZanRedisClient) KVSet(set string, key []byte, value []byte) error
- func (self *ZanRedisClient) KVSetNX(set string, key []byte, value []byte) (int, error)
- func (client *ZanRedisClient) SScan(set string, key []byte, count int, cursor []byte) ([]byte, [][]byte, error)
- func (client *ZanRedisClient) SScanChannel(set string, key []byte, stopC chan struct{}) chan []byte
- func (self *ZanRedisClient) SetLargeKeyConf(conf *LargeKeyConf)
- func (self *ZanRedisClient) Start()
- func (self *ZanRedisClient) Stop()
- func (self *ZanRedisClient) SwitchSameDC(useSameDC bool)
- func (client *ZanRedisClient) ZScan(set string, key []byte, count int, cursor []byte) ([]byte, []ZSetElem, error)
- func (client *ZanRedisClient) ZScanChannel(set string, key []byte, stopC chan struct{}) chan ZSetElem
Constants ¶
const ( DefaultConnPoolMaxActive = 400 DefaultConnPoolMaxIdle = 3 )
const ( LOG_ERR int32 = iota LOG_INFO LOG_DEBUG )
const (
MinRetrySleep = time.Millisecond * 10
)
Variables ¶
var ( RetryFailedInterval = time.Second * 5 MaxRetryInterval = time.Minute NextRetryFailedInterval = time.Minute * 2 ErrCntForStopRW = 3 LargeKeyPoolNum = 3 )
var ( FailedOnClusterChanged = "ERR_CLUSTER_CHANGED" FailedOnNotLeader = "E_FAILED_ON_NOT_LEADER" FailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE" FailedOnNodeStopped = "the node stopped" )
var ErrSizeExceedLimit = errors.New("key value size exceeded the limit")
Functions ¶
func DoRedisCmd ¶
func FindString ¶
func GetHashedPartitionID ¶
func IsConnectRefused ¶
func IsFailedOnNotWritable ¶
Types ¶
type Cluster ¶
func NewCluster ¶
func NewCluster(conf *Conf, largeConf *LargeKeyConf) *Cluster
func (*Cluster) ChangeMaxActive ¶ added in v0.3.0
func (*Cluster) GetAllHostsByPart ¶
func (*Cluster) GetConnForLarge ¶ added in v0.3.0
func (*Cluster) GetConnsByHosts ¶
func (*Cluster) GetConnsForAllParts ¶
func (*Cluster) GetHostAndConn ¶
func (*Cluster) GetHostAndConnForLarge ¶ added in v0.3.0
func (*Cluster) GetHostByPart ¶
func (*Cluster) GetHostStats ¶ added in v0.3.0
func (*Cluster) GetNodeHost ¶
func (*Cluster) GetPartitionNum ¶
func (*Cluster) IsSameDCFirst ¶ added in v0.3.0
type Conf ¶
type Conf struct { LookupList []string // multi conf and lookuplist should not be used both MultiConf MultiClusterConf DialTimeout time.Duration ReadTimeout time.Duration RangeReadTimeout time.Duration WriteTimeout time.Duration IdleTimeout time.Duration MaxActiveConn int // idle num that will be kept for all idle connections MaxIdleConn int // default 0.4 RangeConnRatio float64 TendInterval int64 Namespace string Password string // the datacenter info for client // will be used for a single cluster acrossing datacenter DC string }
func (*Conf) CheckValid ¶ added in v0.3.0
type LargeKeyConf ¶ added in v0.3.0
type LargeKeyConf struct { MinPoolSize int GetConnTimeoutForLargeKey time.Duration MaxAllowedValueSize int }
This configuration will be used to isolate the large key write and exception key access. Write a value large than max allowed size will return error and for MaxAllowedValueSize > value > MaxAllowedValueSize/2, a isolated pool with only MinPoolSize connection will be used MaxAllowedValueSize/2 > value > MaxAllowedValueSize/4, a isolated pool with only 2*MinPoolSize connections will be used MaxAllowedValueSize/4 > value > MaxAllowedValueSize/8, a isolated pool with only 4*MinPoolSize connections will be used for command with more than 1024 arguments will use the isolated pool with only MinPoolSize connection for exception command will use the isolated pool with only MinPoolSize connection
func NewLargeKeyConf ¶ added in v0.3.0
func NewLargeKeyConf() *LargeKeyConf
type LevelLogger ¶
type LevelLogger struct { Logger Logger // contains filtered or unexported fields }
func NewLevelLogger ¶
func NewLevelLogger(l int32, logger Logger) *LevelLogger
func (*LevelLogger) Debugf ¶
func (self *LevelLogger) Debugf(f string, args ...interface{})
func (*LevelLogger) Detailf ¶
func (self *LevelLogger) Detailf(f string, args ...interface{})
func (*LevelLogger) Errorf ¶
func (self *LevelLogger) Errorf(f string, args ...interface{})
func (*LevelLogger) Errorln ¶
func (self *LevelLogger) Errorln(f string)
func (*LevelLogger) Flush ¶
func (self *LevelLogger) Flush()
func (*LevelLogger) Infof ¶
func (self *LevelLogger) Infof(f string, args ...interface{})
func (*LevelLogger) Infoln ¶
func (self *LevelLogger) Infoln(f string)
func (*LevelLogger) Level ¶
func (self *LevelLogger) Level() int32
func (*LevelLogger) Printf ¶
func (self *LevelLogger) Printf(f string, args ...interface{})
used only for wrap call (for other logger interface)
func (*LevelLogger) SetLevel ¶
func (self *LevelLogger) SetLevel(l int32)
func (*LevelLogger) Warningf ¶
func (self *LevelLogger) Warningf(f string, args ...interface{})
func (*LevelLogger) Warningln ¶
func (self *LevelLogger) Warningln(f string)
type MultiClusterConf ¶ added in v0.3.0
type MultiClusterConf []RemoteClusterConf
func (MultiClusterConf) CheckValid ¶ added in v0.3.0
func (mcc MultiClusterConf) CheckValid() error
type PKey ¶
func (*PKey) ShardingKey ¶
type PartitionAddrInfo ¶
type PartitionAddrs ¶
type PartitionAddrs struct { PNum int PList []PartitionAddrInfo }
type PartitionInfo ¶
type PartitionNodeInfo ¶
type PartitionNodeInfo struct { Leader node `json:"leader"` Replicas []node `json:"replicas"` }
type Partitions ¶
type Partitions struct { PNum int Epoch int64 PList []PartitionInfo }
type PipelineCmd ¶
type PipelineCmdList ¶
type PipelineCmdList []PipelineCmd
type RedisHost ¶
type RedisHost struct {
// contains filtered or unexported fields
}
func (*RedisHost) ChangeMaxActive ¶ added in v0.3.0
func (*RedisHost) IncSuccess ¶
func (rh *RedisHost) IncSuccess()
func (*RedisHost) InitConnPool ¶
func (*RedisHost) MaybeIncFailed ¶
type RemoteClusterConf ¶ added in v0.3.0
type ScanKey ¶
type SimpleLogger ¶
type SimpleLogger struct {
// contains filtered or unexported fields
}
func NewSimpleLogger ¶
func NewSimpleLogger() *SimpleLogger
func (*SimpleLogger) Flush ¶
func (self *SimpleLogger) Flush()
func (*SimpleLogger) Output ¶
func (self *SimpleLogger) Output(depth int, s string)
func (*SimpleLogger) OutputErr ¶
func (self *SimpleLogger) OutputErr(depth int, s string)
type ZanRedisClient ¶
type ZanRedisClient struct {
// contains filtered or unexported fields
}
func NewZanRedisClient ¶
func NewZanRedisClient(conf *Conf) (*ZanRedisClient, error)
func (*ZanRedisClient) AdvScanChannel ¶
func (client *ZanRedisClient) AdvScanChannel(tp, set string, stopC chan struct{}) chan []byte
func (*ZanRedisClient) DoFullScan ¶
func (*ZanRedisClient) DoFullScanChannel ¶
func (client *ZanRedisClient) DoFullScanChannel(tp, set string, stopC chan struct{}) chan interface{}
func (*ZanRedisClient) DoRedis ¶
func (self *ZanRedisClient) DoRedis(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
func (*ZanRedisClient) DoRedisForException ¶ added in v0.3.0
func (self *ZanRedisClient) DoRedisForException(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
func (*ZanRedisClient) DoRedisForRead ¶ added in v0.3.0
func (self *ZanRedisClient) DoRedisForRead(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
func (*ZanRedisClient) DoRedisForWrite ¶ added in v0.3.0
func (self *ZanRedisClient) DoRedisForWrite(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
func (*ZanRedisClient) DoRedisTryLocalRead ¶ added in v0.3.0
func (self *ZanRedisClient) DoRedisTryLocalRead(cmd string, shardingKey []byte, toLeader bool, args ...interface{}) (interface{}, error)
DoRedisTryLocalRead should be used only for read command. If there is a local dc cluster and read the non-leader is allowed, it will try read local dc cluster first. If failed to read local dc for 2 times it will retry the primary dc cluster.
func (*ZanRedisClient) DoScanChannel ¶
func (client *ZanRedisClient) DoScanChannel(cmd, tp, set string, stopC chan struct{}) chan []byte
func (*ZanRedisClient) FlushAndWaitPipelineCmd ¶
func (self *ZanRedisClient) FlushAndWaitPipelineCmd(cmds PipelineCmdList) ([]interface{}, []error)
func (*ZanRedisClient) FullScanChannel ¶
func (client *ZanRedisClient) FullScanChannel(tp, set string, stopC chan struct{}) chan interface{}
func (*ZanRedisClient) HScanChannel ¶
func (client *ZanRedisClient) HScanChannel(set string, key []byte, stopC chan struct{}) chan []byte
func (*ZanRedisClient) KVDel ¶
func (self *ZanRedisClient) KVDel(set string, key []byte) (int, error)
func (*ZanRedisClient) KVGet ¶
func (self *ZanRedisClient) KVGet(set string, key []byte) ([]byte, error)
func (*ZanRedisClient) KVMDel ¶
func (self *ZanRedisClient) KVMDel(readLeader bool, pKeys ...*PKey) (int64, error)
func (*ZanRedisClient) KVMExists ¶
func (self *ZanRedisClient) KVMExists(readLeader bool, pKeys ...*PKey) (int64, error)
func (*ZanRedisClient) KVMGet ¶
func (self *ZanRedisClient) KVMGet(readLeader bool, pKeys ...*PKey) ([][]byte, error)
func (*ZanRedisClient) KVScanChannel ¶
func (client *ZanRedisClient) KVScanChannel(set string, stopC chan struct{}) chan []byte
func (*ZanRedisClient) KVSet ¶
func (self *ZanRedisClient) KVSet(set string, key []byte, value []byte) error
func (*ZanRedisClient) SScanChannel ¶
func (client *ZanRedisClient) SScanChannel(set string, key []byte, stopC chan struct{}) chan []byte
func (*ZanRedisClient) SetLargeKeyConf ¶ added in v0.3.0
func (self *ZanRedisClient) SetLargeKeyConf(conf *LargeKeyConf)
func (*ZanRedisClient) Start ¶
func (self *ZanRedisClient) Start()
func (*ZanRedisClient) Stop ¶
func (self *ZanRedisClient) Stop()
func (*ZanRedisClient) SwitchSameDC ¶
func (self *ZanRedisClient) SwitchSameDC(useSameDC bool)
while deploy across two datacenters, to improve read latency we can enable this, and set toLeader=false while calling DoRedis if there is no node in the same data center, we will fallback to the random node in other dc.
func (*ZanRedisClient) ZScanChannel ¶
func (client *ZanRedisClient) ZScanChannel(set string, key []byte, stopC chan struct{}) chan ZSetElem