Documentation ¶
Overview ¶
Package tabletconntest provides the test methods to make sure a tabletconn/queryservice pair over RPC works correctly.
Index ¶
- Constants
- Variables
- func TestSuite(t *testing.T, protocol string, tablet *topodatapb.Tablet, ...)
- type FakeQueryService
- func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target) (int64, error)
- func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, ...) (*sqltypes.Result, int64, error)
- func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, ...) ([]sqltypes.Result, int64, error)
- func (f *FakeQueryService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) error
- func (f *FakeQueryService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error)
- func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, ...) (err error)
- func (f *FakeQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, ...) (*sqltypes.Result, error)
- func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, ...) ([]sqltypes.Result, error)
- func (f *FakeQueryService) HandlePanic(err *error)
- func (f *FakeQueryService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
- func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)
- func (f *FakeQueryService) ResolveTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error)
- func (f *FakeQueryService) Rollback(ctx context.Context, target *querypb.Target, transactionID int64) error
- func (f *FakeQueryService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error)
- func (f *FakeQueryService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error)
- func (f *FakeQueryService) SplitQuery(ctx context.Context, target *querypb.Target, sql string, ...) ([]querytypes.QuerySplit, error)
- func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
- func (f *FakeQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, ...) error
- func (f *FakeQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error)
- func (f *FakeQueryService) StreamHealthUnregister(int) error
- func (f *FakeQueryService) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, ...) error
Constants ¶
const BeginTransactionID int64 = 9990
const CommitTransactionID int64 = 999044
const Dtid string = "aa"
const ExecuteBatchTransactionID int64 = 678
const ExecuteQuery = "executeQuery"
const ExecuteTransactionID int64 = 678
const RollbackTransactionID int64 = 999044
const SplitQueryAlgorithm = querypb.SplitQueryRequest_FULL_SCAN
const SplitQueryNumRowsPerQueryPart = 123
const SplitQuerySplitCount = 372
const StreamExecuteQuery = "streamExecuteQuery"
const TestAsTransaction bool = true
const UpdateStreamPosition = "update stream position"
const UpdateStreamTimestamp = 123654
Variables ¶
var ExecuteBatchQueries = []querytypes.BoundQuery{ { Sql: "executeBatchQueries1", BindVariables: map[string]interface{}{ "bind1": int64(43), }, }, { Sql: "executeBatchQueries2", BindVariables: map[string]interface{}{ "bind2": int64(72), }, }, }
var ExecuteBatchQueryResultList = []sqltypes.Result{ { Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, }, RowsAffected: 1232, InsertID: 712, Rows: [][]sqltypes.Value{ { sqltypes.MakeTrusted(sqltypes.Int8, []byte("1")), }, { sqltypes.MakeTrusted(sqltypes.Int8, []byte("2")), }, }, Extras: &querypb.ResultExtras{ EventToken: &querypb.EventToken{ Timestamp: 456322, Shard: "test_shard2", Position: "test_position2", }, Fresher: true, }, }, { Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.VarBinary, }, }, RowsAffected: 12333, InsertID: 74442, Rows: [][]sqltypes.Value{ { sqltypes.MakeString([]byte("row1 value1")), }, { sqltypes.MakeString([]byte("row1 value2")), }, }, }, }
var ExecuteBindVars = map[string]interface{}{ "bind1": int64(1114444), }
var ExecuteQueryResult = sqltypes.Result{ Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, { Name: "field2", Type: sqltypes.Char, }, }, RowsAffected: 123, InsertID: 72, Rows: [][]sqltypes.Value{ { sqltypes.MakeTrusted(sqltypes.Int8, []byte("1")), sqltypes.NULL, }, { sqltypes.MakeTrusted(sqltypes.Int8, []byte("2")), sqltypes.MakeTrusted(sqltypes.Char, []byte("row2 value2")), }, }, Extras: &querypb.ResultExtras{ EventToken: &querypb.EventToken{ Timestamp: 456321, Shard: "test_shard", Position: "test_position", }, Fresher: true, }, }
var Metadata = &querypb.TransactionMetadata{ Dtid: "aa", State: querypb.TransactionState_PREPARE, TimeCreated: 1, TimeUpdated: 2, Participants: Participants, }
var Participants = []*querypb.Target{{
Keyspace: "ks0",
Shard: "0",
}, {
Keyspace: "ks1",
Shard: "1",
}}
var SplitQueryBoundQuery = querytypes.BoundQuery{ Sql: "splitQuery", BindVariables: map[string]interface{}{ "bind1": int64(43), }, }
var SplitQueryQuerySplitList = []querytypes.QuerySplit{ { Sql: "splitQuery", BindVariables: map[string]interface{}{ "bind1": int64(43), "keyspace_id": int64(3333), }, RowCount: 4456, }, }
var SplitQuerySplitColumns = []string{"nice_column_to_split"}
var StreamExecuteBindVars = map[string]interface{}{ "bind1": int64(93848000), }
var StreamExecuteQueryResult1 = sqltypes.Result{ Fields: []*querypb.Field{ { Name: "field1", Type: sqltypes.Int8, }, { Name: "field2", Type: sqltypes.Char, }, }, }
var StreamExecuteQueryResult2 = sqltypes.Result{ Rows: [][]sqltypes.Value{ { sqltypes.MakeTrusted(sqltypes.Int8, []byte("1")), sqltypes.MakeTrusted(sqltypes.Char, []byte("row1 value2")), }, { sqltypes.MakeTrusted(sqltypes.Int8, []byte("2")), sqltypes.MakeTrusted(sqltypes.Char, []byte("row2 value2")), }, }, }
var TestCallerID = &vtrpcpb.CallerID{
Principal: "test_principal",
Component: "test_component",
Subcomponent: "test_subcomponent",
}
var TestExecuteOptions = &querypb.ExecuteOptions{ ExcludeFieldNames: true, IncludeEventToken: true, CompareEventToken: &querypb.EventToken{ Timestamp: 9876, Shard: "ssss", Position: "pppp", }, }
var TestStreamHealthErrorMsg = "to trigger a server error"
var TestStreamHealthStreamHealthResponse = &querypb.StreamHealthResponse{ Target: &querypb.Target{ Keyspace: "test_keyspace", Shard: "test_shard", TabletType: topodatapb.TabletType_RDONLY, }, Serving: true, TabletExternallyReparentedTimestamp: 1234589, RealtimeStats: &querypb.RealtimeStats{ HealthError: "random error", SecondsBehindMaster: 234, BinlogPlayersCount: 1, SecondsBehindMasterFilteredReplication: 2, CpuUsage: 1.0, }, }
var TestTarget = &querypb.Target{ Keyspace: "test_keyspace", Shard: "test_shard", TabletType: topodatapb.TabletType_REPLICA, }
TestTarget is the target we use for this test
var TestVTGateCallerID = &querypb.VTGateCallerID{
Username: "test_username",
}
var UpdateStreamStreamEvent1 = querypb.StreamEvent{ Statements: []*querypb.StreamEvent_Statement{ { Category: querypb.StreamEvent_Statement_DML, TableName: "table1", }, }, EventToken: &querypb.EventToken{ Timestamp: 789654, Shard: "shard1", Position: "streaming position 1", }, }
var UpdateStreamStreamEvent2 = querypb.StreamEvent{ Statements: []*querypb.StreamEvent_Statement{ { Category: querypb.StreamEvent_Statement_DML, TableName: "table2", }, }, EventToken: &querypb.EventToken{ Timestamp: 789655, Shard: "shard1", Position: "streaming position 2", }, }
Functions ¶
func TestSuite ¶
func TestSuite(t *testing.T, protocol string, tablet *topodatapb.Tablet, fake *FakeQueryService)
TestSuite runs all the tests. If fake.TestingGateway is set, we only test the calls that can go through a gateway.
Types ¶
type FakeQueryService ¶
type FakeQueryService struct { TestingGateway bool // these fields are used to simulate and synchronize on errors HasError bool HasBeginError bool TabletError *tabletserver.TabletError ErrorWait chan struct{} // these fields are used to simulate and synchronize on panics Panics bool StreamExecutePanicsEarly bool UpdateStreamPanicsEarly bool PanicWait chan struct{} // ExpectedTransactionID is what transactionID to expect for Execute ExpectedTransactionID int64 // StreamHealthResponse is what we return for StreamHealth. // If not set, return TestStreamHealthStreamHealthResponse StreamHealthResponse *querypb.StreamHealthResponse // contains filtered or unexported fields }
FakeQueryService implements a programmable fake for the query service server side.
func CreateFakeServer ¶
func CreateFakeServer(t *testing.T) *FakeQueryService
CreateFakeServer returns the fake server for the tests
func (*FakeQueryService) BeginExecute ¶
func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error)
BeginExecute combines Begin and Execute.
func (*FakeQueryService) BeginExecuteBatch ¶
func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error)
BeginExecuteBatch combines Begin and ExecuteBatch.
func (*FakeQueryService) Commit ¶
func (f *FakeQueryService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) error
Commit is part of the queryservice.QueryService interface
func (*FakeQueryService) CommitPrepared ¶
func (f *FakeQueryService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error)
CommitPrepared is part of the queryservice.QueryService interface
func (*FakeQueryService) CreateTransaction ¶
func (f *FakeQueryService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error)
CreateTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) Execute ¶
func (f *FakeQueryService) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, transactionID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error)
Execute is part of the queryservice.QueryService interface
func (*FakeQueryService) ExecuteBatch ¶
func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *querypb.Target, queries []querytypes.BoundQuery, asTransaction bool, transactionID int64, options *querypb.ExecuteOptions) ([]sqltypes.Result, error)
ExecuteBatch is part of the queryservice.QueryService interface
func (*FakeQueryService) HandlePanic ¶
func (f *FakeQueryService) HandlePanic(err *error)
HandlePanic is part of the queryservice.QueryService interface
func (*FakeQueryService) Prepare ¶
func (f *FakeQueryService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
Prepare is part of the queryservice.QueryService interface
func (*FakeQueryService) ReadTransaction ¶
func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)
ReadTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) ResolveTransaction ¶
func (f *FakeQueryService) ResolveTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error)
ResolveTransaction is part of the queryservice.QueryService interface
func (*FakeQueryService) Rollback ¶
func (f *FakeQueryService) Rollback(ctx context.Context, target *querypb.Target, transactionID int64) error
Rollback is part of the queryservice.QueryService interface
func (*FakeQueryService) RollbackPrepared ¶
func (f *FakeQueryService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error)
RollbackPrepared is part of the queryservice.QueryService interface
func (*FakeQueryService) SetRollback ¶
func (f *FakeQueryService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error)
SetRollback is part of the queryservice.QueryService interface
func (*FakeQueryService) SplitQuery ¶
func (f *FakeQueryService) SplitQuery( ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, splitColumns []string, splitCount int64, numRowsPerQueryPart int64, algorithm querypb.SplitQueryRequest_Algorithm, ) ([]querytypes.QuerySplit, error)
SplitQuery is part of the queryservice.QueryService interface
func (*FakeQueryService) StartCommit ¶
func (f *FakeQueryService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (err error)
StartCommit is part of the queryservice.QueryService interface
func (*FakeQueryService) StreamExecute ¶
func (f *FakeQueryService) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]interface{}, options *querypb.ExecuteOptions, sendReply func(*sqltypes.Result) error) error
StreamExecute is part of the queryservice.QueryService interface
func (*FakeQueryService) StreamHealthRegister ¶
func (f *FakeQueryService) StreamHealthRegister(c chan<- *querypb.StreamHealthResponse) (int, error)
StreamHealthRegister is part of the queryservice.QueryService interface
func (*FakeQueryService) StreamHealthUnregister ¶
func (f *FakeQueryService) StreamHealthUnregister(int) error
StreamHealthUnregister is part of the queryservice.QueryService interface
func (*FakeQueryService) UpdateStream ¶
func (f *FakeQueryService) UpdateStream(ctx context.Context, target *querypb.Target, position string, timestamp int64, sendReply func(*querypb.StreamEvent) error) error
UpdateStream is part of the queryservice.QueryService interface