cluster_test_tool

package
v0.0.0-...-99a34e4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	PubSubSubscriberKind        = "Subscriber"
	PubSubTimeoutSubscriberKind = "TimeoutSubscriber"
)
View Source
const DefaultWaitTimeout = time.Second * 5
View Source
const InvalidIdentity string = "invalid"

Variables

View Source
var File_pubsub_cluster_proto protoreflect.FileDescriptor

Functions

func WaitUntil

func WaitUntil(t testing.TB, cond func() bool, errorMsg string, timeout time.Duration)

Types

type BaseClusterFixture

type BaseClusterFixture struct {
	// contains filtered or unexported fields
}

func NewBaseClusterFixture

func NewBaseClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture

func NewBaseInMemoryClusterFixture

func NewBaseInMemoryClusterFixture(clusterSize int, opts ...ClusterFixtureOption) *BaseClusterFixture

NewBaseInMemoryClusterFixture creates a new in-memory cluster fixture.

func (*BaseClusterFixture) GetClusterSize

func (b *BaseClusterFixture) GetClusterSize() int

func (*BaseClusterFixture) GetMembers

func (b *BaseClusterFixture) GetMembers() []*cluster.Cluster

func (*BaseClusterFixture) Initialize

func (b *BaseClusterFixture) Initialize()

Initialize initializes the cluster fixture

func (*BaseClusterFixture) RemoveNode

func (b *BaseClusterFixture) RemoveNode(node *cluster.Cluster, graceful bool)

func (*BaseClusterFixture) ShutDown

func (b *BaseClusterFixture) ShutDown()

func (*BaseClusterFixture) SpawnNode

func (b *BaseClusterFixture) SpawnNode() *cluster.Cluster

type ClusterFixture

type ClusterFixture interface {
	GetMembers() []*cluster.Cluster
	GetClusterSize() int
	SpawnNode() *cluster.Cluster
	RemoveNode(node *cluster.Cluster, graceful bool)
	ShutDown()
}

type ClusterFixtureConfig

type ClusterFixtureConfig struct {
	GetClusterKinds    func() []*cluster.Kind
	GetClusterProvider func() cluster.ClusterProvider
	Configure          func(*cluster.Config) *cluster.Config
	GetIdentityLookup  func(clusterName string) cluster.IdentityLookup
	OnDeposing         func()
}

type ClusterFixtureOption

type ClusterFixtureOption func(*ClusterFixtureConfig)

func WithClusterConfigure

func WithClusterConfigure(configure func(*cluster.Config) *cluster.Config) ClusterFixtureOption

WithClusterConfigure sets the cluster configure function for the cluster fixture

func WithGetClusterKinds

func WithGetClusterKinds(getKinds func() []*cluster.Kind) ClusterFixtureOption

WithGetClusterKinds sets the function that returns the cluster kinds for the cluster fixture

func WithGetClusterProvider

func WithGetClusterProvider(getProvider func() cluster.ClusterProvider) ClusterFixtureOption

WithGetClusterProvider sets the function that returns the cluster provider for the cluster fixture

func WithGetIdentityLookup

func WithGetIdentityLookup(identityLookup func(clusterName string) cluster.IdentityLookup) ClusterFixtureOption

WithGetIdentityLookup sets the identity lookup function for the cluster fixture

func WithOnDeposing

func WithOnDeposing(onDeposing func()) ClusterFixtureOption

WithOnDeposing sets the OnDeposing callback (a func()) for the cluster fixture

type DataPublished

type DataPublished struct {
	Data int32 `protobuf:"varint,1,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*DataPublished) Descriptor deprecated

func (*DataPublished) Descriptor() ([]byte, []int)

Deprecated: Use DataPublished.ProtoReflect.Descriptor instead.

func (*DataPublished) GetData

func (x *DataPublished) GetData() int32

func (*DataPublished) ProtoMessage

func (*DataPublished) ProtoMessage()

func (*DataPublished) ProtoReflect

func (x *DataPublished) ProtoReflect() protoreflect.Message

func (*DataPublished) Reset

func (x *DataPublished) Reset()

func (*DataPublished) String

func (x *DataPublished) String() string

type Delivery

type Delivery struct {
	Identity string
	Data     int
}

type InMemorySubscribersStore

type InMemorySubscribersStore[T any] struct {
	// contains filtered or unexported fields
}

func NewInMemorySubscriberStore

func NewInMemorySubscriberStore() *InMemorySubscribersStore[*cluster.Subscribers]

func (*InMemorySubscribersStore[T]) Clear

func (i *InMemorySubscribersStore[T]) Clear(_ context.Context, key string) error

func (*InMemorySubscribersStore[T]) Get

func (i *InMemorySubscribersStore[T]) Get(_ context.Context, key string) (T, error)

func (*InMemorySubscribersStore[T]) Set

func (i *InMemorySubscribersStore[T]) Set(_ context.Context, key string, value T) error

type PubSubClusterFixture

type PubSubClusterFixture struct {
	*BaseClusterFixture

	Deliveries     []Delivery
	DeliveriesLock *sync.RWMutex
	// contains filtered or unexported fields
}

func NewPubSubClusterFixture

func NewPubSubClusterFixture(t testing.TB, clusterSize int, useDefaultTopicRegistration bool, opts ...ClusterFixtureOption) *PubSubClusterFixture

func (*PubSubClusterFixture) AppendDelivery

func (p *PubSubClusterFixture) AppendDelivery(delivery Delivery)

AppendDelivery appends a delivery to the deliveries slice

func (*PubSubClusterFixture) ClearDeliveries

func (p *PubSubClusterFixture) ClearDeliveries()

ClearDeliveries clears the deliveries

func (*PubSubClusterFixture) GetSubscribersForTopic

func (p *PubSubClusterFixture) GetSubscribersForTopic(topic string) (*cluster.Subscribers, error)

GetSubscribersForTopic returns the subscribers for the given topic

func (*PubSubClusterFixture) PublishData

func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.PublishResponse, error)

PublishData publishes the given message to the given topic

func (*PubSubClusterFixture) PublishDataBatch

func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error)

PublishDataBatch publishes the given messages to the given topic

func (*PubSubClusterFixture) RandomMember

func (p *PubSubClusterFixture) RandomMember() *cluster.Cluster

func (*PubSubClusterFixture) SubscribeAllTo

func (p *PubSubClusterFixture) SubscribeAllTo(topic string, subscriberIds []string)

SubscribeAllTo subscribes all the given subscribers to the given topic

func (*PubSubClusterFixture) SubscribeTo

func (p *PubSubClusterFixture) SubscribeTo(topic, identity, kind string)

SubscribeTo subscribes the given subscriber to the given topic

func (*PubSubClusterFixture) SubscriberIds

func (p *PubSubClusterFixture) SubscriberIds(prefix string, count int) []string

SubscriberIds returns the subscriber ids

func (*PubSubClusterFixture) UnSubscribeAllFrom

func (p *PubSubClusterFixture) UnSubscribeAllFrom(topic string, subscriberIds []string)

UnSubscribeAllFrom unsubscribes all the given subscribers from the given topic

func (*PubSubClusterFixture) UnSubscribeTo

func (p *PubSubClusterFixture) UnSubscribeTo(topic, identity, kind string)

UnSubscribeTo unsubscribes the given subscriber from the given topic

func (*PubSubClusterFixture) VerifyAllSubscribersGotAllTheData

func (p *PubSubClusterFixture) VerifyAllSubscribersGotAllTheData(subscriberIds []string, numMessages int)

VerifyAllSubscribersGotAllTheData verifies that all subscribers got all the data

type Response

type Response struct {
	// contains filtered or unexported fields
}

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL