proxy

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Read-preference modes. Write commands always go to the master
	// regardless of the selected mode.
	//
	// READ_PREFER_MASTER: read from the master.
	READ_PREFER_MASTER = iota
	// READ_PREFER_SLAVE: read from a slave if possible.
	READ_PREFER_SLAVE
	// READ_PREFER_SLAVE_IDC: read from a slave in the same IDC if possible.
	READ_PREFER_SLAVE_IDC

	// NOTE(review): presumably field indexes into one split line of
	// CLUSTER NODES output (address at 1, flags at 2) — confirm against the parser.
	CLUSTER_NODES_FIELD_NUM_IP_PORT = 1
	CLUSTER_NODES_FIELD_NUM_FLAGS   = 2
	// CLUSTER_NODES_FIELD_SPLIT_NUM must be larger than any FIELD index above.
	// NOTE(review): presumably the split limit used when parsing a line — confirm.
	CLUSTER_NODES_FIELD_SPLIT_NUM = 4
)
View Source
const (
	// NumSlots is the size of the slot space (16384).
	// NOTE(review): the CLUSTER_SLOTS_* values are presumably indexes into one
	// entry of a CLUSTER SLOTS reply (start slot, end slot, first server
	// entry) — confirm against the code that builds SlotInfo.
	NumSlots                   = 16384
	CLUSTER_SLOTS_START        = 0
	CLUSTER_SLOTS_END          = 1
	CLUSTER_SLOTS_SERVER_START = 2
)
View Source
const (
	// Command classification flags, numbered 0 through 4 via iota.
	// NOTE(review): presumably these are the values returned by CmdFlag and
	// tested by CmdReadOnly / CmdReadAll / CmdUnknown — confirm at the call sites.
	CMD_FLAG_READ = iota
	CMD_FLAG_READ_ALL
	CMD_FLAG_PROXY
	CMD_FLAG_UNKNOWN
	CMD_FLAG_GENERAL
)

Variables

View Source
var (
	// Package-level command pointers. They are declared without initializers
	// here, so they are nil until assigned.
	// NOTE(review): presumably populated during package initialization with
	// the CLUSTER SLOTS, CLUSTER NODES and READONLY commands — confirm where
	// they are assigned.
	REDIS_CMD_CLUSTER_SLOTS *resp.Command
	REDIS_CMD_CLUSTER_NODES *resp.Command
	REDIS_CMD_READ_ONLY     *resp.Command
)
View Source
var (
	// Byte-slice literals for reply and error text. The leading "-" on MOVED
	// and ASK, and the "+" prefix and trailing "\r\n" on ASK_CMD_BYTES, are
	// part of the literals themselves. OK_DATA is a resp.Data of type
	// resp.T_SimpleString whose String field is the OK slice.
	// NOTE(review): these are mutable package-level slices; callers are
	// presumably expected to treat them as read-only — confirm.
	OK              = []byte("OK")
	MOVED           = []byte("-MOVED")
	ASK             = []byte("-ASK")
	ASK_CMD_BYTES   = []byte("+ASKING\r\n")
	AUTH_CMD_ERR    = []byte("ERR invalid password")
	UNKNOWN_CMD_ERR = []byte("ERR unknown command")
	ARGUMENTS_ERR   = []byte("ERR wrong number of arguments")
	NOAUTH_ERR      = []byte("NOAUTH Authentication required.")
	OK_DATA         = &resp.Data{T: resp.T_SimpleString, String: OK}
)

Functions

func CRC16

func CRC16(buf []byte) uint16

CRC16 returns the checksum of the given bytes, computed with the CRC algorithm defined for hashing Redis keys in a cluster setup.

func CmdAuthRequired

func CmdAuthRequired(cmd *resp.Command) bool

func CmdFlag

func CmdFlag(cmd *resp.Command) int

func CmdReadAll

func CmdReadAll(cmd *resp.Command) bool

func CmdReadOnly

func CmdReadOnly(cmd *resp.Command) bool

func CmdUnknown

func CmdUnknown(cmd *resp.Command) bool

func IsMultiCmd

func IsMultiCmd(cmd *resp.Command) (multiKey bool, numKeys int)

func Key2Slot

func Key2Slot(key string) int

func LocalIP

func LocalIP() string

func ParseRedirectInfo

func ParseRedirectInfo(msg string) (slot int, server string)

ParseRedirectInfo parses slot redirect information from MOVED and ASK errors.

Types

type BackendServer

type BackendServer struct {
	// contains filtered or unexported fields
}

func NewBackendServer

func NewBackendServer(server string, redisConn *RedisConn) *BackendServer

func (*BackendServer) Close

func (tr *BackendServer) Close() error

func (*BackendServer) Request

func (tr *BackendServer) Request(req *PipelineRequest) (*PipelineResponse, error)

type BackendServerPool added in v1.1.0

type BackendServerPool struct {
	// contains filtered or unexported fields
}

func NewBackendServerPool added in v1.1.0

func NewBackendServerPool(redisConn *RedisConn) *BackendServerPool

func (*BackendServerPool) Get added in v1.1.0

func (b *BackendServerPool) Get(server string) (*BackendServer, error)

func (*BackendServerPool) Init added in v1.1.0

func (b *BackendServerPool) Init(server string) (*connpool.Pool, error)

func (*BackendServerPool) Put added in v1.1.0

func (b *BackendServerPool) Put(server *BackendServer) error

func (*BackendServerPool) Reload added in v1.1.0

func (b *BackendServerPool) Reload(servers map[string]bool)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(startupNodes []string, slotReloadInterval time.Duration, redisConn *RedisConn, readPrefer int) *Dispatcher

func (*Dispatcher) InitSlotTable

func (d *Dispatcher) InitSlotTable() error

func (*Dispatcher) Run

func (d *Dispatcher) Run()

func (*Dispatcher) TriggerReloadSlots

func (d *Dispatcher) TriggerReloadSlots()

TriggerReloadSlots schedules a reload task. This call is inherently throttled, so multiple clients can call it at the same time and the reload will only actually occur once.

type MultiCmd

type MultiCmd struct {
	// contains filtered or unexported fields
}

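A multi-key command is split into numKeys sub-requests, which are sent as ordinary pipeline requests and merged when the response is written out. When the response of the last sub-request arrives, the whole multi-key command is complete, and the final response is assembled and written out.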

If any one sub-request fails, the whole request is considered failed. All sub-requests share a single request sequence number.

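A request can fail in two ways: (1) a network failure, such as a read timeout; (2) a request error, such as a request that should have gone to server A being sent to server B, which shows up as a response whose type is error.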

func NewMultiCmd

func NewMultiCmd(session *Session, cmd *resp.Command, numSubCmds int) *MultiCmd

func (*MultiCmd) CoalesceRsp

func (mc *MultiCmd) CoalesceRsp() *PipelineResponse

func (*MultiCmd) Finished

func (mc *MultiCmd) Finished() bool

func (*MultiCmd) OnSubCmdFinished

func (mc *MultiCmd) OnSubCmdFinished(rsp *PipelineResponse)

func (*MultiCmd) SubCmd

func (mc *MultiCmd) SubCmd(index, size int) (*resp.Command, error)

type MultiCmdExec

type MultiCmdExec struct {
	// contains filtered or unexported fields
}

func NewMultiCmdExec

func NewMultiCmdExec(session *Session) *MultiCmdExec

func (*MultiCmdExec) Exec

func (m *MultiCmdExec) Exec() (*resp.Data, error)

type PipelineRequest

type PipelineRequest struct {
	// contains filtered or unexported fields
}

type PipelineResponse

type PipelineResponse struct {
	// contains filtered or unexported fields
}

type PipelineResponseHeap

// PipelineResponseHeap is a slice of *PipelineResponse. Its Len, Less, Swap,
// Push and Pop methods match the method set required by container/heap.
// NOTE(review): the ordering key used by Less is not visible here — confirm.
type PipelineResponseHeap []*PipelineResponse

func (PipelineResponseHeap) Len

func (h PipelineResponseHeap) Len() int

func (PipelineResponseHeap) Less

func (h PipelineResponseHeap) Less(i, j int) bool

func (*PipelineResponseHeap) Pop

func (h *PipelineResponseHeap) Pop() interface{}

func (*PipelineResponseHeap) Push

func (h *PipelineResponseHeap) Push(x interface{})

func (PipelineResponseHeap) Swap

func (h PipelineResponseHeap) Swap(i, j int)

func (PipelineResponseHeap) Top

Top returns the element at the top of the heap.

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

func NewProxy

func NewProxy(addr string, dispatcher *Dispatcher, redisConn *RedisConn) *Proxy

func (*Proxy) Exit

func (p *Proxy) Exit()

func (*Proxy) Run

func (p *Proxy) Run()

type RedisConn

type RedisConn struct {
	// contains filtered or unexported fields
}

func NewRedisConn

func NewRedisConn(initCap, maxIdle int, connTimeout time.Duration, password string, sendReadOnly bool) *RedisConn

func (*RedisConn) Auth

func (cp *RedisConn) Auth(password string) bool

func (*RedisConn) Conn

func (cp *RedisConn) Conn(server string) (net.Conn, error)

func (*RedisConn) Request

func (cp *RedisConn) Request(command *resp.Command, conn net.Conn) (*resp.Data, error)

type ServerGroup

type ServerGroup struct {
	// contains filtered or unexported fields
}

ServerGroup is derived from the cluster slots and ReadPrefer.

type Session

type Session struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*Session) Close

func (s *Session) Close()

func (*Session) Read

func (s *Session) Read(p []byte) (int, error)

func (*Session) ReadingLoop

func (s *Session) ReadingLoop()

func (*Session) Reset added in v1.1.0

func (s *Session) Reset(conn net.Conn)

func (*Session) Run

func (s *Session) Run()

func (*Session) Schedule

func (s *Session) Schedule(req *PipelineRequest)

func (*Session) WritingLoop

func (s *Session) WritingLoop()

WritingLoop consumes backQ and sends responses to the client. It closes the connection to notify the reader on error and continues looping until the reader has exited.

type SlotInfo

type SlotInfo struct {
	// contains filtered or unexported fields
}

func NewSlotInfo

func NewSlotInfo(data *resp.Data) *SlotInfo

type SlotTable

type SlotTable struct {
	// contains filtered or unexported fields
}

func NewSlotTable

func NewSlotTable() *SlotTable

func (*SlotTable) ReadServer

func (st *SlotTable) ReadServer(slot int) string

func (*SlotTable) ServerSlots

func (st *SlotTable) ServerSlots() []int

func (*SlotTable) SetSlotInfo

func (st *SlotTable) SetSlotInfo(si *SlotInfo)

func (*SlotTable) WriteServer

func (st *SlotTable) WriteServer(slot int) string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL