log

package
v0.0.0-...-40b2247
Warning

This package is not in the latest version of its module.

Published: Oct 30, 2024 | License: MIT | Imports: 23 | Imported by: 0

Documentation

Index

Constants

const RaftRPC = 1

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Raft struct {
		raft.Config
		StreamLayer *StreamLayer
		Bootstrap   bool
	}
	Segment struct {
		MaxStoreBytes uint64
		MaxIndexBytes uint64
		InitialOffset uint64
	}
}

type DistributedLog

type DistributedLog struct {
	// contains filtered or unexported fields
}

func NewDistributedLog

func NewDistributedLog(dataDir string, config Config) (*DistributedLog, error)

NewDistributedLog creates the local log, sets up Raft, and returns a pointer to the new DistributedLog.

func (*DistributedLog) Append

func (l *DistributedLog) Append(record *api.Record) (uint64, error)

Append appends the record to the log. It applies a command to Raft that tells the FSM to append the record to the log. Raft then replicates the command to a majority of the Raft servers.

func (*DistributedLog) Close

func (l *DistributedLog) Close() error

Close shuts down the Raft instance and closes the local log.

func (*DistributedLog) GetServers

func (l *DistributedLog) GetServers() ([]*api.Server, error)

GetServers converts Raft's raft.Server values into *api.Server values for the API to respond with.

func (*DistributedLog) Join

func (l *DistributedLog) Join(id, addr string) error

Join adds the server to the Raft cluster. Every server added this way joins as a voter; Raft's AddNonVoter() API adds a server with non-voter status instead. Non-voter servers are useful for replicating state to many servers that serve read-only, eventually consistent state. Each additional voter increases the probability that replication and elections will take longer, because the leader has more servers it needs to communicate with to reach a majority.
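To make the cost of adding voters concrete, the sketch below computes the majority a leader has to reach for a given number of voters. The quorum helper is hypothetical and not part of this package; it only applies the usual Raft majority rule.

```go
package main

import "fmt"

// quorum returns the number of voters that make up a strict majority.
// Hypothetical helper for illustration; not part of this package.
func quorum(voters int) int {
	return voters/2 + 1
}

func main() {
	for _, n := range []int{1, 3, 5, 7} {
		// Voters beyond the quorum may fail without blocking commits.
		fmt.Printf("voters=%d quorum=%d tolerated failures=%d\n",
			n, quorum(n), n-quorum(n))
	}
}
```

Going from 3 to 5 voters raises the quorum from 2 to 3, so every commit waits on one more acknowledgement; non-voters do not count toward this number.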

func (*DistributedLog) Leave

func (l *DistributedLog) Leave(id string) error

Leave removes the server from the cluster. Removing the leader triggers a new election. On non-leader nodes, Raft rejects attempts to change the cluster and returns ErrNotLeader.

func (*DistributedLog) Read

func (l *DistributedLog) Read(offset uint64) (*api.Record, error)

Read reads the record at the given offset from this server's log. When relaxed consistency is acceptable, reads do not have to go through Raft, which makes them more efficient.

func (*DistributedLog) WaitForLeader

func (l *DistributedLog) WaitForLeader(timeout time.Duration) error

WaitForLeader blocks until the cluster has elected a leader or times out.
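One common way to build this kind of blocking wait is to poll on a ticker and give up when a timer fires. The sketch below assumes that approach; waitFor and the hasLeader callback are hypothetical names, and the package's actual implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls hasLeader once per interval until it reports true or the
// timeout elapses. Hypothetical helper; hasLeader stands in for asking
// Raft whether a leader is currently known.
func waitFor(hasLeader func() bool, interval, timeout time.Duration) error {
	deadline := time.After(timeout)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return errors.New("timed out waiting for leader")
		case <-ticker.C:
			if hasLeader() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Pretend a leader appears 50ms after start.
	hasLeader := func() bool { return time.Since(start) > 50*time.Millisecond }
	err := waitFor(hasLeader, 10*time.Millisecond, time.Second)
	fmt.Println("leader elected:", err == nil)
}
```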

type Log

type Log struct {
	Dir    string
	Config Config
	// contains filtered or unexported fields
}

Log consists of a list of segments and a pointer to the active segment, which receives appended writes. Dir is the directory where the segments are stored.

func NewLog

func NewLog(dir string, c Config) (*Log, error)

NewLog sets defaults for the config values the caller didn't specify, creates a Log instance, and sets up that instance.

func (*Log) Append

func (l *Log) Append(record *api.Record) (uint64, error)

Append appends a record to the log by appending it to the active segment. If the active segment is at its maximum size (taken from Config), Append creates a new active segment. TODO: Lock the segment rather than the whole Log; the Log only needs to be locked while a new active segment is being created.
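The roll-over behaviour can be sketched with an in-memory stand-in. Everything below (memLog, segment, isMaxed) is hypothetical and greatly simplified: there are no files, no index, and no locking, and checking the size after the append is an assumption rather than the package's confirmed order of operations.

```go
package main

import "fmt"

// segment tracks only offsets and size; the real segment also owns a
// store file and an index file.
type segment struct {
	baseOffset, nextOffset uint64
	size, maxBytes         uint64
}

func (s *segment) append(rec []byte) uint64 {
	off := s.nextOffset
	s.nextOffset++
	s.size += uint64(len(rec))
	return off
}

func (s *segment) isMaxed() bool { return s.size >= s.maxBytes }

type memLog struct {
	maxBytes uint64
	segments []*segment
	active   *segment
}

func newMemLog(maxBytes uint64) *memLog {
	l := &memLog{maxBytes: maxBytes}
	l.newSegment(0)
	return l
}

func (l *memLog) newSegment(base uint64) {
	s := &segment{baseOffset: base, nextOffset: base, maxBytes: l.maxBytes}
	l.segments = append(l.segments, s)
	l.active = s
}

// Append writes to the active segment and rolls over to a fresh segment
// once the active one has reached its size limit.
func (l *memLog) Append(rec []byte) uint64 {
	off := l.active.append(rec)
	if l.active.isMaxed() {
		l.newSegment(off + 1)
	}
	return off
}

func main() {
	l := newMemLog(10)
	for i := 0; i < 3; i++ {
		off := l.Append([]byte("12345"))
		fmt.Printf("offset=%d segments=%d\n", off, len(l.segments))
	}
}
```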

func (*Log) Close

func (l *Log) Close() error

Close iterates over the segments and closes each of them.

func (*Log) HighestOffset

func (l *Log) HighestOffset() (uint64, error)

func (*Log) LowestOffset

func (l *Log) LowestOffset() (uint64, error)

func (*Log) Read

func (l *Log) Read(off uint64) (*api.Record, error)

Read reads the record stored at the given offset. It first finds the segment that contains the record. A segment's base offset is the smallest offset in that segment, so Read iterates over the segments looking for the one whose base offset is less than or equal to the requested offset, that is, the segment whose range covers that offset. Once it has found that segment, it gets the index entry from the segment's index file, reads the data out of the segment's store file, and returns the data to the caller.
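The segment lookup can be sketched as a linear scan over offset ranges. The segment type, the nextOffset upper bound, and the errOffsetOutOfRange error below are assumptions made for illustration; the package's own types and error values are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// segment covers the half-open offset range [baseOffset, nextOffset).
type segment struct {
	baseOffset, nextOffset uint64
}

// errOffsetOutOfRange is a hypothetical error for offsets no segment holds.
var errOffsetOutOfRange = errors.New("offset out of range")

// findSegment returns the segment whose range contains off.
func findSegment(segments []segment, off uint64) (segment, error) {
	for _, s := range segments {
		if s.baseOffset <= off && off < s.nextOffset {
			return s, nil
		}
	}
	return segment{}, errOffsetOutOfRange
}

func main() {
	segs := []segment{
		{baseOffset: 0, nextOffset: 3},
		{baseOffset: 3, nextOffset: 6},
	}
	s, err := findSegment(segs, 4)
	fmt.Println(s.baseOffset, err)
}
```

Checking the upper bound as well as the base offset matters: with only the base-offset test, the first segment would match every offset.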

func (*Log) Reader

func (l *Log) Reader() io.Reader

Reader returns an io.Reader that reads the whole Log. It is needed to coordinate consensus, which requires support for snapshotting and restoring a log. io.MultiReader concatenates the segments' stores. Each store is wrapped in an originReader, both to satisfy the io.Reader interface that io.MultiReader expects and to ensure reading starts from the origin of the store and covers its entire file.
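The wrapping described above can be sketched with the standard library alone. The originReader below is a guess at the shape of the unexported type (an io.ReaderAt plus a running offset), and concatStores is a hypothetical helper that uses strings.Reader values in place of real store files.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// originReader adapts an io.ReaderAt to io.Reader. Reads always begin at
// offset 0, regardless of any position the underlying store may hold.
type originReader struct {
	src io.ReaderAt
	off int64
}

func (o *originReader) Read(p []byte) (int, error) {
	n, err := o.src.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}

// concatStores reads every store from its origin, back to back.
func concatStores(contents ...string) (string, error) {
	readers := make([]io.Reader, len(contents))
	for i, c := range contents {
		readers[i] = &originReader{src: strings.NewReader(c)}
	}
	data, err := io.ReadAll(io.MultiReader(readers...))
	return string(data), err
}

func main() {
	out, err := concatStores("segment-0;", "segment-1;")
	fmt.Println(out, err)
}
```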

func (*Log) Remove

func (l *Log) Remove() error

Remove closes the Log and then removes its data.

func (*Log) Reset

func (l *Log) Reset() error

Reset removes the log and then creates a new Log to replace it.

func (*Log) Truncate

func (l *Log) Truncate(lowest uint64) error

Truncate removes all segments whose highest offset is lower than lowest. It is used to remove old segments whose data has already been processed and is no longer needed.
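A minimal sketch of that rule, taking the description literally (a segment is dropped when its highest offset is strictly lower than lowest). The segment type and truncate function are hypothetical; the real method would also delete the dropped segments' files.

```go
package main

import "fmt"

// segment records the first and last offsets it holds.
type segment struct {
	baseOffset, highestOffset uint64
}

// truncate keeps only the segments that still hold an offset >= lowest.
func truncate(segments []segment, lowest uint64) []segment {
	var kept []segment
	for _, s := range segments {
		if s.highestOffset < lowest {
			continue // the real Log would remove this segment's data here
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	segs := []segment{
		{baseOffset: 0, highestOffset: 2},
		{baseOffset: 3, highestOffset: 5},
		{baseOffset: 6, highestOffset: 8},
	}
	fmt.Println(truncate(segs, 4))
}
```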

type Replicator

type Replicator struct {
	// Options to configure the client
	DialOptions []grpc.DialOption
	LocalServer api.LogClient
	// contains filtered or unexported fields
}

Replicator connects to other servers with the gRPC client.

func (*Replicator) Close

func (r *Replicator) Close() error

Close closes the replicator: it no longer replicates servers that join the cluster afterwards, and it stops replicating existing servers by causing their replicate() goroutines to return.

func (*Replicator) Join

func (r *Replicator) Join(name, addr string) error

Join adds the given server address to the list of servers to replicate and starts a goroutine that runs the actual replication logic.
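The interplay between Join and Close can be sketched as follows. This is an assumed design, not the package's code: the lowercase replicator type, its fields, and the use of a shared done channel are all hypothetical, and the replicate goroutine only waits for shutdown instead of streaming records over gRPC.

```go
package main

import (
	"fmt"
	"sync"
)

type replicator struct {
	mu      sync.Mutex
	servers map[string]struct{}
	closed  bool
	done    chan struct{} // closed by Close to stop all goroutines
	wg      sync.WaitGroup
}

func newReplicator() *replicator {
	return &replicator{servers: map[string]struct{}{}, done: make(chan struct{})}
}

// Join starts one replication goroutine per new server. It reports false
// if the replicator is closed or the server is already being replicated.
func (r *replicator) Join(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		return false
	}
	if _, ok := r.servers[name]; ok {
		return false
	}
	r.servers[name] = struct{}{}
	r.wg.Add(1)
	go r.replicate()
	return true
}

// replicate stands in for the real replication loop.
func (r *replicator) replicate() {
	defer r.wg.Done()
	<-r.done
}

// Close stops accepting new servers and waits for all goroutines to return.
func (r *replicator) Close() {
	r.mu.Lock()
	if !r.closed {
		r.closed = true
		close(r.done)
	}
	r.mu.Unlock()
	r.wg.Wait()
}

func main() {
	r := newReplicator()
	fmt.Println("joined before close:", r.Join("node-1"))
	r.Close()
	fmt.Println("joined after close:", r.Join("node-2"))
}
```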

func (*Replicator) Leave

func (r *Replicator) Leave(name string) error

type RequestType

type RequestType uint8
const (
	AppendRequestType RequestType = 0
)

type StreamLayer

type StreamLayer struct {
	// contains filtered or unexported fields
}

func NewStreamLayer

func NewStreamLayer(ln net.Listener, serverTLSConfig, peerTLSConfig *tls.Config) *StreamLayer

func (*StreamLayer) Accept

func (s *StreamLayer) Accept() (net.Conn, error)

Accept is the mirror of Dial. It accepts an incoming connection, reads the byte that identifies the connection type, and then creates the server-side TLS connection.

func (*StreamLayer) Addr

func (s *StreamLayer) Addr() net.Addr

Addr returns the listener's address.

func (*StreamLayer) Close

func (s *StreamLayer) Close() error

Close closes the listener.

func (*StreamLayer) Dial

func (s *StreamLayer) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial makes outgoing connections to other servers in the Raft cluster. Once connected to a server, it writes the RaftRPC byte to identify the connection type, which allows Raft to be multiplexed on the same port as the Log gRPC requests.
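The first-byte handshake shared by Dial and Accept can be sketched over an in-memory net.Pipe. TLS is left out to keep the example short, and raftRPC, isRaft, and roundTrip are hypothetical names that only mirror the idea behind the package's RaftRPC constant.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// raftRPC mirrors the value of the package's RaftRPC constant.
const raftRPC = 1

// isRaft reads the first byte of a connection and reports whether it
// identifies the connection as Raft traffic (the accepting side).
func isRaft(conn net.Conn) (bool, error) {
	b := make([]byte, 1)
	if _, err := io.ReadFull(conn, b); err != nil {
		return false, err
	}
	return b[0] == raftRPC, nil
}

// roundTrip plays both sides: the dialing side writes the identifying
// byte, and the accepting side classifies the connection.
func roundTrip(first byte) (bool, error) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	errc := make(chan error, 1)
	go func() {
		_, err := client.Write([]byte{first})
		errc <- err
	}()

	ok, err := isRaft(server)
	if err != nil {
		return false, err
	}
	if err := <-errc; err != nil {
		return false, err
	}
	return ok, nil
}

func main() {
	ok, err := roundTrip(raftRPC)
	fmt.Println("raft connection:", ok, err)
}
```

Because the byte is consumed before any other data is read, a multiplexer listening on a single port can route each connection to Raft or to the gRPC server based on it.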
