Documentation ¶
Overview ¶
Package vtgate provides query routing rpc services for vttablets.
Package vtgate provides query routing rpc services for vttablets.
Index ¶
- Constants
- Variables
- func AggregateVtGateErrors(errors []error) error
- func RegisterGatewayCreator(name string, gc GatewayCreator)
- func StrsEquals(a, b []string) bool
- func WrapError(in error, keyspace, shard string, tabletType topodatapb.TabletType, ...) (wrapped error)
- type AddressList
- type Balancer
- type EndPointsCacheStatus
- type EndPointsCacheStatusList
- type Gateway
- type GatewayCreator
- type GatewayEndPointCacheStatus
- type GatewayEndPointCacheStatusList
- type GatewayEndPointStatusAggregator
- type GetEndPointsFunc
- type Planner
- type RegisterVTGate
- type ResilientSrvTopoServer
- func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCacheStatus
- func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, keyspace, shard string, ...) (result *topodatapb.EndPoints, version int64, err error)
- func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
- func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
- func (server *ResilientSrvTopoServer) GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error)
- type ResilientSrvTopoServerCacheStatus
- type Resolver
- func (res *Resolver) Commit(ctx context.Context, inSession *vtgatepb.Session) error
- func (res *Resolver) Execute(ctx context.Context, sql string, bindVars map[string]interface{}, ...) (*sqltypes.Result, error)
- func (res *Resolver) ExecuteBatch(ctx context.Context, tabletType topodatapb.TabletType, asTransaction bool, ...) ([]sqltypes.Result, error)
- func (res *Resolver) ExecuteBatchKeyspaceIds(ctx context.Context, queries []*vtgatepb.BoundKeyspaceIdQuery, ...) ([]sqltypes.Result, error)
- func (res *Resolver) ExecuteEntityIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (res *Resolver) ExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (res *Resolver) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (res *Resolver) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
- func (res *Resolver) InitializeConnections(ctx context.Context) error
- func (res *Resolver) Rollback(ctx context.Context, inSession *vtgatepb.Session) error
- func (res *Resolver) StreamExecute(ctx context.Context, sql string, bindVars map[string]interface{}, ...) error
- func (res *Resolver) StreamExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
- func (res *Resolver) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
- type Router
- type SafeSession
- type ScatterConn
- func (stc *ScatterConn) Close() error
- func (stc *ScatterConn) Commit(ctx context.Context, session *SafeSession) (err error)
- func (stc *ScatterConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, ...) (*sqltypes.Result, error)
- func (stc *ScatterConn) ExecuteBatch(ctx context.Context, batchRequest *scatterBatchRequest, ...) (qrs []sqltypes.Result, err error)
- func (stc *ScatterConn) ExecuteEntityIds(ctx context.Context, shards []string, sqls map[string]string, ...) (*sqltypes.Result, error)
- func (stc *ScatterConn) ExecuteMulti(ctx context.Context, query string, keyspace string, ...) (*sqltypes.Result, error)
- func (stc *ScatterConn) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
- func (stc *ScatterConn) InitializeConnections(ctx context.Context) error
- func (stc *ScatterConn) Rollback(ctx context.Context, session *SafeSession) (err error)
- func (stc *ScatterConn) SplitQueryCustomSharding(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) ([]*vtgatepb.SplitQueryResponse_Part, error)
- func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) ([]*vtgatepb.SplitQueryResponse_Part, error)
- func (stc *ScatterConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, ...) error
- func (stc *ScatterConn) StreamExecuteMulti(ctx context.Context, query string, keyspace string, ...) error
- type ScatterConnError
- type ShardConn
- func (sdc *ShardConn) Begin(ctx context.Context) (transactionID int64, err error)
- func (sdc *ShardConn) Close()
- func (sdc *ShardConn) Commit(ctx context.Context, transactionID int64) (err error)
- func (sdc *ShardConn) Dial(ctx context.Context) error
- func (sdc *ShardConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, ...) (qr *sqltypes.Result, err error)
- func (sdc *ShardConn) ExecuteBatch(ctx context.Context, queries []querytypes.BoundQuery, asTransaction bool, ...) (qrs []sqltypes.Result, err error)
- func (sdc *ShardConn) Rollback(ctx context.Context, transactionID int64) (err error)
- func (sdc *ShardConn) SplitQuery(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (queries []querytypes.QuerySplit, err error)
- func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, ...) (<-chan *sqltypes.Result, tabletconn.ErrFunc)
- func (sdc *ShardConn) WrapError(in error, endPoint *topodatapb.EndPoint, inTransaction bool) (wrapped error)
- type ShardConnError
- type SrvKeyspaceCacheStatus
- type SrvKeyspaceCacheStatusList
- type SrvKeyspaceNamesCacheStatus
- type SrvKeyspaceNamesCacheStatusList
- type SrvShardCacheStatus
- type SrvShardCacheStatusList
- type VTGate
- func (vtg *VTGate) Begin(ctx context.Context) (*vtgatepb.Session, error)
- func (vtg *VTGate) Commit(ctx context.Context, session *vtgatepb.Session) error
- func (vtg *VTGate) Execute(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, queries []*vtgatepb.BoundKeyspaceIdQuery, ...) ([]sqltypes.Result, error)
- func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, queries []*vtgatepb.BoundShardQuery, ...) ([]sqltypes.Result, error)
- func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (vtg *VTGate) ExecuteShards(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) (*sqltypes.Result, error)
- func (vtg *VTGate) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
- func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*topodatapb.SrvKeyspace, error)
- func (vtg *VTGate) GetSrvShard(ctx context.Context, keyspace, shard string) (*topodatapb.SrvShard, error)
- func (vtg *VTGate) HandlePanic(err *error)
- func (vtg *VTGate) InitializeConnections(ctx context.Context) (err error)
- func (vtg *VTGate) Rollback(ctx context.Context, session *vtgatepb.Session) error
- func (vtg *VTGate) SplitQuery(ctx context.Context, keyspace string, sql string, ...) ([]*vtgatepb.SplitQueryResponse_Part, error)
- func (vtg *VTGate) StreamExecute(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
- func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
- func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
- func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVariables map[string]interface{}, ...) error
Constants ¶
const ( PrioritySuccess = iota PriorityTransientError PriorityQueryNotServed PriorityDeadlineExceeded PriorityCancelled PriorityIntegrityError PriorityNotInTx PriorityUnknownError PriorityInternalError PriorityResourceExhausted PriorityUnauthenticated PriorityPermissionDenied PriorityBadInput )
A list of all vtrpcpb.ErrorCodes, ordered by priority. These priorities are used when aggregating multiple errors in VtGate. Higher priority error codes are more urgent for users to see. They are prioritized based on the following question: assuming a scatter query produced multiple errors, which of the errors is the most likely to give the user useful information about why the query failed and how they should proceed?
Variables ¶
var ( // GatewayImplementation controls the implementation of Gateway. GatewayImplementation = flag.String("gateway_implementation", "shardgateway", "The implementation of gateway") )
var RegisterVTGates []RegisterVTGate
RegisterVTGates stores register funcs for VTGate server.
Functions ¶
func AggregateVtGateErrors ¶
AggregateVtGateErrors aggregates several VtErrors.
func RegisterGatewayCreator ¶
func RegisterGatewayCreator(name string, gc GatewayCreator)
RegisterGatewayCreator registers a GatewayCreator with given name.
func StrsEquals ¶
StrsEquals compares contents of two string slices.
func WrapError ¶
func WrapError(in error, keyspace, shard string, tabletType topodatapb.TabletType, endPoint *topodatapb.EndPoint, inTransaction bool) (wrapped error)
WrapError returns ShardConnError which preserves the original error code if possible, adds the connection context and adds a bit to determine whether the keyspace/shard needs to be re-resolved for a potential sharding event.
Types ¶
type AddressList ¶
type AddressList []*addressStatus
AddressList is the slice of addressStatus.
func (AddressList) Len ¶
func (al AddressList) Len() int
func (AddressList) Less ¶
func (al AddressList) Less(i, j int) bool
func (AddressList) Swap ¶
func (al AddressList) Swap(i, j int)
type Balancer ¶
type Balancer struct {
// contains filtered or unexported fields
}
Balancer is a simple round-robin load balancer. It allows you to temporarily mark down nodes that are non-functional.
func NewBalancer ¶
func NewBalancer(getEndPoints GetEndPointsFunc, retryDelay time.Duration) *Balancer
NewBalancer creates a Balancer. getAddresses is the function it will use to refresh the list of addresses if one of the nodes has been marked down. The list of addresses is shuffled. retryDelay specifies the minimum time a node will be marked down before it will be cleared for a retry.
func (*Balancer) Get ¶
func (blc *Balancer) Get() (endPoints []*topodatapb.EndPoint, err error)
Get returns a single endpoint that was not recently marked down. If it finds an address that was down for longer than retryDelay, it refreshes the list of addresses and returns the next available node. If all addresses are marked down, it waits and retries. If a refresh fails, it returns an error.
type EndPointsCacheStatus ¶
type EndPointsCacheStatus struct { Cell string Keyspace string Shard string TabletType topodatapb.TabletType Value *topodatapb.EndPoints OriginalValue *topodatapb.EndPoints LastError error LastErrorCtx context.Context }
EndPointsCacheStatus is the current value for an EndPoints object
func (*EndPointsCacheStatus) StatusAsHTML ¶
func (st *EndPointsCacheStatus) StatusAsHTML() template.HTML
StatusAsHTML returns an HTML version of our status. It works best if there is data in the cache.
type EndPointsCacheStatusList ¶
type EndPointsCacheStatusList []*EndPointsCacheStatus
EndPointsCacheStatusList is used for sorting
func (EndPointsCacheStatusList) Len ¶
func (epcsl EndPointsCacheStatusList) Len() int
Len is part of sort.Interface
func (EndPointsCacheStatusList) Less ¶
func (epcsl EndPointsCacheStatusList) Less(i, j int) bool
Less is part of sort.Interface
func (EndPointsCacheStatusList) Swap ¶
func (epcsl EndPointsCacheStatusList) Swap(i, j int)
Swap is part of sort.Interface
type Gateway ¶
type Gateway interface { // InitializeConnections creates connections to VTTablets. InitializeConnections(ctx context.Context) error // Execute executes the non-streaming query for the specified keyspace, shard, and tablet type. Execute(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*sqltypes.Result, error) // ExecuteBatch executes a group of queries for the specified keyspace, shard, and tablet type. ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, queries []querytypes.BoundQuery, asTransaction bool, transactionID int64) ([]sqltypes.Result, error) // StreamExecute executes a streaming query for the specified keyspace, shard, and tablet type. StreamExecute(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *sqltypes.Result, tabletconn.ErrFunc) // Begin starts a transaction for the specified keyspace, shard, and tablet type. // It returns the transaction ID. Begin(ctx context.Context, keyspace string, shard string, tabletType topodatapb.TabletType) (int64, error) // Commit commits the current transaction for the specified keyspace, shard, and tablet type. Commit(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, transactionID int64) error // Rollback rolls back the current transaction for the specified keyspace, shard, and tablet type. Rollback(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, transactionID int64) error // SplitQuery splits a query into sub-queries for the specified keyspace, shard, and tablet type. SplitQuery(ctx context.Context, keyspace, shard string, tabletType topodatapb.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64) ([]querytypes.QuerySplit, error) // Close shuts down underlying connections. Close(ctx context.Context) error // CacheStatus returns a list of GatewayEndPointCacheStatus per endpoint. CacheStatus() GatewayEndPointCacheStatusList }
A Gateway is the query processing module for each shard, which is used by ScatterConn.
type GatewayCreator ¶
type GatewayCreator func(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, connTimings *stats.MultiTimings, tabletTypesToWait []topodatapb.TabletType) Gateway
GatewayCreator is the func which can create the actual gateway object.
func GetGatewayCreator ¶
func GetGatewayCreator() GatewayCreator
GetGatewayCreator returns the GatewayCreator specified by GatewayImplementation flag.
func GetGatewayCreatorByName ¶
func GetGatewayCreatorByName(name string) GatewayCreator
GetGatewayCreatorByName returns the GatewayCreator specified by the given name.
type GatewayEndPointCacheStatus ¶
type GatewayEndPointCacheStatus struct { Keyspace string Shard string TabletType topodatapb.TabletType Name string Addr string QueryCount uint64 QueryError uint64 QPS uint64 AvgLatency float64 // in milliseconds }
GatewayEndPointCacheStatus contains the status per endpoint for a gateway.
type GatewayEndPointCacheStatusList ¶
type GatewayEndPointCacheStatusList []*GatewayEndPointCacheStatus
GatewayEndPointCacheStatusList is a slice of GatewayEndPointCacheStatus.
func (GatewayEndPointCacheStatusList) Len ¶
func (gepcsl GatewayEndPointCacheStatusList) Len() int
Len is part of sort.Interface.
func (GatewayEndPointCacheStatusList) Less ¶
func (gepcsl GatewayEndPointCacheStatusList) Less(i, j int) bool
Less is part of sort.Interface.
func (GatewayEndPointCacheStatusList) Swap ¶
func (gepcsl GatewayEndPointCacheStatusList) Swap(i, j int)
Swap is part of sort.Interface.
type GatewayEndPointStatusAggregator ¶
type GatewayEndPointStatusAggregator struct { Keyspace string Shard string TabletType topodatapb.TabletType Name string // the alternative name of an endpoint Addr string // the host:port of an endpoint QueryCount uint64 QueryError uint64 // contains filtered or unexported fields }
GatewayEndPointStatusAggregator tracks endpoint status for a gateway.
func NewGatewayEndPointStatusAggregator ¶
func NewGatewayEndPointStatusAggregator() *GatewayEndPointStatusAggregator
NewGatewayEndPointStatusAggregator creates a GatewayEndPointStatusAggregator.
func (*GatewayEndPointStatusAggregator) GetCacheStatus ¶
func (gepsa *GatewayEndPointStatusAggregator) GetCacheStatus() *GatewayEndPointCacheStatus
GetCacheStatus returns a GatewayEndPointCacheStatus representing the current gateway status.
func (*GatewayEndPointStatusAggregator) UpdateQueryInfo ¶
func (gepsa *GatewayEndPointStatusAggregator) UpdateQueryInfo(addr string, tabletType topodatapb.TabletType, elapsed time.Duration, hasError bool)
UpdateQueryInfo updates the aggregator with the given information about a query.
type GetEndPointsFunc ¶
type GetEndPointsFunc func() (*topodatapb.EndPoints, error)
GetEndPointsFunc defines the callback to topo server.
type Planner ¶
type Planner struct {
// contains filtered or unexported fields
}
func NewPlanner ¶
func NewPlanner(schema *planbuilder.Schema, cacheSize int) *Planner
type RegisterVTGate ¶
type RegisterVTGate func(vtgateservice.VTGateService)
RegisterVTGate defines the type of registration mechanism.
type ResilientSrvTopoServer ¶
type ResilientSrvTopoServer struct {
// contains filtered or unexported fields
}
ResilientSrvTopoServer is an implementation of SrvTopoServer based on a topo.Server that uses a cache for two purposes: - limit the QPS to the underlying topo.Server - return the last known value of the data if there is an error
func NewResilientSrvTopoServer ¶
func NewResilientSrvTopoServer(base topo.Server, counterPrefix string) *ResilientSrvTopoServer
NewResilientSrvTopoServer creates a new ResilientSrvTopoServer based on the provided SrvTopoServer.
func (*ResilientSrvTopoServer) CacheStatus ¶
func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCacheStatus
CacheStatus returns a displayable version of the cache
func (*ResilientSrvTopoServer) GetEndPoints ¶
func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (result *topodatapb.EndPoints, version int64, err error)
GetEndPoints return all endpoints for the given cell, keyspace, shard, and tablet type.
func (*ResilientSrvTopoServer) GetSrvKeyspace ¶
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace.
func (*ResilientSrvTopoServer) GetSrvKeyspaceNames ¶
func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
GetSrvKeyspaceNames returns all keyspace names for the given cell.
func (*ResilientSrvTopoServer) GetSrvShard ¶
func (server *ResilientSrvTopoServer) GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error)
GetSrvShard returns SrvShard object for the given cell, keyspace, and shard.
type ResilientSrvTopoServerCacheStatus ¶
type ResilientSrvTopoServerCacheStatus struct { SrvKeyspaceNames SrvKeyspaceNamesCacheStatusList SrvKeyspaces SrvKeyspaceCacheStatusList SrvShards SrvShardCacheStatusList EndPoints EndPointsCacheStatusList }
ResilientSrvTopoServerCacheStatus has the full status of the cache
type Resolver ¶
type Resolver struct {
// contains filtered or unexported fields
}
Resolver is the layer to resolve KeyspaceIds and KeyRanges to shards. It will try to re-resolve shards if ScatterConn returns retryable error, which may imply horizontal or vertical resharding happened.
func NewResolver ¶
func NewResolver(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, statsName, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, testGateway string) *Resolver
NewResolver creates a new Resolver. All input parameters are passed through for creating ScatterConn.
func (*Resolver) Execute ¶
func (res *Resolver) Execute( ctx context.Context, sql string, bindVars map[string]interface{}, keyspace string, tabletType topodatapb.TabletType, session *vtgatepb.Session, mapToShards func(string) (string, []string, error), notInTransaction bool, ) (*sqltypes.Result, error)
Execute executes a non-streaming query based on shards resolved by given func. It retries query if new keyspace/shards are re-resolved after a retryable error.
func (*Resolver) ExecuteBatch ¶
func (res *Resolver) ExecuteBatch( ctx context.Context, tabletType topodatapb.TabletType, asTransaction bool, session *vtgatepb.Session, buildBatchRequest func() (*scatterBatchRequest, error), ) ([]sqltypes.Result, error)
ExecuteBatch executes a group of queries based on shards resolved by given func. It retries query if new keyspace/shards are re-resolved after a retryable error.
func (*Resolver) ExecuteBatchKeyspaceIds ¶
func (res *Resolver) ExecuteBatchKeyspaceIds(ctx context.Context, queries []*vtgatepb.BoundKeyspaceIdQuery, tabletType topodatapb.TabletType, asTransaction bool, session *vtgatepb.Session) ([]sqltypes.Result, error)
ExecuteBatchKeyspaceIds executes a group of queries based on KeyspaceIds. It retries query if new keyspace/shards are re-resolved after a retryable error.
func (*Resolver) ExecuteEntityIds ¶
func (res *Resolver) ExecuteEntityIds( ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, entityColumnName string, entityKeyspaceIDs []*vtgatepb.ExecuteEntityIdsRequest_EntityId, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool, ) (*sqltypes.Result, error)
ExecuteEntityIds executes a non-streaming query based on given KeyspaceId map. It retries query if new keyspace/shards are re-resolved after a retryable error.
func (*Resolver) ExecuteKeyRanges ¶
func (res *Resolver) ExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyRanges []*topodatapb.KeyRange, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteKeyRanges executes a non-streaming query based on KeyRanges. It retries query if new keyspace/shards are re-resolved after a retryable error.
func (*Resolver) ExecuteKeyspaceIds ¶
func (res *Resolver) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyspaceIds [][]byte, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteKeyspaceIds executes a non-streaming query based on KeyspaceIds. It retries query if new keyspace/shards are re-resolved after a retryable error. This throws an error if a dml spans multiple keyspace_ids. Resharding depends on being able to uniquely route a write.
func (*Resolver) GetGatewayCacheStatus ¶
func (res *Resolver) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
GetGatewayCacheStatus returns a displayable version of the Gateway cache.
func (*Resolver) InitializeConnections ¶
InitializeConnections pre-initializes VTGate by connecting to vttablets of all keyspace/shard/type. It is not necessary to call this function before serving queries, but it would reduce connection overhead when serving.
func (*Resolver) StreamExecute ¶
func (res *Resolver) StreamExecute( ctx context.Context, sql string, bindVars map[string]interface{}, keyspace string, tabletType topodatapb.TabletType, mapToShards func(string) (string, []string, error), sendReply func(*sqltypes.Result) error, ) error
StreamExecute executes a streaming query on shards resolved by given func. This function currently temporarily enforces the restriction of executing on one shard since it cannot merge-sort the results to guarantee ordering of response which is needed for checkpointing.
func (*Resolver) StreamExecuteKeyRanges ¶
func (res *Resolver) StreamExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyRanges []*topodatapb.KeyRange, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecuteKeyRanges executes a streaming query on the specified KeyRanges. The KeyRanges are resolved to shards using the serving graph. This function currently temporarily enforces the restriction of executing on one shard since it cannot merge-sort the results to guarantee ordering of response which is needed for checkpointing. The api supports supplying multiple keyranges to make it future proof.
func (*Resolver) StreamExecuteKeyspaceIds ¶
func (res *Resolver) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyspaceIds [][]byte, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecuteKeyspaceIds executes a streaming query on the specified KeyspaceIds. The KeyspaceIds are resolved to shards using the serving graph. This function currently temporarily enforces the restriction of executing on one shard since it cannot merge-sort the results to guarantee ordering of response which is needed for checkpointing. The api supports supplying multiple KeyspaceIds to make it future proof.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router is the layer to route queries to the correct shards based on the values in the query.
func NewRouter ¶
func NewRouter(serv topo.SrvTopoServer, cell string, schema *planbuilder.Schema, statsName string, scatterConn *ScatterConn) *Router
NewRouter creates a new Router.
func (*Router) Execute ¶
func (rtr *Router) Execute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
Execute routes a non-streaming query.
func (*Router) StreamExecute ¶
func (rtr *Router) StreamExecute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecute executes a streaming query.
type SafeSession ¶
SafeSession is a mutex-protected version of the Session. It is thread-safe if each thread only accesses one shard. (the use pattern is 'Find', if not found, then 'Append', for a single shard)
func NewSafeSession ¶
func NewSafeSession(sessn *vtgatepb.Session) *SafeSession
NewSafeSession returns a new SafeSession based on the Session
func (*SafeSession) Append ¶
func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession)
Append adds a new ShardSession
func (*SafeSession) Find ¶
func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.TabletType) int64
Find returns the transactionId, if any, for a session
func (*SafeSession) InTransaction ¶
func (session *SafeSession) InTransaction() bool
InTransaction returns true if we are in a transaction
type ScatterConn ¶
type ScatterConn struct {
// contains filtered or unexported fields
}
ScatterConn is used for executing queries across multiple shard level connections.
func NewScatterConn ¶
func NewScatterConn(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, statsName, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, testGateway string) *ScatterConn
NewScatterConn creates a new ScatterConn. All input parameters are passed through for creating the appropriate connections.
func (*ScatterConn) Close ¶
func (stc *ScatterConn) Close() error
Close closes the underlying Gateway.
func (*ScatterConn) Commit ¶
func (stc *ScatterConn) Commit(ctx context.Context, session *SafeSession) (err error)
Commit commits the current transaction. There are no retries on this operation.
func (*ScatterConn) Execute ¶
func (stc *ScatterConn) Execute( ctx context.Context, query string, bindVars map[string]interface{}, keyspace string, shards []string, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, ) (*sqltypes.Result, error)
Execute executes a non-streaming query on the specified shards.
func (*ScatterConn) ExecuteBatch ¶
func (stc *ScatterConn) ExecuteBatch( ctx context.Context, batchRequest *scatterBatchRequest, tabletType topodatapb.TabletType, asTransaction bool, session *SafeSession) (qrs []sqltypes.Result, err error)
ExecuteBatch executes a batch of non-streaming queries on the specified shards.
func (*ScatterConn) ExecuteEntityIds ¶
func (stc *ScatterConn) ExecuteEntityIds( ctx context.Context, shards []string, sqls map[string]string, bindVars map[string]map[string]interface{}, keyspace string, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, ) (*sqltypes.Result, error)
ExecuteEntityIds executes queries that are shard specific.
func (*ScatterConn) ExecuteMulti ¶
func (stc *ScatterConn) ExecuteMulti( ctx context.Context, query string, keyspace string, shardVars map[string]map[string]interface{}, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, ) (*sqltypes.Result, error)
ExecuteMulti is like Execute, but each shard gets its own bindVars. If len(shards) is not equal to len(bindVars), the function panics.
func (*ScatterConn) GetGatewayCacheStatus ¶
func (stc *ScatterConn) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
GetGatewayCacheStatus returns a displayable version of the Gateway cache.
func (*ScatterConn) InitializeConnections ¶
func (stc *ScatterConn) InitializeConnections(ctx context.Context) error
InitializeConnections pre-initializes connections for all shards. It also populates topology cache by accessing it. It is not necessary to call this function before serving queries, but it would reduce connection overhead when serving.
func (*ScatterConn) Rollback ¶
func (stc *ScatterConn) Rollback(ctx context.Context, session *SafeSession) (err error)
Rollback rolls back the current transaction. There are no retries on this operation.
func (*ScatterConn) SplitQueryCustomSharding ¶
func (stc *ScatterConn) SplitQueryCustomSharding(ctx context.Context, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, shards []string, keyspace string) ([]*vtgatepb.SplitQueryResponse_Part, error)
SplitQueryCustomSharding scatters a SplitQuery request to all shards. For a set of splits received from a shard, it construct a KeyRange queries by appending that shard's name to the splits. Aggregates all splits across all shards in no specific order and returns.
func (*ScatterConn) SplitQueryKeyRange ¶
func (stc *ScatterConn) SplitQueryKeyRange(ctx context.Context, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64, keyRangeByShard map[string]*topodatapb.KeyRange, keyspace string) ([]*vtgatepb.SplitQueryResponse_Part, error)
SplitQueryKeyRange scatters a SplitQuery request to all shards. For a set of splits received from a shard, it construct a KeyRange queries by appending that shard's keyrange to the splits. Aggregates all splits across all shards in no specific order and returns.
func (*ScatterConn) StreamExecute ¶
func (stc *ScatterConn) StreamExecute( ctx context.Context, query string, bindVars map[string]interface{}, keyspace string, shards []string, tabletType topodatapb.TabletType, sendReply func(reply *sqltypes.Result) error, ) error
StreamExecute executes a streaming query on vttablet. The retry rules are the same. The implementation of this function is similar to multiGo. A change there is likely to require a change in this function also.
func (*ScatterConn) StreamExecuteMulti ¶
func (stc *ScatterConn) StreamExecuteMulti( ctx context.Context, query string, keyspace string, shardVars map[string]map[string]interface{}, tabletType topodatapb.TabletType, sendReply func(reply *sqltypes.Result) error, ) error
StreamExecuteMulti is like StreamExecute, but each shard gets its own bindVars. If len(shards) is not equal to len(bindVars), the function panics.
type ScatterConnError ¶
type ScatterConnError struct { Code int // Preserve the original errors, so that we don't need to parse the error string. Errs []error // contains filtered or unexported fields }
ScatterConnError is the ScatterConn specific error.
func (*ScatterConnError) Error ¶
func (e *ScatterConnError) Error() string
func (*ScatterConnError) VtErrorCode ¶
func (e *ScatterConnError) VtErrorCode() vtrpcpb.ErrorCode
VtErrorCode returns the underlying Vitess error code
type ShardConn ¶
type ShardConn struct {
// contains filtered or unexported fields
}
ShardConn represents a load balanced connection to a group of vttablets that belong to the same shard. ShardConn can be concurrently used across goroutines. Such requests are interleaved on the same underlying connection.
func NewShardConn ¶
func NewShardConn(ctx context.Context, serv topo.SrvTopoServer, cell, keyspace, shard string, tabletType topodatapb.TabletType, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletConnectTimings *stats.MultiTimings) *ShardConn
NewShardConn creates a new ShardConn. It creates a Balancer using serv, cell, keyspace, tabletType and retryDelay. retryCount is the max number of retries before a ShardConn returns an error on an operation.
func (*ShardConn) Commit ¶
Commit commits the current transaction. The retry rules are the same as Execute.
func (*ShardConn) Dial ¶
Dial creates tablet connection and connects to the vttablet. It is not necessary to call this function before serving queries, but it would reduce connection overhead when serving the first query.
func (*ShardConn) Execute ¶
func (sdc *ShardConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (qr *sqltypes.Result, err error)
Execute executes a non-streaming query on vttablet. If there are connection errors, it retries retryCount times before failing. It does not retry if the connection is in the middle of a transaction.
func (*ShardConn) ExecuteBatch ¶
func (sdc *ShardConn) ExecuteBatch(ctx context.Context, queries []querytypes.BoundQuery, asTransaction bool, transactionID int64) (qrs []sqltypes.Result, err error)
ExecuteBatch executes a group of queries. The retry rules are the same as Execute.
func (*ShardConn) Rollback ¶
Rollback rolls back the current transaction. The retry rules are the same as Execute.
func (*ShardConn) SplitQuery ¶
func (sdc *ShardConn) SplitQuery(ctx context.Context, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64) (queries []querytypes.QuerySplit, err error)
SplitQuery splits a query into sub queries. The retry rules are the same as Execute.
func (*ShardConn) StreamExecute ¶
func (sdc *ShardConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *sqltypes.Result, tabletconn.ErrFunc)
StreamExecute executes a streaming query on vttablet. The retry rules are the same as Execute.
func (*ShardConn) WrapError ¶
func (sdc *ShardConn) WrapError(in error, endPoint *topodatapb.EndPoint, inTransaction bool) (wrapped error)
WrapError returns ShardConnError which preserves the original error code if possible, adds the connection context and adds a bit to determine whether the keyspace/shard needs to be re-resolved for a potential sharding event.
type ShardConnError ¶
type ShardConnError struct { Code int ShardIdentifier string InTransaction bool // Preserve the original error, so that we don't need to parse the error string. Err error // EndPointCode is the error code to use for all the endpoint errors in aggregate EndPointCode vtrpcpb.ErrorCode }
ShardConnError is the shard conn specific error.
func (*ShardConnError) Error ¶
func (e *ShardConnError) Error() string
func (*ShardConnError) VtErrorCode ¶
func (e *ShardConnError) VtErrorCode() vtrpcpb.ErrorCode
VtErrorCode returns the underlying Vitess error code
type SrvKeyspaceCacheStatus ¶
type SrvKeyspaceCacheStatus struct { Cell string Keyspace string Value *topodatapb.SrvKeyspace LastError error LastErrorCtx context.Context }
SrvKeyspaceCacheStatus is the current value for a SrvKeyspace object
func (*SrvKeyspaceCacheStatus) StatusAsHTML ¶
func (st *SrvKeyspaceCacheStatus) StatusAsHTML() template.HTML
StatusAsHTML returns an HTML version of our status. It works best if there is data in the cache.
type SrvKeyspaceCacheStatusList ¶
type SrvKeyspaceCacheStatusList []*SrvKeyspaceCacheStatus
SrvKeyspaceCacheStatusList is used for sorting
func (SrvKeyspaceCacheStatusList) Len ¶
func (skcsl SrvKeyspaceCacheStatusList) Len() int
Len is part of sort.Interface
func (SrvKeyspaceCacheStatusList) Less ¶
func (skcsl SrvKeyspaceCacheStatusList) Less(i, j int) bool
Less is part of sort.Interface
func (SrvKeyspaceCacheStatusList) Swap ¶
func (skcsl SrvKeyspaceCacheStatusList) Swap(i, j int)
Swap is part of sort.Interface
type SrvKeyspaceNamesCacheStatus ¶
type SrvKeyspaceNamesCacheStatus struct { Cell string Value []string LastError error LastErrorCtx context.Context }
SrvKeyspaceNamesCacheStatus is the current value for SrvKeyspaceNames
type SrvKeyspaceNamesCacheStatusList ¶
type SrvKeyspaceNamesCacheStatusList []*SrvKeyspaceNamesCacheStatus
SrvKeyspaceNamesCacheStatusList is used for sorting
func (SrvKeyspaceNamesCacheStatusList) Len ¶
func (skncsl SrvKeyspaceNamesCacheStatusList) Len() int
Len is part of sort.Interface
func (SrvKeyspaceNamesCacheStatusList) Less ¶
func (skncsl SrvKeyspaceNamesCacheStatusList) Less(i, j int) bool
Less is part of sort.Interface
func (SrvKeyspaceNamesCacheStatusList) Swap ¶
func (skncsl SrvKeyspaceNamesCacheStatusList) Swap(i, j int)
Swap is part of sort.Interface
type SrvShardCacheStatus ¶
type SrvShardCacheStatus struct { Cell string Keyspace string Shard string Value *topodatapb.SrvShard LastError error LastErrorCtx context.Context }
SrvShardCacheStatus is the current value for a SrvShard object
func (*SrvShardCacheStatus) StatusAsHTML ¶
func (st *SrvShardCacheStatus) StatusAsHTML() template.HTML
StatusAsHTML returns an HTML version of our status. It works best if there is data in the cache.
type SrvShardCacheStatusList ¶
type SrvShardCacheStatusList []*SrvShardCacheStatus
SrvShardCacheStatusList is used for sorting
func (SrvShardCacheStatusList) Len ¶
func (sscsl SrvShardCacheStatusList) Len() int
Len is part of sort.Interface
func (SrvShardCacheStatusList) Less ¶
func (sscsl SrvShardCacheStatusList) Less(i, j int) bool
Less is part of sort.Interface
func (SrvShardCacheStatusList) Swap ¶
func (sscsl SrvShardCacheStatusList) Swap(i, j int)
Swap is part of sort.Interface
type VTGate ¶
type VTGate struct {
// contains filtered or unexported fields
}
VTGate is the rpc interface to vtgate. Only one instance can be created. It implements vtgateservice.VTGateService
func Init ¶
func Init(hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, schema *planbuilder.Schema, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, tabletTypesToWait []topodatapb.TabletType, maxInFlight int, testGateway string) *VTGate
Init initializes VTGate server.
func (*VTGate) Execute ¶
func (vtg *VTGate) Execute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
Execute executes a non-streaming query by routing based on the values in the query.
func (*VTGate) ExecuteBatchKeyspaceIds ¶
func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, queries []*vtgatepb.BoundKeyspaceIdQuery, tabletType topodatapb.TabletType, asTransaction bool, session *vtgatepb.Session) ([]sqltypes.Result, error)
ExecuteBatchKeyspaceIds executes a group of queries based on the specified keyspace ids.
func (*VTGate) ExecuteBatchShards ¶
func (vtg *VTGate) ExecuteBatchShards(ctx context.Context, queries []*vtgatepb.BoundShardQuery, tabletType topodatapb.TabletType, asTransaction bool, session *vtgatepb.Session) ([]sqltypes.Result, error)
ExecuteBatchShards executes a group of queries on the specified shards.
func (*VTGate) ExecuteEntityIds ¶
func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, entityColumnName string, entityKeyspaceIDs []*vtgatepb.ExecuteEntityIdsRequest_EntityId, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteEntityIds excutes a non-streaming query based on given KeyspaceId map.
func (*VTGate) ExecuteKeyRanges ¶
func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyRanges []*topodatapb.KeyRange, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteKeyRanges executes a non-streaming query based on the specified keyranges.
func (*VTGate) ExecuteKeyspaceIds ¶
func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyspaceIds [][]byte, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteKeyspaceIds executes a non-streaming query based on the specified keyspace ids.
func (*VTGate) ExecuteShards ¶
func (vtg *VTGate) ExecuteShards(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, shards []string, tabletType topodatapb.TabletType, session *vtgatepb.Session, notInTransaction bool) (*sqltypes.Result, error)
ExecuteShards executes a non-streaming query on the specified shards.
func (*VTGate) GetGatewayCacheStatus ¶
func (vtg *VTGate) GetGatewayCacheStatus() GatewayEndPointCacheStatusList
GetGatewayCacheStatus returns a displayable version of the Gateway cache.
func (*VTGate) GetSrvKeyspace ¶
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*topodatapb.SrvKeyspace, error)
GetSrvKeyspace is part of the vtgate service API.
func (*VTGate) GetSrvShard ¶
func (vtg *VTGate) GetSrvShard(ctx context.Context, keyspace, shard string) (*topodatapb.SrvShard, error)
GetSrvShard is part of the vtgate service API.
func (*VTGate) HandlePanic ¶
HandlePanic recovers from panics, and logs / increment counters
func (*VTGate) InitializeConnections ¶
InitializeConnections pre-initializes VTGate by connecting to vttablets of all keyspace/shard/type. It is not necessary to call this function before serving queries, but it would reduce connection overhead when serving.
func (*VTGate) SplitQuery ¶
func (vtg *VTGate) SplitQuery(ctx context.Context, keyspace string, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int64) ([]*vtgatepb.SplitQueryResponse_Part, error)
SplitQuery splits a query into sub queries by appending keyranges and primary key range clauses. Rows corresponding to the sub queries are guaranteed to be non-overlapping and will add up to the rows of original query. Number of sub queries will be a multiple of N that is greater than or equal to SplitQueryRequest.SplitCount, where N is the number of shards.
func (*VTGate) StreamExecute ¶
func (vtg *VTGate) StreamExecute(ctx context.Context, sql string, bindVariables map[string]interface{}, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecute executes a streaming query by routing based on the values in the query.
func (*VTGate) StreamExecuteKeyRanges ¶
func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyRanges []*topodatapb.KeyRange, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecuteKeyRanges executes a streaming query on the specified KeyRanges. The KeyRanges are resolved to shards using the serving graph. This function currently temporarily enforces the restriction of executing on one shard since it cannot merge-sort the results to guarantee ordering of response which is needed for checkpointing. The api supports supplying multiple keyranges to make it future proof.
func (*VTGate) StreamExecuteKeyspaceIds ¶
func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, keyspaceIds [][]byte, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecuteKeyspaceIds executes a streaming query on the specified KeyspaceIds. The KeyspaceIds are resolved to shards using the serving graph. This function currently temporarily enforces the restriction of executing on one shard since it cannot merge-sort the results to guarantee ordering of response which is needed for checkpointing. The api supports supplying multiple KeyspaceIds to make it future proof.
func (*VTGate) StreamExecuteShards ¶
func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVariables map[string]interface{}, keyspace string, shards []string, tabletType topodatapb.TabletType, sendReply func(*sqltypes.Result) error) error
StreamExecuteShards executes a streaming query on the specified shards.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package fakerpcvtgateconn provides a fake implementation of vtgateconn.Impl that doesn't do any RPC, but uses a local map to return results.
|
Package fakerpcvtgateconn provides a fake implementation of vtgateconn.Impl that doesn't do any RPC, but uses a local map to return results. |
Package gorpcvtgatecommon contains common data structures for go rpc vtgate client and server.
|
Package gorpcvtgatecommon contains common data structures for go rpc vtgate client and server. |
Package gorpcvtgateconn provides go rpc connectivity for VTGate.
|
Package gorpcvtgateconn provides go rpc connectivity for VTGate. |
Package gorpcvtgateservice provides to go rpc glue for vtgate
|
Package gorpcvtgateservice provides to go rpc glue for vtgate |
Package grpcvtgateconn provides gRPC connectivity for VTGate.
|
Package grpcvtgateconn provides gRPC connectivity for VTGate. |
Package grpcvtgateservice provides the gRPC glue for vtgate
|
Package grpcvtgateservice provides the gRPC glue for vtgate |
Package vtgateconntest provides the test methods to make sure a vtgateconn/vtgateservice pair over RPC works correctly.
|
Package vtgateconntest provides the test methods to make sure a vtgateconn/vtgateservice pair over RPC works correctly. |
Package vtgateservice provides to interface definition for the vtgate service
|
Package vtgateservice provides to interface definition for the vtgate service |