Documentation ¶
Index ¶
- Constants
- Variables
- func AddCommand(list map[string]*Cmd)
- func ConvertLuaTable(l *lua.LState, value lua.LValue) []string
- func ErrLuaParseError(err error) string
- func ErrWrongNumber(cmd string) string
- func InitLuaPool(s *Server)
- func LowerSlice(buf []byte) []byte
- func LuaToRedis(l *lua.LState, c *Client, value lua.LValue)
- func MkLuaFuncs(srv *Server) map[string]lua.LGFunction
- func ParseReply(rd *bufio.Reader) (interface{}, error)
- func ParseSetArgs(args [][]byte) (e ExpireType, t int64, c SetCondition, err error)
- func PutLuaClientToPool(l *LuaClient)
- func PutRaftClientToPool(c *Client)
- func RunCpuAdjuster(s *Server)
- func RunInfoCollection(s *Server)
- func StringSlice(b [][]byte) []string
- type Client
- func (c *Client) ApplyDB(raftSyncCostNs int64) error
- func (c *Client) Close()
- func (c *Client) FormatData(reqData [][]byte)
- func (c *Client) GetInfo() *SInfo
- func (c *Client) HandleRequest(reqData [][]byte, isHashTag bool) (err error)
- func (c *Client) RaftSync() error
- func (c *Client) ResetQueryStartTime()
- type Cmd
- type DbSyncStatusType
- type ExpireType
- type LuaClient
- type ModelType
- type SInfo
- type SRuntimeStats
- type Server
- func (s *Server) Close()
- func (s *Server) FlushCallback(compactIndex uint64)
- func (s *Server) GetCommand(c string) *Cmd
- func (s *Server) GetDB() *engine.Bitalos
- func (s *Server) IsClosed() bool
- func (s *Server) ListenAndServe()
- func (s *Server) OnBoot(eng gnet.Engine) (action gnet.Action)
- func (s *Server) OnClose(conn gnet.Conn, err error) (action gnet.Action)
- func (s *Server) OnOpen(conn gnet.Conn) (out []byte, action gnet.Action)
- func (s *Server) OnTraffic(conn gnet.Conn) (action gnet.Action)
- func (s *Server) PrepareSnapshot() (ls interface{}, err error)
- func (s *Server) RecoverFromSnapshot(r io.Reader, done <-chan struct{}) error
- func (s *Server) RunDeleteExpireDataTask()
- func (s *Server) SaveSnapshot(ctx interface{}, w io.Writer, done <-chan struct{}) error
- type SetCondition
- type SinfoClient
- type SinfoCluster
- type SinfoData
- type SinfoServer
- type SinfoStats
- type TxLocker
- type TxShardLocker
- type TxWatchKey
Constants ¶
View Source
const (
	TxStateNone    int = 0
	TxStateWatch   int = 0x1
	TxStateMulti   int = 0x2
	TxStatePrepare int = 0x4
)
View Source
const (
	PrepareStateNone = iota
	PrepareStateKeyModified
	PrepareStateLockFail
	PrepareStateLocked
	PrepareStateUnlock
)
View Source
const (
	CONFIGGET = "GET"
	CONFIGSET = "SET"
)
View Source
const (
	MsgWrongType          = "WRONGTYPE Operation against a key holding the wrong kind of value"
	MsgInvalidInt         = "ERR value is not an integer or out of range"
	MsgInvalidFloat       = "ERR value is not a valid float"
	MsgInvalidMinMax      = "ERR min or max is not a float"
	MsgInvalidRangeItem   = "ERR min or max not valid string range item"
	MsgInvalidTimeout     = "ERR timeout is not a float or out of range"
	MsgErrSyntaxor        = "ERR syntax error"
	MsgKeyNotFound        = "ERR no such key"
	MsgOutOfRange         = "ERR index out of range"
	MsgInvalidCursor      = "ERR invalid cursor"
	MsgXXandNX            = "ERR XX and NX options at the same time are not compatible"
	MsgNegTimeout         = "ERR timeout is negative"
	MsgInvalidSETime      = "ERR invalid expire time in set"
	MsgInvalidSETEXTime   = "ERR invalid expire time in setex"
	MsgInvalidPSETEXTime  = "ERR invalid expire time in psetex"
	MsgInvalidKeysNumber  = "ERR Number of keys can't be greater than number of args"
	MsgNegativeKeysNumber = "ERR Number of keys can't be negative"
	MsgFScriptUsage       = "ERR Unknown subcommand or wrong number of arguments for '%s'. Try SCRIPT HELP."
	MsgFPubsubUsage       = "ERR Unknown subcommand or wrong number of arguments for '%s'. Try PUBSUB HELP."
	MsgSingleElementPair  = "ERR INCR option supports a single increment-element pair"
	MsgInvalidStreamID    = "ERR Invalid stream ID specified as stream command argument"
	MsgStreamIDTooSmall   = "ERR The ID specified in XADD is equal or smaller than the target stream top item"
	MsgStreamIDZero       = "ERR The ID specified in XADD must be greater than 0-0"
	MsgNoScriptFound      = "NOSCRIPT No matching script. Please use EVAL."
	MsgUnsupportedUnit    = "ERR unsupported unit provided. please use m, km, ft, mi"
	MsgNotFromScripts     = "This Redis command is not allowed from scripts"
	MsgXreadUnbalanced    = "ERR Unbalanced XREAD list of streams: for each stream key an ID or '$' must be specified."
)
View Source
const (
	M_NORMAL   ModelType = 0
	M_OBSERVER ModelType = 1
	M_WITNESS  ModelType = 2

	DB_SYNC_RUN_TYPE_END  = 0
	DB_SYNC_RUN_TYPE_SEND = 1
	DB_SYNC_RUN_TYPE_RECV = 2

	DB_SYNC_NOTHING      = 0
	DB_SYNC_PREPARE_FAIL = 1
	DB_SYNC_PREPARE_SUCC = 2
	DB_SYNC_SEND_FAIL    = 3
	DB_SYNC_SENDING      = 4
	DB_SYNC_SEND_SUCC    = 5
	DB_SYNC_RECVING_FAIL = 6
	DB_SYNC_RECVING      = 7
	DB_SYNC_RECVING_SUCC = 8
	DB_SYNC_CONN_FAIL    = 9
	DB_SYNC_CONN_SUCC    = 10
)
Variables ¶
View Source
var (
	BEFORE = []byte("before")
	AFTER  = []byte("after")
)
View Source
var LuaShardCount uint32 = 64
Functions ¶
func AddCommand ¶
func ErrLuaParseError ¶
func ErrWrongNumber ¶
func InitLuaPool ¶
func InitLuaPool(s *Server)
func LowerSlice ¶
func MkLuaFuncs ¶
func MkLuaFuncs(srv *Server) map[string]lua.LGFunction
func ParseReply ¶
func ParseSetArgs ¶
func ParseSetArgs(args [][]byte) (e ExpireType, t int64, c SetCondition, err error)
func PutLuaClientToPool ¶
func PutLuaClientToPool(l *LuaClient)
func PutRaftClientToPool ¶
func PutRaftClientToPool(c *Client)
func RunCpuAdjuster ¶
func RunCpuAdjuster(s *Server)
func RunInfoCollection ¶
func RunInfoCollection(s *Server)
func StringSlice ¶
Types ¶
type Client ¶
type Client struct {
	Cmd            string
	Args           [][]byte
	Keys           []byte
	Data           [][]byte
	ParseMarks     []int
	Reader         *resp.Reader
	Writer         *resp.Writer
	DB             *engine.Bitalos
	QueryStartTime time.Time
	KeyHash        uint32
	IsMaster       func() bool
	// contains filtered or unexported fields
}
func GetRaftClientFromPool ¶
func GetVmFromPool ¶
func (*Client) FormatData ¶
func (*Client) HandleRequest ¶
func (*Client) ResetQueryStartTime ¶
func (c *Client) ResetQueryStartTime()
type DbSyncStatusType ¶
type DbSyncStatusType int
func (DbSyncStatusType) String ¶
func (dst DbSyncStatusType) String() string
type ExpireType ¶
type ExpireType string
const (
	EX      ExpireType = "EX"
	PX      ExpireType = "PX"
	NO_TYPE ExpireType = ""
)
type LuaClient ¶
func GetLuaClientFromPool ¶
func GetLuaClientFromPool() *LuaClient
type SInfo ¶
type SInfo struct {
	Server         SinfoServer
	Client         SinfoClient
	Cluster        SinfoCluster
	Stats          SinfoStats
	Data           SinfoData
	RuntimeStats   SRuntimeStats
	BitalosdbUsage *bitsdb.BitsUsage
}
type SRuntimeStats ¶
type SRuntimeStats struct {
	General struct {
		Alloc   uint64 `json:"runtime_general_alloc"`
		Sys     uint64 `json:"runtime_general_sys"`
		Lookups uint64 `json:"runtime_general_lookups"`
		Mallocs uint64 `json:"runtime_general_mallocs"`
		Frees   uint64 `json:"runtime_general_frees"`
	} `json:"runtime_general"`
	Heap struct {
		Alloc   uint64 `json:"runtime_heap_alloc"`
		Sys     uint64 `json:"runtime_heap_sys"`
		Idle    uint64 `json:"runtime_heap_idle"`
		Inuse   uint64 `json:"runtime_heap_inuse"`
		Objects uint64 `json:"runtime_heap_objects"`
	} `json:"heap"`
	GC struct {
		Num          uint32  `json:"runtime_gc_num"`
		CPUFraction  float64 `json:"runtime_gc_cpu_fraction"`
		TotalPauseMs uint64  `json:"runtime_gc_total_pausems"`
	} `json:"gc"`
	NumProcs      int     `json:"runtime_num_procs"`
	NumGoroutines int     `json:"runtime_num_goroutines"`
	MemoryTotal   int64   `json:"memory_total"`
	MemoryShr     int64   `json:"memory_shr"`
	CPU           float64 `json:"cpu"`
	// contains filtered or unexported fields
}
func (*SRuntimeStats) Marshal ¶
func (srs *SRuntimeStats) Marshal() ([]byte, func())
func (*SRuntimeStats) Samples ¶
func (srs *SRuntimeStats) Samples()
func (*SRuntimeStats) UpdateCache ¶
func (srs *SRuntimeStats) UpdateCache()
type Server ¶
type Server struct {
	*gnet.BuiltinEventEngine
	Info              *SInfo
	IsMaster          func() bool
	MigrateDelToSlave func(keyHash uint32, data [][]byte) error
	IsWitness         bool
	DoRaftSync        func(keyHash uint32, data [][]byte) ([]byte, error)
	DoRaftStop        func()
	// contains filtered or unexported fields
}
func (*Server) FlushCallback ¶
func (*Server) GetCommand ¶
func (*Server) ListenAndServe ¶
func (s *Server) ListenAndServe()
func (*Server) PrepareSnapshot ¶
func (*Server) RecoverFromSnapshot ¶
func (*Server) RunDeleteExpireDataTask ¶
func (s *Server) RunDeleteExpireDataTask()
type SetCondition ¶
type SetCondition string
const (
	NX           SetCondition = "NX"
	XX           SetCondition = "XX"
	NO_CONDITION SetCondition = ""
)
type SinfoClient ¶
type SinfoClient struct {
	ClientTotal atomic.Int64 `json:"total_clients"`
	ClientAlive atomic.Int64 `json:"connected_clients"`
	// contains filtered or unexported fields
}
func (*SinfoClient) Marshal ¶
func (sc *SinfoClient) Marshal() ([]byte, func())
func (*SinfoClient) UpdateCache ¶
func (sc *SinfoClient) UpdateCache()
type SinfoCluster ¶
type SinfoCluster struct {
	StartModel       ModelType `json:"start_model"`
	Status           bool      `json:"status"`
	Role             string    `json:"role"`
	ClusterId        uint64    `json:"cluster_id"`
	CurrentNodeId    uint64    `json:"current_node_id"`
	RaftAddress      string    `json:"raft_address"`
	LeaderNodeId     uint64    `json:"leader_node_id"`
	LeaderAddress    string    `json:"leader_address"`
	ClusterNodes     string    `json:"cluster_nodes"`
	ClusterNodesList string    `json:"cluster_nodes_list"`
	// contains filtered or unexported fields
}
func (*SinfoCluster) Marshal ¶
func (sc *SinfoCluster) Marshal() ([]byte, func())
func (*SinfoCluster) UpdateCache ¶
func (sc *SinfoCluster) UpdateCache()
type SinfoData ¶
type SinfoData struct {
	UsedSize         int64 `json:"used_size"`
	DataSize         int64 `json:"data_size"`
	RaftNodeHostSize int64 `json:"raft_nodehost_size"`
	RaftWalSize      int64 `json:"raft_wal_size"`
	SnapshotSize     int64 `json:"snapshot_size"`
	// contains filtered or unexported fields
}
func (*SinfoData) UpdateCache ¶
func (sd *SinfoData) UpdateCache()
type SinfoServer ¶
type SinfoServer struct {
	ProcessId     int    `json:"process_id"`
	StartTime     string `json:"start_time"`
	ServerAddress string `json:"server_address"`
	MaxClient     int64  `json:"max_client"`
	SingleDegrade bool   `json:"single_degrade"`
	GitVersion    string `json:"git_version"`
	Compile       string `json:"compile"`
	ConfigFile    string `json:"config_file"`
	AutoCompact   bool   `json:"auto_compact"`
	// contains filtered or unexported fields
}
func (*SinfoServer) Marshal ¶
func (ss *SinfoServer) Marshal() ([]byte, func())
func (*SinfoServer) UpdateCache ¶
func (ss *SinfoServer) UpdateCache()
type SinfoStats ¶
type SinfoStats struct {
	TotolCmd      atomic.Uint64
	QPS           atomic.Uint64
	QueueLen      int
	RaftLogIndex  uint64
	IsDelExpire   int
	StartModel    ModelType
	DbSyncRunning atomic.Int32
	DbSyncStatus  DbSyncStatusType
	DbSyncErr     string
	IsMigrate     atomic.Int32 `json:"is_migrate"`
	// contains filtered or unexported fields
}
func (*SinfoStats) Marshal ¶
func (ss *SinfoStats) Marshal() ([]byte, func())
func (*SinfoStats) UpdateCache ¶
func (ss *SinfoStats) UpdateCache()
type TxShardLocker ¶
type TxShardLocker struct {
// contains filtered or unexported fields
}
func NewTxLockers ¶
func NewTxLockers(shards uint32) *TxShardLocker
func (*TxShardLocker) GetTxLock ¶
func (sl *TxShardLocker) GetTxLock(khash uint32) *TxLocker
func (*TxShardLocker) GetTxLockByKey ¶
func (sl *TxShardLocker) GetTxLockByKey(key []byte) *TxLocker
func (*TxShardLocker) GetWatchKey ¶
func (sl *TxShardLocker) GetWatchKey(keyStr string) *TxWatchKey
func (*TxShardLocker) GetWatchKeyWithKhash ¶
func (sl *TxShardLocker) GetWatchKeyWithKhash(khash uint32, keyStr string) *TxWatchKey
type TxWatchKey ¶
type TxWatchKey struct {
// contains filtered or unexported fields
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.