Documentation ¶
Overview ¶
The commitlog package contains the raft commit log manipulation logic. It doesn't implement any storage logic which is handled through the LogStorage interface.
Index ¶
- Constants
- type CommitLog
- func (clog *CommitLog) Append(msgs model.Messages, previousIndex int64, previousTerm int64, ...) (nextIndex int64, previousMatch bool, err error)
- func (clog *CommitLog) Commit(index int64) (err error)
- func (clog *CommitLog) ExpVar() interface{}
- func (clog *CommitLog) FirstIndex() (firstIndex int64, err error)
- func (clog *CommitLog) Get(ID int64, quantity int, waitForMessages bool) (msgs [][]byte, nextID int64, err error)
- func (clog *CommitLog) GetCurrentCommit() int64
- func (clog *CommitLog) GetLogStorage() LogStorage
- func (clog *CommitLog) LastIndex() (lastIndex int64, err error)
- func (clog *CommitLog) LastLogEntryInfo() (lastIndex int64, lastTerm int64, err error)
- func (clog *CommitLog) Queue(term int64, modelMsgs model.Messages) (IDs []int64, err error)
- func (clog *CommitLog) Retrieve(ID int64) (previousIndex int64, previousTerm int64, msgs model.Messages, err error)
- func (clog *CommitLog) SetupLog(topicName string, store LogStorage) (err error)
- func (clog *CommitLog) Shutdown()
- type CommitLogInfo
- type DiskSyncPolicy
- type LogStorage
Constants ¶
const DEFAULT_PERIODIC_SYNC_PERIOD = 5 * time.Second
DEFAULT_PERIODIC_SYNC_PERIOD defines how frequently Sync to storage is called when using the PERIODIC_SYNC sync policy.
const DEFAULT_SYNC_POLICY = OSSYNC
The commitlog can implement different disk sync policies. The default is to allow the operating system to decide when to sync the file contents to disk. This breaks the Raft algorithm but is required to achieve maximum performance.
Both Periodic Sync and Write Sync on linux ext4 results in stalls that trigger leadership elections.
Configuration of the policy using the admin tool isn't supported yet.
const MAX_MESSAGE_RESULTS = 500
MAX_MESSAGE_RESULTS is the maximum number of messages that a client may request. In practise more may be returned.
const MAX_RETRIEVE_MESSAGE_COUNT = 2500
MAX_RETRIEVE_MESSAGE_COUNT is the maximum number of messages that a leader node may send to a follower node that is behind.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CommitLog ¶
type CommitLog struct {
// contains filtered or unexported fields
}
The CommitLog is the api into the Raft commit log for the server. One instance is created per topic.
func (*CommitLog) Append ¶
func (clog *CommitLog) Append(msgs model.Messages, previousIndex int64, previousTerm int64, leaderFirstIndex int64) (nextIndex int64, previousMatch bool, err error)
Append logs the given message and returns the index of the next slot or an error if the message could not be logged. If the end of the log doesn't match the previousIndex and the previousTerm the append must fail and previousMatch should be false. If there is an existing entry at msg.Index with a different msg.Term then this entry and all subsequent entries must be deleted prior to the append.
NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders
func (*CommitLog) Commit ¶
Commit moves all messages between the current commitIndex and index into a committed state.
NOTE: The lock is used so we can access commitIndex AND waitingReaders
func (*CommitLog) FirstIndex ¶
Get the first index in the log
func (*CommitLog) Get ¶
func (clog *CommitLog) Get(ID int64, quantity int, waitForMessages bool) (msgs [][]byte, nextID int64, err error)
Get retrieves messages from the log, starting at ID If there is no message ID and waitForMessage is true then the method will block until at least one message is available.
NOTE: The lock is held so we can use waitingReaders
func (*CommitLog) GetCurrentCommit ¶
func (*CommitLog) GetLogStorage ¶
func (clog *CommitLog) GetLogStorage() LogStorage
func (*CommitLog) LastLogEntryInfo ¶
Used for voting - returns the last term and index in the log
func (*CommitLog) Queue ¶
Queue is used to append client messages when the node is a leader.
NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders
func (*CommitLog) Retrieve ¶
func (clog *CommitLog) Retrieve(ID int64) (previousIndex int64, previousTerm int64, msgs model.Messages, err error)
Retrieve is used by the leader to get messages for distribution to other peers.
If there are no messages at this ID, then previousIndex and previousTerm will be set to 0 and the first available messages will be returned.
NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders
func (*CommitLog) SetupLog ¶
func (clog *CommitLog) SetupLog(topicName string, store LogStorage) (err error)
LoadLogInfo is used at startup by the Node to read in the persisted state. PersistentLog's are expected at this point to validate their storage and determine the current state.
commitIndex is the last committed index ID. nextIndex is the index ID for the next message to be appended to this log. error is populated if there are any issues loadnig the persisted state that are unrecoverable from.
type CommitLogInfo ¶
type CommitLogInfo struct { LastCommittedID int64 Storage interface{} }
CommitLogInfo is a container structure for recording information on the state of the CommitLog for use with ExpVar.
type DiskSyncPolicy ¶
type DiskSyncPolicy int
const ( // The WRITE_SYNC policy triggers a call to Sync the underlying storage after each completed Append. Required for full Raft compliancy. WRITE_SYNC DiskSyncPolicy = iota // The PERIODIC_SYNC policy triggers the call to Sync periodically (set by DEFAULT_PERIODIC_SYNC_PERIOD) PERIODIC_SYNC // OSSYNC does not explicitly call Sync on the underlying storage and allows the OS to decide when to sync data to disk. This provides the best performance. OSSYNC )
type LogStorage ¶
type LogStorage interface { // GetMessage returns a set of approximately count messages starting at index // The number of messages returned may be lower or higher than the number requested // If only one message is requested and it exists then it will be returned // If index > lastIndex then GetMessages returns an empty list with no error GetMessages(index int64, count int64) (model.Messages, error) // AppendMessages writes the given messages to the end of the log. // CommitLog is responsible for ensuring consistency - the LogStorage needs to just append the messages AppendMessages(msgs model.Messages) (lastIndex int64, err error) // AppendFirstMessages is used by the commit log to record the first messages in the log. // leaderFirstIndex is used if the log is empty to determine the first real index of the log (not always 1) AppendFirstMessages(msgs model.Messages, leaderFirstIndex int64) (lastIndex int64, err error) // Commit records any messages stored with AppendMessages to the underlying storage. Sync() error // GetLastIndex returns the index of the last message in the log. // A lastIndex of 0 is returned if there are no messages. GetLastIndex() (lastIndex int64, err error) // GetFirstIndex returns the index of the first message in the log. // A lastIndex of 0 is returned if there are no messages. GetFirstIndex() (firstIndex int64, err error) // TruncateMessages is used by CommitLog to remove messages that it knows are invalid (happens during leadership changes) // Truncate removes messages at this index and beyond, i.e. TruncateMessages(1) removes all messages TruncateMessages(index int64) error // ExpVar returns stats for ExpVar ExpVar() interface{} // Shutdown frees any resources used by the storage and closes any maintenance goroutines. Shutdown(notifier *utils.ShutdownNotifier) }
LogStorage is the interface between CommitLog and the storage mechanism.