cnservice

package
v1.0.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 68 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config is the configuration of the cn service. Most fields carry a `toml`
// struct tag naming their key in the configuration file; the fields declared
// without an explicit tag (Engine, ReadBufferSize, WriteBufferSize, Pipeline,
// HAKeeper, HAKeeper.ClientConfig, TaskRunner and Cluster) are called out below.
type Config struct {
	// UUID is the cn store uuid.
	UUID string `toml:"uuid"`
	// Role is the cn node role, [AP|TP].
	Role string `toml:"role"`

	// ListenAddress is the listening address for receiving external requests.
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress is the service address for communication. If this address
	// is not set, ListenAddress is used as the communication address.
	ServiceAddress string `toml:"service-address"`
	// SQLAddress is the service address for receiving external sql clients.
	SQLAddress string `toml:"sql-address"`

	// PortBase is the base port for the service. We reserve reservedPorts for
	// the service to start internal server inside it.
	//
	// TODO(volgariver6): The value of this field is also used to determine the version
	// of MO. If it is not set, we use the old listen-address/service-address fields, and
	// if it is set, we use the new policy to distribute the ports to all services.
	PortBase int `toml:"port-base"`
	// ServiceHost is the host name/IP for the service address of RPC request. There is
	// no port value in it.
	ServiceHost string `toml:"service-host"`

	// Engine selects the engine type and the logstore type.
	// NOTE(review): this field has no explicit toml tag — confirm the key the
	// decoder maps it to.
	Engine struct {
		// Type is one of the EngineType values.
		Type     EngineType           `toml:"type"`
		// Logstore is the logstore type, as defined by options.LogstoreType.
		Logstore options.LogstoreType `toml:"logstore"`
	}

	// ReadBufferSize and WriteBufferSize are parameters for the cn-server
	// related buffer. Neither has an explicit toml tag; units are not stated
	// here — presumably bytes, TODO confirm.
	ReadBufferSize  int
	WriteBufferSize int

	// Pipeline configuration (no explicit toml tag on the group itself).
	Pipeline struct {
		// HostSize is the memory limit
		HostSize int64 `toml:"host-size"`
		// GuestSize is the memory limit for one query
		GuestSize int64 `toml:"guest-size"`
		// BatchRows is the batch rows limit for one batch
		BatchRows int64 `toml:"batch-rows"`
		// BatchSize is the memory limit for one batch
		BatchSize int64 `toml:"batch-size"`
	}

	// Frontend parameters for the frontend
	Frontend config.FrontendParameters `toml:"frontend"`

	// HAKeeper configuration (no explicit toml tag on the group itself).
	// Note: the two heartbeat field names are spelled "Heatbeat" (sic) while
	// their toml keys are spelled "heartbeat"; the names are part of the
	// exported API.
	HAKeeper struct {
		// HeatbeatInterval is the heartbeat interval to send message to hakeeper. Default is 1s
		HeatbeatInterval toml.Duration `toml:"hakeeper-heartbeat-interval"`
		// HeatbeatTimeout is the heartbeat request timeout. Default is 500ms
		HeatbeatTimeout toml.Duration `toml:"hakeeper-heartbeat-timeout"`
		// DiscoveryTimeout is the timeout for discovering the HAKeeper service. Default is 30s
		DiscoveryTimeout toml.Duration `toml:"hakeeper-discovery-timeout"`
		// ClientConfig is the hakeeper client configuration (no explicit toml tag).
		ClientConfig logservice.HAKeeperClientConfig
	}

	// TaskRunner configuration (no explicit toml tag on the group itself).
	// The individual fields are undocumented in the original; their meaning is
	// presumably what the names and keys suggest — TODO confirm against the
	// task runner implementation.
	TaskRunner struct {
		QueryLimit        int           `toml:"task-query-limit"`
		Parallelism       int           `toml:"task-parallelism"`
		MaxWaitTasks      int           `toml:"task-max-wait-tasks"`
		FetchInterval     toml.Duration `toml:"task-fetch-interval"`
		FetchTimeout      toml.Duration `toml:"task-fetch-timeout"`
		RetryInterval     toml.Duration `toml:"task-retry-interval"`
		HeartbeatInterval toml.Duration `toml:"task-heartbeat-interval"`
	}

	// RPC is the rpc config used to build the txn sender.
	RPC rpc.Config `toml:"rpc"`

	// Cluster configuration (no explicit toml tag on the group itself).
	Cluster struct {
		// RefreshInterval is the interval at which cluster info is refreshed from hakeeper.
		RefreshInterval toml.Duration `toml:"refresh-interval"`
	}

	// LockService is the lockservice config.
	LockService lockservice.Config `toml:"lockservice"`

	// Txn is the txn config.
	Txn struct {
		// Isolation is the txn isolation: SI or RC.
		// When Isolation is not set, SI is used when Mode is optimistic and RC
		// when Mode is pessimistic.
		Isolation string `toml:"isolation"`
		// Mode is the txn mode: optimistic or pessimistic. Default is pessimistic.
		Mode string `toml:"mode"`
		// EnableSacrificingFreshness In Push Mode, the transaction is not guaranteed
		// to see the latest commit data, and the latest Logtail commit timestamp received
		// by the current CN + 1 is used as the start time of the transaction. But it will
		// ensure that the transactions of the same database connection can see the writes
		// of the previous committed transactions.
		//  -1: disable
		//   0: auto config based on txn mode
		//   1: enable
		EnableSacrificingFreshness int `toml:"enable-sacrificing-freshness"`
		// EnableCNBasedConsistency ensure that all the transactions on a CN can read
		// the writes of the previous committed transaction
		//  -1: disable
		//   0: auto config based on txn mode
		//   1: enable
		EnableCNBasedConsistency int `toml:"enable-cn-based-consistency"`
		// EnableRefreshExpression In RC mode, in the event of a conflict, the later transaction
		// needs to see the latest data after the previous transaction commits. At this time we
		// need to re-read the data, re-read the latest data, and re-compute the expression. This
		// feature was turned off in 0.8 and is not supported for now. The replacement solution is
		// to return a retry error and let the whole computation re-execute.
		//  -1: disable
		//   0: auto config based on txn mode
		//   1: enable
		EnableRefreshExpression int `toml:"enable-refresh-expression"`
		// EnableLeakCheck enable txn leak check
		//  -1: disable
		//   0: auto config based on txn mode
		//   1: enable
		EnableLeakCheck int `toml:"enable-leak-check"`
		// MaxActiveAges is the max active duration of a txn.
		MaxActiveAges toml.Duration `toml:"max-active-ages"`
		// EnableCheckRCInvalidError this config is used to check and find RC bugs in pessimistic mode.
		// It will be removed in a later version.
		EnableCheckRCInvalidError bool `toml:"enable-check-rc-invalid-error"`
		// Limit flow control of transaction creation, maximum number of transactions per second. Default
		// is unlimited.
		Limit int `toml:"limit-per-second"`
		// MaxActive is the count of max active txn in current cn.  If reached max value, the txn
		// is added to a FIFO queue. Default is unlimited.
		MaxActive int `toml:"max-active"`
	} `toml:"txn"`

	// Ctl ctl service config. CtlService is used to handle ctl request. See mo_ctl for detail.
	Ctl ctlservice.Config `toml:"ctl"`

	// AutoIncrement auto increment config
	AutoIncrement incrservice.Config `toml:"auto-increment"`

	// QueryServiceConfig is the config for query service.
	QueryServiceConfig queryservice.Config `toml:"query-service"`

	// PrimaryKeyCheck is undocumented in the original; presumably it toggles
	// primary-key checking — TODO confirm.
	PrimaryKeyCheck bool `toml:"primary-key-check"`

	// MaxPreparedStmtCount is undocumented in the original; presumably an upper
	// bound on prepared statements — TODO confirm scope and default.
	// Note: unlike every other key in this struct, its toml key uses
	// underscores (max_prepared_stmt_count) rather than hyphens.
	MaxPreparedStmtCount int `toml:"max_prepared_stmt_count"`

	// InitWorkState is the initial work state for CN. Valid values are:
	// "working", "draining" and "drained".
	InitWorkState string `toml:"init-work-state"`
}

Config is the configuration of the cn service.

func (*Config) Validate

// Validate returns an error describing a problem with c, or nil.
// NOTE(review): the body is not shown in this listing — presumably it checks
// the configuration and may fill in defaults; confirm against the source.
func (c *Config) Validate() error

type EngineType

// EngineType is the string-typed engine identifier used by Config.Engine.Type.
type EngineType string
const (
	// Known EngineType values.
	EngineDistributedTAE       EngineType = "distributed-tae"
	EngineMemory               EngineType = "memory"
	EngineNonDistributedMemory EngineType = "non-distributed-memory"
	// ReservedTasks is how many tasks must run in the background:
	// 1 for metric StorageUsage
	// 1 for trace ETLMerge
	// Note: this is an untyped integer constant, not an EngineType; it merely
	// shares this const block.
	ReservedTasks = 2
)

type Option

// Option is a function that receives the unexported *service; it is the
// option type accepted by NewService when creating the cn service.
type Option func(*service)

Option is an option applied when creating the cn service.

func WithLogger

// WithLogger returns an Option that sets the cn service's logger.
func WithLogger(logger *zap.Logger) Option

WithLogger sets the cn service's logger.

func WithMessageHandle

// WithMessageHandle returns an Option that sets the message handle f.
//
// f is given a context, the CN address, a morpc message and its client
// session, together with the engine, file service, lock service, query
// service, txn client, auto-increment cache manager and a function that
// acquires a morpc.Message; it returns an error.
// NOTE(review): when and how the service invokes f is not visible in this
// listing — confirm against the implementation.
func WithMessageHandle(f func(ctx context.Context,
	cnAddr string,
	message morpc.Message,
	cs morpc.ClientSession,
	engine engine.Engine,
	fs fileservice.FileService,
	lockService lockservice.LockService,
	queryService queryservice.QueryService,
	cli client.TxnClient,
	aicm *defines.AutoIncrCacheManager,
	mAcquirer func() morpc.Message) error) Option

WithMessageHandle sets the message handle.

func WithTaskStorageFactory

// WithTaskStorageFactory returns an Option that sets a special task storage
// factory.
func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option

WithTaskStorageFactory sets up the special task storage factory.

type PortSlot added in v1.0.0

// PortSlot identifies one of the cn service's internal services by index.
// NOTE(review): presumably each slot is an offset from Config.PortBase —
// confirm against the port allocation code.
type PortSlot int
const (
	// Slots are numbered consecutively from 0 via iota, so their order is
	// significant: inserting or reordering entries changes every later value.
	PipelineService PortSlot = iota
	LockService
	CtlService
	QueryService
	Gossip
	CacheService
	// MaxService is the last entry; being last, its value equals the number
	// of slots declared before it. New services must be added before it.
	MaxService
)

New services should be added before the last one (MaxService).

func (PortSlot) String added in v1.0.0

// String implements the fmt.Stringer interface for PortSlot.
// NOTE(review): the exact strings returned are not shown in this listing.
func (s PortSlot) String() string

String implements the fmt.Stringer interface.

type Service

// Service is the interface of the cn service returned by NewService.
type Service interface {
	// Start starts the service and returns any error encountered.
	Start() error
	// Close closes the service and returns any error encountered.
	Close() error
	// ID returns UUID of the service.
	ID() string
	// GetTaskRunner returns the service's task runner.
	GetTaskRunner() taskservice.TaskRunner
	// GetTaskService returns the task service and a bool.
	// NOTE(review): presumably the bool reports whether a task service is
	// available — confirm against the implementation.
	GetTaskService() (taskservice.TaskService, bool)
	// WaitSystemInitCompleted takes a context and returns an error.
	// NOTE(review): presumably it blocks until system initialization has
	// completed or ctx is done — confirm against the implementation.
	WaitSystemInitCompleted(ctx context.Context) error
}

func NewService

// NewService creates a cn Service from cfg, a file service and a gossip node,
// applying any options supplied; it returns the Service or an error.
//
// NOTE(review): ctx is the second parameter, contrary to the Go convention of
// passing context.Context first. Reordering it would break existing callers,
// so the signature is left as is.
func NewService(
	cfg *Config,
	ctx context.Context,
	fileService fileservice.FileService,
	gossipNode *gossip.Node,
	options ...Option,
) (Service, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL