Documentation ¶
Index ¶
- Constants
- Variables
- func WaitUntil(t testing.TB, cond func() bool, errorMsg string, timeout time.Duration)
- type BaseClusterFixture
- func (b *BaseClusterFixture) GetClusterSize() int
- func (b *BaseClusterFixture) GetMembers() []*cluster.Cluster
- func (b *BaseClusterFixture) Initialize()
- func (b *BaseClusterFixture) RemoveNode(node *cluster.Cluster, graceful bool)
- func (b *BaseClusterFixture) ShutDown()
- func (b *BaseClusterFixture) SpawnNode() *cluster.Cluster
- type ClusterFixture
- type ClusterFixtureConfig
- type ClusterFixtureOption
- func WithClusterConfigure(configure func(*cluster.Config) *cluster.Config) ClusterFixtureOption
- func WithGetClusterKinds(getKinds func() []*cluster.Kind) ClusterFixtureOption
- func WithGetClusterProvider(getProvider func() cluster.ClusterProvider) ClusterFixtureOption
- func WithGetIdentityLookup(identityLookup func(clusterName string) cluster.IdentityLookup) ClusterFixtureOption
- func WithOnDeposing(onDeposing func()) ClusterFixtureOption
- type DataPublished
- type Delivery
- type InMemorySubscribersStore
- type PubSubClusterFixture
- func (p *PubSubClusterFixture) AppendDelivery(delivery Delivery)
- func (p *PubSubClusterFixture) ClearDeliveries()
- func (p *PubSubClusterFixture) GetSubscribersForTopic(topic string) (*cluster.Subscribers, error)
- func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.PublishResponse, error)
- func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error)
- func (p *PubSubClusterFixture) RandomMember() *cluster.Cluster
- func (p *PubSubClusterFixture) SubscribeAllTo(topic string, subscriberIds []string)
- func (p *PubSubClusterFixture) SubscribeTo(topic, identity, kind string)
- func (p *PubSubClusterFixture) SubscriberIds(prefix string, count int) []string
- func (p *PubSubClusterFixture) UnSubscribeAllFrom(topic string, subscriberIds []string)
- func (p *PubSubClusterFixture) UnSubscribeTo(topic, identity, kind string)
- func (p *PubSubClusterFixture) VerifyAllSubscribersGotAllTheData(subscriberIds []string, numMessages int)
- type Response
Constants ¶
const ( PubSubSubscriberKind = "Subscriber" PubSubTimeoutSubscriberKind = "TimeoutSubscriber" )
const DefaultWaitTimeout = time.Second * 5
const InvalidIdentity string = "invalid"
Variables ¶
var File_pubsub_cluster_proto protoreflect.FileDescriptor
Functions ¶
Types ¶
type BaseClusterFixture ¶
type BaseClusterFixture struct {
// contains filtered or unexported fields
}
func NewBaseClusterFixture ¶
func NewBaseClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture
func NewBaseInMemoryClusterFixture ¶
func NewBaseInMemoryClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture
NewBaseInMemoryClusterFixture creates a new in memory cluster fixture
func (*BaseClusterFixture) GetClusterSize ¶
func (b *BaseClusterFixture) GetClusterSize() int
func (*BaseClusterFixture) GetMembers ¶
func (b *BaseClusterFixture) GetMembers() []*cluster.Cluster
func (*BaseClusterFixture) Initialize ¶
func (b *BaseClusterFixture) Initialize()
Initialize initializes the cluster fixture
func (*BaseClusterFixture) RemoveNode ¶
func (b *BaseClusterFixture) RemoveNode(node *cluster.Cluster, graceful bool)
func (*BaseClusterFixture) ShutDown ¶
func (b *BaseClusterFixture) ShutDown()
func (*BaseClusterFixture) SpawnNode ¶
func (b *BaseClusterFixture) SpawnNode() *cluster.Cluster
type ClusterFixture ¶
type ClusterFixtureConfig ¶
type ClusterFixtureOption ¶
type ClusterFixtureOption func(*ClusterFixtureConfig)
func WithClusterConfigure ¶
func WithClusterConfigure(configure func(*cluster.Config) *cluster.Config) ClusterFixtureOption
WithClusterConfigure sets the cluster configure function for the cluster fixture
func WithGetClusterKinds ¶
func WithGetClusterKinds(getKinds func() []*cluster.Kind) ClusterFixtureOption
WithGetClusterKinds sets the cluster kinds for the cluster fixture
func WithGetClusterProvider ¶
func WithGetClusterProvider(getProvider func() cluster.ClusterProvider) ClusterFixtureOption
WithGetClusterProvider sets the cluster provider for the cluster fixture
func WithGetIdentityLookup ¶
func WithGetIdentityLookup(identityLookup func(clusterName string) cluster.IdentityLookup) ClusterFixtureOption
WithGetIdentityLookup sets the identity lookup function for the cluster fixture
func WithOnDeposing ¶
func WithOnDeposing(onDeposing func()) ClusterFixtureOption
WithOnDeposing sets the on deposing function for the cluster fixture
type DataPublished ¶
type DataPublished struct { Data int32 `protobuf:"varint,1,opt,name=data,proto3" json:"data,omitempty"` // contains filtered or unexported fields }
func (*DataPublished) Descriptor
deprecated
func (*DataPublished) Descriptor() ([]byte, []int)
Deprecated: Use DataPublished.ProtoReflect.Descriptor instead.
func (*DataPublished) GetData ¶
func (x *DataPublished) GetData() int32
func (*DataPublished) ProtoMessage ¶
func (*DataPublished) ProtoMessage()
func (*DataPublished) ProtoReflect ¶
func (x *DataPublished) ProtoReflect() protoreflect.Message
func (*DataPublished) Reset ¶
func (x *DataPublished) Reset()
func (*DataPublished) String ¶
func (x *DataPublished) String() string
type InMemorySubscribersStore ¶
type InMemorySubscribersStore[T any] struct { // contains filtered or unexported fields }
func NewInMemorySubscriberStore ¶
func NewInMemorySubscriberStore() *InMemorySubscribersStore[*cluster.Subscribers]
func (*InMemorySubscribersStore[T]) Clear ¶
func (i *InMemorySubscribersStore[T]) Clear(_ context.Context, key string) error
type PubSubClusterFixture ¶
type PubSubClusterFixture struct { *BaseClusterFixture Deliveries []Delivery DeliveriesLock *sync.RWMutex // contains filtered or unexported fields }
func NewPubSubClusterFixture ¶
func NewPubSubClusterFixture(t testing.TB, clusterSize int, useDefaultTopicRegistration bool, opts ...ClusterFixtureOption) *PubSubClusterFixture
func (*PubSubClusterFixture) AppendDelivery ¶
func (p *PubSubClusterFixture) AppendDelivery(delivery Delivery)
AppendDelivery appends a delivery to the deliveries slice
func (*PubSubClusterFixture) ClearDeliveries ¶
func (p *PubSubClusterFixture) ClearDeliveries()
ClearDeliveries clears the deliveries
func (*PubSubClusterFixture) GetSubscribersForTopic ¶
func (p *PubSubClusterFixture) GetSubscribersForTopic(topic string) (*cluster.Subscribers, error)
GetSubscribersForTopic returns the subscribers for the given topic
func (*PubSubClusterFixture) PublishData ¶
func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.PublishResponse, error)
PublishData publishes the given message to the given topic
func (*PubSubClusterFixture) PublishDataBatch ¶
func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error)
PublishDataBatch publishes the given messages to the given topic
func (*PubSubClusterFixture) RandomMember ¶
func (p *PubSubClusterFixture) RandomMember() *cluster.Cluster
func (*PubSubClusterFixture) SubscribeAllTo ¶
func (p *PubSubClusterFixture) SubscribeAllTo(topic string, subscriberIds []string)
SubscribeAllTo subscribes all the given subscribers to the given topic
func (*PubSubClusterFixture) SubscribeTo ¶
func (p *PubSubClusterFixture) SubscribeTo(topic, identity, kind string)
SubscribeTo subscribes the given subscriber to the given topic
func (*PubSubClusterFixture) SubscriberIds ¶
func (p *PubSubClusterFixture) SubscriberIds(prefix string, count int) []string
SubscriberIds returns the subscriber ids
func (*PubSubClusterFixture) UnSubscribeAllFrom ¶
func (p *PubSubClusterFixture) UnSubscribeAllFrom(topic string, subscriberIds []string)
UnSubscribeAllFrom unsubscribes all the given subscribers from the given topic
func (*PubSubClusterFixture) UnSubscribeTo ¶
func (p *PubSubClusterFixture) UnSubscribeTo(topic, identity, kind string)
UnSubscribeTo unsubscribes the given subscriber from the given topic
func (*PubSubClusterFixture) VerifyAllSubscribersGotAllTheData ¶
func (p *PubSubClusterFixture) VerifyAllSubscribersGotAllTheData(subscriberIds []string, numMessages int)
VerifyAllSubscribersGotAllTheData verifies that all subscribers got all the data
type Response ¶
type Response struct {
// contains filtered or unexported fields
}
func (*Response) Descriptor
deprecated
func (*Response) ProtoMessage ¶
func (*Response) ProtoMessage()
func (*Response) ProtoReflect ¶
func (x *Response) ProtoReflect() protoreflect.Message