Documentation ¶
Overview ¶
Package tabletconntest provides the test methods to make sure a tabletconn/queryservice pair over RPC works correctly.
Index ¶
- Constants
- Variables
- func TargetsEqual(t1, t2 []*querypb.Target) bool
- func TestSuite(t *testing.T, protocol string, tablet *topodatapb.Tablet, ...)
- type FakeQueryService
- func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error)
- func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, ...) (*sqltypes.Result, int64, error)
- func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, ...) ([]sqltypes.Result, int64, error)
- func (f *FakeQueryService) Close(ctx context.Context) error
- func (f *FakeQueryService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) error
- func (f *FakeQueryService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error)
- func (f *FakeQueryService) ConcludeTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error)
- func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, ...) (err error)
- func (f *FakeQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, ...) (*sqltypes.Result, error)
- func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, ...) ([]sqltypes.Result, error)
- func (f *FakeQueryService) HandlePanic(err *error)
- func (f *FakeQueryService) MessageAck(ctx context.Context, target *querypb.Target, name string, ids []*querypb.Value) (count int64, err error)
- func (f *FakeQueryService) MessageStream(ctx context.Context, target *querypb.Target, name string, ...) (err error)
- func (f *FakeQueryService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
- func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)
- func (f *FakeQueryService) Rollback(ctx context.Context, target *querypb.Target, transactionID int64) error
- func (f *FakeQueryService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error)
- func (f *FakeQueryService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error)
- func (f *FakeQueryService) SplitQuery(ctx context.Context, target *querypb.Target, query *querypb.BoundQuery, ...) ([]*querypb.QuerySplit, error)
- func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
- func (f *FakeQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, ...) error
- func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error
- func (f *FakeQueryService) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, ...) error
- func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, ...) error
- func (f *FakeQueryService) VStreamRows(ctx context.Context, target *querypb.Target, query string, ...) error
Constants ¶
const ( // UpdateStreamPosition is a test update stream position. UpdateStreamPosition = "update stream position" // UpdateStreamTimestamp is a test update stream timestamp. UpdateStreamTimestamp = 123654 )
const BeginTransactionID int64 = 9990
BeginTransactionID is a test transaction id for Begin.
const CommitTransactionID int64 = 999044
CommitTransactionID is a test transaction id for Commit.
const Dtid string = "aa"
Dtid is a test distributed transaction ID (dtid).
const ExecuteBatchTransactionID int64 = 678
ExecuteBatchTransactionID is a test transaction id for batch.
const ExecuteQuery = "executeQuery"
ExecuteQuery is a fake test query.
const ExecuteTransactionID int64 = 678
ExecuteTransactionID is a test transaction id.
const RollbackTransactionID int64 = 999044
RollbackTransactionID is a test transaction id for Rollback.
const SplitQueryAlgorithm = querypb.SplitQueryRequest_FULL_SCAN
SplitQueryAlgorithm is a test algorithm for splits.
const SplitQueryNumRowsPerQueryPart = 123
SplitQueryNumRowsPerQueryPart is a test num rows for split.
const SplitQuerySplitCount = 372
SplitQuerySplitCount is a test split count.
const StreamExecuteQuery = "streamExecuteQuery"
StreamExecuteQuery is a fake test query for streaming.
const TestAsTransaction bool = true
TestAsTransaction is a test 'asTransaction' flag.
Variables ¶
var ( // MessageName is a test message name. MessageName = "vitess_message" // MessageStreamResult is a test stream result. MessageStreamResult = &sqltypes.Result{ Fields: []*querypb.Field{{ Name: "id", Type: sqltypes.VarBinary, }, { Name: "message", Type: sqltypes.VarBinary, }}, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("row1 value2"), }, { sqltypes.NewVarBinary("2"), sqltypes.NewVarBinary("row2 value2"), }}, } // MessageIDs is a test list of message ids. MessageIDs = []*querypb.Value{{ Type: sqltypes.VarChar, Value: []byte("1"), }} )
var ExecuteBatchQueries = []*querypb.BoundQuery{ { Sql: "executeBatchQueries1", BindVariables: map[string]*querypb.BindVariable{ "bind1": sqltypes.Int64BindVariable(43), }, }, { Sql: "executeBatchQueries2", BindVariables: map[string]*querypb.BindVariable{ "bind2": sqltypes.Int64BindVariable(72), }, }, }
ExecuteBatchQueries are test queries for batch.
var ExecuteBatchQueryResultList = []sqltypes.Result{ { Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, }, RowsAffected: 1232, InsertID: 712, Rows: [][]sqltypes.Value{ { sqltypes.TestValue(sqltypes.Int8, "1"), }, { sqltypes.TestValue(sqltypes.Int8, "2"), }, }, Extras: &querypb.ResultExtras{ EventToken: &querypb.EventToken{ Timestamp: 456322, Shard: "test_shard2", Position: "test_position2", }, Fresher: true, }, }, { Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.VarBinary, }, }, RowsAffected: 12333, InsertID: 74442, Rows: [][]sqltypes.Value{ { sqltypes.NewVarBinary("row1 value1"), }, { sqltypes.NewVarBinary("row1 value2"), }, }, }, }
ExecuteBatchQueryResultList is a list of test query results.
var ExecuteBindVars = map[string]*querypb.BindVariable{ "bind1": sqltypes.Int64BindVariable(1114444), }
ExecuteBindVars is a test bind var.
var ExecuteQueryResult = sqltypes.Result{ Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, { Name: "field2", Type: sqltypes.Char, }, }, RowsAffected: 123, InsertID: 72, Rows: [][]sqltypes.Value{ { sqltypes.TestValue(sqltypes.Int8, "1"), sqltypes.NULL, }, { sqltypes.TestValue(sqltypes.Int8, "2"), sqltypes.TestValue(sqltypes.Char, "row2 value2"), }, }, Extras: &querypb.ResultExtras{ EventToken: &querypb.EventToken{ Timestamp: 456321, Shard: "test_shard", Position: "test_position", }, Fresher: true, }, }
ExecuteQueryResult is a test query result.
var Metadata = &querypb.TransactionMetadata{ Dtid: "aa", State: querypb.TransactionState_PREPARE, TimeCreated: 1, Participants: Participants, }
Metadata is a test metadata for 2pc transactions.
var Participants = []*querypb.Target{{
Keyspace: "ks0",
Shard: "0",
}, {
Keyspace: "ks1",
Shard: "1",
}}
Participants is a test list of 2pc participants.
var SplitQueryBoundQuery = &querypb.BoundQuery{ Sql: "splitQuery", BindVariables: map[string]*querypb.BindVariable{ "bind1": sqltypes.Int64BindVariable(43), }, }
SplitQueryBoundQuery is a test query for splits.
var SplitQueryQuerySplitList = []*querypb.QuerySplit{ { Query: &querypb.BoundQuery{ Sql: "splitQuery", BindVariables: map[string]*querypb.BindVariable{ "bind1": sqltypes.Int64BindVariable(43), "keyspace_id": sqltypes.Int64BindVariable(3333), }, }, RowCount: 4456, }, }
SplitQueryQuerySplitList is a test result for splits.
var SplitQuerySplitColumns = []string{"nice_column_to_split"}
SplitQuerySplitColumns is a test list for column splits.
var StreamExecuteBindVars = map[string]*querypb.BindVariable{ "bind1": sqltypes.Int64BindVariable(93848000), }
StreamExecuteBindVars is a test bind var for streaming.
var StreamExecuteQueryResult1 = sqltypes.Result{ Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, { Name: "field2", Type: sqltypes.Char, }, }, }
StreamExecuteQueryResult1 is the first packet of a streaming result.
var StreamExecuteQueryResult2 = sqltypes.Result{ Rows: [][]sqltypes.Value{ { sqltypes.TestValue(sqltypes.Int8, "1"), sqltypes.TestValue(sqltypes.Char, "row1 value2"), }, { sqltypes.TestValue(sqltypes.Int8, "2"), sqltypes.TestValue(sqltypes.Char, "row2 value2"), }, }, }
StreamExecuteQueryResult2 is the second packet of a streaming result.
var TestCallerID = &vtrpcpb.CallerID{
Principal: "test_principal",
Component: "test_component",
Subcomponent: "test_subcomponent",
}
TestCallerID is a test caller id.
var TestExecuteOptions = &querypb.ExecuteOptions{ IncludedFields: querypb.ExecuteOptions_TYPE_ONLY, IncludeEventToken: true, CompareEventToken: &querypb.EventToken{ Timestamp: 9876, Shard: "ssss", Position: "pppp", }, ClientFoundRows: true, }
TestExecuteOptions is a test execute options.
var TestStreamHealthErrorMsg = "to trigger a server error"
TestStreamHealthErrorMsg is a test error message for health streaming.
var TestStreamHealthStreamHealthResponse = &querypb.StreamHealthResponse{ Target: &querypb.Target{ Keyspace: "test_keyspace", Shard: "test_shard", TabletType: topodatapb.TabletType_RDONLY, }, Serving: true, TabletExternallyReparentedTimestamp: 1234589, RealtimeStats: &querypb.RealtimeStats{ HealthError: "random error", SecondsBehindMaster: 234, BinlogPlayersCount: 1, SecondsBehindMasterFilteredReplication: 2, CpuUsage: 1.0, }, }
TestStreamHealthStreamHealthResponse is a test stream health response.
var TestTarget = &querypb.Target{ Keyspace: "test_keyspace", Shard: "test_shard", TabletType: topodatapb.TabletType_REPLICA, }
TestTarget is the target we use for this test.
var TestVTGateCallerID = &querypb.VTGateCallerID{
Username: "test_username",
}
TestVTGateCallerID is a test vtgate caller id.
var UpdateStreamStreamEvent1 = querypb.StreamEvent{ Statements: []*querypb.StreamEvent_Statement{ { Category: querypb.StreamEvent_Statement_DML, TableName: "table1", }, }, EventToken: &querypb.EventToken{ Timestamp: 789654, Shard: "shard1", Position: "streaming position 1", }, }
UpdateStreamStreamEvent1 is a test update stream event.
var UpdateStreamStreamEvent2 = querypb.StreamEvent{ Statements: []*querypb.StreamEvent_Statement{ { Category: querypb.StreamEvent_Statement_DML, TableName: "table2", }, }, EventToken: &querypb.EventToken{ Timestamp: 789655, Shard: "shard1", Position: "streaming position 2", }, }
UpdateStreamStreamEvent2 is a test update stream event.
Functions ¶
func TargetsEqual ¶
TargetsEqual returns true if the targets are equal.
func TestSuite ¶
func TestSuite(t *testing.T, protocol string, tablet *topodatapb.Tablet, fake *FakeQueryService, clientCreds *os.File)
TestSuite runs all the tests. If fake.TestingGateway is set, we only test the calls that can go through a gateway.
Types ¶
type FakeQueryService ¶
type FakeQueryService struct { TestingGateway bool // these fields are used to simulate and synchronize on errors HasError bool HasBeginError bool TabletError error ErrorWait chan struct{} // these fields are used to simulate and synchronize on panics Panics bool StreamExecutePanicsEarly bool UpdateStreamPanicsEarly bool PanicWait chan struct{} // ExpectedTransactionID is what transactionID to expect for Execute ExpectedTransactionID int64 // StreamHealthResponse is what we return for StreamHealth. // If not set, return TestStreamHealthStreamHealthResponse StreamHealthResponse *querypb.StreamHealthResponse // contains filtered or unexported fields }
FakeQueryService implements a programmable fake for the query service server side.
func CreateFakeServer ¶
func CreateFakeServer(t *testing.T) *FakeQueryService
CreateFakeServer returns the fake server for the tests.
func (*FakeQueryService) Begin ¶
func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error)
Begin is part of the queryservice.QueryService interface
func (*FakeQueryService) BeginExecute ¶
func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error)
BeginExecute combines Begin and Execute.
func (*FakeQueryService) BeginExecuteBatch ¶
func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error)
BeginExecuteBatch combines Begin and ExecuteBatch.
func (*FakeQueryService) Close ¶
func (f *FakeQueryService) Close(ctx context.Context) error
Close is a no-op.
func (*FakeQueryService) Commit ¶
func (f *FakeQueryService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) error
Commit is part of the queryservice.QueryService interface
func (*FakeQueryService) CommitPrepared ¶
func (f *FakeQueryService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error)
CommitPrepared is part of the queryservice.QueryService interface
func (*FakeQueryService) ConcludeTransaction ¶
func (f *FakeQueryService) ConcludeTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error)
ConcludeTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) CreateTransaction ¶
func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error)
CreateTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) Execute ¶
func (f *FakeQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error)
Execute is part of the queryservice.QueryService interface
func (*FakeQueryService) ExecuteBatch ¶
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, transactionID int64, options *querypb.ExecuteOptions) ([]sqltypes.Result, error)
ExecuteBatch is part of the queryservice.QueryService interface
func (*FakeQueryService) HandlePanic ¶
func (f *FakeQueryService) HandlePanic(err *error)
HandlePanic is part of the queryservice.QueryService interface
func (*FakeQueryService) MessageAck ¶
func (f *FakeQueryService) MessageAck(ctx context.Context, target *querypb.Target, name string, ids []*querypb.Value) (count int64, err error)
MessageAck is part of the queryservice.QueryService interface
func (*FakeQueryService) MessageStream ¶
func (f *FakeQueryService) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) (err error)
MessageStream is part of the queryservice.QueryService interface
func (*FakeQueryService) Prepare ¶
func (f *FakeQueryService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
Prepare is part of the queryservice.QueryService interface
func (*FakeQueryService) ReadTransaction ¶
func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)
ReadTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) Rollback ¶
func (f *FakeQueryService) Rollback(ctx context.Context, target *querypb.Target, transactionID int64) error
Rollback is part of the queryservice.QueryService interface
func (*FakeQueryService) RollbackPrepared ¶
func (f *FakeQueryService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error)
RollbackPrepared is part of the queryservice.QueryService interface
func (*FakeQueryService) SetRollback ¶
func (f *FakeQueryService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error)
SetRollback is part of the queryservice.QueryService interface
func (*FakeQueryService) SplitQuery ¶
func (f *FakeQueryService) SplitQuery( ctx context.Context, target *querypb.Target, query *querypb.BoundQuery, splitColumns []string, splitCount int64, numRowsPerQueryPart int64, algorithm querypb.SplitQueryRequest_Algorithm, ) ([]*querypb.QuerySplit, error)
SplitQuery is part of the queryservice.QueryService interface
func (*FakeQueryService) StartCommit ¶
func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
StartCommit is part of the queryservice.QueryService interface
func (*FakeQueryService) StreamExecute ¶
func (f *FakeQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) error
StreamExecute is part of the queryservice.QueryService interface
func (*FakeQueryService) StreamHealth ¶
func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error
StreamHealth is part of the queryservice.QueryService interface
func (*FakeQueryService) UpdateStream ¶
func (f *FakeQueryService) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, callback func(*querypb.StreamEvent) error) error
UpdateStream is part of the queryservice.QueryService interface
func (*FakeQueryService) VStream ¶ added in v1.5.0
func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
VStream is part of the queryservice.QueryService interface
func (*FakeQueryService) VStreamRows ¶ added in v1.6.0
func (f *FakeQueryService) VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error
VStreamRows is part of the queryservice.QueryService interface.