cnservice

package
v1.2.3-hotfix-20241016 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2024 License: Apache-2.0 Imports: 78 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SaveProfile added in v1.2.0

func SaveProfile(profilePath string, profileType string, etlFS fileservice.FileService)

SaveProfile saves the profile into the ETL fs. The profileType values are defined in pkg/util/profile/profile.go.

Types

type Config

// Config is the configuration of the cn service. Fields carrying a `toml` tag
// are read from the corresponding key of the configuration file.
type Config struct {
	// UUID is the cn store uuid.
	UUID string `toml:"uuid"`
	// Role is the cn node role, one of [AP|TP].
	Role string `toml:"role"`

	// ListenAddress is the listening address for receiving external requests.
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress is the service address used for communication. If this
	// address is not set, ListenAddress is used as the communication address.
	ServiceAddress string `toml:"service-address"`
	// SQLAddress is the service address for receiving external sql clients.
	SQLAddress string `toml:"sql-address"`

	// PortBase is the base port for the service. We reserve reservedPorts for
	// the service to start internal servers inside it.
	//
	// TODO(volgariver6): The value of this field is also used to determine the version
	// of MO. If it is not set, we use the old listen-address/service-address fields, and
	// if it is set, we use the new policy to distribute the ports to all services.
	PortBase int `toml:"port-base" user_setting:"basic"`
	// ServiceHost is the host name/IP for the service address of RPC requests. There is
	// no port value in it.
	ServiceHost string `toml:"service-host" user_setting:"basic"`

	// Engine selects the engine type and the logstore type.
	// NOTE(review): this struct has no toml tag of its own — confirm which
	// key the decoder maps it to.
	Engine struct {
		Type     EngineType           `toml:"type"`
		Logstore options.LogstoreType `toml:"logstore"`
	}

	// ReadBufferSize and WriteBufferSize are the parameters for the cn-server
	// related buffers. Neither field declares a toml tag.
	ReadBufferSize  int
	WriteBufferSize int

	// Pipeline is the pipeline configuration.
	Pipeline struct {
		// HostSize is the memory limit.
		HostSize int64 `toml:"host-size"`
		// GuestSize is the memory limit for one query.
		GuestSize int64 `toml:"guest-size"`
		// BatchRows is the row-count limit for one batch.
		BatchRows int64 `toml:"batch-rows"`
		// BatchSize is the memory limit for one batch.
		BatchSize int64 `toml:"batch-size"`
	}

	// Frontend holds the parameters for the frontend.
	Frontend config.FrontendParameters `toml:"frontend"`

	// HAKeeper is the HAKeeper configuration.
	//
	// NOTE(review): the two heartbeat fields are spelled "Heatbeat" (missing
	// "r"), unlike TaskRunner.HeartbeatInterval/HeartbeatTimeout. They are
	// exported, so renaming them would break callers.
	HAKeeper struct {
		// HeatbeatInterval is the heartbeat interval for sending messages to hakeeper. Default is 1s.
		HeatbeatInterval toml.Duration `toml:"hakeeper-heartbeat-interval"`
		// HeatbeatTimeout is the heartbeat request timeout. Default is 500ms.
		HeatbeatTimeout toml.Duration `toml:"hakeeper-heartbeat-timeout"`
		// DiscoveryTimeout is the timeout for discovering the HAKeeper service. Default is 30s.
		DiscoveryTimeout toml.Duration `toml:"hakeeper-discovery-timeout"`
		// ClientConfig is the hakeeper client configuration. It declares no toml tag.
		ClientConfig logservice.HAKeeperClientConfig
	}

	// TaskRunner is the task runner configuration.
	// NOTE(review): the individual fields are undocumented here; their exact
	// semantics and defaults must be confirmed against the task runner
	// implementation.
	TaskRunner struct {
		QueryLimit        int           `toml:"task-query-limit"`
		Parallelism       int           `toml:"task-parallelism"`
		MaxWaitTasks      int           `toml:"task-max-wait-tasks"`
		FetchInterval     toml.Duration `toml:"task-fetch-interval"`
		FetchTimeout      toml.Duration `toml:"task-fetch-timeout"`
		RetryInterval     toml.Duration `toml:"task-retry-interval"`
		HeartbeatInterval toml.Duration `toml:"task-heartbeat-interval"`
		HeartbeatTimeout  toml.Duration `toml:"task-heartbeat-timeout"`
	}

	// RPC is the rpc config used to build the txn sender.
	RPC rpc.Config `toml:"rpc"`

	// Cluster is the cluster configuration.
	Cluster struct {
		// RefreshInterval is the interval at which cluster info is refreshed from hakeeper.
		RefreshInterval toml.Duration `toml:"refresh-interval"`
	}

	// LockService is the lockservice configuration.
	LockService lockservice.Config `toml:"lockservice"`

	// Txn is the txn configuration.
	Txn struct {
		// Isolation is the txn isolation level, SI or RC.
		// When Isolation is not set, SI is used when Mode is optimistic and
		// RC is used when Mode is pessimistic.
		Isolation string `toml:"isolation" user_setting:"advanced"`
		// Mode is the txn mode, optimistic or pessimistic. Default is pessimistic.
		Mode string `toml:"mode" user_setting:"advanced"`
		// EnableSacrificingFreshness: in Push Mode, the transaction is not guaranteed
		// to see the latest committed data, and the latest Logtail commit timestamp received
		// by the current CN + 1 is used as the start time of the transaction. But it will
		// ensure that the transactions of the same database connection can see the writes
		// of the previously committed transactions.
		// -1: disable
		//  0: auto config based on txn mode
		//  1: enable
		EnableSacrificingFreshness int `toml:"enable-sacrificing-freshness"`
		// EnableCNBasedConsistency ensures that all the transactions on a CN can read
		// the writes of the previously committed transactions.
		// -1: disable
		//  0: auto config based on txn mode
		//  1: enable
		EnableCNBasedConsistency int `toml:"enable-cn-based-consistency"`
		// EnableRefreshExpression: in RC mode, in the event of a conflict, the later transaction
		// needs to see the latest data after the previous transaction commits. At this time we
		// need to re-read the latest data and re-compute the expression. This
		// feature was turned off in 0.8 and is not supported for now. The replacement solution is
		// to return a retry error and let the whole computation re-execute.
		// -1: disable
		//  0: auto config based on txn mode
		//  1: enable
		EnableRefreshExpression int `toml:"enable-refresh-expression"`
		// EnableLeakCheck enables the txn leak check.
		// -1: disable
		//  0: auto config based on txn mode
		//  1: enable
		EnableLeakCheck int `toml:"enable-leak-check"`
		// MaxActiveAges is the max active duration of a txn.
		MaxActiveAges toml.Duration `toml:"max-active-ages"`
		// EnableCheckRCInvalidError is used to check and find RC bugs in pessimistic mode.
		// It will be removed in a later version.
		EnableCheckRCInvalidError bool `toml:"enable-check-rc-invalid-error"`
		// Limit is the flow control of transaction creation: the maximum number of
		// transactions per second. Default is unlimited.
		Limit int `toml:"limit-per-second"`
		// MaxActive is the max count of active txns in the current cn. If the max value is
		// reached, the txn is added to a FIFO queue. Default is unlimited.
		MaxActive int `toml:"max-active"`
		// NormalStateNoWait controls whether to wait for the transaction client
		// to be in normal state. If the value is false, it waits until the transaction client is in
		// normal state; if the value is true, it does not wait and just returns an error to the
		// client. Default value is false.
		NormalStateNoWait bool `toml:"normal-state-no-wait"`
		// PkDedupCount: when a txn commits, primary keys in the transaction's workspace are
		// checked for duplicates if the count of pks is less than PkDedupCount.
		// Default value is 0, which means no deduplication is done.
		PkDedupCount int `toml:"pk-dedup-count"`

		// Trace is the txn trace configuration.
		// NOTE(review): the individual fields are undocumented here; confirm
		// their semantics against the txn trace implementation.
		Trace struct {
			BufferSize    int           `toml:"buffer-size"`
			FlushBytes    toml.ByteSize `toml:"flush-bytes"`
			FlushDuration toml.Duration `toml:"force-flush-duration"`
			Dir           string        `toml:"dir"`
			Enable        bool          `toml:"enable"`
			Tables        []uint64      `toml:"tables"`
		} `toml:"trace"`
	} `toml:"txn"`

	// AutoIncrement is the auto increment config.
	AutoIncrement incrservice.Config `toml:"auto-increment"`

	// QueryServiceConfig is the config for query service.
	QueryServiceConfig queryservice.Config `toml:"query-service"`

	// PrimaryKeyCheck is read from the "primary-key-check" key.
	// NOTE(review): presumably toggles primary key checking — confirm.
	PrimaryKeyCheck bool `toml:"primary-key-check"`

	// MaxPreparedStmtCount is read from the "max_prepared_stmt_count" key.
	// NOTE(review): this key uses underscores while every other key in this
	// struct uses hyphens; changing it would break existing config files.
	MaxPreparedStmtCount int `toml:"max_prepared_stmt_count"`

	// InitWorkState is the initial work state for CN. Valid values are:
	// "working", "draining" and "drained".
	InitWorkState string `toml:"init-work-state"`

	// PythonUdfClient is the pythonservice client configuration, read from
	// the "python-udf-client" key.
	PythonUdfClient pythonservice.ClientConfig `toml:"python-udf-client"`

	// LogtailUpdateWorkerFactor is the multiple of this node's CPU count
	// used as the number of update workers to start.
	LogtailUpdateWorkerFactor int `toml:"logtail-update-worker-factor"`

	// AutomaticUpgrade is whether to automatically upgrade on system startup.
	// UpgradeTenantBatchSize is read from the "upgrade-tenant-batch" key;
	// presumably the number of tenants upgraded per batch — confirm.
	AutomaticUpgrade       bool `toml:"auto-upgrade"`
	UpgradeTenantBatchSize int  `toml:"upgrade-tenant-batch"`
}

Config is the configuration of the cn service.

func (*Config) SetDefaultValue added in v1.1.0

func (c *Config) SetDefaultValue()

SetDefaultValue sets up the default values of the config. Most of the code is copied from Validate. However, Validate may change some global variables that SetDefaultValue does not need to touch, so a separate function is needed.

func (*Config) Validate

func (c *Config) Validate() error

type EngineType

// EngineType is the string name of an engine type; it is the type of the
// Engine.Type field of Config.
type EngineType string
const (
	// EngineDistributedTAE is the "distributed-tae" engine type.
	EngineDistributedTAE       EngineType = "distributed-tae"
	// EngineMemory is the "memory" engine type.
	EngineMemory               EngineType = "memory"
	// EngineNonDistributedMemory is the "non-distributed-memory" engine type.
	EngineNonDistributedMemory EngineType = "non-distributed-memory"
	// ReservedTasks is the number of tasks that must run in the background:
	// 1 for metric StorageUsage
	// 1 for trace ETLMerge
	//
	// NOTE(review): although it shares this const block with the EngineType
	// values, it is an untyped constant, not an EngineType.
	ReservedTasks = 2
)

type Option

// Option is an option used to create the cn service; it is a function that
// receives the *service.
type Option func(*service)

Option is an option used to create the cn service.

func WithBootstrapOptions added in v1.2.0

func WithBootstrapOptions(options ...bootstrap.Option) Option

WithBootstrapOptions sets up the bootstrap options.

func WithConfigData added in v1.1.0

func WithConfigData(data map[string]*logservicepb.ConfigItem) Option

WithConfigData saves the data from the config file

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger sets up the cn service's logger.

func WithMessageHandle

func WithMessageHandle(f func(ctx context.Context,
	cnAddr string,
	message morpc.Message,
	cs morpc.ClientSession,
	engine engine.Engine,
	fs fileservice.FileService,
	lockService lockservice.LockService,
	queryClient qclient.QueryClient,
	hakeeper logservice.CNHAKeeperClient,
	udfService udf.Service,
	cli client.TxnClient,
	aicm *defines.AutoIncrCacheManager,
	mAcquirer func() morpc.Message) error) Option

WithMessageHandle sets up the message handler.

func WithTaskStorageFactory

func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option

WithTaskStorageFactory sets up the special task storage factory.

func WithTxnTraceData added in v1.2.0

func WithTxnTraceData(traceDataPath string) Option

type PortSlot added in v1.0.0

// PortSlot identifies one of the internal services of the cn service.
// NOTE(review): presumably each slot selects a port relative to
// Config.PortBase — confirm against the port allocation code.
type PortSlot int
const (
	// PipelineService is the first slot (value 0).
	PipelineService PortSlot = iota
	// LockService is slot 1.
	LockService
	// QueryService is slot 2.
	QueryService
	// Gossip is slot 3.
	Gossip
	// MaxService is the last entry of the enumeration. It must stay last:
	// new services should be added before it.
	MaxService
)

New services should be added before the last one (MaxService).

func (PortSlot) String added in v1.0.0

func (s PortSlot) String() string

String implements the fmt.Stringer interface.

type Service

// Service is the interface of the cn service; NewService returns a value of
// this type.
type Service interface {
	// Start starts the service and returns an error on failure.
	Start() error
	// Close closes the service and returns an error on failure.
	Close() error
	// ID returns UUID of the service.
	ID() string
	// GetTaskRunner returns the task runner of the service.
	GetTaskRunner() taskservice.TaskRunner
	// GetTaskService returns the task service of the service.
	// NOTE(review): the bool presumably reports whether a task service is
	// available — confirm against the implementation.
	GetTaskService() (taskservice.TaskService, bool)
	// GetSQLExecutor returns the SQL executor of the service.
	GetSQLExecutor() executor.SQLExecutor
	// GetBootstrapService returns the bootstrap service of the service.
	GetBootstrapService() bootstrap.Service
}

func NewService

func NewService(
	cfg *Config,
	ctx context.Context,
	fileService fileservice.FileService,
	gossipNode *gossip.Node,
	options ...Option,
) (Service, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL