commitlog

package
v0.0.0-...-e848abc Latest
Published: Sep 6, 2015 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

The commitlog package contains the Raft commit log manipulation logic. It does not implement any storage logic itself; storage is handled through the LogStorage interface.

Constants

const DEFAULT_PERIODIC_SYNC_PERIOD = 5 * time.Second

DEFAULT_PERIODIC_SYNC_PERIOD defines how frequently Sync to storage is called when using the PERIODIC_SYNC sync policy.

const DEFAULT_SYNC_POLICY = OSSYNC

The commitlog can implement different disk sync policies. The default is to allow the operating system to decide when to sync the file contents to disk. This breaks the Raft algorithm, because an entry can be acknowledged before it has reached stable storage, but it is required to achieve maximum performance.

Both Periodic Sync and Write Sync on Linux ext4 result in stalls that trigger leadership elections.

Configuration of the policy using the admin tool isn't supported yet.

const MAX_MESSAGE_RESULTS = 500

MAX_MESSAGE_RESULTS is the maximum number of messages that a client may request. In practice more may be returned.
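
As an illustration of how a request limit like this might be enforced, the sketch below bounds a client-supplied quantity before it reaches the log. The helper clampQuantity and the lower bound of 1 are assumptions made for this example; they are not part of the package.

```go
package main

import "fmt"

// maxMessageResults mirrors the documented MAX_MESSAGE_RESULTS value.
const maxMessageResults = 500

// clampQuantity is a hypothetical helper (not part of the package). It bounds
// a client-requested message count to the range [1, maxMessageResults].
func clampQuantity(requested int) int {
	if requested < 1 {
		return 1 // assumption: a request always asks for at least one message
	}
	if requested > maxMessageResults {
		return maxMessageResults
	}
	return requested
}

func main() {
	fmt.Println(clampQuantity(0), clampQuantity(42), clampQuantity(10000))
	// prints: 1 42 500
}
```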

const MAX_RETRIEVE_MESSAGE_COUNT = 2500

MAX_RETRIEVE_MESSAGE_COUNT is the maximum number of messages that a leader node may send to a follower node that is behind.

Variables

This section is empty.

Functions

This section is empty.

Types

type CommitLog

type CommitLog struct {
	// contains filtered or unexported fields
}

The CommitLog is the api into the Raft commit log for the server. One instance is created per topic.

func (*CommitLog) Append

func (clog *CommitLog) Append(msgs model.Messages, previousIndex int64, previousTerm int64, leaderFirstIndex int64) (nextIndex int64, previousMatch bool, err error)

Append logs the given messages and returns the index of the next slot, or an error if the messages could not be logged. If the end of the log doesn't match previousIndex and previousTerm, the append must fail and previousMatch should be false. If there is an existing entry at msg.Index with a different msg.Term, then this entry and all subsequent entries must be deleted prior to the append.

NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders
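
The rules above follow the Raft log-matching check. The sketch below applies the textbook form of that check to an in-memory slice so the truncate-then-append behaviour is easy to see. It is not the package's implementation: the entry type and appendEntries function are hypothetical, the log is assumed to start at index 1, and msgs is assumed to carry contiguous indices starting at previousIndex+1.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a logged message.
type entry struct {
	Index, Term int64
}

// appendEntries applies the Raft consistency check to an in-memory log whose
// first entry has index 1. It returns the updated log, the next free index,
// and whether the previousIndex/previousTerm pair matched.
func appendEntries(log, msgs []entry, previousIndex, previousTerm int64) ([]entry, int64, bool) {
	last := int64(len(log))
	// The entry at previousIndex must exist and carry previousTerm.
	// A previousIndex of 0 means "append from the very start of the log".
	if previousIndex > last || (previousIndex > 0 && log[previousIndex-1].Term != previousTerm) {
		return log, last + 1, false
	}
	for _, m := range msgs {
		if m.Index <= int64(len(log)) {
			if log[m.Index-1].Term == m.Term {
				continue // identical entry already stored
			}
			// Conflicting term: drop this entry and everything after it.
			log = log[:m.Index-1]
		}
		log = append(log, m)
	}
	return log, int64(len(log)) + 1, true
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 1}}

	// The leader overwrites the stale entry at index 3 and adds index 4.
	log, next, ok := appendEntries(log, []entry{{3, 2}, {4, 2}}, 2, 1)
	fmt.Println(len(log), next, ok) // prints: 4 5 true

	// previousIndex 5 does not exist yet, so the append is rejected.
	_, next, ok = appendEntries(log, []entry{{6, 2}}, 5, 2)
	fmt.Println(next, ok) // prints: 5 false
}
```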

func (*CommitLog) Commit

func (clog *CommitLog) Commit(index int64) (err error)

Commit moves all messages between the current commitIndex and index into a committed state.

NOTE: The lock is used so we can access commitIndex AND waitingReaders

func (*CommitLog) ExpVar

func (clog *CommitLog) ExpVar() interface{}

func (*CommitLog) FirstIndex

func (clog *CommitLog) FirstIndex() (firstIndex int64, err error)

FirstIndex returns the first index in the log.

func (*CommitLog) Get

func (clog *CommitLog) Get(ID int64, quantity int, waitForMessages bool) (msgs [][]byte, nextID int64, err error)

Get retrieves messages from the log, starting at ID. If there is no message at ID and waitForMessages is true, then the method will block until at least one message is available.

NOTE: The lock is held so we can use waitingReaders
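
One common way to build this kind of blocking read in Go is a mutex paired with a sync.Cond that Commit broadcasts on. The sketch below shows that pattern under stated assumptions: miniLog and all of its fields are hypothetical, readers are assumed to see only committed messages, and the real package may track its waiting readers differently.

```go
package main

import (
	"fmt"
	"sync"
)

// miniLog is a hypothetical in-memory stand-in for CommitLog.
type miniLog struct {
	mu          sync.Mutex
	cond        *sync.Cond
	msgs        [][]byte // msgs[i] holds the message with ID i+1
	commitIndex int64
	closed      bool
}

func newMiniLog(msgs [][]byte) *miniLog {
	l := &miniLog{msgs: msgs}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Commit advances commitIndex (never past the last stored message) and wakes
// every reader blocked in Get.
func (l *miniLog) Commit(index int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if index > int64(len(l.msgs)) {
		index = int64(len(l.msgs))
	}
	if index > l.commitIndex {
		l.commitIndex = index
		l.cond.Broadcast()
	}
}

// Shutdown releases blocked readers so that they return empty-handed.
func (l *miniLog) Shutdown() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.closed = true
	l.cond.Broadcast()
}

// Get returns up to quantity committed messages starting at id, plus the ID
// to request next. With wait set, it blocks until id is committed or the log
// is shut down.
func (l *miniLog) Get(id int64, quantity int, wait bool) ([][]byte, int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for wait && !l.closed && id > l.commitIndex {
		l.cond.Wait() // releases mu while waiting, reacquires before returning
	}
	if id < 1 || id > l.commitIndex || quantity < 1 {
		return nil, id
	}
	end := id - 1 + int64(quantity)
	if end > l.commitIndex {
		end = l.commitIndex
	}
	return l.msgs[id-1 : end], end + 1
}

func main() {
	l := newMiniLog([][]byte{[]byte("a"), []byte("b")})
	done := make(chan int)
	go func() {
		msgs, _ := l.Get(1, 10, true) // blocks until Commit runs
		done <- len(msgs)
	}()
	l.Commit(2)
	fmt.Println(<-done) // prints: 2
}
```

Checking the condition in a loop around Wait matters here: a reader can be woken by a Commit that still does not cover its ID, or by Shutdown.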

func (*CommitLog) GetCurrentCommit

func (clog *CommitLog) GetCurrentCommit() int64

func (*CommitLog) GetLogStorage

func (clog *CommitLog) GetLogStorage() LogStorage

func (*CommitLog) LastIndex

func (clog *CommitLog) LastIndex() (lastIndex int64, err error)

LastIndex returns the last index in the log.

func (*CommitLog) LastLogEntryInfo

func (clog *CommitLog) LastLogEntryInfo() (lastIndex int64, lastTerm int64, err error)

LastLogEntryInfo returns the index and term of the last entry in the log. It is used for voting.
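
In Raft, a node grants its vote only if the candidate's log is at least as up to date as its own: the later last term wins, and equal terms are decided by the longer log. The sketch below shows how the two values returned here could feed that comparison. The function candidateUpToDate is hypothetical and is not part of this package.

```go
package main

import "fmt"

// candidateUpToDate is a hypothetical helper. It reports whether a candidate's
// log is at least as up to date as the local log, following Raft's election
// restriction. lastIndex and lastTerm would come from LastLogEntryInfo.
func candidateUpToDate(candLastIndex, candLastTerm, lastIndex, lastTerm int64) bool {
	if candLastTerm != lastTerm {
		return candLastTerm > lastTerm // the log ending in the later term wins
	}
	return candLastIndex >= lastIndex // same term: the longer log wins
}

func main() {
	fmt.Println(candidateUpToDate(5, 3, 9, 2)) // prints: true (later term)
	fmt.Println(candidateUpToDate(4, 2, 9, 2)) // prints: false (shorter log)
}
```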

func (*CommitLog) Queue

func (clog *CommitLog) Queue(term int64, modelMsgs model.Messages) (IDs []int64, err error)

Queue is used to append client messages when the node is a leader.

NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders

func (*CommitLog) Retrieve

func (clog *CommitLog) Retrieve(ID int64) (previousIndex int64, previousTerm int64, msgs model.Messages, err error)

Retrieve is used by the leader to get messages for distribution to other peers.

If there are no messages at this ID, then previousIndex and previousTerm will be set to 0 and the first available messages will be returned.

NOTE: The lock is not held in this method as we do not touch commitIndex or waitingReaders
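
The sketch below illustrates how previousIndex and previousTerm relate to the batch that is returned, including the fallback to the first available messages. It is a simplified model over a contiguous in-memory slice: entry and retrieve are hypothetical names, and the handling of an ID just past the end of the log (an empty batch that still reports the last entry) is an assumption of this example.

```go
package main

import "fmt"

// maxRetrieveMessageCount mirrors the documented MAX_RETRIEVE_MESSAGE_COUNT.
const maxRetrieveMessageCount = 2500

// entry is a hypothetical stand-in for a logged message.
type entry struct {
	Index, Term int64
}

// retrieve returns the batch starting at id together with the index and term
// of the entry just before it. log must hold contiguous indices.
func retrieve(log []entry, id int64) (previousIndex, previousTerm int64, msgs []entry) {
	if len(log) == 0 {
		return 0, 0, nil
	}
	pos := id - log[0].Index // offset of id within the slice
	if pos < 0 || pos > int64(len(log)) {
		pos = 0 // nothing stored at id: fall back to the first available messages
	}
	if pos > 0 {
		prev := log[pos-1]
		previousIndex, previousTerm = prev.Index, prev.Term
	}
	end := pos + maxRetrieveMessageCount
	if end > int64(len(log)) {
		end = int64(len(log))
	}
	return previousIndex, previousTerm, log[pos:end]
}

func main() {
	log := []entry{{5, 1}, {6, 1}, {7, 2}}

	prevIdx, prevTerm, msgs := retrieve(log, 7)
	fmt.Println(prevIdx, prevTerm, len(msgs)) // prints: 6 1 1

	// ID 2 is no longer in the log, so the whole log is returned from its start.
	prevIdx, prevTerm, msgs = retrieve(log, 2)
	fmt.Println(prevIdx, prevTerm, len(msgs)) // prints: 0 0 3
}
```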

func (*CommitLog) SetupLog

func (clog *CommitLog) SetupLog(topicName string, store LogStorage) (err error)

SetupLog is used at startup by the Node to read in the persisted state. PersistentLogs are expected at this point to validate their storage and determine the current state.

commitIndex is the last committed index ID.
nextIndex is the index ID for the next message to be appended to this log.
error is populated if there are any unrecoverable issues loading the persisted state.

func (*CommitLog) Shutdown

func (clog *CommitLog) Shutdown()

Shutdown is used by the Node to notify any clients waiting on the appearance of new messages that the node is shutting down, so they should return empty-handed.

It does not trigger shutdown of the storage.

type CommitLogInfo

type CommitLogInfo struct {
	LastCommittedID int64
	Storage         interface{}
}

CommitLogInfo is a container structure for recording information on the state of the CommitLog for use with ExpVar.

type DiskSyncPolicy

type DiskSyncPolicy int

const (
	// The WRITE_SYNC policy triggers a call to Sync the underlying storage after each completed Append.  Required for full Raft compliance.
	WRITE_SYNC DiskSyncPolicy = iota
	// The PERIODIC_SYNC policy triggers the call to Sync periodically (set by DEFAULT_PERIODIC_SYNC_PERIOD)
	PERIODIC_SYNC
	// OSSYNC does not explicitly call Sync on the underlying storage and allows the OS to decide when to sync data to disk.  This provides the best performance.
	OSSYNC
)

type LogStorage

type LogStorage interface {
	// GetMessages returns a set of approximately count messages starting at index
	// The number of messages returned may be lower or higher than the number requested
	// If only one message is requested and it exists then it will be returned
	// If index > lastIndex then GetMessages returns an empty list with no error
	GetMessages(index int64, count int64) (model.Messages, error)

	// AppendMessages writes the given messages to the end of the log.
	// CommitLog is responsible for ensuring consistency - the LogStorage needs to just append the messages
	AppendMessages(msgs model.Messages) (lastIndex int64, err error)

	// AppendFirstMessages is used by the commit log to record the first messages in the log.
	// leaderFirstIndex is used if the log is empty to determine the first real index of the log (not always 1)
	AppendFirstMessages(msgs model.Messages, leaderFirstIndex int64) (lastIndex int64, err error)

	// Sync records any messages stored with AppendMessages to the underlying storage.
	Sync() error

	// GetLastIndex returns the index of the last message in the log.
	// A lastIndex of 0 is returned if there are no messages.
	GetLastIndex() (lastIndex int64, err error)

	// GetFirstIndex returns the index of the first message in the log.
	// A firstIndex of 0 is returned if there are no messages.
	GetFirstIndex() (firstIndex int64, err error)

	// TruncateMessages is used by CommitLog to remove messages that it knows are invalid (happens during leadership changes)
	// TruncateMessages removes messages at this index and beyond, e.g. TruncateMessages(1) removes all messages
	TruncateMessages(index int64) error

	// ExpVar returns stats for ExpVar
	ExpVar() interface{}

	// Shutdown frees any resources used by the storage and closes any maintenance goroutines.
	Shutdown(notifier *utils.ShutdownNotifier)
}

LogStorage is the interface between CommitLog and the storage mechanism.
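
To show the indexing contract described in the interface comments, the sketch below implements the index-related methods over an in-memory slice. It is not a drop-in LogStorage: the message type stands in for model.Messages, Sync, ExpVar and Shutdown are omitted, and the behaviour for requests below the first index is an assumption of this example.

```go
package main

import "fmt"

// message is a hypothetical stand-in for one element of model.Messages.
type message struct {
	Term    int64
	Payload []byte
}

// memStorage keeps messages in a slice; msgs[i] has index first+i.
type memStorage struct {
	first int64
	msgs  []message
}

// GetFirstIndex returns 0 when the log is empty.
func (s *memStorage) GetFirstIndex() (int64, error) {
	if len(s.msgs) == 0 {
		return 0, nil
	}
	return s.first, nil
}

// GetLastIndex returns 0 when the log is empty.
func (s *memStorage) GetLastIndex() (int64, error) {
	if len(s.msgs) == 0 {
		return 0, nil
	}
	return s.first + int64(len(s.msgs)) - 1, nil
}

// AppendFirstMessages uses leaderFirstIndex as the starting index when the
// log is empty, then appends.
func (s *memStorage) AppendFirstMessages(msgs []message, leaderFirstIndex int64) (int64, error) {
	if len(s.msgs) == 0 {
		s.first = leaderFirstIndex
	}
	return s.AppendMessages(msgs)
}

// AppendMessages appends without any consistency checks.
func (s *memStorage) AppendMessages(msgs []message) (int64, error) {
	s.msgs = append(s.msgs, msgs...)
	return s.GetLastIndex()
}

// TruncateMessages removes the message at index and everything after it.
func (s *memStorage) TruncateMessages(index int64) error {
	if index <= s.first {
		s.msgs = nil
	} else if keep := index - s.first; keep < int64(len(s.msgs)) {
		s.msgs = s.msgs[:keep]
	}
	return nil
}

// GetMessages returns up to count messages starting at index. Requests
// outside the stored range return an empty list (an assumption for indexes
// below the first index).
func (s *memStorage) GetMessages(index, count int64) ([]message, error) {
	last, _ := s.GetLastIndex()
	if len(s.msgs) == 0 || index < s.first || index > last || count < 1 {
		return nil, nil
	}
	start := index - s.first
	end := start + count
	if end > int64(len(s.msgs)) {
		end = int64(len(s.msgs))
	}
	return s.msgs[start:end], nil
}

func main() {
	s := &memStorage{}
	_, _ = s.AppendFirstMessages([]message{{Term: 1}, {Term: 1}}, 10)
	_, _ = s.AppendMessages([]message{{Term: 2}})
	_ = s.TruncateMessages(12) // drop the entry at index 12

	first, _ := s.GetFirstIndex()
	last, _ := s.GetLastIndex()
	fmt.Println(first, last) // prints: 10 11
}
```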
