host

package
v1.2.15-prerelease11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: MIT Imports: 86 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// TestFlags contains the feature flags for integration tests.
// It is a package-level variable of an anonymous struct type; all four
// fields are plain strings.
var TestFlags struct {
	FrontendAddr          string
	PersistenceType       string
	SQLPluginName         string
	TestClusterConfigFile string
}

TestFlags contains the feature flags for integration tests

Functions

func NewClusterMetadata added in v0.20.0

func NewClusterMetadata(t *testing.T, options *TestClusterConfig) cluster.Metadata

NewClusterMetadata returns cluster metadata from config

func NewPersistenceTestCluster added in v0.20.0

func NewPersistenceTestCluster(t *testing.T, clusterConfig *TestClusterConfig) testcluster.PersistenceTestCluster

func NewSimpleResolver added in v0.24.0

func NewSimpleResolver(serviceName string, hosts map[string][]membership.HostInfo, currentHost membership.HostInfo) membership.Resolver

NewSimpleResolver returns a membership resolver interface

Types

type AdminClient added in v0.5.0

// AdminClient is the interface exposed by the admin service client.
// It declares no methods of its own; its method set comes entirely from the
// embedded admin.Client.
type AdminClient interface {
	admin.Client
}

AdminClient is the interface exposed by admin service client

func NewAdminClient added in v0.5.0

func NewAdminClient(d *yarpc.Dispatcher) AdminClient

NewAdminClient creates a client for the cadence admin service

type ArchiverBase added in v0.7.0

// ArchiverBase is a base struct for the archiver provider used in
// integration tests. No exported fields are listed for it.
type ArchiverBase struct {
	// contains filtered or unexported fields
}

ArchiverBase is a base struct for the archiver provider used in integration tests

type AsyncWFIntegrationSuite added in v1.2.8

// AsyncWFIntegrationSuite is a test suite type composed of *require.Assertions
// and *IntegrationBase.
// NOTE(review): presumably the suite for async workflow integration tests — confirm.
type AsyncWFIntegrationSuite struct {
	// Embedded directly so that assertion helpers called on the suite are the
	// require variants, which stop the test on failure.
	*require.Assertions
	*IntegrationBase
}

type Cadence

// Cadence hosts all of the cadence services in one process.
// The interface offers lifecycle control (Start/Stop) plus accessors for the
// admin, frontend, history and matching clients, the frontend host info and
// the execution manager factory.
type Cadence interface {
	// Start reports failure through its error result; Stop returns nothing.
	Start() error
	Stop()
	GetAdminClient() adminClient.Client
	GetFrontendClient() frontendClient.Client
	FrontendHost() membership.HostInfo
	GetHistoryClient() historyClient.Client
	// GetMatchingClient returns a single client, GetMatchingClients a slice of them.
	GetMatchingClient() matchingClient.Client
	GetMatchingClients() []matchingClient.Client
	GetExecutionManagerFactory() persistence.ExecutionManagerFactory
}

Cadence hosts all of cadence services in one process

func NewCadence

func NewCadence(params *CadenceParams) Cadence

NewCadence returns an instance that hosts full cadence in one process

type CadenceParams added in v0.5.7

// CadenceParams contains everything needed to bootstrap Cadence.
// A pointer to it is the sole argument of NewCadence.
type CadenceParams struct {
	ClusterMetadata               cluster.Metadata
	PersistenceConfig             config.Persistence
	MessagingClient               messaging.Client
	DomainManager                 persistence.DomainManager
	HistoryV2Mgr                  persistence.HistoryManager
	ExecutionMgrFactory           persistence.ExecutionManagerFactory
	DomainReplicationQueue        domain.ReplicationQueue
	Logger                        log.Logger
	ClusterNo                     int
	ArchiverMetadata              carchiver.ArchivalMetadata
	ArchiverProvider              provider.ArchiverProvider
	EnableReadHistoryFromArchival bool
	HistoryConfig                 *HistoryConfig
	MatchingConfig                *MatchingConfig
	ESConfig                      *config.ElasticSearchConfig
	ESClient                      elasticsearch.GenericClient
	WorkerConfig                  *WorkerConfig
	MockAdminClient               map[string]adminClient.Client
	DomainReplicationTaskExecutor domain.ReplicationTaskExecutor
	AuthorizationConfig           config.Authorization
	PinotConfig                   *config.PinotVisibilityConfig
	PinotClient                   pinot.GenericClient
	AsyncWFQueues                 map[string]config.AsyncWorkflowQueueProvider
	TimeSource                    clock.TimeSource

	// Four maps keyed by dynamicconfig.Key, one per service name prefix.
	// NOTE(review): presumably dynamic config overrides applied to the frontend,
	// history, matching and worker services respectively — confirm.
	FrontendDynCfgOverrides map[dynamicconfig.Key]interface{}
	HistoryDynCfgOverrides  map[dynamicconfig.Key]interface{}
	MatchingDynCfgOverrides map[dynamicconfig.Key]interface{}
	WorkerDynCfgOverrides   map[dynamicconfig.Key]interface{}
}

CadenceParams contains everything needed to bootstrap Cadence

type ClientIntegrationSuite added in v0.20.0

// ClientIntegrationSuite is a test suite type composed of *require.Assertions
// and *IntegrationBase, plus unexported fields that are not shown here.
type ClientIntegrationSuite struct {
	// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
	// not merely log an error
	*require.Assertions
	*IntegrationBase
	// contains filtered or unexported fields
}

type FrontendClient added in v0.5.0

// FrontendClient is the interface exposed by the frontend service client.
// It declares no methods of its own; its method set comes entirely from the
// embedded frontend.Client.
type FrontendClient interface {
	frontend.Client
}

FrontendClient is the interface exposed by frontend service client

func NewFrontendClient added in v0.5.0

func NewFrontendClient(d *yarpc.Dispatcher) FrontendClient

NewFrontendClient creates a client for the cadence frontend service

type HistoryClient added in v0.9.3

// HistoryClient is the interface exposed by the history service client.
// It declares no methods of its own; its method set comes entirely from the
// embedded history.Client.
type HistoryClient interface {
	history.Client
}

HistoryClient is the interface exposed by history service client

func NewHistoryClient added in v0.9.3

func NewHistoryClient(d *yarpc.Dispatcher) HistoryClient

NewHistoryClient creates a client for the cadence history service

type HistoryConfig added in v0.5.7

// HistoryConfig contains configs for the history service.
type HistoryConfig struct {
	// When MockClient is set, rest of the configs are ignored, history service is not started
	// and mock history client is passed to other services
	MockClient HistoryClient

	// The remaining fields are the "rest of the configs" referred to above,
	// i.e. they are ignored when MockClient is set.
	// NOTE(review): names suggest shard/host counts and history-count
	// warn/error thresholds — confirm units and defaults against the implementation.
	NumHistoryShards       int
	NumHistoryHosts        int
	HistoryCountLimitError int
	HistoryCountLimitWarn  int
}

HistoryConfig contains configs for history service

type IntegrationBase added in v0.4.0

// IntegrationBase is a base struct for integration tests.
// It embeds suite.Suite by value and exposes a Logger; its remaining fields
// are unexported and not shown here.
type IntegrationBase struct {
	suite.Suite

	Logger log.Logger
	// contains filtered or unexported fields
}

IntegrationBase is a base struct for integration tests

func NewIntegrationBase added in v0.20.0

func NewIntegrationBase(params IntegrationBaseParams) *IntegrationBase

type IntegrationBaseParams added in v0.20.0

// IntegrationBaseParams is the parameter struct accepted (by value) by
// NewIntegrationBase.
type IntegrationBaseParams struct {
	T *testing.T
	// Two persistence test clusters of the same interface type.
	// NOTE(review): presumably one backs the default store and the other the
	// visibility store — confirm.
	DefaultTestCluster    testcluster.PersistenceTestCluster
	VisibilityTestCluster testcluster.PersistenceTestCluster
	TestClusterConfig     *TestClusterConfig
}

type IntegrationSuite added in v0.20.0

// IntegrationSuite is a test suite type composed of *require.Assertions and
// *IntegrationBase.
type IntegrationSuite struct {
	// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
	// not merely log an error
	*require.Assertions
	*IntegrationBase
}

type MatchingClient added in v1.2.13

// MatchingClient wraps matching.Client by embedding it; it declares no
// additional methods of its own.
type MatchingClient interface {
	matching.Client
}

type MatchingConfig added in v1.2.13

// MatchingConfig groups the matching-related settings: a host count and a
// MatchingSimulationConfig held by value.
type MatchingConfig struct {
	// number of matching host can be at most 4 due to existing static port assignments in onebox.go.
	// can be changed easily.
	NumMatchingHosts int
	SimulationConfig MatchingSimulationConfig
}

type MatchingSimulationConfig added in v1.2.13

// MatchingSimulationConfig holds the tunables of a matching simulation:
// task list partition counts, forwarder limits, wait times, the poller, task
// and backlog definitions, and the adaptive scaler settings.
// It is the type of MatchingConfig.SimulationConfig.
type MatchingSimulationConfig struct {
	// Number of task list write partitions defaults to 1
	TaskListWritePartitions int

	// Number of task list read partitions defaults to 1
	TaskListReadPartitions int

	// At most N polls will be forwarded at a time. defaults to 20
	ForwarderMaxOutstandingPolls int

	// At most N tasks will be forwarded at a time. defaults to 1
	ForwarderMaxOutstandingTasks int

	// Forwarder rps limit defaults to 10
	ForwarderMaxRatePerSecond int

	// Children per node. defaults to 20
	ForwarderMaxChildrenPerNode int

	// LocalPollWaitTime. defaults to 0ms.
	LocalPollWaitTime time.Duration

	// LocalTaskWaitTime. defaults to 0ms.
	LocalTaskWaitTime time.Duration

	// RecordDecisionTaskStartedTime. The amount of time spent by History to complete RecordDecisionTaskStarted
	RecordDecisionTaskStartedTime time.Duration

	// TasklistLoadBalancerStrategy the strategy of load balancer. defaults to "random".
	TasklistLoadBalancerStrategy string

	// The pollers that will be created to process
	Pollers []SimulationPollerConfiguration

	// Tasks is a list of SimulationTaskConfiguration entries.
	// NOTE(review): presumably one entry per task-generation workload — confirm.
	Tasks []SimulationTaskConfiguration

	// Backlogs is a list of SimulationBacklogConfiguration entries, each naming
	// a partition and a backlog count.
	Backlogs []SimulationBacklogConfiguration

	// GetPartitionConfigFromDB indicates whether to get the partition config from DB or not.
	// This is a prerequisite for adaptive scaler.
	GetPartitionConfigFromDB bool

	// Adaptive scaler configurations
	EnableAdaptiveScaler                bool
	PartitionDownscaleFactor            float64
	PartitionUpscaleRPS                 int
	PartitionUpscaleSustainedDuration   time.Duration
	PartitionDownscaleSustainedDuration time.Duration
	AdaptiveScalerUpdateInterval        time.Duration
	QPSTrackerInterval                  time.Duration
	TaskIsolationDuration               time.Duration
}

type MatchingSimulationSuite added in v1.2.13

// MatchingSimulationSuite is a test suite type composed of
// *require.Assertions and *IntegrationBase.
// NOTE(review): presumably the suite that runs the matching simulation — confirm.
type MatchingSimulationSuite struct {
	// Embedded directly so that assertion helpers called on the suite are the
	// require variants, which stop the test on failure.
	*require.Assertions
	*IntegrationBase
}

type MessagingClientConfig added in v0.5.7

// MessagingClientConfig is the config for the messaging client.
type MessagingClientConfig struct {
	// NOTE(review): presumably selects a mock messaging client instead of one
	// built from KafkaConfig — confirm.
	UseMock     bool
	KafkaConfig *config.KafkaConfig
}

MessagingClientConfig is the config for the messaging client

type Service added in v0.24.0

// Service is the interface which must be implemented by all the services.
// Besides Start and Stop it consists solely of accessor methods.
//
// TODO: Service contains many methods that are not used now that we have
// resource bean; these should be cleaned up.
type Service interface {
	// Neither lifecycle method returns a value.
	Start()
	Stop()
	GetLogger() log.Logger
	GetThrottledLogger() log.Logger
	GetMetricsClient() metrics.Client
	GetClientBean() client.Bean
	GetTimeSource() clock.TimeSource
	GetDispatcher() *yarpc.Dispatcher
	GetMembershipResolver() membership.Resolver
	GetHostInfo() membership.HostInfo
	GetClusterMetadata() cluster.Metadata
	GetMessagingClient() messaging.Client
	GetBlobstoreClient() blobstore.Client
	GetArchivalMetadata() archiver.ArchivalMetadata
	GetArchiverProvider() provider.ArchiverProvider
	GetPayloadSerializer() persistence.PayloadSerializer
}

Service is the interface which must be implemented by all the services. TODO: Service contains many methods that are not used now that we have resource bean; these should be cleaned up.

func NewService added in v0.24.0

func NewService(params *resource.Params) Service

NewService instantiates a Service instance. TODO: have a better name for Service.

type SimulationBacklogConfiguration

// SimulationBacklogConfiguration describes one backlog entry: a partition
// number, a backlog count and optional per-isolation-group weights.
// It is the element type of MatchingSimulationConfig.Backlogs.
type SimulationBacklogConfiguration struct {
	// The partition number
	Partition int // Do not set it to 0, because it's not guaranteed to add backlog to partition 0
	// The backlog count
	BacklogCount int
	// The weight of each isolation group, can be empty
	IsolationGroups map[string]int
}

type SimulationPollerConfiguration added in v1.2.13

// SimulationPollerConfiguration describes one group of pollers that share
// the same settings. It is the element type of MatchingSimulationConfig.Pollers.
type SimulationPollerConfiguration struct {
	// The isolation group that pollers will be created with. Optional.
	IsolationGroup string
	// The number of pollers that will be created with this configuration. Defaults to 1
	NumPollers int
	// TaskProcessTime. The amount of time spent by the poller in-between requests. Defaults to 1ms
	TaskProcessTime time.Duration
	// Poll request timeout defaults to 15 seconds
	PollTimeout time.Duration
}

type SimulationTaskConfiguration added in v1.2.13

// SimulationTaskConfiguration describes how tasks are generated: target
// isolation groups, generator count, a total cap, and the rate either as a
// fixed QPS/burst pair or as a schedule (OverTime).
// It is the element type of MatchingSimulationConfig.Tasks.
type SimulationTaskConfiguration struct {
	// The isolation groups that tasks will be evenly distributed between
	IsolationGroups []string

	// Number of task generators defaults to 1
	NumTaskGenerators int

	// Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation
	// Defaults to 2k
	MaxTaskToGenerate int

	// Task generation QPS. Defaults to 40.
	TasksPerSecond int

	// The burst value for the rate limiter for task generation. Controls the maximum number of AddTask requests
	// that can be sent concurrently. For example, if you have TasksPerSecond, TasksBurst, and NumTaskGenerators all
	// set to 10 then every second you'll get 10 tasks added right at the start of the second. If you instead set
	// TasksBurst to 1 then you'd get a steady stream of tasks, with one task every 100ms.
	TasksBurst int

	// OverTime is a list of TasksProduceSpec that will be used to change the qps over time.
	// Each item has a duration and they will be applied in the given order.
	// If this is set, TasksPerSecond and TasksBurst will be ignored.
	OverTime []TasksProduceSpec
}

type SizeLimitIntegrationSuite added in v0.20.0

// SizeLimitIntegrationSuite is a test suite type composed of
// *require.Assertions and *IntegrationBase.
// NOTE(review): presumably the suite for size-limit integration tests — confirm.
type SizeLimitIntegrationSuite struct {
	// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
	// not merely log an error
	*require.Assertions
	*IntegrationBase
}

type TaskListIsolationIntegrationSuite

// TaskListIsolationIntegrationSuite is a test suite type composed of
// *require.Assertions and *IntegrationBase.
// NOTE(review): presumably the suite for task list isolation tests — confirm.
type TaskListIsolationIntegrationSuite struct {
	// Embedded directly so that assertion helpers called on the suite are the
	// require variants, which stop the test on failure.
	*require.Assertions
	*IntegrationBase
}

type TaskPoller added in v0.4.0

// TaskPoller is used in integration tests to poll decision or activity tasks.
// All of its fields are exported, but the three handler fields use unexported
// types (decisionTaskHandler, activityTaskHandler, queryHandler) whose
// definitions are not shown here.
type TaskPoller struct {
	// Engine is typed as FrontendClient.
	// NOTE(review): presumably the client used for poll/respond calls — confirm.
	Engine                              FrontendClient
	Domain                              string
	TaskList                            *types.TaskList
	StickyTaskList                      *types.TaskList
	StickyScheduleToStartTimeoutSeconds *int32
	Identity                            string
	DecisionHandler                     decisionTaskHandler
	ActivityHandler                     activityTaskHandler
	QueryHandler                        queryHandler
	Logger                              log.Logger
	T                                   *testing.T
	CallOptions                         []yarpc.CallOption
}

TaskPoller is used in integration tests to poll decision or activity tasks

func (*TaskPoller) HandlePartialDecision added in v0.4.0

HandlePartialDecision for decision task

func (*TaskPoller) PollAndProcessActivityTask added in v0.4.0

func (p *TaskPoller) PollAndProcessActivityTask(dropTask bool) error

PollAndProcessActivityTask for activity tasks

func (*TaskPoller) PollAndProcessActivityTaskWithID added in v0.4.0

func (p *TaskPoller) PollAndProcessActivityTaskWithID(dropTask bool) error

PollAndProcessActivityTaskWithID is similar to PollAndProcessActivityTask but uses the RespondActivityTask...ByID APIs

func (*TaskPoller) PollAndProcessDecisionTask added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTask(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)

PollAndProcessDecisionTask for decision tasks

func (*TaskPoller) PollAndProcessDecisionTaskWithAttempt added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTaskWithAttempt(
	dumpHistory bool,
	dropTask bool,
	pollStickyTaskList bool,
	respondStickyTaskList bool,
	decisionAttempt int64,
) (isQueryTask bool, err error)

PollAndProcessDecisionTaskWithAttempt for decision tasks

func (*TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetry added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetry(
	dumpHistory bool,
	dropTask bool,
	pollStickyTaskList bool,
	respondStickyTaskList bool,
	decisionAttempt int64,
	retryCount int,
) (isQueryTask bool, err error)

PollAndProcessDecisionTaskWithAttemptAndRetry for decision tasks

func (*TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
	dumpHistory bool,
	dropTask bool,
	pollStickyTaskList bool,
	respondStickyTaskList bool,
	decisionAttempt int64,
	retryCount int,
	forceCreateNewDecision bool,
	queryResult *types.WorkflowQueryResult,
) (isQueryTask bool, newTask *types.RespondDecisionTaskCompletedResponse, err error)

PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision for decision tasks

func (*TaskPoller) PollAndProcessDecisionTaskWithSticky added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTaskWithSticky(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)

PollAndProcessDecisionTaskWithSticky for decision tasks

func (*TaskPoller) PollAndProcessDecisionTaskWithoutRetry added in v0.4.0

func (p *TaskPoller) PollAndProcessDecisionTaskWithoutRetry(dumpHistory bool, dropTask bool) (isQueryTask bool, err error)

PollAndProcessDecisionTaskWithoutRetry for decision tasks

func (*TaskPoller) PollAndProcessDecisions

func (p *TaskPoller) PollAndProcessDecisions() context.CancelFunc

type TasksProduceSpec

// TasksProduceSpec is one step of a task-generation schedule: a QPS/burst
// pair together with how long it applies.
// It is the element type of SimulationTaskConfiguration.OverTime.
type TasksProduceSpec struct {
	// Task generation qps
	TasksPerSecond int

	// The burst value for the rate limiter for task generation.
	TasksBurst int

	// The duration for which the settings will be applied.
	// If the duration is unset, the settings will be applied indefinitely.
	// NOTE(review): declared as a pointer, presumably so that nil represents
	// "unset" — confirm.
	Duration *time.Duration
}

type TestCluster added in v0.5.7

// TestCluster is a base struct for integration tests.
// It is returned (as a pointer) by NewCluster and NewPinotTestCluster; no
// exported fields are listed for it.
type TestCluster struct {
	// contains filtered or unexported fields
}

TestCluster is a base struct for integration tests

func NewCluster added in v0.5.7

func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error)

NewCluster creates and sets up the test cluster

func NewPinotTestCluster added in v1.2.5

func NewPinotTestCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error)

func (*TestCluster) GetAdminClient added in v0.5.7

func (tc *TestCluster) GetAdminClient() AdminClient

GetAdminClient returns an admin client from the test cluster

func (*TestCluster) GetExecutionManagerFactory added in v0.11.0

func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory

GetExecutionManagerFactory returns an execution manager factory from the test cluster

func (*TestCluster) GetFrontendClient added in v0.5.7

func (tc *TestCluster) GetFrontendClient() FrontendClient

GetFrontendClient returns a frontend client from the test cluster

func (*TestCluster) GetHistoryClient added in v0.9.3

func (tc *TestCluster) GetHistoryClient() HistoryClient

GetHistoryClient returns a history client from the test cluster

func (*TestCluster) GetMatchingClient added in v1.2.13

func (tc *TestCluster) GetMatchingClient() MatchingClient

GetMatchingClient returns a matching client from the test cluster

func (*TestCluster) GetMatchingClients added in v1.2.14

func (tc *TestCluster) GetMatchingClients() []MatchingClient

func (*TestCluster) TearDownCluster added in v0.5.7

func (tc *TestCluster) TearDownCluster()

TearDownCluster tears down the test cluster

type TestClusterConfig added in v0.5.7

// TestClusterConfig holds the configuration for a test cluster.
// Pointers to it are returned by GetTestClusterConfig and
// GetTestClusterConfigs and accepted by NewCluster and NewPinotTestCluster.
type TestClusterConfig struct {
	FrontendAddress       string
	EnableArchival        bool
	IsPrimaryCluster      bool
	ClusterNo             int
	ClusterGroupMetadata  config.ClusterGroupMetadata
	MessagingClientConfig *MessagingClientConfig
	Persistence           persistencetests.TestBaseOptions
	HistoryConfig         *HistoryConfig
	MatchingConfig        *MatchingConfig
	ESConfig              *config.ElasticSearchConfig
	WorkerConfig          *WorkerConfig
	MockAdminClient       map[string]adminClient.Client
	PinotConfig           *config.PinotVisibilityConfig
	AsyncWFQueues         map[string]config.AsyncWorkflowQueueProvider

	// TimeSource is used to override the time source of internal components.
	// Note that most components don't respect this, and it's only used in a few places.
	// e.g. async workflow test's consumer manager and domain manager
	TimeSource                     clock.MockedTimeSource
	// Four maps keyed by dynamicconfig.Key, one per service name prefix.
	// NOTE(review): presumably dynamic config overrides for the frontend,
	// history, matching and worker services respectively — confirm.
	FrontendDynamicConfigOverrides map[dynamicconfig.Key]interface{}
	HistoryDynamicConfigOverrides  map[dynamicconfig.Key]interface{}
	MatchingDynamicConfigOverrides map[dynamicconfig.Key]interface{}
	WorkerDynamicConfigOverrides   map[dynamicconfig.Key]interface{}
}

TestClusterConfig holds the configuration for a test cluster

func GetTestClusterConfig added in v0.5.7

func GetTestClusterConfig(configFile string) (*TestClusterConfig, error)

GetTestClusterConfig returns the test cluster config

func GetTestClusterConfigs added in v0.20.0

func GetTestClusterConfigs(configFile string) ([]*TestClusterConfig, error)

GetTestClusterConfigs returns the test cluster configs

type WorkerConfig added in v0.5.7

// WorkerConfig is the config for enabling/disabling the cadence worker.
// It consists of four independent boolean switches.
// NOTE(review): presumably each switch toggles the correspondingly named
// worker component (archiver, indexer, replicator, async workflow consumer) — confirm.
type WorkerConfig struct {
	EnableArchiver        bool
	EnableIndexer         bool
	EnableReplicator      bool
	EnableAsyncWFConsumer bool
}

WorkerConfig is the config for enabling/disabling cadence worker

type WorkflowIDInternalRateLimitIntegrationSuite added in v1.2.10

// WorkflowIDInternalRateLimitIntegrationSuite is a test suite type composed
// of *require.Assertions and *IntegrationBase.
// NOTE(review): presumably the suite for the internal workflow-ID rate limit tests — confirm.
type WorkflowIDInternalRateLimitIntegrationSuite struct {
	// Embedded directly so that assertion helpers called on the suite are the
	// require variants, which stop the test on failure.
	*require.Assertions
	*IntegrationBase
}

type WorkflowIDRateLimitIntegrationSuite added in v1.2.9

// WorkflowIDRateLimitIntegrationSuite is a test suite type composed of
// *require.Assertions and *IntegrationBase.
// NOTE(review): presumably the suite for the workflow-ID rate limit tests — confirm.
type WorkflowIDRateLimitIntegrationSuite struct {
	// Embedded directly so that assertion helpers called on the suite are the
	// require variants, which stop the test on failure.
	*require.Assertions
	*IntegrationBase
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL