relay

package
v0.0.0-...-f948758 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: PostgreSQL Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Message from client
	BufferedMessageRegular = BufferedMessageType(0)
	// Message produced by spqr
	BufferedMessageInternal = BufferedMessageType(1)
)

Variables

View Source
var ErrSkipQuery = fmt.Errorf("wait for a next query")
View Source
var MultiShardPrepStmtDeployError = fmt.Errorf("multishard prepared statement deploy is not supported")

Functions

func AdvancedPoolModeNeeded

func AdvancedPoolModeNeeded(rst RelayStateMgr) bool

func ProcQueryAdvanced

func ProcQueryAdvanced(rst RelayStateMgr, query string, ph ProtoStateHandler, binderQ func() error, doCaching bool) error

ProcQueryAdvanced processes query, with router relay state There are several types of query that we want to process in non-passthrough way. For example, after BEGIN we wait until first client query witch can be router to some shard. So, we need to proccess SETs, BEGINs, ROLLBACKs etc ourselves. ProtoStateHandler provides set of function for either simple of extended protoc interactions query param is either plain query from simple proto or bind query from x proto

Types

type BufferedMessage

type BufferedMessage struct {
	// contains filtered or unexported fields
}

func InternalBufferedMessage

func InternalBufferedMessage(q pgproto3.FrontendMessage) BufferedMessage

func RegularBufferedMessage

func RegularBufferedMessage(q pgproto3.FrontendMessage) BufferedMessage

type BufferedMessageType

type BufferedMessageType int

type ParseCacheEntry

type ParseCacheEntry struct {
	// contains filtered or unexported fields
}

type PortalDesc

type PortalDesc struct {
	// contains filtered or unexported fields
}

type ProtoStateHandler

type ProtoStateHandler interface {
	ExecCommit(rst RelayStateMgr, query string) error
	ExecRollback(rst RelayStateMgr, query string) error

	ExecSet(rst RelayStateMgr, query, name, value string) error
	ExecSetLocal(rst RelayStateMgr, query, name, value string) error
	ExecReset(rst RelayStateMgr, query, name string) error
	ExecResetMetadata(rst RelayStateMgr, query, setting string) error
}

Execute requered command via some protoc-specific logic

func NewSimpleProtoStateHandler

func NewSimpleProtoStateHandler(cmngr poolmgr.PoolMgr) ProtoStateHandler

type RelayStateImpl

type RelayStateImpl struct {
	CopyActive bool

	TargetKeyRange kr.KeyRange

	WorldShardFallback bool

	Qr qrouter.QueryRouter

	Cl client.RouterClient
	// contains filtered or unexported fields
}

func (*RelayStateImpl) ActiveShards

func (rst *RelayStateImpl) ActiveShards() []kr.ShardKey

func (*RelayStateImpl) ActiveShardsReset

func (rst *RelayStateImpl) ActiveShardsReset()

func (*RelayStateImpl) AddExtendedProtocMessage

func (rst *RelayStateImpl) AddExtendedProtocMessage(q pgproto3.FrontendMessage)

TODO : unit tests

func (*RelayStateImpl) AddQuery

func (rst *RelayStateImpl) AddQuery(q pgproto3.FrontendMessage)

TODO : unit tests

func (*RelayStateImpl) AddSilentQuery

func (rst *RelayStateImpl) AddSilentQuery(q pgproto3.FrontendMessage)

TODO : unit tests

func (*RelayStateImpl) Client

func (rst *RelayStateImpl) Client() client.RouterClient

func (*RelayStateImpl) Close

func (rst *RelayStateImpl) Close() error

func (*RelayStateImpl) CompleteRelay

func (rst *RelayStateImpl) CompleteRelay(replyCl bool) error

func (*RelayStateImpl) ConnMgr

func (rst *RelayStateImpl) ConnMgr() poolmgr.PoolMgr

func (*RelayStateImpl) Connect

func (rst *RelayStateImpl) Connect(shardRoutes []*routingstate.DataShardRoute) error

TODO : unit tests

func (*RelayStateImpl) ConnectWorld

func (rst *RelayStateImpl) ConnectWorld() error

func (*RelayStateImpl) CurrentRoutes

func (rst *RelayStateImpl) CurrentRoutes() []*routingstate.DataShardRoute

TODO : unit tests

func (*RelayStateImpl) DataPending

func (rst *RelayStateImpl) DataPending() bool

func (*RelayStateImpl) DeployPrepStmt

TODO : unit tests

func (*RelayStateImpl) Flush

func (rst *RelayStateImpl) Flush()

TODO : unit tests

func (*RelayStateImpl) HoldRouting

func (rst *RelayStateImpl) HoldRouting()

HoldRouting implements RelayStateMgr.

func (*RelayStateImpl) Parse

func (rst *RelayStateImpl) Parse(query string, doCaching bool) (parser.ParseState, string, error)

TODO : unit tests

func (*RelayStateImpl) PgprotoDebug

func (rst *RelayStateImpl) PgprotoDebug() bool

func (*RelayStateImpl) PrepareRelayStep

func (rst *RelayStateImpl) PrepareRelayStep(cmngr poolmgr.PoolMgr) error

TODO : unit tests

func (*RelayStateImpl) PrepareRelayStepOnAnyRoute

func (rst *RelayStateImpl) PrepareRelayStepOnAnyRoute(cmngr poolmgr.PoolMgr) (func() error, error)

TODO : unit tests

func (*RelayStateImpl) PrepareRelayStepOnHintRoute

func (rst *RelayStateImpl) PrepareRelayStepOnHintRoute(cmngr poolmgr.PoolMgr, route *routingstate.DataShardRoute) error

TODO : unit tests

func (*RelayStateImpl) PrepareStatement

TODO : unit tests

func (*RelayStateImpl) ProcCommand

func (rst *RelayStateImpl) ProcCommand(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

TODO : unit tests

func (*RelayStateImpl) ProcCopy

func (rst *RelayStateImpl) ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData, expRoute *routingstate.DataShardRoute) error

TODO : unit tests

func (*RelayStateImpl) ProcCopyComplete

func (rst *RelayStateImpl) ProcCopyComplete(query *pgproto3.FrontendMessage) error

TODO : unit tests

func (*RelayStateImpl) ProcQuery

func (rst *RelayStateImpl) ProcQuery(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, bool, error)

TODO : unit tests

func (*RelayStateImpl) ProcessExtendedBuffer

func (rst *RelayStateImpl) ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error

TODO : unit tests

func (*RelayStateImpl) ProcessMessage

func (rst *RelayStateImpl) ProcessMessage(
	msg pgproto3.FrontendMessage,
	waitForResp, replyCl bool,
	cmngr poolmgr.PoolMgr) error

TODO : unit tests

func (*RelayStateImpl) ProcessMessageBuf

func (rst *RelayStateImpl) ProcessMessageBuf(waitForResp, replyCl, completeRelay bool, cmngr poolmgr.PoolMgr) (bool, error)

TODO : unit tests

func (*RelayStateImpl) QueryRouter

func (rst *RelayStateImpl) QueryRouter() qrouter.QueryRouter

func (*RelayStateImpl) RelayCommand

