Documentation ¶
Overview ¶
Package logservice implements MO's LogService component.
Index ¶
- Constants
- func GetBackendOptions(ctx context.Context) []morpc.BackendOption
- func GetClientOptions(ctx context.Context) []morpc.ClientOption
- func IsTempError(err error) bool
- func MustMarshal(m Marshaller) []byte
- func MustUnmarshal(m Unmarshaler, data []byte)
- func NewTestService(fs vfs.FS) (*Service, ClientConfig, error)
- func SetBackendOptions(ctx context.Context, opts ...morpc.BackendOption) context.Context
- func SetClientOptions(ctx context.Context, opts ...morpc.ClientOption) context.Context
- type CNHAKeeperClient
- type Client
- type ClientConfig
- type ClientFactory
- type Config
- func (c *Config) Bootstrapping() (uint64, bool)
- func (c *Config) Fill()
- func (c *Config) GetHAKeeperClientConfig() HAKeeperClientConfig
- func (c *Config) GetHAKeeperConfig() hakeeper.Config
- func (c *Config) GetInitHAKeeperMembers() (map[uint64]dragonboat.Target, error)
- func (c *Config) Validate() error
- type ContextKey
- type DNHAKeeperClient
- type HAKeeperClientConfig
- type ISnapshotItem
- type ISnapshotManager
- type LogHAKeeperClient
- type LogRecord
- type Lsn
- type Marshaller
- type Option
- type RPCRequest
- type RPCResponse
- type Service
- type ShardInfo
- type Unmarshaler
- type WrappedService
- func (w *WrappedService) Close() error
- func (w *WrappedService) CreateInitTasks() error
- func (w *WrappedService) GetClusterState() (*pb.CheckerState, error)
- func (w *WrappedService) GetTaskService() (taskservice.TaskService, bool)
- func (w *WrappedService) ID() string
- func (w *WrappedService) IsLeaderHakeeper() (bool, error)
- func (w *WrappedService) SetInitialClusterInfo(logShardNum, dnShardNum, logReplicaNum uint64) error
- func (w *WrappedService) Start() error
- func (w *WrappedService) StartHAKeeperReplica(replicaID uint64, replicas map[uint64]dragonboat.Target, join bool) error
Constants ¶
const (
LogServiceRPCName = "logservice-rpc"
)
Variables ¶
This section is empty.
Functions ¶
func GetBackendOptions ¶ added in v0.6.0
func GetBackendOptions(ctx context.Context) []morpc.BackendOption
func GetClientOptions ¶ added in v0.6.0
func GetClientOptions(ctx context.Context) []morpc.ClientOption
func IsTempError ¶
IsTempError returns a boolean value indicating whether the specified error is a temporary error that is worth retrying, e.g. timeouts or temporary network issues. Non-temporary errors are caused by program logic rather than by external factors.
func MustMarshal ¶
func MustMarshal(m Marshaller) []byte
func MustUnmarshal ¶
func MustUnmarshal(m Unmarshaler, data []byte)
func NewTestService ¶
func NewTestService(fs vfs.FS) (*Service, ClientConfig, error)
func SetBackendOptions ¶ added in v0.6.0
func SetClientOptions ¶ added in v0.6.0
Types ¶
type CNHAKeeperClient ¶ added in v0.5.1
type CNHAKeeperClient interface { // SendCNHeartbeat sends the specified heartbeat message to the HAKeeper. SendCNHeartbeat(ctx context.Context, hb pb.CNStoreHeartbeat) (pb.CommandBatch, error) // contains filtered or unexported methods }
CNHAKeeperClient is the HAKeeper client used by a CN store.
func NewCNHAKeeperClient ¶ added in v0.5.1
func NewCNHAKeeperClient(ctx context.Context, cfg HAKeeperClientConfig) (CNHAKeeperClient, error)
NewCNHAKeeperClient creates a HAKeeper client to be used by a CN node.
NB: caller could specify options for morpc.Client via ctx.
type Client ¶
type Client interface { // Close closes the client. Close() error // Config returns the specified configuration when creating the client. Config() ClientConfig // GetLogRecord returns a new LogRecord instance with its Data field enough // to hold payloadLength bytes of payload. The layout of the Data field is // 4 bytes of record type (pb.UserEntryUpdate) + 8 bytes DN replica ID + // payloadLength bytes of actual payload. GetLogRecord(payloadLength int) pb.LogRecord // Append appends the specified LogRecord into the Log Service. On success, the // assigned Lsn will be returned. For the specified LogRecord, only its Data // field is used with all other fields ignored by Append(). Once returned, the // pb.LogRecord can be reused. Append(ctx context.Context, rec pb.LogRecord) (Lsn, error) // Read reads the Log Service from the specified Lsn position until the // returned LogRecord set reaches the specified maxSize in bytes. The returned // Lsn indicates the next Lsn to use to resume the read, or it means // everything available has been read when it equals to the specified Lsn. // The returned pb.LogRecord records will have their Lsn and Type fields set, // the Lsn field is the Lsn assigned to the record while the Type field tells // whether the record is an internal record generated by the Log Service itself // or appended by the user. Read(ctx context.Context, firstLsn Lsn, maxSize uint64) ([]pb.LogRecord, Lsn, error) // Truncate truncates the Log Service log at the specified Lsn with Lsn // itself included. This allows the Log Service to free up storage capacities // for future appends, all future reads must start after the specified Lsn // position. Truncate(ctx context.Context, lsn Lsn) error // GetTruncatedLsn returns the largest Lsn value that has been specified for // truncation. 
GetTruncatedLsn(ctx context.Context) (Lsn, error) // GetTSOTimestamp requests a total of count unique timestamps from the TSO and // return the first assigned such timestamp, that is TSO timestamps // [returned value, returned value + count] will be owned by the caller. GetTSOTimestamp(ctx context.Context, count uint64) (uint64, error) }
Client is the Log Service Client interface exposed to the DN.
func NewClient ¶
func NewClient(ctx context.Context, cfg ClientConfig) (Client, error)
NewClient creates a Log Service client. Each returned client can be used to synchronously issue requests to the Log Service. To send multiple requests to the Log Service in parallel, multiple clients should be created and used to do so.
type ClientConfig ¶
type ClientConfig struct { // Tag client tag Tag string // ReadOnly indicates whether this is a read-only client. ReadOnly bool // LogShardID is the shard ID of the log service shard to be used. LogShardID uint64 // DNReplicaID is the replica ID of the DN that owns the created client. DNReplicaID uint64 // DiscoveryAddress is the Log Service discovery address provided by k8s. DiscoveryAddress string // LogService nodes service addresses. This field is provided for testing // purposes only. ServiceAddresses []string // MaxMessageSize is the max message size for RPC. MaxMessageSize int // EnableCompress enable compress EnableCompress bool }
ClientConfig is the configuration for log service clients.
func (*ClientConfig) Validate ¶ added in v0.6.0
func (c *ClientConfig) Validate() error
Validate validates the ClientConfig.
type ClientFactory ¶ added in v0.6.0
type Config ¶
type Config struct { // FS is the underlying virtual FS used by the log service. Leave it as empty // in production. FS vfs.FS // DeploymentID is basically the Cluster ID, nodes with different DeploymentID // will not be able to communicate via raft. DeploymentID uint64 `toml:"deployment-id"` // UUID is the UUID of the log service node. UUID value must be set. UUID string `toml:"uuid"` // RTTMillisecond is the average round trip time between log service nodes in // milliseconds. RTTMillisecond uint64 `toml:"rttmillisecond"` // DataDir is the name of the directory for storing all log service data. It // should a locally mounted partition with good write and fsync performance. DataDir string `toml:"data-dir"` // SnapshotExportDir is the directory where the dragonboat snapshots are // exported. SnapshotExportDir string `toml:"snapshot-export-dir"` // MaxExportedSnapshot is the max count of exported snapshots. If there are // already MaxExportedSnapshot exported snapshots, no exported snapshot will // be generated. MaxExportedSnapshot int `toml:"max-exported-snapshot"` // ServiceAddress is log service's service address that can be reached by // other nodes such as DN nodes. ServiceAddress string `toml:"logservice-address"` // ServiceListenAddress is the local listen address of the ServiceAddress. ServiceListenAddress string `toml:"logservice-listen-address"` // RaftAddress is the address that can be reached by other log service nodes // via their raft layer. RaftAddress string `toml:"raft-address"` // RaftListenAddress is the local listen address of the RaftAddress. RaftListenAddress string `toml:"raft-listen-address"` // UseTeeLogDB enables the log service to use tee based LogDB which is backed // by both a pebble and a tan based LogDB. This field should only be set to // true during testing. UseTeeLogDB bool `toml:"use-tee-logdb"` // LogDBBufferSize is the size of the logdb buffer in bytes. 
LogDBBufferSize uint64 `toml:"logdb-buffer-size"` // GossipAddress is the address used for accepting gossip communication. GossipAddress string `toml:"gossip-address"` // GossipListenAddress is the local listen address of the GossipAddress GossipListenAddress string `toml:"gossip-listen-address"` // GossipSeedAddresses is list of seed addresses that are used for // introducing the local node into the gossip network. GossipSeedAddresses []string `toml:"gossip-seed-addresses"` // GossipProbeInterval how often gossip nodes probe each other. GossipProbeInterval toml.Duration `toml:"gossip-probe-interval"` // GossipAllowSelfAsSeed allow use self as gossip seed GossipAllowSelfAsSeed bool `toml:"gossip-allow-self-as-seed"` // HeartbeatInterval is the interval of how often log service node should be // sending heartbeat message to the HAKeeper. HeartbeatInterval toml.Duration `toml:"logservice-heartbeat-interval"` // HAKeeperTickInterval is the interval of how often log service node should // tick the HAKeeper. HAKeeperTickInterval toml.Duration `toml:"hakeeper-tick-interval"` // HAKeeperCheckInterval is the interval of how often HAKeeper should run // cluster health checks. HAKeeperCheckInterval toml.Duration `toml:"hakeeper-check-interval"` // TruncateInterval is the interval of how often log service should // process truncate. TruncateInterval toml.Duration `toml:"truncate-interval"` RPC struct { // MaxMessageSize is the max size for RPC message. The default value is 10MiB. MaxMessageSize toml.ByteSize `toml:"max-message-size"` // EnableCompress enable compress EnableCompress bool `toml:"enable-compress"` } // BootstrapConfig is the configuration specified for the bootstrapping // procedure. It only needs to be specified for Log Stores selected to host // initial HAKeeper replicas during bootstrapping. BootstrapConfig struct { // BootstrapCluster indicates whether the cluster should be bootstrapped. 
// Note the bootstrapping procedure will only be executed if BootstrapCluster // is true and Config.UUID is found in Config.BootstrapConfig.InitHAKeeperMembers. BootstrapCluster bool `toml:"bootstrap-cluster"` // NumOfLogShards defines the number of Log shards in the initial deployment. NumOfLogShards uint64 `toml:"num-of-log-shards"` // NumOfDNShards defines the number of DN shards in the initial deployment. // The count must be the same as NumOfLogShards in the current implementation. NumOfDNShards uint64 `toml:"num-of-dn-shards"` // NumOfLogShardReplicas is the number of replicas for each shard managed by // Log Stores, including Log Service shards and the HAKeeper. NumOfLogShardReplicas uint64 `toml:"num-of-log-shard-replicas"` // InitHAKeeperMembers defines the initial members of the HAKeeper as a list // of HAKeeper replicaID and UUID pairs. For example, // when the initial HAKeeper members are // replica with replica ID 101 running on Log Store uuid1 // replica with replica ID 102 running on Log Store uuid2 // replica with replica ID 103 running on Log Store uuid3 // the InitHAKeeperMembers string value should be // []string{"101:uuid1", "102:uuid2", "103:uuid3"} // Note that these initial HAKeeper replica IDs must be assigned by k8s // from the range [K8SIDRangeStart, K8SIDRangeEnd) as defined in pkg/hakeeper. // All uuid values are assigned by k8s, they are used to uniquely identify // CN/DN/Log stores. // Config.UUID and Config.BootstrapConfig values are considered together to // figure out what is the replica ID of the initial HAKeeper replica. That // is when Config.UUID is found in InitHAKeeperMembers, then the corresponding // replica ID value will be used to launch a HAKeeper replica on the Log // Service instance. InitHAKeeperMembers []string `toml:"init-hakeeper-members"` } HAKeeperConfig struct { // TickPerSecond indicates how many ticks every second. // In HAKeeper, we do not use actual time to measure time elapse. // Instead, we use ticks. 
TickPerSecond int `toml:"tick-per-second"` // LogStoreTimeout is the actual time limit between a log store's heartbeat. // If HAKeeper does not receive two heartbeat within LogStoreTimeout, // it regards the log store as down. LogStoreTimeout toml.Duration `toml:"log-store-timeout"` // DNStoreTimeout is the actual time limit between a dn store's heartbeat. // If HAKeeper does not receive two heartbeat within DNStoreTimeout, // it regards the dn store as down. DNStoreTimeout toml.Duration `toml:"dn-store-timeout"` // CNStoreTimeout is the actual time limit between a cn store's heartbeat. // If HAKeeper does not receive two heartbeat within CNStoreTimeout, // it regards the dn store as down. CNStoreTimeout toml.Duration `toml:"cn-store-timeout"` } // HAKeeperClientConfig is the config for HAKeeperClient HAKeeperClientConfig HAKeeperClientConfig // DisableWorkers disables the HAKeeper ticker and HAKeeper client in tests. // Never set this field to true in production DisableWorkers bool }
Config defines the Configurations supported by the Log Service.
func (*Config) Bootstrapping ¶ added in v0.6.0
Bootstrapping returns the replica ID of the HAKeeper replica and a boolean indicating whether we should run the bootstrap procedure.
func (*Config) GetHAKeeperClientConfig ¶ added in v0.6.0
func (c *Config) GetHAKeeperClientConfig() HAKeeperClientConfig
func (*Config) GetHAKeeperConfig ¶ added in v0.6.0
func (*Config) GetInitHAKeeperMembers ¶ added in v0.6.0
type ContextKey ¶ added in v0.6.0
type ContextKey string
const ( BackendOption ContextKey = "morpc.BackendOption" ClientOption ContextKey = "morpc.ClientOption" )
type DNHAKeeperClient ¶ added in v0.5.1
type DNHAKeeperClient interface { // SendDNHeartbeat sends the specified heartbeat message to the HAKeeper. The // returned CommandBatch contains Schedule Commands to be executed by the local // DN store. SendDNHeartbeat(ctx context.Context, hb pb.DNStoreHeartbeat) (pb.CommandBatch, error) // contains filtered or unexported methods }
DNHAKeeperClient is the HAKeeper client used by a DN store.
func NewDNHAKeeperClient ¶ added in v0.5.1
func NewDNHAKeeperClient(ctx context.Context, cfg HAKeeperClientConfig) (DNHAKeeperClient, error)
NewDNHAKeeperClient creates a HAKeeper client to be used by a DN node.
NB: caller could specify options for morpc.Client via ctx.
type HAKeeperClientConfig ¶ added in v0.5.1
type HAKeeperClientConfig struct { // DiscoveryAddress is the Log Service discovery address provided by k8s. DiscoveryAddress string `toml:"discovery-address"` // ServiceAddresses is a list of well known Log Services' service addresses. ServiceAddresses []string `toml:"service-addresses"` // AllocateIDBatch how many IDs are assigned from hakeeper each time. Default is // 100. AllocateIDBatch uint64 `toml:"allocate-id-batch"` // EnableCompress enable compress EnableCompress bool `toml:"enable-compress"` }
HAKeeperClientConfig is the config for HAKeeper clients.
func (*HAKeeperClientConfig) Validate ¶ added in v0.6.0
func (c *HAKeeperClientConfig) Validate() error
Validate validates the HAKeeperClientConfig.
type ISnapshotItem ¶ added in v0.6.0
type ISnapshotItem interface { // Exists returns if the snapshot item exists. Exists() bool // Remove removes the snapshot item. Remove() error // Valid check the legality of the snapshot item. Only the names of // the files in it are checked. Valid() (bool, error) }
ISnapshotItem is an interface that represents a snapshot item.
type ISnapshotManager ¶ added in v0.6.0
type ISnapshotManager interface { // Init initialize snapshots by loading exported snapshots. Init(shardID uint64, replicaID uint64) error // Count returns the number of snapshots in the manager. Count(shardID uint64, replicaID uint64) int // Add adds a new snapshot for specified shard. Add(shardID uint64, replicaID uint64, index uint64) error // Remove removes snapshots whose index is LE than index. Remove(shardID uint64, replicaID uint64, index uint64) error // EvalImportSnapshot returns the source directory and index of // the biggest snapshot of the shard. EvalImportSnapshot(shardID uint64, replicaID uint64, index uint64) (string, uint64) }
ISnapshotManager is an interface that manages snapshots.
type LogHAKeeperClient ¶ added in v0.5.1
type LogHAKeeperClient interface { // SendLogHeartbeat sends the specified heartbeat message to the HAKeeper. The // returned CommandBatch contains Schedule Commands to be executed by the local // Log store. SendLogHeartbeat(ctx context.Context, hb pb.LogStoreHeartbeat) (pb.CommandBatch, error) // contains filtered or unexported methods }
LogHAKeeperClient is the HAKeeper client used by a Log store.
func NewLogHAKeeperClient ¶ added in v0.5.1
func NewLogHAKeeperClient(ctx context.Context, cfg HAKeeperClientConfig) (LogHAKeeperClient, error)
NewLogHAKeeperClient creates a HAKeeper client to be used by a Log Service node.
NB: caller could specify options for morpc.Client via ctx.
type Marshaller ¶ added in v0.6.0
type Option ¶ added in v0.6.0
type Option func(*Service)
Option is a utility that sets a callback for Service.
func WithBackendFilter ¶ added in v0.6.0
WithBackendFilter sets the filter that is used to select remote backends.
func WithRuntime ¶ added in v0.7.0
WithRuntime sets runtime
func WithTaskStorageFactory ¶ added in v0.6.0
func WithTaskStorageFactory(factory taskservice.TaskStorageFactory) Option
WithTaskStorageFactory sets up the special task storage factory.
type RPCRequest ¶
RPCRequest is request message type used in morpc
func (*RPCRequest) DebugString ¶
func (r *RPCRequest) DebugString() string
func (*RPCRequest) GetID ¶
func (r *RPCRequest) GetID() uint64
func (*RPCRequest) GetPayloadField ¶
func (r *RPCRequest) GetPayloadField() []byte
func (*RPCRequest) Release ¶
func (r *RPCRequest) Release()
func (*RPCRequest) SetID ¶
func (r *RPCRequest) SetID(id uint64)
func (*RPCRequest) SetPayloadField ¶
func (r *RPCRequest) SetPayloadField(data []byte)
func (*RPCRequest) Size ¶ added in v0.7.0
func (r *RPCRequest) Size() int
type RPCResponse ¶
RPCResponse is response message type used in morpc
func (*RPCResponse) DebugString ¶
func (r *RPCResponse) DebugString() string
func (*RPCResponse) GetID ¶
func (r *RPCResponse) GetID() uint64
func (*RPCResponse) GetPayloadField ¶
func (r *RPCResponse) GetPayloadField() []byte
func (*RPCResponse) Release ¶
func (r *RPCResponse) Release()
func (*RPCResponse) SetID ¶
func (r *RPCResponse) SetID(id uint64)
func (*RPCResponse) SetPayloadField ¶
func (r *RPCResponse) SetPayloadField(data []byte)
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the top layer component of a log service node. It manages the underlying log store which in turn manages all log shards including the HAKeeper shard. The Log Service component communicates with LogService clients owned by DN nodes and the HAKeeper service via network, it can be considered as the interface layer of the LogService.
func NewService ¶
func NewService( cfg Config, fileService fileservice.FileService, opts ...Option, ) (*Service, error)
func (*Service) BootstrapHAKeeper ¶ added in v0.6.0
type ShardInfo ¶
type Unmarshaler ¶
type WrappedService ¶ added in v0.6.0
type WrappedService struct {
// contains filtered or unexported fields
}
func NewWrappedService ¶ added in v0.6.0
func NewWrappedService( c Config, fileService fileservice.FileService, opts ...Option, ) (*WrappedService, error)
func (*WrappedService) Close ¶ added in v0.6.0
func (w *WrappedService) Close() error
func (*WrappedService) CreateInitTasks ¶ added in v0.6.0
func (w *WrappedService) CreateInitTasks() error
func (*WrappedService) GetClusterState ¶ added in v0.6.0
func (w *WrappedService) GetClusterState() (*pb.CheckerState, error)
func (*WrappedService) GetTaskService ¶ added in v0.6.0
func (w *WrappedService) GetTaskService() (taskservice.TaskService, bool)
func (*WrappedService) ID ¶ added in v0.6.0
func (w *WrappedService) ID() string
func (*WrappedService) IsLeaderHakeeper ¶ added in v0.6.0
func (w *WrappedService) IsLeaderHakeeper() (bool, error)
func (*WrappedService) SetInitialClusterInfo ¶ added in v0.6.0
func (w *WrappedService) SetInitialClusterInfo( logShardNum, dnShardNum, logReplicaNum uint64, ) error
func (*WrappedService) Start ¶ added in v0.6.0
func (w *WrappedService) Start() error
func (*WrappedService) StartHAKeeperReplica ¶ added in v0.6.0
func (w *WrappedService) StartHAKeeperReplica( replicaID uint64, replicas map[uint64]dragonboat.Target, join bool, ) error
StartHAKeeperReplica TODO: start hakeeper with specified log store, specified by caller