cnservice

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2023 License: Apache-2.0 Imports: 57 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config is the cn service configuration. Most fields carry `toml` struct tags
// naming the configuration key they are decoded from.
type Config struct {
	// UUID cn store uuid
	UUID string `toml:"uuid"`
	// Role cn node role, [AP|TP]
	Role string `toml:"role"`

	// ListenAddress listening address for receiving external requests
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress service address for communication, if this address is not set, use
	// ListenAddress as the communication address.
	ServiceAddress string `toml:"service-address"`
	// SQLAddress service address for receiving external sql client
	SQLAddress string `toml:"sql-address"`

	// Engine storage engine configuration. Type selects the engine kind (an
	// EngineType value). The remaining fields are intervals and count
	// thresholds; their exact semantics are not visible here.
	// NOTE(review): this struct field has no toml tag of its own — confirm the
	// key the decoder maps it to.
	Engine struct {
		Type                EngineType           `toml:"type"`
		Logstore            options.LogstoreType `toml:"logstore"`
		FlushInterval       toml.Duration        `toml:"flush-interval"`
		MinCount            int64                `toml:"min-count"`
		ScanInterval        toml.Duration        `toml:"scan-interval"`
		IncrementalInterval toml.Duration        `toml:"incremental-interval"`
		GlobalMinCount      int64                `toml:"global-min-count"`
	}

	// parameters for cn-server related buffer.
	// NOTE(review): unlike most fields in this struct, these two carry no toml
	// tag — confirm whether they are meant to be configurable from the file.
	ReadBufferSize  int
	WriteBufferSize int

	// Pipeline configuration
	Pipeline struct {
		// HostSize is the memory limit
		HostSize int64 `toml:"host-size"`
		// GuestSize is the memory limit for one query
		GuestSize int64 `toml:"guest-size"`
		// OperatorSize is the memory limit for one operator
		OperatorSize int64 `toml:"operator-size"`
		// BatchRows is the batch rows limit for one batch
		BatchRows int64 `toml:"batch-rows"`
		// BatchSize is the memory limit for one batch
		BatchSize int64 `toml:"batch-size"`
	}

	// Frontend parameters for the frontend
	Frontend config.FrontendParameters `toml:"frontend"`

	// HAKeeper configuration
	// Note: the Go field names below are spelled "Heatbeat" while the toml keys
	// use "heartbeat"; configuration files must use the toml key spelling.
	HAKeeper struct {
		// HeatbeatInterval heartbeat interval to send message to hakeeper. Default is 1s
		HeatbeatInterval toml.Duration `toml:"hakeeper-heartbeat-interval"`
		// HeatbeatTimeout heartbeat request timeout. Default is 500ms
		HeatbeatTimeout toml.Duration `toml:"hakeeper-heartbeat-timeout"`
		// DiscoveryTimeout discovery HAKeeper service timeout. Default is 30s
		DiscoveryTimeout toml.Duration `toml:"hakeeper-discovery-timeout"`
		// ClientConfig hakeeper client configuration
		ClientConfig logservice.HAKeeperClientConfig
	}

	// TaskRunner configuration. All keys are prefixed with "task-".
	TaskRunner struct {
		QueryLimit        int           `toml:"task-query-limit"`
		Parallelism       int           `toml:"task-parallelism"`
		MaxWaitTasks      int           `toml:"task-max-wait-tasks"`
		FetchInterval     toml.Duration `toml:"task-fetch-interval"`
		FetchTimeout      toml.Duration `toml:"task-fetch-timeout"`
		RetryInterval     toml.Duration `toml:"task-retry-interval"`
		HeartbeatInterval toml.Duration `toml:"task-heartbeat-interval"`
	}

	// RPC rpc config used to build txn sender
	RPC rpc.Config `toml:"rpc"`

	// Cluster configuration
	Cluster struct {
		// RefreshInterval is the interval at which cluster info is refreshed from hakeeper
		RefreshInterval toml.Duration `toml:"refresh-interval"`
	}

	// LockService lockservice configuration
	LockService lockservice.Config `toml:"lockservice"`

	// Txn txn config
	Txn struct {
		// Isolation txn isolation. SI or RC, default is SI
		Isolation string `toml:"isolation"`
		// Mode txn mode. optimistic or pessimistic, default is optimistic
		Mode string `toml:"mode"`
		// EnableSacrificingFreshness In Push Mode, the transaction is not guaranteed
		// to see the latest commit data, and the latest Logtail commit timestamp received
		// by the current CN + 1 is used as the start time of the transaction. But it will
		// ensure that the transactions of the same database connection can see the writes
		// of the previous committed transactions.
		EnableSacrificingFreshness bool `toml:"enable-sacrificing-freshness"`
		// EnableCNBasedConsistency ensure that all the transactions on a CN can read
		// the writes of the previous committed transaction
		EnableCNBasedConsistency bool `toml:"enable-cn-based-consistency"`
		// EnableRefreshExpression In RC mode, in the event of a conflict, the later transaction
		// needs to see the latest data after the previous transaction commits. At this time we
		// need to re-read the latest data and re-compute the expression. This
		// feature was turned off in 0.8 and is not supported for now. The replacement solution is
		// to return a retry error and let the whole computation re-execute.
		EnableRefreshExpression bool `toml:"enable-refresh-expression"`
		// EnableLeakCheck enable txn leak check
		EnableLeakCheck bool `toml:"enable-leak-check"`
		// MaxActiveAges the maximum duration a txn may remain active
		MaxActiveAges toml.Duration `toml:"max-active-ages"`
	} `toml:"txn"`

	// Ctl ctl service config. CtlService is used to handle ctl request. See mo_ctl for detail.
	Ctl ctlservice.Config `toml:"ctl"`

	// AutoIncrement auto increment config
	AutoIncrement incrservice.Config `toml:"auto-increment"`

	// PrimaryKeyCheck presumably toggles primary key checking — TODO confirm the
	// exact behavior against the code that reads this flag.
	PrimaryKeyCheck bool `toml:"primary-key-check"`
}

Config is the configuration of the cn service.

func (*Config) Validate

func (c *Config) Validate() error

type EngineType

// EngineType is the string name of a storage engine kind.
type EngineType string

// Known EngineType values.
const (
	EngineDistributedTAE       = EngineType("distributed-tae")
	EngineMemory               = EngineType("memory")
	EngineNonDistributedMemory = EngineType("non-distributed-memory")
)

// ReservedTasks is the number of tasks that must always run in the background:
// 1 for metric StorageUsage
// 1 for trace ETLMerge
const ReservedTasks = 2

type Option

// Option is a function applied to the unexported *service; NewService accepts
// a variadic list of Option values.
type Option func(*service)

Option is an option applied when creating the cn service.

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger sets the cn service's logger.

func WithMessageHandle

func WithMessageHandle(f func(ctx context.Context,
	cnAddr string,
	message morpc.Message,
	cs morpc.ClientSession,
	engine engine.Engine,
	fs fileservice.FileService,
	lockService lockservice.LockService,
	cli client.TxnClient,
	aicm *defines.AutoIncrCacheManager,
	mAcquirer func() morpc.Message) error) Option

WithMessageHandle sets the message handler.

func WithTaskStorageFactory

func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option

WithTaskStorageFactory sets a custom task storage factory.

type Service

// Service is the interface of the cn service, as returned by NewService.
type Service interface {
	// Start and Close are the lifecycle methods; each returns an error.
	// NOTE(review): exact start/stop semantics are not visible here — confirm
	// against the implementation.
	Start() error
	Close() error

	// GetTaskRunner returns the service's taskservice.TaskRunner.
	GetTaskRunner() taskservice.TaskRunner
	// GetTaskService returns a taskservice.TaskService and a bool; the bool
	// presumably reports whether the task service is available — TODO confirm.
	GetTaskService() (taskservice.TaskService, bool)
	// WaitSystemInitCompleted presumably blocks until system initialization has
	// completed or ctx is done, returning an error otherwise — TODO confirm.
	WaitSystemInitCompleted(ctx context.Context) error
}

func NewService

func NewService(
	cfg *Config,
	ctx context.Context,
	fileService fileservice.FileService,
	options ...Option,
) (Service, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL