service

package
v1.2.1-20240709 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2024 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Overview

Package integration implements an integration test framework. Via this package, one could start a cluster easily. It supports different operations for simulation purpose, and it also provides different ways to check cluster state.

Index

Constants

View Source
const (
	// The expected number of tn replicas.
	TNShardExpectedSize = 1
)

Variables

This section is empty.

Functions

func ParseExpectedLogShardCount

func ParseExpectedLogShardCount(cluster pb.ClusterInfo) int

ParseExpectedLogShardCount returns the expected count of log shards.

func ParseExpectedTNShardCount added in v1.0.0

func ParseExpectedTNShardCount(cluster pb.ClusterInfo) int

ParseExpectedTNShardCount returns the expected count of tn shards.

func ParseLogShardExpectedSize

func ParseLogShardExpectedSize(shardID uint64, cluster pb.ClusterInfo) int

ParseLogShardExpectedSize returns the expected count of log replicas.

func ParseLogShardReportedSize

func ParseLogShardReportedSize(
	shardID uint64, state pb.LogState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseLogShardReportedSize returns the reported count of log replicas.

func ParseReportedLogShardCount

func ParseReportedLogShardCount(
	state pb.LogState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseReportedLogShardCount returns the reported count of log shards.

func ParseReportedTNShardCount added in v1.0.0

func ParseReportedTNShardCount(
	state pb.TNState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseReportedTNShardCount returns the reported count of tn shards.

func ParseTNShardReportedSize added in v1.0.0

func ParseTNShardReportedSize(
	shardID uint64, state pb.TNState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseTNShardReportedSize returns the reported count of tn replicas.

Types

type CNService

type CNService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service.
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string
	// SQLAddress returns the sql listen address
	SQLAddress() string
	//GetTaskRunner returns the taskRunner.
	GetTaskRunner() taskservice.TaskRunner
	// GetTaskService returns the taskservice
	GetTaskService() (taskservice.TaskService, bool)
	// GetSQLExecutor returns sql executor
	GetSQLExecutor() executor.SQLExecutor
	// GetBootstrapService returns bootstrap service
	GetBootstrapService() bootstrap.Service
	//SetCancel sets CancelFunc to stop GetClusterDetailsFromHAKeeper
	SetCancel(context.CancelFunc)
}

CNService describes expected behavior for tn service.

type Cluster

type Cluster interface {
	// Start starts svcs sequentially, after start, system init is completed.
	Start() error
	// Close stops svcs sequentially
	Close() error
	// Options returns the adjusted options
	Options() Options
	// Clock get cluster clock
	Clock() clock.Clock

	ClusterOperation
	ClusterAwareness
	ClusterState
	ClusterWaitState
}

Cluster describes behavior of test framework.

func NewCluster

func NewCluster(ctx context.Context, t *testing.T, opt Options) (Cluster, error)

NewCluster construct a cluster for integration test.

type ClusterAwareness

type ClusterAwareness interface {
	// ListTNServices lists uuid of all tn services.
	ListTNServices() []string
	// ListLogServices lists uuid of all log services.
	ListLogServices() []string
	// ListCnServices lists uuid of all cn services.
	ListCnServices() []string
	// ListHAKeeperServices lists all hakeeper log services.
	ListHAKeeperServices() []LogService

	// GetTNService fetches tn service instance by uuid.
	GetTNService(uuid string) (TNService, error)
	// GetLogService fetches log service instance by index.
	GetLogService(uuid string) (LogService, error)
	// GetTNServiceIndexed fetches tn service instance by uuid.
	GetTNServiceIndexed(index int) (TNService, error)
	// GetLogServiceIndexed fetches log service instance by index.
	GetLogServiceIndexed(index int) (LogService, error)
	// GetCNService fetches cn service instance by index.
	GetCNService(uuid string) (CNService, error)
	// GetCNServiceIndexed fetches cn service instance by index.
	GetCNServiceIndexed(index int) (CNService, error)

	// GetClusterState fetches current cluster state
	GetClusterState(ctx context.Context) (*logpb.CheckerState, error)
}

ClusterAwareness provides cluster awareness information.

type ClusterOperation

type ClusterOperation interface {
	// CloseTNService closes tn service by uuid.
	CloseTNService(uuid string) error
	// StartTNService starts tn service by uuid.
	StartTNService(uuid string) error

	// CloseTNServiceIndexed closes tn service by its index.
	CloseTNServiceIndexed(index int) error
	// StartTNServiceIndexed starts tn service by its index.
	StartTNServiceIndexed(index int) error

	// CloseLogService closes log service by uuid.
	CloseLogService(uuid string) error
	// StartLogService starts log service by uuid.
	StartLogService(uuid string) error

	// CloseLogServiceIndexed closes log service by its index.
	CloseLogServiceIndexed(index int) error
	// StartLogServiceIndexed starts log service by its index.
	StartLogServiceIndexed(index int) error

	// CloseCNService closes cn service by uuid.
	CloseCNService(uuid string) error
	// StartCNService starts cn service by uuid.
	StartCNService(uuid string) error

	// CloseCNServiceIndexed closes cn service by its index.
	CloseCNServiceIndexed(index int) error
	// StartCNServiceIndexed starts cn service by its index.
	StartCNServiceIndexed(index int) error

	// StartCNServices start number of cn services.
	StartCNServices(n int) error

	// NewNetworkPartition constructs network partition from service index.
	NewNetworkPartition(tnIndexes, logIndexes, cnIndexes []uint32) NetworkPartition
	// RemainingNetworkPartition returns partition for the remaining services.
	RemainingNetworkPartition(partitions ...NetworkPartition) NetworkPartition
	// StartNetworkPartition enables network partition feature.
	StartNetworkPartition(partitions ...NetworkPartition)
	// CloseNetworkPartition disables network partition feature.
	CloseNetworkPartition()
}

ClusterOperation supports kinds of cluster operations.

type ClusterState

type ClusterState interface {
	// ListTNShards lists all tn shards within the cluster.
	ListTNShards(ctx context.Context) ([]metadata.TNShardRecord, error)
	// ListLogShards lists all log shards within the cluster.
	ListLogShards(ctx context.Context) ([]metadata.LogShardRecord, error)

	// GetTNStoreInfo gets tn store information by uuid.
	GetTNStoreInfo(ctx context.Context, uuid string) (logpb.TNStoreInfo, error)
	// GetTNStoreInfoIndexed gets tn store information by index.
	GetTNStoreInfoIndexed(ctx context.Context, index int) (logpb.TNStoreInfo, error)

	// GetLogStoreInfo gets log store information by uuid.
	GetLogStoreInfo(ctx context.Context, uuid string) (logpb.LogStoreInfo, error)
	// GetLogStoreInfoIndexed gets log store information by index.
	GetLogStoreInfoIndexed(ctx context.Context, index int) (logpb.LogStoreInfo, error)

	// GetCNStoreInfo gets cn store information by uuid.
	GetCNStoreInfo(ctx context.Context, uuid string) (logpb.CNStoreInfo, error)
	// GetCNStoreInfoIndexed gets cn store information by index.
	GetCNStoreInfoIndexed(ctx context.Context, index int) (logpb.CNStoreInfo, error)

	// GetHAKeeperState returns hakeeper state from running hakeeper.
	GetHAKeeperState() logpb.HAKeeperState
	// GetHAKeeperConfig returns hakeeper configuration.
	GetHAKeeperConfig() hakeeper.Config

	// TNStoreExpired checks tn store expired or not by uuid.
	TNStoreExpired(uuid string) (bool, error)
	// TNStoreExpiredIndexed checks tn store expired or not by index.
	TNStoreExpiredIndexed(index int) (bool, error)
	// LogStoreExpired checks log store expired or not by uuid.
	LogStoreExpired(uuid string) (bool, error)
	// LogStoreExpiredIndexed checks log store expired or not by index.
	LogStoreExpiredIndexed(index int) (bool, error)
	// CNStoreExpired checks cn store expired or not by uuid.
	CNStoreExpired(uuid string) (bool, error)
	// CNStoreExpiredIndexed checks cn store expired or not by index.
	CNStoreExpiredIndexed(index int) (bool, error)

	// IsClusterHealthy checks whether cluster is healthy or not.
	IsClusterHealthy() bool
}

ClusterState provides cluster running state.

type ClusterWaitState

type ClusterWaitState interface {
	// WaitHAKeeperLeader waits hakeeper leader elected and return it.
	WaitHAKeeperLeader(ctx context.Context) LogService
	// WaitHAKeeperState waits the specific hakeeper state.
	WaitHAKeeperState(ctx context.Context, expected logpb.HAKeeperState)

	// WaitTNShardsReported waits the expected count of tn shards reported.
	WaitTNShardsReported(ctx context.Context)
	// WaitLogShardsReported waits the expected count of log shards reported.
	WaitLogShardsReported(ctx context.Context)
	// WaitTNReplicaReported waits tn replica reported.
	WaitTNReplicaReported(ctx context.Context, shardID uint64)
	// WaitLogReplicaReported waits log replicas reported.
	WaitLogReplicaReported(ctx context.Context, shardID uint64)

	// WaitTNStoreTimeout waits tn store timeout by uuid.
	WaitTNStoreTimeout(ctx context.Context, uuid string)
	// WaitTNStoreTimeoutIndexed waits tn store timeout by index.
	WaitTNStoreTimeoutIndexed(ctx context.Context, index int)
	// WaitTNStoreReported waits tn store reported by uuid.
	WaitTNStoreReported(ctx context.Context, uuid string)
	// WaitTNStoreReportedIndexed waits tn store reported by index.
	WaitTNStoreReportedIndexed(ctx context.Context, index int)
	// WaitTNStoreTaskServiceCreated waits tn store task service started by uuid.
	WaitTNStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitTNStoreTaskServiceCreatedIndexed waits tn store task service started by index.
	WaitTNStoreTaskServiceCreatedIndexed(ctx context.Context, index int)
	// WaitCNStoreReported waits cn store reported by uuid.
	WaitCNStoreReported(ctx context.Context, uuid string)
	// WaitCNStoreReportedIndexed waits cn store reported by index.
	WaitCNStoreReportedIndexed(ctx context.Context, index int)
	// WaitCNStoreTaskServiceCreated waits cn store task service started by uuid.
	WaitCNStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitCNStoreTaskServiceCreatedIndexed waits cn store task service started by index.
	WaitCNStoreTaskServiceCreatedIndexed(ctx context.Context, index int)
	// WaitLogStoreTaskServiceCreated waits log store task service started by uuid
	WaitLogStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitLogStoreTaskServiceCreatedIndexed waits log store task service started by index
	WaitLogStoreTaskServiceCreatedIndexed(ctx context.Context, index int)

	// WaitLogStoreTimeout waits log store timeout by uuid.
	WaitLogStoreTimeout(ctx context.Context, uuid string)
	// WaitLogStoreTimeoutIndexed waits log store timeout by index.
	WaitLogStoreTimeoutIndexed(ctx context.Context, index int)
	// WaitLogStoreReported waits log store reported by uuid.
	WaitLogStoreReported(ctx context.Context, uuid string)
	// WaitLogStoreReportedIndexed waits log store reported by index.
	WaitLogStoreReportedIndexed(ctx context.Context, index int)
}

ClusterWaitState waits cluster state until timeout.

type FilterFunc

type FilterFunc func(morpc.Message, string) bool

FilterFunc returns true if traffic was allowed.

type LogService

type LogService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string

	// IsLeaderHakeeper checks hakeeper information.
	IsLeaderHakeeper() (bool, error)

	// GetClusterState returns cluster information from hakeeper leader.
	GetClusterState() (*logpb.CheckerState, error)

	// SetInitialClusterInfo sets cluster initialize state.
	SetInitialClusterInfo(numOfLogShards, numOfTNShards, numOfLogReplicas uint64) error

	// StartHAKeeperReplica starts hakeeper replicas.
	StartHAKeeperReplica(replicaID uint64, initialReplicas map[uint64]dragonboat.Target, join bool) error

	// GetTaskService returns the taskService
	GetTaskService() (taskservice.TaskService, bool)
}

LogService describes expected behavior for log service.

type NetworkPartition

type NetworkPartition struct {
	// contains filtered or unexported fields
}

NetworkPartition records index of services from the same network partition.

func (NetworkPartition) ListCNServiceIndex

func (p NetworkPartition) ListCNServiceIndex() []uint32

func (NetworkPartition) ListLogServiceIndex

func (p NetworkPartition) ListLogServiceIndex() []uint32

ListLogServiceIndex lists index of all log services in the partition.

func (NetworkPartition) ListTNServiceIndex added in v1.0.0

func (p NetworkPartition) ListTNServiceIndex() []uint32

ListTNServiceIndex lists index of all tn services in the partition.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are params for creating test cluster.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions sets a list of recommended options.

func (Options) BuildHAKeeperConfig

func (opt Options) BuildHAKeeperConfig() hakeeper.Config

BuildHAKeeperConfig returns hakeeper.Config

We could check timeout for dn/log store via hakeeper.Config.

func (Options) GetCNEngineType

func (opt Options) GetCNEngineType() tnservice.StorageType

GetCNEngineType returns the engine type that the cnservice used

func (Options) GetTNStorageType added in v1.0.0

func (opt Options) GetTNStorageType() tnservice.StorageType

GetTNStorageType returns the storage type that the dnservice used

func (Options) WithCNHeartbeatInterval

func (opt Options) WithCNHeartbeatInterval(interval time.Duration) Options

WithCNHeartbeatInterval sets heartbeat interval fo cn service.

func (Options) WithCNOptionFunc added in v1.2.0

func (opt Options) WithCNOptionFunc(fn func(i int) []cnservice.Option) Options

WithCNOptionFunc set build cn options func

func (Options) WithCNServiceNum

func (opt Options) WithCNServiceNum(num int) Options

func (Options) WithCNShardNum

func (opt Options) WithCNShardNum(num uint64) Options

func (Options) WithCNUseDistributedTAEEngine

func (opt Options) WithCNUseDistributedTAEEngine() Options

WithCNUseDistributedTAEEngine use distributed tae engine as cn engine

func (Options) WithCNUseMemoryEngine

func (opt Options) WithCNUseMemoryEngine() Options

WithCNUseMemoryEngine use memory engine as cn engine

func (Options) WithHKCNStoreTimeout

func (opt Options) WithHKCNStoreTimeout(timeout time.Duration) Options

func (Options) WithHKCheckInterval

func (opt Options) WithHKCheckInterval(interval time.Duration) Options

WithHKCheckInterval sets check interval for hakeeper.

func (Options) WithHKLogStoreTimeout

func (opt Options) WithHKLogStoreTimeout(timeout time.Duration) Options

WithHKLogStoreTimeout sets log store timeout for hakeeper.

func (Options) WithHKTNStoreTimeout added in v1.0.0

func (opt Options) WithHKTNStoreTimeout(timeout time.Duration) Options

WithHKTNStoreTimeout sets tn store timeout for hakeeper.

func (Options) WithHKTickPerSecond

func (opt Options) WithHKTickPerSecond(tick int) Options

WithHKTickPerSecond sets tick per second for hakeeper.

func (Options) WithHostAddress

func (opt Options) WithHostAddress(host string) Options

WithHostAddress sets host address for all services.

func (Options) WithKeepData

func (opt Options) WithKeepData() Options

WithKeepData sets keep data after cluster closed.

func (Options) WithLogHeartbeatInterval

func (opt Options) WithLogHeartbeatInterval(interval time.Duration) Options

WithLogHeartbeatInterval sets heartbeat interval fo log service.

func (Options) WithLogLevel

func (opt Options) WithLogLevel(lvl zapcore.Level) Options

WithLogLevel sets log level.

func (Options) WithLogReplicaNum

func (opt Options) WithLogReplicaNum(num uint64) Options

WithLogReplicaNum sets log replica number for the cluster.

func (Options) WithLogServiceNum

func (opt Options) WithLogServiceNum(num int) Options

WithLogServiceNum sets log service number in the cluster.

func (Options) WithLogShardNum

func (opt Options) WithLogShardNum(num uint64) Options

WithLogShardNum sets log shard number in the cluster.

func (Options) WithLogger

func (opt Options) WithLogger(logger *zap.Logger) Options

WithLogger sets logger.

func (Options) WithLogtailCollectInterval added in v0.7.0

func (opt Options) WithLogtailCollectInterval(interval time.Duration) Options

WithLogtailCollectInterval sets collection interval for logtail push server.

func (Options) WithLogtailResponseSendTimeout added in v0.7.0

func (opt Options) WithLogtailResponseSendTimeout(timeout time.Duration) Options

WithLogtailResponseSendTimeout sets response send timeout for logtail push server.

func (Options) WithLogtailRpcMaxMessageSize added in v0.7.0

func (opt Options) WithLogtailRpcMaxMessageSize(size int64) Options

WithLogtailRpcMaxMessageSize sets max rpc message size for logtail push server.

func (Options) WithPullWorkerPoolSize added in v1.2.1

func (opt Options) WithPullWorkerPoolSize(s int64) Options

WithLogtailResponseSendTimeout sets response send timeout for logtail push server.

func (Options) WithRootDataDir

func (opt Options) WithRootDataDir(root string) Options

WithRootDataDir sets root for service data directory.

func (Options) WithTNHeartbeatInterval added in v1.0.0

func (opt Options) WithTNHeartbeatInterval(interval time.Duration) Options

WithTNHeartbeatInterval sets heartbeat interval fo tn service.

func (Options) WithTNServiceNum added in v1.0.0

func (opt Options) WithTNServiceNum(num int) Options

WithTNServiceNum sets tn service number in the cluster.

func (Options) WithTNShardNum added in v1.2.0

func (opt Options) WithTNShardNum(num uint64) Options

WithTNShardNum sets tn shard number in the cluster.

func (Options) WithTNUseMEMStorage added in v1.0.0

func (opt Options) WithTNUseMEMStorage() Options

WithTNUseMEMStorage sets tn transaction use mem storage.

func (Options) WithTNUseTAEStorage added in v1.0.0

func (opt Options) WithTNUseTAEStorage() Options

WithTNUseTAEStorage sets tn transaction use tae storage.

type ServiceStatus

type ServiceStatus int

ServiceStatus indicates service status.

const (
	ServiceInitialized ServiceStatus = iota
	ServiceStarted
	ServiceClosed
)

type TNService added in v1.0.0

type TNService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service.
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string

	// StartTNReplica start the TNShard replica
	StartTNReplica(shard metadata.TNShard) error
	// CloseTNReplica close the TNShard replica.
	CloseTNReplica(shard metadata.TNShard) error

	// GetTaskService returns the taskservice
	GetTaskService() (taskservice.TaskService, bool)
}

TNService describes expected behavior for tn service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL