logservice

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2023 License: Apache-2.0 Imports: 51 Imported by: 2

README

About

Log Service provides reliable logging for MatrixOne.

Documentation

Overview

Package logservice implement MO's LogService component.

Index

Constants

View Source
const (
	LogServiceRPCName = "logservice-rpc"
)

Variables

This section is empty.

Functions

func GetBackendOptions added in v0.6.0

func GetBackendOptions(ctx context.Context) []morpc.BackendOption

func GetClientOptions added in v0.6.0

func GetClientOptions(ctx context.Context) []morpc.ClientOption

func IsTempError

func IsTempError(err error) bool

IsTempError returns a boolean value indicating whether the specified error is a temp error that worth to be retried, e.g. timeouts, temp network issues. Non-temp error caused by program logics rather than some external factors.

func MustMarshal

func MustMarshal(m Marshaller) []byte

func MustUnmarshal

func MustUnmarshal(m Unmarshaler, data []byte)

func NewTestService

func NewTestService(fs vfs.FS) (*Service, ClientConfig, error)

func SetBackendOptions added in v0.6.0

func SetBackendOptions(ctx context.Context, opts ...morpc.BackendOption) context.Context

func SetClientOptions added in v0.6.0

func SetClientOptions(ctx context.Context, opts ...morpc.ClientOption) context.Context

Types

type CNHAKeeperClient added in v0.5.1

type CNHAKeeperClient interface {

	// SendCNHeartbeat sends the specified heartbeat message to the HAKeeper.
	SendCNHeartbeat(ctx context.Context, hb pb.CNStoreHeartbeat) (pb.CommandBatch, error)
	// contains filtered or unexported methods
}

CNHAKeeperClient is the HAKeeper client used by a CN store.

func NewCNHAKeeperClient added in v0.5.1

func NewCNHAKeeperClient(ctx context.Context,
	cfg HAKeeperClientConfig) (CNHAKeeperClient, error)

NewCNHAKeeperClient creates a HAKeeper client to be used by a CN node.

NB: caller could specify options for morpc.Client via ctx.

type Client

type Client interface {
	// Close closes the client.
	Close() error
	// Config returns the specified configuration when creating the client.
	Config() ClientConfig
	// GetLogRecord returns a new LogRecord instance with its Data field enough
	// to hold payloadLength bytes of payload. The layout of the Data field is
	// 4 bytes of record type (pb.UserEntryUpdate) + 8 bytes DN replica ID +
	// payloadLength bytes of actual payload.
	GetLogRecord(payloadLength int) pb.LogRecord
	// Append appends the specified LogRecord into the Log Service. On success, the
	// assigned Lsn will be returned. For the specified LogRecord, only its Data
	// field is used with all other fields ignored by Append(). Once returned, the
	// pb.LogRecord can be reused.
	Append(ctx context.Context, rec pb.LogRecord) (Lsn, error)
	// Read reads the Log Service from the specified Lsn position until the
	// returned LogRecord set reaches the specified maxSize in bytes. The returned
	// Lsn indicates the next Lsn to use to resume the read, or it means
	// everything available has been read when it equals to the specified Lsn.
	// The returned pb.LogRecord records will have their Lsn and Type fields set,
	// the Lsn field is the Lsn assigned to the record while the Type field tells
	// whether the record is an internal record generated by the Log Service itself
	// or appended by the user.
	Read(ctx context.Context, firstLsn Lsn, maxSize uint64) ([]pb.LogRecord, Lsn, error)
	// Truncate truncates the Log Service log at the specified Lsn with Lsn
	// itself included. This allows the Log Service to free up storage capacities
	// for future appends, all future reads must start after the specified Lsn
	// position.
	Truncate(ctx context.Context, lsn Lsn) error
	// GetTruncatedLsn returns the largest Lsn value that has been specified for
	// truncation.
	GetTruncatedLsn(ctx context.Context) (Lsn, error)
	// GetTSOTimestamp requests a total of count unique timestamps from the TSO and
	// return the first assigned such timestamp, that is TSO timestamps
	// [returned value, returned value + count] will be owned by the caller.
	GetTSOTimestamp(ctx context.Context, count uint64) (uint64, error)
}

Client is the Log Service Client interface exposed to the DN.

func NewClient

func NewClient(ctx context.Context, cfg ClientConfig) (Client, error)

NewClient creates a Log Service client. Each returned client can be used to synchronously issue requests to the Log Service. To send multiple requests to the Log Service in parallel, multiple clients should be created and used to do so.

type ClientConfig

type ClientConfig struct {
	// Tag client tag
	Tag string
	// ReadOnly indicates whether this is a read-only client.
	ReadOnly bool
	// LogShardID is the shard ID of the log service shard to be used.
	LogShardID uint64
	// DNReplicaID is the replica ID of the DN that owns the created client.
	DNReplicaID uint64
	// DiscoveryAddress is the Log Service discovery address provided by k8s.
	DiscoveryAddress string
	// LogService nodes service addresses. This field is provided for testing
	// purposes only.
	ServiceAddresses []string
	// MaxMessageSize is the max message size for RPC.
	MaxMessageSize int
	// EnableCompress enable compress
	EnableCompress bool
}

ClientConfig is the configuration for log service clients.

func (*ClientConfig) Validate added in v0.6.0

func (c *ClientConfig) Validate() error

Validate validates the ClientConfig.

type ClientFactory added in v0.6.0

type ClientFactory func() (Client, error)

type ClusterHAKeeperClient added in v0.8.0

type ClusterHAKeeperClient interface {
	// contains filtered or unexported methods
}

ClusterHAKeeperClient used to get cluster detail

type Config

type Config struct {
	// FS is the underlying virtual FS used by the log service. Leave it as empty
	// in production.
	FS vfs.FS
	// DeploymentID is basically the Cluster ID, nodes with different DeploymentID
	// will not be able to communicate via raft.
	DeploymentID uint64 `toml:"deployment-id"`
	// UUID is the UUID of the log service node. UUID value must be set.
	UUID string `toml:"uuid"`
	// RTTMillisecond is the average round trip time between log service nodes in
	// milliseconds.
	RTTMillisecond uint64 `toml:"rttmillisecond"`
	// DataDir is the name of the directory for storing all log service data. It
	// should a locally mounted partition with good write and fsync performance.
	DataDir string `toml:"data-dir"`
	// SnapshotExportDir is the directory where the dragonboat snapshots are
	// exported.
	SnapshotExportDir string `toml:"snapshot-export-dir"`
	// MaxExportedSnapshot is the max count of exported snapshots. If there are
	// already MaxExportedSnapshot exported snapshots, no exported snapshot will
	// be generated.
	MaxExportedSnapshot int `toml:"max-exported-snapshot"`
	// ServiceAddress is log service's service address that can be reached by
	// other nodes such as DN nodes.
	ServiceAddress string `toml:"logservice-address"`
	// ServiceListenAddress is the local listen address of the ServiceAddress.
	ServiceListenAddress string `toml:"logservice-listen-address"`
	// RaftAddress is the address that can be reached by other log service nodes
	// via their raft layer.
	RaftAddress string `toml:"raft-address"`
	// RaftListenAddress is the local listen address of the RaftAddress.
	RaftListenAddress string `toml:"raft-listen-address"`
	// UseTeeLogDB enables the log service to use tee based LogDB which is backed
	// by both a pebble and a tan based LogDB. This field should only be set to
	// true during testing.
	UseTeeLogDB bool `toml:"use-tee-logdb"`
	// LogDBBufferSize is the size of the logdb buffer in bytes.
	LogDBBufferSize uint64 `toml:"logdb-buffer-size"`
	// GossipAddress is the address used for accepting gossip communication.
	GossipAddress string `toml:"gossip-address"`
	// GossipAddressV2 is the address used for accepting gossip communication.
	// This is for domain name support.
	GossipAddressV2 string `toml:"gossip-address-v2"`
	// GossipListenAddress is the local listen address of the GossipAddress
	GossipListenAddress string `toml:"gossip-listen-address"`
	// GossipSeedAddresses is list of seed addresses that are used for
	// introducing the local node into the gossip network.
	GossipSeedAddresses []string `toml:"gossip-seed-addresses"`
	// GossipProbeInterval how often gossip nodes probe each other.
	GossipProbeInterval toml.Duration `toml:"gossip-probe-interval"`
	// GossipAllowSelfAsSeed allow use self as gossip seed
	GossipAllowSelfAsSeed bool `toml:"gossip-allow-self-as-seed"`
	// HeartbeatInterval is the interval of how often log service node should be
	// sending heartbeat message to the HAKeeper.
	HeartbeatInterval toml.Duration `toml:"logservice-heartbeat-interval"`
	// HAKeeperTickInterval is the interval of how often log service node should
	// tick the HAKeeper.
	HAKeeperTickInterval toml.Duration `toml:"hakeeper-tick-interval"`
	// HAKeeperCheckInterval is the interval of how often HAKeeper should run
	// cluster health checks.
	HAKeeperCheckInterval toml.Duration `toml:"hakeeper-check-interval"`
	// TruncateInterval is the interval of how often log service should
	// process truncate.
	TruncateInterval toml.Duration `toml:"truncate-interval"`

	RPC struct {
		// MaxMessageSize is the max size for RPC message. The default value is 10MiB.
		MaxMessageSize toml.ByteSize `toml:"max-message-size"`
		// EnableCompress enable compress
		EnableCompress bool `toml:"enable-compress"`
	}

	// BootstrapConfig is the configuration specified for the bootstrapping
	// procedure. It only needs to be specified for Log Stores selected to host
	// initial HAKeeper replicas during bootstrapping.
	BootstrapConfig struct {
		// BootstrapCluster indicates whether the cluster should be bootstrapped.
		// Note the bootstrapping procedure will only be executed if BootstrapCluster
		// is true and Config.UUID is found in Config.BootstrapConfig.InitHAKeeperMembers.
		BootstrapCluster bool `toml:"bootstrap-cluster"`
		// NumOfLogShards defines the number of Log shards in the initial deployment.
		NumOfLogShards uint64 `toml:"num-of-log-shards"`
		// NumOfDNShards defines the number of DN shards in the initial deployment.
		// The count must be the same as NumOfLogShards in the current implementation.
		NumOfDNShards uint64 `toml:"num-of-dn-shards"`
		// NumOfLogShardReplicas is the number of replicas for each shard managed by
		// Log Stores, including Log Service shards and the HAKeeper.
		NumOfLogShardReplicas uint64 `toml:"num-of-log-shard-replicas"`
		// InitHAKeeperMembers defines the initial members of the HAKeeper as a list
		// of HAKeeper replicaID and UUID pairs. For example,
		// when the initial HAKeeper members are
		// replica with replica ID 101 running on Log Store uuid1
		// replica with replica ID 102 running on Log Store uuid2
		// replica with replica ID 103 running on Log Store uuid3
		// the InitHAKeeperMembers string value should be
		// []string{"101:uuid1", "102:uuid2", "103:uuid3"}
		// Note that these initial HAKeeper replica IDs must be assigned by k8s
		// from the range [K8SIDRangeStart, K8SIDRangeEnd) as defined in pkg/hakeeper.
		// All uuid values are assigned by k8s, they are used to uniquely identify
		// CN/DN/Log stores.
		// Config.UUID and Config.BootstrapConfig values are considered together to
		// figure out what is the replica ID of the initial HAKeeper replica. That
		// is when Config.UUID is found in InitHAKeeperMembers, then the corresponding
		// replica ID value will be used to launch a HAKeeper replica on the Log
		// Service instance.
		InitHAKeeperMembers []string `toml:"init-hakeeper-members"`
	}

	HAKeeperConfig struct {
		// TickPerSecond indicates how many ticks every second.
		// In HAKeeper, we do not use actual time to measure time elapse.
		// Instead, we use ticks.
		TickPerSecond int `toml:"tick-per-second"`
		// LogStoreTimeout is the actual time limit between a log store's heartbeat.
		// If HAKeeper does not receive two heartbeat within LogStoreTimeout,
		// it regards the log store as down.
		LogStoreTimeout toml.Duration `toml:"log-store-timeout"`
		// DNStoreTimeout is the actual time limit between a dn store's heartbeat.
		// If HAKeeper does not receive two heartbeat within DNStoreTimeout,
		// it regards the dn store as down.
		DNStoreTimeout toml.Duration `toml:"dn-store-timeout"`
		// CNStoreTimeout is the actual time limit between a cn store's heartbeat.
		// If HAKeeper does not receive two heartbeat within CNStoreTimeout,
		// it regards the dn store as down.
		CNStoreTimeout toml.Duration `toml:"cn-store-timeout"`
	}

	// HAKeeperClientConfig is the config for HAKeeperClient
	HAKeeperClientConfig HAKeeperClientConfig

	// DisableWorkers disables the HAKeeper ticker and HAKeeper client in tests.
	// Never set this field to true in production
	DisableWorkers bool

	Ctl struct {
		// ListenAddress ctl service listen address for receiving ctl requests
		ListenAddress string `toml:"listen-address"`
		// ServiceAddress service address for communication, if this address is not set, use
		// ListenAddress as the communication address.
		ServiceAddress string `toml:"service-address"`
	} `toml:"ctl"`
}

Config defines the Configurations supported by the Log Service.

func (*Config) Bootstrapping added in v0.6.0

func (c *Config) Bootstrapping() (uint64, bool)

returns replica ID of the HAKeeper replica and a boolean indicating whether we should run the bootstrap procedure.

func (*Config) Fill

func (c *Config) Fill()

func (*Config) GetHAKeeperClientConfig added in v0.6.0

func (c *Config) GetHAKeeperClientConfig() HAKeeperClientConfig

func (*Config) GetHAKeeperConfig added in v0.6.0

func (c *Config) GetHAKeeperConfig() hakeeper.Config

func (*Config) GetInitHAKeeperMembers added in v0.6.0

func (c *Config) GetInitHAKeeperMembers() (map[uint64]dragonboat.Target, error)

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type ContextKey added in v0.6.0

type ContextKey string
const (
	BackendOption ContextKey = "morpc.BackendOption"
	ClientOption  ContextKey = "morpc.ClientOption"
)

type DNHAKeeperClient added in v0.5.1

type DNHAKeeperClient interface {

	// SendDNHeartbeat sends the specified heartbeat message to the HAKeeper. The
	// returned CommandBatch contains Schedule Commands to be executed by the local
	// DN store.
	SendDNHeartbeat(ctx context.Context, hb pb.DNStoreHeartbeat) (pb.CommandBatch, error)
	// contains filtered or unexported methods
}

DNHAKeeperClient is the HAKeeper client used by a DN store.

func NewDNHAKeeperClient added in v0.5.1

func NewDNHAKeeperClient(ctx context.Context,
	cfg HAKeeperClientConfig) (DNHAKeeperClient, error)

NewDNHAKeeperClient creates a HAKeeper client to be used by a DN node.

NB: caller could specify options for morpc.Client via ctx.

type HAKeeperClientConfig added in v0.5.1

type HAKeeperClientConfig struct {
	// DiscoveryAddress is the Log Service discovery address provided by k8s.
	DiscoveryAddress string `toml:"discovery-address"`
	// ServiceAddresses is a list of well known Log Services' service addresses.
	ServiceAddresses []string `toml:"service-addresses"`
	// AllocateIDBatch how many IDs are assigned from hakeeper each time. Default is
	// 100.
	AllocateIDBatch uint64 `toml:"allocate-id-batch"`
	// EnableCompress enable compress
	EnableCompress bool `toml:"enable-compress"`
}

HAKeeperClientConfig is the config for HAKeeper clients.

func (*HAKeeperClientConfig) Validate added in v0.6.0

func (c *HAKeeperClientConfig) Validate() error

Validate validates the HAKeeperClientConfig.

type ISnapshotItem added in v0.6.0

type ISnapshotItem interface {
	// Exists returns if the snapshot item exists.
	Exists() bool
	// Remove removes the snapshot item.
	Remove() error
	// Valid check the legality of the snapshot item. Only the names of
	// the files in it are checked.
	Valid() (bool, error)
}

ISnapshotItem is an interface that represents a snapshot item.

type ISnapshotManager added in v0.6.0

type ISnapshotManager interface {
	// Init initialize snapshots by loading exported snapshots.
	Init(shardID uint64, replicaID uint64) error
	// Count returns the number of snapshots in the manager.
	Count(shardID uint64, replicaID uint64) int
	// Add adds a new snapshot for specified shard.
	Add(shardID uint64, replicaID uint64, index uint64) error
	// Remove removes snapshots whose index is LE than index.
	Remove(shardID uint64, replicaID uint64, index uint64) error
	// EvalImportSnapshot returns the source directory and index of
	// the biggest snapshot of the shard.
	EvalImportSnapshot(shardID uint64, replicaID uint64, index uint64) (string, uint64)
}

ISnapshotManager is an interface that managers snapshots.

type LogHAKeeperClient added in v0.5.1

type LogHAKeeperClient interface {

	// SendLogHeartbeat sends the specified heartbeat message to the HAKeeper. The
	// returned CommandBatch contains Schedule Commands to be executed by the local
	// Log store.
	SendLogHeartbeat(ctx context.Context, hb pb.LogStoreHeartbeat) (pb.CommandBatch, error)
	// contains filtered or unexported methods
}

LogHAKeeperClient is the HAKeeper client used by a Log store.

func NewLogHAKeeperClient added in v0.5.1

func NewLogHAKeeperClient(ctx context.Context,
	cfg HAKeeperClientConfig) (LogHAKeeperClient, error)

NewLogHAKeeperClient creates a HAKeeper client to be used by a Log Service node.

NB: caller could specify options for morpc.Client via ctx.

type LogRecord

type LogRecord = pb.LogRecord

type Lsn

type Lsn = uint64

type Marshaller added in v0.6.0

type Marshaller interface {
	Marshal() ([]byte, error)
}

type Option added in v0.6.0

type Option func(*Service)

Option is utility that sets callback for Service.

func WithBackendFilter added in v0.6.0

func WithBackendFilter(filter func(morpc.Message, string) bool) Option

WithBackendFilter sets filter via which could select remote backend.

func WithRuntime added in v0.7.0

func WithRuntime(runtime runtime.Runtime) Option

WithRuntime sets runtime

func WithTaskStorageFactory added in v0.6.0

func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option

WithTaskStorageFactory set up the special task storage factory

type ProxyHAKeeperClient added in v0.8.0

type ProxyHAKeeperClient interface {

	// GetCNState gets CN state from HAKeeper.
	GetCNState(ctx context.Context) (pb.CNState, error)
	// UpdateCNLabel updates the labels of CN.
	UpdateCNLabel(ctx context.Context, label pb.CNStoreLabel) error
	// contains filtered or unexported methods
}

ProxyHAKeeperClient is the HAKeeper client used by proxy service.

func NewProxyHAKeeperClient added in v0.8.0

func NewProxyHAKeeperClient(ctx context.Context,
	cfg HAKeeperClientConfig) (ProxyHAKeeperClient, error)

NewProxyHAKeeperClient creates a HAKeeper client to be used by a proxy service.

NB: caller could specify options for morpc.Client via ctx.

type RPCRequest

type RPCRequest struct {
	pb.Request
	// contains filtered or unexported fields
}

RPCRequest is request message type used in morpc

func (*RPCRequest) DebugString

func (r *RPCRequest) DebugString() string

func (*RPCRequest) GetID

func (r *RPCRequest) GetID() uint64

func (*RPCRequest) GetPayloadField

func (r *RPCRequest) GetPayloadField() []byte

func (*RPCRequest) Release

func (r *RPCRequest) Release()

func (*RPCRequest) SetID

func (r *RPCRequest) SetID(id uint64)

func (*RPCRequest) SetPayloadField

func (r *RPCRequest) SetPayloadField(data []byte)

func (*RPCRequest) Size added in v0.7.0

func (r *RPCRequest) Size() int

type RPCResponse

type RPCResponse struct {
	pb.Response
	// contains filtered or unexported fields
}

RPCResponse is response message type used in morpc

func (*RPCResponse) DebugString

func (r *RPCResponse) DebugString() string

func (*RPCResponse) GetID

func (r *RPCResponse) GetID() uint64

func (*RPCResponse) GetPayloadField

func (r *RPCResponse) GetPayloadField() []byte

func (*RPCResponse) Release

func (r *RPCResponse) Release()

func (*RPCResponse) SetID

func (r *RPCResponse) SetID(id uint64)

func (*RPCResponse) SetPayloadField

func (r *RPCResponse) SetPayloadField(data []byte)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the top layer component of a log service node. It manages the underlying log store which in turn manages all log shards including the HAKeeper shard. The Log Service component communicates with LogService clients owned by DN nodes and the HAKeeper service via network, it can be considered as the interface layer of the LogService.

func NewService

func NewService(
	cfg Config,
	fileService fileservice.FileService,
	opts ...Option,
) (*Service, error)

func (*Service) BootstrapHAKeeper added in v0.6.0

func (s *Service) BootstrapHAKeeper(ctx context.Context, cfg Config) error

func (*Service) Close

func (s *Service) Close() (err error)

func (*Service) ID

func (s *Service) ID() string

func (*Service) Start added in v0.6.0

func (s *Service) Start() error

type ShardInfo

type ShardInfo struct {
	// ReplicaID is the ID of the replica recommended to use
	ReplicaID uint64
	// Replicas is a map of replica ID to their service addresses
	Replicas map[uint64]string
}

func GetShardInfo added in v0.6.0

func GetShardInfo(address string, shardID uint64) (ShardInfo, bool, error)

GetShardInfo is to be invoked when querying ShardInfo on a Log Service node. address is usually the reverse proxy that randomly redirect the request to a known Log Service node.

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]byte) error
}

type WrappedService added in v0.6.0

type WrappedService struct {
	// contains filtered or unexported fields
}

func NewWrappedService added in v0.6.0

func NewWrappedService(
	c Config,
	fileService fileservice.FileService,
	opts ...Option,
) (*WrappedService, error)

func (*WrappedService) Close added in v0.6.0

func (w *WrappedService) Close() error

func (*WrappedService) CreateInitTasks added in v0.6.0

func (w *WrappedService) CreateInitTasks() error

func (*WrappedService) GetClusterState added in v0.6.0

func (w *WrappedService) GetClusterState() (*pb.CheckerState, error)

func (*WrappedService) GetTaskService added in v0.6.0

func (w *WrappedService) GetTaskService() (taskservice.TaskService, bool)

func (*WrappedService) ID added in v0.6.0

func (w *WrappedService) ID() string

func (*WrappedService) IsLeaderHakeeper added in v0.6.0

func (w *WrappedService) IsLeaderHakeeper() (bool, error)

func (*WrappedService) SetInitialClusterInfo added in v0.6.0

func (w *WrappedService) SetInitialClusterInfo(
	logShardNum, dnShardNum, logReplicaNum uint64,
) error

func (*WrappedService) Start added in v0.6.0

func (w *WrappedService) Start() error

func (*WrappedService) StartHAKeeperReplica added in v0.6.0

func (w *WrappedService) StartHAKeeperReplica(
	replicaID uint64, replicas map[uint64]dragonboat.Target, join bool,
) error

StartHAKeeperReplica TODO: start hakeeper with specified log store, specified by caller

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL