Documentation ¶
Index ¶
- Constants
- Variables
- func Hash(key []byte) uint32
- func OpFails() int64
- func OpQPS() int64
- func OpRedisErrors() int64
- func OpTotal() int64
- func ResetStats()
- func SessionsAlive() int64
- func SessionsTotal() int64
- type ApiClient
- func (c *ApiClient) FillSlots(slots ...*models.Slot) error
- func (c *ApiClient) ForceGC() error
- func (c *ApiClient) LogLevel(level log.LogLevel) error
- func (c *ApiClient) Model() (*models.Proxy, error)
- func (c *ApiClient) Overview() (*Overview, error)
- func (c *ApiClient) ResetStats() error
- func (c *ApiClient) RewatchSentinels() error
- func (c *ApiClient) SetSentinels(sentinel *models.Sentinel) error
- func (c *ApiClient) SetXAuth(name, auth string, token string)
- func (c *ApiClient) Shutdown() error
- func (c *ApiClient) Slots() ([]*models.Slot, error)
- func (c *ApiClient) Start() error
- func (c *ApiClient) Stats(flags StatsFlags) (*Stats, error)
- func (c *ApiClient) StatsSimple() (*Stats, error)
- func (c *ApiClient) XPing() error
- type BackendConn
- type Config
- type Delay
- type DelayExp2
- type Jodis
- type OpFlag
- type OpInfo
- type OpStats
- type Overview
- type Proxy
- func (s *Proxy) Close() error
- func (s *Proxy) Config() *Config
- func (s *Proxy) FillSlot(m *models.Slot) error
- func (s *Proxy) FillSlots(slots []*models.Slot) error
- func (s *Proxy) GetSentinels() ([]string, map[int]string)
- func (s *Proxy) HasSwitched() bool
- func (s *Proxy) IsClosed() bool
- func (s *Proxy) IsOnline() bool
- func (s *Proxy) Model() *models.Proxy
- func (s *Proxy) Overview(flags StatsFlags) *Overview
- func (s *Proxy) RewatchSentinels() error
- func (s *Proxy) SetSentinels(servers []string) error
- func (s *Proxy) Slots() []*models.Slot
- func (s *Proxy) Start() error
- func (s *Proxy) Stats(flags StatsFlags) *Stats
- func (s *Proxy) SwitchMasters(masters map[int]string) error
- func (s *Proxy) XAuth() string
- type Request
- type RequestChan
- func (c *RequestChan) Buffered() int
- func (c *RequestChan) Close()
- func (c *RequestChan) IsEmpty() bool
- func (c *RequestChan) PopFront() (*Request, bool)
- func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error
- func (c *RequestChan) PopFrontAllVoid(onRequest func(r *Request))
- func (c *RequestChan) PushBack(r *Request) int
- type Router
- func (s *Router) Close()
- func (s *Router) FillSlot(m *models.Slot) error
- func (s *Router) GetSlot(id int) *models.Slot
- func (s *Router) GetSlots() []*models.Slot
- func (s *Router) HasSwitched() bool
- func (s *Router) KeepAlive() error
- func (s *Router) Start()
- func (s *Router) SwitchMasters(masters map[int]string) error
- type RuntimeStats
- type Session
- type Slot
- type Stats
- type StatsFlags
- type SysUsage
Constants ¶
View Source
const ( FlagWrite = 1 << iota FlagMasterOnly FlagMayWrite FlagNotAllow )
View Source
const ( StatsCmds = StatsFlags(1 << iota) StatsSlots StatsRuntime StatsFull = StatsFlags(^uint32(0)) )
View Source
const DefaultConfig = `` /* 3902-byte string literal not displayed */
View Source
const DefaultRequestChanBuffer = 128
View Source
const GOLDEN_RATIO_PRIME_32 = 0x9e370001
View Source
const MaxOpStrLen = 64
View Source
const MaxSlotNum = models.MaxSlotNum
Variables ¶
View Source
var ( ErrBackendConnReset = errors.New("backend conn reset") ErrRequestIsBroken = errors.New("request is broken") )
View Source
var ( ErrSlotIsNotReady = errors.New("slot is not ready, may be offline") ErrRespIsRequired = errors.New("resp is required") )
View Source
var ( ErrBadMultiBulk = errors.New("bad multi-bulk for command") ErrBadOpStrLen = errors.New("bad command length, too short or too long") )
View Source
var ( ErrClosedRouter = errors.New("use of closed router") ErrInvalidSlotId = errors.New("use of invalid slot id") ErrInvalidMethod = errors.New("use of invalid forwarder method") )
View Source
var ( ErrRouterNotOnline = errors.New("router is not online") ErrTooManySessions = errors.New("too many sessions") ErrTooManyPipelinedRequests = errors.New("too many pipelined requests") )
View Source
var ErrClosedJodis = errors.New("use of closed jodis")
View Source
var ErrClosedProxy = errors.New("use of closed proxy")
View Source
var RespOK = redis.NewString([]byte("OK"))
Functions ¶
func OpRedisErrors ¶
func OpRedisErrors() int64
func ResetStats ¶
func ResetStats()
func SessionsAlive ¶
func SessionsAlive() int64
func SessionsTotal ¶
func SessionsTotal() int64
Types ¶
type ApiClient ¶
type ApiClient struct {
// contains filtered or unexported fields
}
func NewApiClient ¶
func (*ApiClient) ResetStats ¶
func (*ApiClient) RewatchSentinels ¶
func (*ApiClient) StatsSimple ¶
type BackendConn ¶
type BackendConn struct {
// contains filtered or unexported fields
}
func NewBackendConn ¶
func NewBackendConn(addr string, database int, config *Config) *BackendConn
每个backendConn会创建对应的reader和writer
func (*BackendConn) Addr ¶
func (bc *BackendConn) Addr() string
func (*BackendConn) Close ¶
func (bc *BackendConn) Close()
func (*BackendConn) IsConnected ¶
func (bc *BackendConn) IsConnected() bool
func (*BackendConn) KeepAlive ¶
func (bc *BackendConn) KeepAlive() bool
func (*BackendConn) PushBack ¶
func (bc *BackendConn) PushBack(r *Request)
请求放入BackendConn等待处理。如果request的sync.WaitGroup不为空,就加一,然后判断加一之后的值,如果加一之后counter为0,那么所有阻塞在counter上的goroutine都会得到释放。将请求直接存入到BackendConn的chan *Request中,等待后续被取出并进行处理。
type Config ¶
type Config struct { ProtoType string `toml:"proto_type" json:"proto_type"` ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"` AdminAddr string `toml:"admin_addr" json:"admin_addr"` HostProxy string `toml:"-" json:"-"` HostAdmin string `toml:"-" json:"-"` JodisName string `toml:"jodis_name" json:"jodis_name"` JodisAddr string `toml:"jodis_addr" json:"jodis_addr"` JodisAuth string `toml:"jodis_auth" json:"jodis_auth"` JodisTimeout timesize.Duration `toml:"jodis_timeout" json:"jodis_timeout"` JodisCompatible bool `toml:"jodis_compatible" json:"jodis_compatible"` ProductName string `toml:"product_name" json:"product_name"` ProductAuth string `toml:"product_auth" json:"-"` SessionAuth string `toml:"session_auth" json:"-"` ProxyDataCenter string `toml:"proxy_datacenter" json:"proxy_datacenter"` ProxyMaxClients int `toml:"proxy_max_clients" json:"proxy_max_clients"` ProxyMaxOffheapBytes bytesize.Int64 `toml:"proxy_max_offheap_size" json:"proxy_max_offheap_size"` ProxyHeapPlaceholder bytesize.Int64 `toml:"proxy_heap_placeholder" json:"proxy_heap_placeholder"` BackendPingPeriod timesize.Duration `toml:"backend_ping_period" json:"backend_ping_period"` BackendRecvBufsize bytesize.Int64 `toml:"backend_recv_bufsize" json:"backend_recv_bufsize"` BackendRecvTimeout timesize.Duration `toml:"backend_recv_timeout" json:"backend_recv_timeout"` BackendSendBufsize bytesize.Int64 `toml:"backend_send_bufsize" json:"backend_send_bufsize"` BackendSendTimeout timesize.Duration `toml:"backend_send_timeout" json:"backend_send_timeout"` BackendMaxPipeline int `toml:"backend_max_pipeline" json:"backend_max_pipeline"` BackendPrimaryOnly bool `toml:"backend_primary_only" json:"backend_primary_only"` BackendPrimaryParallel int `toml:"backend_primary_parallel" json:"backend_primary_parallel"` BackendReplicaParallel int `toml:"backend_replica_parallel" json:"backend_replica_parallel"` BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"` 
BackendNumberDatabases int32 `toml:"backend_number_databases" json:"backend_number_databases"` SessionRecvBufsize bytesize.Int64 `toml:"session_recv_bufsize" json:"session_recv_bufsize"` SessionRecvTimeout timesize.Duration `toml:"session_recv_timeout" json:"session_recv_timeout"` SessionSendBufsize bytesize.Int64 `toml:"session_send_bufsize" json:"session_send_bufsize"` SessionSendTimeout timesize.Duration `toml:"session_send_timeout" json:"session_send_timeout"` SessionMaxPipeline int `toml:"session_max_pipeline" json:"session_max_pipeline"` SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"` SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"` MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"` MetricsReportInfluxdbPeriod timesize.Duration `toml:"metrics_report_influxdb_period" json:"metrics_report_influxdb_period"` MetricsReportInfluxdbUsername string `toml:"metrics_report_influxdb_username" json:"metrics_report_influxdb_username"` MetricsReportInfluxdbPassword string `toml:"metrics_report_influxdb_password" json:"-"` MetricsReportInfluxdbDatabase string `toml:"metrics_report_influxdb_database" json:"metrics_report_influxdb_database"` MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"` MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` }
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
func (*Config) LoadFromFile ¶
type DelayExp2 ¶
func (*DelayExp2) SleepWithCancel ¶
type Jodis ¶
type Jodis struct {
// contains filtered or unexported fields
}
客户端的连接
func (*Jodis) IsWatching ¶
type OpFlag ¶
type OpFlag uint32
func (OpFlag) IsMasterOnly ¶
func (OpFlag) IsNotAllowed ¶
func (OpFlag) IsReadOnly ¶
type OpStats ¶
type OpStats struct { OpStr string `json:"opstr"` Calls int64 `json:"calls"` Usecs int64 `json:"usecs"` UsecsPercall int64 `json:"usecs_percall"` Fails int64 `json:"fails"` RedisErrType int64 `json:"redis_errtype"` }
func GetOpStatsAll ¶
func GetOpStatsAll() []*OpStats
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
func (*Proxy) HasSwitched ¶
func (*Proxy) Overview ¶
func (s *Proxy) Overview(flags StatsFlags) *Overview
func (*Proxy) RewatchSentinels ¶
func (*Proxy) SetSentinels ¶
func (*Proxy) Start ¶
将Proxy自身,和它的router和jodis设为上线。在zk中创建临时节点/jodis/codis-productName/proxy-token,并监听该节点的变化。
func (*Proxy) Stats ¶
func (s *Proxy) Stats(flags StatsFlags) *Stats
type Request ¶
type Request struct { // 最开始客户端的请求(redis命令数组) Multi []*redis.Resp // 批量子请求控制器 // 控制前端请求处理进度 前端传给后端请求时+1 后端处理完之后-1 Batch *sync.WaitGroup // TODO: 这里有误:当需要进行命令拆分, 例如mget, 会将不同的命令路由到不同的后端server/slots // 其实是保证group slots映射关系. 每次在fillSlots之前会wait()住 Group *sync.WaitGroup Broken *atomic2.Bool // redis请求的具体命令 OpStr string // 命令对应标志 快慢/读写 OpFlag // 当前哪个数据库 select相关 Database int32 UnixNano int64 // 最终恢复给前端的rsp *redis.Resp Err error Coalesce func() error }
请求处理的内部实体类
func (*Request) MakeSubRequest ¶
type RequestChan ¶
type RequestChan struct {
// contains filtered or unexported fields
}
每一个session都会有一个RequestChan
func NewRequestChan ¶
func NewRequestChan() *RequestChan
func NewRequestChanBuffer ¶
func NewRequestChanBuffer(n int) *RequestChan
func (*RequestChan) Buffered ¶
func (c *RequestChan) Buffered() int
func (*RequestChan) Close ¶
func (c *RequestChan) Close()
func (*RequestChan) IsEmpty ¶
func (c *RequestChan) IsEmpty() bool
func (*RequestChan) PopFront ¶
func (c *RequestChan) PopFront() (*Request, bool)
func (*RequestChan) PopFrontAll ¶
func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error
func (*RequestChan) PopFrontAllVoid ¶
func (c *RequestChan) PopFrontAllVoid(onRequest func(r *Request))
func (*RequestChan) PushBack ¶
func (c *RequestChan) PushBack(r *Request) int
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
存储了集群中所有的sharedBackendConnPool和slot,用于将redis请求转发给相应的slot进行处理。
func (*Router) HasSwitched ¶
type RuntimeStats ¶
type RuntimeStats struct { General struct { Alloc uint64 `json:"alloc"` Sys uint64 `json:"sys"` Lookups uint64 `json:"lookups"` Mallocs uint64 `json:"mallocs"` Frees uint64 `json:"frees"` } `json:"general"` Heap struct { Alloc uint64 `json:"alloc"` Sys uint64 `json:"sys"` Idle uint64 `json:"idle"` Inuse uint64 `json:"inuse"` Objects uint64 `json:"objects"` } `json:"heap"` GC struct { Num uint32 `json:"num"` CPUFraction float64 `json:"cpu_fraction"` TotalPauseMs uint64 `json:"total_pausems"` } `json:"gc"` NumProcs int `json:"num_procs"` NumGoroutines int `json:"num_goroutines"` NumCgoCall int64 `json:"num_cgo_call"` MemOffheap int64 `json:"mem_offheap"` }
type Session ¶
type Session struct { // 与客户端的链接 Conn *redis.Conn // 请求数量 Ops int64 // 上次创建时间 CreateUnix int64 // 上次请求时间 LastOpUnix int64 // contains filtered or unexported fields }
func NewSession ¶
每接到一个redis请求,就创建一个独立的session进行处理(默认的每个session的tcp连接过期时间为75秒,也就是每个请求最多处理75秒)。 这里的第一个参数是net.Conn,Conn是一个通用的面向流的网络连接,多个goroutines可以同时调用Conn的方法 这里的net.Conn就是我们之前Proxy的lproxy这个Listener监听到的19000请求到来的时候返回的net.Conn
func (*Session) CloseReaderWithError ¶
func (*Session) CloseWithError ¶
type Stats ¶
type Stats struct { Online bool `json:"online"` Closed bool `json:"closed"` Sentinels struct { Servers []string `json:"servers,omitempty"` Masters map[string]string `json:"masters,omitempty"` Switched bool `json:"switched,omitempty"` } `json:"sentinels"` Ops struct { Total int64 `json:"total"` Fails int64 `json:"fails"` Redis struct { Errors int64 `json:"errors"` } `json:"redis"` QPS int64 `json:"qps"` Cmd []*OpStats `json:"cmd,omitempty"` } `json:"ops"` Sessions struct { Total int64 `json:"total"` Alive int64 `json:"alive"` } `json:"sessions"` Rusage struct { Now string `json:"now"` CPU float64 `json:"cpu"` Mem int64 `json:"mem"` Raw *utils.Usage `json:"raw,omitempty"` } `json:"rusage"` Backend struct { PrimaryOnly bool `json:"primary_only"` } `json:"backend"` Runtime *RuntimeStats `json:"runtime,omitempty"` }
type StatsFlags ¶
type StatsFlags uint32
func (StatsFlags) HasBit ¶
func (s StatsFlags) HasBit(m StatsFlags) bool
Source Files ¶
Click to show internal directories.
Click to hide internal directories.