Documentation
¶
Index ¶
- func ErrorEqual(got, want error) bool
- func ErrorHasCode(got error, wantCode codes.Code) bool
- func ErrorHasMsg(got error, wantStr string) bool
- func MakeAny(msg proto.Message) *anypb.Any
- func RandomLiteZone() string
- type Barrier
- type Condition
- type DuplicateMsgDetector
- type FakeSource
- type MockServer
- type MsgTracker
- type OrderingReceiver
- type OrderingSender
- type RPCVerifier
- func (v *RPCVerifier) Flush()
- func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)
- func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)
- func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier
- func (v *RPCVerifier) TryPop() (bool, interface{}, error)
- type Server
- type Verifiers
- func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
- func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ErrorEqual ¶
ErrorEqual compares two errors for equivalence.
func ErrorHasCode ¶
ErrorHasCode returns true if an error has the desired canonical code.
func ErrorHasMsg ¶
ErrorHasMsg returns true if an error message contains the desired substring.
func RandomLiteZone ¶ added in v0.5.0
func RandomLiteZone() string
RandomLiteZone chooses a random Pub/Sub Lite zone for integration tests.
Types ¶
type Barrier ¶ added in v0.4.0
type Barrier struct {
// contains filtered or unexported fields
}
Barrier is used to perform two-way synchronization betwen the server and client (test) to ensure tests are deterministic.
func (*Barrier) Release ¶ added in v0.4.0
func (b *Barrier) Release()
Release should be called by the test.
func (*Barrier) ReleaseAfter ¶ added in v0.9.0
func (b *Barrier) ReleaseAfter(f func())
ReleaseAfter releases the barrier, after invoking f provided by the test.
type Condition ¶ added in v0.9.0
type Condition struct {
// contains filtered or unexported fields
}
Condition allows tests to wait for some event to occur, or check that it has not occurred.
func NewCondition ¶ added in v0.9.0
NewCondition creates a new condition.
func (*Condition) SetDone ¶ added in v0.9.0
func (c *Condition) SetDone()
SetDone marks the condition as done.
func (*Condition) VerifyNotDone ¶ added in v0.9.0
VerifyNotDone checks that the condition is not done.
type DuplicateMsgDetector ¶ added in v0.4.0
type DuplicateMsgDetector struct {
// contains filtered or unexported fields
}
DuplicateMsgDetector can be used to detect duplicate messages, either due to duplicate publishes or receives.
func NewDuplicateMsgDetector ¶ added in v0.4.0
func NewDuplicateMsgDetector() *DuplicateMsgDetector
NewDuplicateMsgDetector creates a new DuplicateMsgDetector.
func (*DuplicateMsgDetector) HasPublishDuplicates ¶ added in v0.4.0
func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool
HasPublishDuplicates returns true if duplicate published messages were detected.
func (*DuplicateMsgDetector) HasReceiveDuplicates ¶ added in v0.4.0
func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool
HasReceiveDuplicates returns true if duplicate received messages were detected.
func (*DuplicateMsgDetector) Receive ¶ added in v0.4.0
func (dm *DuplicateMsgDetector) Receive(data string, offset int64)
Receive checks the given message data and offset.
func (*DuplicateMsgDetector) Status ¶ added in v0.4.0
func (dm *DuplicateMsgDetector) Status() string
Status returns a non-empty status string if there were duplicates detected.
type FakeSource ¶
type FakeSource struct {
Ret int64
}
FakeSource is a fake source that returns a configurable constant.
func (*FakeSource) Int63 ¶
func (f *FakeSource) Int63() int64
Int63 returns the configured fake random number.
type MockServer ¶
type MockServer interface { // OnTestStart must be called at the start of each test to clear any existing // state and set the test verifiers. OnTestStart(*Verifiers) // OnTestEnd should be called at the end of each test to flush the verifiers // (i.e. check whether any expected requests were not sent to the server). OnTestEnd() }
MockServer is an in-memory mock implementation of a Pub/Sub Lite service, which allows unit tests to inspect requests received by the server and send fake responses. This is the interface that should be used by tests.
type MsgTracker ¶ added in v0.4.0
type MsgTracker struct {
// contains filtered or unexported fields
}
MsgTracker is a helper for checking whether a set of messages make a full round trip from publisher to subscriber.
Add() registers published messages. Remove() should be called when messages are received by subscribers. Call Wait() to block until all tracked messages are received. The same MsgTracker instance can be reused to repeat this sequence for multiple test cycles.
Add() and Remove() calls should not be interleaved.
func NewMsgTracker ¶ added in v0.4.0
func NewMsgTracker() *MsgTracker
NewMsgTracker creates a new message tracker.
func (*MsgTracker) Add ¶ added in v0.4.0
func (mt *MsgTracker) Add(msgs ...string)
Add a set of tracked messages.
func (*MsgTracker) Empty ¶ added in v0.5.0
func (mt *MsgTracker) Empty() bool
Empty returns true if there are no tracked messages remaining.
func (*MsgTracker) Remove ¶ added in v0.4.0
func (mt *MsgTracker) Remove(msg string) bool
Remove and return true if `msg` is tracked. Signals the `done` channel once all messages have been received.
func (*MsgTracker) Status ¶ added in v0.5.0
func (mt *MsgTracker) Status() error
Status returns an error if there are tracked messages remaining.
type OrderingReceiver ¶ added in v0.4.0
type OrderingReceiver struct {
// contains filtered or unexported fields
}
OrderingReceiver consumes a message string generated by OrderingSender and verifies that messages in a partition are ordered. It is used in conjunction with Subscribers.
func NewOrderingReceiver ¶ added in v0.4.0
func NewOrderingReceiver() *OrderingReceiver
NewOrderingReceiver creates a new OrderingReceiver.
func (*OrderingReceiver) Receive ¶ added in v0.4.0
func (or *OrderingReceiver) Receive(data, key string) error
Receive checks the given message data and key and returns an error if unordered messages are detected.
Note: a normal scenario resulting in unordered messages is when the Publish stream breaks while there are in-flight batches, which are resent upon stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a test.
type OrderingSender ¶ added in v0.4.0
type OrderingSender struct {
TotalMsgCount int64
}
OrderingSender generates strings containing a message index to use for verifying message ordering. It is used on conjunction with Publishers.
func NewOrderingSender ¶ added in v0.4.0
func NewOrderingSender() *OrderingSender
NewOrderingSender creats a new OrderingSender.
func (*OrderingSender) Next ¶ added in v0.4.0
func (os *OrderingSender) Next(prefix string) string
Next generates the next string to publish.
type RPCVerifier ¶
type RPCVerifier struct {
// contains filtered or unexported fields
}
RPCVerifier stores an queue of requests expected from the client, and the corresponding response or error to return.
func NewRPCVerifier ¶
func NewRPCVerifier(t *testing.T) *RPCVerifier
NewRPCVerifier creates a new verifier for requests received by the server.
func (*RPCVerifier) Flush ¶
func (v *RPCVerifier) Flush()
Flush logs an error for any remaining {request, response, error} tuples, in case the client terminated early.
func (*RPCVerifier) Pop ¶
func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)
Pop validates the received request with the next {request, response, error} tuple.
func (*RPCVerifier) Push ¶
func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)
Push appends a new {request, response, error} tuple.
Valid combinations for unary and streaming RPCs: - {request, response, nil} - {request, nil, error}
Additional combinations for streams only: - {nil, response, nil}: send a response without a request (e.g. messages). - {nil, nil, error}: break the stream without a request. - {request, nil, nil}: expect a request, but don't send any response.
func (*RPCVerifier) PushWithBarrier ¶ added in v0.4.0
func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier
PushWithBarrier is like Push, but returns a barrier that the test should call Release when it would like the response to be sent to the client. This is useful for synchronizing with work that needs to be done on the client.
func (*RPCVerifier) TryPop ¶
func (v *RPCVerifier) TryPop() (bool, interface{}, error)
TryPop should be used only for streams. It checks whether the request in the next tuple is nil, in which case the response or error should be returned to the client without waiting for a request. Useful for streams where the server continuously sends data (e.g. subscribe stream).
type Server ¶
type Server struct { LiteServer MockServer // contains filtered or unexported fields }
Server is a mock Pub/Sub Lite server that can be used for unit testing.
func (*Server) ClientConn ¶ added in v0.6.0
func (s *Server) ClientConn() option.ClientOption
ClientConn creates a client connection to the gRPC test server.
type Verifiers ¶ added in v0.4.0
type Verifiers struct { // Global list of verifiers for all unary RPCs. GlobalVerifier *RPCVerifier // contains filtered or unexported fields }
Verifiers contains RPCVerifiers for unary RPCs and streaming RPCs.
func NewVerifiers ¶ added in v0.4.0
NewVerifiers creates a new instance of Verifiers for a test.
func (*Verifiers) AddAssignmentStream ¶ added in v0.4.0
func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
AddAssignmentStream adds verifiers for an assignment stream.
func (*Verifiers) AddCommitStream ¶ added in v0.4.0
func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
AddCommitStream adds verifiers for a commit stream.
func (*Verifiers) AddPublishStream ¶ added in v0.4.0
func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
AddPublishStream adds verifiers for a publish stream.
func (*Verifiers) AddSubscribeStream ¶ added in v0.4.0
func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
AddSubscribeStream adds verifiers for a subscribe stream.