service

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2023 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Overview

Package integration implements an integration test framework. Via this package, one could start a cluster easily. It supports different operations for simulation purpose, and it also provides different ways to check cluster state.

Index

Constants

View Source
const (
	// The expected number of dn replicas.
	DNShardExpectedSize = 1
)

Variables

This section is empty.

Functions

func ParseDNShardReportedSize

func ParseDNShardReportedSize(
	shardID uint64, state pb.DNState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseDNShardReportedSize returns the reported count of dn replicas.

func ParseExpectedDNShardCount

func ParseExpectedDNShardCount(cluster pb.ClusterInfo) int

ParseExpectedDNShardCount returns the expected count of dn shards.

func ParseExpectedLogShardCount

func ParseExpectedLogShardCount(cluster pb.ClusterInfo) int

ParseExpectedLogShardCount returns the expected count of log shards.

func ParseLogShardExpectedSize

func ParseLogShardExpectedSize(shardID uint64, cluster pb.ClusterInfo) int

ParseLogShardExpectedSize returns the expected count of log replicas.

func ParseLogShardReportedSize

func ParseLogShardReportedSize(
	shardID uint64, state pb.LogState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseLogShardReportedSize returns the reported count of log replicas.

func ParseReportedDNShardCount

func ParseReportedDNShardCount(
	state pb.DNState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseReportedDNShardCount returns the reported count of dn shards.

func ParseReportedLogShardCount

func ParseReportedLogShardCount(
	state pb.LogState, hkcfg hakeeper.Config, currTick uint64,
) int

ParseReportedLogShardCount returns the reported count of log shards.

Types

type CNService

type CNService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service.
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string
	// SQLAddress returns the sql listen address
	SQLAddress() string
	//GetTaskRunner returns the taskRunner.
	GetTaskRunner() taskservice.TaskRunner
	// GetTaskService returns the taskservice
	GetTaskService() (taskservice.TaskService, bool)
	// WaitSystemInitCompleted wait system init task completed
	WaitSystemInitCompleted(ctx context.Context) error
	//SetCancel sets CancelFunc to stop GetClusterDetailsFromHAKeeper
	SetCancel(context.CancelFunc)
}

CNService describes expected behavior for dn service.

type Cluster

type Cluster interface {
	// Start starts svcs sequentially, after start, system init is completed.
	Start() error
	// Close stops svcs sequentially
	Close() error
	// Options returns the adjusted options
	Options() Options

	ClusterOperation
	ClusterAwareness
	ClusterState
	ClusterWaitState
}

Cluster describes behavior of test framework.

func NewCluster

func NewCluster(ctx context.Context, t *testing.T, opt Options) (Cluster, error)

NewCluster construct a cluster for integration test.

type ClusterAwareness

type ClusterAwareness interface {
	// ListDNServices lists uuid of all dn services.
	ListDNServices() []string
	// ListLogServices lists uuid of all log services.
	ListLogServices() []string
	// ListCnServices lists uuid of all cn services.
	ListCnServices() []string
	// ListHAKeeperServices lists all hakeeper log services.
	ListHAKeeperServices() []LogService

	// GetDNService fetches dn service instance by uuid.
	GetDNService(uuid string) (DNService, error)
	// GetLogService fetches log service instance by index.
	GetLogService(uuid string) (LogService, error)
	// GetDNServiceIndexed fetches dn service instance by uuid.
	GetDNServiceIndexed(index int) (DNService, error)
	// GetLogServiceIndexed fetches log service instance by index.
	GetLogServiceIndexed(index int) (LogService, error)
	// GetCNService fetches cn service instance by index.
	GetCNService(uuid string) (CNService, error)
	// GetCNServiceIndexed fetches cn service instance by index.
	GetCNServiceIndexed(index int) (CNService, error)

	// GetClusterState fetches current cluster state
	GetClusterState(ctx context.Context) (*logpb.CheckerState, error)
}

ClusterAwareness provides cluster awareness information.

type ClusterOperation

type ClusterOperation interface {
	// CloseDNService closes dn service by uuid.
	CloseDNService(uuid string) error
	// StartDNService starts dn service by uuid.
	StartDNService(uuid string) error

	// CloseDNServiceIndexed closes dn service by its index.
	CloseDNServiceIndexed(index int) error
	// StartDNServiceIndexed starts dn service by its index.
	StartDNServiceIndexed(index int) error

	// CloseLogService closes log service by uuid.
	CloseLogService(uuid string) error
	// StartLogService starts log service by uuid.
	StartLogService(uuid string) error

	// CloseLogServiceIndexed closes log service by its index.
	CloseLogServiceIndexed(index int) error
	// StartLogServiceIndexed starts log service by its index.
	StartLogServiceIndexed(index int) error

	// CloseCNService closes cn service by uuid.
	CloseCNService(uuid string) error
	// StartCNService starts cn service by uuid.
	StartCNService(uuid string) error

	// CloseCNServiceIndexed closes cn service by its index.
	CloseCNServiceIndexed(index int) error
	// StartCNServiceIndexed starts cn service by its index.
	StartCNServiceIndexed(index int) error

	// NewNetworkPartition constructs network partition from service index.
	NewNetworkPartition(dnIndexes, logIndexes, cnIndexes []uint32) NetworkPartition
	// RemainingNetworkPartition returns partition for the remaining services.
	RemainingNetworkPartition(partitions ...NetworkPartition) NetworkPartition
	// StartNetworkPartition enables network partition feature.
	StartNetworkPartition(partitions ...NetworkPartition)
	// CloseNetworkPartition disables network partition feature.
	CloseNetworkPartition()
}

ClusterOperation supports kinds of cluster operations.

type ClusterState

type ClusterState interface {
	// ListDNShards lists all dn shards within the cluster.
	ListDNShards(ctx context.Context) ([]metadata.DNShardRecord, error)
	// ListLogShards lists all log shards within the cluster.
	ListLogShards(ctx context.Context) ([]metadata.LogShardRecord, error)

	// GetDNStoreInfo gets dn store information by uuid.
	GetDNStoreInfo(ctx context.Context, uuid string) (logpb.DNStoreInfo, error)
	// GetDNStoreInfoIndexed gets dn store information by index.
	GetDNStoreInfoIndexed(ctx context.Context, index int) (logpb.DNStoreInfo, error)

	// GetLogStoreInfo gets log store information by uuid.
	GetLogStoreInfo(ctx context.Context, uuid string) (logpb.LogStoreInfo, error)
	// GetLogStoreInfoIndexed gets log store information by index.
	GetLogStoreInfoIndexed(ctx context.Context, index int) (logpb.LogStoreInfo, error)

	// GetCNStoreInfo gets cn store information by uuid.
	GetCNStoreInfo(ctx context.Context, uuid string) (logpb.CNStoreInfo, error)
	// GetCNStoreInfoIndexed gets cn store information by index.
	GetCNStoreInfoIndexed(ctx context.Context, index int) (logpb.CNStoreInfo, error)

	// GetHAKeeperState returns hakeeper state from running hakeeper.
	GetHAKeeperState() logpb.HAKeeperState
	// GetHAKeeperConfig returns hakeeper configuration.
	GetHAKeeperConfig() hakeeper.Config

	// DNStoreExpired checks dn store expired or not by uuid.
	DNStoreExpired(uuid string) (bool, error)
	// DNStoreExpiredIndexed checks dn store expired or not by index.
	DNStoreExpiredIndexed(index int) (bool, error)
	// LogStoreExpired checks log store expired or not by uuid.
	LogStoreExpired(uuid string) (bool, error)
	// LogStoreExpiredIndexed checks log store expired or not by index.
	LogStoreExpiredIndexed(index int) (bool, error)
	// CNStoreExpired checks cn store expired or not by uuid.
	CNStoreExpired(uuid string) (bool, error)
	// CNStoreExpiredIndexed checks cn store expired or not by index.
	CNStoreExpiredIndexed(index int) (bool, error)

	// IsClusterHealthy checks whether cluster is healthy or not.
	IsClusterHealthy() bool
}

ClusterState provides cluster running state.

type ClusterWaitState

type ClusterWaitState interface {
	// WaitHAKeeperLeader waits hakeeper leader elected and return it.
	WaitHAKeeperLeader(ctx context.Context) LogService
	// WaitHAKeeperState waits the specific hakeeper state.
	WaitHAKeeperState(ctx context.Context, expected logpb.HAKeeperState)

	// WaitDNShardsReported waits the expected count of dn shards reported.
	WaitDNShardsReported(ctx context.Context)
	// WaitLogShardsReported waits the expected count of log shards reported.
	WaitLogShardsReported(ctx context.Context)
	// WaitDNReplicaReported waits dn replica reported.
	WaitDNReplicaReported(ctx context.Context, shardID uint64)
	// WaitLogReplicaReported waits log replicas reported.
	WaitLogReplicaReported(ctx context.Context, shardID uint64)

	// WaitDNStoreTimeout waits dn store timeout by uuid.
	WaitDNStoreTimeout(ctx context.Context, uuid string)
	// WaitDNStoreTimeoutIndexed waits dn store timeout by index.
	WaitDNStoreTimeoutIndexed(ctx context.Context, index int)
	// WaitDNStoreReported waits dn store reported by uuid.
	WaitDNStoreReported(ctx context.Context, uuid string)
	// WaitDNStoreReportedIndexed waits dn store reported by index.
	WaitDNStoreReportedIndexed(ctx context.Context, index int)
	// WaitDNStoreTaskServiceCreated waits dn store task service started by uuid.
	WaitDNStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitDNStoreTaskServiceCreatedIndexed waits dn store task service started by index.
	WaitDNStoreTaskServiceCreatedIndexed(ctx context.Context, index int)
	// WaitCNStoreReported waits cn store reported by uuid.
	WaitCNStoreReported(ctx context.Context, uuid string)
	// WaitCNStoreReportedIndexed waits cn store reported by index.
	WaitCNStoreReportedIndexed(ctx context.Context, index int)
	// WaitCNStoreTaskServiceCreated waits cn store task service started by uuid.
	WaitCNStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitCNStoreTaskServiceCreatedIndexed waits cn store task service started by index.
	WaitCNStoreTaskServiceCreatedIndexed(ctx context.Context, index int)
	// WaitLogStoreTaskServiceCreated waits log store task service started by uuid
	WaitLogStoreTaskServiceCreated(ctx context.Context, uuid string)
	// WaitLogStoreTaskServiceCreatedIndexed waits log store task service started by index
	WaitLogStoreTaskServiceCreatedIndexed(ctx context.Context, index int)

	// WaitLogStoreTimeout waits log store timeout by uuid.
	WaitLogStoreTimeout(ctx context.Context, uuid string)
	// WaitLogStoreTimeoutIndexed waits log store timeout by index.
	WaitLogStoreTimeoutIndexed(ctx context.Context, index int)
	// WaitLogStoreReported waits log store reported by uuid.
	WaitLogStoreReported(ctx context.Context, uuid string)
	// WaitLogStoreReportedIndexed waits log store reported by index.
	WaitLogStoreReportedIndexed(ctx context.Context, index int)
}

ClusterWaitState waits cluster state until timeout.

type DNService

type DNService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service.
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string

	// StartDNReplica start the DNShard replica
	StartDNReplica(shard metadata.DNShard) error
	// CloseDNReplica close the DNShard replica.
	CloseDNReplica(shard metadata.DNShard) error

	// GetTaskService returns the taskservice
	GetTaskService() (taskservice.TaskService, bool)
}

DNService describes expected behavior for dn service.

type FilterFunc

type FilterFunc func(morpc.Message, string) bool

FilterFunc returns true if traffic was allowed.

type LogService

type LogService interface {
	// Start sends heartbeat and start to handle command.
	Start() error
	// Close stops store
	Close() error
	// Status returns the status of service
	Status() ServiceStatus

	// ID returns uuid of store
	ID() string

	// IsLeaderHakeeper checks hakeeper information.
	IsLeaderHakeeper() (bool, error)

	// GetClusterState returns cluster information from hakeeper leader.
	GetClusterState() (*logpb.CheckerState, error)

	// SetInitialClusterInfo sets cluster initialize state.
	SetInitialClusterInfo(numOfLogShards, numOfDNShards, numOfLogReplicas uint64) error

	// StartHAKeeperReplica starts hakeeper replicas.
	StartHAKeeperReplica(replicaID uint64, initialReplicas map[uint64]dragonboat.Target, join bool) error

	// GetTaskService returns the taskService
	GetTaskService() (taskservice.TaskService, bool)

	// CreateInitTasks create init task
	CreateInitTasks() error
}

LogService describes expected behavior for log service.

type NetworkPartition

type NetworkPartition struct {
	// contains filtered or unexported fields
}

NetworkPartition records index of services from the same network partition.

func (NetworkPartition) ListCNServiceIndex

func (p NetworkPartition) ListCNServiceIndex() []uint32

func (NetworkPartition) ListDNServiceIndex

func (p NetworkPartition) ListDNServiceIndex() []uint32

ListDNServiceIndex lists index of all dn services in the partition.

func (NetworkPartition) ListLogServiceIndex

func (p NetworkPartition) ListLogServiceIndex() []uint32

ListLogServiceIndex lists index of all log services in the partition.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options are params for creating test cluster.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions sets a list of recommended options.

func (Options) BuildHAKeeperConfig

func (opt Options) BuildHAKeeperConfig() hakeeper.Config

BuildHAKeeperConfig returns hakeeper.Config

We could check timeout for dn/log store via hakeeper.Config.

func (Options) GetCNEngineType

func (opt Options) GetCNEngineType() dnservice.StorageType

GetCNEngineType returns the engine type that the cnservice used

func (Options) GetDNStorageType

func (opt Options) GetDNStorageType() dnservice.StorageType

GetDNStorageType returns the storage type that the dnservice used

func (Options) WithCNHeartbeatInterval

func (opt Options) WithCNHeartbeatInterval(interval time.Duration) Options

WithCNHeartbeatInterval sets heartbeat interval fo cn service.

func (Options) WithCNServiceNum

func (opt Options) WithCNServiceNum(num int) Options

func (Options) WithCNShardNum

func (opt Options) WithCNShardNum(num uint64) Options

func (Options) WithCNUseDistributedTAEEngine

func (opt Options) WithCNUseDistributedTAEEngine() Options

WithCNUseDistributedTAEEngine use distributed tae engine as cn engine

func (Options) WithCNUseMemoryEngine

func (opt Options) WithCNUseMemoryEngine() Options

WithCNUseMemoryEngine use memory engine as cn engine

func (Options) WithDNHeartbeatInterval

func (opt Options) WithDNHeartbeatInterval(interval time.Duration) Options

WithDNHeartbeatInterval sets heartbeat interval fo dn service.

func (Options) WithDNServiceNum

func (opt Options) WithDNServiceNum(num int) Options

WithDNServiceNum sets dn service number in the cluster.

func (Options) WithDNShardNum

func (opt Options) WithDNShardNum(num uint64) Options

WithDNShardNum sets dn shard number in the cluster.

func (Options) WithDNUseMEMStorage

func (opt Options) WithDNUseMEMStorage() Options

WithDNUseMEMStorage sets dn transaction use mem storage.

func (Options) WithDNUseTAEStorage

func (opt Options) WithDNUseTAEStorage() Options

WithDNUseTAEStorage sets dn transaction use tae storage.

func (Options) WithHKCNStoreTimeout

func (opt Options) WithHKCNStoreTimeout(timeout time.Duration) Options

func (Options) WithHKCheckInterval

func (opt Options) WithHKCheckInterval(interval time.Duration) Options

WithHKCheckInterval sets check interval for hakeeper.

func (Options) WithHKDNStoreTimeout

func (opt Options) WithHKDNStoreTimeout(timeout time.Duration) Options

WithHKDNStoreTimeout sets dn store timeout for hakeeper.

func (Options) WithHKLogStoreTimeout

func (opt Options) WithHKLogStoreTimeout(timeout time.Duration) Options

WithHKLogStoreTimeout sets log store timeout for hakeeper.

func (Options) WithHKTickPerSecond

func (opt Options) WithHKTickPerSecond(tick int) Options

WithHKTickPerSecond sets tick per second for hakeeper.

func (Options) WithHostAddress

func (opt Options) WithHostAddress(host string) Options

WithHostAddress sets host address for all services.

func (Options) WithKeepData

func (opt Options) WithKeepData() Options

WithKeepData sets keep data after cluster closed.

func (Options) WithLogHeartbeatInterval

func (opt Options) WithLogHeartbeatInterval(interval time.Duration) Options

WithLogHeartbeatInterval sets heartbeat interval fo log service.

func (Options) WithLogLevel

func (opt Options) WithLogLevel(lvl zapcore.Level) Options

WithLogLevel sets log level.

func (Options) WithLogReplicaNum

func (opt Options) WithLogReplicaNum(num uint64) Options

WithLogReplicaNum sets log replica number for the cluster.

func (Options) WithLogServiceNum

func (opt Options) WithLogServiceNum(num int) Options

WithLogServiceNum sets log service number in the cluster.

func (Options) WithLogShardNum

func (opt Options) WithLogShardNum(num uint64) Options

WithLogShardNum sets log shard number in the cluster.

func (Options) WithLogger

func (opt Options) WithLogger(logger *zap.Logger) Options

WithLogger sets logger.

func (Options) WithLogtailCollectInterval added in v0.7.0

func (opt Options) WithLogtailCollectInterval(interval time.Duration) Options

WithLogtailCollectInterval sets collection interval for logtail push server.

func (Options) WithLogtailMaxFetchFailure added in v0.7.0

func (opt Options) WithLogtailMaxFetchFailure(max int) Options

WithLogtailMaxFetchFailure sets max failure times when collecting logtail.

func (Options) WithLogtailResponseSendTimeout added in v0.7.0

func (opt Options) WithLogtailResponseSendTimeout(timeout time.Duration) Options

WithLogtailResponseSendTimeout sets response send timeout for logtail push server.

func (Options) WithLogtailRpcMaxMessageSize added in v0.7.0

func (opt Options) WithLogtailRpcMaxMessageSize(size int64) Options

WithLogtailRpcMaxMessageSize sets max rpc message size for logtail push server.

func (Options) WithLogtailRpcPayloadCopyBufferSize added in v0.7.0

func (opt Options) WithLogtailRpcPayloadCopyBufferSize(size int64) Options

WithLogtailRpcPayloadCopyBufferSize sets max rpc payload copy buffer size.

func (Options) WithRootDataDir

func (opt Options) WithRootDataDir(root string) Options

WithRootDataDir sets root for service data directory.

type ServiceStatus

type ServiceStatus int

ServiceStatus indicates service status.

const (
	ServiceInitialized ServiceStatus = iota
	ServiceStarted
	ServiceClosed
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL