Documentation
Index ¶
- Constants
- type Config
- type DistributedLog
- func (l *DistributedLog) Append(record *api.Record) (uint64, error)
- func (l *DistributedLog) Close() error
- func (l *DistributedLog) GetServers() ([]*api.Server, error)
- func (l *DistributedLog) Join(id, addr string) error
- func (l *DistributedLog) Leave(id string) error
- func (l *DistributedLog) Read(offset uint64) (*api.Record, error)
- func (l *DistributedLog) WaitForLeader(timeout time.Duration) error
- type Log
- func (l *Log) Append(record *api.Record) (uint64, error)
- func (l *Log) Close() error
- func (l *Log) HighestOffset() (uint64, error)
- func (l *Log) LowestOffset() (uint64, error)
- func (l *Log) Read(off uint64) (*api.Record, error)
- func (l *Log) Reader() io.Reader
- func (l *Log) Remove() error
- func (l *Log) Reset() error
- func (l *Log) Truncate(lowestOffset uint64) error
- type Replicator
- type RequestType
- type StreamLayer
Constants ¶
const (
	DefaultMaxStoreBytes uint64 = 1024
	DefaultMaxIndexBytes uint64 = 1024
)
const RaftRPC = 1
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DistributedLog ¶
type DistributedLog struct {
// contains filtered or unexported fields
}
DistributedLog implements a Raft consensus driven replicated log.
func NewDistributedLog ¶
func NewDistributedLog(dataDir string, config Config) (*DistributedLog, error)
func (*DistributedLog) Close ¶
func (l *DistributedLog) Close() error
Shuts down the associated Raft instances and closes the underlying commit log.
func (*DistributedLog) GetServers ¶
func (l *DistributedLog) GetServers() ([]*api.Server, error)
Returns a slice of all the servers in the cluster of which this server is a member.
func (*DistributedLog) Join ¶
func (l *DistributedLog) Join(id, addr string) error
Serf cluster membership "join" event handler.
func (*DistributedLog) Leave ¶
func (l *DistributedLog) Leave(id string) error
Serf "leave" event handler for our Raft cluster.
func (*DistributedLog) WaitForLeader ¶
func (l *DistributedLog) WaitForLeader(timeout time.Duration) error
Blocks until a leader is elected or the timeout expires. We check once every second, up to the given timeout duration, whether a leader has been elected. If a leader is found on any tick, we return immediately. Otherwise we return an error once the timeout duration has elapsed.
This method is mostly useful in tests.
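The polling behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `waitForLeader`, its `hasLeader` callback, the configurable `tick` interval, and `demo` are hypothetical names introduced here, and the documented method checks once per second rather than taking an interval.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForLeader polls hasLeader on every tick until it reports true or the
// timeout elapses. hasLeader and tick are hypothetical stand-ins for the
// Raft leader check and the one-second interval described in the docs.
func waitForLeader(hasLeader func() bool, tick, timeout time.Duration) error {
	timeoutc := time.After(timeout)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	for {
		select {
		case <-timeoutc:
			return errors.New("timed out waiting for leader")
		case <-ticker.C:
			if hasLeader() {
				return nil
			}
		}
	}
}

// demo runs one scenario where a leader appears on the third tick and one
// where no leader ever appears.
func demo() (elected error, timedOut error) {
	calls := 0
	electedOnThirdTick := func() bool { calls++; return calls >= 3 }
	elected = waitForLeader(electedOnThirdTick, 5*time.Millisecond, 2*time.Second)
	timedOut = waitForLeader(func() bool { return false }, 5*time.Millisecond, 30*time.Millisecond)
	return elected, timedOut
}

func main() {
	elected, timedOut := demo()
	fmt.Println("leader found, err:", elected)
	fmt.Println("no leader, err:", timedOut)
}
```

In a test, a call of this shape would typically follow cluster setup so that later assertions run only after a leader exists.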
type Log ¶
Represents an append only Log of records.
A log is partitioned into a collection of segments, sorted by the offsets of the records they contain. The last segment is the active segment.
Writes go to the last segment until its capacity is maxed out. Once its capacity is full, we create a new segment and set it as the active segment.
Read operations are serviced by a linear search on the segments to find the segment which contains the given offset. If the segment is found, we simply utilize its Read() operation to read the record.
func (*Log) Append ¶
Appends the given record to the active segment. If the active segment is maxed out, it creates a new segment and sets it as the active segment. Returns the offset to which the record was written. In case of errors, 0 is returned as the offset, along with the error.
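The rollover to a new active segment can be illustrated with an in-memory sketch. Everything here is an assumption made for illustration: `memSegment`, `memLog`, and the `maxRecords` limit are hypothetical stand-ins for the on-disk segments and their store and index byte limits, and error handling is left out.

```go
package main

import "fmt"

// memSegment is a hypothetical in-memory stand-in for an on-disk segment.
type memSegment struct {
	baseOffset uint64
	records    [][]byte
	maxRecords int // stand-in for the store/index byte limits
}

func (s *memSegment) isMaxed() bool { return len(s.records) >= s.maxRecords }

func (s *memSegment) nextOffset() uint64 { return s.baseOffset + uint64(len(s.records)) }

// memLog keeps its segments sorted by offset; the last one is active.
type memLog struct {
	segments   []*memSegment
	active     *memSegment
	maxRecords int
}

func newMemLog(maxRecords int) *memLog {
	l := &memLog{maxRecords: maxRecords}
	l.newSegment(0)
	return l
}

// newSegment creates a segment starting at base and makes it active.
func (l *memLog) newSegment(base uint64) {
	s := &memSegment{baseOffset: base, maxRecords: l.maxRecords}
	l.segments = append(l.segments, s)
	l.active = s
}

// Append writes to the active segment and, if that fills it, rolls over to
// a fresh segment that starts at the next offset.
func (l *memLog) Append(record []byte) uint64 {
	off := l.active.nextOffset()
	l.active.records = append(l.active.records, record)
	if l.active.isMaxed() {
		l.newSegment(l.active.nextOffset())
	}
	return off
}

func main() {
	l := newMemLog(2)
	for i := 0; i < 5; i++ {
		off := l.Append([]byte("record"))
		fmt.Printf("offset=%d segments=%d activeBase=%d\n", off, len(l.segments), l.active.baseOffset)
	}
}
```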
func (*Log) HighestOffset ¶
func (*Log) LowestOffset ¶
func (*Log) Read ¶
Read finds the segment in this log that contains the given offset and returns the result of calling (*segment).Read() on it.
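A linear search over segments, as described for reads, might look like the sketch below. The `memSegment` type, the `read` helper, and the `errOffsetOutOfRange` error are hypothetical; the actual error the package returns for a missing offset is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// memSegment is a hypothetical in-memory stand-in for an on-disk segment.
type memSegment struct {
	baseOffset uint64
	records    []string
}

func (s *memSegment) nextOffset() uint64 { return s.baseOffset + uint64(len(s.records)) }

// Read returns the record stored at the absolute offset off.
func (s *memSegment) Read(off uint64) (string, error) {
	return s.records[off-s.baseOffset], nil
}

// errOffsetOutOfRange is an assumed error value for offsets no segment holds.
var errOffsetOutOfRange = errors.New("offset out of range")

// read scans the segments in order and delegates to the first one whose
// offset range [baseOffset, nextOffset) contains off.
func read(segments []*memSegment, off uint64) (string, error) {
	for _, s := range segments {
		if s.baseOffset <= off && off < s.nextOffset() {
			return s.Read(off)
		}
	}
	return "", errOffsetOutOfRange
}

func main() {
	segments := []*memSegment{
		{baseOffset: 0, records: []string{"a", "b"}},
		{baseOffset: 2, records: []string{"c"}},
	}
	fmt.Println(read(segments, 2))
	fmt.Println(read(segments, 3))
}
```

Because segments are sorted by offset, a binary search would also work; the linear scan simply mirrors the behaviour this documentation describes.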
func (*Log) Reader ¶
Returns a continuous io.Reader over all segments in this Log.
This utilizes io.MultiReader to concatenate the io.Reader implementations of every segment's store in this log.
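As a rough illustration of the concatenation, the sketch below joins several readers with io.MultiReader. The `logReader` and `readAll` helpers are hypothetical, and each string stands in for the raw bytes of one segment's store.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// logReader returns one reader over all stores, in order. Each string is a
// hypothetical stand-in for the contents of one segment's store file.
func logReader(stores []string) io.Reader {
	readers := make([]io.Reader, len(stores))
	for i, s := range stores {
		readers[i] = strings.NewReader(s)
	}
	return io.MultiReader(readers...)
}

// readAll drains the concatenated reader into a string.
func readAll(stores []string) (string, error) {
	b, err := io.ReadAll(logReader(stores))
	return string(b), err
}

func main() {
	out, err := readAll([]string{"seg0|", "seg1|", "seg2"})
	fmt.Println(out, err)
}
```

A single continuous reader of this kind is convenient when the whole log has to be streamed as one blob, for example when producing a snapshot.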
type Replicator ¶
type Replicator struct {
	DialOptions []grpc.DialOption
	LocalServer api.LogClient
	// contains filtered or unexported fields
}
func (*Replicator) Close ¶
func (r *Replicator) Close() error
func (*Replicator) Join ¶
func (r *Replicator) Join(name, addr string) error
"Joins" or adds the given server to the list of servers to replicate records from. It starts record replication from that server in a goroutine and returns.
func (*Replicator) Leave ¶
func (r *Replicator) Leave(name string) error
Removes the given server from the list of servers to replicate records from. Also signals the replicating goroutine for that server to return.
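One common way to structure this Join/Leave pairing is a map from server name to a per-server "leave" channel. The sketch below assumes that design; the lowercase `replicator` type, its fields, and the `replicate` and `count` helpers are hypothetical, and the actual streaming of records is replaced by a goroutine that simply waits for its signal.

```go
package main

import (
	"fmt"
	"sync"
)

// replicator is a hypothetical, stripped-down stand-in for Replicator.
type replicator struct {
	mu      sync.Mutex
	servers map[string]chan struct{} // server name -> leave signal
	wg      sync.WaitGroup           // tracks running replication goroutines
}

// Join registers the server and starts its replication goroutine.
func (r *replicator) Join(name, addr string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.servers == nil {
		r.servers = make(map[string]chan struct{})
	}
	if _, ok := r.servers[name]; ok {
		return nil // already replicating from this server
	}
	leave := make(chan struct{})
	r.servers[name] = leave
	r.wg.Add(1)
	go r.replicate(addr, leave)
	return nil
}

// replicate would stream records from addr and append them locally; here it
// only blocks until it is told to stop.
func (r *replicator) replicate(addr string, leave <-chan struct{}) {
	defer r.wg.Done()
	<-leave
}

// Leave removes the server and signals its goroutine to return.
func (r *replicator) Leave(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	leave, ok := r.servers[name]
	if !ok {
		return nil
	}
	close(leave)
	delete(r.servers, name)
	return nil
}

// count reports how many servers are currently being replicated from.
func (r *replicator) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.servers)
}

func main() {
	r := &replicator{}
	r.Join("node-1", "127.0.0.1:8401")
	fmt.Println("servers after join:", r.count())
	r.Leave("node-1")
	r.wg.Wait() // the replication goroutine has returned
	fmt.Println("servers after leave:", r.count())
}
```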
type RequestType ¶
type RequestType uint8
Type of request. (Enumeration of request types)
const (
AppendRequestType RequestType = 0
)
type StreamLayer ¶
type StreamLayer struct {
// contains filtered or unexported fields
}
Raft network communication layer implementation.
func NewStreamLayer ¶
func NewStreamLayer(
	ln net.Listener,
	serverTLSConfig, peerTLSConfig *tls.Config,
) *StreamLayer
func (*StreamLayer) Accept ¶
func (s *StreamLayer) Accept() (net.Conn, error)
Accepts incoming connections from other servers. This method checks whether the first byte read matches the Raft RPC identifying byte. If it doesn't match, we error out; if it matches, we proceed normally. This method uses the server's TLS config to identify as a server.
Returns the connection made along with an error if any.
func (*StreamLayer) Addr ¶
func (s *StreamLayer) Addr() net.Addr
func (*StreamLayer) Close ¶
func (s *StreamLayer) Close() error
func (*StreamLayer) Dial ¶
func (s *StreamLayer) Dial(
	addr raft.ServerAddress,
	timeout time.Duration,
) (net.Conn, error)
Makes outgoing connections to other servers in the Raft cluster. When we connect to a server, we write a byte identifying this connection as a Raft RPC connection. This enables us to use the same port for Raft as well as Log gRPC requests. This method uses the peer TLS config to identify as a client.
Returns the connection made, along with an error if any.
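The first-byte handshake shared by Dial and Accept can be sketched over an in-memory connection. This is an illustration under assumptions: `identify`, `accept`, and `handshake` are hypothetical helpers, `raftRPC` mirrors the RaftRPC constant above, net.Pipe replaces a real listener, and the TLS wrapping that both methods perform is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// raftRPC mirrors the RaftRPC constant: the byte that marks a connection as
// carrying Raft traffic.
const raftRPC byte = 1

// identify is the dialing side: it writes the identifying byte first.
func identify(conn net.Conn, id byte) error {
	_, err := conn.Write([]byte{id})
	return err
}

// accept is the accepting side: it reads one byte and rejects the
// connection unless that byte is the Raft RPC marker.
func accept(conn net.Conn) error {
	b := make([]byte, 1)
	if _, err := conn.Read(b); err != nil {
		return err
	}
	if b[0] != raftRPC {
		return errors.New("not a raft rpc")
	}
	return nil
}

// handshake connects both sides over an in-memory pipe and reports what the
// accepting side decided for the given first byte.
func handshake(first byte) error {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()
	go identify(client, first)
	return accept(server)
}

func main() {
	fmt.Println("raft byte:", handshake(raftRPC))
	fmt.Println("other byte:", handshake(2))
}
```

In a multiplexed setup, a connection whose first byte is not the Raft marker would be routed to the gRPC server rather than rejected outright; that routing is outside this sketch.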