func (rst *RelayStateImpl) RelayCommand(v pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

TODO : unit tests

func (*RelayStateImpl) RelayFlush

func (rst *RelayStateImpl) RelayFlush(waitForResp bool, replyCl bool) (txstatus.TXStatus, bool, error)

TODO : unit tests

func (*RelayStateImpl) RelayRunCommand

func (rst *RelayStateImpl) RelayRunCommand(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

func (*RelayStateImpl) RelayStep

func (rst *RelayStateImpl) RelayStep(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, error)

TODO : unit tests

func (*RelayStateImpl) RequestData

func (rst *RelayStateImpl) RequestData()

RequestData implements RelayStateMgr.

func (*RelayStateImpl) Reroute

func (rst *RelayStateImpl) Reroute() error

TODO : unit tests

func (*RelayStateImpl) RerouteToRandomRoute

func (rst *RelayStateImpl) RerouteToRandomRoute() error

TODO : unit tests

func (*RelayStateImpl) RerouteToTargetRoute

func (rst *RelayStateImpl) RerouteToTargetRoute(route *routingstate.DataShardRoute) error

TODO : unit tests

func (*RelayStateImpl) RerouteWorld

func (rst *RelayStateImpl) RerouteWorld() ([]*routingstate.DataShardRoute, error)

func (*RelayStateImpl) Reset

func (rst *RelayStateImpl) Reset() error

TODO : unit tests

func (*RelayStateImpl) RouterMode

func (rst *RelayStateImpl) RouterMode() config.RouterMode

func (*RelayStateImpl) SetTxStatus

func (rst *RelayStateImpl) SetTxStatus(status txstatus.TXStatus)

func (*RelayStateImpl) ShouldRetry

func (rst *RelayStateImpl) ShouldRetry(err error) bool

func (*RelayStateImpl) StartTrace

func (rst *RelayStateImpl) StartTrace()

TODO : unit tests

func (*RelayStateImpl) SyncCount

func (rst *RelayStateImpl) SyncCount() int64

func (*RelayStateImpl) TxActive

func (rst *RelayStateImpl) TxActive() bool

func (*RelayStateImpl) TxStatus

func (rst *RelayStateImpl) TxStatus() txstatus.TXStatus

func (*RelayStateImpl) UnRouteWithError

func (rst *RelayStateImpl) UnRouteWithError(shkey []kr.ShardKey, errmsg error) error

TODO : unit tests

func (*RelayStateImpl) UnholdRouting

func (rst *RelayStateImpl) UnholdRouting()

UnholdRouting implements RelayStateMgr.

func (*RelayStateImpl) Unroute

func (rst *RelayStateImpl) Unroute(shkey []kr.ShardKey) error

TODO : unit tests

func (*RelayStateImpl) UnrouteRoutes

func (rst *RelayStateImpl) UnrouteRoutes(routes []*routingstate.DataShardRoute) error

TODO : unit tests

type RelayStateMgr

type RelayStateMgr interface {
	poolmgr.ConnectionKeeper
	route.RouteMgr
	QueryRouter() qrouter.QueryRouter
	Reset() error
	StartTrace()
	Flush()

	ConnMgr() poolmgr.PoolMgr

	ShouldRetry(err error) bool
	Parse(query string, doCaching bool) (parser.ParseState, string, error)

	AddQuery(q pgproto3.FrontendMessage)
	AddSilentQuery(q pgproto3.FrontendMessage)
	TxActive() bool

	PgprotoDebug() bool

	RelayStep(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, error)

	CompleteRelay(replyCl bool) error
	Close() error
	Client() client.RouterClient

	ProcessMessage(msg pgproto3.FrontendMessage, waitForResp, replyCl bool, cmngr poolmgr.PoolMgr) error
	PrepareStatement(hash uint64, d *prepstatement.PreparedStatementDefinition) (*prepstatement.PreparedStatementDescriptor, pgproto3.BackendMessage, error)

	PrepareRelayStep(cmngr poolmgr.PoolMgr) error
	PrepareRelayStepOnAnyRoute(cmngr poolmgr.PoolMgr) (func() error, error)
	PrepareRelayStepOnHintRoute(cmngr poolmgr.PoolMgr, route *routingstate.DataShardRoute) error

	HoldRouting()
	UnholdRouting()

	ProcessMessageBuf(waitForResp, replyCl, completeRelay bool, cmngr poolmgr.PoolMgr) (bool, error)
	RelayRunCommand(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

	ProcQuery(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, bool, error)
	ProcCopy(stmt *lyx.Copy, data *pgproto3.CopyData, expRoute *routingstate.DataShardRoute) error

	ProcCommand(query pgproto3.FrontendMessage, waitForResp bool, replyCl bool) error

	ProcCopyComplete(query *pgproto3.FrontendMessage) error

	RouterMode() config.RouterMode

	AddExtendedProtocMessage(q pgproto3.FrontendMessage)
	ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error
}

func NewRelayState

func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr

type SimpleProtoStateHandler

type SimpleProtoStateHandler struct {
	// contains filtered or unexported fields
}

func (*SimpleProtoStateHandler) ExecCommit

func (s *SimpleProtoStateHandler) ExecCommit(rst RelayStateMgr, query string) error

query in commit query. maybe commit or commit `name`

func (*SimpleProtoStateHandler) ExecReset

func (s *SimpleProtoStateHandler) ExecReset(rst RelayStateMgr, query, setting string) error

func (*SimpleProtoStateHandler) ExecResetMetadata

func (s *SimpleProtoStateHandler) ExecResetMetadata(rst RelayStateMgr, query string, setting string) error

func (*SimpleProtoStateHandler) ExecRollback

func (s *SimpleProtoStateHandler) ExecRollback(rst RelayStateMgr, query string) error

TODO: proper support for rollback to savepoint

func (*SimpleProtoStateHandler) ExecSet

func (s *SimpleProtoStateHandler) ExecSet(rst RelayStateMgr, query string, name, value string) error

func (*SimpleProtoStateHandler) ExecSetLocal

func (s *SimpleProtoStateHandler) ExecSetLocal(rst RelayStateMgr, query, name, value string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL