framework

package
v0.0.0-...-beee317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// UpstreamPD is upstream PD URI.
	UpstreamPD = "http://upstream-pd:2379"
	// UpstreamDSN is upstream database dsn
	UpstreamDSN = "root@tcp(127.0.0.1:4000)/"
	// DownstreamDSN is downstream database dsn
	DownstreamDSN = "root@tcp(127.0.0.1:5000)/"
	// DockerComposeFilePathPrefix is prefix of docker compose file path.
	DockerComposeFilePathPrefix = "/deployments/ticdc/docker-compose/"
	// ControllerContainerName is the ticdc controller container name.
	ControllerContainerName = "ticdc_controller"
)

Variables

This section is empty.

Functions

func CdcHealthCheck

func CdcHealthCheck(cdcContainer, pdEndpoint string) error

CdcHealthCheck check cdc cluster health.

Types

type Awaitable

type Awaitable interface {
	SetTimeOut(duration time.Duration) Awaitable
	Wait() Checkable
}

Awaitable represents the handle of an SQL operation that can be waited on

func All

func All(helper *SQLHelper, awaitables []Awaitable) Awaitable

All joins a slice of Awaitable sql requests. The request must be to the same table. TODO does not support composite primary key for now!

type CDCProfile

type CDCProfile struct {
	PDUri          string
	SinkURI        string
	ConfigFile     string
	SchemaRegistry string
}

CDCProfile represents the command line arguments used to create the changefeed

func (*CDCProfile) String

func (p *CDCProfile) String() string

String returns the string representation of the CDCProfile

type Checkable

type Checkable interface {
	Check() error
}

Checkable represents the handle of an SQL operation whose correctness can be checked

type DockerComposeOperator

type DockerComposeOperator struct {
	FileName      string
	Controller    string
	HealthChecker func() error
	ExecEnv       []string
}

DockerComposeOperator represent a docker compose

func (*DockerComposeOperator) DumpStdout

func (d *DockerComposeOperator) DumpStdout() error

DumpStdout dumps all container logs

func (*DockerComposeOperator) ExecInController

func (d *DockerComposeOperator) ExecInController(shellCmd string) ([]byte, error)

ExecInController provides a way to execute commands inside a container in the service

func (*DockerComposeOperator) RestartComponents

func (d *DockerComposeOperator) RestartComponents(names ...string)

RestartComponents restarts a docker-compose service

func (*DockerComposeOperator) Setup

func (d *DockerComposeOperator) Setup()

Setup brings up a docker-compose service

func (*DockerComposeOperator) TearDown

func (d *DockerComposeOperator) TearDown()

TearDown terminates a docker-compose service and remove all volumes

func (*DockerComposeOperator) WaitClusterStarted

func (d *DockerComposeOperator) WaitClusterStarted()

WaitClusterStarted waits the cluster is started and ready

type DockerEnv

type DockerEnv struct {
	DockerComposeOperator
}

DockerEnv represents the docker-compose service

func (*DockerEnv) Reset

func (e *DockerEnv) Reset()

Reset implements Environment

func (*DockerEnv) RunTest

func (e *DockerEnv) RunTest(task Task)

RunTest implements Environment

func (*DockerEnv) SetListener

func (e *DockerEnv) SetListener(states interface{}, listener MqListener)

SetListener implements Environment. Currently unfinished, will be used to monitor Kafka output

type Environment

type Environment interface {
	Setup()
	TearDown()
	Reset()
	RunTest(Task)
	SetListener(states interface{}, listener MqListener)
}

Environment is an abstract of the CDC-Upstream-Downstream-MQ complex to be tested

type MqListener

type MqListener func(states interface{}, topic string, key []byte, value []byte) error

MqListener represents a callback function for listening on the MQ output

type SQLHelper

type SQLHelper struct {
	// contains filtered or unexported fields
}

SQLHelper provides basic utilities for manipulating data

func (*SQLHelper) GetTable

func (h *SQLHelper) GetTable(tableName string) *Table

GetTable returns the handle of the given table

type Sendable

type Sendable interface {
	Send() Awaitable
}

Sendable is a sendable request to the upstream

type Table

type Table struct {
	// contains filtered or unexported fields
}

Table represents the handle of a table in the upstream

func (*Table) Delete

func (t *Table) Delete(rowData map[string]interface{}) Sendable

Delete returns a Sendable object that represents a Delete from clause

func (*Table) Insert

func (t *Table) Insert(rowData map[string]interface{}) Sendable

Insert returns a Sendable object that represents an Insert clause

func (*Table) Upsert

func (t *Table) Upsert(rowData map[string]interface{}) Sendable

Upsert returns a Sendable object that represents a Replace Into clause

type Task

type Task interface {
	Name() string
	GetCDCProfile() *CDCProfile
	Prepare(taskContext *TaskContext) error
	Run(taskContext *TaskContext) error
}

Task represents a single test case

type TaskContext

type TaskContext struct {
	Upstream     *sql.DB
	Downstream   *sql.DB
	Env          Environment
	WaitForReady func() error
	Ctx          context.Context
}

TaskContext is passed to the test case to provide basic utilities for testing

func (*TaskContext) CreateDB

func (c *TaskContext) CreateDB(name string) error

CreateDB creates a database in both the upstream and the downstream

func (*TaskContext) SQLHelper

func (c *TaskContext) SQLHelper() *SQLHelper

SQLHelper returns an SQLHelper

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL