proxy

package
v0.0.0-...-0d6cf08 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2020 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Bit flags, each occupying its own bit through 1 << iota:
// FlagWrite = 1, FlagMasterOnly = 2, FlagMayWrite = 4, FlagNotAllow = 8.
// The constants are untyped here.
// NOTE(review): presumably these are combined into an OpFlag value — confirm.
const (
	FlagWrite = 1 << iota
	FlagMasterOnly
	FlagMayWrite
	FlagNotAllow
)
View Source
// StatsFlags bit values. The first three occupy one bit each through
// 1 << iota: StatsCmds = 1, StatsSlots = 2, StatsRuntime = 4.
const (
	StatsCmds = StatsFlags(1 << iota)
	StatsSlots
	StatsRuntime

	// StatsFull has all 32 bits set, so it includes every flag above.
	StatsFull = StatsFlags(^uint32(0))
)
View Source
const DefaultConfig = `` /* 3902-byte string literal not displayed */
View Source
const DefaultRequestChanBuffer = 128
View Source
const GOLDEN_RATIO_PRIME_32 = 0x9e370001
View Source
const MaxOpStrLen = 64
View Source
const MaxSlotNum = models.MaxSlotNum

Variables

View Source
// Sentinel errors for backend connections and requests; compare by identity
// (e.g. with errors.Is).
var (
	ErrBackendConnReset = errors.New("backend conn reset")
	ErrRequestIsBroken  = errors.New("request is broken")
)
View Source
// Sentinel errors for an unavailable slot and a missing response; compare by
// identity (e.g. with errors.Is).
var (
	ErrSlotIsNotReady = errors.New("slot is not ready, may be offline")
	ErrRespIsRequired = errors.New("resp is required")
)
View Source
// Sentinel errors for malformed commands.
// NOTE(review): ErrBadOpStrLen is presumably tied to MaxOpStrLen — confirm.
var (
	ErrBadMultiBulk = errors.New("bad multi-bulk for command")
	ErrBadOpStrLen  = errors.New("bad command length, too short or too long")
)
View Source
// Sentinel errors for router misuse: a closed router, an invalid slot id, or
// an invalid forwarder method.
var (
	ErrClosedRouter  = errors.New("use of closed router")
	ErrInvalidSlotId = errors.New("use of invalid slot id")
	ErrInvalidMethod = errors.New("use of invalid forwarder method")
)
View Source
// Sentinel errors for an offline router and for exceeded limits.
// NOTE(review): the limits are presumably Config.ProxyMaxClients and
// Config.SessionMaxPipeline — confirm.
var (
	ErrRouterNotOnline          = errors.New("router is not online")
	ErrTooManySessions          = errors.New("too many sessions")
	ErrTooManyPipelinedRequests = errors.New("too many pipelined requests")
)
View Source
var ErrClosedJodis = errors.New("use of closed jodis")
View Source
var ErrClosedProxy = errors.New("use of closed proxy")
View Source
var RespOK = redis.NewString([]byte("OK"))

Functions

func Hash

func Hash(key []byte) uint32

func OpFails

func OpFails() int64

func OpQPS

func OpQPS() int64

func OpRedisErrors

func OpRedisErrors() int64

func OpTotal

func OpTotal() int64

func ResetStats

func ResetStats()

func SessionsAlive

func SessionsAlive() int64

func SessionsTotal

func SessionsTotal() int64

Types

type ApiClient

type ApiClient struct {
	// contains filtered or unexported fields
}

func NewApiClient

func NewApiClient(addr string) *ApiClient

func (*ApiClient) FillSlots

func (c *ApiClient) FillSlots(slots ...*models.Slot) error

func (*ApiClient) ForceGC

func (c *ApiClient) ForceGC() error

func (*ApiClient) LogLevel

func (c *ApiClient) LogLevel(level log.LogLevel) error

func (*ApiClient) Model

func (c *ApiClient) Model() (*models.Proxy, error)

func (*ApiClient) Overview

func (c *ApiClient) Overview() (*Overview, error)

func (*ApiClient) ResetStats

func (c *ApiClient) ResetStats() error

func (*ApiClient) RewatchSentinels

func (c *ApiClient) RewatchSentinels() error

func (*ApiClient) SetSentinels

func (c *ApiClient) SetSentinels(sentinel *models.Sentinel) error

func (*ApiClient) SetXAuth

func (c *ApiClient) SetXAuth(name, auth string, token string)

func (*ApiClient) Shutdown

func (c *ApiClient) Shutdown() error

func (*ApiClient) Slots

func (c *ApiClient) Slots() ([]*models.Slot, error)

func (*ApiClient) Start

func (c *ApiClient) Start() error

func (*ApiClient) Stats

func (c *ApiClient) Stats(flags StatsFlags) (*Stats, error)

func (*ApiClient) StatsSimple

func (c *ApiClient) StatsSimple() (*Stats, error)

func (*ApiClient) XPing

func (c *ApiClient) XPing() error

type BackendConn

type BackendConn struct {
	// contains filtered or unexported fields
}

func NewBackendConn

func NewBackendConn(addr string, database int, config *Config) *BackendConn

每个backendConn会创建对应的reader和writer

func (*BackendConn) Addr

func (bc *BackendConn) Addr() string

func (*BackendConn) Close

func (bc *BackendConn) Close()

func (*BackendConn) IsConnected

func (bc *BackendConn) IsConnected() bool

func (*BackendConn) KeepAlive

func (bc *BackendConn) KeepAlive() bool

func (*BackendConn) PushBack

func (bc *BackendConn) PushBack(r *Request)

请求放入BackendConn等待处理。如果request的sync.WaitGroup不为空,就加一;如果加一之后counter为0,那么所有阻塞在counter上的goroutine都会得到释放。随后将请求直接存入到BackendConn的chan *Request中,等待后续被取出并进行处理。

type Config

// Config holds the proxy settings. Every field carries a toml tag (its key
// in the configuration file) and a json tag (its key when encoded as JSON).
// A tag of "-" excludes the field from that encoding.
type Config struct {
	ProtoType string `toml:"proto_type" json:"proto_type"`
	ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"`
	AdminAddr string `toml:"admin_addr" json:"admin_addr"`

	// Excluded from both TOML and JSON, so these are never read from the
	// configuration file nor emitted in JSON.
	// NOTE(review): presumably filled in at runtime — confirm.
	HostProxy string `toml:"-" json:"-"`
	HostAdmin string `toml:"-" json:"-"`

	// Jodis settings.
	JodisName       string            `toml:"jodis_name" json:"jodis_name"`
	JodisAddr       string            `toml:"jodis_addr" json:"jodis_addr"`
	JodisAuth       string            `toml:"jodis_auth" json:"jodis_auth"`
	JodisTimeout    timesize.Duration `toml:"jodis_timeout" json:"jodis_timeout"`
	JodisCompatible bool              `toml:"jodis_compatible" json:"jodis_compatible"`

	// ProductAuth and SessionAuth are loadable from TOML but omitted from
	// JSON output (json:"-").
	ProductName string `toml:"product_name" json:"product_name"`
	ProductAuth string `toml:"product_auth" json:"-"`
	SessionAuth string `toml:"session_auth" json:"-"`

	// Proxy-wide settings. Note that the TOML/JSON keys for
	// ProxyMaxOffheapBytes end in "_size", not "_bytes".
	ProxyDataCenter      string         `toml:"proxy_datacenter" json:"proxy_datacenter"`
	ProxyMaxClients      int            `toml:"proxy_max_clients" json:"proxy_max_clients"`
	ProxyMaxOffheapBytes bytesize.Int64 `toml:"proxy_max_offheap_size" json:"proxy_max_offheap_size"`
	ProxyHeapPlaceholder bytesize.Int64 `toml:"proxy_heap_placeholder" json:"proxy_heap_placeholder"`

	// Settings for connections to backends.
	BackendPingPeriod      timesize.Duration `toml:"backend_ping_period" json:"backend_ping_period"`
	BackendRecvBufsize     bytesize.Int64    `toml:"backend_recv_bufsize" json:"backend_recv_bufsize"`
	BackendRecvTimeout     timesize.Duration `toml:"backend_recv_timeout" json:"backend_recv_timeout"`
	BackendSendBufsize     bytesize.Int64    `toml:"backend_send_bufsize" json:"backend_send_bufsize"`
	BackendSendTimeout     timesize.Duration `toml:"backend_send_timeout" json:"backend_send_timeout"`
	BackendMaxPipeline     int               `toml:"backend_max_pipeline" json:"backend_max_pipeline"`
	BackendPrimaryOnly     bool              `toml:"backend_primary_only" json:"backend_primary_only"`
	BackendPrimaryParallel int               `toml:"backend_primary_parallel" json:"backend_primary_parallel"`
	BackendReplicaParallel int               `toml:"backend_replica_parallel" json:"backend_replica_parallel"`
	BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"`
	BackendNumberDatabases int32             `toml:"backend_number_databases" json:"backend_number_databases"`

	// Settings for client sessions.
	SessionRecvBufsize     bytesize.Int64    `toml:"session_recv_bufsize" json:"session_recv_bufsize"`
	SessionRecvTimeout     timesize.Duration `toml:"session_recv_timeout" json:"session_recv_timeout"`
	SessionSendBufsize     bytesize.Int64    `toml:"session_send_bufsize" json:"session_send_bufsize"`
	SessionSendTimeout     timesize.Duration `toml:"session_send_timeout" json:"session_send_timeout"`
	SessionMaxPipeline     int               `toml:"session_max_pipeline" json:"session_max_pipeline"`
	SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"`
	SessionBreakOnFailure  bool              `toml:"session_break_on_failure" json:"session_break_on_failure"`

	// Metrics reporting settings. MetricsReportInfluxdbPassword is omitted
	// from JSON output (json:"-").
	MetricsReportServer           string            `toml:"metrics_report_server" json:"metrics_report_server"`
	MetricsReportPeriod           timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
	MetricsReportInfluxdbServer   string            `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"`
	MetricsReportInfluxdbPeriod   timesize.Duration `toml:"metrics_report_influxdb_period" json:"metrics_report_influxdb_period"`
	MetricsReportInfluxdbUsername string            `toml:"metrics_report_influxdb_username" json:"metrics_report_influxdb_username"`
	MetricsReportInfluxdbPassword string            `toml:"metrics_report_influxdb_password" json:"-"`
	MetricsReportInfluxdbDatabase string            `toml:"metrics_report_influxdb_database" json:"metrics_report_influxdb_database"`
	MetricsReportStatsdServer     string            `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"`
	MetricsReportStatsdPeriod     timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
	MetricsReportStatsdPrefix     string            `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`
}

func NewDefaultConfig

func NewDefaultConfig() *Config

func (*Config) LoadFromFile

func (c *Config) LoadFromFile(path string) error

func (*Config) String

func (c *Config) String() string

func (*Config) Validate

func (c *Config) Validate() error

type Delay

// Delay is the interface for a resettable delay source. DelayExp2 provides
// methods with these names and signatures.
type Delay interface {
	// Reset takes no arguments and returns nothing.
	// NOTE(review): presumably restores the initial delay — confirm.
	Reset()
	// After returns a receive-only channel of time.Time.
	After() <-chan time.Time
	// Sleep takes no arguments and returns nothing.
	Sleep()
	// SleepWithCancel takes a callback reporting whether the wait is canceled.
	// NOTE(review): how often canceled is polled is not visible here.
	SleepWithCancel(canceled func() bool)
}

type DelayExp2

// DelayExp2 is a delay state made of integer bounds, a current value, and a
// time unit.
// NOTE(review): the name suggests the value doubles between Min and Max and
// is multiplied by Unit to get a duration — confirm against the methods.
type DelayExp2 struct {
	Min, Max int
	Value    int
	Unit     time.Duration
}

func (*DelayExp2) After

func (d *DelayExp2) After() <-chan time.Time

func (*DelayExp2) NextValue

func (d *DelayExp2) NextValue() int

func (*DelayExp2) Reset

func (d *DelayExp2) Reset()

func (*DelayExp2) Sleep

func (d *DelayExp2) Sleep()

func (*DelayExp2) SleepWithCancel

func (d *DelayExp2) SleepWithCancel(canceled func() bool)

type Jodis

type Jodis struct {
	// contains filtered or unexported fields
}

客户端的连接

func NewJodis

func NewJodis(c models.Client, p *models.Proxy) *Jodis

func (*Jodis) Close

func (j *Jodis) Close() error

func (*Jodis) Data

func (j *Jodis) Data() string

func (*Jodis) IsClosed

func (j *Jodis) IsClosed() bool

func (*Jodis) IsWatching

func (j *Jodis) IsWatching() bool

func (*Jodis) Path

func (j *Jodis) Path() string

func (*Jodis) Rewatch

func (j *Jodis) Rewatch() (<-chan struct{}, error)

func (*Jodis) Start

func (j *Jodis) Start()

type OpFlag

type OpFlag uint32

func (OpFlag) IsMasterOnly

func (f OpFlag) IsMasterOnly() bool

func (OpFlag) IsNotAllowed

func (f OpFlag) IsNotAllowed() bool

func (OpFlag) IsReadOnly

func (f OpFlag) IsReadOnly() bool

type OpInfo

// OpInfo pairs a name with its OpFlag.
// NOTE(review): presumably Name is a redis command name — confirm.
type OpInfo struct {
	Name string
	Flag OpFlag
}

type OpStats

// OpStats holds counters for one operation string, encoded to JSON under the
// keys given in the tags.
// NOTE(review): Usecs/UsecsPercall are presumably microseconds (total and
// per call), judging by the names — confirm.
type OpStats struct {
	OpStr        string `json:"opstr"`
	Calls        int64  `json:"calls"`
	Usecs        int64  `json:"usecs"`
	UsecsPercall int64  `json:"usecs_percall"`
	Fails        int64  `json:"fails"`
	RedisErrType int64  `json:"redis_errtype"`
}

func GetOpStatsAll

func GetOpStatsAll() []*OpStats

type Overview

// Overview bundles version information with the proxy's config, model,
// stats and slots. Version and Compile are always encoded; the remaining
// fields are tagged omitempty and are left out of the JSON when nil or empty.
type Overview struct {
	Version string         `json:"version"`
	Compile string         `json:"compile"`
	Config  *Config        `json:"config,omitempty"`
	Model   *models.Proxy  `json:"model,omitempty"`
	Stats   *Stats         `json:"stats,omitempty"`
	Slots   []*models.Slot `json:"slots,omitempty"`
}

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

func New

func New(config *Config) (*Proxy, error)

func (*Proxy) Close

func (s *Proxy) Close() error

func (*Proxy) Config

func (s *Proxy) Config() *Config

func (*Proxy) FillSlot

func (s *Proxy) FillSlot(m *models.Slot) error

func (*Proxy) FillSlots

func (s *Proxy) FillSlots(slots []*models.Slot) error

填充Slot。这里传过来的slots实际上只有一个,就是整个集群中最小的Action.State对应的Slotmapping。

func (*Proxy) GetSentinels

func (s *Proxy) GetSentinels() ([]string, map[int]string)

func (*Proxy) HasSwitched

func (s *Proxy) HasSwitched() bool

func (*Proxy) IsClosed

func (s *Proxy) IsClosed() bool

func (*Proxy) IsOnline

func (s *Proxy) IsOnline() bool

func (*Proxy) Model

func (s *Proxy) Model() *models.Proxy

func (*Proxy) Overview

func (s *Proxy) Overview(flags StatsFlags) *Overview

func (*Proxy) RewatchSentinels

func (s *Proxy) RewatchSentinels() error

func (*Proxy) SetSentinels

func (s *Proxy) SetSentinels(servers []string) error

func (*Proxy) Slots

func (s *Proxy) Slots() []*models.Slot

func (*Proxy) Start

func (s *Proxy) Start() error

将Proxy自身以及它的router和jodis设为上线。在zk中创建临时节点/jodis/codis-productName/proxy-token,并监听该节点的变化。

func (*Proxy) Stats

func (s *Proxy) Stats(flags StatsFlags) *Stats

func (*Proxy) SwitchMasters

func (s *Proxy) SwitchMasters(masters map[int]string) error

func (*Proxy) XAuth

func (s *Proxy) XAuth() string

type Request

// Request is the internal entity used while a request is being processed.
type Request struct {
	// The original client request (an array of redis command parts).
	Multi []*redis.Resp

	// Controller for batched sub-requests.
	// Tracks the progress of the front-end request: +1 when the front end
	// hands the request to a backend, -1 once the backend has processed it.
	Batch *sync.WaitGroup

	// TODO: this description is wrong: "when a command has to be split, e.g.
	// mget, the parts are routed to different backend servers/slots".
	// It actually protects the group/slots mapping: wait() is called on it
	// before every fillSlots.
	Group *sync.WaitGroup

	Broken *atomic2.Bool

	// The specific command of the redis request.
	OpStr string
	// Flags for the command: fast/slow, read/write.
	OpFlag

	// Which database is currently in use (related to select).
	Database int32
	// NOTE(review): presumably a timestamp in nanoseconds, per the name —
	// confirm where it is set.
	UnixNano int64

	// The final response sent back to the front end.
	*redis.Resp
	Err error

	Coalesce func() error
}

请求处理的内部实体类

func (*Request) IsBroken

func (r *Request) IsBroken() bool

func (*Request) MakeSubRequest

func (r *Request) MakeSubRequest(n int) []Request

func (*Request) Seed16

func (r *Request) Seed16() uint

type RequestChan

type RequestChan struct {
	// contains filtered or unexported fields
}

每一个session都会有一个RequestChan

func NewRequestChan

func NewRequestChan() *RequestChan

func NewRequestChanBuffer

func NewRequestChanBuffer(n int) *RequestChan

func (*RequestChan) Buffered

func (c *RequestChan) Buffered() int

func (*RequestChan) Close

func (c *RequestChan) Close()

func (*RequestChan) IsEmpty

func (c *RequestChan) IsEmpty() bool

func (*RequestChan) PopFront

func (c *RequestChan) PopFront() (*Request, bool)

func (*RequestChan) PopFrontAll

func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error

func (*RequestChan) PopFrontAllVoid

func (c *RequestChan) PopFrontAllVoid(onRequest func(r *Request))

func (*RequestChan) PushBack

func (c *RequestChan) PushBack(r *Request) int

type Router

type Router struct {
	// contains filtered or unexported fields
}

存储了集群中所有的sharedBackendConnPool和slot,用于将redis请求转发给相应的slot进行处理。

func NewRouter

func NewRouter(config *Config) *Router

func (*Router) Close

func (s *Router) Close()

func (*Router) FillSlot

func (s *Router) FillSlot(m *models.Slot) error

func (*Router) GetSlot

func (s *Router) GetSlot(id int) *models.Slot

func (*Router) GetSlots

func (s *Router) GetSlots() []*models.Slot

func (*Router) HasSwitched

func (s *Router) HasSwitched() bool

func (*Router) KeepAlive

func (s *Router) KeepAlive() error

func (*Router) Start

func (s *Router) Start()

func (*Router) SwitchMasters

func (s *Router) SwitchMasters(masters map[int]string) error

type RuntimeStats

// RuntimeStats is a snapshot of process runtime figures, grouped into
// general memory, heap and GC sections plus a few top-level counters. The
// JSON keys are given by the tags.
// NOTE(review): the field names mirror runtime.MemStats; presumably the
// values are copied from it — confirm.
type RuntimeStats struct {
	General struct {
		Alloc   uint64 `json:"alloc"`
		Sys     uint64 `json:"sys"`
		Lookups uint64 `json:"lookups"`
		Mallocs uint64 `json:"mallocs"`
		Frees   uint64 `json:"frees"`
	} `json:"general"`

	Heap struct {
		Alloc   uint64 `json:"alloc"`
		Sys     uint64 `json:"sys"`
		Idle    uint64 `json:"idle"`
		Inuse   uint64 `json:"inuse"`
		Objects uint64 `json:"objects"`
	} `json:"heap"`

	GC struct {
		Num          uint32  `json:"num"`
		CPUFraction  float64 `json:"cpu_fraction"`
		TotalPauseMs uint64  `json:"total_pausems"`
	} `json:"gc"`

	NumProcs      int   `json:"num_procs"`
	NumGoroutines int   `json:"num_goroutines"`
	NumCgoCall    int64 `json:"num_cgo_call"`
	MemOffheap    int64 `json:"mem_offheap"`
}

type Session

// Session holds the exported state of one client session.
type Session struct {
	// Connection to the client.
	Conn *redis.Conn

	// Number of requests.
	Ops int64

	// Creation time.
	// NOTE(review): presumably a Unix timestamp, per the name — unit not
	// shown here.
	CreateUnix int64
	// Time of the last request.
	LastOpUnix int64
	// contains filtered or unexported fields
}

func NewSession

func NewSession(sock net.Conn, config *Config) *Session

每接到一个redis请求,就创建一个独立的session进行处理(默认每个session的tcp连接过期时间为75秒,也就是每个请求最多处理75秒)。这里的第一个参数是net.Conn:Conn是一个通用的面向流的网络连接,多个goroutine可以同时调用Conn的方法。这里的net.Conn就是之前Proxy的lproxy这个Listener监听到19000端口的请求到来时返回的net.Conn。

func (*Session) CloseReaderWithError

func (s *Session) CloseReaderWithError(err error) error

func (*Session) CloseWithError

func (s *Session) CloseWithError(err error) error

func (*Session) Start

func (s *Session) Start(d *Router)

func (*Session) String

func (s *Session) String() string

type Slot

type Slot struct {
	// contains filtered or unexported fields
}

type Stats

// Stats is the proxy status report. The JSON keys are given by the tags;
// fields tagged omitempty are left out of the JSON when empty or nil.
type Stats struct {
	Online bool `json:"online"`
	Closed bool `json:"closed"`

	// Sentinel servers, masters, and a switched flag; all three are
	// omitempty.
	Sentinels struct {
		Servers  []string          `json:"servers,omitempty"`
		Masters  map[string]string `json:"masters,omitempty"`
		Switched bool              `json:"switched,omitempty"`
	} `json:"sentinels"`

	// Operation counters. Cmd is a list of *OpStats and is omitted when
	// empty.
	Ops struct {
		Total int64 `json:"total"`
		Fails int64 `json:"fails"`
		Redis struct {
			Errors int64 `json:"errors"`
		} `json:"redis"`
		QPS int64      `json:"qps"`
		Cmd []*OpStats `json:"cmd,omitempty"`
	} `json:"ops"`

	// Session counters.
	Sessions struct {
		Total int64 `json:"total"`
		Alive int64 `json:"alive"`
	} `json:"sessions"`

	// Resource usage. Now is a string here, not a time.Time.
	// NOTE(review): units of CPU and Mem are not visible here.
	Rusage struct {
		Now string       `json:"now"`
		CPU float64      `json:"cpu"`
		Mem int64        `json:"mem"`
		Raw *utils.Usage `json:"raw,omitempty"`
	} `json:"rusage"`

	Backend struct {
		PrimaryOnly bool `json:"primary_only"`
	} `json:"backend"`

	// Runtime is omitted from the JSON when nil.
	Runtime *RuntimeStats `json:"runtime,omitempty"`
}

type StatsFlags

type StatsFlags uint32

func (StatsFlags) HasBit

func (s StatsFlags) HasBit(m StatsFlags) bool

type SysUsage

// SysUsage is a timestamped usage sample: a time, a CPU figure, and an
// embedded *utils.Usage whose fields and methods are promoted.
// NOTE(review): the unit/range of CPU is not visible here.
type SysUsage struct {
	Now time.Time
	CPU float64
	*utils.Usage
}

func GetSysUsage

func GetSysUsage() *SysUsage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL