tnservice

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2024 License: Apache-2.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// StorageTAE TAE txn storage backend
	StorageTAE = StorageType("TAE")
	// StorageMEMKV MEMKV txn storage backend
	StorageMEMKV = StorageType("MEMKV")
	// StorageMEMKV MEM txn storage backend
	StorageMEM = StorageType("MEM")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// DataDir data dir
	DataDir string `toml:"-" user_setting:"basic"`
	// UUID tn store uuid
	UUID string `toml:"uuid" user_setting:"basic"`
	// ListenAddress listening address for receiving external requests.
	ListenAddress string `toml:"listen-address"`
	// ServiceAddress service address for communication, if this address is not set, use
	// ListenAddress as the communication address.
	ServiceAddress string `toml:"service-address"`

	// PortBase is the base port for the service. We reserve reservedPorts for
	// the service to start internal server inside it.
	//
	// TODO(volgariver6): The value of this field is also used to determine the version
	// of MO. If it is not set, we use the old listen-address/service-address fields, and
	// if it is set, we use the new policy to distribute the ports to all services.
	PortBase int `toml:"port-base" user_setting:"basic"`
	// ServiceHost is the host name/IP for the service address of RPC request. There is
	// no port value in it.
	ServiceHost string `toml:"service-host" user_setting:"basic"`

	// HAKeeper configuration
	HAKeeper struct {
		// HeatbeatInterval heartbeat interval to send message to hakeeper. Default is 1s
		HeatbeatInterval toml.Duration `toml:"hakeeper-heartbeat-interval"`
		// HeatbeatTimeout heartbeat request timeout. Default is 3s
		HeatbeatTimeout toml.Duration `toml:"hakeeper-heartbeat-timeout"`
		// DiscoveryTimeout discovery HAKeeper service timeout. Default is 30s
		DiscoveryTimeout toml.Duration `toml:"hakeeper-discovery-timeout"`
		// ClientConfig hakeeper client configuration
		ClientConfig logservice.HAKeeperClientConfig
	}

	// LogService log service configuration
	LogService struct {
		// ConnectTimeout timeout for connect to logservice. Default is 30s.
		ConnectTimeout toml.Duration `toml:"connect-timeout"`
	}

	// RPC configuration
	RPC rpc.Config `toml:"rpc"`

	Ckp struct {
		FlushInterval          toml.Duration `toml:"flush-interval"`
		ScanInterval           toml.Duration `toml:"scan-interval"`
		MinCount               int64         `toml:"min-count"`
		IncrementalInterval    toml.Duration `toml:"incremental-interval"`
		GlobalMinCount         int64         `toml:"global-min-count"`
		ReservedWALEntryCount  uint64        `toml:"reserved-WAL-entry-count"`
		OverallFlushMemControl uint64        `toml:"overall-flush-mem-control"`
	}

	GCCfg struct {
		GCTTL          toml.Duration `toml:"gc-ttl"`
		ScanGCInterval toml.Duration `toml:"scan-gc-interval"`
		DisableGC      bool          `toml:"disable-gc"`
	}

	Merge struct {
		CNTakeOverAll    bool          `toml:"offload-all"`
		CNStandaloneTake bool          `toml:"offload-when-standalone"`
		CNTakeOverExceed toml.ByteSize `toml:"offload-exceed"`
		CNMergeMemHint   toml.ByteSize `toml:"offload-mem-hint"`
	}

	LogtailServer struct {
		ListenAddress              string        `toml:"listen-address"`
		ServiceAddress             string        `toml:"service-address"`
		RpcMaxMessageSize          toml.ByteSize `toml:"rpc-max-message-size"`
		RpcEnableChecksum          bool          `toml:"rpc-enable-checksum" user_setting:"advanced"`
		LogtailRPCStreamPoisonTime toml.Duration `toml:"logtail-rpc-stream-poison-time"`
		LogtailCollectInterval     toml.Duration `toml:"logtail-collect-interval"`
		LogtailResponseSendTimeout toml.Duration `toml:"logtail-response-send-timeout"`
	}

	// Txn transactions configuration
	Txn struct {
		// ZombieTimeout A transaction timeout, if an active transaction has not operated for more
		// than the specified time, it will be considered a zombie transaction and the backend will
		// roll back the transaction.
		ZombieTimeout toml.Duration `toml:"zombie-timeout"`

		// Mode. [Optimistic|Pessimistic], default Pessimistic.
		Mode string `toml:"mode"`

		// If IncrementalDedup is 'true', it will enable the incremental dedup feature.
		// If incremental dedup feature is disable,
		// If empty, it will set 'false' when CN.Txn.Mode is optimistic,  set 'true' when CN.Txn.Mode is pessimistic
		// IncrementalDedup will be treated as FullSkipWorkspaceDedup.
		IncrementalDedup string `toml:"incremental-dedup"`

		// Storage txn storage config
		Storage struct {

			// Backend txn storage backend implementation. [TAE|Mem], default TAE.
			Backend StorageType `toml:"backend"`
			// contains filtered or unexported fields
		}
	}

	// Cluster configuration
	Cluster struct {
		// RefreshInterval refresh cluster info from hakeeper interval
		RefreshInterval toml.Duration `toml:"refresh-interval"`
	}

	// LockService lockservice config
	LockService lockservice.Config `toml:"lockservice"`

	// IsStandalone indicates whether the tn is in standalone cluster not an independent process.
	// For the tn does not boost an independent queryservice in standalone mode.
	// cn,tn shares the same queryservice in standalone mode.
	// Under distributed deploy mode, cn,tn are independent os process.
	// they have their own queryservice.
	InStandalone bool
}

Config tn store configuration

func (*Config) SetDefaultValue added in v1.1.0

func (c *Config) SetDefaultValue()

func (*Config) Validate

func (c *Config) Validate() error

type Option

type Option func(*store)

Option store option

func WithBackendFilter

func WithBackendFilter(filter func(morpc.Message, string) bool) Option

WithBackendFilter set filtering txn.TxnRequest sent to other DNShard

func WithConfigAdjust

func WithConfigAdjust(adjustConfigFunc func(c *Config)) Option

WithConfigAdjust set adjust config func

func WithConfigData added in v1.1.0

func WithConfigData(data map[string]*logservicepb.ConfigItem) Option

WithConfigData saves the data from the config file

func WithHAKeeperClientFactory

func WithHAKeeperClientFactory(factory func() (logservice.TNHAKeeperClient, error)) Option

WithHAKeeperClientFactory set hakeeper client factory

func WithLogServiceClientFactory

func WithLogServiceClientFactory(factory func(metadata.TNShard) (logservice.Client, error)) Option

WithLogServiceClientFactory set log service client factory

func WithTaskStorageFactory

func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option

WithTaskStorageFactory setup the special task strorage factory

type PortSlot

type PortSlot int
const (
	TxnService PortSlot = iota
	LogtailService
	LockService
	QueryService
	MaxService
)

New service should add before the last one.

func (PortSlot) String

func (s PortSlot) String() string

String implements the fmt.Stringer interface.

type Service

type Service interface {
	// Start start tn store. Start all DNShards currently managed by the Store and listen
	// to and process requests from CN and other DNs.
	Start() error
	// Close close tn store
	Close() error

	// StartTNReplica start the DNShard replica
	StartTNReplica(metadata.TNShard) error
	// CloseTNReplica close the DNShard replica.
	CloseTNReplica(shard metadata.TNShard) error

	// GetTaskService returns taskservice
	GetTaskService() (taskservice.TaskService, bool)
}

Service TN Service

func NewService

func NewService(
	cfg *Config,
	rt runtime.Runtime,
	fileService fileservice.FileService,
	shutdownC chan struct{},
	opts ...Option) (Service, error)

NewService create TN Service

type StorageType

type StorageType string

StorageType txn storage type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL